from abc import ABC from dataclasses import dataclass, field from enum import StrEnum import geopandas as gpd import numpy as np from shapely.affinity import rotate from shapely.geometry import Polygon, Point, box, LineString, MultiPoint, MultiLineString, MultiPolygon from shapely.ops import transform from planning.beidou import LocationCode from planning.coords import WGS84_TO_UTM, UTM_TO_WGS84 from planning.config import CONTAINER_LENGTH, CONTAINER_WIDTH, CONTAINER_SLOT_LENGTH, CONTAINER_SLOT_WIDTH, FRONT_DISTANCE def get_target(target_type, payload_type): pass class TargetType(StrEnum): point = "point" line = "line" region = "region" around = "around" @dataclass class Target(ABC): id: int = field(init=False) shape: Polygon | LineString | Point = field(init=False) gdf: gpd.GeoDataFrame = field(init=False) @dataclass class OpticsRegionTarget(Target): id: int shape: Polygon level: int # 网格层级 gdf: gpd.GeoDataFrame def __init__(self, id:int , shape: Polygon, level: int = 5): """ 初始化区域。 Args: id(int): 无人机ID coordinates(List[Tuple[float, float]]): 区域顶点经纬度坐标(deg) level(int): 北斗网格层级 """ self.id = id # 初始化 Polygon 类 self.shape = shape # 初始化网格 self.level = level delta_lon = LocationCode._DELTA_LON[level -1] delta_lat = LocationCode._DELTA_LAT[level -1] # 离散化区域 min_lon, min_lat, max_lon, max_lat = shape.bounds min_lon = min_lon // delta_lon * delta_lon - delta_lon min_lat = min_lat // delta_lat * delta_lat - delta_lat max_lon = max_lon // delta_lon * delta_lon + delta_lon max_lat = max_lat // delta_lat * delta_lat + delta_lat lon_values = np.arange(min_lon, max_lon, delta_lon) lat_values = np.arange(min_lat, max_lat, delta_lat) # 光学载荷区域离散化, 将区域离散为网格 lon_mesh, lat_mesh = np.meshgrid(lon_values, lat_values) lon_mesh = lon_mesh.ravel() lat_mesh = lat_mesh.ravel() grid_list = [] grid_status_list = [] for lon, lat in zip(lon_mesh, lat_mesh): cell = box(lon, lat, lon + delta_lon, lat + delta_lat) # 创建一个正方形 if shape.contains(cell): # 仅保留与多边形相交的单元 grid_list.append(cell) grid_status_list.append("complete") elif shape.intersects(cell): cell = shape.intersection(cell) if cell.area > 1e-15: grid_list.append(cell) grid_status_list.append("incomplete") gdf = gpd.GeoDataFrame(geometry=grid_list, crs="EPSG:4326") gdf["centroid"] = gdf.centroid gdf["code"] = gdf["centroid"].apply(lambda x: LocationCode.to_code(lon=x.x, lat=x.y, level=level)) gdf["level"] = self.level gdf["id"] = self.id gdf["type"] = TargetType.region self.gdf = gdf @dataclass class OpticsFrontRegionTarget(Target): id: int shape: Polygon base_height: float utm_points: MultiPoint utm_look_at_points: MultiPoint gdf: gpd.GeoDataFrame def __init__(self, id:int , shape: Polygon, height: float = 0): self.id = id self.shape = shape self.base_height = height box_utm = transform(WGS84_TO_UTM, shape) box_utm = box_utm.minimum_rotated_rectangle # 使用最小外接矩形矫正区域 # 计算矩形长宽 box_utm_coords = np.array(box_utm.exterior.coords) edge_vec1 = box_utm_coords[1] - box_utm_coords[0] edge_vec2 = box_utm_coords[2] - box_utm_coords[1] edge_length1 = np.linalg.norm(edge_vec1) edge_length2 = np.linalg.norm(edge_vec2) if edge_length1 * edge_length2 > 200 * 200: raise ValueError(f"The target area is too large.") if edge_length1 > edge_length2: if edge_vec1[0] == 0: angle = np.pi / 2 else: angle = np.arctan(edge_vec1[1] / edge_vec1[0]) else: if edge_vec2[0] == 0: angle = np.pi / 2 else: angle = np.arctan(edge_vec2[1] / edge_vec2[0]) angle = np.degrees(angle) # 计算集装箱中点 box_utm_rotated = rotate(box_utm, angle=-angle , origin=box_utm.centroid) x_min, y_min, x_max, y_max = box_utm_rotated.bounds x = np.arange(x_min + CONTAINER_SLOT_LENGTH / 2, x_max, CONTAINER_SLOT_LENGTH) y = np.arange(y_min + CONTAINER_SLOT_WIDTH / 2, y_max, CONTAINER_SLOT_WIDTH) top = np.stack([x, np.zeros_like(x) + y_max]).T down = np.stack([x, np.zeros_like(x) + y_min]).T left = np.stack([np.zeros_like(y) + x_min, y]).T right = np.stack([np.zeros_like(y) + x_max, y]).T utm_rotated_look_at_points = MultiPoint(np.concat([top, right, down, left])) utm_rotated_points = MultiPoint(np.concat([ top + np.array([0, FRONT_DISTANCE]), right + np.array([0, -FRONT_DISTANCE]), down + np.array([-FRONT_DISTANCE, 0]), left + np.array([FRONT_DISTANCE, 0]) ])) utm_points = rotate(utm_rotated_points, angle=angle, origin=box_utm.centroid) utm_look_at_points = rotate(utm_rotated_look_at_points, angle=angle, origin=box_utm.centroid) self.utm_points = utm_points self.utm_look_at_points = utm_look_at_points centroid = transform(UTM_TO_WGS84, box_utm.centroid) gdf = gpd.GeoDataFrame(geometry=[shape], crs="EPSG:4326") gdf["centroid"] = centroid gdf["code"] = None gdf["id"] = id gdf["type"] = TargetType.region self.gdf = gdf @dataclass class SarRegionTarget(Target): id: int shape: Polygon level: int # 网格层级 gdf: gpd.GeoDataFrame _is_crossing = False # 跨180度经度线 def __init__(self, id:int , shape: Polygon, width: float = 100): """ 初始化区域。 Args: id(int): 无人机ID coordinates(List[Tuple[float, float]]): 区域顶点经纬度坐标(deg) level(int): 北斗网格层级 """ self.id = id # 初始化 Polygon 类 self.shape = shape region_utm = transform(WGS84_TO_UTM, shape) if region_utm.area > 2.5e7: raise ValueError("The target area is too large.") box_utm = region_utm.minimum_rotated_rectangle # 使用最小外接矩形矫正区域 # 计算矩形长宽 box_utm_coords = np.array(box_utm.exterior.coords) edge_vec1 = box_utm_coords[1] - box_utm_coords[0] edge_vec2 = box_utm_coords[2] - box_utm_coords[1] edge_length1 = np.linalg.norm(edge_vec1) edge_length2 = np.linalg.norm(edge_vec2) if edge_length1 > edge_length2: if edge_vec1[0] == 0: angle = np.pi / 2 else: angle = np.arctan(edge_vec1[1] / edge_vec1[0]) else: if edge_vec2[0] == 0: angle = np.pi / 2 else: angle = np.arctan(edge_vec2[1] / edge_vec2[0]) angle = np.degrees(angle) self.angle = angle region_utm_rotated = rotate(region_utm, angle=-angle , origin=region_utm.centroid) x_min, y_min, x_max, y_max = region_utm_rotated.bounds print(region_utm_rotated) Y = np.arange(y_min, y_max + width, width) region_rotated_list = [] for y_bottom, y_top in zip(Y[:-1], Y[1:]): split_box = box(x_min, y_bottom, x_max, y_top) split_region = region_utm_rotated.intersection(split_box) xy = split_region.bounds region_rotated_list.append(box(*xy)) regions_rotated = MultiPolygon(region_rotated_list) regions_utm = rotate(regions_rotated, angle=angle, origin=region_utm.centroid) centroids_utm = MultiPoint([poly.centroid for poly in regions_utm.geoms]) regions = transform(UTM_TO_WGS84, regions_utm) centroids = transform(UTM_TO_WGS84, centroids_utm) gdf = gpd.GeoDataFrame(geometry=[region for region in regions.geoms], crs="EPSG:4326") gdf["centroid"] = [centroid for centroid in centroids.geoms] gdf["code"] = None gdf["id"] = id gdf["type"] = TargetType.region self.gdf = gdf @dataclass class LaserRegionTarget(Target): id: int shape: Polygon angle: float container_centroids: MultiPoint gdf: gpd.GeoDataFrame def __init__(self, id:int , shape: Polygon): self.id = id self.shape = shape box_utm = transform(WGS84_TO_UTM, shape) box_utm = box_utm.minimum_rotated_rectangle # 使用最小外接矩形矫正区域 # 计算矩形长宽 box_utm_coords = np.array(box_utm.exterior.coords) edge_vec1 = box_utm_coords[1] - box_utm_coords[0] edge_vec2 = box_utm_coords[2] - box_utm_coords[1] edge_length1 = np.linalg.norm(edge_vec1) edge_length2 = np.linalg.norm(edge_vec2) if edge_length1 * edge_length2 > 200 * 200: raise ValueError(f"The target area is too large.") if edge_length1 > edge_length2: if edge_vec1[0] == 0: angle = np.pi / 2 else: angle = np.arctan(edge_vec1[1] / edge_vec1[0]) else: if edge_vec2[0] == 0: angle = np.pi / 2 else: angle = np.arctan(edge_vec2[1] / edge_vec2[0]) angle = np.degrees(angle) self.angle = angle # 计算集装箱中点 box_utm_rotated = rotate(box_utm, angle=-angle , origin=box_utm.centroid) x_min, y_min, x_max, y_max = box_utm_rotated.bounds X = np.arange(x_min + CONTAINER_SLOT_LENGTH / 2, x_max, CONTAINER_SLOT_LENGTH) Y = np.arange(y_min + CONTAINER_SLOT_WIDTH / 2, y_max, CONTAINER_SLOT_WIDTH) xx, yy = np.meshgrid(X, Y) xx = xx.ravel() yy = yy.ravel() line_base = np.array([[x_min, 0], [x_max, 0]]) line_array = line_base[None, :, :] + np.array([0, 1])[None, None, :] * Y[:, None, None] lines = MultiLineString(line_array.tolist()) lines = rotate(lines, angle=angle, origin=box_utm.centroid) self.lines = lines centroid_array = np.stack([xx.ravel(), yy.ravel()]).T slot_mins = centroid_array - np.array([CONTAINER_LENGTH / 2, CONTAINER_WIDTH / 2]) slot_maxs = centroid_array + np.array([CONTAINER_LENGTH / 2, CONTAINER_WIDTH / 2]) slots = MultiPolygon([box(xmin, ymin, xmax, ymax) for (xmin, ymin), (xmax, ymax) in zip(slot_mins, slot_maxs)]) slots = rotate(slots, angle=angle, origin=box_utm.centroid) slots = transform(UTM_TO_WGS84, slots) self.slots = slots # slot_matrix = np.array(slots.geoms, dtype=object).reshape((len(Y), len(X))) # self.slot_matrix = slot_matrix centroid = transform(UTM_TO_WGS84, box_utm.centroid) gdf = gpd.GeoDataFrame(geometry=[shape], crs="EPSG:4326") gdf["centroid"] = centroid gdf["code"] = None gdf["id"] = id gdf["type"] = TargetType.region self.gdf = gdf @dataclass class LineTarget(Target): id:int shape:LineString gdf:gpd.GeoDataFrame def __init__(self, id: int, shape: LineString): self.id = id self.shape = shape code_list = [] for point in shape.coords: code_list.append(LocationCode.to_code(lon=point[0], lat=point[1], level=10)) gdf = gpd.GeoDataFrame(geometry=[shape], crs="EPSG:4326") gdf["centroid"] = gdf.centroid gdf["code"] = [code_list] gdf["level"] = 10 gdf["id"] = self.id gdf["type"] = TargetType.line self.gdf = gdf @dataclass class PointTarget(Target): id:int = None gdf:gpd.GeoDataFrame = None shape:Point = None def __init__(self, id: int, shape:Point): self.id = id self.shape = shape gdf = gpd.GeoDataFrame(geometry=[shape], crs="EPSG:4326") gdf["centroid"] = shape gdf["code"] = LocationCode.to_code(lon=shape.x, lat=shape.y, level=10) gdf["level"] = 10 gdf["id"] = self.id gdf["type"] = TargetType.point self.gdf = gdf @dataclass class AroundTarget(PointTarget): def __init__(self, id: int, shape:Point): super().__init__(id, shape) self.gdf["type"] = TargetType.around