Untitled

 avatar
unknown
plain_text
5 months ago
5.9 kB
4
Indexable
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import os
import numpy as np
import cv2
import hailo
from hailo_rpi_common import (
    get_caps_from_pad,
    get_numpy_from_buffer,
    app_callback_class,
)
from detection_pipeline import GStreamerDetectionApp

# -----------------------------------------------------------------------------------------------
# User-defined class to be used in the callback function
# -----------------------------------------------------------------------------------------------
class user_app_callback_class(app_callback_class):
    """Callback state extended with the parking-slot layout and occupancy flags."""

    def __init__(self):
        super().__init__()
        # One flag per parking slot: True = Occupied, False = Vacant.
        self.slot_status = list()
        # Slot rectangles; starts empty and is filled from the ROI selector's result.
        self.parking_slots = list()

# -----------------------------------------------------------------------------------------------
# ROI Selector for Parking Slots
# -----------------------------------------------------------------------------------------------
def select_parking_slots(video_path):
    """Let the user draw parking-slot rectangles on the first frame of a video.

    The first frame read from ``video_path`` is shown in an OpenCV window.
    Every left-button press followed by a release adds one rectangle; the
    selection ends when 'q' is pressed.

    Args:
        video_path: Source passed to ``cv2.VideoCapture``; only its first
            frame is used, as the drawing canvas.

    Returns:
        A list of ``(x1, y1, x2, y2)`` tuples in pixel coordinates of that
        frame, ordered so that ``x1 < x2`` and ``y1 < y2`` whatever direction
        the rectangle was dragged in. Empty if nothing was drawn.

    Raises:
        SystemExit: If the video cannot be opened or no frame can be read
            (exit status 1, after printing an error message).
    """
    window_name = "Select Parking Slots"

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print("Error: Cannot open video file.")
            raise SystemExit(1)

        ret, frame = cap.read()
        if not ret:
            print("Error: Cannot read frame from video.")
            raise SystemExit(1)
    finally:
        # Only the first frame is needed, so the capture is released right
        # away - including on the error paths above.
        cap.release()

    parking_slots = []  # Collected (x1, y1, x2, y2) rectangles

    # `nonlocal` needs an existing binding in this enclosing scope, so the
    # drag anchor must be initialised here, before the callback is defined.
    start_point = None

    def draw_rectangle(event, x, y, flags, param):
        """Mouse callback: remember the press position, commit on release."""
        nonlocal start_point
        if event == cv2.EVENT_LBUTTONDOWN:
            start_point = (x, y)
        elif event == cv2.EVENT_LBUTTONUP and start_point is not None:
            # Ignore a release that was not preceded by a press in the window.
            x0, y0 = start_point
            start_point = None

            # Order the corners so later `x1 < px < x2` style checks work for
            # rectangles dragged right-to-left or bottom-to-top as well.
            left, right = sorted((x0, x))
            top, bottom = sorted((y0, y))
            if left == right or top == bottom:
                return  # A plain click has no area and can never be occupied.

            parking_slots.append((left, top, right, bottom))
            # Drawn onto the shared frame; the display loop below shows it.
            cv2.rectangle(frame, (left, top), (right, bottom), (0, 255, 0), 2)

    cv2.namedWindow(window_name)
    cv2.setMouseCallback(window_name, draw_rectangle)

    print("Select parking slots by dragging rectangles. Press 'q' to finish.")
    try:
        while True:
            cv2.imshow(window_name, frame)
            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'):  # Press 'q' to finish
                break
    finally:
        # Close the window even if the loop is interrupted (e.g. Ctrl+C).
        cv2.destroyAllWindows()

    return parking_slots

# -----------------------------------------------------------------------------------------------
# User-defined callback function
# -----------------------------------------------------------------------------------------------
def app_callback(pad, info, user_data):
    """Pad-probe callback: update slot occupancy and annotate the frame.

    For every buffer, the Hailo detections labelled "car" are matched against
    ``user_data.parking_slots``; a slot counts as occupied when the centre of
    a car's bounding box lies inside it. The result is stored in
    ``user_data.slot_status``. When a frame can be extracted, the slots and
    the totals are drawn on it and it is passed to ``user_data.set_frame``.

    Args:
        pad: The probed pad; its caps supply the frame format and size.
        info: Probe info; ``info.get_buffer()`` yields the current buffer.
        user_data: Object providing ``parking_slots`` (sequence of
            ``(x1, y1, x2, y2)`` pixel rectangles), ``slot_status`` and
            ``set_frame``.

    Returns:
        ``Gst.PadProbeReturn.OK`` in all cases.
    """
    buffer = info.get_buffer()
    if buffer is None:
        return Gst.PadProbeReturn.OK

    # Named `pixel_format` so the builtin `format` is not shadowed.
    pixel_format, width, height = get_caps_from_pad(pad)
    have_size = width is not None and height is not None

    frame = None
    if pixel_format is not None and have_size:
        frame = get_numpy_from_buffer(buffer, pixel_format, width, height)

    roi = hailo.get_roi_from_buffer(buffer)
    detections = roi.get_objects_typed(hailo.HAILO_DETECTION)

    # Order each slot's corners so a rectangle stored "backwards" (dragged
    # right-to-left / bottom-to-top) still matches and draws correctly.
    slots = [
        (min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
        for x1, y1, x2, y2 in user_data.parking_slots
    ]

    # Reset slot statuses for this frame (True = Occupied, False = Vacant).
    slot_status = [False] * len(slots)

    # Without the frame size the detections cannot be mapped to pixels, so
    # every slot is left vacant for this buffer.
    if have_size:
        for detection in detections:
            if detection.get_label() != "car":  # Only cars occupy slots
                continue
            bbox = detection.get_bbox()
            # Per the Hailo TAPPAS Python API, HailoBBox exposes accessor
            # methods whose values are normalised to 0..1, so they are scaled
            # to pixels here.
            # NOTE(review): assumes the slots were drawn at the same
            # resolution as the pipeline caps - confirm.
            center_x = (bbox.xmin() + bbox.width() / 2.0) * width
            center_y = (bbox.ymin() + bbox.height() / 2.0) * height
            for i, (sx1, sy1, sx2, sy2) in enumerate(slots):
                # The box centre is used rather than a single corner: a car
                # box slightly larger than its slot still counts as parked.
                if sx1 <= center_x <= sx2 and sy1 <= center_y <= sy2:
                    slot_status[i] = True  # Mark slot as occupied

    user_data.slot_status = slot_status

    # Calculate parking status
    total_slots = len(slots)
    occupied_slots = sum(slot_status)
    vacant_slots = total_slots - occupied_slots

    if frame is not None:
        # Convert to BGR *before* drawing: the colour tuples below are BGR,
        # and drawing them on the RGB frame first would swap red and blue.
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

        green = (0, 255, 0)
        red = (0, 0, 255)
        white = (255, 255, 255)
        font = cv2.FONT_HERSHEY_SIMPLEX

        # Draw parking slots and statuses
        for (sx1, sy1, sx2, sy2), occupied in zip(slots, slot_status):
            color = red if occupied else green
            status = "Occupied" if occupied else "Vacant"
            cv2.rectangle(frame, (sx1, sy1), (sx2, sy2), color, 2)
            cv2.putText(frame, status, (sx1, sy1 - 10), font, 0.5, color, 1)

        # Display parking status on the frame
        cv2.putText(frame, f"Total Slots: {total_slots}", (10, 30), font, 0.8, white, 2)
        cv2.putText(frame, f"Occupied: {occupied_slots}", (10, 60), font, 0.8, red, 2)
        cv2.putText(frame, f"Vacant: {vacant_slots}", (10, 90), font, 0.8, green, 2)

        user_data.set_frame(frame)

    return Gst.PadProbeReturn.OK

# -----------------------------------------------------------------------------------------------
# Main Application
# -----------------------------------------------------------------------------------------------
if __name__ == "__main__":
    # Initialize GStreamer
    Gst.init(None)

    # Video file used for both slot selection and detection.
    # Replace with your video file path.
    video_path = "/home/pi/Downloads/parking.mp4"

    # Ask the user to mark the parking slots on the video.
    parking_slots = select_parking_slots(video_path)

    if parking_slots:
        # Hand the selected slots to the callback through the user data object.
        user_data = user_app_callback_class()
        user_data.parking_slots = parking_slots

        # Build the detection app, point it at the video, and run it.
        app = GStreamerDetectionApp(app_callback, user_data)
        app.video_source = video_path
        app.run()
    else:
        # Nothing to monitor without at least one slot.
        print("No parking slots selected. Exiting.")
        exit()
Editor is loading...
Leave a Comment