skyeyesystem/MiniSAR-Simulator-Py-main/module2_sender.py

271 lines
9.2 KiB
Python
Raw Permalink Normal View History

"""
模块2UDP发送响应包模块
向37001端口发送控制指令的执行结果
"""
import socket
import logging
import time
from typing import Optional, Tuple
from data_structures import (
ResponsePacket, ErrorPacket, DeviceStatus,
CONTROL_TX_PORT, ExecutionStatus, StatusPacketType
)
class ResponseSender:
"""响应包发送模块"""
def __init__(self, remote_host: str = '127.0.0.1', port: int = CONTROL_TX_PORT, local_ip: Optional[str] = None):
"""
初始化响应发送模块
Args:
remote_host: 远程主机地址
port: 发送端口
local_ip: 本地绑定IP用于指定发送源IP
"""
self.remote_host = remote_host
self.port = port
self.local_ip = local_ip
self.socket = None
# 配置日志
self.logger = logging.getLogger(f'Module2-Sender:{port}')
self.logger.setLevel(logging.INFO)
if not self.logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
# 创建UDP套接字
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
if self.local_ip:
try:
self.socket.bind((self.local_ip, 0))
self.logger.info(f"Response sender bound to {self.local_ip}")
except Exception as e:
self.logger.error(f"Failed to bind response sender to {self.local_ip}: {e}")
def send_response(self, remote_addr: Tuple[str, int],
status: int = ExecutionStatus.SUCCESS,
device_status: Optional[DeviceStatus] = None) -> bool:
"""
发送控制命令执行结果响应包
Args:
remote_addr: 远程地址 (host, port)
status: 执行状态 (0=成功, 1=错误)
device_status: 设备状态可选
Returns:
是否发送成功
"""
try:
# 创建错误包
error_packet = ErrorPacket(
status_packet_type=StatusPacketType.CONTROL_RESULT,
execution_status=status
)
# 创建响应包
# 状态包类型:[0]=错误包(回报), [1]=设备状态
# 如果是回报则[0]设为1如果是设备状态则[1]设为1
response = ResponsePacket(
status_packet_type=[1, 1], # [0]=1表示错误包有效, [1]=0表示设备状态无效
device_number=1,
error_packet=error_packet,
device_status=device_status if device_status else DeviceStatus()
)
# 序列化为字节流
data = response.to_bytes()
# 发送数据
self.socket.sendto(data, remote_addr)
self.logger.info(f"Response sent to {remote_addr} - Status: {status}")
self.logger.info(f"DEBUG: Raw packet data ({len(data)} bytes): {data.hex()}")
#self.logger.debug(f"Packet size: {len(data)} bytes")
return True
except Exception as e:
self.logger.error(f"Failed to send response: {e}")
return False
def send_status_packet(self, remote_addr: Tuple[str, int],
device_status: DeviceStatus) -> bool:
"""
发送设备状态包
Args:
remote_addr: 远程地址 (host, port)
device_status: 设备状态
Returns:
是否发送成功
"""
try:
# 创建错误包(状态设为成功)
error_packet = ErrorPacket(
status_packet_type=StatusPacketType.CONTROL_RESULT,
execution_status=ExecutionStatus.SUCCESS
)
# 创建响应包,包含设备状态
# 状态包类型:[0]=错误包(回报), [1]=设备状态
# 如果是回报则[0]设为1如果是设备状态则[1]设为1
response = ResponsePacket(
status_packet_type=[1, 1], # [0]=0表示错误包无效, [1]=1表示设备状态有效
device_number=1,
error_packet=error_packet,
device_status=device_status
)
# 序列化为字节流
data = response.to_bytes()
# 发送数据
self.socket.sendto(data, remote_addr)
self.logger.info(f"Status packet sent to {remote_addr}")
#self.logger.info(f"Status sent to {remote_addr} - Status: {status}")
self.logger.info(f"DEBUG: Raw packet data ({len(data)} bytes): {data.hex()}")
#self.logger.debug(f"Packet size: {len(data)} bytes")
return True
except Exception as e:
self.logger.error(f"Failed to send status packet: {e}")
return False
def close(self):
"""关闭发送模块"""
if self.socket:
self.socket.close()
self.logger.info("Sender closed")
class StatusPacketSender:
"""定时发送状态包的模块"""
def __init__(self, remote_addr: Tuple[str, int], interval: float = 1.0, local_ip: Optional[str] = None):
"""
初始化定时状态包发送器
Args:
remote_addr: 远程地址 (host, port)
interval: 发送间隔
local_ip: 本地绑定IP
"""
self.remote_addr = remote_addr
self.interval = interval
self.local_ip = local_ip
self.sender = ResponseSender(remote_addr[0], CONTROL_TX_PORT, local_ip=local_ip)
self.running = False
self.send_thread = None
# 配置日志
self.logger = logging.getLogger(f'StatusPacketSender:{remote_addr}')
self.logger.setLevel(logging.INFO)
if not self.logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def start(self):
"""启动定时发送"""
if self.running:
self.logger.warning("Status packet sender is already running")
return
self.running = True
self.logger.info(f"Starting status packet sender for {self.remote_addr}")
# 启动发送线程
import threading
self.send_thread = threading.Thread(target=self._send_loop, daemon=True)
self.send_thread.start()
def stop(self):
"""停止定时发送"""
self.running = False
if self.send_thread:
self.send_thread.join(timeout=2)
self.sender.close()
self.logger.info("Status packet sender stopped")
def _send_loop(self):
"""发送循环"""
while self.running:
try:
# 创建设备状态
device_status = DeviceStatus(
cpu_temp=45,
ant_temp=40,
rf_temp=50,
nvme_temp=35,
fpga_temp=48,
sys_voltage=12,
clock_lock=1,
imu_state=1,
mem_volume=8192,
disk_volume=50,
north_velocity=10.5,
sky_velocity=0.5,
east_velocity=5.2,
altitude=1000.0,
longitude=1159154650, # 116.0°
latitude=403585769, # 40.0°
angle_roll=0.0,
angle_yaw=90.0,
angle_pitch=0.0,
year_month_day=20260119,
hour_min_sec=120000,
antenna_azimuth=0,
antenna_pitch=0,
is_boot=1,
satellite_num=12,
pos_flag=4,
orient_flag=1
)
# 发送状态包
self.sender.send_status_packet(self.remote_addr, device_status)
# 等待指定间隔
time.sleep(self.interval)
except Exception as e:
if self.running:
self.logger.error(f"Error in send loop: {e}")
break
if __name__ == '__main__':
# 配置日志
logging.basicConfig(level=logging.INFO)
# 创建发送模块
sender = ResponseSender('127.0.0.1', CONTROL_TX_PORT)
# 发送响应包
print("Sending response packet...")
sender.send_response(('127.0.0.1', 12345), ExecutionStatus.SUCCESS)
# 发送设备状态
device_status = DeviceStatus(
cpu_temp=45,
ant_temp=40,
rf_temp=50,
is_boot=1,
satellite_num=12
)
print("Sending status packet...")
sender.send_status_packet(('127.0.0.1', 12345), device_status)
sender.close()