skyeyesystem/MiniSAR-Simulator-Py-main/module3_data_sender.py
2026-01-29 09:47:15 +08:00

277 lines
8.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
模块3UDP数据包发送模块
向37004端口发送回波和图像数据
"""
import socket
import struct
import logging
import time
from typing import Tuple, Optional
from data_structures import DATA_TX_PORT
class EchoDataPacket:
"""表12回波数据分包格式"""
def __init__(self, sync_code: int = 0x650F, device_num: int = 1,
sequence: int = 0, index: int = 0, pieces: int = 1):
"""
初始化回波数据包
Args:
sync_code: 同步码 (0x650F=回波, 0x750F=图像)
device_num: 设备编号
sequence: 整包数据序号
index: 分包编号
pieces: 分包个数
"""
self.sync_code = sync_code
self.device_num = device_num
self.sequence = sequence
self.index = index
self.pieces = pieces
self.payload = b''
def set_payload(self, data: bytes):
"""设置负载数据"""
self.payload = data
def calculate_checksum(self, data: bytes) -> int:
"""计算校验和Uint8累加"""
checksum = 0
for byte in data:
checksum = (checksum + byte) & 0xFF
return checksum
def to_bytes(self) -> bytes:
"""序列化为字节流"""
# 构建数据包头
header = struct.pack('<HHHHH',
self.sync_code,
self.device_num,
self.sequence,
self.index,
self.pieces)
# 包大小 = 头大小(14) + 负载大小
pkg_size = 14 + len(self.payload)
header += struct.pack('<H', pkg_size)
# 计算校验和(除校验字节外所有字节)
checksum = self.calculate_checksum(header + self.payload)
# 添加校验和和帧尾标志
data = header + self.payload
data += struct.pack('<BB', checksum, 0xCB)
return data
@classmethod
def from_bytes(cls, data: bytes) -> 'EchoDataPacket':
"""从字节流反序列化"""
if len(data) < 16:
raise ValueError(f"Echo packet must be at least 16 bytes, got {len(data)}")
sync_code, device_num, sequence, index, pieces, pkg_size = struct.unpack('<HHHHH', data[0:10])
# 提取负载
payload = data[10:pkg_size-2]
packet = cls(sync_code, device_num, sequence, index, pieces)
packet.set_payload(payload)
return packet
class ImageDataPacket:
"""表14图像包数据格式"""
def __init__(self):
"""初始化图像数据包"""
self.frame_head = 0x7EFFDC01
self.imaging_params = b'\x00' * 256 # 256字节成像参数
self.image_data = b''
self.frame_checksum = 0
self.frame_tail = 0x7EFFDC02
def set_imaging_params(self, params: bytes):
"""设置成像参数256字节"""
if len(params) != 256:
raise ValueError(f"Imaging params must be 256 bytes, got {len(params)}")
self.imaging_params = params
def set_image_data(self, data: bytes):
"""设置图像数据"""
self.image_data = data
def calculate_checksum(self, data: bytes) -> int:
"""计算校验和Uint8累加"""
checksum = 0
for byte in data:
checksum = (checksum + byte) & 0xFF
return checksum
def to_bytes(self) -> bytes:
"""序列化为字节流"""
# 构建数据
data = struct.pack('<I', self.frame_head)
data += self.imaging_params
data += self.image_data
# 计算校验和
self.frame_checksum = self.calculate_checksum(data)
# 添加校验和和帧尾
data += struct.pack('<I', self.frame_checksum)
data += struct.pack('<I', self.frame_tail)
return data
class DataSender:
"""数据包发送模块"""
def __init__(self, remote_host: str = '127.0.0.1', port: int = DATA_TX_PORT, local_ip: Optional[str] = None):
"""
初始化数据发送模块
Args:
remote_host: 远程主机地址
port: 发送端口
local_ip: 本地绑定IP
"""
self.remote_host = remote_host
self.port = port
self.local_ip = local_ip
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
if self.local_ip:
try:
self.socket.bind((self.local_ip, 0))
except Exception as e:
self.logger.error(f"Failed to bind data sender to {self.local_ip}: {e}")
# 配置日志
self.logger = logging.getLogger(f'Module3-DataSender:{port}')
self.logger.setLevel(logging.INFO)
if not self.logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def send_echo_data(self, remote_addr: Tuple[str, int], echo_data: bytes,
sequence: int = 0, max_packet_size: int = 1472) -> bool:
"""
发送回波数据(自动分包)
Args:
remote_addr: 远程地址 (host, port)
echo_data: 回波数据
sequence: 整包数据序号
max_packet_size: 最大分包大小默认1472字节
Returns:
是否发送成功
"""
try:
# 计算分包个数
payload_size = max_packet_size - 16 # 减去包头和校验尾
total_pieces = (len(echo_data) + payload_size - 1) // payload_size
self.logger.info(f"Sending echo data ({len(echo_data)} bytes) in {total_pieces} packets")
# 分包发送
for index in range(total_pieces):
start = index * payload_size
end = min(start + payload_size, len(echo_data))
payload = echo_data[start:end]
# 创建回波数据包
packet = EchoDataPacket(
sync_code=0x650F, # 回波
device_num=1,
sequence=sequence,
index=index,
pieces=total_pieces
)
packet.set_payload(payload)
# 发送数据包
data = packet.to_bytes()
self.socket.sendto(data, remote_addr)
self.logger.debug(f"Echo packet {index+1}/{total_pieces} sent ({len(data)} bytes)")
# 小延迟避免丢包
time.sleep(0.001)
return True
except Exception as e:
self.logger.error(f"Failed to send echo data: {e}")
return False
def send_image_data(self, remote_addr: Tuple[str, int], image_data: bytes,
imaging_params: Optional[bytes] = None) -> bool:
"""
发送图像数据
Args:
remote_addr: 远程地址 (host, port)
image_data: 图像数据
imaging_params: 成像参数256字节可选
Returns:
是否发送成功
"""
try:
# 创建图像数据包
packet = ImageDataPacket()
if imaging_params:
packet.set_imaging_params(imaging_params)
packet.set_image_data(image_data)
# 序列化为字节流
data = packet.to_bytes()
# 发送数据包
self.socket.sendto(data, remote_addr)
self.logger.info(f"Image data sent to {remote_addr} ({len(data)} bytes)")
return True
except Exception as e:
self.logger.error(f"Failed to send image data: {e}")
return False
def close(self):
"""关闭发送模块"""
if self.socket:
self.socket.close()
self.logger.info("Data sender closed")
if __name__ == '__main__':
# 配置日志
logging.basicConfig(level=logging.INFO)
# 创建发送模块
sender = DataSender('127.0.0.1', DATA_TX_PORT)
# 生成测试回波数据
echo_data = b'Echo data test' * 100
print(f"Sending echo data ({len(echo_data)} bytes)...")
sender.send_echo_data(('127.0.0.1', 12345), echo_data)
# 生成测试图像数据
image_data = b'Image data test' * 100
imaging_params = b'\x00' * 256
print(f"Sending image data ({len(image_data)} bytes)...")
sender.send_image_data(('127.0.0.1', 12345), image_data, imaging_params)
sender.close()