377 lines
15 KiB
Python
377 lines
15 KiB
Python
|
|
import copy
|
||
|
|
import numpy as np
|
||
|
|
from abc import ABC, abstractmethod
|
||
|
|
from ortools.constraint_solver import routing_enums_pb2, pywrapcp
|
||
|
|
from shapely.geometry import Point, LineString, Polygon, MultiLineString, MultiPolygon, MultiPoint
|
||
|
|
from shapely.ops import transform
|
||
|
|
from shapely.affinity import rotate
|
||
|
|
|
||
|
|
from .coords import WGS84_TO_UTM, UTM_TO_WGS84
|
||
|
|
from .uav import Uav
|
||
|
|
|
||
|
|
import logging
|
||
|
|
logger = logging.getLogger("Process")
|
||
|
|
|
||
|
|
class Process(ABC):
|
||
|
|
|
||
|
|
@abstractmethod
|
||
|
|
def process_region(self, region):
|
||
|
|
pass
|
||
|
|
|
||
|
|
@abstractmethod
|
||
|
|
def process_line(self, region):
|
||
|
|
pass
|
||
|
|
|
||
|
|
@abstractmethod
|
||
|
|
def process_point(self, region):
|
||
|
|
pass
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def _access_sequence(point1, point2, prev_point, next_point):
|
||
|
|
distance1 = prev_point.distance(Point(point1)) + next_point.distance(Point(point2))
|
||
|
|
distance2 = prev_point.distance(Point(point2)) + next_point.distance(Point(point1))
|
||
|
|
return distance1 < distance2
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def _discrete_curve(line: LineString, width: float) -> list[LineString]:
|
||
|
|
"""
|
||
|
|
离散化SAR曲线拍摄目标
|
||
|
|
Args:
|
||
|
|
line(LineString): 目标曲线
|
||
|
|
width(float): 幅宽
|
||
|
|
|
||
|
|
Return:
|
||
|
|
MultiLineString: 离散化后目标拍摄中线
|
||
|
|
"""
|
||
|
|
coords = list(line.coords)
|
||
|
|
n = len(coords)
|
||
|
|
line_list = []
|
||
|
|
i = 0
|
||
|
|
max_dev = width / 2
|
||
|
|
|
||
|
|
while i < n - 1:
|
||
|
|
j = i + 1
|
||
|
|
projection_distance = 0
|
||
|
|
while j < n:
|
||
|
|
coord_array = np.array(line.coords[i:j+1])
|
||
|
|
if len(coord_array) < 2:
|
||
|
|
break
|
||
|
|
a, b = coord_array[0], coord_array[-1]
|
||
|
|
vec = b - a
|
||
|
|
vec_norm = vec / np.linalg.norm(vec)
|
||
|
|
projection_distance_array = np.dot(coord_array - a, vec_norm[:, None])
|
||
|
|
projected = projection_distance_array * vec_norm
|
||
|
|
projection_points = a + projected
|
||
|
|
perpendicular_distance_array = np.linalg.norm(coord_array - projection_points, axis=1)
|
||
|
|
|
||
|
|
if np.max(perpendicular_distance_array) > max_dev:
|
||
|
|
break
|
||
|
|
else:
|
||
|
|
projection_distance = np.max(projection_distance_array)
|
||
|
|
j += 1
|
||
|
|
|
||
|
|
j -= 1 # 回退到最后一个合法点
|
||
|
|
|
||
|
|
a, b = np.array(coords[i]), np.array(coords[j])
|
||
|
|
vec = b - a
|
||
|
|
vec_norm = vec / np.linalg.norm(vec)
|
||
|
|
while j < n - 1:
|
||
|
|
coord = np.array(line.coords[j + 1])
|
||
|
|
this_projection_distance = np.dot(coord - a, vec_norm)
|
||
|
|
projection_point = a + this_projection_distance * vec_norm
|
||
|
|
this_vertical_distance = np.linalg.norm(coord - projection_point)
|
||
|
|
if this_vertical_distance > max_dev:
|
||
|
|
break
|
||
|
|
else:
|
||
|
|
if projection_distance < this_projection_distance:
|
||
|
|
projection_distance = this_projection_distance
|
||
|
|
j += 1
|
||
|
|
|
||
|
|
b = a + vec_norm * projection_distance
|
||
|
|
line_list.append(LineString([a, b]))
|
||
|
|
i = j # 继续下一个段落
|
||
|
|
|
||
|
|
return line_list
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def _lines_sequence(lines: LineString | list[LineString], prev_point: Point, next_point: Point) -> list[LineString]:
|
||
|
|
|
||
|
|
if isinstance(lines, LineString):
|
||
|
|
lines = [lines]
|
||
|
|
|
||
|
|
if len(lines) > 1:
|
||
|
|
sequence = Process._access_sequence(lines[0].centroid, lines[-1].centroid, prev_point, next_point)
|
||
|
|
if not sequence:
|
||
|
|
lines = lines[::-1]
|
||
|
|
prev_point, next_point = next_point, prev_point
|
||
|
|
|
||
|
|
processed_lines = []
|
||
|
|
for i, line in enumerate(lines):
|
||
|
|
if i < len(lines) - 1:
|
||
|
|
next_centroid = lines[i + 1].centroid
|
||
|
|
else:
|
||
|
|
next_centroid = next_point
|
||
|
|
|
||
|
|
sequence = Process._access_sequence(line.coords[0], line.coords[-1], prev_point, next_centroid)
|
||
|
|
if not sequence:
|
||
|
|
line = LineString(line.coords[::-1])
|
||
|
|
processed_lines.append(line)
|
||
|
|
prev_point = Point(line.coords[-1])
|
||
|
|
|
||
|
|
return processed_lines
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def _vrp_solver(distance_matrix) -> list[int]:
|
||
|
|
manager = pywrapcp.RoutingIndexManager(
|
||
|
|
len(distance_matrix),
|
||
|
|
1,
|
||
|
|
[len(distance_matrix) - 2],
|
||
|
|
[len(distance_matrix) - 1]
|
||
|
|
)
|
||
|
|
|
||
|
|
# 创建路线
|
||
|
|
routing = pywrapcp.RoutingModel(manager)
|
||
|
|
|
||
|
|
def distance_callback(from_index, to_index):
|
||
|
|
"""距离回调"""
|
||
|
|
# Convert from routing variable Index to distance matrix NodeIndex.
|
||
|
|
from_node = manager.IndexToNode(from_index)
|
||
|
|
to_node = manager.IndexToNode(to_index)
|
||
|
|
return distance_matrix[from_node][to_node]
|
||
|
|
|
||
|
|
# 添加函数回调
|
||
|
|
transit_callback_index = routing.RegisterTransitCallback(distance_callback)
|
||
|
|
routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index) #
|
||
|
|
|
||
|
|
# 设置求解器
|
||
|
|
search_parameters = pywrapcp.DefaultRoutingSearchParameters()
|
||
|
|
search_parameters.first_solution_strategy = (
|
||
|
|
routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC
|
||
|
|
)
|
||
|
|
search_parameters.time_limit.seconds = 10
|
||
|
|
|
||
|
|
# Solve the problem.
|
||
|
|
routing.CloseModelWithParameters(search_parameters)
|
||
|
|
# 求解
|
||
|
|
solution = routing.SolveWithParameters(search_parameters)
|
||
|
|
|
||
|
|
node_list = []
|
||
|
|
index = routing.Start(0)
|
||
|
|
while True:
|
||
|
|
node_list.append(manager.IndexToNode(index))
|
||
|
|
if routing.IsEnd(index):
|
||
|
|
break
|
||
|
|
index = solution.Value(routing.NextVar(index))
|
||
|
|
|
||
|
|
|
||
|
|
# 记录数据
|
||
|
|
return node_list
|
||
|
|
|
||
|
|
class SarProcess(Process):
|
||
|
|
|
||
|
|
def __init__(self, uav:Uav):
|
||
|
|
self.reserved_distance = uav.reserved_distance
|
||
|
|
self.deviation_distance = uav.deviation_distance
|
||
|
|
self.length = uav.length
|
||
|
|
self.width = uav.width
|
||
|
|
self.height = uav.height
|
||
|
|
self.theta = uav.theta
|
||
|
|
self.direction = uav.direction
|
||
|
|
# self.min_distance = uav.min_distance
|
||
|
|
|
||
|
|
# TODO
|
||
|
|
def process_point(self, point: Point, prev_point: Point, next_point: Point) -> dict[str, float]:
|
||
|
|
|
||
|
|
# vec = np.array(next_point.coords)[0] - np.array(prev_point.coords)[0]
|
||
|
|
# if np.linalg.norm(vec) == 0:
|
||
|
|
# vec = np.array(prev_point.coords)[0] - np.array(point.coords)[0]
|
||
|
|
|
||
|
|
# angle = np.degrees(np.atan2(vec[1], vec[0]))
|
||
|
|
|
||
|
|
# return self._calculate_point_route(point, angle)
|
||
|
|
|
||
|
|
result = {}
|
||
|
|
|
||
|
|
point_array = np.array(point.coords)[0]
|
||
|
|
vec_x = np.array(next_point.coords)[0] - np.array(prev_point.coords)[0]
|
||
|
|
if np.linalg.norm(vec_x) == 0:
|
||
|
|
vec_y = point_array - np.array(prev_point.coords)[0]
|
||
|
|
vec_y_norm = vec_y / np.linalg.norm(vec_y)
|
||
|
|
vec_x_norm = np.array([vec_y_norm[1], -vec_y_norm[0]])
|
||
|
|
else:
|
||
|
|
vec_x_norm = vec_x / np.linalg.norm(vec_x)
|
||
|
|
vec_y_norm = np.array([-vec_x_norm[1], vec_x_norm[0]])
|
||
|
|
|
||
|
|
a = point_array
|
||
|
|
b = point_array
|
||
|
|
|
||
|
|
vec_deviation = -1 * self.direction * vec_y_norm * self.deviation_distance
|
||
|
|
|
||
|
|
result['ground_start'] = a - vec_x_norm * self.length / 2
|
||
|
|
result['ground_end'] = b + vec_x_norm * self.length / 2
|
||
|
|
result['uav_start'] = a - vec_x_norm * (self.length / 2 + self.reserved_distance) + vec_deviation
|
||
|
|
result['uav_end'] = b + vec_x_norm * (self.length / 2 + self.reserved_distance) + vec_deviation
|
||
|
|
result['flight_start'] = a - vec_x_norm * self.length / 2 + vec_deviation
|
||
|
|
result['flight_end'] = b + vec_x_norm * self.length / 2 + vec_deviation
|
||
|
|
result['flight_height'] = self.height
|
||
|
|
result['target_centroid'] = (a + b) / 2
|
||
|
|
result['target_height'] = 0
|
||
|
|
result['terget_width'] = self.width
|
||
|
|
result['target_length'] = self.length
|
||
|
|
result['target_heading'] = float(np.degrees(np.atan2(vec_x[0], vec_x[1])))
|
||
|
|
result['squint_angle'] = 0
|
||
|
|
result['grazing_angle'] = 90 - self.theta
|
||
|
|
result['flight_type'] = 0
|
||
|
|
result['direction'] = self.direction
|
||
|
|
|
||
|
|
return result
|
||
|
|
|
||
|
|
def process_line(self, line: LineString, prev_point: Point, next_point: Point) -> list[dict[str, float]]:
|
||
|
|
lines = self._discrete_curve(line, self.width)
|
||
|
|
processed_lines = self._lines_sequence(lines, prev_point, next_point)
|
||
|
|
result_list = [self._calculate_line_route(line) for line in processed_lines]
|
||
|
|
return result_list
|
||
|
|
|
||
|
|
def process_region(self, region: Polygon, prev_point: Point, next_point: Point, angle: float) -> list[dict[str, float]]:
|
||
|
|
region_rotated = rotate(region, angle=-angle, origin=region.centroid)
|
||
|
|
minx, miny, maxx, maxy = region_rotated.bounds
|
||
|
|
delta_y = maxy - miny
|
||
|
|
line_nums = np.ceil(delta_y / self.width).astype(int)
|
||
|
|
yy = np.linspace(miny + delta_y / line_nums / 2, maxy, line_nums)
|
||
|
|
lines = [rotate(LineString([(minx, y), (maxx, y)]), angle=angle, origin=region.centroid) for y in yy]
|
||
|
|
processed_lines = self._lines_sequence(lines, prev_point, next_point)
|
||
|
|
result_list = [self._calculate_line_route(line) for line in processed_lines]
|
||
|
|
return result_list
|
||
|
|
|
||
|
|
def process_around(self, point: Point, prev_point: Point, edge = 8):
|
||
|
|
point_array = np.array(point.coords)[0]
|
||
|
|
prev_point_array = np.array(prev_point.coords)[0]
|
||
|
|
vec = prev_point_array - point_array
|
||
|
|
init_angle = np.degrees(np.atan2(vec[1], vec[0]))
|
||
|
|
results = []
|
||
|
|
if self.direction == 1:
|
||
|
|
angle_array = np.linspace(0, 360, edge + 1)[:-1]
|
||
|
|
elif self.direction == -1:
|
||
|
|
angle_array = np.linspace(180, -180, edge + 1)[:-1]
|
||
|
|
|
||
|
|
for angle in angle_array:
|
||
|
|
results.append(self._calculate_point_route(point, init_angle + angle))
|
||
|
|
|
||
|
|
return results
|
||
|
|
|
||
|
|
|
||
|
|
def _calculate_point_route(self, point: Point, angle: float):
|
||
|
|
result = {}
|
||
|
|
vec_y_norm = - np.array([np.cos(np.radians(angle)), np.sin(np.radians(angle))])
|
||
|
|
point_array = np.array(point.coords)[0]
|
||
|
|
vec_x_norm = np.array([vec_y_norm[1], -vec_y_norm[0]])
|
||
|
|
|
||
|
|
a = point_array
|
||
|
|
b = point_array
|
||
|
|
|
||
|
|
vec_deviation = -1 * self.direction * vec_y_norm * self.deviation_distance
|
||
|
|
|
||
|
|
result['ground_start'] = a - vec_x_norm * self.length / 2
|
||
|
|
result['ground_end'] = b + vec_x_norm * self.length / 2
|
||
|
|
result['uav_start'] = a - vec_x_norm * (self.length / 2 + self.reserved_distance) + vec_deviation
|
||
|
|
result['uav_end'] = b + vec_x_norm * (self.length / 2 + self.reserved_distance) + vec_deviation
|
||
|
|
result['flight_start'] = a - vec_x_norm * self.length / 2 + vec_deviation
|
||
|
|
result['flight_end'] = b + vec_x_norm * self.length / 2 + vec_deviation
|
||
|
|
result['flight_height'] = self.height
|
||
|
|
result['target_centroid'] = (a + b) / 2
|
||
|
|
result['target_height'] = 0
|
||
|
|
result['terget_width'] = self.width
|
||
|
|
result['target_length'] = self.length
|
||
|
|
result['target_heading'] = float(np.degrees(np.atan2(vec_x_norm[0], vec_x_norm[1])))
|
||
|
|
result['squint_angle'] = 0
|
||
|
|
result['grazing_angle'] = 90 - self.theta
|
||
|
|
result['flight_type'] = 0
|
||
|
|
result['direction'] = self.direction
|
||
|
|
|
||
|
|
return result
|
||
|
|
|
||
|
|
def _calculate_line_route(self, line: LineString | list[LineString]):
|
||
|
|
result = {}
|
||
|
|
a, b = np.array(line.coords)
|
||
|
|
vec_x = b - a
|
||
|
|
vec_x_norm = vec_x / np.linalg.norm(vec_x)
|
||
|
|
vec_y_norm = np.array([-vec_x_norm[1], vec_x_norm[0]])
|
||
|
|
vec_deviation = -1 * self.direction * vec_y_norm * self.deviation_distance
|
||
|
|
|
||
|
|
result['ground_start'] = a - vec_x_norm * self.length / 2
|
||
|
|
result['ground_end'] = b + vec_x_norm * self.length / 2
|
||
|
|
result['uav_start'] = a - vec_x_norm * (self.length / 2 + self.reserved_distance) + vec_deviation
|
||
|
|
result['uav_end'] = b + vec_x_norm * (self.length / 2 + self.reserved_distance) + vec_deviation
|
||
|
|
result['flight_start'] = a - vec_x_norm * self.length / 2 + vec_deviation
|
||
|
|
result['flight_end'] = b + vec_x_norm * self.length / 2 + vec_deviation
|
||
|
|
result['flight_height'] = self.height
|
||
|
|
result['target_centroid'] = (a + b) / 2
|
||
|
|
result['target_height'] = 0
|
||
|
|
result['terget_width'] = self.width
|
||
|
|
result['target_length'] = line.length if line.length > self.length else self.length
|
||
|
|
result['target_heading'] = float(np.degrees(np.atan2(vec_x[0], vec_x[1])))
|
||
|
|
result['squint_angle'] = 0
|
||
|
|
result['grazing_angle'] = 90 - self.theta
|
||
|
|
result['flight_type'] = 0
|
||
|
|
result['direction'] = self.direction
|
||
|
|
|
||
|
|
return result
|
||
|
|
|
||
|
|
def process_result(self, result_list: list[dict[str, float]]) -> list[dict[str, float]]:
|
||
|
|
utm_dict = {
|
||
|
|
'uav_start':[],
|
||
|
|
'uav_end': [],
|
||
|
|
'flight_start':[],
|
||
|
|
'flight_end': [],
|
||
|
|
'ground_start':[],
|
||
|
|
'ground_end':[],
|
||
|
|
'target_centroid': []
|
||
|
|
}
|
||
|
|
result_list_copy = copy.deepcopy(result_list)
|
||
|
|
for result in result_list_copy:
|
||
|
|
for key, value in utm_dict.items():
|
||
|
|
value.append(result[key])
|
||
|
|
wgs84_dict = {}
|
||
|
|
for key, value in utm_dict.items():
|
||
|
|
wgs84_points = transform(UTM_TO_WGS84, MultiPoint(value))
|
||
|
|
wgs84_dict[key] = wgs84_points
|
||
|
|
|
||
|
|
for result, uav_start, uav_end, flight_start, flight_end, ground_start, ground_end, target_centroid in zip(
|
||
|
|
result_list_copy,
|
||
|
|
wgs84_dict['uav_start'].geoms,
|
||
|
|
wgs84_dict['uav_end'].geoms,
|
||
|
|
wgs84_dict['flight_start'].geoms,
|
||
|
|
wgs84_dict['flight_end'].geoms,
|
||
|
|
wgs84_dict['ground_start'].geoms,
|
||
|
|
wgs84_dict['ground_end'].geoms,
|
||
|
|
wgs84_dict['target_centroid'].geoms
|
||
|
|
):
|
||
|
|
flight_height = result.pop('flight_height')
|
||
|
|
target_height = result.pop('target_height')
|
||
|
|
result['uav_start'] = [uav_start.x, uav_start.y, flight_height]
|
||
|
|
result['uav_end'] = [uav_end.x, uav_end.y, flight_height]
|
||
|
|
result['flight_start'] = [flight_start.x, flight_start.y, flight_height]
|
||
|
|
result['flight_end'] = [flight_end.x, flight_end.y, flight_height]
|
||
|
|
result['ground_start'] = [ground_start.x, ground_start.y, target_height]
|
||
|
|
result['ground_end'] = [ground_end.x, ground_end.y, target_height]
|
||
|
|
result['target_centroid'] = [target_centroid.x, target_centroid.y, target_height]
|
||
|
|
|
||
|
|
return result_list_copy
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def results_to_route(results: list[dict[str, float]] | dict[str, float]) -> list[Point]:
|
||
|
|
route = []
|
||
|
|
if isinstance(results, dict):
|
||
|
|
route.append(Point(results['uav_start']))
|
||
|
|
route.append(Point(results['uav_end']))
|
||
|
|
else:
|
||
|
|
for result in results:
|
||
|
|
route.append(Point(result['uav_start']))
|
||
|
|
route.append(Point(result['uav_end']))
|
||
|
|
|
||
|
|
return route
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|