import geopandas as gpd import numpy as np import pandas as pd from ortools.constraint_solver import routing_enums_pb2 from ortools.constraint_solver import pywrapcp from scipy.spatial.distance import pdist, squareform from scipy.spatial import distance_matrix as dm from shapely.geometry import Point, LineString, Polygon, MultiLineString, MultiPolygon, MultiPoint from shapely.ops import transform from pyproj import Transformer from .beidou import LocationCode from .coords import WGS84_TO_UTM, UTM_TO_WGS84 from .config import CONTAINER_HEIGHT_MAX, UTM_EPSG from .process import SarProcess from .target import Target from .uav import Uav, PayloadType import logging logger = logging.getLogger("Planning") class UavPlanning: target_dict: dict[Target] uav_list: list[Uav] payload_type: PayloadType gdf: gpd.GeoDataFrame solution_list: list route_list: list distance_list: list _start_node_list = None _end_node_list = None _distance_matrix = None _node_cost = None _coordinate_array = None _uav_nums = None _grid_nums = None _endurance_list = None _init_distance = None def __init__( self, target_list: list[Target], uav_list: list[Uav], payload_type: str ): """ 初始化任务规划 Args: target_list(list[Region|SarRegion|LineTarget|PointTarget]): 目标列表 uav_list(list[Uav]): 无人机列表 payload_type(str): 载荷 """ self.payload_type = payload_type logger.info("Starting count nodes.") self.target_dict = {} target_gdf_list = [] for target in target_list: target_gdf_list.append(target.gdf) self.target_dict[target.id] = target target_gdf = gpd.GeoDataFrame(pd.concat(target_gdf_list, ignore_index=True)) self._grid_nums = len(target_gdf) self.uav_list = uav_list self._uav_nums = len(uav_list) self._endurance_list = [] self._start_node_list = [] self._end_node_list = [] uav_data = { "centroid": [], "code": [], "level": 10, "id": [], "type": "uav" } for i, uav in enumerate(uav_list): self._start_node_list.append(self._grid_nums + 2 * i) self._end_node_list.append(self._grid_nums + 2 * i + 1) uav_data['centroid'].append(uav.start_coord) uav_data['centroid'].append(uav.end_coord) uav_data['code'].append(uav.start_code) uav_data['code'].append(uav.end_code) uav_data['id'].append(uav.id) uav_data['id'].append(uav.id) self._endurance_list.append(uav.endurance) uav_gdf = gpd.GeoDataFrame(uav_data, geometry=uav_data['centroid'], crs="EPSG:4326") gdf = gpd.GeoDataFrame(pd.concat([target_gdf, uav_gdf], ignore_index=True)) # 合并目标与无人机gdf gdf = gdf.to_crs(epsg=UTM_EPSG) gdf['centroid'] = gdf['centroid'].apply(lambda x: transform(WGS84_TO_UTM, x)) self._node_cost = gdf.length.astype(int).tolist() self.gdf = gdf logger.info("Counting nodes successful.") logger.info("Starting Calculate distance matrix.") # centroid_gdf = gpd.GeoDataFrame(geometry=self.gdf["centroid"], crs="EPSG:4326") # 提取中心点 # projected_gdf = centroid_gdf.to_crs(CRS.from_epsg(6933)) # 将坐标转为Equal Earth坐标系 coordinate_array = np.array(gdf["centroid"].apply(lambda x:[x.x, x.y]).to_list()) # 将坐标提取至np.array self._distance_matrix = squareform(pdist(coordinate_array, metric='euclidean')).astype(int) # 计算距离矩阵 logger.info("Calculating distance matrix successful.") def solve(self) -> list[list[int]]: """ 使用贪心算法计算任务规划初始解 Returns: list[list[int]]: 无人机节点规划结果 - list[i]: 第i架无人机规划结果 - list[i][j]: 第i架无人机经过的第j个节点 """ logger.info("Starting solve initial solution.") # 初始化参数 unvisited = set(range(self._grid_nums)) uav_set = set(range(self._uav_nums)) routes = [[self._start_node_list[i]] for i in range(self._uav_nums)] uav_distance_array = np.zeros(self._uav_nums, dtype=int) cur_node_array = np.array(self._start_node_list, dtype=int) back_distance_array = np.zeros(self._uav_nums, dtype=int) while unvisited and uav_set: # 根据距离选取规划无人机 uav_index = min(uav_set, key = lambda x: uav_distance_array[x] + back_distance_array[x]) # 获取无人机属性 cur_node = cur_node_array[uav_index] cur_distance = uav_distance_array[uav_index] end_node = self._end_node_list[uav_index] endurance = self._endurance_list[uav_index] # 计算代价最小节点 next_node = min( unvisited, key = lambda x: self._distance_matrix[cur_node][x] + 0.1 * self._distance_matrix[x][end_node] ) next_distance = cur_distance + self._distance_matrix[cur_node][next_node] + self._node_cost[next_node] # 判断约束 if next_distance + self._distance_matrix[next_node][end_node] > endurance: uav_set.remove(uav_index) routes[uav_index].append(end_node) else: routes[uav_index].append(next_node) uav_distance_array[uav_index] = next_distance back_distance_array[uav_index] = self._distance_matrix[next_node][end_node] cur_node_array[uav_index] = next_node unvisited.remove(next_node) for uav_index in uav_set: routes[uav_index].append(self._end_node_list[uav_index]) self.solution_list = routes self._init_distance = self.solution_distance() logger.info("Solving initial solution successful.") def optimize(self, time: int = 5): """ 无人机规划优化, 需先调用init_solve Args: time(SupportsFloat): 最长优化时间 """ logger.info("Starting optimize solution.") # Create the routing index manager. manager = pywrapcp.RoutingIndexManager( len(self._distance_matrix), self._uav_nums, self._start_node_list, self._end_node_list ) # Create Routing Model. routing = pywrapcp.RoutingModel(manager) def distance_callback(from_index, to_index): """Returns the distance between the two nodes.""" # Convert from routing variable Index to distance matrix NodeIndex. from_node = manager.IndexToNode(from_index) to_node = manager.IndexToNode(to_index) return self._distance_matrix[from_node][to_node] + self._node_cost[to_node] # Create and register a transit callback. transit_callback_index = routing.RegisterTransitCallback(distance_callback) # Define cost of each arc. routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index) init_endurance_list = [] for distance, endurance in zip(self._init_distance, self._endurance_list): init_endurance = int(distance * 1.1) if distance * 1.1 < endurance else endurance init_endurance_list.append(init_endurance) # Add Distance constraint. routing.AddDimensionWithVehicleCapacity( transit_callback_index, 0, # no slack # self._init_distance, # vehicle maximum travel distance init_endurance_list, True, # start cumul to zero "Distance", ) penalty = 4000000 for node in range(self._grid_nums): routing.AddDisjunction([manager.NodeToIndex(node)], penalty) distance_dimension = routing.GetDimensionOrDie("Distance") distance_dimension.SetGlobalSpanCostCoefficient(100) # 设置每辆车的最大距离软约束及惩罚系数 penalty_coefficient = 5 # 每超出单位距离的惩罚 for vehicle_id in range(self._uav_nums): end_node = routing.End(vehicle_id) distance_dimension.SetCumulVarSoftUpperBound( end_node, int(init_endurance_list[vehicle_id] * 0.75), penalty_coefficient) # 定义搜索策略 search_parameters = pywrapcp.DefaultRoutingSearchParameters() # search_parameters.first_solution_strategy = (routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC) # search_parameters.local_search_metaheuristic = (routing_enums_pb2.LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH) search_parameters.local_search_metaheuristic = (routing_enums_pb2.LocalSearchMetaheuristic.AUTOMATIC) search_parameters.time_limit.seconds = time # 运行时间限制 # search_parameters.log_search = True # Solve the problem. routing.CloseModelWithParameters(search_parameters) # self._solution = routing.SolveWithParameters(search_parameters) initial_solution = routing.ReadAssignmentFromRoutes(self.solution_list, True) self._solution = routing.SolveFromAssignmentWithParameters(initial_solution, search_parameters) logger.info("Optimization solution successful.") # Print solution on console. optimal_solution=[] for uav_id in range(self._uav_nums): node_list=[] index = routing.Start(uav_id) while not routing.IsEnd(index): node_list.append(manager.IndexToNode(index)) index = self._solution.Value(routing.NextVar(index)) node_list.append(manager.IndexToNode(index)) optimal_solution.append(node_list) self.solution_list = optimal_solution def solve_route(self): if self.payload_type == PayloadType.optics_down: print(PayloadType.optics_down) return self._solve_optics_route() elif self.payload_type == PayloadType.sar: return self._solve_sar_route() elif self.payload_type == PayloadType.laser: return self._solve_laser_route() elif self.payload_type == PayloadType.optics_front: return self._solve_optics_front_route() else: raise TypeError(f"{self.payload_type} is not a valid payload type.") def _solve_optics_route(self): solution_list = self.solution_list result_dict = {} distance_list = [] for uav_index, solution in enumerate(solution_list): route_list = [] payload_list = [] uav = self.uav_list[uav_index] valid_length = uav.valid_length valid_width = uav.valid_width planned_map = {} # TODO 多进程处理 for i, target_index in enumerate(solution): target = self.gdf.iloc[target_index] if target["type"] == "uav": route_list.append(target["geometry"]) payload_list.append(0) else: prev_point = route_list[-1] next_point = self.gdf.iloc[solution[i + 1]]["centroid"] # 处理点目标 if target["type"] == "point": point = target["geometry"] route_list.append(point) payload_list.append(1) # TODO # 处理线目标 elif target["type"] == "line": pass # 处理区域目标 elif target["type"] == "region": level = target["level"] delta_lon = LocationCode._DELTA_LON[level - 1] delta_lat = LocationCode._DELTA_LAT[level - 1] centroid_wgs84 = transform(UTM_TO_WGS84, target["centroid"]) ref = np.array( transform( WGS84_TO_UTM, Point(np.array(centroid_wgs84.coords[0]) - np.array(delta_lon, delta_lat) / 2) ).coords[0] ) # 按照有效幅宽, 对网格进行离散化 if level in planned_map: points, distance_matrix = planned_map[level] else: delta_x, delta_y = np.array( transform( WGS84_TO_UTM, Point(np.array(centroid_wgs84.coords[0]) + np.array(delta_lon, delta_lat) / 2) ).coords[0] )- ref x_point_num = delta_x // valid_length + 1 x_remainder = delta_x / x_point_num / 2 y_point_num = delta_y // valid_width y_remainder = delta_y / y_point_num / 2 x_coords = np.linspace(x_remainder, delta_x - x_remainder, int(x_point_num)) y_coords = np.linspace(y_remainder, delta_y - y_remainder, int(y_point_num)) x_points, y_points = np.meshgrid(x_coords, y_coords) points = np.column_stack([x_points.ravel(), y_points.ravel()]) distance_matrix = squareform(pdist(points, metric='euclidean')) planned_map[level] = (points, distance_matrix) if len(points) == 1: route_list.append(target["centroid"]) else: points_ref = points + ref points_new = np.stack((prev_point.coords[0], next_point.coords[0]), axis=0) # 计算包含开始结束点的距离矩阵 distance_new_to_old = dm(points_new, points_ref) distance_new_to_new = dm(points_new, points_new) top = np.hstack([distance_matrix, distance_new_to_old.T]) bottom = np.hstack([distance_new_to_old, distance_new_to_new]) distance_matrix_new = np.vstack([top, bottom]).astype(int) # 计算局部最优路径 node_list = UavPlanning._solve_one_uav_optimal(distance_matrix_new) for node in node_list[1: -1]: route_list.append(Point(points_ref[node])) payload_list.append(1) route_utm = LineString(route_list) route_wgs84 = transform(UTM_TO_WGS84, route_utm) result = [] for route, payload in zip(route_wgs84.coords, payload_list): node = { "coord": route, "payload": payload } result.append(node) result_dict[uav.id] = result distance_list.append(route_utm.length) return result_dict, distance_list def _solve_sar_route(self): solution_list = self.solution_list result_dict = {} distance_list = [] for uav_index, solution in enumerate(solution_list): result_list = [] route = [] uav = self.uav_list[uav_index] process = SarProcess(uav) # TODO 多进程处理 for i, target_index in enumerate(solution): target = self.gdf.iloc[target_index] # 处理无人机 if target["type"] == "uav": route.append(target["geometry"]) # 处理目标 else: prev_point = route[-1] next_point = self.gdf.iloc[solution[i + 1]]["centroid"] shape = target["geometry"] # 处理点目标 if target["type"] == "point": result = process.process_point(shape, prev_point, next_point) result_list.append(result) # 处理线目标 elif target["type"] == "line": result = process.process_line(shape, prev_point, next_point) result_list.extend(result) # 处理区域目标 elif target["type"] == "region": angle = self.target_dict[target["id"]].angle result = process.process_region(shape, prev_point, next_point, angle) result_list.extend(result) elif target["type"] == "around": result = process.process_around(shape, prev_point) result_list.extend(result) route.extend(process.results_to_route(result)) result_dict[uav.id] = process.process_result(result_list) distance_list.append(LineString(route).length) return result_dict, distance_list def _solve_laser_route(self): solution_list = self.solution_list result_dict = {} distance_list = [] for uav_index, solution in enumerate(solution_list): route_list = [] payload_list = [] target_id_list = [] uav = self.uav_list[uav_index] # TODO 多进程处理 for i, target_index in enumerate(solution): target = self.gdf.iloc[target_index] # 处理无人机 if target["type"] == "uav": route_list.append(target["geometry"]) target_id_list.append(None) payload_list.append(0) # 处理目标 else: prev_point = route_list[-1] next_point = self.gdf.iloc[solution[i + 1]]["centroid"] # 处理点目标 if target["type"] == "point": point = target["geometry"] route_list.append(point) payload_list.append(1) # 处理线目标 TODO elif target["type"] == "line": pass # 处理区域目标 elif target["type"] == "region": id = target["id"] lines = self.target_dict[id].lines centroid_utm_array = np.array([line.centroid.coords[0] for line in lines.geoms]) target_array = np.concat([centroid_utm_array, np.array(prev_point.coords), np.array(next_point.coords)]) distance_matrix = squareform(pdist(target_array * 1000, metric='euclidean')).astype(int) node_list = UavPlanning._solve_one_uav_optimal(distance_matrix) start_point = target_array[node_list[0]] for i in range(1, len(node_list) - 1): end_point = target_array[node_list[i + 1]] line = lines.geoms[node_list[i]] first_point = np.array(line.coords[0]) last_point = np.array(line.coords[-1]) distance1 = np.linalg.norm(start_point - first_point) + np.linalg.norm(end_point - last_point) distance2 = np.linalg.norm(end_point - first_point) + np.linalg.norm(start_point - last_point) if distance1 < distance2: route_list.append(Point(first_point)) route_list.append(Point(last_point)) start_point = last_point else: route_list.append(Point(last_point)) route_list.append(Point(first_point)) start_point = first_point payload_list.append(1) payload_list.append(0) target_id_list.append(target["id"]) target_id_list.append(target["id"]) route_utm = LineString(route_list) route_wgs84 = transform(UTM_TO_WGS84, route_utm) result = [] for route, target_id, payload in zip(route_wgs84.coords, target_id_list, payload_list): node = { "coord": route, "target_id": target_id, "payload": payload } result.append(node) result_dict[uav.id] = result distance_list.append(route_utm.length) return result_dict, distance_list def _solve_optics_front_route(self): solution_list = self.solution_list result_dict = {} distance_list = [] # 定义三维坐标转换 def UTM_TO_WGS84_3D(x, y, z = None): return UTM_TO_WGS84(x, y, z) for uav_index, solution in enumerate(solution_list): route_list = [] payload_list = [] look_at_list = [] uav = self.uav_list[uav_index] # 计算拍摄高度 width = uav.width photo_nums = np.ceil(CONTAINER_HEIGHT_MAX / width) height_array = np.arange(width / 2, photo_nums * width, width) # TODO 多进程处理 for i, target_index in enumerate(solution): target = self.gdf.iloc[target_index] # 处理无人机 if target["type"] == "uav": if len(route_list) == 0: route_list.append(Point(target["geometry"].x, target["geometry"].y, uav.start_height)) else: route_list.append(Point(target["geometry"].x, target["geometry"].y, uav.end_height)) payload_list.append(0) look_at_list.append([0, 0, 0]) # 处理目标 else: prev_point = route_list[-1] next_point = self.gdf.iloc[solution[i + 1]]["centroid"] next_point = Point(next_point.x, next_point.y, 0) # 处理点目标 if target["type"] == "point": pass # 处理线目标 TODO elif target["type"] == "line": pass # 处理区域目标 elif target["type"] == "region": id = target["id"] utm_point_array = np.array([point.coords[0] for point in self.target_dict[id].utm_points.geoms]) utm_look_at_array = np.array([point.coords[0] for point in self.target_dict[id].utm_look_at_points.geoms]) point_3d_list = [] look_at_3d_list = [] for height in height_array: point_3d = np.hstack((utm_point_array, np.zeros((utm_point_array.shape[0], 1)) + height + self.target_dict[id].base_height)) look_at_3d = np.hstack((utm_look_at_array, np.zeros((utm_look_at_array.shape[0], 1)) + height + self.target_dict[id].base_height)) point_3d_list.append(point_3d) look_at_3d_list.append(look_at_3d) point_3d_list.append(np.array(prev_point.coords)) point_3d_list.append(np.array(next_point.coords)) point_3d_array = np.concat(point_3d_list) look_at_3d_array = np.concat(look_at_3d_list) distance_matrix = squareform(pdist(point_3d_array * 1000, metric='euclidean')).astype(int) node_list = UavPlanning._solve_one_uav_optimal(distance_matrix) for node in node_list[1: -1]: route_list.append(Point(point_3d_array[node])) look_at_list.append(Point(look_at_3d_array[node])) payload_list.append(1) route_utm = LineString(route_list) route_wgs84 = transform(UTM_TO_WGS84_3D, route_utm) look_at_wgs84 = transform(UTM_TO_WGS84_3D, LineString(look_at_list)) result = [] for route, look_at, payload in zip(route_wgs84.coords,look_at_wgs84.coords, payload_list): node = { "coord": route, "payload": { "status": payload, "look_at": look_at } } result.append(node) result_dict[uav.id] = result distance_list.append(route_utm.length) return result_dict, distance_list def solution_distance(self) -> list[float]: distance_list = [] for solution in self.solution_list: distance = 0 for cur_node, next_node in zip(solution[: -1], solution[1:]): distance += self._distance_matrix[cur_node][next_node] + self._node_cost[next_node] distance_list.append(int(distance)) return distance_list @staticmethod def _process_optics_point(point:Point, prev_point:Point, uav:Uav): length = uav.valid_length width = uav.valid_width point_array = np.array(point.coords)[0] vec_x = point_array - np.array(prev_point.coords)[0] vec_x_norm = vec_x / np.linalg.norm(vec_x) vec_y_norm = np.array([-vec_x_norm[1], vec_x_norm[0]]) vec_length = vec_x_norm * length / 2 vec_width = vec_y_norm * width / 2 photo = Polygon([ point_array + vec_length + vec_width, point_array + vec_length - vec_width, point_array - vec_length - vec_width, point_array - vec_length + vec_width ]) return point, photo, 1 @staticmethod def _solve_one_uav_optimal(distance_matrix: np.ndarray[int]) -> list[int]: """ 求解单无人机区域内规划, 无人机开始结束点为最后两位 Args: distance_matrix(np.ndarray[int]): 目标点距离矩阵 Returns: """ manager = pywrapcp.RoutingIndexManager( len(distance_matrix), 1, [len(distance_matrix) - 2], [len(distance_matrix) - 1] ) # 创建路线 routing = pywrapcp.RoutingModel(manager) def distance_callback(from_index, to_index): """距离回调""" # Convert from routing variable Index to distance matrix NodeIndex. from_node = manager.IndexToNode(from_index) to_node = manager.IndexToNode(to_index) return distance_matrix[from_node][to_node] # 添加函数回调 transit_callback_index = routing.RegisterTransitCallback(distance_callback) routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index) # # 设置求解器 search_parameters = pywrapcp.DefaultRoutingSearchParameters() search_parameters.first_solution_strategy = ( routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC ) search_parameters.time_limit.seconds = 10 # Solve the problem. routing.CloseModelWithParameters(search_parameters) # 求解 solution = routing.SolveWithParameters(search_parameters) node_list = [] index = routing.Start(0) while True: node_list.append(manager.IndexToNode(index)) if routing.IsEnd(index): break index = solution.Value(routing.NextVar(index)) # 记录数据 return node_list