skyeyesystem/backend/Skyeye-sys-dev/skyeye-service-py/klkx/planning/planning.py

683 lines
30 KiB
Python
Raw Permalink Normal View History

import geopandas as gpd
import numpy as np
import pandas as pd
from ortools.constraint_solver import routing_enums_pb2
from ortools.constraint_solver import pywrapcp
from scipy.spatial.distance import pdist, squareform
from scipy.spatial import distance_matrix as dm
from shapely.geometry import Point, LineString, Polygon, MultiLineString, MultiPolygon, MultiPoint
from shapely.ops import transform
from pyproj import Transformer
from .beidou import LocationCode
from .coords import WGS84_TO_UTM, UTM_TO_WGS84
from .config import CONTAINER_HEIGHT_MAX, UTM_EPSG
from .process import SarProcess
from .target import Target
from .uav import Uav, PayloadType
import logging
logger = logging.getLogger("Planning")
class UavPlanning:
target_dict: dict[Target]
uav_list: list[Uav]
payload_type: PayloadType
gdf: gpd.GeoDataFrame
solution_list: list
route_list: list
distance_list: list
_start_node_list = None
_end_node_list = None
_distance_matrix = None
_node_cost = None
_coordinate_array = None
_uav_nums = None
_grid_nums = None
_endurance_list = None
_init_distance = None
def __init__(
self,
target_list: list[Target],
uav_list: list[Uav],
payload_type: str
):
"""
初始化任务规划
Args:
target_list(list[Region|SarRegion|LineTarget|PointTarget]): 目标列表
uav_list(list[Uav]): 无人机列表
payload_type(str): 载荷
"""
self.payload_type = payload_type
logger.info("Starting count nodes.")
self.target_dict = {}
target_gdf_list = []
for target in target_list:
target_gdf_list.append(target.gdf)
self.target_dict[target.id] = target
target_gdf = gpd.GeoDataFrame(pd.concat(target_gdf_list, ignore_index=True))
self._grid_nums = len(target_gdf)
self.uav_list = uav_list
self._uav_nums = len(uav_list)
self._endurance_list = []
self._start_node_list = []
self._end_node_list = []
uav_data = {
"centroid": [],
"code": [],
"level": 10,
"id": [],
"type": "uav"
}
for i, uav in enumerate(uav_list):
self._start_node_list.append(self._grid_nums + 2 * i)
self._end_node_list.append(self._grid_nums + 2 * i + 1)
uav_data['centroid'].append(uav.start_coord)
uav_data['centroid'].append(uav.end_coord)
uav_data['code'].append(uav.start_code)
uav_data['code'].append(uav.end_code)
uav_data['id'].append(uav.id)
uav_data['id'].append(uav.id)
self._endurance_list.append(uav.endurance)
uav_gdf = gpd.GeoDataFrame(uav_data, geometry=uav_data['centroid'], crs="EPSG:4326")
gdf = gpd.GeoDataFrame(pd.concat([target_gdf, uav_gdf], ignore_index=True)) # 合并目标与无人机gdf
gdf = gdf.to_crs(epsg=UTM_EPSG)
gdf['centroid'] = gdf['centroid'].apply(lambda x: transform(WGS84_TO_UTM, x))
self._node_cost = gdf.length.astype(int).tolist()
self.gdf = gdf
logger.info("Counting nodes successful.")
logger.info("Starting Calculate distance matrix.")
# centroid_gdf = gpd.GeoDataFrame(geometry=self.gdf["centroid"], crs="EPSG:4326") # 提取中心点
# projected_gdf = centroid_gdf.to_crs(CRS.from_epsg(6933)) # 将坐标转为Equal Earth坐标系
coordinate_array = np.array(gdf["centroid"].apply(lambda x:[x.x, x.y]).to_list()) # 将坐标提取至np.array
self._distance_matrix = squareform(pdist(coordinate_array, metric='euclidean')).astype(int) # 计算距离矩阵
logger.info("Calculating distance matrix successful.")
def solve(self) -> list[list[int]]:
"""
使用贪心算法计算任务规划初始解
Returns:
list[list[int]]: 无人机节点规划结果
- list[i]: 第i架无人机规划结果
- list[i][j]: 第i架无人机经过的第j个节点
"""
logger.info("Starting solve initial solution.")
# 初始化参数
unvisited = set(range(self._grid_nums))
uav_set = set(range(self._uav_nums))
routes = [[self._start_node_list[i]] for i in range(self._uav_nums)]
uav_distance_array = np.zeros(self._uav_nums, dtype=int)
cur_node_array = np.array(self._start_node_list, dtype=int)
back_distance_array = np.zeros(self._uav_nums, dtype=int)
while unvisited and uav_set:
# 根据距离选取规划无人机
uav_index = min(uav_set, key = lambda x: uav_distance_array[x] + back_distance_array[x])
# 获取无人机属性
cur_node = cur_node_array[uav_index]
cur_distance = uav_distance_array[uav_index]
end_node = self._end_node_list[uav_index]
endurance = self._endurance_list[uav_index]
# 计算代价最小节点
next_node = min(
unvisited,
key = lambda x: self._distance_matrix[cur_node][x]
+ 0.1 * self._distance_matrix[x][end_node]
)
next_distance = cur_distance + self._distance_matrix[cur_node][next_node] + self._node_cost[next_node]
# 判断约束
if next_distance + self._distance_matrix[next_node][end_node] > endurance:
uav_set.remove(uav_index)
routes[uav_index].append(end_node)
else:
routes[uav_index].append(next_node)
uav_distance_array[uav_index] = next_distance
back_distance_array[uav_index] = self._distance_matrix[next_node][end_node]
cur_node_array[uav_index] = next_node
unvisited.remove(next_node)
for uav_index in uav_set:
routes[uav_index].append(self._end_node_list[uav_index])
self.solution_list = routes
self._init_distance = self.solution_distance()
logger.info("Solving initial solution successful.")
def optimize(self, time: int = 5):
"""
无人机规划优化, 需先调用init_solve
Args:
time(SupportsFloat): 最长优化时间
"""
logger.info("Starting optimize solution.")
# Create the routing index manager.
manager = pywrapcp.RoutingIndexManager(
len(self._distance_matrix),
self._uav_nums,
self._start_node_list,
self._end_node_list
)
# Create Routing Model.
routing = pywrapcp.RoutingModel(manager)
def distance_callback(from_index, to_index):
"""Returns the distance between the two nodes."""
# Convert from routing variable Index to distance matrix NodeIndex.
from_node = manager.IndexToNode(from_index)
to_node = manager.IndexToNode(to_index)
return self._distance_matrix[from_node][to_node] + self._node_cost[to_node]
# Create and register a transit callback.
transit_callback_index = routing.RegisterTransitCallback(distance_callback)
# Define cost of each arc.
routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index)
init_endurance_list = []
for distance, endurance in zip(self._init_distance, self._endurance_list):
init_endurance = int(distance * 1.1) if distance * 1.1 < endurance else endurance
init_endurance_list.append(init_endurance)
# Add Distance constraint.
routing.AddDimensionWithVehicleCapacity(
transit_callback_index,
0, # no slack
# self._init_distance, # vehicle maximum travel distance
init_endurance_list,
True, # start cumul to zero
"Distance",
)
penalty = 4000000
for node in range(self._grid_nums):
routing.AddDisjunction([manager.NodeToIndex(node)], penalty)
distance_dimension = routing.GetDimensionOrDie("Distance")
distance_dimension.SetGlobalSpanCostCoefficient(100)
# 设置每辆车的最大距离软约束及惩罚系数
penalty_coefficient = 5 # 每超出单位距离的惩罚
for vehicle_id in range(self._uav_nums):
end_node = routing.End(vehicle_id)
distance_dimension.SetCumulVarSoftUpperBound(
end_node, int(init_endurance_list[vehicle_id] * 0.75), penalty_coefficient)
# 定义搜索策略
search_parameters = pywrapcp.DefaultRoutingSearchParameters()
# search_parameters.first_solution_strategy = (routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC)
# search_parameters.local_search_metaheuristic = (routing_enums_pb2.LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH)
search_parameters.local_search_metaheuristic = (routing_enums_pb2.LocalSearchMetaheuristic.AUTOMATIC)
search_parameters.time_limit.seconds = time # 运行时间限制
# search_parameters.log_search = True
# Solve the problem.
routing.CloseModelWithParameters(search_parameters)
# self._solution = routing.SolveWithParameters(search_parameters)
initial_solution = routing.ReadAssignmentFromRoutes(self.solution_list, True)
self._solution = routing.SolveFromAssignmentWithParameters(initial_solution, search_parameters)
logger.info("Optimization solution successful.")
# Print solution on console.
optimal_solution=[]
for uav_id in range(self._uav_nums):
node_list=[]
index = routing.Start(uav_id)
while not routing.IsEnd(index):
node_list.append(manager.IndexToNode(index))
index = self._solution.Value(routing.NextVar(index))
node_list.append(manager.IndexToNode(index))
optimal_solution.append(node_list)
self.solution_list = optimal_solution
def solve_route(self):
if self.payload_type == PayloadType.optics_down:
print(PayloadType.optics_down)
return self._solve_optics_route()
elif self.payload_type == PayloadType.sar:
return self._solve_sar_route()
elif self.payload_type == PayloadType.laser:
return self._solve_laser_route()
elif self.payload_type == PayloadType.optics_front:
return self._solve_optics_front_route()
else:
raise TypeError(f"{self.payload_type} is not a valid payload type.")
def _solve_optics_route(self):
solution_list = self.solution_list
result_dict = {}
distance_list = []
for uav_index, solution in enumerate(solution_list):
route_list = []
payload_list = []
uav = self.uav_list[uav_index]
valid_length = uav.valid_length
valid_width = uav.valid_width
planned_map = {}
# TODO 多进程处理
for i, target_index in enumerate(solution):
target = self.gdf.iloc[target_index]
if target["type"] == "uav":
route_list.append(target["geometry"])
payload_list.append(0)
else:
prev_point = route_list[-1]
next_point = self.gdf.iloc[solution[i + 1]]["centroid"]
# 处理点目标
if target["type"] == "point":
point = target["geometry"]
route_list.append(point)
payload_list.append(1)
# TODO
# 处理线目标
elif target["type"] == "line":
pass
# 处理区域目标
elif target["type"] == "region":
level = target["level"]
delta_lon = LocationCode._DELTA_LON[level - 1]
delta_lat = LocationCode._DELTA_LAT[level - 1]
centroid_wgs84 = transform(UTM_TO_WGS84, target["centroid"])
ref = np.array(
transform(
WGS84_TO_UTM,
Point(np.array(centroid_wgs84.coords[0]) - np.array(delta_lon, delta_lat) / 2)
).coords[0]
)
# 按照有效幅宽, 对网格进行离散化
if level in planned_map:
points, distance_matrix = planned_map[level]
else:
delta_x, delta_y = np.array(
transform(
WGS84_TO_UTM,
Point(np.array(centroid_wgs84.coords[0]) + np.array(delta_lon, delta_lat) / 2)
).coords[0]
)- ref
x_point_num = delta_x // valid_length + 1
x_remainder = delta_x / x_point_num / 2
y_point_num = delta_y // valid_width
y_remainder = delta_y / y_point_num / 2
x_coords = np.linspace(x_remainder, delta_x - x_remainder, int(x_point_num))
y_coords = np.linspace(y_remainder, delta_y - y_remainder, int(y_point_num))
x_points, y_points = np.meshgrid(x_coords, y_coords)
points = np.column_stack([x_points.ravel(), y_points.ravel()])
distance_matrix = squareform(pdist(points, metric='euclidean'))
planned_map[level] = (points, distance_matrix)
if len(points) == 1:
route_list.append(target["centroid"])
else:
points_ref = points + ref
points_new = np.stack((prev_point.coords[0], next_point.coords[0]), axis=0)
# 计算包含开始结束点的距离矩阵
distance_new_to_old = dm(points_new, points_ref)
distance_new_to_new = dm(points_new, points_new)
top = np.hstack([distance_matrix, distance_new_to_old.T])
bottom = np.hstack([distance_new_to_old, distance_new_to_new])
distance_matrix_new = np.vstack([top, bottom]).astype(int)
# 计算局部最优路径
node_list = UavPlanning._solve_one_uav_optimal(distance_matrix_new)
for node in node_list[1: -1]:
route_list.append(Point(points_ref[node]))
payload_list.append(1)
route_utm = LineString(route_list)
route_wgs84 = transform(UTM_TO_WGS84, route_utm)
result = []
for route, payload in zip(route_wgs84.coords, payload_list):
node = {
"coord": route,
"payload": payload
}
result.append(node)
result_dict[uav.id] = result
distance_list.append(route_utm.length)
return result_dict, distance_list
def _solve_sar_route(self):
solution_list = self.solution_list
result_dict = {}
distance_list = []
for uav_index, solution in enumerate(solution_list):
result_list = []
route = []
uav = self.uav_list[uav_index]
process = SarProcess(uav)
# TODO 多进程处理
for i, target_index in enumerate(solution):
target = self.gdf.iloc[target_index]
# 处理无人机
if target["type"] == "uav":
route.append(target["geometry"])
# 处理目标
else:
prev_point = route[-1]
next_point = self.gdf.iloc[solution[i + 1]]["centroid"]
shape = target["geometry"]
# 处理点目标
if target["type"] == "point":
result = process.process_point(shape, prev_point, next_point)
result_list.append(result)
# 处理线目标
elif target["type"] == "line":
result = process.process_line(shape, prev_point, next_point)
result_list.extend(result)
# 处理区域目标
elif target["type"] == "region":
angle = self.target_dict[target["id"]].angle
result = process.process_region(shape, prev_point, next_point, angle)
result_list.extend(result)
elif target["type"] == "around":
result = process.process_around(shape, prev_point)
result_list.extend(result)
route.extend(process.results_to_route(result))
result_dict[uav.id] = process.process_result(result_list)
distance_list.append(LineString(route).length)
return result_dict, distance_list
def _solve_laser_route(self):
solution_list = self.solution_list
result_dict = {}
distance_list = []
for uav_index, solution in enumerate(solution_list):
route_list = []
payload_list = []
target_id_list = []
uav = self.uav_list[uav_index]
# TODO 多进程处理
for i, target_index in enumerate(solution):
target = self.gdf.iloc[target_index]
# 处理无人机
if target["type"] == "uav":
route_list.append(target["geometry"])
target_id_list.append(None)
payload_list.append(0)
# 处理目标
else:
prev_point = route_list[-1]
next_point = self.gdf.iloc[solution[i + 1]]["centroid"]
# 处理点目标
if target["type"] == "point":
point = target["geometry"]
route_list.append(point)
payload_list.append(1)
# 处理线目标 TODO
elif target["type"] == "line":
pass
# 处理区域目标
elif target["type"] == "region":
id = target["id"]
lines = self.target_dict[id].lines
centroid_utm_array = np.array([line.centroid.coords[0] for line in lines.geoms])
target_array = np.concat([centroid_utm_array, np.array(prev_point.coords), np.array(next_point.coords)])
distance_matrix = squareform(pdist(target_array * 1000, metric='euclidean')).astype(int)
node_list = UavPlanning._solve_one_uav_optimal(distance_matrix)
start_point = target_array[node_list[0]]
for i in range(1, len(node_list) - 1):
end_point = target_array[node_list[i + 1]]
line = lines.geoms[node_list[i]]
first_point = np.array(line.coords[0])
last_point = np.array(line.coords[-1])
distance1 = np.linalg.norm(start_point - first_point) + np.linalg.norm(end_point - last_point)
distance2 = np.linalg.norm(end_point - first_point) + np.linalg.norm(start_point - last_point)
if distance1 < distance2:
route_list.append(Point(first_point))
route_list.append(Point(last_point))
start_point = last_point
else:
route_list.append(Point(last_point))
route_list.append(Point(first_point))
start_point = first_point
payload_list.append(1)
payload_list.append(0)
target_id_list.append(target["id"])
target_id_list.append(target["id"])
route_utm = LineString(route_list)
route_wgs84 = transform(UTM_TO_WGS84, route_utm)
result = []
for route, target_id, payload in zip(route_wgs84.coords, target_id_list, payload_list):
node = {
"coord": route,
"target_id": target_id,
"payload": payload
}
result.append(node)
result_dict[uav.id] = result
distance_list.append(route_utm.length)
return result_dict, distance_list
def _solve_optics_front_route(self):
solution_list = self.solution_list
result_dict = {}
distance_list = []
# 定义三维坐标转换
def UTM_TO_WGS84_3D(x, y, z = None):
return UTM_TO_WGS84(x, y, z)
for uav_index, solution in enumerate(solution_list):
route_list = []
payload_list = []
look_at_list = []
uav = self.uav_list[uav_index]
# 计算拍摄高度
width = uav.width
photo_nums = np.ceil(CONTAINER_HEIGHT_MAX / width)
height_array = np.arange(width / 2, photo_nums * width, width)
# TODO 多进程处理
for i, target_index in enumerate(solution):
target = self.gdf.iloc[target_index]
# 处理无人机
if target["type"] == "uav":
if len(route_list) == 0:
route_list.append(Point(target["geometry"].x, target["geometry"].y, uav.start_height))
else:
route_list.append(Point(target["geometry"].x, target["geometry"].y, uav.end_height))
payload_list.append(0)
look_at_list.append([0, 0, 0])
# 处理目标
else:
prev_point = route_list[-1]
next_point = self.gdf.iloc[solution[i + 1]]["centroid"]
next_point = Point(next_point.x, next_point.y, 0)
# 处理点目标
if target["type"] == "point":
pass
# 处理线目标 TODO
elif target["type"] == "line":
pass
# 处理区域目标
elif target["type"] == "region":
id = target["id"]
utm_point_array = np.array([point.coords[0] for point in self.target_dict[id].utm_points.geoms])
utm_look_at_array = np.array([point.coords[0] for point in self.target_dict[id].utm_look_at_points.geoms])
point_3d_list = []
look_at_3d_list = []
for height in height_array:
point_3d = np.hstack((utm_point_array, np.zeros((utm_point_array.shape[0], 1)) + height + self.target_dict[id].base_height))
look_at_3d = np.hstack((utm_look_at_array, np.zeros((utm_look_at_array.shape[0], 1)) + height + self.target_dict[id].base_height))
point_3d_list.append(point_3d)
look_at_3d_list.append(look_at_3d)
point_3d_list.append(np.array(prev_point.coords))
point_3d_list.append(np.array(next_point.coords))
point_3d_array = np.concat(point_3d_list)
look_at_3d_array = np.concat(look_at_3d_list)
distance_matrix = squareform(pdist(point_3d_array * 1000, metric='euclidean')).astype(int)
node_list = UavPlanning._solve_one_uav_optimal(distance_matrix)
for node in node_list[1: -1]:
route_list.append(Point(point_3d_array[node]))
look_at_list.append(Point(look_at_3d_array[node]))
payload_list.append(1)
route_utm = LineString(route_list)
route_wgs84 = transform(UTM_TO_WGS84_3D, route_utm)
look_at_wgs84 = transform(UTM_TO_WGS84_3D, LineString(look_at_list))
result = []
for route, look_at, payload in zip(route_wgs84.coords,look_at_wgs84.coords, payload_list):
node = {
"coord": route,
"payload": {
"status": payload,
"look_at": look_at
}
}
result.append(node)
result_dict[uav.id] = result
distance_list.append(route_utm.length)
return result_dict, distance_list
def solution_distance(self) -> list[float]:
distance_list = []
for solution in self.solution_list:
distance = 0
for cur_node, next_node in zip(solution[: -1], solution[1:]):
distance += self._distance_matrix[cur_node][next_node] + self._node_cost[next_node]
distance_list.append(int(distance))
return distance_list
@staticmethod
def _process_optics_point(point:Point, prev_point:Point, uav:Uav):
length = uav.valid_length
width = uav.valid_width
point_array = np.array(point.coords)[0]
vec_x = point_array - np.array(prev_point.coords)[0]
vec_x_norm = vec_x / np.linalg.norm(vec_x)
vec_y_norm = np.array([-vec_x_norm[1], vec_x_norm[0]])
vec_length = vec_x_norm * length / 2
vec_width = vec_y_norm * width / 2
photo = Polygon([
point_array + vec_length + vec_width,
point_array + vec_length - vec_width,
point_array - vec_length - vec_width,
point_array - vec_length + vec_width
])
return point, photo, 1
@staticmethod
def _solve_one_uav_optimal(distance_matrix: np.ndarray[int]) -> list[int]:
"""
求解单无人机区域内规划, 无人机开始结束点为最后两位
Args:
distance_matrix(np.ndarray[int]): 目标点距离矩阵
Returns:
"""
manager = pywrapcp.RoutingIndexManager(
len(distance_matrix),
1,
[len(distance_matrix) - 2],
[len(distance_matrix) - 1]
)
# 创建路线
routing = pywrapcp.RoutingModel(manager)
def distance_callback(from_index, to_index):
"""距离回调"""
# Convert from routing variable Index to distance matrix NodeIndex.
from_node = manager.IndexToNode(from_index)
to_node = manager.IndexToNode(to_index)
return distance_matrix[from_node][to_node]
# 添加函数回调
transit_callback_index = routing.RegisterTransitCallback(distance_callback)
routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index) #
# 设置求解器
search_parameters = pywrapcp.DefaultRoutingSearchParameters()
search_parameters.first_solution_strategy = (
routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC
)
search_parameters.time_limit.seconds = 10
# Solve the problem.
routing.CloseModelWithParameters(search_parameters)
# 求解
solution = routing.SolveWithParameters(search_parameters)
node_list = []
index = routing.Start(0)
while True:
node_list.append(manager.IndexToNode(index))
if routing.IsEnd(index):
break
index = solution.Value(routing.NextVar(index))
# 记录数据
return node_list