Untitled

mail@pastecode.io avatar
unknown
plain_text
5 months ago
4.0 kB
1
Indexable
import subprocess
subprocess.call(['/home/raspberrypi/.virtualenvs/pyt/bin/activate'])

import cv2                      # OpenCV library 
from threading import Thread    # library for multi-threading
from ultralytics import YOLO    # YOLOv8 library

import RPi.GPIO as GPIO
import time                     # time library
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)

# ======= EXIT BUTTON =======
exitPin = 17
GPIO.setup(exitPin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
buttonState = False

global frame_width, frame_height
frame_width = 640
frame_height = 480

# ====== Webcam initialization ======= 
class WebcamStream : # defining a helper class for implementing multi-threading 
    # initialization method 
    def __init__(self, stream_id):
        self.stream_id = stream_id # default is 0 for main camera 
        
        # opening video capture stream 
        self.vcap = cv2.VideoCapture(self.stream_id, cv2.CAP_V4L2)
        # self.vcap.set(cv2.CAP_PROP_FRAME_WIDTH, frame_width)
        # self.vcap.set(cv2.CAP_PROP_FRAME_HEIGHT, frame_height)
        # self.vcap.set(cv2.CAP_PROP_FPS, 10)

        if self.vcap.isOpened() is False :
            print("[Exiting]: Error accessing webcam stream.")
            exit(0)
            
        # reading a single frame from vcap stream for initializing 
        self.grabbed , self.frame = self.vcap.read()
        if self.grabbed is False :
            print('[Exiting] No more frames to read')
            exit(0)
        # self.stopped is initialized to False 
        self.stopped = True
        # thread instantiation  
        self.t = Thread(target=self.update, args=())
        self.t.daemon = True # daemon threads run in background 
        
    # method to start thread 
    def start(self):
        self.stopped = False
        self.t.start()

    # method passed to thread to read next available frame  
    def update(self):
        while True :
            if self.stopped is True :
                break
            self.grabbed , self.frame = self.vcap.read()
            if self.grabbed is False :
                print('[Exiting] No more frames to read')
                self.stopped = True
                break 
        self.vcap.release()
    # method to return latest read frame 
    def read(self):
        return self.frame
    # method to stop reading frames 
    def stop(self):
        self.stopped = True

# initializing and starting multi-threaded webcam input stream 
webcam_stream = WebcamStream('/dev/video0') # 0 id for main camera
webcam_stream.start()

# setup model (it can be in PT, ONNX, or NCNN format. Preferably in NCNN)
model = YOLO("/home/raspberrypi/Desktop/Blindspot/yolov8/runs/run1/train/weights/ncnn480/best_ncnn_model", task="detect")

while True :
    img = cv2.imread('/home/raspberrypi/Desktop/Blindspot/black.jpg')
    cv2.namedWindow('Blindspot', cv2.WND_PROP_FULLSCREEN)
    cv2.setWindowProperty('Blindspot', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
    cv2.imshow('Blindspot', img)
    if buttonState is True:
        if webcam_stream.stopped is True :
            break
        else :
            frame = webcam_stream.read()

        # time.sleep(0.03) # delay value in seconds. so, delay=1 is equivalent to 1 second
        # Delay variable in the code can be used for simulating time taken for performing some video processing task
        # Imgsz can handle arbitrary sized images as long as both sides are a multiple of 32 (i.e. 32*20=640, 32*15=480, 32*10=320)
        # !! IMGSZ must be the same with imgsz used in the model !!
        result = model(frame, imgsz=480, conf=0.8, iou=0.7)
        annotated_frame = result[0].plot()
        cv2.imshow('Blindspot', annotated_frame)

    if cv2.waitKey(5) == 27 or (GPIO.input(exitPin) == GPIO.LOW):
        buttonState = not buttonState
        cv2.destroyAllWindows()

GPIO.cleanup()
Leave a Comment