277 lines
8.9 KiB
Python
277 lines
8.9 KiB
Python
"""
Module 3: UDP data packet sender.

Sends echo and image data to port 37004.
"""
|
||
|
||
import socket
|
||
import struct
|
||
import logging
|
||
import time
|
||
from typing import Tuple, Optional
|
||
from data_structures import DATA_TX_PORT
|
||
|
||
|
||
class EchoDataPacket:
    """Table 12: sub-packet format for echo data.

    Wire layout produced by ``to_bytes`` (every multi-byte field is
    little-endian):

        sync_code   uint16
        device_num  uint16
        sequence    uint16
        index       uint16
        pieces      uint16
        pkg_size    uint16  total packet length in bytes (14 + payload)
        payload     pkg_size - 14 bytes
        checksum    uint8   sum of all preceding bytes, modulo 256
        tail        uint8   always 0xCB
    """

    # Six little-endian uint16 header fields (12 bytes).
    _HEADER = struct.Struct('<HHHHHH')
    # One checksum byte followed by one frame-tail byte.
    _TRAILER_SIZE = 2
    # Bytes added around the payload: 12-byte header + 2-byte trailer = 14.
    _OVERHEAD = _HEADER.size + _TRAILER_SIZE
    _FRAME_TAIL = 0xCB

    def __init__(self, sync_code: int = 0x650F, device_num: int = 1,
                 sequence: int = 0, index: int = 0, pieces: int = 1):
        """
        Initialize an echo data packet with an empty payload.

        Args:
            sync_code: Sync code (0x650F = echo, 0x750F = image).
            device_num: Device number.
            sequence: Sequence number of the whole (unsplit) data packet.
            index: Index of this sub-packet.
            pieces: Total number of sub-packets.
        """
        self.sync_code = sync_code
        self.device_num = device_num
        self.sequence = sequence
        self.index = index
        self.pieces = pieces
        self.payload = b''

    def set_payload(self, data: bytes):
        """Set the payload bytes carried by this packet."""
        self.payload = data

    def calculate_checksum(self, data: bytes) -> int:
        """Return the uint8 additive checksum (sum of all bytes modulo 256)."""
        return sum(data) & 0xFF

    def to_bytes(self) -> bytes:
        """Serialize the packet to its wire representation.

        Returns:
            Header, payload, checksum byte and the 0xCB tail byte.

        Raises:
            struct.error: If a header field or the resulting packet size
                does not fit in an unsigned 16-bit integer.
        """
        # pkg_size counts the whole packet: header, payload and trailer.
        pkg_size = self._OVERHEAD + len(self.payload)
        header = self._HEADER.pack(
            self.sync_code,
            self.device_num,
            self.sequence,
            self.index,
            self.pieces,
            pkg_size,
        )

        # The checksum covers every byte that precedes the checksum byte.
        body = header + self.payload
        checksum = self.calculate_checksum(body)

        return body + struct.pack('<BB', checksum, self._FRAME_TAIL)

    @classmethod
    def from_bytes(cls, data: bytes) -> 'EchoDataPacket':
        """Deserialize a packet from the layout written by ``to_bytes``.

        The checksum and tail bytes are not verified; only the lengths are
        checked so that the payload slice is always well defined.

        Args:
            data: Raw packet bytes; bytes beyond ``pkg_size`` are ignored.

        Returns:
            A packet carrying the decoded header fields and payload.

        Raises:
            ValueError: If ``data`` is shorter than the 14-byte minimum
                packet, or if the embedded ``pkg_size`` is smaller than
                that minimum or larger than ``len(data)``.
        """
        if len(data) < cls._OVERHEAD:
            raise ValueError(
                f"Echo packet must be at least {cls._OVERHEAD} bytes, got {len(data)}"
            )

        # All six header fields are read, including pkg_size.
        (sync_code, device_num, sequence,
         index, pieces, pkg_size) = cls._HEADER.unpack_from(data)

        if not cls._OVERHEAD <= pkg_size <= len(data):
            raise ValueError(
                f"Echo packet size field {pkg_size} is inconsistent with "
                f"{len(data)} received bytes"
            )

        # Payload sits between the 12-byte header and the 2-byte trailer.
        payload = data[cls._HEADER.size:pkg_size - cls._TRAILER_SIZE]

        packet = cls(sync_code, device_num, sequence, index, pieces)
        packet.set_payload(payload)
        return packet
|
||
|
||
|
||
class ImageDataPacket:
    """Table 14: image packet data format.

    Serialized layout (see ``to_bytes``): a little-endian uint32 frame head,
    256 bytes of imaging parameters, the image data, a little-endian uint32
    holding the uint8 additive checksum, and a little-endian uint32 frame
    tail.
    """

    def __init__(self):
        """Create a packet with zeroed imaging parameters and no image data."""
        self.frame_head = 0x7EFFDC01
        self.imaging_params = bytes(256)  # 256 bytes of imaging parameters
        self.image_data = b''
        self.frame_checksum = 0
        self.frame_tail = 0x7EFFDC02

    def set_imaging_params(self, params: bytes):
        """Set the imaging parameters.

        Args:
            params: Exactly 256 bytes of imaging parameters.

        Raises:
            ValueError: If ``params`` is not 256 bytes long.
        """
        size = len(params)
        if size == 256:
            self.imaging_params = params
            return
        raise ValueError(f"Imaging params must be 256 bytes, got {size}")

    def set_image_data(self, data: bytes):
        """Set the image data bytes."""
        self.image_data = data

    def calculate_checksum(self, data: bytes) -> int:
        """Return the uint8 additive checksum (sum of all bytes modulo 256)."""
        return sum(data) & 0xFF

    def to_bytes(self) -> bytes:
        """Serialize the packet and record its checksum.

        The checksum covers the frame head, the imaging parameters and the
        image data; it is stored in ``self.frame_checksum`` as a side effect.

        Returns:
            The complete frame including checksum and frame tail.
        """
        # Everything that the checksum covers.
        covered = struct.pack('<I', self.frame_head) + self.imaging_params + self.image_data

        self.frame_checksum = self.calculate_checksum(covered)

        # Checksum and frame tail are each written as a little-endian uint32.
        trailer = struct.pack('<II', self.frame_checksum, self.frame_tail)
        return covered + trailer
|
||
|
||
|
||
class DataSender:
    """Data packet sending module (UDP)."""

    def __init__(self, remote_host: str = '127.0.0.1', port: int = DATA_TX_PORT, local_ip: Optional[str] = None):
        """
        Initialize the data sender.

        A failed bind to ``local_ip`` is logged and otherwise ignored; the
        socket is then left unbound.

        Args:
            remote_host: Remote host address. Stored only; the send methods
                take their destination from ``remote_addr``.
            port: Sending port. Stored and used in the logger name.
            local_ip: Local IP address to bind the socket to (optional).
        """
        self.remote_host = remote_host
        self.port = port
        self.local_ip = local_ip

        # Configure logging first: the bind-failure handler below logs
        # through self.logger, so it must exist before bind is attempted.
        self.logger = logging.getLogger(f'Module3-DataSender:{port}')
        self.logger.setLevel(logging.INFO)
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        if self.local_ip:
            try:
                # Port 0 asks the OS to choose the local port.
                self.socket.bind((self.local_ip, 0))
            except OSError as e:
                self.logger.error(f"Failed to bind data sender to {self.local_ip}: {e}")

    def send_echo_data(self, remote_addr: Tuple[str, int], echo_data: bytes,
                       sequence: int = 0, max_packet_size: int = 1472) -> bool:
        """
        Send echo data, splitting it into sub-packets automatically.

        Each sub-packet is an ``EchoDataPacket`` with sync code 0x650F and
        device number 1. Empty ``echo_data`` sends nothing and returns True.

        Args:
            remote_addr: Remote address as (host, port).
            echo_data: Echo data to send.
            sequence: Sequence number of the whole data packet.
            max_packet_size: Maximum sub-packet size (default 1472 bytes);
                must be greater than 16.

        Returns:
            True if every sub-packet was handed to the socket, False if any
            error occurred (the error is logged).
        """
        try:
            # Reserve 16 bytes per datagram for header and checksum/tail.
            payload_size = max_packet_size - 16
            if payload_size <= 0:
                # A non-positive chunk size would otherwise divide by zero
                # or silently send nothing while reporting success.
                raise ValueError(
                    f"max_packet_size must be greater than 16, got {max_packet_size}"
                )

            # Ceiling division: number of sub-packets needed.
            total_pieces = (len(echo_data) + payload_size - 1) // payload_size

            self.logger.info(f"Sending echo data ({len(echo_data)} bytes) in {total_pieces} packets")

            for index in range(total_pieces):
                start = index * payload_size
                payload = echo_data[start:start + payload_size]

                packet = EchoDataPacket(
                    sync_code=0x650F,  # echo
                    device_num=1,
                    sequence=sequence,
                    index=index,
                    pieces=total_pieces,
                )
                packet.set_payload(payload)

                data = packet.to_bytes()
                self.socket.sendto(data, remote_addr)

                self.logger.debug(f"Echo packet {index+1}/{total_pieces} sent ({len(data)} bytes)")

                # Small delay between datagrams to avoid packet loss.
                time.sleep(0.001)

            return True

        except Exception as e:
            self.logger.error(f"Failed to send echo data: {e}")
            return False

    def send_image_data(self, remote_addr: Tuple[str, int], image_data: bytes,
                        imaging_params: Optional[bytes] = None) -> bool:
        """
        Send image data as a single ``ImageDataPacket`` datagram.

        Args:
            remote_addr: Remote address as (host, port).
            image_data: Image data to send.
            imaging_params: Imaging parameters (256 bytes, optional). When
                omitted or empty, the packet's default parameters are used.

        Returns:
            True if the datagram was handed to the socket, False if any
            error occurred (the error is logged).
        """
        try:
            packet = ImageDataPacket()

            if imaging_params:
                packet.set_imaging_params(imaging_params)

            packet.set_image_data(image_data)

            data = packet.to_bytes()
            self.socket.sendto(data, remote_addr)

            self.logger.info(f"Image data sent to {remote_addr} ({len(data)} bytes)")

            return True

        except Exception as e:
            self.logger.error(f"Failed to send image data: {e}")
            return False

    def close(self):
        """Close the sender's socket and log the shutdown."""
        if self.socket:
            self.socket.close()
            self.logger.info("Data sender closed")
|
||
|
||
|
||
if __name__ == '__main__':
    # Demo run: send one echo payload and one image payload to a local port.
    logging.basicConfig(level=logging.INFO)

    target = ('127.0.0.1', 12345)
    sender = DataSender('127.0.0.1', DATA_TX_PORT)

    # Test echo data.
    echo_blob = b'Echo data test' * 100
    print(f"Sending echo data ({len(echo_blob)} bytes)...")
    sender.send_echo_data(target, echo_blob)

    # Test image data with all-zero imaging parameters.
    image_blob = b'Image data test' * 100
    zero_params = bytes(256)
    print(f"Sending image data ({len(image_blob)} bytes)...")
    sender.send_image_data(target, image_blob, zero_params)

    sender.close()
|