skyeyesystem/backend/Skyeye-sys-dev/skyeye-service-py/klkx/planning/process.py

377 lines
15 KiB
Python
Raw Normal View History

import copy
import numpy as np
from abc import ABC, abstractmethod
from ortools.constraint_solver import routing_enums_pb2, pywrapcp
from shapely.geometry import Point, LineString, Polygon, MultiLineString, MultiPolygon, MultiPoint
from shapely.ops import transform
from shapely.affinity import rotate
from .coords import WGS84_TO_UTM, UTM_TO_WGS84
from .uav import Uav
import logging
logger = logging.getLogger("Process")
class Process(ABC):
@abstractmethod
def process_region(self, region):
pass
@abstractmethod
def process_line(self, region):
pass
@abstractmethod
def process_point(self, region):
pass
@staticmethod
def _access_sequence(point1, point2, prev_point, next_point):
distance1 = prev_point.distance(Point(point1)) + next_point.distance(Point(point2))
distance2 = prev_point.distance(Point(point2)) + next_point.distance(Point(point1))
return distance1 < distance2
@staticmethod
def _discrete_curve(line: LineString, width: float) -> list[LineString]:
"""
离散化SAR曲线拍摄目标
Args:
line(LineString): 目标曲线
width(float): 幅宽
Return:
MultiLineString: 离散化后目标拍摄中线
"""
coords = list(line.coords)
n = len(coords)
line_list = []
i = 0
max_dev = width / 2
while i < n - 1:
j = i + 1
projection_distance = 0
while j < n:
coord_array = np.array(line.coords[i:j+1])
if len(coord_array) < 2:
break
a, b = coord_array[0], coord_array[-1]
vec = b - a
vec_norm = vec / np.linalg.norm(vec)
projection_distance_array = np.dot(coord_array - a, vec_norm[:, None])
projected = projection_distance_array * vec_norm
projection_points = a + projected
perpendicular_distance_array = np.linalg.norm(coord_array - projection_points, axis=1)
if np.max(perpendicular_distance_array) > max_dev:
break
else:
projection_distance = np.max(projection_distance_array)
j += 1
j -= 1 # 回退到最后一个合法点
a, b = np.array(coords[i]), np.array(coords[j])
vec = b - a
vec_norm = vec / np.linalg.norm(vec)
while j < n - 1:
coord = np.array(line.coords[j + 1])
this_projection_distance = np.dot(coord - a, vec_norm)
projection_point = a + this_projection_distance * vec_norm
this_vertical_distance = np.linalg.norm(coord - projection_point)
if this_vertical_distance > max_dev:
break
else:
if projection_distance < this_projection_distance:
projection_distance = this_projection_distance
j += 1
b = a + vec_norm * projection_distance
line_list.append(LineString([a, b]))
i = j # 继续下一个段落
return line_list
@staticmethod
def _lines_sequence(lines: LineString | list[LineString], prev_point: Point, next_point: Point) -> list[LineString]:
if isinstance(lines, LineString):
lines = [lines]
if len(lines) > 1:
sequence = Process._access_sequence(lines[0].centroid, lines[-1].centroid, prev_point, next_point)
if not sequence:
lines = lines[::-1]
prev_point, next_point = next_point, prev_point
processed_lines = []
for i, line in enumerate(lines):
if i < len(lines) - 1:
next_centroid = lines[i + 1].centroid
else:
next_centroid = next_point
sequence = Process._access_sequence(line.coords[0], line.coords[-1], prev_point, next_centroid)
if not sequence:
line = LineString(line.coords[::-1])
processed_lines.append(line)
prev_point = Point(line.coords[-1])
return processed_lines
@staticmethod
def _vrp_solver(distance_matrix) -> list[int]:
manager = pywrapcp.RoutingIndexManager(
len(distance_matrix),
1,
[len(distance_matrix) - 2],
[len(distance_matrix) - 1]
)
# 创建路线
routing = pywrapcp.RoutingModel(manager)
def distance_callback(from_index, to_index):
"""距离回调"""
# Convert from routing variable Index to distance matrix NodeIndex.
from_node = manager.IndexToNode(from_index)
to_node = manager.IndexToNode(to_index)
return distance_matrix[from_node][to_node]
# 添加函数回调
transit_callback_index = routing.RegisterTransitCallback(distance_callback)
routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index) #
# 设置求解器
search_parameters = pywrapcp.DefaultRoutingSearchParameters()
search_parameters.first_solution_strategy = (
routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC
)
search_parameters.time_limit.seconds = 10
# Solve the problem.
routing.CloseModelWithParameters(search_parameters)
# 求解
solution = routing.SolveWithParameters(search_parameters)
node_list = []
index = routing.Start(0)
while True:
node_list.append(manager.IndexToNode(index))
if routing.IsEnd(index):
break
index = solution.Value(routing.NextVar(index))
# 记录数据
return node_list
class SarProcess(Process):
def __init__(self, uav:Uav):
self.reserved_distance = uav.reserved_distance
self.deviation_distance = uav.deviation_distance
self.length = uav.length
self.width = uav.width
self.height = uav.height
self.theta = uav.theta
self.direction = uav.direction
# self.min_distance = uav.min_distance
# TODO
def process_point(self, point: Point, prev_point: Point, next_point: Point) -> dict[str, float]:
# vec = np.array(next_point.coords)[0] - np.array(prev_point.coords)[0]
# if np.linalg.norm(vec) == 0:
# vec = np.array(prev_point.coords)[0] - np.array(point.coords)[0]
# angle = np.degrees(np.atan2(vec[1], vec[0]))
# return self._calculate_point_route(point, angle)
result = {}
point_array = np.array(point.coords)[0]
vec_x = np.array(next_point.coords)[0] - np.array(prev_point.coords)[0]
if np.linalg.norm(vec_x) == 0:
vec_y = point_array - np.array(prev_point.coords)[0]
vec_y_norm = vec_y / np.linalg.norm(vec_y)
vec_x_norm = np.array([vec_y_norm[1], -vec_y_norm[0]])
else:
vec_x_norm = vec_x / np.linalg.norm(vec_x)
vec_y_norm = np.array([-vec_x_norm[1], vec_x_norm[0]])
a = point_array
b = point_array
vec_deviation = -1 * self.direction * vec_y_norm * self.deviation_distance
result['ground_start'] = a - vec_x_norm * self.length / 2
result['ground_end'] = b + vec_x_norm * self.length / 2
result['uav_start'] = a - vec_x_norm * (self.length / 2 + self.reserved_distance) + vec_deviation
result['uav_end'] = b + vec_x_norm * (self.length / 2 + self.reserved_distance) + vec_deviation
result['flight_start'] = a - vec_x_norm * self.length / 2 + vec_deviation
result['flight_end'] = b + vec_x_norm * self.length / 2 + vec_deviation
result['flight_height'] = self.height
result['target_centroid'] = (a + b) / 2
result['target_height'] = 0
result['terget_width'] = self.width
result['target_length'] = self.length
result['target_heading'] = float(np.degrees(np.atan2(vec_x[0], vec_x[1])))
result['squint_angle'] = 0
result['grazing_angle'] = 90 - self.theta
result['flight_type'] = 0
result['direction'] = self.direction
return result
def process_line(self, line: LineString, prev_point: Point, next_point: Point) -> list[dict[str, float]]:
lines = self._discrete_curve(line, self.width)
processed_lines = self._lines_sequence(lines, prev_point, next_point)
result_list = [self._calculate_line_route(line) for line in processed_lines]
return result_list
def process_region(self, region: Polygon, prev_point: Point, next_point: Point, angle: float) -> list[dict[str, float]]:
region_rotated = rotate(region, angle=-angle, origin=region.centroid)
minx, miny, maxx, maxy = region_rotated.bounds
delta_y = maxy - miny
line_nums = np.ceil(delta_y / self.width).astype(int)
yy = np.linspace(miny + delta_y / line_nums / 2, maxy, line_nums)
lines = [rotate(LineString([(minx, y), (maxx, y)]), angle=angle, origin=region.centroid) for y in yy]
processed_lines = self._lines_sequence(lines, prev_point, next_point)
result_list = [self._calculate_line_route(line) for line in processed_lines]
return result_list
def process_around(self, point: Point, prev_point: Point, edge = 8):
point_array = np.array(point.coords)[0]
prev_point_array = np.array(prev_point.coords)[0]
vec = prev_point_array - point_array
init_angle = np.degrees(np.atan2(vec[1], vec[0]))
results = []
if self.direction == 1:
angle_array = np.linspace(0, 360, edge + 1)[:-1]
elif self.direction == -1:
angle_array = np.linspace(180, -180, edge + 1)[:-1]
for angle in angle_array:
results.append(self._calculate_point_route(point, init_angle + angle))
return results
def _calculate_point_route(self, point: Point, angle: float):
result = {}
vec_y_norm = - np.array([np.cos(np.radians(angle)), np.sin(np.radians(angle))])
point_array = np.array(point.coords)[0]
vec_x_norm = np.array([vec_y_norm[1], -vec_y_norm[0]])
a = point_array
b = point_array
vec_deviation = -1 * self.direction * vec_y_norm * self.deviation_distance
result['ground_start'] = a - vec_x_norm * self.length / 2
result['ground_end'] = b + vec_x_norm * self.length / 2
result['uav_start'] = a - vec_x_norm * (self.length / 2 + self.reserved_distance) + vec_deviation
result['uav_end'] = b + vec_x_norm * (self.length / 2 + self.reserved_distance) + vec_deviation
result['flight_start'] = a - vec_x_norm * self.length / 2 + vec_deviation
result['flight_end'] = b + vec_x_norm * self.length / 2 + vec_deviation
result['flight_height'] = self.height
result['target_centroid'] = (a + b) / 2
result['target_height'] = 0
result['terget_width'] = self.width
result['target_length'] = self.length
result['target_heading'] = float(np.degrees(np.atan2(vec_x_norm[0], vec_x_norm[1])))
result['squint_angle'] = 0
result['grazing_angle'] = 90 - self.theta
result['flight_type'] = 0
result['direction'] = self.direction
return result
def _calculate_line_route(self, line: LineString | list[LineString]):
result = {}
a, b = np.array(line.coords)
vec_x = b - a
vec_x_norm = vec_x / np.linalg.norm(vec_x)
vec_y_norm = np.array([-vec_x_norm[1], vec_x_norm[0]])
vec_deviation = -1 * self.direction * vec_y_norm * self.deviation_distance
result['ground_start'] = a - vec_x_norm * self.length / 2
result['ground_end'] = b + vec_x_norm * self.length / 2
result['uav_start'] = a - vec_x_norm * (self.length / 2 + self.reserved_distance) + vec_deviation
result['uav_end'] = b + vec_x_norm * (self.length / 2 + self.reserved_distance) + vec_deviation
result['flight_start'] = a - vec_x_norm * self.length / 2 + vec_deviation
result['flight_end'] = b + vec_x_norm * self.length / 2 + vec_deviation
result['flight_height'] = self.height
result['target_centroid'] = (a + b) / 2
result['target_height'] = 0
result['terget_width'] = self.width
result['target_length'] = line.length if line.length > self.length else self.length
result['target_heading'] = float(np.degrees(np.atan2(vec_x[0], vec_x[1])))
result['squint_angle'] = 0
result['grazing_angle'] = 90 - self.theta
result['flight_type'] = 0
result['direction'] = self.direction
return result
def process_result(self, result_list: list[dict[str, float]]) -> list[dict[str, float]]:
utm_dict = {
'uav_start':[],
'uav_end': [],
'flight_start':[],
'flight_end': [],
'ground_start':[],
'ground_end':[],
'target_centroid': []
}
result_list_copy = copy.deepcopy(result_list)
for result in result_list_copy:
for key, value in utm_dict.items():
value.append(result[key])
wgs84_dict = {}
for key, value in utm_dict.items():
wgs84_points = transform(UTM_TO_WGS84, MultiPoint(value))
wgs84_dict[key] = wgs84_points
for result, uav_start, uav_end, flight_start, flight_end, ground_start, ground_end, target_centroid in zip(
result_list_copy,
wgs84_dict['uav_start'].geoms,
wgs84_dict['uav_end'].geoms,
wgs84_dict['flight_start'].geoms,
wgs84_dict['flight_end'].geoms,
wgs84_dict['ground_start'].geoms,
wgs84_dict['ground_end'].geoms,
wgs84_dict['target_centroid'].geoms
):
flight_height = result.pop('flight_height')
target_height = result.pop('target_height')
result['uav_start'] = [uav_start.x, uav_start.y, flight_height]
result['uav_end'] = [uav_end.x, uav_end.y, flight_height]
result['flight_start'] = [flight_start.x, flight_start.y, flight_height]
result['flight_end'] = [flight_end.x, flight_end.y, flight_height]
result['ground_start'] = [ground_start.x, ground_start.y, target_height]
result['ground_end'] = [ground_end.x, ground_end.y, target_height]
result['target_centroid'] = [target_centroid.x, target_centroid.y, target_height]
return result_list_copy
@staticmethod
def results_to_route(results: list[dict[str, float]] | dict[str, float]) -> list[Point]:
route = []
if isinstance(results, dict):
route.append(Point(results['uav_start']))
route.append(Point(results['uav_end']))
else:
for result in results:
route.append(Point(result['uav_start']))
route.append(Point(result['uav_end']))
return route