"""
Module 2: UDP response-packet sender.

Sends the execution result of control commands to port 37001.
"""

import logging
import socket
import threading
import time
from typing import Optional, Tuple

from data_structures import (
    ResponsePacket, ErrorPacket, DeviceStatus,
    CONTROL_TX_PORT, ExecutionStatus, StatusPacketType
)

_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'


def _get_logger(name: str) -> logging.Logger:
    """Return the INFO-level logger called *name*, adding one stream handler.

    The handler is attached only when the logger has none yet, so building
    several senders with the same logger name does not duplicate output.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(_LOG_FORMAT))
        logger.addHandler(handler)
    return logger


class ResponseSender:
    """Sends response packets and device-status packets over UDP."""

    def __init__(self, remote_host: str = '127.0.0.1', port: int = CONTROL_TX_PORT,
                 local_ip: Optional[str] = None):
        """
        Create the UDP socket and optionally bind it to a local address.

        Args:
            remote_host: Remote host address (stored on the instance; the
                send methods take their destination explicitly).
            port: Port number (stored, and used in the logger name).
            local_ip: Local IP to bind to, used to pick the source address.
                A bind failure is logged and the sender keeps using the
                unbound socket.
        """
        self.remote_host = remote_host
        self.port = port
        self.local_ip = local_ip
        self.socket = None

        self.logger = _get_logger(f'Module2-Sender:{port}')

        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        if self.local_ip:
            try:
                # Port 0 lets the OS choose an ephemeral source port.
                self.socket.bind((self.local_ip, 0))
                self.logger.info(f"Response sender bound to {self.local_ip}")
            except Exception as e:
                # Best effort: keep the unbound socket rather than failing.
                self.logger.error(f"Failed to bind response sender to {self.local_ip}: {e}")

    def _encode_packet(self, execution_status: int, device_status: DeviceStatus) -> bytes:
        """Build a ResponsePacket around the given status and serialize it."""
        error_packet = ErrorPacket(
            status_packet_type=StatusPacketType.CONTROL_RESULT,
            execution_status=execution_status
        )
        # Status packet type flags: [0] = error packet (report),
        # [1] = device status; a flag is set to 1 when that part is reported.
        # NOTE(review): the original inline comments described [1, 0] for a
        # command response and [0, 1] for a status packet, yet both call
        # sites sent [1, 1]. The value is kept as-is; confirm against the
        # protocol specification.
        response = ResponsePacket(
            status_packet_type=[1, 1],
            device_number=1,
            error_packet=error_packet,
            device_status=device_status
        )
        return response.to_bytes()

    def send_response(self, remote_addr: Tuple[str, int], status: int = ExecutionStatus.SUCCESS,
                      device_status: Optional[DeviceStatus] = None) -> bool:
        """
        Send the execution result of a control command.

        Args:
            remote_addr: Destination address as (host, port).
            status: Execution status (0 = success, 1 = error).
            device_status: Optional device status; a default-constructed
                DeviceStatus is sent when omitted.

        Returns:
            True if the datagram was handed to the socket, False if building
            or sending the packet raised (the error is logged).
        """
        try:
            # Compare against None explicitly so that a caller-supplied
            # status object is never replaced just because it is falsy.
            if device_status is None:
                device_status = DeviceStatus()
            data = self._encode_packet(status, device_status)

            self.socket.sendto(data, remote_addr)
            self.logger.info(f"Response sent to {remote_addr} - Status: {status}")
            self.logger.info(f"DEBUG: Raw packet data ({len(data)} bytes): {data.hex()}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to send response: {e}")
            return False

    def send_status_packet(self, remote_addr: Tuple[str, int], device_status: DeviceStatus) -> bool:
        """
        Send a device-status packet.

        The embedded error packet always reports ExecutionStatus.SUCCESS.

        Args:
            remote_addr: Destination address as (host, port).
            device_status: Device status to transmit.

        Returns:
            True if the datagram was handed to the socket, False if building
            or sending the packet raised (the error is logged).
        """
        try:
            data = self._encode_packet(ExecutionStatus.SUCCESS, device_status)

            self.socket.sendto(data, remote_addr)
            self.logger.info(f"Status packet sent to {remote_addr}")
            self.logger.info(f"DEBUG: Raw packet data ({len(data)} bytes): {data.hex()}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to send status packet: {e}")
            return False

    def close(self):
        """Close the UDP socket; calling this more than once is harmless."""
        sock, self.socket = self.socket, None
        if sock is not None:
            sock.close()
            self.logger.info("Sender closed")


class StatusPacketSender:
    """Sends a device-status packet periodically from a background thread."""

    def __init__(self, remote_addr: Tuple[str, int], interval: float = 1.0,
                 local_ip: Optional[str] = None):
        """
        Prepare the periodic sender; no thread runs until start() is called.

        Args:
            remote_addr: Destination address as (host, port).
            interval: Delay between two packets, in seconds.
            local_ip: Local IP to bind the underlying socket to.
        """
        self.remote_addr = remote_addr
        self.interval = interval
        self.local_ip = local_ip
        self.sender = ResponseSender(remote_addr[0], CONTROL_TX_PORT, local_ip=local_ip)
        self.running = False
        self.send_thread = None
        # Set by stop() so the loop wakes immediately instead of sleeping
        # out the rest of its interval.
        self._stop_event = threading.Event()

        self.logger = _get_logger(f'StatusPacketSender:{remote_addr}')

    def start(self):
        """Start the background send thread; warn and return if already running."""
        if self.running:
            self.logger.warning("Status packet sender is already running")
            return

        self._stop_event.clear()
        self.running = True
        self.logger.info(f"Starting status packet sender for {self.remote_addr}")

        self.send_thread = threading.Thread(target=self._send_loop, daemon=True)
        self.send_thread.start()

    def stop(self):
        """
        Stop the send thread and close the underlying sender.

        The socket is closed here and start() does not recreate it, so sends
        attempted after a later start() fail and are logged.
        """
        self.running = False
        self._stop_event.set()
        if self.send_thread:
            self.send_thread.join(timeout=2)
        self.sender.close()
        self.logger.info("Status packet sender stopped")

    def _build_device_status(self) -> DeviceStatus:
        """Return the fixed sample device status sent on every cycle."""
        return DeviceStatus(
            cpu_temp=45,
            ant_temp=40,
            rf_temp=50,
            nvme_temp=35,
            fpga_temp=48,
            sys_voltage=12,
            clock_lock=1,
            imu_state=1,
            mem_volume=8192,
            disk_volume=50,
            north_velocity=10.5,
            sky_velocity=0.5,
            east_velocity=5.2,
            altitude=1000.0,
            # NOTE(review): the original comments labelled these "116.0°"
            # and "40.0°"; presumably scaled-integer degrees - confirm the
            # unit against the DeviceStatus definition.
            longitude=1159154650,
            latitude=403585769,
            angle_roll=0.0,
            angle_yaw=90.0,
            angle_pitch=0.0,
            year_month_day=20260119,
            hour_min_sec=120000,
            antenna_azimuth=0,
            antenna_pitch=0,
            is_boot=1,
            satellite_num=12,
            pos_flag=4,
            orient_flag=1
        )

    def _send_loop(self):
        """Send a status packet every `interval` seconds until stopped."""
        while self.running:
            try:
                device_status = self._build_device_status()
                self.sender.send_status_packet(self.remote_addr, device_status)

                # time.sleep() rejected negative intervals, whereas
                # Event.wait() would return at once and spin; keep failing
                # loudly so the loop still terminates in that case.
                if self.interval < 0:
                    raise ValueError("sleep length must be non-negative")
                # Interruptible wait: returns True as soon as stop() is called.
                if self._stop_event.wait(self.interval):
                    break
            except Exception as e:
                if self.running:
                    self.logger.error(f"Error in send loop: {e}")
                break


def _demo():
    """Send one response packet and one status packet to localhost."""
    logging.basicConfig(level=logging.INFO)

    sender = ResponseSender('127.0.0.1', CONTROL_TX_PORT)

    print("Sending response packet...")
    sender.send_response(('127.0.0.1', 12345), ExecutionStatus.SUCCESS)

    device_status = DeviceStatus(
        cpu_temp=45,
        ant_temp=40,
        rf_temp=50,
        is_boot=1,
        satellite_num=12
    )
    print("Sending status packet...")
    sender.send_status_packet(('127.0.0.1', 12345), device_status)

    sender.close()


if __name__ == '__main__':
    _demo()