Untitled

mail@pastecode.io avatar
unknown
plain_text
a year ago
3.4 kB
2
Indexable
Never
def get_waypoints_of_driving_straight_segment(
    context: PathPlanningContext,
    mission: Mission,
    start_point: Point,
    start_heading: float,
    end_point: Point,
    end_heading: float,
    graph: list[Node],
) -> list[Waypoint]:
    logger.debug(
        f"Create waypoints for driving straight segment from {start_point} rotation {start_heading} "
        f"to {end_point} rotation {end_heading}"
    )

    if mission.reroute_status and mission.reroute_status.status is RerouteStatus.IN_PROGRESS:
        lane_width = context.settings.LANE_WIDTH * 3.0
    else:
        lane_width = context.settings.LANE_WIDTH

    max_segment_length = context.settings.MAX_SEGMENT_LENGTH
    min_segment_length = context.settings.MIN_SEGMENT_LENGTH

    desired_direction = Direction.FORWARD

    subtle_segments = split_graph_into_segments(
        context, mission, graph, lane_width, max_segment_length, min_segment_length
    )

    waypoints: list[Waypoint] = []

    all_obstacles = [Point(x, y) for x, y in zip(*context.obstacles)]
    logger.debug(f"Total obstacles count: {len(all_obstacles)}")

    lane_width_buffer_percent = context.settings.LANE_WIDTH_BUFFER_PERCENT
    buffer_distance = lane_width * lane_width_buffer_percent
    logger.debug(
        f"lane_width: {lane_width}, lane_width_buffer_percent: {lane_width_buffer_percent}, "
        f"buffer_distance: {buffer_distance}"
    )

    for idx, segment in enumerate(subtle_segments):
        segment_obstacles = filter_obstacles_in_segment_buffer(segment, buffer_distance, all_obstacles)
        segment_obstacles_size = len(segment_obstacles[0])
        logger.debug(f"Number of segment obstacles inside buffer: {segment_obstacles_size}")

        # We have to create a temp copy to avoid affecting different plannings
        temp_context = context.copy(update={"obstacles": segment_obstacles}) if segment_obstacles_size > 0 else context
        logger.debug(
            f"Number of obstacles taken into the calculation "
            f"for segment: {len(temp_context.obstacles[0]) if temp_context.obstacles else 0}"
        )
        logger.debug(f"Get waypoints for segment number {idx}/{len(subtle_segments)-1}")

        end_point = segment[-1].point
        end_heading = __get_end_heading_for_sublte_segment(temp_context, mission, subtle_segments, idx)

        hybrid_graph = HybridGraph.from_graph(segment, lane_width)

        subtle_waypoints = get_waypoints_of_subtle_segment(
            context=temp_context,
            mission=mission,
            start_point=start_point,
            start_heading=start_heading,
            end_point=end_point,
            end_heading=end_heading,
            graph=hybrid_graph,
        )

        logger.debug(
            f"Waypoints for segment number {idx}/{len(subtle_segments)-1} are ready. "
            f"Generated {len(subtle_waypoints)} waypoints"
        )

        waypoints = interpolation_service.connect_waypoints(
            temp_context,
            mission,
            waypoints,
            subtle_waypoints,
            check_curvature=False,
            desired_direction=desired_direction,
        )

        start_point = waypoints[-1].position
        start_heading = waypoints[-1].heading

    results = simplify_waypoints_in_direction(context, mission, desired_direction, waypoints)

    return results