from fastapi import FastAPI, WebSocket
from fastapi.responses import StreamingResponse
import cv2
import numpy as np
from gtts import gTTS
import tempfile
import os
import asyncio

app = FastAPI()

# Paths to the YOLOv3 weights, config, and class labels
weights_path = "./yolov3.weights"
config_path = "./yolov3.cfg"
labels_path = "./coco.names"

# Load the YOLO network
print("Loading YOLO model...")
net = cv2.dnn.readNet(weights_path, config_path)
layer_names = net.getLayerNames()
# flatten() handles both the old (Nx1) and new (1-D) return shapes of
# getUnconnectedOutLayers() across OpenCV versions
output_layers = [layer_names[i - 1] for i in net.getUnconnectedOutLayers().flatten()]

# Load class labels
with open(labels_path, 'r') as f:
    classes = [line.strip() for line in f.readlines()]
print("YOLO model loaded successfully.")

# Helper function: detect obstacles and classify the frame state
def detect_objects(frame):
    # 0.00392 ~= 1/255: scale pixel values to [0, 1] for the network
    blob = cv2.dnn.blobFromImage(frame, 0.00392, (416, 416), (0, 0, 0), True, crop=False)
    net.setInput(blob)
    outputs = net.forward(output_layers)

    detected = False
    object_center = None
    object_area = None

    # Keep the last high-confidence detection of a relevant class
    # (overlapping duplicates are not merged; see the NMS sketch below)
    for out in outputs:
        for detection in out:
            scores = detection[5:]
            class_id = np.argmax(scores)
            confidence = scores[class_id]

            if confidence > 0.7:
                class_name = classes[class_id]
                if class_name in ["person", "car", "bicycle", "bus"]:
                    detected = True
                    # YOLO returns normalized coordinates; scale to pixels
                    center_x = int(detection[0] * frame.shape[1])
                    center_y = int(detection[1] * frame.shape[0])
                    w = int(detection[2] * frame.shape[1])
                    h = int(detection[3] * frame.shape[0])
                    object_center = (center_x, center_y)
                    object_area = w * h

    if detected:
        frame_center_x = frame.shape[1] // 2
        frame_center_y = frame.shape[0] // 2
        max_distance = 200     # max pixel offset from frame center to count as "in front"
        size_threshold = 1000  # min box area (px^2) to treat the object as close

        distance_x = abs(object_center[0] - frame_center_x)
        distance_y = abs(object_center[1] - frame_center_y)

        if distance_x < max_distance and distance_y < max_distance:
            if object_area > size_threshold:
                cv2.putText(frame, "STOP! Obstacle detected", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
                return frame, "obstacle"
            # A small box near the center usually means the object is still far away
            cv2.putText(frame, "Obstacle detected, but not in front", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            return frame, "not_front"

    # No relevant object, or the object is away from the frame center
    cv2.putText(frame, "No obstacle detected", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    return frame, "safe"

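# The detection loop above keeps only the last qualifying box, so overlapping
# duplicates of the same object are never merged. Below is a minimal, optional
# sketch of non-maximum suppression with cv2.dnn.NMSBoxes; it assumes you also
# collect parallel `boxes` ([x, y, w, h]) and `confidences` lists while
# iterating the outputs, and is an illustration rather than part of the
# original pipeline.
def suppress_overlaps(boxes, confidences, score_thr=0.7, nms_thr=0.4):
    # NMSBoxes returns the indices of the boxes that survive suppression;
    # np.array(...).flatten() normalizes the shape across OpenCV versions
    keep = cv2.dnn.NMSBoxes(boxes, confidences, score_thr, nms_thr)
    return [boxes[i] for i in np.array(keep).flatten()]
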
# Helper function: synthesize an alert with gTTS and return the MP3 bytes
def speak(text):
    tts = gTTS(text=text, lang="en")
    temp_audio = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
    temp_audio.close()  # release the handle so gTTS can write to the path (required on Windows)
    tts.save(temp_audio.name)
    with open(temp_audio.name, "rb") as audio:
        audio_data = audio.read()
    os.unlink(temp_audio.name)
    return audio_data

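# Hypothetical sanity check for speak() (not part of the server flow): write
# the returned MP3 bytes to disk and play the file manually.
#
#     with open("alert.mp3", "wb") as f:
#         f.write(speak("Test alert"))
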
# WebSocket endpoint for live video
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    previous_state = None
    try:
        while True:
            frame_data = await websocket.receive_bytes()
            np_frame = np.frombuffer(frame_data, dtype=np.uint8)
            frame = cv2.imdecode(np_frame, cv2.IMREAD_COLOR)
            if frame is None:
                continue  # skip payloads that do not decode to an image

            processed_frame, current_state = detect_objects(frame)

            # Only speak when the state changes. Note that MP3 alerts and JPEG
            # frames share one channel, so the client must tell them apart
            # (e.g. by magic bytes or a framing prefix).
            if current_state != previous_state:
                if current_state == "obstacle":
                    # gTTS makes a blocking HTTP request; run it off the event
                    # loop (asyncio.to_thread requires Python 3.9+)
                    audio_data = await asyncio.to_thread(speak, "Obstacle detected. Stop!")
                    await websocket.send_bytes(audio_data)
                elif current_state == "safe":
                    audio_data = await asyncio.to_thread(speak, "Path is clear. You can proceed.")
                    await websocket.send_bytes(audio_data)
                previous_state = current_state

            # Encode the processed frame and send it back
            _, encoded_frame = cv2.imencode(".jpg", processed_frame)
            await websocket.send_bytes(encoded_frame.tobytes())

    except Exception as e:
        print(f"WebSocket error: {e}")
    finally:
        await websocket.close()

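# A minimal client sketch for the /ws endpoint, illustrative rather than part
# of the original server: it streams webcam JPEGs to the socket and reads the
# replies. Assumes the third-party `websockets` package and a server reachable
# at ws://localhost:8000/ws.
async def stream_webcam(url="ws://localhost:8000/ws"):
    import websockets  # local import so the server runs without the package
    cap = cv2.VideoCapture(0)
    async with websockets.connect(url) as ws:
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                _, jpg = cv2.imencode(".jpg", frame)
                await ws.send(jpg.tobytes())
                # Each reply is either a processed JPEG frame or an MP3 alert;
                # see the channel-sharing note in the endpoint above
                reply = await ws.recv()
        finally:
            cap.release()
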
# Test endpoint for a live MJPEG stream (webcam or RTSP)
@app.get("/stream")
async def video_stream():
    cap = cv2.VideoCapture(0)  # 0 for webcam; replace with an RTSP URL if needed

    async def video_generator():
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                _, encoded_frame = cv2.imencode(".jpg", frame)
                yield (b"--frame\r\n"
                       b"Content-Type: image/jpeg\r\n\r\n" + encoded_frame.tobytes() + b"\r\n")
                await asyncio.sleep(0.01)  # yield control to the event loop between frames
        finally:
            cap.release()  # free the camera when the client disconnects

    return StreamingResponse(video_generator(), media_type="multipart/x-mixed-replace; boundary=frame")
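
# To serve the app, run uvicorn from the shell (assuming this file is saved as
# main.py; host and port are example values):
#
#     uvicorn main:app --host 0.0.0.0 --port 8000
#
# or start it programmatically:
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)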