Untitled

 avatar
unknown
plain_text
14 days ago
19 kB
10
Indexable
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Akilli Baston - Kor Bireyler icin Sesli Nesne Algilama + Cukur Tespiti
Raspberry Pi 5 + Hailo-8L + IMX708 Kamera + Arduino Ultrasonik Sensör (asagi bakis)
GStreamer + Hailo pipeline - Turkce sesli uyari

Ultrasonik sensör ASAGIYA bakar:
  - Normal zeminde mesafe sabit/kucuk (ZEMIN_NORMAL_CM civarinda)
  - Cukurda zemin uzaklasir -> mesafe CUKUR_ESIK_CM degerini ASAR -> uyari ver
"""
import sys, io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')

import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstApp', '1.0')
from gi.repository import Gst, GstApp

import cv2
import time
import signal
import os
import threading
import queue
import subprocess
import serial
import hailo
from picamera2 import Picamera2

Gst.init(None)


# ─── GENEL AYARLAR ────────────────────────────────────────────────────────────
HEF_PATH     = "/usr/share/hailo-models/yolov8s_h8l.hef"
SO_PATH      = "/usr/lib/aarch64-linux-gnu/hailo/tappas/post_processes/libyolo_hailortpp_post.so"
CAM_W        = 1280
CAM_H        = 720
HAILO_W      = 640
HAILO_H      = 640
CONF_THRESH  = 0.45
SHOW_DISPLAY = True

# ─── ARDUİNO / ULTRASONİK (ÇUKUR TESPİTİ) ────────────────────────────────────
ARDUINO_PORT    = "/dev/ttyUSB0"
ARDUINO_BAUD    = 9600

# Kalibrasyon:
#   Bastonu düz zeminde tut, seri monitörden gelen değeri oku.
#   O değer ZEMIN_NORMAL_CM'dir. CUKUR_ESIK_CM = ZEMIN_NORMAL_CM + 15 yapabilirsin.
ZEMIN_NORMAL_CM = 25   # cm - düz zeminde beklenen ölçüm (kalibre et)
CUKUR_ESIK_CM   = 40   # cm - bu değerin ÜSTÜNE çıkarsa çukur var demektir

CUKUR_UYARI_ARALIK = 3.0   # saniye - aynı çukur uyarısı kaç sn'de bir tekrarlansın

# ─── NESNE UYARI ARALİKLARI (saniye) ─────────────────────────────────────────
UYARI_ARALIK = {
    "tehlikeli": 4.0,
    "dikkat"   : 6.0,
    "bilgi"    : 10.0,
}

# ─── KAMERA TABANLI MESAFE ESİKLERİ (nesne_h / frame_h oranı) ─────────────────
MESAFE_ESIK = {
    "person"       : 0.35,
    "bicycle"      : 0.30,
    "car"          : 0.25,
    "motorcycle"   : 0.25,
    "bus"          : 0.30,
    "truck"        : 0.30,
    "cat"          : 0.20,
    "dog"          : 0.20,
    "traffic light": 0.10,
    "cell phone"   : 0.10,
    "bench"        : 0.25,
    "stop sign"    : 0.10,
    "chair"        : 0.25,
    "potted plant" : 0.20,
}

# ─── HEDEF NESNELER ───────────────────────────────────────────────────────────
HEDEF_NESNELER = {
    "person"        : ("insan",        "dikkat",    "İnsan"),
    "bicycle"       : ("bisiklet",     "dikkat",    "Dikkat bisiklet"),
    "car"           : ("araba",        "tehlikeli", "Tehlike araba"),
    "motorcycle"    : ("motosiklet",   "tehlikeli", "Tehlike motosiklet"),
    "bus"           : ("otobüs",       "tehlikeli", "Tehlike otobus"),
    "truck"         : ("kamyon",       "tehlikeli", "Tehlike kamyon"),
    "traffic light" : ("trafik ışığı", "bilgi",     "Trafik Işığı"),
    "cat"           : ("kedi",         "dikkat",    "Dikkat kedi"),
    "dog"           : ("köpek",        "dikkat",    "Dikkat köpek"),
    "cell phone"    : ("telefon",      "bilgi",     "Cep telefonu"),
    "bench"         : ("bank",         "bilgi",     "Bank"),
    "stop sign"     : ("dur işareti",  "bilgi",     "Dur işareti"),
    "potted plant"  : ("engel",        "bilgi",     "Dikkat engel var"),
}

RENK = {
    "tehlikeli": (0,   0,   255),
    "dikkat"   : (0,   165, 255),
    "bilgi"    : (0,   200, 0  ),
    "diger"    : (150, 150, 150),
}


# ─── GLOBAL DURUM ─────────────────────────────────────────────────────────────
ses_kuyrugu      = queue.Queue(maxsize=3)
son_uyari_zamani = {}

son_tespitler    = []
son_frame        = None
tespitler_lock   = threading.Lock()
genis_frame_lock = threading.Lock()
genis_frame      = [None]
fps_list         = []
frame_count      = [0]
running          = [True]

# Ultrasonik sensörden gelen anlık mesafe (cm) — None = bağlantı yok
ultrasonik_mesafe = [None]
ultrasonik_lock   = threading.Lock()


# ─── YARDIMCI FONKSİYONLAR ────────────────────────────────────────────────────
def yon_hesapla(cx: float) -> str:
    if cx < 0.33:   return "solda"
    elif cx > 0.67: return "sağda"
    else:           return "önünde"

def yakin_mi_kamera(sinif_adi: str, nesne_h: float) -> bool:
    """Kamera tabanlı nesne yakınlık tahmini."""
    return nesne_h >= MESAFE_ESIK.get(sinif_adi, 0.25)

def cukur_var_mi() -> bool:
    """
    Ultrasonik sensör aşağıya bakar.
    Zemin mesafesi CUKUR_ESIK_CM değerini aşarsa çukur/basamak var demektir.
    """
    with ultrasonik_lock:
        mesafe = ultrasonik_mesafe[0]
    if mesafe is None:
        return False
    return mesafe > CUKUR_ESIK_CM


# ─── UYARI FONKSİYONLARI ──────────────────────────────────────────────────────
def cukur_uyari_ver():
    """Çukur tespit edildiğinde sesli uyarı verir."""
    if not cukur_var_mi():
        return

    anahtar = "cukur"
    simdi   = time.time()
    if simdi - son_uyari_zamani.get(anahtar, 0) < CUKUR_UYARI_ARALIK:
        return

    with ultrasonik_lock:
        mesafe = ultrasonik_mesafe[0]

    son_uyari_zamani[anahtar] = simdi
    metin = "İleride çukur var"
    print(f"[ÇUKUR] {metin}  ({mesafe:.0f} cm)")
    if not ses_kuyrugu.full():
        ses_kuyrugu.put(metin)

def uyari_ver(sinif_adi: str, cx: float, nesne_h: float):
    """Kamera tespitine göre nesne uyarısı verir."""
    if sinif_adi not in HEDEF_NESNELER:
        return

    MESAFESIZ_UYARI = {"traffic light", "stop sign"}
    if sinif_adi not in MESAFESIZ_UYARI and not yakin_mi_kamera(sinif_adi, nesne_h):
        return

    ad, seviye, temel = HEDEF_NESNELER[sinif_adi]
    yon     = yon_hesapla(cx)
    anahtar = f"{sinif_adi}_{yon}"
    simdi   = time.time()
    aralik  = UYARI_ARALIK.get(seviye, 6.0)

    if simdi - son_uyari_zamani.get(anahtar, 0) < aralik:
        return

    son_uyari_zamani[anahtar] = simdi
    metin = f"{temel} {yon}"
    print(f"[SES] {metin}")
    if not ses_kuyrugu.full():
        ses_kuyrugu.put(metin)


# ─── SES THREAD ───────────────────────────────────────────────────────────────
def tts_worker():
    while True:
        metin = ses_kuyrugu.get()
        if metin is None:
            break
        try:
            subprocess.run(
                ["espeak-ng", "-v", "tr", "-s", "150", "-a", "200", metin],
                capture_output=True, timeout=5
            )
        except Exception as e:
            print(f"[SES HATA] {e}")
        finally:
            ses_kuyrugu.task_done()


# ─── ARDUİNO THREAD ───────────────────────────────────────────────────────────
def arduino_worker():
    """
    Arduino'dan seri port üzerinden zemin mesafesini okur (sensör aşağıya bakar).
    Her satırda float cm değeri beklenir.
    Bağlantı kopuksa 3 saniye bekleyip yeniden bağlanır.
    """
    while running[0]:
        try:
            print(f"[*] Arduino bağlanıyor: {ARDUINO_PORT} @ {ARDUINO_BAUD} baud...")
            with serial.Serial(ARDUINO_PORT, ARDUINO_BAUD, timeout=2) as arduino:
                time.sleep(2)   # Arduino reset süresi
                print(f"[✓] Arduino bağlandı. Zemin normal: ~{ZEMIN_NORMAL_CM} cm | Çukur eşiği: >{CUKUR_ESIK_CM} cm")
                while running[0]:
                    satir = arduino.readline().decode("utf-8", errors="ignore").strip()
                    if not satir:
                        continue
                    try:
                        mesafe_cm = float(satir)
                        with ultrasonik_lock:
                            ultrasonik_mesafe[0] = mesafe_cm

                        if mesafe_cm > CUKUR_ESIK_CM:
                            print(f"[US] Mesafe: {mesafe_cm:.1f} cm  ← ÇUKUR!")
                        # else: normal zemin, log basma (gürültüyü azalt)

                        # Çukur uyarısını her okumada kontrol et
                        cukur_uyari_ver()

                    except ValueError:
                        pass   # sayısal olmayan satırları atla

        except serial.SerialException as e:
            print(f"[!] Arduino bağlantı hatası: {e}. 3 sn sonra yeniden deneniyor...")
            with ultrasonik_lock:
                ultrasonik_mesafe[0] = None
            time.sleep(3)


# ─── GSTREAMER CALLBACK ───────────────────────────────────────────────────────
def on_buffer(sink, data):
    sample = sink.emit('pull-sample')
    if not sample:
        return Gst.FlowReturn.ERROR

    buf = sample.get_buffer()

    tespitler = []
    try:
        roi  = hailo.get_roi_from_buffer(buf)
        dets = roi.get_objects_typed(hailo.HAILO_DETECTION)
        for d in dets:
            conf  = d.get_confidence()
            label = d.get_label()

            if label not in HEDEF_NESNELER or conf < CONF_THRESH:
                continue

            bbox    = d.get_bbox()
            cx      = bbox.xmin() + bbox.width() / 2
            nesne_h = bbox.height()

            x1 = int(bbox.xmin() * CAM_W)
            y1 = int(bbox.ymin() * CAM_H)
            x2 = int((bbox.xmin() + bbox.width())  * CAM_W)
            y2 = int((bbox.ymin() + bbox.height()) * CAM_H)
            tespitler.append((x1, y1, x2, y2, conf, label, cx, nesne_h))
    except Exception:
        pass

    # Tehlikeliler önce sıralanır
    def seviye_sirasi(t):
        s = HEDEF_NESNELER.get(t[5], (None, "z", None))[1]
        return 0 if s == "tehlikeli" else 1 if s == "dikkat" else 2

    for t in sorted(tespitler, key=seviye_sirasi):
        uyari_ver(t[5], t[6], t[7])

    with genis_frame_lock:
        frame = genis_frame[0]

    with tespitler_lock:
        son_tespitler.clear()
        son_tespitler.extend(tespitler)
        global son_frame
        son_frame = frame

    frame_count[0] += 1
    return Gst.FlowReturn.OK


# ─── ÇİZİM ────────────────────────────────────────────────────────────────────
def draw(frame, tespitler, fps):
    oh, ow = frame.shape[:2]

    # Nesne kutuları
    for (x1, y1, x2, y2, conf, sinif_adi, cx, nesne_h) in tespitler:
        ad, seviye, _ = HEDEF_NESNELER[sinif_adi]
        color = RENK[seviye]
        thick = 3 if seviye == "tehlikeli" else 2

        yakin = yakin_mi_kamera(sinif_adi, nesne_h)
        if not yakin:
            color = tuple(c // 3 for c in color)
            thick = 1

        cv2.rectangle(frame, (x1, y1), (x2, y2), color, thick)

        yon       = yon_hesapla(cx)
        yakin_str = "YAKIN" if yakin else "uzak"
        etiket    = f"{ad} {conf:.2f} | {yon} | {yakin_str}"

        (tw, th), _ = cv2.getTextSize(etiket, cv2.FONT_HERSHEY_SIMPLEX, 0.55, 2)
        cv2.rectangle(frame, (x1, max(0, y1 - th - 8)), (x1 + tw + 6, y1), color, -1)
        cv2.putText(frame, etiket, (x1 + 3, y1 - 4),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.55, (255, 255, 255), 2)

    # ── Üst bilgi paneli ──
    with ultrasonik_lock:
        us_val = ultrasonik_mesafe[0]

    cukur  = cukur_var_mi()
    us_str = f"{us_val:.0f}cm" if us_val is not None else "bagli degil"

    # Çukur varsa kırmızı, normal zeminde yeşil, bağlı değilse gri
    if us_val is None:
        us_renk = (150, 150, 150)
    elif cukur:
        us_renk = (0, 0, 255)
    else:
        us_renk = (0, 220, 0)

    yakin_n = sum(1 for t in tespitler if yakin_mi_kamera(t[5], t[7]))
    cv2.rectangle(frame, (0, 0), (620, 45), (0, 0, 0), -1)
    cv2.putText(frame,
                f"FPS:{fps:.1f}  Nesne:{yakin_n}/{len(tespitler)}  Zemin:{us_str}  [Hailo-8L]",
                (8, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.68, us_renk, 2)

    # ── Çukur uyarı bandı (altta) ──
    if cukur and us_val is not None:
        cv2.rectangle(frame, (0, oh - 90), (ow, oh - 50), (0, 0, 200), -1)
        cv2.putText(frame, f"!  CUKUR  |  Zemin: {us_val:.0f} cm  !",
                    (ow // 2 - 220, oh - 60),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2)

    # ── Kamera tabanlı tehlike bandı (en altta) ──
    tehlike = any(
        HEDEF_NESNELER.get(t[5], (None, "diger", None))[1] == "tehlikeli"
        and yakin_mi_kamera(t[5], t[7])
        for t in tespitler
    )
    if tehlike:
        cv2.rectangle(frame, (0, oh - 50), (ow, oh), (0, 0, 180), -1)
        cv2.putText(frame, "! TEHLIKE !", (ow // 2 - 130, oh - 12),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.2, (255, 255, 255), 3)

    return frame


# ─── ANA FONKSİYON ────────────────────────────────────────────────────────────
def main():
    global son_frame

    print("=" * 62)
    print("   Akilli Baston  -  Hailo-8L + Çukur Tespiti (Ultrasonik)")
    print(f"   Zemin normal: ~{ZEMIN_NORMAL_CM} cm  |  Çukur eşiği: >{CUKUR_ESIK_CM} cm")
    print("=" * 62)

    # Ses thread
    ses_thread = threading.Thread(target=tts_worker, daemon=True)
    ses_thread.start()
    if subprocess.run(["which", "espeak-ng"], capture_output=True).returncode == 0:
        print("[✓] espeak-ng hazır.")
        ses_kuyrugu.put("Akıllı Baston Başlatıldı")
    else:
        print("[!] espeak-ng bulunamadı. Kur: sudo apt install espeak-ng")

    # Arduino thread
    arduino_thread = threading.Thread(target=arduino_worker, daemon=True)
    arduino_thread.start()
    print(f"[*] Çukur sensörü thread başlatıldı ({ARDUINO_PORT}).")

    # Kamera
    print("[*] Kamera başlatılıyor...")
    cam = Picamera2()
    cam.configure(cam.create_video_configuration(
        main={"size": (CAM_W, CAM_H), "format": "RGB888"},
        controls={"FrameRate": 30}
    ))
    cam.start()
    time.sleep(1)
    print(f"[✓] Kamera hazır ({CAM_W}x{CAM_H}).")

    # GStreamer pipeline
    print("[*] Hailo pipeline başlatılıyor...")
    pipeline_str = f'''
        appsrc name=src format=time is-live=true block=true
            caps=video/x-raw,format=RGB,width={HAILO_W},height={HAILO_H},framerate=30/1 !
        hailonet hef-path={HEF_PATH} batch-size=1 force-writable=true !
        hailofilter so-path={SO_PATH} qos=false !
        appsink name=sink emit-signals=true max-buffers=2 drop=false sync=false
    '''
    pipeline = Gst.parse_launch(pipeline_str)
    src  = pipeline.get_by_name('src')
    sink = pipeline.get_by_name('sink')
    sink.connect('new-sample', on_buffer, None)
    pipeline.set_state(Gst.State.PLAYING)
    time.sleep(0.5)
    print("[✓] Hailo hazır.")

    # Ekran
    if SHOW_DISPLAY:
        os.environ.setdefault("DISPLAY", ":0")
        cv2.namedWindow("Akilli Baston", cv2.WINDOW_NORMAL)
        cv2.resizeWindow("Akilli Baston", CAM_W, CAM_H)

    # Ctrl+C
    def handle_exit(sig, frame):
        running[0] = False
    signal.signal(signal.SIGINT, handle_exit)

    # Frame push thread
    def push_frames():
        pts = 0
        dur = int(1e9 / 30)
        while running[0]:
            frame = cam.capture_array()
            with genis_frame_lock:
                genis_frame[0] = frame.copy()
            small = cv2.resize(frame, (HAILO_W, HAILO_H))
            buf = Gst.Buffer.new_wrapped(bytes(small))
            buf.pts = pts
            buf.duration = dur
            pts += dur
            src.emit('push-buffer', buf)

    push_thread = threading.Thread(target=push_frames, daemon=True)
    push_thread.start()

    print("[*] Çalışıyor. Çıkmak için Ctrl+C veya 'q'\n")

    t_prev = time.time()
    try:
        while running[0]:
            with tespitler_lock:
                frame     = son_frame
                tespitler = list(son_tespitler)

            if frame is None:
                time.sleep(0.01)
                continue

            now    = time.time()
            fps    = 1.0 / max(now - t_prev, 1e-6)
            t_prev = now
            fps_list.append(fps)
            if len(fps_list) > 30:
                fps_list.pop(0)
            avg_fps = sum(fps_list) / len(fps_list)

            frame = draw(frame.copy(), tespitler, avg_fps)

            if SHOW_DISPLAY:
                cv2.imshow("Akilli Baston", frame)
                key = cv2.waitKey(1) & 0xFF
                if key == ord('q'):
                    break
                elif key == ord('s'):
                    fn = f"snap_{frame_count[0]}.jpg"
                    cv2.imwrite(fn, frame)
                    print(f"[✓] Kaydedildi: {fn}")

            if frame_count[0] % 30 == 0 and frame_count[0] > 0:
                with ultrasonik_lock:
                    us = ultrasonik_mesafe[0]
                yakin_nesneler = [t[5] for t in tespitler if yakin_mi_kamera(t[5], t[7])]
                cukur_str = f"ÇUKUR! ({us:.0f}cm)" if cukur_var_mi() else f"normal ({us:.0f}cm)" if us else "bagli degil"
                print(f"Frame {frame_count[0]:5d} | FPS:{avg_fps:.1f} | "
                      f"Nesneler:{yakin_nesneler} | Zemin:{cukur_str}")

    finally:
        running[0] = False
        src.emit('end-of-stream')
        pipeline.set_state(Gst.State.NULL)
        cam.stop()
        ses_kuyrugu.put(None)
        cv2.destroyAllWindows()
        if fps_list:
            print(f"\n[✓] Toplam frame : {frame_count[0]}")
            print(f"[✓] Ortalama FPS : {sum(fps_list)/len(fps_list):.1f}")


if __name__ == "__main__":
    main()
Editor is loading...
Leave a Comment