# Obstacle-detection alert server (FastAPI + YOLOv3 + gTTS)
"""FastAPI server that runs YOLOv3 obstacle detection on incoming video
frames and streams back annotated frames plus spoken (gTTS) audio alerts.

Endpoints:
    /ws      WebSocket — receives JPEG frames, returns annotated JPEGs and
             MP3 alert audio whenever the detection state changes.
    /stream  MJPEG test stream from the local webcam (or an RTSP URL).
"""

from fastapi import FastAPI, WebSocket
from fastapi.responses import StreamingResponse
import cv2
import numpy as np
from gtts import gTTS
import io
import tempfile
import os
import asyncio

app = FastAPI()

# --- YOLO model files (expected alongside this script) ---
weights_path = "./yolov3.weights"
config_path = "./yolov3.cfg"
labels_path = "./coco.names"

print("Loading YOLO model...")
net = cv2.dnn.readNet(weights_path, config_path)
layer_names = net.getLayerNames()
# getUnconnectedOutLayers() yields 1-based indices into layer_names.
output_layers = [layer_names[i - 1] for i in net.getUnconnectedOutLayers()]

# Class labels, one per line in the COCO names file.
with open(labels_path, "r") as f:
    classes = [line.strip() for line in f]
print("YOLO model loaded successfully.")

# Detection tuning constants (previously inline magic numbers).
CONFIDENCE_THRESHOLD = 0.7
INTERESTING_CLASSES = {"person", "car", "bicycle", "bus"}
MAX_CENTER_DISTANCE = 200   # px from frame centre to count as "in front"
SIZE_THRESHOLD = 1000       # min bounding-box area (px^2) to trigger a stop


def detect_objects(frame):
    """Run YOLO on *frame*, annotate it in place, and classify the scene.

    Returns ``(annotated_frame, state)`` where *state* is:
      - ``"obstacle"``  — a large object of interest near the frame centre;
      - ``"not_front"`` — an object was seen but it fails the size check;
      - ``"safe"``      — nothing of interest detected (or off-centre).

    NOTE: as in the original, only the *last* qualifying detection in the
    network output is considered.
    """
    # 0.00392 ≈ 1/255 — standard YOLO input scaling; BGR→RGB swap enabled.
    blob = cv2.dnn.blobFromImage(frame, 0.00392, (416, 416), (0, 0, 0), True, crop=False)
    net.setInput(blob)
    outputs = net.forward(output_layers)

    detected = False
    object_center = None
    object_area = None
    for out in outputs:
        for detection in out:
            scores = detection[5:]
            class_id = int(np.argmax(scores))
            confidence = scores[class_id]
            if confidence > CONFIDENCE_THRESHOLD and classes[class_id] in INTERESTING_CLASSES:
                detected = True
                # Detection coords are normalised to [0, 1]; scale to pixels.
                center_x = int(detection[0] * frame.shape[1])
                center_y = int(detection[1] * frame.shape[0])
                w = int(detection[2] * frame.shape[1])
                h = int(detection[3] * frame.shape[0])
                object_center = (center_x, center_y)
                object_area = w * h

    if detected and object_center:
        frame_center_x = frame.shape[1] // 2
        frame_center_y = frame.shape[0] // 2
        in_front = (abs(object_center[0] - frame_center_x) < MAX_CENTER_DISTANCE
                    and abs(object_center[1] - frame_center_y) < MAX_CENTER_DISTANCE)
        if in_front:
            if object_area > SIZE_THRESHOLD:
                cv2.putText(frame, "STOP! Obstacle detected", (50, 50),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
                return frame, "obstacle"
            cv2.putText(frame, "Obstacle detected, but not in front", (50, 50),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            return frame, "not_front"

    cv2.putText(frame, "No obstacle detected", (50, 50),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    return frame, "safe"


def speak(text):
    """Synthesise *text* to MP3 bytes with gTTS.

    Renders entirely in memory (io.BytesIO) instead of the original
    NamedTemporaryFile round-trip, which leaked the temp file if
    save/read raised before os.unlink.
    """
    buf = io.BytesIO()
    gTTS(text=text, lang="en").write_to_fp(buf)
    return buf.getvalue()


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Per-frame detection loop over a WebSocket.

    Protocol: client sends encoded image bytes; server replies with the
    annotated JPEG, preceded by an MP3 alert whenever the detection state
    changes ("obstacle" or "safe").
    """
    await websocket.accept()
    previous_state = None
    try:
        while True:
            frame_data = await websocket.receive_bytes()
            np_frame = np.frombuffer(frame_data, dtype=np.uint8)
            frame = cv2.imdecode(np_frame, cv2.IMREAD_COLOR)
            if frame is None:
                # Corrupt/partial frame — skip it rather than crash the loop.
                continue
            processed_frame, current_state = detect_objects(frame)

            # Only speak when the state changes. gTTS performs a blocking
            # HTTP request, so run it in an executor to keep the event loop
            # responsive.
            if current_state != previous_state:
                loop = asyncio.get_event_loop()
                if current_state == "obstacle":
                    audio_data = await loop.run_in_executor(
                        None, speak, "Obstacle detected. Stop!")
                    await websocket.send_bytes(audio_data)
                elif current_state == "safe":
                    audio_data = await loop.run_in_executor(
                        None, speak, "Path is clear. You can proceed.")
                    await websocket.send_bytes(audio_data)
                previous_state = current_state

            # Send the annotated frame back to the client.
            _, encoded_frame = cv2.imencode(".jpg", processed_frame)
            await websocket.send_bytes(encoded_frame.tobytes())
    except Exception as e:
        print(f"WebSocket error: {e}")
    finally:
        try:
            await websocket.close()
        except Exception:
            pass  # connection already closed by the peer


@app.get("/stream")
async def video_stream():
    """MJPEG test stream (multipart/x-mixed-replace) from the webcam.

    Replace the VideoCapture argument with an RTSP URL if needed.
    """
    cap = cv2.VideoCapture(0)  # 0 = default webcam

    async def video_generator():
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                _, encoded_frame = cv2.imencode(".jpg", frame)
                yield (b"--frame\r\n"
                       b"Content-Type: image/jpeg\r\n\r\n" +
                       encoded_frame.tobytes() + b"\r\n")
                await asyncio.sleep(0.01)  # yield control to the event loop
        finally:
            cap.release()  # fix: the capture device was never released

    return StreamingResponse(video_generator(),
                             media_type="multipart/x-mixed-replace; boundary=frame")
# end of file