import copy import numpy as np from abc import ABC, abstractmethod from ortools.constraint_solver import routing_enums_pb2, pywrapcp from shapely.geometry import Point, LineString, Polygon, MultiLineString, MultiPolygon, MultiPoint from shapely.ops import transform from shapely.affinity import rotate from .coords import WGS84_TO_UTM, UTM_TO_WGS84 from .uav import Uav import logging logger = logging.getLogger("Process") class Process(ABC): @abstractmethod def process_region(self, region): pass @abstractmethod def process_line(self, region): pass @abstractmethod def process_point(self, region): pass @staticmethod def _access_sequence(point1, point2, prev_point, next_point): distance1 = prev_point.distance(Point(point1)) + next_point.distance(Point(point2)) distance2 = prev_point.distance(Point(point2)) + next_point.distance(Point(point1)) return distance1 < distance2 @staticmethod def _discrete_curve(line: LineString, width: float) -> list[LineString]: """ 离散化SAR曲线拍摄目标 Args: line(LineString): 目标曲线 width(float): 幅宽 Return: MultiLineString: 离散化后目标拍摄中线 """ coords = list(line.coords) n = len(coords) line_list = [] i = 0 max_dev = width / 2 while i < n - 1: j = i + 1 projection_distance = 0 while j < n: coord_array = np.array(line.coords[i:j+1]) if len(coord_array) < 2: break a, b = coord_array[0], coord_array[-1] vec = b - a vec_norm = vec / np.linalg.norm(vec) projection_distance_array = np.dot(coord_array - a, vec_norm[:, None]) projected = projection_distance_array * vec_norm projection_points = a + projected perpendicular_distance_array = np.linalg.norm(coord_array - projection_points, axis=1) if np.max(perpendicular_distance_array) > max_dev: break else: projection_distance = np.max(projection_distance_array) j += 1 j -= 1 # 回退到最后一个合法点 a, b = np.array(coords[i]), np.array(coords[j]) vec = b - a vec_norm = vec / np.linalg.norm(vec) while j < n - 1: coord = np.array(line.coords[j + 1]) this_projection_distance = np.dot(coord - a, vec_norm) projection_point = a + this_projection_distance * vec_norm this_vertical_distance = np.linalg.norm(coord - projection_point) if this_vertical_distance > max_dev: break else: if projection_distance < this_projection_distance: projection_distance = this_projection_distance j += 1 b = a + vec_norm * projection_distance line_list.append(LineString([a, b])) i = j # 继续下一个段落 return line_list @staticmethod def _lines_sequence(lines: LineString | list[LineString], prev_point: Point, next_point: Point) -> list[LineString]: if isinstance(lines, LineString): lines = [lines] if len(lines) > 1: sequence = Process._access_sequence(lines[0].centroid, lines[-1].centroid, prev_point, next_point) if not sequence: lines = lines[::-1] prev_point, next_point = next_point, prev_point processed_lines = [] for i, line in enumerate(lines): if i < len(lines) - 1: next_centroid = lines[i + 1].centroid else: next_centroid = next_point sequence = Process._access_sequence(line.coords[0], line.coords[-1], prev_point, next_centroid) if not sequence: line = LineString(line.coords[::-1]) processed_lines.append(line) prev_point = Point(line.coords[-1]) return processed_lines @staticmethod def _vrp_solver(distance_matrix) -> list[int]: manager = pywrapcp.RoutingIndexManager( len(distance_matrix), 1, [len(distance_matrix) - 2], [len(distance_matrix) - 1] ) # 创建路线 routing = pywrapcp.RoutingModel(manager) def distance_callback(from_index, to_index): """距离回调""" # Convert from routing variable Index to distance matrix NodeIndex. from_node = manager.IndexToNode(from_index) to_node = manager.IndexToNode(to_index) return distance_matrix[from_node][to_node] # 添加函数回调 transit_callback_index = routing.RegisterTransitCallback(distance_callback) routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index) # # 设置求解器 search_parameters = pywrapcp.DefaultRoutingSearchParameters() search_parameters.first_solution_strategy = ( routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC ) search_parameters.time_limit.seconds = 10 # Solve the problem. routing.CloseModelWithParameters(search_parameters) # 求解 solution = routing.SolveWithParameters(search_parameters) node_list = [] index = routing.Start(0) while True: node_list.append(manager.IndexToNode(index)) if routing.IsEnd(index): break index = solution.Value(routing.NextVar(index)) # 记录数据 return node_list class SarProcess(Process): def __init__(self, uav:Uav): self.reserved_distance = uav.reserved_distance self.deviation_distance = uav.deviation_distance self.length = uav.length self.width = uav.width self.height = uav.height self.theta = uav.theta self.direction = uav.direction # self.min_distance = uav.min_distance # TODO def process_point(self, point: Point, prev_point: Point, next_point: Point) -> dict[str, float]: # vec = np.array(next_point.coords)[0] - np.array(prev_point.coords)[0] # if np.linalg.norm(vec) == 0: # vec = np.array(prev_point.coords)[0] - np.array(point.coords)[0] # angle = np.degrees(np.atan2(vec[1], vec[0])) # return self._calculate_point_route(point, angle) result = {} point_array = np.array(point.coords)[0] vec_x = np.array(next_point.coords)[0] - np.array(prev_point.coords)[0] if np.linalg.norm(vec_x) == 0: vec_y = point_array - np.array(prev_point.coords)[0] vec_y_norm = vec_y / np.linalg.norm(vec_y) vec_x_norm = np.array([vec_y_norm[1], -vec_y_norm[0]]) else: vec_x_norm = vec_x / np.linalg.norm(vec_x) vec_y_norm = np.array([-vec_x_norm[1], vec_x_norm[0]]) a = point_array b = point_array vec_deviation = -1 * self.direction * vec_y_norm * self.deviation_distance result['ground_start'] = a - vec_x_norm * self.length / 2 result['ground_end'] = b + vec_x_norm * self.length / 2 result['uav_start'] = a - vec_x_norm * (self.length / 2 + self.reserved_distance) + vec_deviation result['uav_end'] = b + vec_x_norm * (self.length / 2 + self.reserved_distance) + vec_deviation result['flight_start'] = a - vec_x_norm * self.length / 2 + vec_deviation result['flight_end'] = b + vec_x_norm * self.length / 2 + vec_deviation result['flight_height'] = self.height result['target_centroid'] = (a + b) / 2 result['target_height'] = 0 result['terget_width'] = self.width result['target_length'] = self.length result['target_heading'] = float(np.degrees(np.atan2(vec_x[0], vec_x[1]))) result['squint_angle'] = 0 result['grazing_angle'] = 90 - self.theta result['flight_type'] = 0 result['direction'] = self.direction return result def process_line(self, line: LineString, prev_point: Point, next_point: Point) -> list[dict[str, float]]: lines = self._discrete_curve(line, self.width) processed_lines = self._lines_sequence(lines, prev_point, next_point) result_list = [self._calculate_line_route(line) for line in processed_lines] return result_list def process_region(self, region: Polygon, prev_point: Point, next_point: Point, angle: float) -> list[dict[str, float]]: region_rotated = rotate(region, angle=-angle, origin=region.centroid) minx, miny, maxx, maxy = region_rotated.bounds delta_y = maxy - miny line_nums = np.ceil(delta_y / self.width).astype(int) yy = np.linspace(miny + delta_y / line_nums / 2, maxy, line_nums) lines = [rotate(LineString([(minx, y), (maxx, y)]), angle=angle, origin=region.centroid) for y in yy] processed_lines = self._lines_sequence(lines, prev_point, next_point) result_list = [self._calculate_line_route(line) for line in processed_lines] return result_list def process_around(self, point: Point, prev_point: Point, edge = 8): point_array = np.array(point.coords)[0] prev_point_array = np.array(prev_point.coords)[0] vec = prev_point_array - point_array init_angle = np.degrees(np.atan2(vec[1], vec[0])) results = [] if self.direction == 1: angle_array = np.linspace(0, 360, edge + 1)[:-1] elif self.direction == -1: angle_array = np.linspace(180, -180, edge + 1)[:-1] for angle in angle_array: results.append(self._calculate_point_route(point, init_angle + angle)) return results def _calculate_point_route(self, point: Point, angle: float): result = {} vec_y_norm = - np.array([np.cos(np.radians(angle)), np.sin(np.radians(angle))]) point_array = np.array(point.coords)[0] vec_x_norm = np.array([vec_y_norm[1], -vec_y_norm[0]]) a = point_array b = point_array vec_deviation = -1 * self.direction * vec_y_norm * self.deviation_distance result['ground_start'] = a - vec_x_norm * self.length / 2 result['ground_end'] = b + vec_x_norm * self.length / 2 result['uav_start'] = a - vec_x_norm * (self.length / 2 + self.reserved_distance) + vec_deviation result['uav_end'] = b + vec_x_norm * (self.length / 2 + self.reserved_distance) + vec_deviation result['flight_start'] = a - vec_x_norm * self.length / 2 + vec_deviation result['flight_end'] = b + vec_x_norm * self.length / 2 + vec_deviation result['flight_height'] = self.height result['target_centroid'] = (a + b) / 2 result['target_height'] = 0 result['terget_width'] = self.width result['target_length'] = self.length result['target_heading'] = float(np.degrees(np.atan2(vec_x_norm[0], vec_x_norm[1]))) result['squint_angle'] = 0 result['grazing_angle'] = 90 - self.theta result['flight_type'] = 0 result['direction'] = self.direction return result def _calculate_line_route(self, line: LineString | list[LineString]): result = {} a, b = np.array(line.coords) vec_x = b - a vec_x_norm = vec_x / np.linalg.norm(vec_x) vec_y_norm = np.array([-vec_x_norm[1], vec_x_norm[0]]) vec_deviation = -1 * self.direction * vec_y_norm * self.deviation_distance result['ground_start'] = a - vec_x_norm * self.length / 2 result['ground_end'] = b + vec_x_norm * self.length / 2 result['uav_start'] = a - vec_x_norm * (self.length / 2 + self.reserved_distance) + vec_deviation result['uav_end'] = b + vec_x_norm * (self.length / 2 + self.reserved_distance) + vec_deviation result['flight_start'] = a - vec_x_norm * self.length / 2 + vec_deviation result['flight_end'] = b + vec_x_norm * self.length / 2 + vec_deviation result['flight_height'] = self.height result['target_centroid'] = (a + b) / 2 result['target_height'] = 0 result['terget_width'] = self.width result['target_length'] = line.length if line.length > self.length else self.length result['target_heading'] = float(np.degrees(np.atan2(vec_x[0], vec_x[1]))) result['squint_angle'] = 0 result['grazing_angle'] = 90 - self.theta result['flight_type'] = 0 result['direction'] = self.direction return result def process_result(self, result_list: list[dict[str, float]]) -> list[dict[str, float]]: utm_dict = { 'uav_start':[], 'uav_end': [], 'flight_start':[], 'flight_end': [], 'ground_start':[], 'ground_end':[], 'target_centroid': [] } result_list_copy = copy.deepcopy(result_list) for result in result_list_copy: for key, value in utm_dict.items(): value.append(result[key]) wgs84_dict = {} for key, value in utm_dict.items(): wgs84_points = transform(UTM_TO_WGS84, MultiPoint(value)) wgs84_dict[key] = wgs84_points for result, uav_start, uav_end, flight_start, flight_end, ground_start, ground_end, target_centroid in zip( result_list_copy, wgs84_dict['uav_start'].geoms, wgs84_dict['uav_end'].geoms, wgs84_dict['flight_start'].geoms, wgs84_dict['flight_end'].geoms, wgs84_dict['ground_start'].geoms, wgs84_dict['ground_end'].geoms, wgs84_dict['target_centroid'].geoms ): flight_height = result.pop('flight_height') target_height = result.pop('target_height') result['uav_start'] = [uav_start.x, uav_start.y, flight_height] result['uav_end'] = [uav_end.x, uav_end.y, flight_height] result['flight_start'] = [flight_start.x, flight_start.y, flight_height] result['flight_end'] = [flight_end.x, flight_end.y, flight_height] result['ground_start'] = [ground_start.x, ground_start.y, target_height] result['ground_end'] = [ground_end.x, ground_end.y, target_height] result['target_centroid'] = [target_centroid.x, target_centroid.y, target_height] return result_list_copy @staticmethod def results_to_route(results: list[dict[str, float]] | dict[str, float]) -> list[Point]: route = [] if isinstance(results, dict): route.append(Point(results['uav_start'])) route.append(Point(results['uav_end'])) else: for result in results: route.append(Point(result['uav_start'])) route.append(Point(result['uav_end'])) return route