import math
import os
import time
from collections import defaultdict

import cv2
import numpy as np
from ultralytics import YOLO

os.environ["CUDA_VISIBLE_DEVICES"] = "0"
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"

# Load the YOLO11 model
model = YOLO("yolo11n.pt")
# model.export(format="engine")  # creates 'yolo11n.engine'
# Load the exported TensorRT model
# trt_model = YOLO("yolo11n.engine")
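# A minimal sketch of the TensorRT path, assuming the export above has been run
# once and 'yolo11n.engine' sits next to this script: export offline, then load
# the engine in place of the .pt weights; the tracking API below stays the same.
#
#   model = YOLO("yolo11n.pt")
#   model.export(format="engine")    # one-time step, writes 'yolo11n.engine'
#   model = YOLO("yolo11n.engine")   # use the engine for model.track(...)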
# Open the video file
video_path = "/home/yoon/Desktop/Research/VideoData.mp4"
cap = cv2.VideoCapture(video_path)

# Store the track history
track_history = defaultdict(list)
prev_positions = {}
prev2_positions = {}
oldPositionX = None
oldPositionY = None
old_positions = {}

# Variables for frame-rate measurement
prev_frame_time = 0
new_frame_time = 0

# Region of interest: a center point plus width/height, in coordinates of the
# original 1920x1080 frame (not the resized display window).
# Use 1440, 270 to have objects counted and 960, 540 to not count them.
bounding_x, bounding_y, bounding_w, bounding_h = 1440, 270, 1000, 600
bounding_start = (int(bounding_x - bounding_w / 2), int(bounding_y - bounding_h / 2))
bounding_end = (int(bounding_x + bounding_w / 2), int(bounding_y + bounding_h / 2))
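# Worked example with the values above: the half-extents are 500 and 300, so
# bounding_start = (1440 - 500, 270 - 300) = (940, -30) and
# bounding_end = (1440 + 500, 270 + 300) = (1940, 570). Both corners may fall
# outside the 1920x1080 frame; OpenCV clips the drawn rectangle, and the
# in-region test further down still works with out-of-frame bounds.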
# Loop through the video frames
while cap.isOpened():
    # Read a frame from the video
    success, frame = cap.read()
    if success:
        # Run YOLO11 tracking on the frame, persisting tracks between frames
        results = model.track(frame, persist=True)

        # Calculate FPS (guard against a zero time delta on very fast frames)
        new_frame_time = time.time()
        fps = 1 / max(new_frame_time - prev_frame_time, 1e-6)
        prev_frame_time = new_frame_time
        # Convert FPS to integer
        fps = int(fps)
        # Put FPS on the frame
        cv2.putText(frame, f"FPS: {fps}", (7, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (100, 255, 0), 3, cv2.LINE_AA)

        # Get the boxes and track IDs; boxes.id is None when nothing is being
        # tracked in this frame, so fall back to an empty ID list
        boxes = results[0].boxes.xywh.cpu()
        if results[0].boxes.id is not None:
            track_ids = results[0].boxes.id.int().cpu().tolist()
        else:
            track_ids = []
        # Visualize the results on the frame
        annotated_frame = results[0].plot()
        cv2.rectangle(annotated_frame, bounding_start, bounding_end, (255, 255, 0), 2)

        centers = {}
        # Plot the tracks
        for box, track_id in zip(boxes, track_ids):
            x, y, w, h = box.tolist()
            track = track_history[track_id]
            track.append((x, y))  # x, y center point
            if len(track) > 30:  # retain the track history for 30 frames
                track.pop(0)

            # Draw the tracking lines
            points = np.hstack(track).astype(np.int32).reshape((-1, 1, 2))
            cv2.polylines(annotated_frame, [points], isClosed=False, color=(0, 255, 255), thickness=2)

            centers[track_id] = (x, y)
            if track_id in prev_positions:
                prev_x, prev_y = prev_positions[track_id]
                prev_2x, prev_2y = prev2_positions[track_id]
                curr_x, curr_y = centers[track_id]
                if track_id in old_positions:
                    oldPositionX, oldPositionY = old_positions[track_id]
                else:
                    oldPositionX, oldPositionY = None, None

                # Calculate displacement
                displacement = (curr_x - prev_x, curr_y - prev_y)
                # Draw motion vectors (displacement as arrows)
                cv2.arrowedLine(annotated_frame, (int(prev_x), int(prev_y)), (int(curr_x), int(curr_y)), (0, 0, 255), 8)
                cv2.arrowedLine(annotated_frame, (int(prev_2x), int(prev_2y)), (int(prev_x), int(prev_y)), (255, 144, 255), 8)

                # Draw the predicted movement vector
                dx1 = prev_x - prev_2x
                dy1 = prev_y - prev_2y
                angle1 = math.atan2(dy1, dx1) * 180 / math.pi
                dx2 = curr_x - prev_x
                dy2 = curr_y - prev_y
                angle2 = math.atan2(dy2, dx2) * 180 / math.pi
                angle_diff = angle2 - angle1
                if abs(angle_diff) > 25:
                    predict_radians = math.radians(-(angle2 + angle_diff * 2))
                else:
                    predict_radians = math.radians(-(angle2 + angle1) / 2)
                length = 100
                end_point = (
                    int(curr_x + length * math.cos(predict_radians)),
                    int(curr_y - length * math.sin(predict_radians)),
                )
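                # Reading of the geometry above, as written in the source:
                # angle1 is the heading over the older frame pair, angle2 the
                # heading over the newer pair. A heading change above 25 degrees
                # is treated as a turn and extrapolated by doubling angle_diff;
                # otherwise the two headings are averaged. The sign flips negate
                # the angle because image y grows downward, and the y term of
                # end_point subtracts for the same reason.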
                # smoothingFactor sets what fraction of the box diagonal the
                # object must move before a new vector is drawn, e.g. 10 = 1/10
                smoothingFactor = 10
                box_diagonal = math.sqrt(w ** 2 + h ** 2)
                # Check whether the object has moved more than 1/10 of the box diagonal
                if oldPositionX is not None and oldPositionY is not None:
                    movement_distance = math.dist((oldPositionX, oldPositionY), (curr_x, curr_y))
                    threshold_distance = box_diagonal / smoothingFactor
                    print(f"Movement Distance: {movement_distance}, Threshold Distance: {threshold_distance} "
                          f"Current Position: {curr_x, curr_y} Old Position: {oldPositionX, oldPositionY}")
                    # Only draw a new prediction vector if the object has moved more than the threshold
                    if movement_distance > threshold_distance:
                        old_positions[track_id] = (curr_x, curr_y)
                        cv2.arrowedLine(annotated_frame, (int(curr_x), int(curr_y)), end_point, (0, 255, 0), 8)
                else:
                    # On the first frame, set the old position to the current position
                    old_positions[track_id] = (curr_x, curr_y)

            # Store the current position for the next iteration
            prev2_positions[track_id] = prev_positions.get(track_id, (0, 0))
            prev_positions[track_id] = centers[track_id]
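            # Bookkeeping recap: each track carries three reference points.
            # prev_positions / prev2_positions hold the last two frame centers
            # that drive the drawn and predicted vectors, while old_positions
            # is the smoothing anchor that only advances once the object has
            # moved a full threshold_distance.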
        track_ids_list = list(centers.keys())

        # Separate human and vehicle objects based on class IDs
        human_ids_list = []
        vehicle_ids_list = []
        for box in results[0].boxes:
            if box.id is None:
                continue  # skip detections that have no track ID yet
            cls = int(box.cls)  # class ID
            x, y, w, h = box.xywh[0].tolist()
            if bounding_start[0] <= x <= bounding_end[0] and bounding_start[1] <= y <= bounding_end[1]:
                if cls in (0, 1):  # 0 = person, 1 = bicycle
                    human_ids_list.append(int(box.id.item()))
                elif cls in (2, 3, 5, 7):  # 2 = car, 3 = motorcycle, 5 = bus, 7 = truck
                    vehicle_ids_list.append(int(box.id.item()))
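        # The numeric class IDs above assume the COCO-pretrained yolo11n
        # weights; model.names maps each ID to its label if you need to adjust
        # them for a differently trained model.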
        # Pair every human with every vehicle and measure their pixel distance
        for id_1 in human_ids_list:  # the primary (human) point
            for id_2 in vehicle_ids_list:  # the secondary (vehicle) point
                # Get the center points of both objects
                p1 = np.array(centers[id_1])
                p2 = np.array(centers[id_2])
                distance = np.linalg.norm(p1 - p2)
                threshold = 450  # alert distance in pixels of the original frame
                midpoint = tuple(((p1 + p2) / 2).astype(int))
                # Draw a line between these two objects
                cv2.line(annotated_frame, tuple(p1.astype(int)), tuple(p2.astype(int)), (0, 255, 0), 2)
                if distance < threshold:  # ALERT: chance of collision is high
                    cv2.putText(annotated_frame, f"{distance:.0f}", midpoint,
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
                else:  # safe distance
                    cv2.putText(annotated_frame, f"{distance:.0f}", midpoint,
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 2, cv2.LINE_AA)
        # Display the annotated frame, resized for the screen
        annotated_frame = cv2.resize(annotated_frame, (960, 540), interpolation=cv2.INTER_AREA)
        cv2.imshow("YOLO11 Tracking", annotated_frame)
        # Break the loop if 'q' is pressed
        if cv2.waitKey(1) & 0xFF == ord("q"):
            # GPIO.cleanup()
            break
    else:
        # Break the loop if the end of the video is reached
        break
# Release the video capture object and close the display window
cap.release()
cv2.destroyAllWindows()
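
# Optional: a minimal sketch for saving the annotated output instead of (or in
# addition to) the imshow window. 'out.mp4', the mp4v codec, and the hardcoded
# 30 FPS are placeholder choices; the writer expects the 960x540 resized
# frames produced above.
#
#   writer = cv2.VideoWriter("out.mp4", cv2.VideoWriter_fourcc(*"mp4v"), 30, (960, 540))
#   ...inside the loop, after cv2.resize:  writer.write(annotated_frame)
#   ...after the loop:                     writer.release()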