Untitled
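The script below is a pedestrian-vehicle proximity alert built on Ultralytics YOLO11 tracking. For each video frame it tracks objects, draws their recent trajectories and motion vectors, extrapolates a predicted heading from the last two displacements, and then flags any person-vehicle pair inside a region of interest whose centers come within a pixel-distance threshold.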
import math
import os
import time
from collections import defaultdict

import cv2
import numpy as np
from ultralytics import YOLO

os.environ["CUDA_VISIBLE_DEVICES"] = "0"
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"

# Load the YOLO11 model
model = YOLO("yolo11n.pt")
# model.export(format="engine")      # creates 'yolo11n.engine'
# trt_model = YOLO("yolo11n.engine") # load the exported TensorRT model instead

# Open the video file
video_path = "/home/yoon/Desktop/Research/VideoData.mp4"
cap = cv2.VideoCapture(video_path)

# Track history and per-ID position caches
track_history = defaultdict(list)
prev_positions = {}   # center one frame ago
prev2_positions = {}  # center two frames ago
old_positions = {}    # last center at which a prediction vector was (re)drawn

# Frame-rate bookkeeping
prev_frame_time = time.time()

# Region of interest, in the original 1920x1080 frame (not the resized window).
# Center the region at (1440, 270) for crossings to count; (960, 540) to not count.
bounding_x, bounding_y, bounding_w, bounding_h = 1440, 270, 1000, 600
bounding_start = (int(bounding_x - bounding_w / 2), int(bounding_y - bounding_h / 2))
bounding_end = (int(bounding_x + bounding_w / 2), int(bounding_y + bounding_h / 2))

# Loop through the video frames
while cap.isOpened():
    success, frame = cap.read()
    if not success:
        # Break the loop when the end of the video is reached
        break

    # Run YOLO11 tracking on the frame, persisting tracks between frames
    results = model.track(frame, persist=True)

    # Calculate FPS and draw it on the frame
    new_frame_time = time.time()
    fps = int(1 / (new_frame_time - prev_frame_time))
    prev_frame_time = new_frame_time
    cv2.putText(frame, f"FPS: {fps}", (7, 100),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (100, 255, 0), 3, cv2.LINE_AA)

    # Visualize the detections and the region of interest
    annotated_frame = results[0].plot()
    cv2.rectangle(annotated_frame, bounding_start, bounding_end, (255, 255, 0), 2)

    # Get the boxes and track IDs (boxes.id is None when nothing is tracked)
    boxes = results[0].boxes.xywh.cpu()
    if results[0].boxes.id is None:
        track_ids = []
    else:
        track_ids = results[0].boxes.id.int().cpu().tolist()

    centers = {}

    # Plot the tracks
    for box, track_id in zip(boxes, track_ids):
        x, y, w, h = box.tolist()
        track = track_history[track_id]
        track.append((x, y))  # x, y center point
        if len(track) > 30:   # retain only the last 30 points (~30 frames)
            track.pop(0)

        # Draw the tracking line
        points = np.hstack(track).astype(np.int32).reshape((-1, 1, 2))
        cv2.polylines(annotated_frame, [points], isClosed=False,
                      color=(0, 255, 255), thickness=2)

        centers[track_id] = (x, y)

        if track_id in prev_positions:
            prev_x, prev_y = prev_positions[track_id]
            # Fall back to the previous position until two frames of history exist
            prev_2x, prev_2y = prev2_positions.get(track_id, (prev_x, prev_y))
            curr_x, curr_y = centers[track_id]
            oldPositionX, oldPositionY = old_positions.get(track_id, (None, None))

            # Draw motion vectors: the current displacement and the one before it
            cv2.arrowedLine(annotated_frame, (int(prev_x), int(prev_y)),
                            (int(curr_x), int(curr_y)), (0, 0, 255), 8)
            cv2.arrowedLine(annotated_frame, (int(prev_2x), int(prev_2y)),
                            (int(prev_x), int(prev_y)), (255, 144, 255), 8)

            # Predicted movement vector from the last two headings
            angle1 = math.degrees(math.atan2(prev_y - prev_2y, prev_x - prev_2x))
            angle2 = math.degrees(math.atan2(curr_y - prev_y, curr_x - prev_x))
            # Normalize the difference to [-180, 180) so headings that wrap
            # around +/-180 degrees are not mistaken for sharp turns
            angle_diff = (angle2 - angle1 + 180) % 360 - 180

            if abs(angle_diff) > 25:
                # Sharp turn: extrapolate the turn beyond the current heading
                predict_radians = math.radians(-(angle2 + angle_diff * 2))
            else:
                # Gentle turn: average the two headings
                predict_radians = math.radians(-(angle2 + angle1) / 2)

            length = 100
            end_point = (int(curr_x + length * math.cos(predict_radians)),
                         int(curr_y - length * math.sin(predict_radians)))

            # Fraction of the box diagonal the object must move before a new
            # prediction vector is drawn, e.g. 10 -> 1/10 of the diagonal
            smoothingFactor = 10
            box_diagonal = math.sqrt(w ** 2 + h ** 2)

            if oldPositionX is not None and oldPositionY is not None:
                movement_distance = math.hypot(oldPositionX - curr_x,
                                               oldPositionY - curr_y)
                threshold_distance = box_diagonal / smoothingFactor
                print(f"Movement Distance: {movement_distance}, "
                      f"Threshold Distance: {threshold_distance} "
                      f"Current Position: {curr_x, curr_y} "
                      f"Old Position: {oldPositionX, oldPositionY}")
                # Only draw a new prediction vector once the object has moved
                # more than the threshold distance
                if movement_distance > threshold_distance:
                    old_positions[track_id] = (curr_x, curr_y)
                    cv2.arrowedLine(annotated_frame, (int(curr_x), int(curr_y)),
                                    end_point, (0, 255, 0), 8)
            else:
                # First sighting: seed the old position with the current one
                old_positions[track_id] = (curr_x, curr_y)

        # Store the current position for the next iteration
        prev2_positions[track_id] = prev_positions.get(track_id, (0, 0))
        prev_positions[track_id] = centers[track_id]

    # Separate human and vehicle objects inside the region by COCO class ID
    human_ids_list = []
    vehicle_ids_list = []
    for box in results[0].boxes:
        if box.id is None:
            continue
        cls = int(box.cls)
        x, y, w, h = box.xywh[0].tolist()
        if bounding_start[0] <= x <= bounding_end[0] and bounding_start[1] <= y <= bounding_end[1]:
            if cls in (0, 1):          # 0 = person, 1 = bicycle
                human_ids_list.append(int(box.id.item()))
            elif cls in (2, 3, 5, 7):  # 2 = car, 3 = motorcycle, 5 = bus, 7 = truck
                vehicle_ids_list.append(int(box.id.item()))

    # Check every human-vehicle pair for proximity
    for id_1 in human_ids_list:
        for id_2 in vehicle_ids_list:
            # Get the center points of both objects
            p1 = np.array(centers[id_1])
            p2 = np.array(centers[id_2])
            distance = np.linalg.norm(p1 - p2)
            threshold = 450

            # Draw a line between the two objects, labeled with the distance
            cv2.line(annotated_frame, tuple(p1.astype(int)),
                     tuple(p2.astype(int)), (0, 255, 0), 2)
            midpoint = tuple(((p1 + p2) / 2).astype(int))
            if distance < threshold:
                # ALERT: chance of collision is high
                cv2.putText(annotated_frame, f"{distance:.0f}", midpoint,
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
            else:
                # Safe distance
                cv2.putText(annotated_frame, f"{distance:.0f}", midpoint,
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 2, cv2.LINE_AA)

    # Display the annotated frame, downscaled to 960x540
    annotated_frame = cv2.resize(annotated_frame, (960, 540),
                                 interpolation=cv2.INTER_AREA)
    cv2.imshow("YOLO11 Tracking", annotated_frame)

    # Break the loop if 'q' is pressed
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

# Release the video capture object and close the display window
cap.release()
cv2.destroyAllWindows()
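The heading prediction is the subtle part of the loop: it compares the last two displacement angles and either extrapolates a sharp turn or averages the headings for a gentle one. Below is a minimal sketch of that math pulled out into a standalone helper so it can be sanity-checked without running the tracker. The name predict_heading and the standalone structure are illustrative, not part of the script above.

import math

def predict_heading(p2, p1, p0, turn_threshold=25.0):
    """Predicted heading in radians from the last three center points:
    p2 (two frames ago), p1 (one frame ago), p0 (now).
    Hypothetical helper mirroring the logic in the tracking loop."""
    angle1 = math.degrees(math.atan2(p1[1] - p2[1], p1[0] - p2[0]))
    angle2 = math.degrees(math.atan2(p0[1] - p1[1], p0[0] - p1[0]))
    # Normalize to [-180, 180) so crossing +/-180 degrees is not a "turn"
    angle_diff = (angle2 - angle1 + 180) % 360 - 180
    if abs(angle_diff) > turn_threshold:
        return math.radians(-(angle2 + angle_diff * 2))  # extrapolate the turn
    return math.radians(-(angle2 + angle1) / 2)          # average the headings

# Straight-line motion: both displacements point along +x,
# so the prediction stays on the current heading
print(math.degrees(predict_heading((0, 0), (10, 0), (20, 0))))      # -0.0
# A ~30 degree turn exceeds the threshold, so the turn is doubled
# on top of the current heading
print(math.degrees(predict_heading((0, 0), (10, 0), (18.66, 5))))   # ~ -90.0

The negation in both branches accounts for OpenCV's image coordinates, where y grows downward; that is why the main loop draws the predicted endpoint with curr_y - length * math.sin(predict_radians).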