skyeyesystem/backend/Skyeye-sys-dev/skyeye-service-py/klkx/planning/target/target.py

363 lines
13 KiB
Python

from abc import ABC
from dataclasses import dataclass, field
from enum import StrEnum
import geopandas as gpd
import numpy as np
from shapely.affinity import rotate
from shapely.geometry import Polygon, Point, box, LineString, MultiPoint, MultiLineString, MultiPolygon
from shapely.ops import transform
from planning.beidou import LocationCode
from planning.coords import WGS84_TO_UTM, UTM_TO_WGS84
from planning.config import CONTAINER_LENGTH, CONTAINER_WIDTH, CONTAINER_SLOT_LENGTH, CONTAINER_SLOT_WIDTH, FRONT_DISTANCE
def get_target(target_type, payload_type):
pass
class TargetType(StrEnum):
point = "point"
line = "line"
region = "region"
around = "around"
@dataclass
class Target(ABC):
id: int = field(init=False)
shape: Polygon | LineString | Point = field(init=False)
gdf: gpd.GeoDataFrame = field(init=False)
@dataclass
class OpticsRegionTarget(Target):
id: int
shape: Polygon
level: int # 网格层级
gdf: gpd.GeoDataFrame
def __init__(self, id:int , shape: Polygon, level: int = 5):
"""
初始化区域。
Args:
id(int): 无人机ID
coordinates(List[Tuple[float, float]]): 区域顶点经纬度坐标(deg)
level(int): 北斗网格层级
"""
self.id = id
# 初始化 Polygon 类
self.shape = shape
# 初始化网格
self.level = level
delta_lon = LocationCode._DELTA_LON[level -1]
delta_lat = LocationCode._DELTA_LAT[level -1]
# 离散化区域
min_lon, min_lat, max_lon, max_lat = shape.bounds
min_lon = min_lon // delta_lon * delta_lon - delta_lon
min_lat = min_lat // delta_lat * delta_lat - delta_lat
max_lon = max_lon // delta_lon * delta_lon + delta_lon
max_lat = max_lat // delta_lat * delta_lat + delta_lat
lon_values = np.arange(min_lon, max_lon, delta_lon)
lat_values = np.arange(min_lat, max_lat, delta_lat)
# 光学载荷区域离散化, 将区域离散为网格
lon_mesh, lat_mesh = np.meshgrid(lon_values, lat_values)
lon_mesh = lon_mesh.ravel()
lat_mesh = lat_mesh.ravel()
grid_list = []
grid_status_list = []
for lon, lat in zip(lon_mesh, lat_mesh):
cell = box(lon, lat, lon + delta_lon, lat + delta_lat) # 创建一个正方形
if shape.contains(cell): # 仅保留与多边形相交的单元
grid_list.append(cell)
grid_status_list.append("complete")
elif shape.intersects(cell):
cell = shape.intersection(cell)
if cell.area > 1e-15:
grid_list.append(cell)
grid_status_list.append("incomplete")
gdf = gpd.GeoDataFrame(geometry=grid_list, crs="EPSG:4326")
gdf["centroid"] = gdf.centroid
gdf["code"] = gdf["centroid"].apply(lambda x: LocationCode.to_code(lon=x.x, lat=x.y, level=level))
gdf["level"] = self.level
gdf["id"] = self.id
gdf["type"] = TargetType.region
self.gdf = gdf
@dataclass
class OpticsFrontRegionTarget(Target):
id: int
shape: Polygon
base_height: float
utm_points: MultiPoint
utm_look_at_points: MultiPoint
gdf: gpd.GeoDataFrame
def __init__(self, id:int , shape: Polygon, height: float = 0):
self.id = id
self.shape = shape
self.base_height = height
box_utm = transform(WGS84_TO_UTM, shape)
box_utm = box_utm.minimum_rotated_rectangle # 使用最小外接矩形矫正区域
# 计算矩形长宽
box_utm_coords = np.array(box_utm.exterior.coords)
edge_vec1 = box_utm_coords[1] - box_utm_coords[0]
edge_vec2 = box_utm_coords[2] - box_utm_coords[1]
edge_length1 = np.linalg.norm(edge_vec1)
edge_length2 = np.linalg.norm(edge_vec2)
if edge_length1 * edge_length2 > 200 * 200:
raise ValueError(f"The target area is too large.")
if edge_length1 > edge_length2:
if edge_vec1[0] == 0:
angle = np.pi / 2
else:
angle = np.arctan(edge_vec1[1] / edge_vec1[0])
else:
if edge_vec2[0] == 0:
angle = np.pi / 2
else:
angle = np.arctan(edge_vec2[1] / edge_vec2[0])
angle = np.degrees(angle)
# 计算集装箱中点
box_utm_rotated = rotate(box_utm, angle=-angle , origin=box_utm.centroid)
x_min, y_min, x_max, y_max = box_utm_rotated.bounds
x = np.arange(x_min + CONTAINER_SLOT_LENGTH / 2, x_max, CONTAINER_SLOT_LENGTH)
y = np.arange(y_min + CONTAINER_SLOT_WIDTH / 2, y_max, CONTAINER_SLOT_WIDTH)
top = np.stack([x, np.zeros_like(x) + y_max]).T
down = np.stack([x, np.zeros_like(x) + y_min]).T
left = np.stack([np.zeros_like(y) + x_min, y]).T
right = np.stack([np.zeros_like(y) + x_max, y]).T
utm_rotated_look_at_points = MultiPoint(np.concat([top, right, down, left]))
utm_rotated_points = MultiPoint(np.concat([
top + np.array([0, FRONT_DISTANCE]),
right + np.array([0, -FRONT_DISTANCE]),
down + np.array([-FRONT_DISTANCE, 0]),
left + np.array([FRONT_DISTANCE, 0])
]))
utm_points = rotate(utm_rotated_points, angle=angle, origin=box_utm.centroid)
utm_look_at_points = rotate(utm_rotated_look_at_points, angle=angle, origin=box_utm.centroid)
self.utm_points = utm_points
self.utm_look_at_points = utm_look_at_points
centroid = transform(UTM_TO_WGS84, box_utm.centroid)
gdf = gpd.GeoDataFrame(geometry=[shape], crs="EPSG:4326")
gdf["centroid"] = centroid
gdf["code"] = None
gdf["id"] = id
gdf["type"] = TargetType.region
self.gdf = gdf
@dataclass
class SarRegionTarget(Target):
id: int
shape: Polygon
level: int # 网格层级
gdf: gpd.GeoDataFrame
_is_crossing = False # 跨180度经度线
def __init__(self, id:int , shape: Polygon, width: float = 100):
"""
初始化区域。
Args:
id(int): 无人机ID
coordinates(List[Tuple[float, float]]): 区域顶点经纬度坐标(deg)
level(int): 北斗网格层级
"""
self.id = id
# 初始化 Polygon 类
self.shape = shape
region_utm = transform(WGS84_TO_UTM, shape)
if region_utm.area > 2.5e7:
raise ValueError("The target area is too large.")
box_utm = region_utm.minimum_rotated_rectangle # 使用最小外接矩形矫正区域
# 计算矩形长宽
box_utm_coords = np.array(box_utm.exterior.coords)
edge_vec1 = box_utm_coords[1] - box_utm_coords[0]
edge_vec2 = box_utm_coords[2] - box_utm_coords[1]
edge_length1 = np.linalg.norm(edge_vec1)
edge_length2 = np.linalg.norm(edge_vec2)
if edge_length1 > edge_length2:
if edge_vec1[0] == 0:
angle = np.pi / 2
else:
angle = np.arctan(edge_vec1[1] / edge_vec1[0])
else:
if edge_vec2[0] == 0:
angle = np.pi / 2
else:
angle = np.arctan(edge_vec2[1] / edge_vec2[0])
angle = np.degrees(angle)
self.angle = angle
region_utm_rotated = rotate(region_utm, angle=-angle , origin=region_utm.centroid)
x_min, y_min, x_max, y_max = region_utm_rotated.bounds
print(region_utm_rotated)
Y = np.arange(y_min, y_max + width, width)
region_rotated_list = []
for y_bottom, y_top in zip(Y[:-1], Y[1:]):
split_box = box(x_min, y_bottom, x_max, y_top)
split_region = region_utm_rotated.intersection(split_box)
xy = split_region.bounds
region_rotated_list.append(box(*xy))
regions_rotated = MultiPolygon(region_rotated_list)
regions_utm = rotate(regions_rotated, angle=angle, origin=region_utm.centroid)
centroids_utm = MultiPoint([poly.centroid for poly in regions_utm.geoms])
regions = transform(UTM_TO_WGS84, regions_utm)
centroids = transform(UTM_TO_WGS84, centroids_utm)
gdf = gpd.GeoDataFrame(geometry=[region for region in regions.geoms], crs="EPSG:4326")
gdf["centroid"] = [centroid for centroid in centroids.geoms]
gdf["code"] = None
gdf["id"] = id
gdf["type"] = TargetType.region
self.gdf = gdf
@dataclass
class LaserRegionTarget(Target):
id: int
shape: Polygon
angle: float
container_centroids: MultiPoint
gdf: gpd.GeoDataFrame
def __init__(self, id:int , shape: Polygon):
self.id = id
self.shape = shape
box_utm = transform(WGS84_TO_UTM, shape)
box_utm = box_utm.minimum_rotated_rectangle # 使用最小外接矩形矫正区域
# 计算矩形长宽
box_utm_coords = np.array(box_utm.exterior.coords)
edge_vec1 = box_utm_coords[1] - box_utm_coords[0]
edge_vec2 = box_utm_coords[2] - box_utm_coords[1]
edge_length1 = np.linalg.norm(edge_vec1)
edge_length2 = np.linalg.norm(edge_vec2)
if edge_length1 * edge_length2 > 200 * 200:
raise ValueError(f"The target area is too large.")
if edge_length1 > edge_length2:
if edge_vec1[0] == 0:
angle = np.pi / 2
else:
angle = np.arctan(edge_vec1[1] / edge_vec1[0])
else:
if edge_vec2[0] == 0:
angle = np.pi / 2
else:
angle = np.arctan(edge_vec2[1] / edge_vec2[0])
angle = np.degrees(angle)
self.angle = angle
# 计算集装箱中点
box_utm_rotated = rotate(box_utm, angle=-angle , origin=box_utm.centroid)
x_min, y_min, x_max, y_max = box_utm_rotated.bounds
X = np.arange(x_min + CONTAINER_SLOT_LENGTH / 2, x_max, CONTAINER_SLOT_LENGTH)
Y = np.arange(y_min + CONTAINER_SLOT_WIDTH / 2, y_max, CONTAINER_SLOT_WIDTH)
xx, yy = np.meshgrid(X, Y)
xx = xx.ravel()
yy = yy.ravel()
line_base = np.array([[x_min, 0], [x_max, 0]])
line_array = line_base[None, :, :] + np.array([0, 1])[None, None, :] * Y[:, None, None]
lines = MultiLineString(line_array.tolist())
lines = rotate(lines, angle=angle, origin=box_utm.centroid)
self.lines = lines
centroid_array = np.stack([xx.ravel(), yy.ravel()]).T
slot_mins = centroid_array - np.array([CONTAINER_LENGTH / 2, CONTAINER_WIDTH / 2])
slot_maxs = centroid_array + np.array([CONTAINER_LENGTH / 2, CONTAINER_WIDTH / 2])
slots = MultiPolygon([box(xmin, ymin, xmax, ymax) for (xmin, ymin), (xmax, ymax) in zip(slot_mins, slot_maxs)])
slots = rotate(slots, angle=angle, origin=box_utm.centroid)
slots = transform(UTM_TO_WGS84, slots)
self.slots = slots
# slot_matrix = np.array(slots.geoms, dtype=object).reshape((len(Y), len(X)))
# self.slot_matrix = slot_matrix
centroid = transform(UTM_TO_WGS84, box_utm.centroid)
gdf = gpd.GeoDataFrame(geometry=[shape], crs="EPSG:4326")
gdf["centroid"] = centroid
gdf["code"] = None
gdf["id"] = id
gdf["type"] = TargetType.region
self.gdf = gdf
@dataclass
class LineTarget(Target):
id:int
shape:LineString
gdf:gpd.GeoDataFrame
def __init__(self, id: int, shape: LineString):
self.id = id
self.shape = shape
code_list = []
for point in shape.coords:
code_list.append(LocationCode.to_code(lon=point[0], lat=point[1], level=10))
gdf = gpd.GeoDataFrame(geometry=[shape], crs="EPSG:4326")
gdf["centroid"] = gdf.centroid
gdf["code"] = [code_list]
gdf["level"] = 10
gdf["id"] = self.id
gdf["type"] = TargetType.line
self.gdf = gdf
@dataclass
class PointTarget(Target):
id:int = None
gdf:gpd.GeoDataFrame = None
shape:Point = None
def __init__(self, id: int, shape:Point):
self.id = id
self.shape = shape
gdf = gpd.GeoDataFrame(geometry=[shape], crs="EPSG:4326")
gdf["centroid"] = shape
gdf["code"] = LocationCode.to_code(lon=shape.x, lat=shape.y, level=10)
gdf["level"] = 10
gdf["id"] = self.id
gdf["type"] = TargetType.point
self.gdf = gdf
@dataclass
class AroundTarget(PointTarget):
def __init__(self, id: int, shape:Point):
super().__init__(id, shape)
self.gdf["type"] = TargetType.around