# skyeyesystem/MiniSAR-Simulator-Py-main/data_structures.py
#
# 501 lines
# 19 KiB
# Python
# Raw Normal View History

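"""
Data structure definitions for the UDP interaction program.
All data structures are defined according to the tables in the protocol document.
"""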
import struct
from enum import IntEnum
from dataclasses import dataclass
from typing import Optional
# ==================== Constants ====================
# Frame head / frame tail marker words
FRAME_HEAD_SYMBOL = 0x7EFFDC01
FRAME_TAIL_SYMBOL = 0x7EFFDC02
# Port definitions
CONTROL_RX_PORT = 38101 # module 1: receives control commands
CONTROL_TX_PORT = 37001 # module 2: sends control responses
DATA_TX_PORT = 37004 # module 3: sends data packets
# Work mode (dWorkMode)
class WorkMode(IntEnum):
    NORMAL = 0 # normal mode
    AUTO = 4 # automatic mode
# Work instruction (dWorkInstruction)
class WorkInstruction(IntEnum):
    UPLOAD_ROUTE = 0 # upload route
    START_CALC = 1 # start calculation
    MANUAL_BOOT = 2 # manual power-on
    END_TASK = 3 # end the current task
    CONNECT = 4 # connect
    DISCONNECT = 5 # disconnect
# Imaging mode
class ImagingMode(IntEnum):
    STRIP = 0 # stripmap
    FOCUS = 1 # spotlight
# Imaging resolution
class ImagingResolution(IntEnum):
    RES_0_1M = 0 # 0.1m
    RES_0_2M = 1 # 0.2m
    RES_0_3M = 2 # 0.3m
    RES_0_5M = 3 # 0.5m
    RES_1M = 4 # 1m
# Status packet type
class StatusPacketType(IntEnum):
    CONTROL_RESULT = 0 # result of executing a control command
    IMAGING_READY = 1 # imaging preparation complete
    DEVICE_FAULT = 2 # single-unit operating fault
# Execution status
class ExecutionStatus(IntEnum):
    SUCCESS = 0 # success
    ERROR = 1 # error
# ==================== Data structure definitions ====================
@dataclass
class ControlData:
    """Radar control data record (Table 3), exactly 192 bytes on the wire.

    Layout (little-endian, no alignment padding):

    * bytes 0-3: work_mode, work_instruction, imaging_mode,
      imaging_resolution (one unsigned byte each)
    * bytes 4-7: reserved1
    * bytes 8-105: the fields route_number .. prf, in declaration order
    * bytes 106-191: reserved2

    ``side_direction`` is encoded as a signed byte (Int8), so negative
    values are accepted and values above 127 are rejected by ``struct``.
    """
    work_mode: int = 0  # Uint8
    work_instruction: int = 0  # Uint8
    imaging_mode: int = 0  # Uint8
    imaging_resolution: int = 0  # Uint8
    reserved1: bytes = b'\x00\x00\x00\x00'  # Uint8*4
    route_number: int = 0  # Uint16
    route_count: int = 0  # Uint16
    # Route power-on point
    boot_longitude: int = 0  # Uint32 (quantized in units of 1e-7)
    boot_latitude: int = 0  # Uint32 (quantized in units of 1e-7)
    boot_altitude: int = 0  # Int16
    # Ground power-on point
    ground_boot_longitude: int = 0  # Uint32
    ground_boot_latitude: int = 0  # Uint32
    ground_boot_altitude: int = 0  # Int16
    # Route power-off point
    shutdown_longitude: int = 0  # Uint32
    shutdown_latitude: int = 0  # Uint32
    shutdown_altitude: int = 0  # Int16
    # Imaging area center
    imaging_center_longitude: int = 0  # Uint32
    imaging_center_latitude: int = 0  # Uint32
    imaging_center_altitude: int = 0  # Int16
    # Imaging area parameters
    imaging_width: int = 0  # Uint16
    imaging_length: int = 0  # Uint16
    imaging_angle: int = 0  # Uint16
    # Flight parameters
    flight_speed: float = 0.0  # double64
    flight_altitude: float = 0.0  # double64
    grazing_angle: float = 0.0  # double64
    slant_angle: float = 0.0  # double64
    # Other parameters
    route_type: int = 0  # Uint8
    side_direction: int = 0  # Int8
    polarization: int = 0  # Uint8
    auto_focus: int = 0  # Uint8
    motion_compensation: int = 0  # Uint8
    image_format: int = 0  # Uint8
    echo_disable: int = 0  # Uint8
    installation_angle: float = 35.0  # Float
    image_bit: int = 0  # Uint8
    prf: float = 0.0  # Float
    reserved2: bytes = b'\x00' * 86  # 86 bytes of padding up to 192 B

    # Un-annotated class attributes are not dataclass fields; they only
    # describe the wire layout shared by to_bytes() and from_bytes().
    _HEADER_FORMAT = '<BBBB'
    # 'b' (signed) for side_direction, matching its documented Int8 type.
    _BODY_FORMAT = '<HHIIhIIhIIhIIhHHHddddBbBBBBBfBf'
    _BODY_FIELDS = (
        'route_number', 'route_count',
        'boot_longitude', 'boot_latitude', 'boot_altitude',
        'ground_boot_longitude', 'ground_boot_latitude', 'ground_boot_altitude',
        'shutdown_longitude', 'shutdown_latitude', 'shutdown_altitude',
        'imaging_center_longitude', 'imaging_center_latitude',
        'imaging_center_altitude',
        'imaging_width', 'imaging_length', 'imaging_angle',
        'flight_speed', 'flight_altitude', 'grazing_angle', 'slant_angle',
        'route_type', 'side_direction', 'polarization', 'auto_focus',
        'motion_compensation', 'image_format', 'echo_disable',
        'installation_angle', 'image_bit', 'prf',
    )
    _RESERVED1_OFFSET = 4
    _BODY_OFFSET = 8
    _RESERVED2_OFFSET = 106
    _TOTAL_SIZE = 192

    @staticmethod
    def _fit(raw: bytes, size: int) -> bytes:
        """Return ``raw`` truncated or right-padded with zeros to ``size`` bytes."""
        return bytes(raw[:size]).ljust(size, b'\x00')

    def to_bytes(self) -> bytes:
        """Serialize the control data to exactly 192 little-endian bytes.

        The reserved fields are truncated or zero-padded to their nominal
        sizes so the other fields always land at their fixed offsets.

        Raises:
            struct.error: if a field value does not fit its wire type.
        """
        header = struct.pack(
            self._HEADER_FORMAT,
            self.work_mode,
            self.work_instruction,
            self.imaging_mode,
            self.imaging_resolution,
        )
        body = struct.pack(
            self._BODY_FORMAT,
            *(getattr(self, name) for name in self._BODY_FIELDS),
        )
        reserved1 = self._fit(
            self.reserved1, self._BODY_OFFSET - self._RESERVED1_OFFSET)
        reserved2 = self._fit(
            self.reserved2, self._TOTAL_SIZE - self._RESERVED2_OFFSET)
        return header + reserved1 + body + reserved2

    @classmethod
    def from_bytes(cls, data: bytes) -> 'ControlData':
        """Deserialize control data from the first 192 bytes of ``data``.

        Both reserved areas are preserved, so ``from_bytes(x).to_bytes()``
        reproduces the first 192 bytes of ``x``.

        Raises:
            ValueError: if ``data`` is shorter than 192 bytes.
        """
        if len(data) < cls._TOTAL_SIZE:
            raise ValueError(f"Control data must be 192 bytes, got {len(data)}")
        work_mode, work_instruction, imaging_mode, imaging_resolution = (
            struct.unpack_from(cls._HEADER_FORMAT, data, 0))
        body = struct.unpack_from(cls._BODY_FORMAT, data, cls._BODY_OFFSET)
        return cls(
            work_mode=work_mode,
            work_instruction=work_instruction,
            imaging_mode=imaging_mode,
            imaging_resolution=imaging_resolution,
            reserved1=bytes(data[cls._RESERVED1_OFFSET:cls._BODY_OFFSET]),
            reserved2=bytes(data[cls._RESERVED2_OFFSET:cls._TOTAL_SIZE]),
            **dict(zip(cls._BODY_FIELDS, body)),
        )
@dataclass
class ControlPacket:
    """Control packet on the data-link channel (Table 4).

    Serialized order: frame head (Uint32), device number (Uint8),
    reserved bytes, the control data record, checksum (Uint32) and
    frame tail (Uint32), all little-endian.
    """
    frame_head: int = FRAME_HEAD_SYMBOL  # Uint32
    device_number: int = 1  # Uint8
    reserved: bytes = b'\x00\x00\x00'  # Uint8*3
    control_data: ControlData = None  # 192 bytes
    frame_checksum: int = 0  # Uint32
    frame_tail: int = FRAME_TAIL_SYMBOL  # Uint32

    def __post_init__(self):
        # Replace the None placeholder with a fresh default record.
        if self.control_data is None:
            self.control_data = ControlData()

    def calculate_checksum(self, data: bytes) -> int:
        """Sum ``data`` as little-endian uint32 words, modulo 2**32.

        Trailing bytes that do not fill a whole 4-byte word are ignored.
        """
        word_count = len(data) // 4
        words = struct.unpack(f'<{word_count}I', data[:word_count * 4])
        return sum(words) & 0xFFFFFFFF

    def to_bytes(self) -> bytes:
        """Serialize the packet and refresh ``frame_checksum``.

        The checksum covers everything that precedes it and is stored on
        the instance as a side effect.
        """
        covered = b''.join((
            struct.pack('<IB', self.frame_head, self.device_number),
            self.reserved,
            self.control_data.to_bytes(),
        ))
        self.frame_checksum = self.calculate_checksum(covered)
        trailer = struct.pack('<II', self.frame_checksum, self.frame_tail)
        return covered + trailer

    @classmethod
    def from_bytes(cls, data: bytes) -> 'ControlPacket':
        """Parse and validate a control packet from ``data``.

        Raises:
            ValueError: if ``data`` is shorter than 208 bytes, the frame
                head or tail marker is wrong, or the checksum over the
                first 200 bytes does not match the stored value.
        """
        if len(data) < 208:  # 4+1+3+192+4+4
            raise ValueError(f"Control packet must be at least 208 bytes, got {len(data)}")

        (frame_head,) = struct.unpack('<I', data[0:4])
        if frame_head != FRAME_HEAD_SYMBOL:
            raise ValueError(f"Invalid frame head: 0x{frame_head:08X}")

        (device_number,) = struct.unpack('<B', data[4:5])
        payload = ControlData.from_bytes(data[8:200])
        frame_checksum, frame_tail = struct.unpack('<II', data[200:208])
        if frame_tail != FRAME_TAIL_SYMBOL:
            raise ValueError(f"Invalid frame tail: 0x{frame_tail:08X}")

        packet = cls(
            frame_head=frame_head,
            device_number=device_number,
            reserved=data[5:8],
            control_data=payload,
            frame_checksum=frame_checksum,
            frame_tail=frame_tail,
        )
        # Verify the checksum over the raw bytes that precede it.
        calculated_checksum = packet.calculate_checksum(data[0:200])
        if calculated_checksum != frame_checksum:
            raise ValueError(f"Checksum mismatch: expected 0x{frame_checksum:08X}, got 0x{calculated_checksum:08X}")
        return packet
@dataclass
class ErrorPacket:
    """Error information packet (Table 8): two little-endian Uint32 words."""
    status_packet_type: int = 0  # Uint32
    execution_status: int = 0  # Uint32

    def to_bytes(self) -> bytes:
        """Serialize both words into 8 bytes."""
        words = (self.status_packet_type, self.execution_status)
        return struct.pack('<II', *words)

    @classmethod
    def from_bytes(cls, data: bytes) -> 'ErrorPacket':
        """Deserialize from the first 8 bytes of ``data``.

        Raises:
            ValueError: if ``data`` is shorter than 8 bytes.
        """
        if len(data) < 8:
            raise ValueError(f"Error packet must be at least 8 bytes, got {len(data)}")
        packet_type, status = struct.unpack('<II', data[0:8])
        return cls(status_packet_type=packet_type, execution_status=status)
@dataclass
class DeviceStatus:
    """Device status packet (Table 5), 64 bytes, little-endian."""
    cpu_temp: int = 0  # Int8
    ant_temp: int = 0  # Int8
    rf_temp: int = 0  # Int8
    nvme_temp: int = 0  # Int8
    fpga_temp: int = 0  # Int8
    sys_voltage: int = 0  # Uint8
    clock_lock: int = 0  # Uint8
    imu_state: int = 0  # Uint8
    mem_volume: int = 0  # Uint16
    disk_volume: int = 0  # Uint16
    north_velocity: float = 0.0  # Float
    sky_velocity: float = 0.0  # Float
    east_velocity: float = 0.0  # Float
    altitude: float = 0.0  # Float
    longitude: int = 0  # Int32
    latitude: int = 0  # Int32
    angle_roll: float = 0.0  # Float
    angle_yaw: float = 0.0  # Float
    angle_pitch: float = 0.0  # Float
    year_month_day: int = 0  # Uint32
    hour_min_sec: int = 0  # Uint32
    antenna_azimuth: int = 0  # Int16
    antenna_pitch: int = 0  # Int16
    is_boot: int = 0  # Uint8
    satellite_num: int = 0  # Uint8
    pos_flag: int = 0  # Uint8
    orient_flag: int = 0  # Uint8

    # Wire layout shared by both directions; un-annotated, so these are
    # plain class attributes rather than dataclass fields.
    _WIRE_FORMAT = '<bbbbbBBBHHffffiifffIIhhBBBB'
    _WIRE_FIELDS = (
        'cpu_temp', 'ant_temp', 'rf_temp', 'nvme_temp', 'fpga_temp',
        'sys_voltage', 'clock_lock', 'imu_state',
        'mem_volume', 'disk_volume',
        'north_velocity', 'sky_velocity', 'east_velocity', 'altitude',
        'longitude', 'latitude',
        'angle_roll', 'angle_yaw', 'angle_pitch',
        'year_month_day', 'hour_min_sec',
        'antenna_azimuth', 'antenna_pitch',
        'is_boot', 'satellite_num', 'pos_flag', 'orient_flag',
    )

    def to_bytes(self) -> bytes:
        """Serialize every field, in declaration order, into 64 bytes."""
        field_values = [getattr(self, name) for name in self._WIRE_FIELDS]
        # The format describes exactly 64 bytes, so no trimming is needed.
        return struct.pack(self._WIRE_FORMAT, *field_values)

    @classmethod
    def from_bytes(cls, data: bytes) -> 'DeviceStatus':
        """Deserialize from the first 64 bytes of ``data``.

        Raises:
            ValueError: if ``data`` is shorter than 64 bytes.
        """
        if len(data) < 64:
            raise ValueError(f"Device status must be 64 bytes, got {len(data)}")
        unpacked = struct.unpack(cls._WIRE_FORMAT, data[0:64])
        return cls(**dict(zip(cls._WIRE_FIELDS, unpacked)))
@dataclass
class ResponsePacket:
    """Response packet (Table 9, overall receive-packet format).

    Serialized order (little-endian):

    * frame head (Uint32)
    * status_packet_type[0], status_packet_type[1] (Uint32 each)
    * device number (Uint8) and reserved bytes (3 by default)
    * system version (Uint32)
    * error packet, 8 bytes -- only when status_packet_type[0] == 1
    * device status, 64 bytes -- only when status_packet_type[1] == 1
    * checksum (Uint32) and frame tail (Uint32)
    """
    frame_head: int = FRAME_HEAD_SYMBOL  # Uint32
    status_packet_type: list = None  # Uint32[2]
    device_number: int = 1  # Uint8
    reserved: bytes = b'\x00\x00\x00'  # Uint8*3
    system_version: int = 0x00040001  # Uint32
    error_packet: Optional[ErrorPacket] = None  # Table 8
    device_status: Optional[DeviceStatus] = None  # Table 5, optional
    frame_checksum: int = 0  # Uint32
    frame_tail: int = FRAME_TAIL_SYMBOL  # Uint32

    def __post_init__(self):
        # Replace None placeholders with fresh per-instance defaults.
        if self.status_packet_type is None:
            self.status_packet_type = [0, 0]
        if self.error_packet is None:
            self.error_packet = ErrorPacket()
        if self.device_status is None:
            self.device_status = DeviceStatus()

    def calculate_checksum(self, data: bytes) -> int:
        """Sum ``data`` as little-endian uint32 words, modulo 2**32.

        Trailing bytes that do not fill a whole 4-byte word are ignored.
        """
        checksum = 0
        for i in range(0, len(data), 4):
            if i + 4 <= len(data):
                val = struct.unpack('<I', data[i:i+4])[0]
                checksum = (checksum + val) & 0xFFFFFFFF
        return checksum

    def to_bytes(self) -> bytes:
        """Serialize the packet and refresh ``frame_checksum``.

        The error packet and device status are emitted only when their
        flag in ``status_packet_type`` equals 1. The checksum covers
        everything that precedes it.
        """
        data = struct.pack('<I', self.frame_head)
        data += struct.pack('<II', self.status_packet_type[0], self.status_packet_type[1])
        data += struct.pack('<B', self.device_number)
        data += self.reserved
        data += struct.pack('<I', self.system_version)
        if self.status_packet_type[0] == 1:
            data += self.error_packet.to_bytes()
        if self.status_packet_type[1] == 1:
            data += self.device_status.to_bytes()
        # Compute the checksum over everything written so far.
        self.frame_checksum = self.calculate_checksum(data)
        # Append checksum and frame tail.
        data += struct.pack('<I', self.frame_checksum)
        data += struct.pack('<I', self.frame_tail)
        return data

    @classmethod
    def from_bytes(cls, data: bytes) -> 'ResponsePacket':
        """Parse a response packet using the same layout ``to_bytes`` writes.

        The optional sections are read only when their flag in
        ``status_packet_type`` equals 1; a section that is absent is left
        at its default value. The stored checksum is returned as-is and
        is not verified here.

        Raises:
            ValueError: if ``data`` is too short for the layout its flags
                announce, or the frame head or tail marker is wrong.
        """
        fixed_size = 20    # head + 2 flags + device number + reserved + version
        trailer_size = 8   # checksum + frame tail
        error_size = 8
        status_size = 64

        minimum = fixed_size + trailer_size
        if len(data) < minimum:
            raise ValueError(f"Response packet must be at least {minimum} bytes, got {len(data)}")
        frame_head = struct.unpack_from('<I', data, 0)[0]
        if frame_head != FRAME_HEAD_SYMBOL:
            raise ValueError(f"Invalid frame head: 0x{frame_head:08X}")
        status_type_0, status_type_1, device_number = struct.unpack_from('<IIB', data, 4)
        reserved = bytes(data[13:16])
        system_version = struct.unpack_from('<I', data, 16)[0]

        has_error = status_type_0 == 1
        has_status = status_type_1 == 1
        required = minimum
        if has_error:
            required += error_size
        if has_status:
            required += status_size
        if len(data) < required:
            raise ValueError(f"Response packet must be at least {required} bytes, got {len(data)}")

        # Walk the optional sections in the order to_bytes() emits them.
        offset = fixed_size
        error_packet = None
        device_status = None
        if has_error:
            error_packet = ErrorPacket.from_bytes(data[offset:offset + error_size])
            offset += error_size
        if has_status:
            device_status = DeviceStatus.from_bytes(data[offset:offset + status_size])
            offset += status_size

        frame_checksum, frame_tail = struct.unpack_from('<II', data, offset)
        if frame_tail != FRAME_TAIL_SYMBOL:
            raise ValueError(f"Invalid frame tail: 0x{frame_tail:08X}")
        return cls(
            frame_head=frame_head,
            status_packet_type=[status_type_0, status_type_1],
            device_number=device_number,
            reserved=reserved,
            system_version=system_version,
            error_packet=error_packet,
            device_status=device_status,
            frame_checksum=frame_checksum,
            frame_tail=frame_tail,
        )