363 lines
13 KiB
Python
363 lines
13 KiB
Python
|
|
from abc import ABC
|
||
|
|
from dataclasses import dataclass, field
|
||
|
|
from enum import StrEnum
|
||
|
|
import geopandas as gpd
|
||
|
|
import numpy as np
|
||
|
|
from shapely.affinity import rotate
|
||
|
|
from shapely.geometry import Polygon, Point, box, LineString, MultiPoint, MultiLineString, MultiPolygon
|
||
|
|
from shapely.ops import transform
|
||
|
|
|
||
|
|
from planning.beidou import LocationCode
|
||
|
|
from planning.coords import WGS84_TO_UTM, UTM_TO_WGS84
|
||
|
|
from planning.config import CONTAINER_LENGTH, CONTAINER_WIDTH, CONTAINER_SLOT_LENGTH, CONTAINER_SLOT_WIDTH, FRONT_DISTANCE
|
||
|
|
|
||
|
|
def get_target(target_type, payload_type):
|
||
|
|
pass
|
||
|
|
|
||
|
|
class TargetType(StrEnum):
|
||
|
|
point = "point"
|
||
|
|
line = "line"
|
||
|
|
region = "region"
|
||
|
|
around = "around"
|
||
|
|
|
||
|
|
@dataclass
|
||
|
|
class Target(ABC):
|
||
|
|
id: int = field(init=False)
|
||
|
|
shape: Polygon | LineString | Point = field(init=False)
|
||
|
|
gdf: gpd.GeoDataFrame = field(init=False)
|
||
|
|
|
||
|
|
@dataclass
|
||
|
|
class OpticsRegionTarget(Target):
|
||
|
|
|
||
|
|
id: int
|
||
|
|
shape: Polygon
|
||
|
|
level: int # 网格层级
|
||
|
|
gdf: gpd.GeoDataFrame
|
||
|
|
|
||
|
|
def __init__(self, id:int , shape: Polygon, level: int = 5):
|
||
|
|
"""
|
||
|
|
初始化区域。
|
||
|
|
|
||
|
|
Args:
|
||
|
|
id(int): 无人机ID
|
||
|
|
coordinates(List[Tuple[float, float]]): 区域顶点经纬度坐标(deg)
|
||
|
|
level(int): 北斗网格层级
|
||
|
|
"""
|
||
|
|
|
||
|
|
self.id = id
|
||
|
|
# 初始化 Polygon 类
|
||
|
|
self.shape = shape
|
||
|
|
|
||
|
|
# 初始化网格
|
||
|
|
self.level = level
|
||
|
|
delta_lon = LocationCode._DELTA_LON[level -1]
|
||
|
|
delta_lat = LocationCode._DELTA_LAT[level -1]
|
||
|
|
|
||
|
|
# 离散化区域
|
||
|
|
min_lon, min_lat, max_lon, max_lat = shape.bounds
|
||
|
|
|
||
|
|
min_lon = min_lon // delta_lon * delta_lon - delta_lon
|
||
|
|
min_lat = min_lat // delta_lat * delta_lat - delta_lat
|
||
|
|
max_lon = max_lon // delta_lon * delta_lon + delta_lon
|
||
|
|
max_lat = max_lat // delta_lat * delta_lat + delta_lat
|
||
|
|
|
||
|
|
lon_values = np.arange(min_lon, max_lon, delta_lon)
|
||
|
|
lat_values = np.arange(min_lat, max_lat, delta_lat)
|
||
|
|
|
||
|
|
# 光学载荷区域离散化, 将区域离散为网格
|
||
|
|
lon_mesh, lat_mesh = np.meshgrid(lon_values, lat_values)
|
||
|
|
lon_mesh = lon_mesh.ravel()
|
||
|
|
lat_mesh = lat_mesh.ravel()
|
||
|
|
|
||
|
|
grid_list = []
|
||
|
|
grid_status_list = []
|
||
|
|
|
||
|
|
for lon, lat in zip(lon_mesh, lat_mesh):
|
||
|
|
cell = box(lon, lat, lon + delta_lon, lat + delta_lat) # 创建一个正方形
|
||
|
|
if shape.contains(cell): # 仅保留与多边形相交的单元
|
||
|
|
grid_list.append(cell)
|
||
|
|
grid_status_list.append("complete")
|
||
|
|
elif shape.intersects(cell):
|
||
|
|
cell = shape.intersection(cell)
|
||
|
|
if cell.area > 1e-15:
|
||
|
|
grid_list.append(cell)
|
||
|
|
grid_status_list.append("incomplete")
|
||
|
|
|
||
|
|
gdf = gpd.GeoDataFrame(geometry=grid_list, crs="EPSG:4326")
|
||
|
|
gdf["centroid"] = gdf.centroid
|
||
|
|
gdf["code"] = gdf["centroid"].apply(lambda x: LocationCode.to_code(lon=x.x, lat=x.y, level=level))
|
||
|
|
gdf["level"] = self.level
|
||
|
|
gdf["id"] = self.id
|
||
|
|
gdf["type"] = TargetType.region
|
||
|
|
self.gdf = gdf
|
||
|
|
|
||
|
|
@dataclass
|
||
|
|
class OpticsFrontRegionTarget(Target):
|
||
|
|
|
||
|
|
id: int
|
||
|
|
shape: Polygon
|
||
|
|
base_height: float
|
||
|
|
utm_points: MultiPoint
|
||
|
|
utm_look_at_points: MultiPoint
|
||
|
|
gdf: gpd.GeoDataFrame
|
||
|
|
|
||
|
|
def __init__(self, id:int , shape: Polygon, height: float = 0):
|
||
|
|
self.id = id
|
||
|
|
self.shape = shape
|
||
|
|
self.base_height = height
|
||
|
|
box_utm = transform(WGS84_TO_UTM, shape)
|
||
|
|
box_utm = box_utm.minimum_rotated_rectangle # 使用最小外接矩形矫正区域
|
||
|
|
|
||
|
|
# 计算矩形长宽
|
||
|
|
box_utm_coords = np.array(box_utm.exterior.coords)
|
||
|
|
edge_vec1 = box_utm_coords[1] - box_utm_coords[0]
|
||
|
|
edge_vec2 = box_utm_coords[2] - box_utm_coords[1]
|
||
|
|
edge_length1 = np.linalg.norm(edge_vec1)
|
||
|
|
edge_length2 = np.linalg.norm(edge_vec2)
|
||
|
|
|
||
|
|
if edge_length1 * edge_length2 > 200 * 200:
|
||
|
|
raise ValueError(f"The target area is too large.")
|
||
|
|
|
||
|
|
if edge_length1 > edge_length2:
|
||
|
|
if edge_vec1[0] == 0:
|
||
|
|
angle = np.pi / 2
|
||
|
|
else:
|
||
|
|
angle = np.arctan(edge_vec1[1] / edge_vec1[0])
|
||
|
|
else:
|
||
|
|
if edge_vec2[0] == 0:
|
||
|
|
angle = np.pi / 2
|
||
|
|
else:
|
||
|
|
angle = np.arctan(edge_vec2[1] / edge_vec2[0])
|
||
|
|
|
||
|
|
angle = np.degrees(angle)
|
||
|
|
|
||
|
|
# 计算集装箱中点
|
||
|
|
box_utm_rotated = rotate(box_utm, angle=-angle , origin=box_utm.centroid)
|
||
|
|
x_min, y_min, x_max, y_max = box_utm_rotated.bounds
|
||
|
|
x = np.arange(x_min + CONTAINER_SLOT_LENGTH / 2, x_max, CONTAINER_SLOT_LENGTH)
|
||
|
|
y = np.arange(y_min + CONTAINER_SLOT_WIDTH / 2, y_max, CONTAINER_SLOT_WIDTH)
|
||
|
|
top = np.stack([x, np.zeros_like(x) + y_max]).T
|
||
|
|
down = np.stack([x, np.zeros_like(x) + y_min]).T
|
||
|
|
left = np.stack([np.zeros_like(y) + x_min, y]).T
|
||
|
|
right = np.stack([np.zeros_like(y) + x_max, y]).T
|
||
|
|
|
||
|
|
utm_rotated_look_at_points = MultiPoint(np.concat([top, right, down, left]))
|
||
|
|
utm_rotated_points = MultiPoint(np.concat([
|
||
|
|
top + np.array([0, FRONT_DISTANCE]),
|
||
|
|
right + np.array([0, -FRONT_DISTANCE]),
|
||
|
|
down + np.array([-FRONT_DISTANCE, 0]),
|
||
|
|
left + np.array([FRONT_DISTANCE, 0])
|
||
|
|
]))
|
||
|
|
|
||
|
|
utm_points = rotate(utm_rotated_points, angle=angle, origin=box_utm.centroid)
|
||
|
|
utm_look_at_points = rotate(utm_rotated_look_at_points, angle=angle, origin=box_utm.centroid)
|
||
|
|
|
||
|
|
self.utm_points = utm_points
|
||
|
|
self.utm_look_at_points = utm_look_at_points
|
||
|
|
|
||
|
|
centroid = transform(UTM_TO_WGS84, box_utm.centroid)
|
||
|
|
gdf = gpd.GeoDataFrame(geometry=[shape], crs="EPSG:4326")
|
||
|
|
gdf["centroid"] = centroid
|
||
|
|
gdf["code"] = None
|
||
|
|
gdf["id"] = id
|
||
|
|
gdf["type"] = TargetType.region
|
||
|
|
self.gdf = gdf
|
||
|
|
|
||
|
|
@dataclass
|
||
|
|
class SarRegionTarget(Target):
|
||
|
|
|
||
|
|
id: int
|
||
|
|
shape: Polygon
|
||
|
|
level: int # 网格层级
|
||
|
|
gdf: gpd.GeoDataFrame
|
||
|
|
|
||
|
|
|
||
|
|
_is_crossing = False # 跨180度经度线
|
||
|
|
|
||
|
|
def __init__(self, id:int , shape: Polygon, width: float = 100):
|
||
|
|
"""
|
||
|
|
初始化区域。
|
||
|
|
|
||
|
|
Args:
|
||
|
|
id(int): 无人机ID
|
||
|
|
coordinates(List[Tuple[float, float]]): 区域顶点经纬度坐标(deg)
|
||
|
|
level(int): 北斗网格层级
|
||
|
|
"""
|
||
|
|
|
||
|
|
self.id = id
|
||
|
|
# 初始化 Polygon 类
|
||
|
|
self.shape = shape
|
||
|
|
region_utm = transform(WGS84_TO_UTM, shape)
|
||
|
|
if region_utm.area > 2.5e7:
|
||
|
|
raise ValueError("The target area is too large.")
|
||
|
|
|
||
|
|
box_utm = region_utm.minimum_rotated_rectangle # 使用最小外接矩形矫正区域
|
||
|
|
|
||
|
|
# 计算矩形长宽
|
||
|
|
box_utm_coords = np.array(box_utm.exterior.coords)
|
||
|
|
edge_vec1 = box_utm_coords[1] - box_utm_coords[0]
|
||
|
|
edge_vec2 = box_utm_coords[2] - box_utm_coords[1]
|
||
|
|
edge_length1 = np.linalg.norm(edge_vec1)
|
||
|
|
edge_length2 = np.linalg.norm(edge_vec2)
|
||
|
|
|
||
|
|
if edge_length1 > edge_length2:
|
||
|
|
if edge_vec1[0] == 0:
|
||
|
|
angle = np.pi / 2
|
||
|
|
else:
|
||
|
|
angle = np.arctan(edge_vec1[1] / edge_vec1[0])
|
||
|
|
else:
|
||
|
|
if edge_vec2[0] == 0:
|
||
|
|
angle = np.pi / 2
|
||
|
|
else:
|
||
|
|
angle = np.arctan(edge_vec2[1] / edge_vec2[0])
|
||
|
|
|
||
|
|
angle = np.degrees(angle)
|
||
|
|
self.angle = angle
|
||
|
|
|
||
|
|
region_utm_rotated = rotate(region_utm, angle=-angle , origin=region_utm.centroid)
|
||
|
|
x_min, y_min, x_max, y_max = region_utm_rotated.bounds
|
||
|
|
print(region_utm_rotated)
|
||
|
|
Y = np.arange(y_min, y_max + width, width)
|
||
|
|
region_rotated_list = []
|
||
|
|
for y_bottom, y_top in zip(Y[:-1], Y[1:]):
|
||
|
|
split_box = box(x_min, y_bottom, x_max, y_top)
|
||
|
|
split_region = region_utm_rotated.intersection(split_box)
|
||
|
|
xy = split_region.bounds
|
||
|
|
region_rotated_list.append(box(*xy))
|
||
|
|
|
||
|
|
regions_rotated = MultiPolygon(region_rotated_list)
|
||
|
|
regions_utm = rotate(regions_rotated, angle=angle, origin=region_utm.centroid)
|
||
|
|
centroids_utm = MultiPoint([poly.centroid for poly in regions_utm.geoms])
|
||
|
|
regions = transform(UTM_TO_WGS84, regions_utm)
|
||
|
|
centroids = transform(UTM_TO_WGS84, centroids_utm)
|
||
|
|
|
||
|
|
gdf = gpd.GeoDataFrame(geometry=[region for region in regions.geoms], crs="EPSG:4326")
|
||
|
|
gdf["centroid"] = [centroid for centroid in centroids.geoms]
|
||
|
|
gdf["code"] = None
|
||
|
|
gdf["id"] = id
|
||
|
|
gdf["type"] = TargetType.region
|
||
|
|
self.gdf = gdf
|
||
|
|
|
||
|
|
@dataclass
|
||
|
|
class LaserRegionTarget(Target):
|
||
|
|
|
||
|
|
id: int
|
||
|
|
shape: Polygon
|
||
|
|
angle: float
|
||
|
|
container_centroids: MultiPoint
|
||
|
|
gdf: gpd.GeoDataFrame
|
||
|
|
|
||
|
|
def __init__(self, id:int , shape: Polygon):
|
||
|
|
self.id = id
|
||
|
|
self.shape = shape
|
||
|
|
|
||
|
|
box_utm = transform(WGS84_TO_UTM, shape)
|
||
|
|
box_utm = box_utm.minimum_rotated_rectangle # 使用最小外接矩形矫正区域
|
||
|
|
|
||
|
|
# 计算矩形长宽
|
||
|
|
box_utm_coords = np.array(box_utm.exterior.coords)
|
||
|
|
edge_vec1 = box_utm_coords[1] - box_utm_coords[0]
|
||
|
|
edge_vec2 = box_utm_coords[2] - box_utm_coords[1]
|
||
|
|
edge_length1 = np.linalg.norm(edge_vec1)
|
||
|
|
edge_length2 = np.linalg.norm(edge_vec2)
|
||
|
|
|
||
|
|
if edge_length1 * edge_length2 > 200 * 200:
|
||
|
|
raise ValueError(f"The target area is too large.")
|
||
|
|
|
||
|
|
if edge_length1 > edge_length2:
|
||
|
|
if edge_vec1[0] == 0:
|
||
|
|
angle = np.pi / 2
|
||
|
|
else:
|
||
|
|
angle = np.arctan(edge_vec1[1] / edge_vec1[0])
|
||
|
|
else:
|
||
|
|
if edge_vec2[0] == 0:
|
||
|
|
angle = np.pi / 2
|
||
|
|
else:
|
||
|
|
angle = np.arctan(edge_vec2[1] / edge_vec2[0])
|
||
|
|
|
||
|
|
angle = np.degrees(angle)
|
||
|
|
self.angle = angle
|
||
|
|
|
||
|
|
# 计算集装箱中点
|
||
|
|
box_utm_rotated = rotate(box_utm, angle=-angle , origin=box_utm.centroid)
|
||
|
|
x_min, y_min, x_max, y_max = box_utm_rotated.bounds
|
||
|
|
X = np.arange(x_min + CONTAINER_SLOT_LENGTH / 2, x_max, CONTAINER_SLOT_LENGTH)
|
||
|
|
Y = np.arange(y_min + CONTAINER_SLOT_WIDTH / 2, y_max, CONTAINER_SLOT_WIDTH)
|
||
|
|
xx, yy = np.meshgrid(X, Y)
|
||
|
|
xx = xx.ravel()
|
||
|
|
yy = yy.ravel()
|
||
|
|
|
||
|
|
line_base = np.array([[x_min, 0], [x_max, 0]])
|
||
|
|
line_array = line_base[None, :, :] + np.array([0, 1])[None, None, :] * Y[:, None, None]
|
||
|
|
lines = MultiLineString(line_array.tolist())
|
||
|
|
lines = rotate(lines, angle=angle, origin=box_utm.centroid)
|
||
|
|
self.lines = lines
|
||
|
|
|
||
|
|
centroid_array = np.stack([xx.ravel(), yy.ravel()]).T
|
||
|
|
slot_mins = centroid_array - np.array([CONTAINER_LENGTH / 2, CONTAINER_WIDTH / 2])
|
||
|
|
slot_maxs = centroid_array + np.array([CONTAINER_LENGTH / 2, CONTAINER_WIDTH / 2])
|
||
|
|
slots = MultiPolygon([box(xmin, ymin, xmax, ymax) for (xmin, ymin), (xmax, ymax) in zip(slot_mins, slot_maxs)])
|
||
|
|
slots = rotate(slots, angle=angle, origin=box_utm.centroid)
|
||
|
|
slots = transform(UTM_TO_WGS84, slots)
|
||
|
|
self.slots = slots
|
||
|
|
# slot_matrix = np.array(slots.geoms, dtype=object).reshape((len(Y), len(X)))
|
||
|
|
# self.slot_matrix = slot_matrix
|
||
|
|
|
||
|
|
centroid = transform(UTM_TO_WGS84, box_utm.centroid)
|
||
|
|
gdf = gpd.GeoDataFrame(geometry=[shape], crs="EPSG:4326")
|
||
|
|
gdf["centroid"] = centroid
|
||
|
|
gdf["code"] = None
|
||
|
|
gdf["id"] = id
|
||
|
|
gdf["type"] = TargetType.region
|
||
|
|
self.gdf = gdf
|
||
|
|
|
||
|
|
@dataclass
|
||
|
|
class LineTarget(Target):
|
||
|
|
|
||
|
|
id:int
|
||
|
|
shape:LineString
|
||
|
|
gdf:gpd.GeoDataFrame
|
||
|
|
|
||
|
|
|
||
|
|
def __init__(self, id: int, shape: LineString):
|
||
|
|
|
||
|
|
self.id = id
|
||
|
|
self.shape = shape
|
||
|
|
code_list = []
|
||
|
|
for point in shape.coords:
|
||
|
|
code_list.append(LocationCode.to_code(lon=point[0], lat=point[1], level=10))
|
||
|
|
gdf = gpd.GeoDataFrame(geometry=[shape], crs="EPSG:4326")
|
||
|
|
gdf["centroid"] = gdf.centroid
|
||
|
|
gdf["code"] = [code_list]
|
||
|
|
gdf["level"] = 10
|
||
|
|
gdf["id"] = self.id
|
||
|
|
gdf["type"] = TargetType.line
|
||
|
|
self.gdf = gdf
|
||
|
|
|
||
|
|
@dataclass
|
||
|
|
class PointTarget(Target):
|
||
|
|
|
||
|
|
id:int = None
|
||
|
|
gdf:gpd.GeoDataFrame = None
|
||
|
|
shape:Point = None
|
||
|
|
|
||
|
|
def __init__(self, id: int, shape:Point):
|
||
|
|
|
||
|
|
self.id = id
|
||
|
|
self.shape = shape
|
||
|
|
gdf = gpd.GeoDataFrame(geometry=[shape], crs="EPSG:4326")
|
||
|
|
gdf["centroid"] = shape
|
||
|
|
gdf["code"] = LocationCode.to_code(lon=shape.x, lat=shape.y, level=10)
|
||
|
|
gdf["level"] = 10
|
||
|
|
gdf["id"] = self.id
|
||
|
|
gdf["type"] = TargetType.point
|
||
|
|
self.gdf = gdf
|
||
|
|
|
||
|
|
@dataclass
|
||
|
|
class AroundTarget(PointTarget):
|
||
|
|
|
||
|
|
def __init__(self, id: int, shape:Point):
|
||
|
|
super().__init__(id, shape)
|
||
|
|
self.gdf["type"] = TargetType.around
|
||
|
|
|
||
|
|
|