skyeyesystem/MiniSAR-Simulator-Py-main/main.py

280 lines
10 KiB
Python
Raw Normal View History

"""
UDP交互程序主程序
集成模块1模块2模块3实现完整的UDP通信系统
"""
import argparse
import logging
import time
import threading
from typing import Dict, Tuple, Optional
from module1_receiver import UDPReceiver
from module2_sender import ResponseSender, StatusPacketSender
from module3_data_sender import DataSender
from data_structures import (
ControlPacket, WorkMode, WorkInstruction,
ExecutionStatus, DeviceStatus, CONTROL_RX_PORT,
CONTROL_TX_PORT, DATA_TX_PORT
)
class RadarControlSystem:
"""雷达控制系统"""
def __init__(self, local_ip: str = '0.0.0.0'):
"""初始化系统"""
self.local_ip = local_ip
self.logger = logging.getLogger('RadarControlSystem')
self.logger.setLevel(logging.INFO)
if not self.logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
# 模块
self.receiver = None
self.response_sender = None
self.data_sender = None
self.status_senders: Dict[Tuple[str, int], StatusPacketSender] = {}
# 状态
self.connected_clients = {} # {remote_addr: connection_time}
self.device_status = DeviceStatus()
self.running = False
def start(self):
"""启动系统"""
self.logger.info("Starting Radar Control System...")
try:
# 启动接收模块
self.receiver = UDPReceiver(local_ip=self.local_ip, port=CONTROL_RX_PORT, callback=self._on_control_packet)
self.receiver.start()
# 创建发送模块(暂不启动)
self.response_sender = ResponseSender(local_ip=self.local_ip)
self.data_sender = DataSender(local_ip=self.local_ip)
self.running = True
self.logger.info("Radar Control System started successfully")
except Exception as e:
self.logger.error(f"Failed to start system: {e}")
self.stop()
raise
def stop(self):
"""停止系统"""
self.logger.info("Stopping Radar Control System...")
self.running = False
# 停止所有状态包发送器
for sender in self.status_senders.values():
sender.stop()
self.status_senders.clear()
# 停止接收模块
if self.receiver:
self.receiver.stop()
# 关闭发送模块
if self.response_sender:
self.response_sender.close()
if self.data_sender:
self.data_sender.close()
self.logger.info("Radar Control System stopped")
def _on_control_packet(self, packet: ControlPacket, remote_addr: Tuple[str, int]):
"""处理接收到的控制包"""
self.logger.info(f"Processing control packet from {remote_addr}")
try:
control_data = packet.control_data
work_mode = control_data.work_mode
work_instruction = control_data.work_instruction
# 根据指令类型处理
if work_mode == WorkMode.NORMAL and work_instruction == WorkInstruction.CONNECT:
self._handle_connect(remote_addr)
elif work_mode == WorkMode.NORMAL and work_instruction == WorkInstruction.DISCONNECT:
self._handle_disconnect(remote_addr)
elif work_mode == WorkMode.AUTO and work_instruction == WorkInstruction.UPLOAD_ROUTE:
self._handle_upload_route(remote_addr, control_data)
elif work_mode == WorkMode.AUTO and work_instruction == WorkInstruction.START_CALC:
self._handle_start_calc(remote_addr, control_data)
elif work_mode == WorkMode.AUTO and work_instruction == WorkInstruction.MANUAL_BOOT:
self._handle_manual_boot(remote_addr)
elif work_mode == WorkMode.AUTO and work_instruction == WorkInstruction.END_TASK:
self._handle_end_task(remote_addr)
elif work_mode == WorkMode.NORMAL and work_instruction == WorkInstruction.END_TASK:
self._handle_end_all_tasks(remote_addr)
else:
self.logger.warning(f"Unknown instruction: mode={work_mode}, instruction={work_instruction}")
self._send_response(remote_addr, ExecutionStatus.ERROR)
except Exception as e:
self.logger.error(f"Error processing control packet: {e}")
self._send_response(remote_addr, ExecutionStatus.ERROR)
def _handle_connect(self, remote_addr: Tuple[str, int]):
"""处理连接指令"""
self.logger.info(f"Handling CONNECT from {remote_addr}")
# 目标地址保持IP不变端口改为CONTROL_TX_PORT
target_addr = (remote_addr[0], CONTROL_TX_PORT)
# 记录连接
self.connected_clients[target_addr] = time.time()
# 发送响应
self._send_response(target_addr, ExecutionStatus.SUCCESS)
# 启动状态包发送器
if target_addr not in self.status_senders:
sender = StatusPacketSender(target_addr, interval=1.0, local_ip=self.local_ip)
sender.start()
self.status_senders[target_addr] = sender
self.logger.info(f"Status packet sender started for {target_addr}")
def _handle_disconnect(self, remote_addr: Tuple[str, int]):
"""处理断开连接指令"""
self.logger.info(f"Handling DISCONNECT from {remote_addr}")
# 移除连接记录
if remote_addr in self.connected_clients:
del self.connected_clients[remote_addr]
# 停止状态包发送器
if remote_addr in self.status_senders:
self.status_senders[remote_addr].stop()
del self.status_senders[remote_addr]
self.logger.info(f"Status packet sender stopped for {remote_addr}")
# 发送响应
self._send_response(remote_addr, ExecutionStatus.SUCCESS)
def _handle_upload_route(self, remote_addr: Tuple[str, int], control_data):
"""处理上传航线指令"""
self.logger.info(f"Handling UPLOAD_ROUTE from {remote_addr}")
self.logger.info(f" Route Number: {control_data.route_number}")
self.logger.info(f" Route Count: {control_data.route_count}")
self.logger.info(f" Boot Point: ({control_data.boot_longitude}, {control_data.boot_latitude})")
self.logger.info(f" Shutdown Point: ({control_data.shutdown_longitude}, {control_data.shutdown_latitude})")
# 这里可以添加航线验证逻辑
self._send_response(remote_addr, ExecutionStatus.SUCCESS)
def _handle_start_calc(self, remote_addr: Tuple[str, int], control_data):
"""处理开始计算指令"""
self.logger.info(f"Handling START_CALC from {remote_addr}")
self.logger.info(f" Imaging Mode: {control_data.imaging_mode}")
self.logger.info(f" Route Count: {control_data.route_count}")
# 这里可以添加计算启动逻辑
self._send_response(remote_addr, ExecutionStatus.SUCCESS)
def _handle_manual_boot(self, remote_addr: Tuple[str, int]):
"""处理手动开机指令"""
self.logger.info(f"Handling MANUAL_BOOT from {remote_addr}")
# 更新设备状态
self.device_status.is_boot = 1
# 这里可以添加开机逻辑
self._send_response(remote_addr, ExecutionStatus.SUCCESS)
def _handle_end_task(self, remote_addr: Tuple[str, int]):
"""处理结束当前任务指令"""
self.logger.info(f"Handling END_TASK from {remote_addr}")
# 更新设备状态
self.device_status.is_boot = 0
# 这里可以添加关机逻辑
self._send_response(remote_addr, ExecutionStatus.SUCCESS)
def _handle_end_all_tasks(self, remote_addr: Tuple[str, int]):
"""处理结束所有任务指令"""
self.logger.info(f"Handling END_ALL_TASKS from {remote_addr}")
# 更新设备状态
self.device_status.is_boot = 0
# 这里可以添加停止所有任务的逻辑
self._send_response(remote_addr, ExecutionStatus.SUCCESS)
def _send_response(self, remote_addr: Tuple[str, int], status: int):
"""发送响应包"""
if self.response_sender:
self.response_sender.send_response(remote_addr, status, self.device_status)
def send_echo_data(self, remote_addr: Tuple[str, int], echo_data: bytes, sequence: int = 0):
"""发送回波数据"""
if self.data_sender:
self.data_sender.send_echo_data(remote_addr, echo_data, sequence)
def send_image_data(self, remote_addr: Tuple[str, int], image_data: bytes, imaging_params=None):
"""发送图像数据"""
if self.data_sender:
self.data_sender.send_image_data(remote_addr, image_data, imaging_params)
def get_status(self) -> dict:
"""获取系统状态"""
return {
'running': self.running,
'connected_clients': len(self.connected_clients),
'clients': list(self.connected_clients.keys()),
'device_status': {
'cpu_temp': self.device_status.cpu_temp,
'is_boot': self.device_status.is_boot,
'satellite_num': self.device_status.satellite_num,
}
}
def main():
"""主函数"""
# 解析命令行参数
parser = argparse.ArgumentParser(description='Radar Control System')
parser.add_argument('--ip', type=str, default='0.0.0.0', help='Local IP address to bind to (default: 0.0.0.0)')
args = parser.parse_args()
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 创建并启动系统
system = RadarControlSystem(local_ip=args.ip)
system.start()
try:
# 保持运行
while True:
time.sleep(5)
# 定期输出系统状态
status = system.get_status()
logging.info(f"System Status: {status}")
except KeyboardInterrupt:
logging.info("Received interrupt signal")
finally:
system.stop()
if __name__ == '__main__':
main()