Untitled

mail@pastecode.io avatar
unknown
plain_text
a year ago
4.2 kB
5
Indexable
from flask import Flask, Response
from vidgear.gears import NetGear
import cv2
import mediapipe as mp
import threading

mpHands = mp.solutions.hands
hands=mpHands.Hands(False,1,min_detection_confidence=0.7)#(False,1,0.8)
mpDraw=mp.solutions.drawing_utils
hand_list=[]
sem = threading.Lock() #Semaphore()
dummy_img = cv2.imread("defect_scanner_logo.png")
# print(dummy_img.shape)

options = {"max_retries":2,"request_timeout":5,}
# client = NetGear(receive_mode=True, address="0.0.0.0", port="12345",**options,)
app = Flask(__name__)

@app.route('/')
def index():
    print("here")
    return "Flask Video Streaming"

flag = 1
client=0
th_l=0
buffer=cv2.imencode('.jpg', dummy_img)
def generate_frames():
    global flag
    global dummy_img
    global client
    global th_l
    global buffer
    while True:
        if th_l!=0:
            yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
            continue
        sem.acquire()
        th_l=1
        try:
            if flag!=0:
                print("enter")
                client = NetGear(receive_mode=True, address="0.0.0.0", port="12345",**options,)
                flag = 0
            frame = client.recv()

            if frame is None:
                print("closed")
                frame = dummy_img
                flag = 1
                client.close()
            # sem.release()
        except:
            print("Except closed")
            flag = 1
            client.close()
            _, buffer = cv2.imencode('.jpg', dummy_img)
            sem.release()
            break
            # print("Except closed")
            # frame = dummy_img
            # flag = 1
            # client.close()
        # if frame is None:
        #     continue
        frame = cv2.resize(frame, (640, 480))
        results = model(frame, classes=0,iou=0.5,conf=0.7,verbose=False)
        # sem.acquire()
        # results_hand = hands.process(frame)
        # sem.release()
        detection_cls=results[0].boxes.cls.tolist()
        annotated_frame = results[0].plot()
        cv2.putText(annotated_frame, f"count={detection_cls.count(0)}", (150, 30), cv2.FONT_HERSHEY_SIMPLEX, 1,[255, 0, 0], 2,cv2.LINE_AA)
        # if results_hand.multi_hand_landmarks:
        #     for handL in results_hand.multi_hand_landmarks:
        #         for id, lm in enumerate(handL.landmark):
        #             # print(id,lm)
        #             h, w, c = frame.shape
        #             cx, cy = int(lm.x * w), int(lm.y * h)
        #             hand_list.append([cx, cy])
        #         if hand_list[4][1] < hand_list[8][1] and hand_list[4][1] < hand_list[12][1] and hand_list[4][1] < hand_list[16][1] and hand_list[4][1] < hand_list[20][1] and hand_list[4][1] < hand_list[0][1]:
        #             cv2.putText(annotated_frame, "Thumbs up", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, [255, 0, 0], 2, cv2.LINE_AA)
        #         if hand_list[6][1] < hand_list[4][1] and hand_list[6][1] < hand_list[16][1] and hand_list[6][1] < hand_list[20][1] and hand_list[6][1] < hand_list[0][1]:
        #             if hand_list[10][1] < hand_list[4][1] and hand_list[10][1] < hand_list[16][1] and hand_list[10][1] < hand_list[20][1] and hand_list[10][1] < hand_list[0][1]:
        #                 if abs(hand_list[8][0]-hand_list[12][0])>20:
        #                     cv2.putText(annotated_frame, "Victory", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, [255, 0, 0], 2, cv2.LINE_AA)
        #         hand_list.clear()
        #         # mpDraw.draw_landmarks(annotated_frame, handL, mpHands.HAND_CONNECTIONS)
        _, buffer = cv2.imencode('.jpg', annotated_frame)
        th_l=0
        sem.release()
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')

    cv2.destroyAllWindows()
    client.close()

@app.route('/video_feed')
def video_feed():
    return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)