"""Main program of the UDP interaction application.

Integrates module 1 (control-packet receiver), module 2 (response and status
packet senders) and module 3 (data sender) into one UDP communication system.
"""

import argparse
import logging
import time
import threading
from typing import Dict, Tuple, Optional

from module1_receiver import UDPReceiver
from module2_sender import ResponseSender, StatusPacketSender
from module3_data_sender import DataSender
from data_structures import (
    ControlPacket, WorkMode, WorkInstruction, ExecutionStatus, DeviceStatus,
    CONTROL_RX_PORT, CONTROL_TX_PORT, DATA_TX_PORT
)


class RadarControlSystem:
    """Radar control system.

    Owns the receiver and sender modules, dispatches incoming control
    packets to per-instruction handlers, and keeps track of connected
    clients and of the device status that is reported back in responses.
    """

    def __init__(self, local_ip: str = '0.0.0.0'):
        """Initialise the system without opening any sockets.

        Args:
            local_ip: Local IP address handed to the receiver and sender
                modules when ``start()`` creates them.
        """
        self.local_ip = local_ip

        self.logger = logging.getLogger('RadarControlSystem')
        self.logger.setLevel(logging.INFO)
        # hasHandlers() also inspects ancestor loggers. When the application
        # has already configured logging (main() calls logging.basicConfig),
        # attaching a second handler here would print every record twice,
        # because records propagate to the root logger as well.
        if not self.logger.hasHandlers():
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

        # Modules (created in start()).
        self.receiver: Optional[UDPReceiver] = None
        self.response_sender: Optional[ResponseSender] = None
        self.data_sender: Optional[DataSender] = None
        self.status_senders: Dict[Tuple[str, int], StatusPacketSender] = {}

        # State.
        self.connected_clients: Dict[Tuple[str, int], float] = {}  # {client address: connection time}
        self.device_status = DeviceStatus()
        self.running = False

    @staticmethod
    def _client_key(remote_addr: Tuple[str, int]) -> Tuple[str, int]:
        """Return the address under which a client is registered.

        The IP of ``remote_addr`` is kept and the port is replaced by
        ``CONTROL_TX_PORT``; CONNECT and DISCONNECT must agree on this key.
        """
        return (remote_addr[0], CONTROL_TX_PORT)

    def start(self):
        """Start the system.

        Starts the receiver on ``CONTROL_RX_PORT`` and creates the response
        and data senders.

        Raises:
            Exception: Whatever the modules raise during start-up; the
                system is stopped before the exception is re-raised.
        """
        self.logger.info("Starting Radar Control System...")
        try:
            # Start the receiving module.
            self.receiver = UDPReceiver(
                local_ip=self.local_ip,
                port=CONTROL_RX_PORT,
                callback=self._on_control_packet,
            )
            self.receiver.start()

            # Create the sending modules (they are not started here).
            self.response_sender = ResponseSender(local_ip=self.local_ip)
            self.data_sender = DataSender(local_ip=self.local_ip)

            self.running = True
            self.logger.info("Radar Control System started successfully")
        except Exception as e:
            self.logger.error(f"Failed to start system: {e}")
            self.stop()
            raise

    def stop(self):
        """Stop the system and release every module.

        Module references are reset to ``None`` after being stopped/closed,
        so calling ``stop()`` again does not stop or close them a second
        time and the ``send_*`` helpers become no-ops afterwards.
        """
        self.logger.info("Stopping Radar Control System...")
        self.running = False

        # Stop all status packet senders.
        for sender in self.status_senders.values():
            sender.stop()
        self.status_senders.clear()

        # Stop the receiving module.
        if self.receiver:
            self.receiver.stop()
            self.receiver = None

        # Close the sending modules.
        if self.response_sender:
            self.response_sender.close()
            self.response_sender = None
        if self.data_sender:
            self.data_sender.close()
            self.data_sender = None

        self.logger.info("Radar Control System stopped")

    def _on_control_packet(self, packet: ControlPacket, remote_addr: Tuple[str, int]):
        """Dispatch a received control packet to the matching handler.

        Registered as the receiver callback in ``start()``. Unknown
        (mode, instruction) combinations and handler exceptions are
        answered with ``ExecutionStatus.ERROR``.

        Args:
            packet: The received control packet.
            remote_addr: Address the packet was received from.
        """
        self.logger.info(f"Processing control packet from {remote_addr}")
        try:
            control_data = packet.control_data
            work_mode = control_data.work_mode
            work_instruction = control_data.work_instruction

            # Dispatch on the (work mode, work instruction) pair.
            if work_mode == WorkMode.NORMAL and work_instruction == WorkInstruction.CONNECT:
                self._handle_connect(remote_addr)
            elif work_mode == WorkMode.NORMAL and work_instruction == WorkInstruction.DISCONNECT:
                self._handle_disconnect(remote_addr)
            elif work_mode == WorkMode.AUTO and work_instruction == WorkInstruction.UPLOAD_ROUTE:
                self._handle_upload_route(remote_addr, control_data)
            elif work_mode == WorkMode.AUTO and work_instruction == WorkInstruction.START_CALC:
                self._handle_start_calc(remote_addr, control_data)
            elif work_mode == WorkMode.AUTO and work_instruction == WorkInstruction.MANUAL_BOOT:
                self._handle_manual_boot(remote_addr)
            elif work_mode == WorkMode.AUTO and work_instruction == WorkInstruction.END_TASK:
                self._handle_end_task(remote_addr)
            elif work_mode == WorkMode.NORMAL and work_instruction == WorkInstruction.END_TASK:
                self._handle_end_all_tasks(remote_addr)
            else:
                self.logger.warning(f"Unknown instruction: mode={work_mode}, instruction={work_instruction}")
                self._send_response(remote_addr, ExecutionStatus.ERROR)
        except Exception as e:
            self.logger.error(f"Error processing control packet: {e}")
            self._send_response(remote_addr, ExecutionStatus.ERROR)

    def _handle_connect(self, remote_addr: Tuple[str, int]):
        """Handle the CONNECT instruction.

        Registers the client, answers with SUCCESS and starts a periodic
        status packet sender for it (one per client address).
        """
        self.logger.info(f"Handling CONNECT from {remote_addr}")

        # Target address: same IP, port replaced by CONTROL_TX_PORT.
        target_addr = self._client_key(remote_addr)

        # Record the connection.
        self.connected_clients[target_addr] = time.time()

        # Send the response.
        self._send_response(target_addr, ExecutionStatus.SUCCESS)

        # Start the status packet sender unless one is already running.
        if target_addr not in self.status_senders:
            sender = StatusPacketSender(target_addr, interval=1.0, local_ip=self.local_ip)
            sender.start()
            self.status_senders[target_addr] = sender
            self.logger.info(f"Status packet sender started for {target_addr}")

    def _handle_disconnect(self, remote_addr: Tuple[str, int]):
        """Handle the DISCONNECT instruction.

        Removes the client record and stops its status packet sender, then
        answers with SUCCESS.
        """
        self.logger.info(f"Handling DISCONNECT from {remote_addr}")

        # CONNECT registers clients under (ip, CONTROL_TX_PORT); looking them
        # up under the raw source address would miss the entry whenever the
        # client's source port differs, leaving the status sender running.
        target_addr = self._client_key(remote_addr)

        # Remove the connection record.
        self.connected_clients.pop(target_addr, None)

        # Stop the status packet sender.
        sender = self.status_senders.pop(target_addr, None)
        if sender is not None:
            sender.stop()
            self.logger.info(f"Status packet sender stopped for {target_addr}")

        # Send the response.
        # NOTE(review): CONNECT replies to (ip, CONTROL_TX_PORT) while this
        # and the other handlers reply to the packet's source address --
        # confirm against the protocol which destination is intended.
        self._send_response(remote_addr, ExecutionStatus.SUCCESS)

    def _handle_upload_route(self, remote_addr: Tuple[str, int], control_data):
        """Handle the UPLOAD_ROUTE instruction: log the route and answer SUCCESS."""
        self.logger.info(f"Handling UPLOAD_ROUTE from {remote_addr}")
        self.logger.info(f" Route Number: {control_data.route_number}")
        self.logger.info(f" Route Count: {control_data.route_count}")
        self.logger.info(f" Boot Point: ({control_data.boot_longitude}, {control_data.boot_latitude})")
        self.logger.info(f" Shutdown Point: ({control_data.shutdown_longitude}, {control_data.shutdown_latitude})")

        # Route validation logic can be added here.
        self._send_response(remote_addr, ExecutionStatus.SUCCESS)

    def _handle_start_calc(self, remote_addr: Tuple[str, int], control_data):
        """Handle the START_CALC instruction: log the parameters and answer SUCCESS."""
        self.logger.info(f"Handling START_CALC from {remote_addr}")
        self.logger.info(f" Imaging Mode: {control_data.imaging_mode}")
        self.logger.info(f" Route Count: {control_data.route_count}")

        # Calculation start-up logic can be added here.
        self._send_response(remote_addr, ExecutionStatus.SUCCESS)

    def _handle_manual_boot(self, remote_addr: Tuple[str, int]):
        """Handle the MANUAL_BOOT instruction: mark the device as booted."""
        self.logger.info(f"Handling MANUAL_BOOT from {remote_addr}")

        # Update the device status.
        self.device_status.is_boot = 1

        # Boot logic can be added here.
        self._send_response(remote_addr, ExecutionStatus.SUCCESS)

    def _handle_end_task(self, remote_addr: Tuple[str, int]):
        """Handle the END_TASK instruction (current task): mark the device as not booted."""
        self.logger.info(f"Handling END_TASK from {remote_addr}")

        # Update the device status.
        self.device_status.is_boot = 0

        # Shutdown logic can be added here.
        self._send_response(remote_addr, ExecutionStatus.SUCCESS)

    def _handle_end_all_tasks(self, remote_addr: Tuple[str, int]):
        """Handle the end-all-tasks instruction: mark the device as not booted."""
        self.logger.info(f"Handling END_ALL_TASKS from {remote_addr}")

        # Update the device status.
        self.device_status.is_boot = 0

        # Logic for stopping all tasks can be added here.
        self._send_response(remote_addr, ExecutionStatus.SUCCESS)

    def _send_response(self, remote_addr: Tuple[str, int], status: int):
        """Send a response packet with ``status`` and the current device status.

        Does nothing when the response sender has not been created.
        """
        if self.response_sender:
            self.response_sender.send_response(remote_addr, status, self.device_status)

    def send_echo_data(self, remote_addr: Tuple[str, int], echo_data: bytes, sequence: int = 0):
        """Send echo data through the data sender, if one exists."""
        if self.data_sender:
            self.data_sender.send_echo_data(remote_addr, echo_data, sequence)

    def send_image_data(self, remote_addr: Tuple[str, int], image_data: bytes, imaging_params=None):
        """Send image data through the data sender, if one exists."""
        if self.data_sender:
            self.data_sender.send_image_data(remote_addr, image_data, imaging_params)

    def get_status(self) -> dict:
        """Return a snapshot of the system state.

        Returns:
            A dict with the running flag, the number and addresses of the
            connected clients, and selected device status fields.
        """
        return {
            'running': self.running,
            'connected_clients': len(self.connected_clients),
            'clients': list(self.connected_clients.keys()),
            'device_status': {
                'cpu_temp': self.device_status.cpu_temp,
                'is_boot': self.device_status.is_boot,
                'satellite_num': self.device_status.satellite_num,
            }
        }


def main():
    """Entry point: parse arguments, start the system and report status until interrupted."""
    # Parse command-line arguments.
    parser = argparse.ArgumentParser(description='Radar Control System')
    parser.add_argument('--ip', type=str, default='0.0.0.0',
                        help='Local IP address to bind to (default: 0.0.0.0)')
    args = parser.parse_args()

    # Configure logging.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    # Create and start the system.
    system = RadarControlSystem(local_ip=args.ip)
    system.start()

    try:
        # Keep running, periodically logging the system status.
        while True:
            time.sleep(5)
            status = system.get_status()
            logging.info(f"System Status: {status}")
    except KeyboardInterrupt:
        logging.info("Received interrupt signal")
    finally:
        system.stop()


if __name__ == '__main__':
    main()