119 lines
4.7 KiB
Python
119 lines
4.7 KiB
Python
|
|
import traceback
|
||
|
|
from flask import Flask,jsonify, request
|
||
|
|
from shapely.geometry import Polygon, LineString, Point
|
||
|
|
|
||
|
|
from planning.uav import Uav, PayloadType
|
||
|
|
from planning import UavPlanning
|
||
|
|
from planning.target import (PointTarget, LineTarget, OpticsRegionTarget, OpticsFrontRegionTarget, SarRegionTarget, LaserRegionTarget, TargetType)
|
||
|
|
from planning.app import sar_planning
|
||
|
|
|
||
|
|
# Flask application object; the @app.route decorators in this module register
# their handlers on it, and the __main__ guard starts it.
app = Flask(__name__)
|
||
|
|
|
||
|
|
def _build_uavs(uav_params):
    """Create one ``Uav`` per request entry and group them by payload type.

    Args:
        uav_params: Iterable of mappings. Each entry is read for the keys
            ``"id"``, ``"start_coord"``, ``"end_coord"``, ``"payload"``
            (itself a mapping with a ``"type"`` key) and ``"constraint"``.

    Returns:
        dict mapping ``payload["type"]`` to the list of ``Uav`` objects that
        carry that payload, in request order.

    Raises:
        KeyError: If an entry lacks one of the keys listed above.
    """
    uavs_by_payload = {}
    for uav_param in uav_params:
        uav = Uav(
            id=uav_param["id"],
            start_coord=Point(uav_param["start_coord"]),
            end_coord=Point(uav_param["end_coord"]),
            payload=uav_param["payload"],
            constraint=uav_param["constraint"],
        )
        uavs_by_payload.setdefault(uav_param["payload"]["type"], []).append(uav)
    return uavs_by_payload


def _build_targets(target_params):
    """Create target objects from the request and group them by payload type.

    The target class is chosen from the entry's ``"type"``; for region
    targets it additionally depends on the entry's ``"payload"``.

    Args:
        target_params: Iterable of mappings. Each entry is read for the keys
            ``"type"``, ``"payload"``, ``"id"`` and ``"coords"``; region
            targets with the ``optics_front`` payload also need ``"height"``.

    Returns:
        A ``(targets_by_payload, laser_summaries)`` tuple.
        ``targets_by_payload`` maps the entry's ``"payload"`` value to the
        list of targets built for it. ``laser_summaries`` holds one
        ``{"id", "angle", "slots"}`` dict per laser region target; no other
        target kind contributes to it.

    Raises:
        TypeError: If the target type, or the payload type of a region
            target, is not one of the supported values.
        KeyError: If an entry lacks a required key.
    """
    targets_by_payload = {}
    laser_summaries = []
    for target_param in target_params:
        target_type = target_param["type"]
        payload_type = target_param["payload"]
        if target_type == TargetType.point:
            target = PointTarget(
                id=target_param["id"],
                shape=Point(target_param["coords"]),
            )
        elif target_type == TargetType.line:
            target = LineTarget(
                id=target_param["id"],
                shape=LineString(target_param["coords"]),
            )
        elif target_type == TargetType.region:
            if payload_type == PayloadType.optics_down:
                target = OpticsRegionTarget(
                    id=target_param["id"],
                    shape=Polygon(target_param["coords"]),
                )
            elif payload_type == PayloadType.sar:
                target = SarRegionTarget(
                    id=target_param["id"],
                    shape=Polygon(target_param["coords"]),
                )
            elif payload_type == PayloadType.laser:
                target = LaserRegionTarget(
                    id=target_param["id"],
                    shape=Polygon(target_param["coords"]),
                )
                # Only laser region targets are reported back to the client.
                # NOTE(review): (90 - angle) % 360 looks like a conversion
                # between angle conventions -- confirm against the target
                # class before relying on it.
                laser_summaries.append({
                    "id": target.id,
                    "angle": (90 - target.angle) % 360,
                    "slots": str(target.slots),
                })
            elif payload_type == PayloadType.optics_front:
                target = OpticsFrontRegionTarget(
                    id=target_param["id"],
                    shape=Polygon(target_param["coords"]),
                    height=target_param["height"],
                )
            else:
                raise TypeError(f"{payload_type} is not a valid payload type.")
        else:
            raise TypeError(f"{target_type} is not a valid target type.")
        targets_by_payload.setdefault(payload_type, []).append(target)
    return targets_by_payload, laser_summaries


def _plan_routes(targets_by_payload, uavs_by_payload, optimize):
    """Run one ``UavPlanning`` per payload type and merge the UAV routes.

    Args:
        targets_by_payload: dict of payload type -> list of targets.
        uavs_by_payload: dict of payload type -> list of UAVs.
        optimize: When truthy, ``planning.optimize()`` is called between
            ``solve()`` and ``solve_route()``.

    Returns:
        dict keyed by the ids returned from ``solve_route()``, each value
        being that route with its first and last element removed.

    Raises:
        ValueError: If some payload type has targets but no UAV carries it.
    """
    routes = {}
    for payload_type, target_list in targets_by_payload.items():
        uav_list = uavs_by_payload.get(payload_type)
        if uav_list is None:
            # Previously this surfaced as a bare KeyError carrying only the
            # payload type, which told the API client nothing actionable.
            raise ValueError(
                f"No UAV carries payload type {payload_type!r}, "
                f"required by {len(target_list)} target(s)."
            )
        planning = UavPlanning(target_list, uav_list, payload_type)
        planning.solve()
        if optimize:
            planning.optimize()
        result, _ = planning.solve_route()
        # Drop the first and last element of every route (presumably the
        # UAV's own start/end points -- TODO confirm with solve_route()).
        routes.update({uav_id: result[uav_id][1:-1] for uav_id in result})
    return routes


@app.route('/ktkx/UavPlanning', methods=['POST'])
def cal_uav_planning():
    """Handle POST /ktkx/UavPlanning: plan routes for the posted UAVs/targets.

    The JSON body is read for the keys ``"uavs"``, ``"targets"`` and
    ``"optimize"``.

    Returns:
        ``(response, 200)`` with ``message``, ``data`` (routes per UAV) and
        ``target`` (laser region target summaries) on success, or
        ``(response, 500)`` with the exception type and text in ``message``
        and an empty ``data`` when anything fails.
    """
    try:
        inputs = request.get_json()

        # Create the UAVs.
        uav_dict = _build_uavs(inputs['uavs'])

        # Create the targets.
        target_dict, target_result_list = _build_targets(inputs['targets'])

        # Mission planning.
        result_dict = _plan_routes(target_dict, uav_dict, inputs["optimize"])

        return jsonify({'message':'success','data':result_dict, 'target': target_result_list}), 200

    except Exception as e:
        # Top-level boundary of the endpoint: log the traceback and report
        # the failure to the client instead of letting Flask render it.
        traceback.print_exc()
        return jsonify({'message':f"{type(e).__name__}:{str(e)}",'data':dict()}), 500
|
||
|
|
|
||
|
|
@app.route('/ktkx/UavPlanning/SAR', methods=['POST'])
def cal_sar_planning():
    """Handle POST /ktkx/UavPlanning/SAR: run SAR planning on the JSON body.

    The parsed JSON body is passed unchanged to the SAR planner imported
    from ``planning.app``.

    Returns:
        ``(response, 200)`` with ``message``, ``data`` (the planner's
        result) and ``code`` on success, or ``(response, 500)`` with the
        exception type and text in ``message`` and an empty ``data`` when
        anything fails.
    """
    try:
        inputs = request.get_json()

        # The original call used the undefined name
        # `sar_planningsar_planning`, so every request failed with NameError.
        # NOTE(review): the file does not show whether the imported
        # `sar_planning` is the planner function itself or a module that
        # exposes a function of the same name; resolve either form here.
        planner = getattr(sar_planning, "sar_planning", sar_planning)
        result = planner(inputs)

        return jsonify({'message':'success','data':result, 'code': 200}), 200

    except Exception as e:
        # Top-level boundary of the endpoint: log the traceback and report
        # the failure to the client.
        traceback.print_exc()
        return jsonify({'message':f"{type(e).__name__}:{str(e)}",'data':dict(), 'code': 500}), 500
|
||
|
|
|
||
|
|
if __name__ == '__main__':
    # Script entry point: serve the app on all interfaces, port 18090.
    app.run(port=18090, host='0.0.0.0')
|