# Source: skyeyesystem/MiniSAR-Simulator-Py-main/module3_data_sender.py
# (listed as 277 lines, 8.9 KiB, Python; extracted from a repository web view)

"""
Module 3: UDP data packet sender.
Sends echo and image data to port 37004.
"""
import socket
import struct
import logging
import time
from typing import Tuple, Optional
from data_structures import DATA_TX_PORT
class EchoDataPacket:
    """Echo data fragment packet (Table 12: echo data fragmentation format).

    Serialized layout, all fields little-endian:

        sync_code, device_num, sequence, index, pieces, pkg_size   6 x uint16
        payload                                                    pkg_size - 14 bytes
        checksum (uint8 sum of every preceding byte)               1 byte
        frame tail 0xCB                                            1 byte
    """

    # Fixed 12-byte header: five identification fields followed by pkg_size.
    _HEADER = struct.Struct('<HHHHHH')
    # Checksum byte plus frame-tail byte.
    _TRAILER_SIZE = 2
    # Bytes of framing around the payload (12 + 2 = 14).
    _OVERHEAD = _HEADER.size + _TRAILER_SIZE
    _FRAME_TAIL = 0xCB

    def __init__(self, sync_code: int = 0x650F, device_num: int = 1,
                 sequence: int = 0, index: int = 0, pieces: int = 1):
        """Initialize an echo data packet with an empty payload.

        Args:
            sync_code: Sync code (0x650F = echo, 0x750F = image).
            device_num: Device number.
            sequence: Sequence number of the whole (unfragmented) data set.
            index: Index of this fragment.
            pieces: Total number of fragments.
        """
        self.sync_code = sync_code
        self.device_num = device_num
        self.sequence = sequence
        self.index = index
        self.pieces = pieces
        self.payload = b''

    def set_payload(self, data: bytes):
        """Set the payload carried by this fragment."""
        self.payload = data

    def calculate_checksum(self, data: bytes) -> int:
        """Return the uint8 additive checksum (sum of all bytes modulo 256)."""
        return sum(data) & 0xFF

    def to_bytes(self) -> bytes:
        """Serialize the packet to its wire representation.

        Returns:
            Header + payload + checksum byte + frame tail byte (0xCB).

        Raises:
            struct.error: If a header field, or the resulting package size,
                does not fit in an unsigned 16-bit integer.
        """
        # Package size counts the whole packet: framing (14 bytes) + payload.
        pkg_size = self._OVERHEAD + len(self.payload)
        header = self._HEADER.pack(self.sync_code, self.device_num,
                                   self.sequence, self.index, self.pieces,
                                   pkg_size)
        body = header + self.payload
        # The checksum covers every byte that precedes the checksum byte.
        checksum = self.calculate_checksum(body)
        return body + struct.pack('<BB', checksum, self._FRAME_TAIL)

    @classmethod
    def from_bytes(cls, data: bytes) -> 'EchoDataPacket':
        """Deserialize a packet produced by :meth:`to_bytes`.

        The checksum and frame tail are not verified; only the header fields
        and the payload are extracted.

        Args:
            data: Raw packet bytes; bytes beyond ``pkg_size`` are ignored.

        Returns:
            A packet populated with the header fields and payload.

        Raises:
            ValueError: If ``data`` is shorter than the 14 framing bytes, or
                the encoded package size is inconsistent with ``data``.
        """
        # A packet with an empty payload is exactly 14 bytes, so that is the
        # smallest input that to_bytes() can produce.
        if len(data) < cls._OVERHEAD:
            raise ValueError(
                f"Echo packet must be at least {cls._OVERHEAD} bytes, got {len(data)}")
        (sync_code, device_num, sequence,
         index, pieces, pkg_size) = cls._HEADER.unpack_from(data)
        if not cls._OVERHEAD <= pkg_size <= len(data):
            raise ValueError(
                f"Echo packet size field {pkg_size} is inconsistent with "
                f"{len(data)} bytes of data")
        # The payload sits between the 12-byte header and the 2-byte trailer.
        payload = data[cls._HEADER.size:pkg_size - cls._TRAILER_SIZE]
        packet = cls(sync_code, device_num, sequence, index, pieces)
        packet.set_payload(payload)
        return packet
class ImageDataPacket:
    """Image data packet (Table 14: image packet data format).

    Serialized layout, all integers little-endian:

        frame head 0x7EFFDC01            uint32
        imaging parameters               256 bytes
        image data                       variable length
        checksum (uint8 sum, widened)    uint32
        frame tail 0x7EFFDC02            uint32
    """

    # Required length of the imaging-parameter block.
    _PARAMS_SIZE = 256

    def __init__(self):
        """Initialize an image packet with zeroed parameters and no image."""
        self.frame_head = 0x7EFFDC01
        self.imaging_params = b'\x00' * self._PARAMS_SIZE
        self.image_data = b''
        self.frame_checksum = 0
        self.frame_tail = 0x7EFFDC02

    def set_imaging_params(self, params: bytes):
        """Set the imaging parameters.

        Args:
            params: Exactly 256 bytes of imaging parameters.

        Raises:
            ValueError: If ``params`` is not 256 bytes long.
        """
        if len(params) != self._PARAMS_SIZE:
            raise ValueError(f"Imaging params must be 256 bytes, got {len(params)}")
        self.imaging_params = params

    def set_image_data(self, data: bytes):
        """Set the image data carried by this packet."""
        self.image_data = data

    def calculate_checksum(self, data: bytes) -> int:
        """Return the uint8 additive checksum (sum of all bytes modulo 256)."""
        # The built-in sum iterates in C, which matters for large images.
        return sum(data) & 0xFF

    def to_bytes(self) -> bytes:
        """Serialize the packet and refresh ``frame_checksum``.

        Returns:
            Frame head + imaging parameters + image data + checksum + frame tail.
        """
        # Join once rather than growing the buffer step by step, so the image
        # data is not re-copied for every appended field.
        body = b''.join((
            struct.pack('<I', self.frame_head),
            self.imaging_params,
            self.image_data,
        ))
        # The checksum covers everything that precedes the checksum field.
        self.frame_checksum = self.calculate_checksum(body)
        return body + struct.pack('<II', self.frame_checksum, self.frame_tail)
class DataSender:
    """UDP sender for echo and image data packets."""

    def __init__(self, remote_host: str = '127.0.0.1', port: int = DATA_TX_PORT, local_ip: Optional[str] = None):
        """Create the UDP socket and, optionally, bind it to a local address.

        Args:
            remote_host: Remote host address (stored on the instance).
            port: Transmit port (stored on the instance, used in the logger name).
            local_ip: Local IP to bind to; when falsy the socket is left unbound.
        """
        self.remote_host = remote_host
        self.port = port
        self.local_ip = local_ip

        # Configure logging first: the bind-failure handler below needs
        # self.logger, so it must exist before bind() is attempted.
        self.logger = logging.getLogger(f'Module3-DataSender:{port}')
        self.logger.setLevel(logging.INFO)
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        if self.local_ip:
            try:
                # Port 0 lets the OS choose an ephemeral source port.
                self.socket.bind((self.local_ip, 0))
            except Exception as e:
                # Best effort: a failed bind is reported and the socket stays unbound.
                self.logger.error(f"Failed to bind data sender to {self.local_ip}: {e}")

    def send_echo_data(self, remote_addr: Tuple[str, int], echo_data: bytes,
                       sequence: int = 0, max_packet_size: int = 1472) -> bool:
        """Send echo data, splitting it into fragments automatically.

        Args:
            remote_addr: Destination address as ``(host, port)``.
            echo_data: Echo data to send.
            sequence: Sequence number of the whole data set.
            max_packet_size: Maximum size of one packet (default 1472 bytes).

        Returns:
            True if every fragment was handed to the socket, False on any
            error (the error is logged).
        """
        try:
            # 16 bytes per packet are reserved for framing.
            # NOTE(review): EchoDataPacket.to_bytes adds 14 framing bytes, so
            # packets come out 2 bytes under max_packet_size; 16 is kept so
            # existing fragment sizes do not change.
            payload_size = max_packet_size - 16
            if payload_size <= 0:
                # A non-positive payload size would otherwise yield zero
                # fragments and a false "success".
                raise ValueError(
                    f"max_packet_size must be greater than 16, got {max_packet_size}")
            # Ceiling division: number of fragments needed.
            total_pieces = (len(echo_data) + payload_size - 1) // payload_size
            self.logger.info(f"Sending echo data ({len(echo_data)} bytes) in {total_pieces} packets")

            for index in range(total_pieces):
                start = index * payload_size
                # Slicing clamps at the end, so the last fragment may be shorter.
                payload = echo_data[start:start + payload_size]

                packet = EchoDataPacket(
                    sync_code=0x650F,  # echo
                    device_num=1,
                    sequence=sequence,
                    index=index,
                    pieces=total_pieces,
                )
                packet.set_payload(payload)

                data = packet.to_bytes()
                self.socket.sendto(data, remote_addr)
                self.logger.debug(f"Echo packet {index+1}/{total_pieces} sent ({len(data)} bytes)")
                # Small delay between fragments to avoid packet loss.
                time.sleep(0.001)
            return True
        except Exception as e:
            self.logger.error(f"Failed to send echo data: {e}")
            return False

    def send_image_data(self, remote_addr: Tuple[str, int], image_data: bytes,
                        imaging_params: Optional[bytes] = None) -> bool:
        """Send image data as a single datagram.

        Args:
            remote_addr: Destination address as ``(host, port)``.
            image_data: Image data to send.
            imaging_params: Optional 256-byte imaging parameters; when falsy
                the packet's default (all zero) parameters are used.

        Returns:
            True if the datagram was handed to the socket, False on any error
            (the error is logged).
        """
        try:
            packet = ImageDataPacket()
            if imaging_params:
                packet.set_imaging_params(imaging_params)
            packet.set_image_data(image_data)

            data = packet.to_bytes()
            self.socket.sendto(data, remote_addr)
            self.logger.info(f"Image data sent to {remote_addr} ({len(data)} bytes)")
            return True
        except Exception as e:
            self.logger.error(f"Failed to send image data: {e}")
            return False

    def close(self):
        """Close the underlying socket."""
        if self.socket:
            self.socket.close()
            self.logger.info("Data sender closed")
if __name__ == '__main__':
    # Stand-alone smoke test: push one echo burst and one image packet to a
    # local UDP endpoint.
    logging.basicConfig(level=logging.INFO)

    sender = DataSender('127.0.0.1', DATA_TX_PORT)
    target = ('127.0.0.1', 12345)

    # Echo test data.
    echo_data = b'Echo data test' * 100
    print(f"Sending echo data ({len(echo_data)} bytes)...")
    sender.send_echo_data(target, echo_data)

    # Image test data with all-zero imaging parameters.
    image_data = b'Image data test' * 100
    imaging_params = bytes(256)
    print(f"Sending image data ({len(image_data)} bytes)...")
    sender.send_image_data(target, image_data, imaging_params)

    sender.close()