skyeyesystem/MiniSAR-Simulator-Py-main/main.py
2026-01-29 09:47:15 +08:00

280 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
UDP交互程序主程序
集成模块1、模块2、模块3实现完整的UDP通信系统
"""
import argparse
import logging
import time
import threading
from typing import Dict, Tuple, Optional
from module1_receiver import UDPReceiver
from module2_sender import ResponseSender, StatusPacketSender
from module3_data_sender import DataSender
from data_structures import (
ControlPacket, WorkMode, WorkInstruction,
ExecutionStatus, DeviceStatus, CONTROL_RX_PORT,
CONTROL_TX_PORT, DATA_TX_PORT
)
class RadarControlSystem:
    """Radar control system tying the UDP receiver to the sender modules.

    Control packets delivered by ``UDPReceiver`` are dispatched on their
    (work mode, work instruction) pair to a handler. Every handler answers
    with a response packet; CONNECT additionally starts a periodic
    ``StatusPacketSender`` for the client and DISCONNECT stops it again.
    """

    def __init__(self, local_ip: str = '0.0.0.0'):
        """Initialise the system without opening any sockets.

        Args:
            local_ip: Local IP address handed to the receiver and senders.
        """
        self.local_ip = local_ip
        self.logger = logging.getLogger('RadarControlSystem')
        self.logger.setLevel(logging.INFO)
        # hasHandlers() also inspects ancestor loggers. When the application
        # has already configured the root logger (main() does so through
        # logging.basicConfig), attaching another StreamHandler here would
        # make every record appear twice because records propagate upwards.
        if not self.logger.hasHandlers():
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

        # Modules (created in start())
        self.receiver = None
        self.response_sender = None
        self.data_sender = None
        # One status sender per client, keyed by (client_ip, CONTROL_TX_PORT)
        self.status_senders: Dict[Tuple[str, int], StatusPacketSender] = {}

        # State
        # {(client_ip, CONTROL_TX_PORT): connection time from time.time()}
        self.connected_clients: Dict[Tuple[str, int], float] = {}
        self.device_status = DeviceStatus()
        self.running = False

    def start(self):
        """Start the receiver and create the sender modules.

        Raises:
            Exception: Any error raised while creating or starting a module
                is logged, the system is stopped again, and the error is
                re-raised to the caller.
        """
        self.logger.info("Starting Radar Control System...")
        try:
            # Start the receiving module
            self.receiver = UDPReceiver(local_ip=self.local_ip, port=CONTROL_RX_PORT, callback=self._on_control_packet)
            self.receiver.start()
            # Create the sending modules (nothing is started on them here)
            self.response_sender = ResponseSender(local_ip=self.local_ip)
            self.data_sender = DataSender(local_ip=self.local_ip)
            self.running = True
            self.logger.info("Radar Control System started successfully")
        except Exception as e:
            self.logger.error(f"Failed to start system: {e}")
            # Release whatever was created before the failure
            self.stop()
            raise

    def stop(self):
        """Stop all status senders and the receiver, then close the senders."""
        self.logger.info("Stopping Radar Control System...")
        self.running = False
        # Stop every status packet sender. Iterate over a snapshot: the
        # receiver is still running at this point, and its callback
        # (presumably invoked from the receiver's own thread) may add or
        # remove entries, which would break iteration over the live dict.
        for sender in list(self.status_senders.values()):
            sender.stop()
        self.status_senders.clear()
        # Stop the receiving module
        if self.receiver:
            self.receiver.stop()
        # Close the sending modules
        if self.response_sender:
            self.response_sender.close()
        if self.data_sender:
            self.data_sender.close()
        self.logger.info("Radar Control System stopped")

    def _on_control_packet(self, packet: ControlPacket, remote_addr: Tuple[str, int]):
        """Dispatch a received control packet to the matching handler.

        Unknown (mode, instruction) combinations and any exception raised by
        a handler are answered with an ``ExecutionStatus.ERROR`` response.

        Args:
            packet: The control packet handed over by the receiver.
            remote_addr: (ip, port) the packet was received from.
        """
        self.logger.info(f"Processing control packet from {remote_addr}")
        try:
            control_data = packet.control_data
            work_mode = control_data.work_mode
            work_instruction = control_data.work_instruction
            # Dispatch on the instruction type
            if work_mode == WorkMode.NORMAL and work_instruction == WorkInstruction.CONNECT:
                self._handle_connect(remote_addr)
            elif work_mode == WorkMode.NORMAL and work_instruction == WorkInstruction.DISCONNECT:
                self._handle_disconnect(remote_addr)
            elif work_mode == WorkMode.AUTO and work_instruction == WorkInstruction.UPLOAD_ROUTE:
                self._handle_upload_route(remote_addr, control_data)
            elif work_mode == WorkMode.AUTO and work_instruction == WorkInstruction.START_CALC:
                self._handle_start_calc(remote_addr, control_data)
            elif work_mode == WorkMode.AUTO and work_instruction == WorkInstruction.MANUAL_BOOT:
                self._handle_manual_boot(remote_addr)
            elif work_mode == WorkMode.AUTO and work_instruction == WorkInstruction.END_TASK:
                self._handle_end_task(remote_addr)
            elif work_mode == WorkMode.NORMAL and work_instruction == WorkInstruction.END_TASK:
                # END_TASK in NORMAL mode means "end all tasks"
                self._handle_end_all_tasks(remote_addr)
            else:
                self.logger.warning(f"Unknown instruction: mode={work_mode}, instruction={work_instruction}")
                self._send_response(remote_addr, ExecutionStatus.ERROR)
        except Exception as e:
            # logger.exception keeps the traceback, which a bare message loses
            self.logger.exception(f"Error processing control packet: {e}")
            self._send_response(remote_addr, ExecutionStatus.ERROR)

    def _handle_connect(self, remote_addr: Tuple[str, int]):
        """Handle CONNECT: register the client and start its status sender.

        Args:
            remote_addr: (ip, port) the CONNECT packet was received from.
        """
        self.logger.info(f"Handling CONNECT from {remote_addr}")
        # The target keeps the client's IP but uses CONTROL_TX_PORT as port
        target_addr = (remote_addr[0], CONTROL_TX_PORT)
        # Record the connection (a repeated CONNECT refreshes the timestamp)
        self.connected_clients[target_addr] = time.time()
        # Send the response
        self._send_response(target_addr, ExecutionStatus.SUCCESS)
        # Start a status packet sender unless this client already has one
        if target_addr not in self.status_senders:
            sender = StatusPacketSender(target_addr, interval=1.0, local_ip=self.local_ip)
            sender.start()
            self.status_senders[target_addr] = sender
            self.logger.info(f"Status packet sender started for {target_addr}")

    def _handle_disconnect(self, remote_addr: Tuple[str, int]):
        """Handle DISCONNECT: forget the client and stop its status sender.

        Args:
            remote_addr: (ip, port) the DISCONNECT packet was received from.
        """
        self.logger.info(f"Handling DISCONNECT from {remote_addr}")
        # _handle_connect registers clients under (ip, CONTROL_TX_PORT), not
        # under the packet's source address, so the same key must be used
        # here; looking up remote_addr directly would never match unless the
        # client happened to send from CONTROL_TX_PORT.
        target_addr = (remote_addr[0], CONTROL_TX_PORT)
        # Remove the connection record
        self.connected_clients.pop(target_addr, None)
        # Stop the status packet sender
        sender = self.status_senders.pop(target_addr, None)
        if sender is not None:
            sender.stop()
            self.logger.info(f"Status packet sender stopped for {target_addr}")
        # Send the response.
        # NOTE(review): CONNECT replies to (ip, CONTROL_TX_PORT) while this
        # and the remaining handlers reply to the source address - confirm
        # which port the client actually listens on.
        self._send_response(remote_addr, ExecutionStatus.SUCCESS)

    def _handle_upload_route(self, remote_addr: Tuple[str, int], control_data):
        """Handle UPLOAD_ROUTE: log the route parameters and acknowledge.

        Args:
            remote_addr: (ip, port) the packet was received from.
            control_data: Control data of the packet; its route and
                boot/shutdown point fields are logged.
        """
        self.logger.info(f"Handling UPLOAD_ROUTE from {remote_addr}")
        self.logger.info(f" Route Number: {control_data.route_number}")
        self.logger.info(f" Route Count: {control_data.route_count}")
        self.logger.info(f" Boot Point: ({control_data.boot_longitude}, {control_data.boot_latitude})")
        self.logger.info(f" Shutdown Point: ({control_data.shutdown_longitude}, {control_data.shutdown_latitude})")
        # Route validation logic can be added here
        self._send_response(remote_addr, ExecutionStatus.SUCCESS)

    def _handle_start_calc(self, remote_addr: Tuple[str, int], control_data):
        """Handle START_CALC: log the imaging parameters and acknowledge.

        Args:
            remote_addr: (ip, port) the packet was received from.
            control_data: Control data of the packet; its imaging mode and
                route count are logged.
        """
        self.logger.info(f"Handling START_CALC from {remote_addr}")
        self.logger.info(f" Imaging Mode: {control_data.imaging_mode}")
        self.logger.info(f" Route Count: {control_data.route_count}")
        # Calculation start-up logic can be added here
        self._send_response(remote_addr, ExecutionStatus.SUCCESS)

    def _handle_manual_boot(self, remote_addr: Tuple[str, int]):
        """Handle MANUAL_BOOT: mark the device as booted and acknowledge."""
        self.logger.info(f"Handling MANUAL_BOOT from {remote_addr}")
        # Update the device status
        self.device_status.is_boot = 1
        # Boot logic can be added here
        self._send_response(remote_addr, ExecutionStatus.SUCCESS)

    def _handle_end_task(self, remote_addr: Tuple[str, int]):
        """Handle END_TASK (AUTO mode): mark the device as not booted and acknowledge."""
        self.logger.info(f"Handling END_TASK from {remote_addr}")
        # Update the device status
        self.device_status.is_boot = 0
        # Shutdown logic can be added here
        self._send_response(remote_addr, ExecutionStatus.SUCCESS)

    def _handle_end_all_tasks(self, remote_addr: Tuple[str, int]):
        """Handle END_TASK in NORMAL mode: mark the device as not booted and acknowledge."""
        self.logger.info(f"Handling END_ALL_TASKS from {remote_addr}")
        # Update the device status
        self.device_status.is_boot = 0
        # Logic for stopping all tasks can be added here
        self._send_response(remote_addr, ExecutionStatus.SUCCESS)

    def _send_response(self, remote_addr: Tuple[str, int], status: int):
        """Send a response packet carrying ``status`` and the device status.

        Does nothing when the response sender has not been created yet.
        """
        if self.response_sender:
            self.response_sender.send_response(remote_addr, status, self.device_status)

    def send_echo_data(self, remote_addr: Tuple[str, int], echo_data: bytes, sequence: int = 0):
        """Forward echo data to ``remote_addr`` through the data sender.

        Does nothing when the data sender has not been created yet.
        """
        if self.data_sender:
            self.data_sender.send_echo_data(remote_addr, echo_data, sequence)

    def send_image_data(self, remote_addr: Tuple[str, int], image_data: bytes, imaging_params=None):
        """Forward image data to ``remote_addr`` through the data sender.

        Does nothing when the data sender has not been created yet.
        """
        if self.data_sender:
            self.data_sender.send_image_data(remote_addr, image_data, imaging_params)

    def get_status(self) -> dict:
        """Return a snapshot of the system state.

        Returns:
            A dict with the running flag, the number and addresses of the
            connected clients, and selected device status fields.
        """
        return {
            'running': self.running,
            'connected_clients': len(self.connected_clients),
            'clients': list(self.connected_clients),
            'device_status': {
                'cpu_temp': self.device_status.cpu_temp,
                'is_boot': self.device_status.is_boot,
                'satellite_num': self.device_status.satellite_num,
            },
        }
def main():
    """Command-line entry point.

    Parses ``--ip``, configures root logging, starts a
    ``RadarControlSystem`` bound to that address and logs its status every
    five seconds until interrupted; the system is always stopped on exit.
    """
    # Command-line arguments
    cli = argparse.ArgumentParser(description='Radar Control System')
    cli.add_argument(
        '--ip',
        type=str,
        default='0.0.0.0',
        help='Local IP address to bind to (default: 0.0.0.0)',
    )
    options = cli.parse_args()

    # Logging configuration
    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )

    # Create and start the system
    radar = RadarControlSystem(local_ip=options.ip)
    radar.start()

    try:
        # Keep the process alive, reporting the system status periodically
        while True:
            time.sleep(5)
            status = radar.get_status()
            logging.info(f"System Status: {status}")
    except KeyboardInterrupt:
        logging.info("Received interrupt signal")
    finally:
        radar.stop()
# Run the control system only when executed as a script, not on import.
if __name__ == "__main__":
    main()