skyeyesystem/MiniSAR-Simulator-Py-main/data_structures.py
2026-01-29 09:47:15 +08:00

501 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
UDP交互程序数据结构定义
根据文档表格定义所有数据结构
"""
import struct
from enum import IntEnum
from dataclasses import dataclass
from typing import Optional
# ==================== 常量定义 ====================
# 帧头尾标志
FRAME_HEAD_SYMBOL = 0x7EFFDC01
FRAME_TAIL_SYMBOL = 0x7EFFDC02
# 端口定义
CONTROL_RX_PORT = 38101 # 模块1接收控制指令
CONTROL_TX_PORT = 37001 # 模块2发送控制响应
DATA_TX_PORT = 37004 # 模块3发送数据包
# 工作模式dWorkMode
class WorkMode(IntEnum):
NORMAL = 0 # 正常模式
AUTO = 4 # 自动模式
# 工作指令dWorkInstruction
class WorkInstruction(IntEnum):
UPLOAD_ROUTE = 0 # 上传航线
START_CALC = 1 # 开始计算
MANUAL_BOOT = 2 # 手动开机
END_TASK = 3 # 结束当前任务
CONNECT = 4 # 连接
DISCONNECT = 5 # 断开连接
# 成像模式
class ImagingMode(IntEnum):
STRIP = 0 # 条带
FOCUS = 1 # 聚束
# 成像模式分辨率
class ImagingResolution(IntEnum):
RES_0_1M = 0 # 0.1m
RES_0_2M = 1 # 0.2m
RES_0_3M = 2 # 0.3m
RES_0_5M = 3 # 0.5m
RES_1M = 4 # 1m
# 状态包类型
class StatusPacketType(IntEnum):
CONTROL_RESULT = 0 # 控制命令执行结果
IMAGING_READY = 1 # 成像准备完成
DEVICE_FAULT = 2 # 单机工作故障
# 执行状态
class ExecutionStatus(IntEnum):
SUCCESS = 0 # 成功
ERROR = 1 # 错误
# ==================== 数据结构定义 ====================
@dataclass
class ControlData:
"""表3雷达控制数据格式192字节"""
work_mode: int = 0 # Uint8
work_instruction: int = 0 # Uint8
imaging_mode: int = 0 # Uint8
imaging_resolution: int = 0 # Uint8
reserved1: bytes = b'\x00\x00\x00\x00' # Uint8*4
route_number: int = 0 # Uint16
route_count: int = 0 # Uint16
# 航线开机点
boot_longitude: int = 0 # Uint32 (1e-7量化)
boot_latitude: int = 0 # Uint32 (1e-7量化)
boot_altitude: int = 0 # Int16
# 地面开机点
ground_boot_longitude: int = 0 # Uint32
ground_boot_latitude: int = 0 # Uint32
ground_boot_altitude: int = 0 # Int16
# 航线关机点
shutdown_longitude: int = 0 # Uint32
shutdown_latitude: int = 0 # Uint32
shutdown_altitude: int = 0 # Int16
# 成像区域中心
imaging_center_longitude: int = 0 # Uint32
imaging_center_latitude: int = 0 # Uint32
imaging_center_altitude: int = 0 # Int16
# 成像区域参数
imaging_width: int = 0 # Uint16
imaging_length: int = 0 # Uint16
imaging_angle: int = 0 # Uint16
# 飞行参数
flight_speed: float = 0.0 # double64
flight_altitude: float = 0.0 # double64
grazing_angle: float = 0.0 # double64
slant_angle: float = 0.0 # double64
# 其他参数
route_type: int = 0 # Uint8
side_direction: int = 0 # Int8
polarization: int = 0 # Uint8
auto_focus: int = 0 # Uint8
motion_compensation: int = 0 # Uint8
image_format: int = 0 # Uint8
echo_disable: int = 0 # Uint8
installation_angle: float = 35.0 # Float
image_bit: int = 0 # Uint8
prf: float = 0.0 # Float
reserved2: bytes = b'\x00' * 86 # 86字节补齐192B
def to_bytes(self) -> bytes:
"""将控制数据序列化为字节流(小端模式)"""
data = struct.pack('<BBBB',
self.work_mode,
self.work_instruction,
self.imaging_mode,
self.imaging_resolution)
data += self.reserved1
data += struct.pack('<HHIIhIIhIIhIIhHHHddddBBBBBBBfBf',
self.route_number,
self.route_count,
self.boot_longitude,
self.boot_latitude,
self.boot_altitude,
self.ground_boot_longitude,
self.ground_boot_latitude,
self.ground_boot_altitude,
self.shutdown_longitude,
self.shutdown_latitude,
self.shutdown_altitude,
self.imaging_center_longitude,
self.imaging_center_latitude,
self.imaging_center_altitude,
self.imaging_width,
self.imaging_length,
self.imaging_angle,
self.flight_speed,
self.flight_altitude,
self.grazing_angle,
self.slant_angle,
self.route_type,
self.side_direction,
self.polarization,
self.auto_focus,
self.motion_compensation,
self.image_format,
self.echo_disable,
self.installation_angle,
self.image_bit,
self.prf)
data += self.reserved2
return data[:192] # 确保正好192字节
@classmethod
def from_bytes(cls, data: bytes) -> 'ControlData':
"""从字节流反序列化控制数据"""
if len(data) < 192:
raise ValueError(f"Control data must be 192 bytes, got {len(data)}")
# 解析前4个字节
work_mode, work_instruction, imaging_mode, imaging_resolution = struct.unpack('<BBBB', data[0:4])
reserved1 = data[4:8]
# 解析剩余数据
values = struct.unpack('<HHIIhIIhIIhIIhHHHddddBBBBBBBfBf', data[8:106])
return cls(
work_mode=work_mode,
work_instruction=work_instruction,
imaging_mode=imaging_mode,
imaging_resolution=imaging_resolution,
reserved1=reserved1,
route_number=values[0],
route_count=values[1],
boot_longitude=values[2],
boot_latitude=values[3],
boot_altitude=values[4],
ground_boot_longitude=values[5],
ground_boot_latitude=values[6],
ground_boot_altitude=values[7],
shutdown_longitude=values[8],
shutdown_latitude=values[9],
shutdown_altitude=values[10],
imaging_center_longitude=values[11],
imaging_center_latitude=values[12],
imaging_center_altitude=values[13],
imaging_width=values[14],
imaging_length=values[15],
imaging_angle=values[16],
flight_speed=values[17],
flight_altitude=values[18],
grazing_angle=values[19],
slant_angle=values[20],
route_type=values[21],
side_direction=values[22],
polarization=values[23],
auto_focus=values[24],
motion_compensation=values[25],
image_format=values[26],
echo_disable=values[27],
installation_angle=values[28],
image_bit=values[29],
prf=values[30]
)
@dataclass
class ControlPacket:
"""表4数传通路控制包格式"""
frame_head: int = FRAME_HEAD_SYMBOL # Uint32
device_number: int = 1 # Uint8
reserved: bytes = b'\x00\x00\x00' # Uint8*3
control_data: ControlData = None # 192字节
frame_checksum: int = 0 # Uint32
frame_tail: int = FRAME_TAIL_SYMBOL # Uint32
def __post_init__(self):
if self.control_data is None:
self.control_data = ControlData()
def calculate_checksum(self, data: bytes) -> int:
"""计算校验和无符号int32累加"""
checksum = 0
for i in range(0, len(data), 4):
if i + 4 <= len(data):
val = struct.unpack('<I', data[i:i+4])[0]
checksum = (checksum + val) & 0xFFFFFFFF
return checksum
def to_bytes(self) -> bytes:
"""将控制包序列化为字节流"""
# 构建帧头到校验和之前的数据
data = struct.pack('<I', self.frame_head)
data += struct.pack('<B', self.device_number)
data += self.reserved
data += self.control_data.to_bytes()
# 计算校验和
self.frame_checksum = self.calculate_checksum(data)
# 添加校验和和帧尾
data += struct.pack('<I', self.frame_checksum)
data += struct.pack('<I', self.frame_tail)
return data
@classmethod
def from_bytes(cls, data: bytes) -> 'ControlPacket':
"""从字节流反序列化控制包"""
if len(data) < 208: # 4+1+3+192+4+4
raise ValueError(f"Control packet must be at least 208 bytes, got {len(data)}")
frame_head = struct.unpack('<I', data[0:4])[0]
if frame_head != FRAME_HEAD_SYMBOL:
raise ValueError(f"Invalid frame head: 0x{frame_head:08X}")
device_number = struct.unpack('<B', data[4:5])[0]
reserved = data[5:8]
control_data = ControlData.from_bytes(data[8:200])
frame_checksum = struct.unpack('<I', data[200:204])[0]
frame_tail = struct.unpack('<I', data[204:208])[0]
if frame_tail != FRAME_TAIL_SYMBOL:
raise ValueError(f"Invalid frame tail: 0x{frame_tail:08X}")
# 验证校验和
packet = cls(
frame_head=frame_head,
device_number=device_number,
reserved=reserved,
control_data=control_data,
frame_checksum=frame_checksum,
frame_tail=frame_tail
)
calculated_checksum = packet.calculate_checksum(data[0:200])
if calculated_checksum != frame_checksum:
raise ValueError(f"Checksum mismatch: expected 0x{frame_checksum:08X}, got 0x{calculated_checksum:08X}")
return packet
@dataclass
class ErrorPacket:
"""表8错误信息包数据格式"""
status_packet_type: int = 0 # Uint32
execution_status: int = 0 # Uint32
def to_bytes(self) -> bytes:
"""序列化为字节流"""
return struct.pack('<II', self.status_packet_type, self.execution_status)
@classmethod
def from_bytes(cls, data: bytes) -> 'ErrorPacket':
"""从字节流反序列化"""
if len(data) < 8:
raise ValueError(f"Error packet must be at least 8 bytes, got {len(data)}")
status_packet_type, execution_status = struct.unpack('<II', data[0:8])
return cls(status_packet_type=status_packet_type, execution_status=execution_status)
@dataclass
class DeviceStatus:
"""表5设备状态包数据格式64字节"""
cpu_temp: int = 0 # Int8
ant_temp: int = 0 # Int8
rf_temp: int = 0 # Int8
nvme_temp: int = 0 # Int8
fpga_temp: int = 0 # Int8
sys_voltage: int = 0 # Uint8
clock_lock: int = 0 # Uint8
imu_state: int = 0 # Uint8
mem_volume: int = 0 # Uint16
disk_volume: int = 0 # Uint16
north_velocity: float = 0.0 # Float
sky_velocity: float = 0.0 # Float
east_velocity: float = 0.0 # Float
altitude: float = 0.0 # Float
longitude: int = 0 # Int32
latitude: int = 0 # Int32
angle_roll: float = 0.0 # Float
angle_yaw: float = 0.0 # Float
angle_pitch: float = 0.0 # Float
year_month_day: int = 0 # Uint32
hour_min_sec: int = 0 # Uint32
antenna_azimuth: int = 0 # Int16
antenna_pitch: int = 0 # Int16
is_boot: int = 0 # Uint8
satellite_num: int = 0 # Uint8
pos_flag: int = 0 # Uint8
orient_flag: int = 0 # Uint8
def to_bytes(self) -> bytes:
"""序列化为字节流"""
data = struct.pack('<bbbbbBBBHHffffiifffIIhhBBBB',
self.cpu_temp,
self.ant_temp,
self.rf_temp,
self.nvme_temp,
self.fpga_temp,
self.sys_voltage,
self.clock_lock,
self.imu_state,
self.mem_volume,
self.disk_volume,
self.north_velocity,
self.sky_velocity,
self.east_velocity,
self.altitude,
self.longitude,
self.latitude,
self.angle_roll,
self.angle_yaw,
self.angle_pitch,
self.year_month_day,
self.hour_min_sec,
self.antenna_azimuth,
self.antenna_pitch,
self.is_boot,
self.satellite_num,
self.pos_flag,
self.orient_flag)
return data[:64] # 确保正好64字节
@classmethod
def from_bytes(cls, data: bytes) -> 'DeviceStatus':
"""从字节流反序列化"""
if len(data) < 64:
raise ValueError(f"Device status must be 64 bytes, got {len(data)}")
values = struct.unpack('<bbbbbBBBHHffffiifffIIhhBBBB', data[0:64])
return cls(
cpu_temp=values[0],
ant_temp=values[1],
rf_temp=values[2],
nvme_temp=values[3],
fpga_temp=values[4],
sys_voltage=values[5],
clock_lock=values[6],
imu_state=values[7],
mem_volume=values[8],
disk_volume=values[9],
north_velocity=values[10],
sky_velocity=values[11],
east_velocity=values[12],
altitude=values[13],
longitude=values[14],
latitude=values[15],
angle_roll=values[16],
angle_yaw=values[17],
angle_pitch=values[18],
year_month_day=values[19],
hour_min_sec=values[20],
antenna_azimuth=values[21],
antenna_pitch=values[22],
is_boot=values[23],
satellite_num=values[24],
pos_flag=values[25],
orient_flag=values[26]
)
@dataclass
class ResponsePacket:
"""表9接收总包数据格式响应包"""
frame_head: int = FRAME_HEAD_SYMBOL # Uint32
status_packet_type: list = None # Uint32[2]
device_number: int = 1 # Uint8
reserved: bytes = b'\x00\x00\x00' # Uint8*3
system_version: int = 0x00040001 # Uint32
error_packet: ErrorPacket = None # 表8
device_status: Optional[DeviceStatus] = None # 表5可选
frame_checksum: int = 0 # Uint32
frame_tail: int = FRAME_TAIL_SYMBOL # Uint32
def __post_init__(self):
if self.status_packet_type is None:
self.status_packet_type = [0, 0]
if self.error_packet is None:
self.error_packet = ErrorPacket()
if self.device_status is None:
self.device_status = DeviceStatus()
def calculate_checksum(self, data: bytes) -> int:
"""计算校验和无符号int32累加"""
checksum = 0
for i in range(0, len(data), 4):
if i + 4 <= len(data):
val = struct.unpack('<I', data[i:i+4])[0]
checksum = (checksum + val) & 0xFFFFFFFF
return checksum
def to_bytes(self) -> bytes:
"""将响应包序列化为字节流"""
data = struct.pack('<I', self.frame_head)
data += struct.pack('<II', self.status_packet_type[0], self.status_packet_type[1])
data += struct.pack('<B', self.device_number)
data += self.reserved
data += struct.pack('<I', self.system_version)
if self.status_packet_type[0] == 1:
data += self.error_packet.to_bytes()
if self.status_packet_type[1] == 1:
data += self.device_status.to_bytes()
# 计算校验和
self.frame_checksum = self.calculate_checksum(data)
# 添加校验和和帧尾
data += struct.pack('<I', self.frame_checksum)
data += struct.pack('<I', self.frame_tail)
return data
@classmethod
def from_bytes(cls, data: bytes) -> 'ResponsePacket':
"""从字节流反序列化响应包"""
if len(data) < 88: # 4+8+1+3+4+8+64+4+4
raise ValueError(f"Response packet must be at least 88 bytes, got {len(data)}")
frame_head = struct.unpack('<I', data[0:4])[0]
if frame_head != FRAME_HEAD_SYMBOL:
raise ValueError(f"Invalid frame head: 0x{frame_head:08X}")
status_type_0, status_type_1 = struct.unpack('<II', data[4:12])
device_number = struct.unpack('<B', data[12:13])[0]
reserved = data[13:16]
system_version = struct.unpack('<I', data[16:20])[0]
error_packet = ErrorPacket.from_bytes(data[20:28])
device_status = DeviceStatus.from_bytes(data[28:92])
frame_checksum = struct.unpack('<I', data[92:96])[0]
frame_tail = struct.unpack('<I', data[96:100])[0]
if frame_tail != FRAME_TAIL_SYMBOL:
raise ValueError(f"Invalid frame tail: 0x{frame_tail:08X}")
return cls(
frame_head=frame_head,
status_packet_type=[status_type_0, status_type_1],
device_number=device_number,
reserved=reserved,
system_version=system_version,
error_packet=error_packet,
device_status=device_status,
frame_checksum=frame_checksum,
frame_tail=frame_tail
)