""" 模块1:UDP接收模块 监听38101端口,接收来自主控的控制指令 """ import socket import threading import logging from typing import Callable, Optional from data_structures import ControlPacket, CONTROL_RX_PORT class UDPReceiver: """UDP接收模块""" def __init__(self, local_ip: str = '0.0.0.0', port: int = CONTROL_RX_PORT, callback: Optional[Callable] = None): """ 初始化接收模块 Args: local_ip: 本地监听IP port: 监听端口 callback: 接收到数据后的回调函数,签名为 callback(packet, remote_addr) """ self.local_ip = local_ip self.port = port self.callback = callback self.socket = None self.running = False self.receive_thread = None # 配置日志 self.logger = logging.getLogger(f'Module1-Receiver:{port}') self.logger.setLevel(logging.INFO) if not self.logger.handlers: handler = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) self.logger.addHandler(handler) def start(self): """启动接收模块""" if self.running: self.logger.warning("Receiver is already running") return try: # 创建UDP套接字 self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind((self.local_ip, self.port)) self.running = True self.logger.info(f"Receiver started on port {self.port}") # 启动接收线程 self.receive_thread = threading.Thread(target=self._receive_loop, daemon=True) self.receive_thread.start() except Exception as e: self.logger.error(f"Failed to start receiver: {e}") self.running = False raise def stop(self): """停止接收模块""" self.running = False if self.socket: self.socket.close() if self.receive_thread: self.receive_thread.join(timeout=2) self.logger.info("Receiver stopped") def _receive_loop(self): """接收循环""" while self.running: try: # 接收数据(最大1472字节) data, remote_addr = self.socket.recvfrom(2048) self.logger.info(f"Received {len(data)} bytes from {remote_addr}") try: # 解析控制包 packet = ControlPacket.from_bytes(data) self.logger.info(f"Control packet parsed successfully") self.logger.debug(f" Device: {packet.device_number}") self.logger.debug(f" Work Mode: {packet.control_data.work_mode}") self.logger.debug(f" Work Instruction: {packet.control_data.work_instruction}") # 调用回调函数 if self.callback: self.callback(packet, remote_addr) except ValueError as e: self.logger.error(f"Failed to parse control packet: {e}") self.logger.debug(f"Raw data (hex): {data.hex()}") except socket.timeout: continue except Exception as e: if self.running: self.logger.error(f"Error in receive loop: {e}") break def set_callback(self, callback: Callable): """设置接收回调函数""" self.callback = callback if __name__ == '__main__': # 配置日志 logging.basicConfig(level=logging.INFO) def on_packet_received(packet, remote_addr): """处理接收到的包""" print(f"\n=== Packet Received from {remote_addr} ===") print(f"Device Number: {packet.device_number}") print(f"Work Mode: {packet.control_data.work_mode}") print(f"Work Instruction: {packet.control_data.work_instruction}") print(f"Imaging Mode: {packet.control_data.imaging_mode}") print(f"Route Number: {packet.control_data.route_number}") print(f"Route Count: {packet.control_data.route_count}") # 创建并启动接收模块 receiver = UDPReceiver(callback=on_packet_received) receiver.start() try: # 保持运行 import time while True: time.sleep(1) except KeyboardInterrupt: print("\nShutting down...") receiver.stop()