# Untitled
"""Flask app that streams a Pi camera feed and steers a robot toward a person.

The video stream runs YOLO detection on every frame.  A client can POST a
point to ``/select_person``; the detected person whose box contains that
point becomes the tracking target, and the motors are driven so that the
target stays horizontally centred in the frame.
"""
import atexit
import time

import cv2
import RPi.GPIO as GPIO
from flask import Flask, Response, jsonify, render_template, request
from picamera2 import Picamera2
from ultralytics import YOLO

app = Flask(__name__)

# Initialize GPIO pins for motor control (BCM numbering, see setmode below).
IN1, IN2, IN3, IN4 = 17, 18, 27, 22
ENA, ENB = 19, 13

# PWM duty cycle (percent) applied to both enable pins while driving.
DRIVE_DUTY_CYCLE = 50
# Half-width, in pixels, of the band around the frame centre inside which
# the target counts as "centred" and the robot drives straight ahead.
CENTER_THRESHOLD = 50

GPIO.setmode(GPIO.BCM)
GPIO.setup([IN1, IN2, IN3, IN4, ENA, ENB], GPIO.OUT)
pwm_ena = GPIO.PWM(ENA, 1000)
pwm_enb = GPIO.PWM(ENB, 1000)
pwm_ena.start(0)
pwm_enb.start(0)


def _drive(in1, in2, in3, in4, duty_cycle=DRIVE_DUTY_CYCLE):
    """Set the four direction pins, then apply ``duty_cycle`` to both PWMs."""
    GPIO.output(IN1, in1)
    GPIO.output(IN2, in2)
    GPIO.output(IN3, in3)
    GPIO.output(IN4, in4)
    pwm_ena.ChangeDutyCycle(duty_cycle)
    pwm_enb.ChangeDutyCycle(duty_cycle)


def move_forward():
    """Drive both pin pairs in the HIGH/LOW direction."""
    _drive(GPIO.HIGH, GPIO.LOW, GPIO.HIGH, GPIO.LOW)


def move_backward():
    """Drive both pin pairs in the LOW/HIGH direction."""
    _drive(GPIO.LOW, GPIO.HIGH, GPIO.LOW, GPIO.HIGH)


def turn_left():
    """Drive the IN1/IN2 pair LOW/HIGH and the IN3/IN4 pair HIGH/LOW."""
    # NOTE(review): whether this physically turns left depends on the wiring.
    _drive(GPIO.LOW, GPIO.HIGH, GPIO.HIGH, GPIO.LOW)


def turn_right():
    """Drive the IN1/IN2 pair HIGH/LOW and the IN3/IN4 pair LOW/HIGH."""
    _drive(GPIO.HIGH, GPIO.LOW, GPIO.LOW, GPIO.HIGH)


def stop_motors():
    """Pull all direction pins LOW and set both PWM duty cycles to zero."""
    _drive(GPIO.LOW, GPIO.LOW, GPIO.LOW, GPIO.LOW, duty_cycle=0)


picam2 = None
model = YOLO("yolov8n_ncnn_model")
# Bounding box (x1, y1, x2, y2) of the person being followed, or None.
selected_human = None
# Class id treated as "human".  Presumably the COCO "person" class used by
# the stock YOLOv8 models -- confirm if the model is ever swapped.
human_class_id = 0


def initialize_camera():
    """Create, configure and start the camera once; no-op if already running.

    The ``picam2`` global is only set after the camera has started, so a
    failed attempt leaves it as ``None`` and the next request retries instead
    of using a half-configured camera.  Failures are printed, not raised.
    """
    global picam2
    if picam2 is not None:
        return

    camera = None
    try:
        camera = Picamera2()
        camera.preview_configuration.main.size = (320, 320)
        camera.preview_configuration.main.format = "RGB888"
        camera.preview_configuration.align()
        camera.configure("preview")
        camera.start()
    except Exception as e:
        print(f"Failed to initialize camera: {e}")
        if camera is not None:
            # Release the device so that a later retry can open it again.
            try:
                camera.close()
            except Exception as close_error:
                print(f"Failed to release camera: {close_error}")
        return

    picam2 = camera
    print("Camera initialized successfully")


@app.before_request
def before_request():
    """Make sure the camera is running before any request is handled."""
    initialize_camera()


def _detect_humans(frame):
    """Run the model on ``frame``; return (x1, y1, x2, y2) for each human."""
    return [
        tuple(box.xyxy[0].tolist())
        for box in model(frame, imgsz=320)[0].boxes
        if int(box.cls) == human_class_id
    ]


def _boxes_overlap(a, b):
    """Return True when boxes ``a`` and ``b`` (x1, y1, x2, y2) intersect."""
    return a[0] <= b[2] and a[2] >= b[0] and a[1] <= b[3] and a[3] >= b[1]


def _steer_toward(box, frame_width):
    """Turn toward ``box`` if it is off-centre, otherwise drive forward."""
    center_x = (box[0] + box[2]) // 2
    frame_center_x = frame_width // 2
    if center_x < frame_center_x - CENTER_THRESHOLD:
        turn_left()
    elif center_x > frame_center_x + CENTER_THRESHOLD:
        turn_right()
    else:
        move_forward()


@app.route('/select_person', methods=['POST'])
def select_person():
    """Select the detected person whose box contains the posted point.

    Expects a JSON body with numeric ``x`` and ``y``.  The point is compared
    directly against detection boxes, so it is assumed to be in camera-frame
    pixel coordinates -- TODO confirm against the front-end.

    Returns a JSON status; HTTP 400 for a missing or malformed body and
    HTTP 503 when the camera is not available.
    """
    global selected_human

    if picam2 is None:
        return jsonify({"status": "Camera not available"}), 503

    data = request.get_json(silent=True)
    try:
        x, y = float(data['x']), float(data['y'])
    except (KeyError, TypeError, ValueError):
        # Covers no/invalid JSON (data is None), missing keys, non-numbers.
        return jsonify({"status": "Invalid coordinates"}), 400

    for box in _detect_humans(picam2.capture_array()):
        x1, y1, x2, y2 = box
        if x1 <= x <= x2 and y1 <= y <= y2:
            selected_human = box
            return jsonify({"status": "Person selected"})
    return jsonify({"status": "No person found"})


def generate_frames():
    """Yield annotated JPEG frames as multipart chunks while steering.

    Every detected human is outlined; the tracked one is outlined in a second
    colour and followed.  The motors are stopped whenever there is no target
    for the current frame, and also when the generator ends for any reason
    (client disconnect, camera or model error), so the robot never keeps
    driving on a stale command.
    """
    global selected_human

    if picam2 is None:
        # Camera failed to start: serve an empty stream instead of crashing.
        return

    try:
        while True:
            frame = picam2.capture_array()
            detected_humans = _detect_humans(frame)
            for x1, y1, x2, y2 in detected_humans:
                cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)),
                              (255, 0, 0), 2)

            # Read the shared target once per frame for a consistent view.
            target = selected_human
            match = None
            if detected_humans and target:
                # Follow the first detection overlapping the last known box.
                match = next(
                    (human for human in detected_humans
                     if _boxes_overlap(human, target)),
                    None,
                )
                # The target is dropped only when people are visible but none
                # overlaps it; a frame with no detections keeps the selection.
                selected_human = match

            if match is None:
                stop_motors()
            else:
                x1, y1, x2, y2 = match
                cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)),
                              (0, 255, 0), 2)
                _steer_toward(match, frame.shape[1])

            encoded, buffer = cv2.imencode('.jpg', frame)
            if not encoded:
                continue  # skip a frame that could not be encoded
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n'
                   + buffer.tobytes() + b'\r\n')
    finally:
        stop_motors()


@app.route('/')
def index():
    """Serve the control page."""
    return render_template('index.html')


@app.route('/video_feed')
def video_feed():
    """Stream the annotated camera frames as a multipart response."""
    return Response(generate_frames(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')


def cleanup():
    """Stop the motors and release GPIO and camera resources at exit."""
    print("Cleaning up before shutdown...")
    stop_motors()
    pwm_ena.stop()
    pwm_enb.stop()
    GPIO.cleanup()
    if picam2 is not None:
        picam2.stop()
        print("Camera stopped.")


atexit.register(cleanup)

if __name__ == "__main__":
    # debug stays off: the server listens on every interface, and the
    # Werkzeug debugger allows arbitrary code execution from the browser.
    app.run(host="0.0.0.0", port=5000, debug=False, use_reloader=False)
# Leave a Comment