from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import StreamingResponse
import cv2
import numpy as np
from gtts import gTTS
import tempfile
import os
import asyncio

app = FastAPI()

# Paths to the YOLOv3 model files
weights_path = "./yolov3.weights"
config_path = "./yolov3.cfg"
labels_path = "./coco.names"
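# (These files are not bundled here; the standard YOLOv3 weights, cfg, and
# coco.names are distributed via https://pjreddie.com/darknet/yolo/ and the
# darknet GitHub repository.)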

# Load YOLO model
print("Loading YOLO model...")
net = cv2.dnn.readNet(weights_path, config_path)
layer_names = net.getLayerNames()
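# getUnconnectedOutLayers() returns 1-based layer indices, hence the i - 1.
# (On OpenCV < 4.5.4 it returns them wrapped as [[i]]; use i[0] - 1 there.)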
output_layers = [layer_names[i - 1] for i in net.getUnconnectedOutLayers()]

# Load class labels
with open(labels_path, 'r') as f:
    classes = [line.strip() for line in f.readlines()]
print("YOLO model loaded successfully.")

# Helper function: Detect objects
def detect_objects(frame):
    blob = cv2.dnn.blobFromImage(frame, 1 / 255.0, (416, 416), (0, 0, 0), swapRB=True, crop=False)  # scale pixels to [0, 1], swap BGR -> RGB
    net.setInput(blob)
    outputs = net.forward(output_layers)

    detected = False
    object_center = None
    object_area = None

    for out in outputs:
        for detection in out:
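            # Each detection row is [center_x, center_y, width, height,
            # objectness, class_0_score, ..., class_79_score], with the box
            # coordinates normalized to the frame size.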
            scores = detection[5:]
            class_id = np.argmax(scores)
            confidence = scores[class_id]

            if confidence > 0.7:
                class_name = classes[class_id]
                if class_name in ["person", "car", "bicycle", "bus"]:
                    detected = True
                    center_x = int(detection[0] * frame.shape[1])
                    center_y = int(detection[1] * frame.shape[0])
                    w = int(detection[2] * frame.shape[1])
                    h = int(detection[3] * frame.shape[0])
                    area = w * h
                    object_center = (center_x, center_y)
                    object_area = area
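    # NOTE: the loop above keeps only the last qualifying detection; no
    # non-max suppression is applied, so overlapping boxes are not merged.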

    if detected:
        frame_center_x = frame.shape[1] // 2
        frame_center_y = frame.shape[0] // 2
        max_distance = 200     # max pixel offset from frame center to count as "in front"
        size_threshold = 1000  # min bounding-box area (px^2) to count as close

        distance_x = abs(object_center[0] - frame_center_x)
        distance_y = abs(object_center[1] - frame_center_y)

        if distance_x < max_distance and distance_y < max_distance:
            if object_area > size_threshold:
                cv2.putText(frame, "STOP! Obstacle detected", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
                return frame, "obstacle"
            cv2.putText(frame, "Obstacle detected, but not in front", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            return frame, "not_front"
        cv2.putText(frame, "Obstacle detected, away from path", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        return frame, "safe"

    cv2.putText(frame, "No obstacle detected", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    return frame, "safe"

# Helper function: Speak alerts
def speak(text):
    tts = gTTS(text=text, lang="en")
    temp_audio = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
    temp_audio.close()  # close the open handle so gTTS can write to the path (required on Windows)
    try:
        tts.save(temp_audio.name)
        with open(temp_audio.name, "rb") as audio:
            audio_data = audio.read()
    finally:
        os.unlink(temp_audio.name)
    return audio_data
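
# Alternative sketch (assumes gTTS 2.x): write_to_fp() renders the MP3 into a
# file-like object, avoiding the temp-file round trip entirely:
#
#   import io
#
#   def speak(text):
#       buf = io.BytesIO()
#       gTTS(text=text, lang="en").write_to_fp(buf)
#       return buf.getvalue()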

# WebSocket endpoint for live video
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    previous_state = None
    try:
        while True:
            frame_data = await websocket.receive_bytes()
            np_frame = np.frombuffer(frame_data, dtype=np.uint8)
            frame = cv2.imdecode(np_frame, cv2.IMREAD_COLOR)
            if frame is None:
                continue  # skip frames that fail to decode

            processed_frame, current_state = detect_objects(frame)

            # Only speak when the state changes
            if current_state != previous_state:
                if current_state == "obstacle":
                    audio_data = speak("Obstacle detected. Stop!")
                    await websocket.send_bytes(audio_data)
                elif current_state == "safe":
                    audio_data = speak("Path is clear. You can proceed.")
                    await websocket.send_bytes(audio_data)
                previous_state = current_state

            # Encode the processed frame and send it back
            _, encoded_frame = cv2.imencode(".jpg", processed_frame)
            await websocket.send_bytes(encoded_frame.tobytes())
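
            # NOTE: MP3 alerts and JPEG frames share this socket with no
            # framing; a client has to tell them apart, e.g. by magic bytes
            # (JPEG starts with FF D8, MP3 with "ID3" or an FF Ex/FF Fx sync).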

    except WebSocketDisconnect:
        print("WebSocket client disconnected.")
    except Exception as e:
        print(f"WebSocket error: {e}")
        await websocket.close()

# Test endpoint for live stream via RTSP
@app.get("/stream")
async def video_stream():
    cap = cv2.VideoCapture(0)  # 0 for webcam, replace with RTSP URL if needed

    async def video_generator():
        try:
            while True:
                ret, frame = cap.read()  # NOTE: cap.read() blocks; acceptable for a test endpoint
                if not ret:
                    break
                _, encoded_frame = cv2.imencode(".jpg", frame)
                yield (b"--frame\r\n"
                       b"Content-Type: image/jpeg\r\n\r\n" + encoded_frame.tobytes() + b"\r\n")
                await asyncio.sleep(0.01)  # yield control to the event loop between frames
        finally:
            cap.release()  # release the camera when the stream stops or the client disconnects

    return StreamingResponse(video_generator(), media_type="multipart/x-mixed-replace; boundary=frame")
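
# Run with: uvicorn main:app --host 0.0.0.0 --port 8000
# (assumes this file is saved as main.py)
#
# Minimal test client sketch for the /ws endpoint, assuming the `websockets`
# package is installed and the server is on localhost:8000. It sends JPEG
# webcam frames and prints the size of each reply (annotated frame or MP3
# alert). Kept commented out so it does not run on import:
#
#   import asyncio
#   import cv2
#   import websockets
#
#   async def run_client():
#       cap = cv2.VideoCapture(0)
#       async with websockets.connect("ws://localhost:8000/ws") as ws:
#           try:
#               while True:
#                   ret, frame = cap.read()
#                   if not ret:
#                       break
#                   _, jpeg = cv2.imencode(".jpg", frame)
#                   await ws.send(jpeg.tobytes())
#                   reply = await ws.recv()
#                   print(f"received {len(reply)} bytes")
#           finally:
#               cap.release()
#
#   asyncio.run(run_client())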