Untitled
unknown
plain_text
a year ago
5.9 kB
6
Indexable
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import os
import numpy as np
import cv2
import hailo
from hailo_rpi_common import (
get_caps_from_pad,
get_numpy_from_buffer,
app_callback_class,
)
from detection_pipeline import GStreamerDetectionApp
# -----------------------------------------------------------------------------------------------
# User-defined class to be used in the callback function
# -----------------------------------------------------------------------------------------------
class user_app_callback_class(app_callback_class):
def __init__(self):
super().__init__()
self.parking_slots = [] # Dynamic parking slots (set via ROI selector)
self.slot_status = [] # Status of parking slots (True = Occupied, False = Vacant)
# -----------------------------------------------------------------------------------------------
# ROI Selector for Parking Slots
# -----------------------------------------------------------------------------------------------
def select_parking_slots(video_path):
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print("Error: Cannot open video file.")
exit()
ret, frame = cap.read()
if not ret:
print("Error: Cannot read frame from video.")
exit()
parking_slots = [] # List to store parking slot coordinates
def draw_rectangle(event, x, y, flags, param):
nonlocal start_point, end_point, selecting
if event == cv2.EVENT_LBUTTONDOWN:
start_point = (x, y)
selecting = True
elif event == cv2.EVENT_LBUTTONUP:
end_point = (x, y)
selecting = False
parking_slots.append((start_point[0], start_point[1], end_point[0], end_point[1]))
cv2.rectangle(frame, start_point, end_point, (0, 255, 0), 2)
cv2.imshow("Select Parking Slots", frame)
cv2.namedWindow("Select Parking Slots")
cv2.setMouseCallback("Select Parking Slots", draw_rectangle)
selecting = False
print("Select parking slots by dragging rectangles. Press 'q' to finish.")
while True:
cv2.imshow("Select Parking Slots", frame)
key = cv2.waitKey(1) & 0xFF
if key == ord('q'): # Press 'q' to finish
break
cv2.destroyAllWindows()
cap.release()
return parking_slots
# -----------------------------------------------------------------------------------------------
# User-defined callback function
# -----------------------------------------------------------------------------------------------
def app_callback(pad, info, user_data):
buffer = info.get_buffer()
if buffer is None:
return Gst.PadProbeReturn.OK
format, width, height = get_caps_from_pad(pad)
frame = None
if format is not None and width is not None and height is not None:
frame = get_numpy_from_buffer(buffer, format, width, height)
roi = hailo.get_roi_from_buffer(buffer)
detections = roi.get_objects_typed(hailo.HAILO_DETECTION)
# Reset slot statuses
user_data.slot_status = [False] * len(user_data.parking_slots)
for detection in detections:
label = detection.get_label()
bbox = detection.get_bbox()
if label == "car": # Detect cars
x1, y1, x2, y2 = map(int, [bbox.left, bbox.top, bbox.right, bbox.bottom])
for i, slot in enumerate(user_data.parking_slots):
# Check if the car is within the parking slot
sx1, sy1, sx2, sy2 = slot
if sx1 < x1 < sx2 and sy1 < y1 < sy2:
user_data.slot_status[i] = True # Mark slot as occupied
# Calculate parking status
total_slots = len(user_data.parking_slots)
occupied_slots = sum(user_data.slot_status)
vacant_slots = total_slots - occupied_slots
if frame is not None:
# Draw parking slots and statuses
for i, slot in enumerate(user_data.parking_slots):
sx1, sy1, sx2, sy2 = slot
color = (0, 255, 0) if not user_data.slot_status[i] else (0, 0, 255)
cv2.rectangle(frame, (sx1, sy1), (sx2, sy2), color, 2)
status = "Occupied" if user_data.slot_status[i] else "Vacant"
cv2.putText(frame, status, (sx1, sy1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
# Display parking status on the frame
cv2.putText(frame, f"Total Slots: {total_slots}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
cv2.putText(frame, f"Occupied: {occupied_slots}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
cv2.putText(frame, f"Vacant: {vacant_slots}", (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
# Convert frame to BGR for display
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
user_data.set_frame(frame)
return Gst.PadProbeReturn.OK
# -----------------------------------------------------------------------------------------------
# Main Application
# -----------------------------------------------------------------------------------------------
if __name__ == "__main__":
Gst.init(None) # Initialize GStreamer
# Video file path
video_path = "/home/pi/Downloads/parking.mp4" # Replace with your video file path
# Select parking slots
parking_slots = select_parking_slots(video_path)
if not parking_slots:
print("No parking slots selected. Exiting.")
exit()
# Create user data and set parking slots
user_data = user_app_callback_class()
user_data.parking_slots = parking_slots
# Initialize GStreamer App
app = GStreamerDetectionApp(app_callback, user_data)
app.video_source = video_path # Set the video source directly
app.run()
Editor is loading...
Leave a Comment