""" 模块3:UDP数据包发送模块 向37004端口发送回波和图像数据 """ import socket import struct import logging import time from typing import Tuple, Optional from data_structures import DATA_TX_PORT class EchoDataPacket: """表12:回波数据分包格式""" def __init__(self, sync_code: int = 0x650F, device_num: int = 1, sequence: int = 0, index: int = 0, pieces: int = 1): """ 初始化回波数据包 Args: sync_code: 同步码 (0x650F=回波, 0x750F=图像) device_num: 设备编号 sequence: 整包数据序号 index: 分包编号 pieces: 分包个数 """ self.sync_code = sync_code self.device_num = device_num self.sequence = sequence self.index = index self.pieces = pieces self.payload = b'' def set_payload(self, data: bytes): """设置负载数据""" self.payload = data def calculate_checksum(self, data: bytes) -> int: """计算校验和(Uint8累加)""" checksum = 0 for byte in data: checksum = (checksum + byte) & 0xFF return checksum def to_bytes(self) -> bytes: """序列化为字节流""" # 构建数据包头 header = struct.pack(' 'EchoDataPacket': """从字节流反序列化""" if len(data) < 16: raise ValueError(f"Echo packet must be at least 16 bytes, got {len(data)}") sync_code, device_num, sequence, index, pieces, pkg_size = struct.unpack(' int: """计算校验和(Uint8累加)""" checksum = 0 for byte in data: checksum = (checksum + byte) & 0xFF return checksum def to_bytes(self) -> bytes: """序列化为字节流""" # 构建数据 data = struct.pack(' bool: """ 发送回波数据(自动分包) Args: remote_addr: 远程地址 (host, port) echo_data: 回波数据 sequence: 整包数据序号 max_packet_size: 最大分包大小(默认1472字节) Returns: 是否发送成功 """ try: # 计算分包个数 payload_size = max_packet_size - 16 # 减去包头和校验尾 total_pieces = (len(echo_data) + payload_size - 1) // payload_size self.logger.info(f"Sending echo data ({len(echo_data)} bytes) in {total_pieces} packets") # 分包发送 for index in range(total_pieces): start = index * payload_size end = min(start + payload_size, len(echo_data)) payload = echo_data[start:end] # 创建回波数据包 packet = EchoDataPacket( sync_code=0x650F, # 回波 device_num=1, sequence=sequence, index=index, pieces=total_pieces ) packet.set_payload(payload) # 发送数据包 data = packet.to_bytes() self.socket.sendto(data, remote_addr) self.logger.debug(f"Echo packet {index+1}/{total_pieces} sent ({len(data)} bytes)") # 小延迟避免丢包 time.sleep(0.001) return True except Exception as e: self.logger.error(f"Failed to send echo data: {e}") return False def send_image_data(self, remote_addr: Tuple[str, int], image_data: bytes, imaging_params: Optional[bytes] = None) -> bool: """ 发送图像数据 Args: remote_addr: 远程地址 (host, port) image_data: 图像数据 imaging_params: 成像参数(256字节,可选) Returns: 是否发送成功 """ try: # 创建图像数据包 packet = ImageDataPacket() if imaging_params: packet.set_imaging_params(imaging_params) packet.set_image_data(image_data) # 序列化为字节流 data = packet.to_bytes() # 发送数据包 self.socket.sendto(data, remote_addr) self.logger.info(f"Image data sent to {remote_addr} ({len(data)} bytes)") return True except Exception as e: self.logger.error(f"Failed to send image data: {e}") return False def close(self): """关闭发送模块""" if self.socket: self.socket.close() self.logger.info("Data sender closed") if __name__ == '__main__': # 配置日志 logging.basicConfig(level=logging.INFO) # 创建发送模块 sender = DataSender('127.0.0.1', DATA_TX_PORT) # 生成测试回波数据 echo_data = b'Echo data test' * 100 print(f"Sending echo data ({len(echo_data)} bytes)...") sender.send_echo_data(('127.0.0.1', 12345), echo_data) # 生成测试图像数据 image_data = b'Image data test' * 100 imaging_params = b'\x00' * 256 print(f"Sending image data ({len(image_data)} bytes)...") sender.send_image_data(('127.0.0.1', 12345), image_data, imaging_params) sender.close()