683 lines
30 KiB
Python
683 lines
30 KiB
Python
import geopandas as gpd
|
|
import numpy as np
|
|
import pandas as pd
|
|
from ortools.constraint_solver import routing_enums_pb2
|
|
from ortools.constraint_solver import pywrapcp
|
|
from scipy.spatial.distance import pdist, squareform
|
|
from scipy.spatial import distance_matrix as dm
|
|
|
|
from shapely.geometry import Point, LineString, Polygon, MultiLineString, MultiPolygon, MultiPoint
|
|
from shapely.ops import transform
|
|
from pyproj import Transformer
|
|
|
|
from .beidou import LocationCode
|
|
from .coords import WGS84_TO_UTM, UTM_TO_WGS84
|
|
from .config import CONTAINER_HEIGHT_MAX, UTM_EPSG
|
|
from .process import SarProcess
|
|
from .target import Target
|
|
from .uav import Uav, PayloadType
|
|
|
|
import logging
|
|
logger = logging.getLogger("Planning")
|
|
|
|
class UavPlanning:
|
|
|
|
target_dict: dict[Target]
|
|
uav_list: list[Uav]
|
|
payload_type: PayloadType
|
|
gdf: gpd.GeoDataFrame
|
|
solution_list: list
|
|
route_list: list
|
|
distance_list: list
|
|
|
|
_start_node_list = None
|
|
_end_node_list = None
|
|
_distance_matrix = None
|
|
_node_cost = None
|
|
_coordinate_array = None
|
|
_uav_nums = None
|
|
_grid_nums = None
|
|
_endurance_list = None
|
|
_init_distance = None
|
|
|
|
def __init__(
|
|
self,
|
|
target_list: list[Target],
|
|
uav_list: list[Uav],
|
|
payload_type: str
|
|
):
|
|
"""
|
|
初始化任务规划
|
|
|
|
Args:
|
|
target_list(list[Region|SarRegion|LineTarget|PointTarget]): 目标列表
|
|
uav_list(list[Uav]): 无人机列表
|
|
payload_type(str): 载荷
|
|
"""
|
|
|
|
self.payload_type = payload_type
|
|
logger.info("Starting count nodes.")
|
|
self.target_dict = {}
|
|
target_gdf_list = []
|
|
for target in target_list:
|
|
target_gdf_list.append(target.gdf)
|
|
self.target_dict[target.id] = target
|
|
target_gdf = gpd.GeoDataFrame(pd.concat(target_gdf_list, ignore_index=True))
|
|
|
|
self._grid_nums = len(target_gdf)
|
|
self.uav_list = uav_list
|
|
self._uav_nums = len(uav_list)
|
|
self._endurance_list = []
|
|
self._start_node_list = []
|
|
self._end_node_list = []
|
|
uav_data = {
|
|
"centroid": [],
|
|
"code": [],
|
|
"level": 10,
|
|
"id": [],
|
|
"type": "uav"
|
|
}
|
|
for i, uav in enumerate(uav_list):
|
|
self._start_node_list.append(self._grid_nums + 2 * i)
|
|
self._end_node_list.append(self._grid_nums + 2 * i + 1)
|
|
uav_data['centroid'].append(uav.start_coord)
|
|
uav_data['centroid'].append(uav.end_coord)
|
|
uav_data['code'].append(uav.start_code)
|
|
uav_data['code'].append(uav.end_code)
|
|
uav_data['id'].append(uav.id)
|
|
uav_data['id'].append(uav.id)
|
|
self._endurance_list.append(uav.endurance)
|
|
uav_gdf = gpd.GeoDataFrame(uav_data, geometry=uav_data['centroid'], crs="EPSG:4326")
|
|
gdf = gpd.GeoDataFrame(pd.concat([target_gdf, uav_gdf], ignore_index=True)) # 合并目标与无人机gdf
|
|
gdf = gdf.to_crs(epsg=UTM_EPSG)
|
|
gdf['centroid'] = gdf['centroid'].apply(lambda x: transform(WGS84_TO_UTM, x))
|
|
self._node_cost = gdf.length.astype(int).tolist()
|
|
self.gdf = gdf
|
|
logger.info("Counting nodes successful.")
|
|
|
|
logger.info("Starting Calculate distance matrix.")
|
|
# centroid_gdf = gpd.GeoDataFrame(geometry=self.gdf["centroid"], crs="EPSG:4326") # 提取中心点
|
|
# projected_gdf = centroid_gdf.to_crs(CRS.from_epsg(6933)) # 将坐标转为Equal Earth坐标系
|
|
coordinate_array = np.array(gdf["centroid"].apply(lambda x:[x.x, x.y]).to_list()) # 将坐标提取至np.array
|
|
self._distance_matrix = squareform(pdist(coordinate_array, metric='euclidean')).astype(int) # 计算距离矩阵
|
|
logger.info("Calculating distance matrix successful.")
|
|
|
|
def solve(self) -> list[list[int]]:
|
|
"""
|
|
使用贪心算法计算任务规划初始解
|
|
|
|
Returns:
|
|
list[list[int]]: 无人机节点规划结果
|
|
- list[i]: 第i架无人机规划结果
|
|
- list[i][j]: 第i架无人机经过的第j个节点
|
|
"""
|
|
|
|
logger.info("Starting solve initial solution.")
|
|
# 初始化参数
|
|
unvisited = set(range(self._grid_nums))
|
|
uav_set = set(range(self._uav_nums))
|
|
routes = [[self._start_node_list[i]] for i in range(self._uav_nums)]
|
|
uav_distance_array = np.zeros(self._uav_nums, dtype=int)
|
|
cur_node_array = np.array(self._start_node_list, dtype=int)
|
|
back_distance_array = np.zeros(self._uav_nums, dtype=int)
|
|
|
|
while unvisited and uav_set:
|
|
|
|
# 根据距离选取规划无人机
|
|
uav_index = min(uav_set, key = lambda x: uav_distance_array[x] + back_distance_array[x])
|
|
|
|
# 获取无人机属性
|
|
cur_node = cur_node_array[uav_index]
|
|
cur_distance = uav_distance_array[uav_index]
|
|
end_node = self._end_node_list[uav_index]
|
|
endurance = self._endurance_list[uav_index]
|
|
|
|
# 计算代价最小节点
|
|
next_node = min(
|
|
unvisited,
|
|
key = lambda x: self._distance_matrix[cur_node][x]
|
|
+ 0.1 * self._distance_matrix[x][end_node]
|
|
)
|
|
next_distance = cur_distance + self._distance_matrix[cur_node][next_node] + self._node_cost[next_node]
|
|
|
|
# 判断约束
|
|
if next_distance + self._distance_matrix[next_node][end_node] > endurance:
|
|
uav_set.remove(uav_index)
|
|
routes[uav_index].append(end_node)
|
|
|
|
else:
|
|
routes[uav_index].append(next_node)
|
|
uav_distance_array[uav_index] = next_distance
|
|
back_distance_array[uav_index] = self._distance_matrix[next_node][end_node]
|
|
cur_node_array[uav_index] = next_node
|
|
unvisited.remove(next_node)
|
|
|
|
for uav_index in uav_set:
|
|
routes[uav_index].append(self._end_node_list[uav_index])
|
|
|
|
self.solution_list = routes
|
|
self._init_distance = self.solution_distance()
|
|
logger.info("Solving initial solution successful.")
|
|
|
|
def optimize(self, time: int = 5):
|
|
"""
|
|
无人机规划优化, 需先调用init_solve
|
|
|
|
Args:
|
|
time(SupportsFloat): 最长优化时间
|
|
"""
|
|
|
|
logger.info("Starting optimize solution.")
|
|
# Create the routing index manager.
|
|
manager = pywrapcp.RoutingIndexManager(
|
|
len(self._distance_matrix),
|
|
self._uav_nums,
|
|
self._start_node_list,
|
|
self._end_node_list
|
|
)
|
|
|
|
# Create Routing Model.
|
|
routing = pywrapcp.RoutingModel(manager)
|
|
|
|
def distance_callback(from_index, to_index):
|
|
"""Returns the distance between the two nodes."""
|
|
# Convert from routing variable Index to distance matrix NodeIndex.
|
|
from_node = manager.IndexToNode(from_index)
|
|
to_node = manager.IndexToNode(to_index)
|
|
return self._distance_matrix[from_node][to_node] + self._node_cost[to_node]
|
|
|
|
# Create and register a transit callback.
|
|
transit_callback_index = routing.RegisterTransitCallback(distance_callback)
|
|
|
|
# Define cost of each arc.
|
|
routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index)
|
|
|
|
init_endurance_list = []
|
|
for distance, endurance in zip(self._init_distance, self._endurance_list):
|
|
init_endurance = int(distance * 1.1) if distance * 1.1 < endurance else endurance
|
|
init_endurance_list.append(init_endurance)
|
|
|
|
# Add Distance constraint.
|
|
routing.AddDimensionWithVehicleCapacity(
|
|
transit_callback_index,
|
|
0, # no slack
|
|
# self._init_distance, # vehicle maximum travel distance
|
|
init_endurance_list,
|
|
True, # start cumul to zero
|
|
"Distance",
|
|
)
|
|
|
|
penalty = 4000000
|
|
for node in range(self._grid_nums):
|
|
routing.AddDisjunction([manager.NodeToIndex(node)], penalty)
|
|
|
|
distance_dimension = routing.GetDimensionOrDie("Distance")
|
|
distance_dimension.SetGlobalSpanCostCoefficient(100)
|
|
|
|
# 设置每辆车的最大距离软约束及惩罚系数
|
|
penalty_coefficient = 5 # 每超出单位距离的惩罚
|
|
|
|
for vehicle_id in range(self._uav_nums):
|
|
end_node = routing.End(vehicle_id)
|
|
distance_dimension.SetCumulVarSoftUpperBound(
|
|
end_node, int(init_endurance_list[vehicle_id] * 0.75), penalty_coefficient)
|
|
|
|
# 定义搜索策略
|
|
search_parameters = pywrapcp.DefaultRoutingSearchParameters()
|
|
# search_parameters.first_solution_strategy = (routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC)
|
|
# search_parameters.local_search_metaheuristic = (routing_enums_pb2.LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH)
|
|
search_parameters.local_search_metaheuristic = (routing_enums_pb2.LocalSearchMetaheuristic.AUTOMATIC)
|
|
search_parameters.time_limit.seconds = time # 运行时间限制
|
|
# search_parameters.log_search = True
|
|
|
|
# Solve the problem.
|
|
routing.CloseModelWithParameters(search_parameters)
|
|
|
|
|
|
# self._solution = routing.SolveWithParameters(search_parameters)
|
|
|
|
initial_solution = routing.ReadAssignmentFromRoutes(self.solution_list, True)
|
|
self._solution = routing.SolveFromAssignmentWithParameters(initial_solution, search_parameters)
|
|
logger.info("Optimization solution successful.")
|
|
|
|
# Print solution on console.
|
|
optimal_solution=[]
|
|
for uav_id in range(self._uav_nums):
|
|
node_list=[]
|
|
index = routing.Start(uav_id)
|
|
while not routing.IsEnd(index):
|
|
node_list.append(manager.IndexToNode(index))
|
|
index = self._solution.Value(routing.NextVar(index))
|
|
node_list.append(manager.IndexToNode(index))
|
|
optimal_solution.append(node_list)
|
|
|
|
self.solution_list = optimal_solution
|
|
|
|
def solve_route(self):
|
|
if self.payload_type == PayloadType.optics_down:
|
|
print(PayloadType.optics_down)
|
|
return self._solve_optics_route()
|
|
|
|
elif self.payload_type == PayloadType.sar:
|
|
return self._solve_sar_route()
|
|
|
|
elif self.payload_type == PayloadType.laser:
|
|
return self._solve_laser_route()
|
|
|
|
elif self.payload_type == PayloadType.optics_front:
|
|
return self._solve_optics_front_route()
|
|
|
|
else:
|
|
raise TypeError(f"{self.payload_type} is not a valid payload type.")
|
|
|
|
def _solve_optics_route(self):
|
|
solution_list = self.solution_list
|
|
result_dict = {}
|
|
distance_list = []
|
|
for uav_index, solution in enumerate(solution_list):
|
|
route_list = []
|
|
payload_list = []
|
|
uav = self.uav_list[uav_index]
|
|
valid_length = uav.valid_length
|
|
valid_width = uav.valid_width
|
|
planned_map = {}
|
|
|
|
# TODO 多进程处理
|
|
for i, target_index in enumerate(solution):
|
|
target = self.gdf.iloc[target_index]
|
|
|
|
if target["type"] == "uav":
|
|
route_list.append(target["geometry"])
|
|
payload_list.append(0)
|
|
|
|
else:
|
|
prev_point = route_list[-1]
|
|
next_point = self.gdf.iloc[solution[i + 1]]["centroid"]
|
|
# 处理点目标
|
|
if target["type"] == "point":
|
|
point = target["geometry"]
|
|
route_list.append(point)
|
|
payload_list.append(1)
|
|
|
|
# TODO
|
|
# 处理线目标
|
|
elif target["type"] == "line":
|
|
pass
|
|
|
|
# 处理区域目标
|
|
elif target["type"] == "region":
|
|
level = target["level"]
|
|
delta_lon = LocationCode._DELTA_LON[level - 1]
|
|
delta_lat = LocationCode._DELTA_LAT[level - 1]
|
|
centroid_wgs84 = transform(UTM_TO_WGS84, target["centroid"])
|
|
ref = np.array(
|
|
transform(
|
|
WGS84_TO_UTM,
|
|
Point(np.array(centroid_wgs84.coords[0]) - np.array(delta_lon, delta_lat) / 2)
|
|
).coords[0]
|
|
)
|
|
|
|
# 按照有效幅宽, 对网格进行离散化
|
|
if level in planned_map:
|
|
points, distance_matrix = planned_map[level]
|
|
|
|
else:
|
|
delta_x, delta_y = np.array(
|
|
transform(
|
|
WGS84_TO_UTM,
|
|
Point(np.array(centroid_wgs84.coords[0]) + np.array(delta_lon, delta_lat) / 2)
|
|
).coords[0]
|
|
)- ref
|
|
x_point_num = delta_x // valid_length + 1
|
|
x_remainder = delta_x / x_point_num / 2
|
|
y_point_num = delta_y // valid_width
|
|
y_remainder = delta_y / y_point_num / 2
|
|
x_coords = np.linspace(x_remainder, delta_x - x_remainder, int(x_point_num))
|
|
y_coords = np.linspace(y_remainder, delta_y - y_remainder, int(y_point_num))
|
|
x_points, y_points = np.meshgrid(x_coords, y_coords)
|
|
points = np.column_stack([x_points.ravel(), y_points.ravel()])
|
|
distance_matrix = squareform(pdist(points, metric='euclidean'))
|
|
planned_map[level] = (points, distance_matrix)
|
|
|
|
if len(points) == 1:
|
|
route_list.append(target["centroid"])
|
|
|
|
else:
|
|
points_ref = points + ref
|
|
points_new = np.stack((prev_point.coords[0], next_point.coords[0]), axis=0)
|
|
|
|
# 计算包含开始结束点的距离矩阵
|
|
distance_new_to_old = dm(points_new, points_ref)
|
|
distance_new_to_new = dm(points_new, points_new)
|
|
top = np.hstack([distance_matrix, distance_new_to_old.T])
|
|
bottom = np.hstack([distance_new_to_old, distance_new_to_new])
|
|
distance_matrix_new = np.vstack([top, bottom]).astype(int)
|
|
|
|
# 计算局部最优路径
|
|
node_list = UavPlanning._solve_one_uav_optimal(distance_matrix_new)
|
|
for node in node_list[1: -1]:
|
|
route_list.append(Point(points_ref[node]))
|
|
payload_list.append(1)
|
|
|
|
route_utm = LineString(route_list)
|
|
route_wgs84 = transform(UTM_TO_WGS84, route_utm)
|
|
|
|
result = []
|
|
for route, payload in zip(route_wgs84.coords, payload_list):
|
|
node = {
|
|
"coord": route,
|
|
"payload": payload
|
|
}
|
|
result.append(node)
|
|
|
|
result_dict[uav.id] = result
|
|
distance_list.append(route_utm.length)
|
|
return result_dict, distance_list
|
|
|
|
def _solve_sar_route(self):
|
|
solution_list = self.solution_list
|
|
result_dict = {}
|
|
distance_list = []
|
|
for uav_index, solution in enumerate(solution_list):
|
|
result_list = []
|
|
route = []
|
|
uav = self.uav_list[uav_index]
|
|
process = SarProcess(uav)
|
|
|
|
# TODO 多进程处理
|
|
for i, target_index in enumerate(solution):
|
|
target = self.gdf.iloc[target_index]
|
|
|
|
# 处理无人机
|
|
if target["type"] == "uav":
|
|
route.append(target["geometry"])
|
|
|
|
# 处理目标
|
|
else:
|
|
prev_point = route[-1]
|
|
next_point = self.gdf.iloc[solution[i + 1]]["centroid"]
|
|
shape = target["geometry"]
|
|
# 处理点目标
|
|
if target["type"] == "point":
|
|
result = process.process_point(shape, prev_point, next_point)
|
|
result_list.append(result)
|
|
|
|
# 处理线目标
|
|
elif target["type"] == "line":
|
|
result = process.process_line(shape, prev_point, next_point)
|
|
result_list.extend(result)
|
|
|
|
# 处理区域目标
|
|
elif target["type"] == "region":
|
|
angle = self.target_dict[target["id"]].angle
|
|
result = process.process_region(shape, prev_point, next_point, angle)
|
|
result_list.extend(result)
|
|
|
|
elif target["type"] == "around":
|
|
result = process.process_around(shape, prev_point)
|
|
result_list.extend(result)
|
|
|
|
route.extend(process.results_to_route(result))
|
|
|
|
result_dict[uav.id] = process.process_result(result_list)
|
|
distance_list.append(LineString(route).length)
|
|
|
|
return result_dict, distance_list
|
|
|
|
def _solve_laser_route(self):
|
|
solution_list = self.solution_list
|
|
result_dict = {}
|
|
distance_list = []
|
|
for uav_index, solution in enumerate(solution_list):
|
|
route_list = []
|
|
payload_list = []
|
|
target_id_list = []
|
|
uav = self.uav_list[uav_index]
|
|
|
|
# TODO 多进程处理
|
|
for i, target_index in enumerate(solution):
|
|
target = self.gdf.iloc[target_index]
|
|
|
|
# 处理无人机
|
|
if target["type"] == "uav":
|
|
route_list.append(target["geometry"])
|
|
target_id_list.append(None)
|
|
payload_list.append(0)
|
|
|
|
# 处理目标
|
|
else:
|
|
prev_point = route_list[-1]
|
|
next_point = self.gdf.iloc[solution[i + 1]]["centroid"]
|
|
# 处理点目标
|
|
if target["type"] == "point":
|
|
point = target["geometry"]
|
|
route_list.append(point)
|
|
payload_list.append(1)
|
|
|
|
# 处理线目标 TODO
|
|
elif target["type"] == "line":
|
|
pass
|
|
|
|
# 处理区域目标
|
|
elif target["type"] == "region":
|
|
id = target["id"]
|
|
lines = self.target_dict[id].lines
|
|
centroid_utm_array = np.array([line.centroid.coords[0] for line in lines.geoms])
|
|
target_array = np.concat([centroid_utm_array, np.array(prev_point.coords), np.array(next_point.coords)])
|
|
distance_matrix = squareform(pdist(target_array * 1000, metric='euclidean')).astype(int)
|
|
node_list = UavPlanning._solve_one_uav_optimal(distance_matrix)
|
|
start_point = target_array[node_list[0]]
|
|
for i in range(1, len(node_list) - 1):
|
|
end_point = target_array[node_list[i + 1]]
|
|
line = lines.geoms[node_list[i]]
|
|
first_point = np.array(line.coords[0])
|
|
last_point = np.array(line.coords[-1])
|
|
distance1 = np.linalg.norm(start_point - first_point) + np.linalg.norm(end_point - last_point)
|
|
distance2 = np.linalg.norm(end_point - first_point) + np.linalg.norm(start_point - last_point)
|
|
if distance1 < distance2:
|
|
route_list.append(Point(first_point))
|
|
route_list.append(Point(last_point))
|
|
start_point = last_point
|
|
else:
|
|
route_list.append(Point(last_point))
|
|
route_list.append(Point(first_point))
|
|
start_point = first_point
|
|
payload_list.append(1)
|
|
payload_list.append(0)
|
|
target_id_list.append(target["id"])
|
|
target_id_list.append(target["id"])
|
|
route_utm = LineString(route_list)
|
|
route_wgs84 = transform(UTM_TO_WGS84, route_utm)
|
|
|
|
result = []
|
|
for route, target_id, payload in zip(route_wgs84.coords, target_id_list, payload_list):
|
|
node = {
|
|
"coord": route,
|
|
"target_id": target_id,
|
|
"payload": payload
|
|
}
|
|
result.append(node)
|
|
|
|
result_dict[uav.id] = result
|
|
distance_list.append(route_utm.length)
|
|
|
|
|
|
|
|
return result_dict, distance_list
|
|
|
|
def _solve_optics_front_route(self):
|
|
solution_list = self.solution_list
|
|
result_dict = {}
|
|
distance_list = []
|
|
|
|
# 定义三维坐标转换
|
|
def UTM_TO_WGS84_3D(x, y, z = None):
|
|
return UTM_TO_WGS84(x, y, z)
|
|
|
|
for uav_index, solution in enumerate(solution_list):
|
|
route_list = []
|
|
payload_list = []
|
|
look_at_list = []
|
|
uav = self.uav_list[uav_index]
|
|
|
|
# 计算拍摄高度
|
|
width = uav.width
|
|
photo_nums = np.ceil(CONTAINER_HEIGHT_MAX / width)
|
|
height_array = np.arange(width / 2, photo_nums * width, width)
|
|
|
|
# TODO 多进程处理
|
|
for i, target_index in enumerate(solution):
|
|
target = self.gdf.iloc[target_index]
|
|
|
|
# 处理无人机
|
|
if target["type"] == "uav":
|
|
if len(route_list) == 0:
|
|
route_list.append(Point(target["geometry"].x, target["geometry"].y, uav.start_height))
|
|
else:
|
|
route_list.append(Point(target["geometry"].x, target["geometry"].y, uav.end_height))
|
|
payload_list.append(0)
|
|
look_at_list.append([0, 0, 0])
|
|
|
|
# 处理目标
|
|
else:
|
|
prev_point = route_list[-1]
|
|
next_point = self.gdf.iloc[solution[i + 1]]["centroid"]
|
|
next_point = Point(next_point.x, next_point.y, 0)
|
|
# 处理点目标
|
|
if target["type"] == "point":
|
|
pass
|
|
|
|
# 处理线目标 TODO
|
|
elif target["type"] == "line":
|
|
pass
|
|
|
|
# 处理区域目标
|
|
elif target["type"] == "region":
|
|
id = target["id"]
|
|
utm_point_array = np.array([point.coords[0] for point in self.target_dict[id].utm_points.geoms])
|
|
utm_look_at_array = np.array([point.coords[0] for point in self.target_dict[id].utm_look_at_points.geoms])
|
|
point_3d_list = []
|
|
look_at_3d_list = []
|
|
for height in height_array:
|
|
point_3d = np.hstack((utm_point_array, np.zeros((utm_point_array.shape[0], 1)) + height + self.target_dict[id].base_height))
|
|
look_at_3d = np.hstack((utm_look_at_array, np.zeros((utm_look_at_array.shape[0], 1)) + height + self.target_dict[id].base_height))
|
|
point_3d_list.append(point_3d)
|
|
look_at_3d_list.append(look_at_3d)
|
|
point_3d_list.append(np.array(prev_point.coords))
|
|
point_3d_list.append(np.array(next_point.coords))
|
|
point_3d_array = np.concat(point_3d_list)
|
|
look_at_3d_array = np.concat(look_at_3d_list)
|
|
distance_matrix = squareform(pdist(point_3d_array * 1000, metric='euclidean')).astype(int)
|
|
node_list = UavPlanning._solve_one_uav_optimal(distance_matrix)
|
|
for node in node_list[1: -1]:
|
|
route_list.append(Point(point_3d_array[node]))
|
|
look_at_list.append(Point(look_at_3d_array[node]))
|
|
payload_list.append(1)
|
|
|
|
route_utm = LineString(route_list)
|
|
route_wgs84 = transform(UTM_TO_WGS84_3D, route_utm)
|
|
look_at_wgs84 = transform(UTM_TO_WGS84_3D, LineString(look_at_list))
|
|
result = []
|
|
for route, look_at, payload in zip(route_wgs84.coords,look_at_wgs84.coords, payload_list):
|
|
node = {
|
|
"coord": route,
|
|
"payload": {
|
|
"status": payload,
|
|
"look_at": look_at
|
|
}
|
|
}
|
|
result.append(node)
|
|
|
|
result_dict[uav.id] = result
|
|
distance_list.append(route_utm.length)
|
|
|
|
return result_dict, distance_list
|
|
|
|
def solution_distance(self) -> list[float]:
|
|
distance_list = []
|
|
for solution in self.solution_list:
|
|
distance = 0
|
|
for cur_node, next_node in zip(solution[: -1], solution[1:]):
|
|
distance += self._distance_matrix[cur_node][next_node] + self._node_cost[next_node]
|
|
distance_list.append(int(distance))
|
|
return distance_list
|
|
|
|
@staticmethod
|
|
def _process_optics_point(point:Point, prev_point:Point, uav:Uav):
|
|
length = uav.valid_length
|
|
width = uav.valid_width
|
|
|
|
point_array = np.array(point.coords)[0]
|
|
vec_x = point_array - np.array(prev_point.coords)[0]
|
|
vec_x_norm = vec_x / np.linalg.norm(vec_x)
|
|
vec_y_norm = np.array([-vec_x_norm[1], vec_x_norm[0]])
|
|
|
|
vec_length = vec_x_norm * length / 2
|
|
vec_width = vec_y_norm * width / 2
|
|
|
|
photo = Polygon([
|
|
point_array + vec_length + vec_width,
|
|
point_array + vec_length - vec_width,
|
|
point_array - vec_length - vec_width,
|
|
point_array - vec_length + vec_width
|
|
])
|
|
|
|
return point, photo, 1
|
|
|
|
@staticmethod
|
|
def _solve_one_uav_optimal(distance_matrix: np.ndarray[int]) -> list[int]:
|
|
"""
|
|
求解单无人机区域内规划, 无人机开始结束点为最后两位
|
|
|
|
Args:
|
|
distance_matrix(np.ndarray[int]): 目标点距离矩阵
|
|
|
|
Returns:
|
|
|
|
"""
|
|
manager = pywrapcp.RoutingIndexManager(
|
|
len(distance_matrix),
|
|
1,
|
|
[len(distance_matrix) - 2],
|
|
[len(distance_matrix) - 1]
|
|
)
|
|
|
|
# 创建路线
|
|
routing = pywrapcp.RoutingModel(manager)
|
|
|
|
def distance_callback(from_index, to_index):
|
|
"""距离回调"""
|
|
# Convert from routing variable Index to distance matrix NodeIndex.
|
|
from_node = manager.IndexToNode(from_index)
|
|
to_node = manager.IndexToNode(to_index)
|
|
return distance_matrix[from_node][to_node]
|
|
|
|
# 添加函数回调
|
|
transit_callback_index = routing.RegisterTransitCallback(distance_callback)
|
|
routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index) #
|
|
|
|
# 设置求解器
|
|
search_parameters = pywrapcp.DefaultRoutingSearchParameters()
|
|
search_parameters.first_solution_strategy = (
|
|
routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC
|
|
)
|
|
search_parameters.time_limit.seconds = 10
|
|
|
|
# Solve the problem.
|
|
routing.CloseModelWithParameters(search_parameters)
|
|
# 求解
|
|
solution = routing.SolveWithParameters(search_parameters)
|
|
|
|
node_list = []
|
|
index = routing.Start(0)
|
|
while True:
|
|
node_list.append(manager.IndexToNode(index))
|
|
if routing.IsEnd(index):
|
|
break
|
|
index = solution.Value(routing.NextVar(index))
|
|
|
|
|
|
# 记录数据
|
|
return node_list
|
|
|