Untitled
unknown
plain_text
a year ago
4.2 kB
5
Indexable
Never
from flask import Flask, Response from vidgear.gears import NetGear import cv2 import mediapipe as mp import threading mpHands = mp.solutions.hands hands=mpHands.Hands(False,1,min_detection_confidence=0.7)#(False,1,0.8) mpDraw=mp.solutions.drawing_utils hand_list=[] sem = threading.Lock() #Semaphore() dummy_img = cv2.imread("defect_scanner_logo.png") # print(dummy_img.shape) options = {"max_retries":2,"request_timeout":5,} # client = NetGear(receive_mode=True, address="0.0.0.0", port="12345",**options,) app = Flask(__name__) @app.route('/') def index(): print("here") return "Flask Video Streaming" flag = 1 client=0 th_l=0 buffer=cv2.imencode('.jpg', dummy_img) def generate_frames(): global flag global dummy_img global client global th_l global buffer while True: if th_l!=0: yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n') continue sem.acquire() th_l=1 try: if flag!=0: print("enter") client = NetGear(receive_mode=True, address="0.0.0.0", port="12345",**options,) flag = 0 frame = client.recv() if frame is None: print("closed") frame = dummy_img flag = 1 client.close() # sem.release() except: print("Except closed") flag = 1 client.close() _, buffer = cv2.imencode('.jpg', dummy_img) sem.release() break # print("Except closed") # frame = dummy_img # flag = 1 # client.close() # if frame is None: # continue frame = cv2.resize(frame, (640, 480)) results = model(frame, classes=0,iou=0.5,conf=0.7,verbose=False) # sem.acquire() # results_hand = hands.process(frame) # sem.release() detection_cls=results[0].boxes.cls.tolist() annotated_frame = results[0].plot() cv2.putText(annotated_frame, f"count={detection_cls.count(0)}", (150, 30), cv2.FONT_HERSHEY_SIMPLEX, 1,[255, 0, 0], 2,cv2.LINE_AA) # if results_hand.multi_hand_landmarks: # for handL in results_hand.multi_hand_landmarks: # for id, lm in enumerate(handL.landmark): # # print(id,lm) # h, w, c = frame.shape # cx, cy = int(lm.x * w), int(lm.y * h) # hand_list.append([cx, cy]) # if hand_list[4][1] < hand_list[8][1] and hand_list[4][1] < hand_list[12][1] and hand_list[4][1] < hand_list[16][1] and hand_list[4][1] < hand_list[20][1] and hand_list[4][1] < hand_list[0][1]: # cv2.putText(annotated_frame, "Thumbs up", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, [255, 0, 0], 2, cv2.LINE_AA) # if hand_list[6][1] < hand_list[4][1] and hand_list[6][1] < hand_list[16][1] and hand_list[6][1] < hand_list[20][1] and hand_list[6][1] < hand_list[0][1]: # if hand_list[10][1] < hand_list[4][1] and hand_list[10][1] < hand_list[16][1] and hand_list[10][1] < hand_list[20][1] and hand_list[10][1] < hand_list[0][1]: # if abs(hand_list[8][0]-hand_list[12][0])>20: # cv2.putText(annotated_frame, "Victory", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, [255, 0, 0], 2, cv2.LINE_AA) # hand_list.clear() # # mpDraw.draw_landmarks(annotated_frame, handL, mpHands.HAND_CONNECTIONS) _, buffer = cv2.imencode('.jpg', annotated_frame) th_l=0 sem.release() yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n') cv2.destroyAllWindows() client.close() @app.route('/video_feed') def video_feed(): return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame') if __name__ == '__main__': app.run(host='0.0.0.0', port=5000)