import os
from collections import defaultdict
import cv2
import numpy as np
import math
from ultralytics import YOLO
import time
import torch


# Pin inference to the first GPU and make CUDA errors surface at the call site
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"


# Load the YOLO11 model
model = YOLO("yolo11n.pt")


# Optional: export to TensorRT and load the engine for faster inference
# model.export(format="engine")  # creates 'yolo11n.engine'
# trt_model = YOLO("yolo11n.engine")


# Open the video file
video_path = "/home/yoon/Desktop/Research/VideoData.mp4"
cap = cv2.VideoCapture(video_path)
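
# Fail fast if the video could not be opened, instead of silently skipping the loop
if not cap.isOpened():
    raise FileNotFoundError(f"Could not open video: {video_path}")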


# Store the track history
track_history = defaultdict(list)

# Per-track position caches:
#   prev_positions  - center from the previous frame
#   prev2_positions - center from two frames ago
#   old_positions   - anchor used to decide when to redraw the predicted vector
prev_positions = {}
prev2_positions = {}
old_positions = {}
oldPositionX = None
oldPositionY = None

# Variables for Frame Rate Checking
prev_frame_time = 0
new_frame_time = 0
# Counting region, specified in the original 1920x1080 frame (not the resized preview window)
bounding_x, bounding_y, bounding_w, bounding_h = 1440, 270, 1000, 600
# Center the region at (1440, 270) to count the objects of interest; (960, 540) leaves them out
bounding_start = (int(bounding_x - bounding_w / 2), int(bounding_y - bounding_h / 2))
bounding_end = (int(bounding_x + bounding_w / 2), int(bounding_y + bounding_h / 2))
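
# Only objects whose centers fall inside this rectangle are classified and
# paired for the human-vehicle distance check further below.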


# Loop through the video frames
while cap.isOpened():
   # Read a frame from the video
   success, frame = cap.read()


   if success:
       # Run YOLO11 tracking on the frame, persisting tracks between frames
       results = model.track(frame, persist=True)

       # Calculate FPS
       new_frame_time = time.time()
       fps = 1 / (new_frame_time - prev_frame_time)
       prev_frame_time = new_frame_time

       # Convert FPS to integer
       fps = int(fps)

       # Put FPS on the frame
       cv2.putText(frame, f"FPS: {fps}", (7, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (100, 255, 0), 3, cv2.LINE_AA)

       # Get the boxes and track IDs; the tracker may return no IDs on some frames
       boxes = results[0].boxes.xywh.cpu()
       ids = results[0].boxes.id
       track_ids = ids.int().cpu().tolist() if ids is not None else []


       # Visualize the results on the frame
       annotated_frame = results[0].plot()


       # Draw the counting region (cyan rectangle)
       cv2.rectangle(annotated_frame, bounding_start, bounding_end, (255, 255, 0), 2)


       centers = {}  # track_id -> box center for the current frame


       # Plot the tracks
       for box, track_id in zip(boxes, track_ids):
           x, y, w, h = box
           track = track_history[track_id]
           track.append((float(x), float(y)))  # x, y center point
           if len(track) > 30:  # retain only the last 30 center points
               track.pop(0)

           # Draw the tracking trail (yellow polyline through the cached centers)
           points = np.hstack(track).astype(np.int32).reshape((-1, 1, 2))
           cv2.polylines(annotated_frame, [points], isClosed=False, color=(0, 255, 255), thickness=2)
           centers[track_id] = (float(x), float(y))  # plain floats so downstream math avoids tensor types

           if track_id in prev_positions:
               prev_x, prev_y = prev_positions[track_id]
               prev_2x, prev_2y = prev2_positions[track_id]
               curr_x, curr_y = centers[track_id]

               if track_id in old_positions:
                   oldPositionX, oldPositionY = old_positions[track_id]
               else:
                   oldPositionX, oldPositionY = None, None

               # Draw the last two displacements as arrows:
               # red = previous -> current, pink = two frames ago -> previous
               cv2.arrowedLine(annotated_frame, (int(prev_x), int(prev_y)), (int(curr_x), int(curr_y)), (0, 0, 255), 8)
               cv2.arrowedLine(annotated_frame, (int(prev_2x), int(prev_2y)), (int(prev_x), int(prev_y)), (255, 144, 255), 8)

               # Predict the next movement direction from the last two headings
               dx1 = prev_x - prev_2x
               dy1 = prev_y - prev_2y
               angle1 = math.atan2(dy1, dx1) * 180 / math.pi

               dx2 = curr_x - prev_x
               dy2 = curr_y - prev_y
               angle2 = math.atan2(dy2, dx2) * 180 / math.pi

               angle_diff = angle2 - angle1
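               # angle_diff measures how sharply the heading changed between the last
               # two steps (note: not wrapped to +/-180, so tracks crossing that
               # boundary can register as spurious sharp turns)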

               if abs(angle_diff) > 25:
                   # Sharp turn: extrapolate by doubling the heading change
                   predict_radians = math.radians(-(angle2 + angle_diff * 2))
               else:
                   # Gentle motion: average the two headings for a smoother prediction
                   predict_radians = math.radians(-(angle2 + angle1) / 2)
               
               # Project the predicted heading 100 px ahead of the current center
               # (image y grows downward, hence the sign handling above)
               length = 100
               end_point = (
                   int(curr_x + length * math.cos(predict_radians)),
                   int(curr_y - length * math.sin(predict_radians))
               )

               smoothingFactor = 10  # fraction of the box diagonal the object must move before the vector updates (10 = 1/10)

               box_diagonal = math.sqrt(w**2 + h**2)
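               # Scaling by the box diagonal lets the movement threshold adapt to the
               # object's apparent size on screen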

               # Redraw the predicted vector only after the object has moved more than
               # 1/smoothingFactor of its box diagonal since the last anchor point
               if oldPositionX is not None and oldPositionY is not None:
                   movement_distance = math.dist((oldPositionX, oldPositionY), (curr_x, curr_y))
                   threshold_distance = box_diagonal / smoothingFactor
                   print(f"Movement Distance: {movement_distance}, Threshold Distance: {threshold_distance} Current Position: {curr_x, curr_y} Old Position: {oldPositionX, oldPositionY}")
                   if movement_distance > threshold_distance:
                       old_positions[track_id] = (curr_x, curr_y)
                       cv2.arrowedLine(annotated_frame, (int(curr_x), int(curr_y)), end_point, (0, 255, 0), 8)
               else:
                   # First time seeing this track: anchor at the current position
                   old_positions[track_id] = (curr_x, curr_y)

            
           # Update the caches for the next frame; prev2 must take prev's value
           # before prev is overwritten with the current center
           prev2_positions[track_id] = prev_positions.get(track_id, (0, 0))
           prev_positions[track_id] = centers[track_id]




       # Separate human and vehicle objects based on class IDs
       human_ids_list = []
       vehicle_ids_list = []


       for box in results[0].boxes:
           if box.id is None:  # skip detections the tracker did not assign an ID
               continue
           cls = int(box.cls)
           x, y, w, h = box.xywh[0].tolist()
           # Only consider objects whose centers lie inside the counting region
           if bounding_start[0] <= x <= bounding_end[0] and bounding_start[1] <= y <= bounding_end[1]:
               if cls in (0, 1):  # COCO: 0 = person, 1 = bicycle
                   human_ids_list.append(int(box.id.item()))
               elif cls in (2, 3, 5, 7):  # COCO: 2 = car, 3 = motorcycle, 5 = bus, 7 = truck
                   vehicle_ids_list.append(int(box.id.item()))


       # Pair every person/bicycle with every vehicle and check their separation
       for id_1 in human_ids_list:
           for id_2 in vehicle_ids_list:
               p1 = np.array(centers[id_1])
               p2 = np.array(centers[id_2])

               distance = np.linalg.norm(p1 - p2)
               threshold = 450  # pixels; anything closer is flagged as a possible collision
               # Draw a line between the two objects and label it with the distance
               cv2.line(annotated_frame, tuple(p1.astype(int)), tuple(p2.astype(int)), (0, 255, 0), 2)
               midpoint = tuple(((p1 + p2) / 2).astype(int))
               if distance < threshold:  # ALERT: high chance of collision, label in red
                   cv2.putText(annotated_frame, f"{distance:.0f}", midpoint,
                               cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
               else:  # safe distance, label in yellow
                   cv2.putText(annotated_frame, f"{distance:.0f}", midpoint,
                               cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 2, cv2.LINE_AA)


       # Display the annotated frame
       annotated_frame = cv2.resize(annotated_frame, (960, 540), interpolation=cv2.INTER_AREA)
       cv2.imshow("YOLO11 Tracking", annotated_frame)


       # Break the loop if 'q' is pressed
       if cv2.waitKey(1) & 0xFF == ord("q"):
           break
   else:
       # Break the loop if the end of the video is reached
       break


# Release the video capture object and close the display window
cap.release()
cv2.destroyAllWindows()
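
# To test against a live camera instead of a file, cv2.VideoCapture(0) is a
# drop-in replacement for the file path above (assuming a camera is attached).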
