Untitled
unknown
plain_text
14 days ago
19 kB
10
Indexable
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Akilli Baston - Kor Bireyler icin Sesli Nesne Algilama + Cukur Tespiti
Raspberry Pi 5 + Hailo-8L + IMX708 Kamera + Arduino Ultrasonik Sensör (asagi bakis)
GStreamer + Hailo pipeline - Turkce sesli uyari
Ultrasonik sensör ASAGIYA bakar:
- Normal zeminde mesafe sabit/kucuk (ZEMIN_NORMAL_CM civarinda)
- Cukurda zemin uzaklasir -> mesafe CUKUR_ESIK_CM degerini ASAR -> uyari ver
"""
import sys, io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstApp', '1.0')
from gi.repository import Gst, GstApp
import cv2
import time
import signal
import os
import threading
import queue
import subprocess
import serial
import hailo
from picamera2 import Picamera2
Gst.init(None)
# ─── GENEL AYARLAR ────────────────────────────────────────────────────────────
HEF_PATH = "/usr/share/hailo-models/yolov8s_h8l.hef"
SO_PATH = "/usr/lib/aarch64-linux-gnu/hailo/tappas/post_processes/libyolo_hailortpp_post.so"
CAM_W = 1280
CAM_H = 720
HAILO_W = 640
HAILO_H = 640
CONF_THRESH = 0.45
SHOW_DISPLAY = True
# ─── ARDUİNO / ULTRASONİK (ÇUKUR TESPİTİ) ────────────────────────────────────
ARDUINO_PORT = "/dev/ttyUSB0"
ARDUINO_BAUD = 9600
# Kalibrasyon:
# Bastonu düz zeminde tut, seri monitörden gelen değeri oku.
# O değer ZEMIN_NORMAL_CM'dir. CUKUR_ESIK_CM = ZEMIN_NORMAL_CM + 15 yapabilirsin.
ZEMIN_NORMAL_CM = 25 # cm - düz zeminde beklenen ölçüm (kalibre et)
CUKUR_ESIK_CM = 40 # cm - bu değerin ÜSTÜNE çıkarsa çukur var demektir
CUKUR_UYARI_ARALIK = 3.0 # saniye - aynı çukur uyarısı kaç sn'de bir tekrarlansın
# ─── NESNE UYARI ARALİKLARI (saniye) ─────────────────────────────────────────
UYARI_ARALIK = {
"tehlikeli": 4.0,
"dikkat" : 6.0,
"bilgi" : 10.0,
}
# ─── KAMERA TABANLI MESAFE ESİKLERİ (nesne_h / frame_h oranı) ─────────────────
MESAFE_ESIK = {
"person" : 0.35,
"bicycle" : 0.30,
"car" : 0.25,
"motorcycle" : 0.25,
"bus" : 0.30,
"truck" : 0.30,
"cat" : 0.20,
"dog" : 0.20,
"traffic light": 0.10,
"cell phone" : 0.10,
"bench" : 0.25,
"stop sign" : 0.10,
"chair" : 0.25,
"potted plant" : 0.20,
}
# ─── HEDEF NESNELER ───────────────────────────────────────────────────────────
HEDEF_NESNELER = {
"person" : ("insan", "dikkat", "İnsan"),
"bicycle" : ("bisiklet", "dikkat", "Dikkat bisiklet"),
"car" : ("araba", "tehlikeli", "Tehlike araba"),
"motorcycle" : ("motosiklet", "tehlikeli", "Tehlike motosiklet"),
"bus" : ("otobüs", "tehlikeli", "Tehlike otobus"),
"truck" : ("kamyon", "tehlikeli", "Tehlike kamyon"),
"traffic light" : ("trafik ışığı", "bilgi", "Trafik Işığı"),
"cat" : ("kedi", "dikkat", "Dikkat kedi"),
"dog" : ("köpek", "dikkat", "Dikkat köpek"),
"cell phone" : ("telefon", "bilgi", "Cep telefonu"),
"bench" : ("bank", "bilgi", "Bank"),
"stop sign" : ("dur işareti", "bilgi", "Dur işareti"),
"potted plant" : ("engel", "bilgi", "Dikkat engel var"),
}
RENK = {
"tehlikeli": (0, 0, 255),
"dikkat" : (0, 165, 255),
"bilgi" : (0, 200, 0 ),
"diger" : (150, 150, 150),
}
# ─── GLOBAL DURUM ─────────────────────────────────────────────────────────────
ses_kuyrugu = queue.Queue(maxsize=3)
son_uyari_zamani = {}
son_tespitler = []
son_frame = None
tespitler_lock = threading.Lock()
genis_frame_lock = threading.Lock()
genis_frame = [None]
fps_list = []
frame_count = [0]
running = [True]
# Ultrasonik sensörden gelen anlık mesafe (cm) — None = bağlantı yok
ultrasonik_mesafe = [None]
ultrasonik_lock = threading.Lock()
# ─── YARDIMCI FONKSİYONLAR ────────────────────────────────────────────────────
def yon_hesapla(cx: float) -> str:
if cx < 0.33: return "solda"
elif cx > 0.67: return "sağda"
else: return "önünde"
def yakin_mi_kamera(sinif_adi: str, nesne_h: float) -> bool:
"""Kamera tabanlı nesne yakınlık tahmini."""
return nesne_h >= MESAFE_ESIK.get(sinif_adi, 0.25)
def cukur_var_mi() -> bool:
"""
Ultrasonik sensör aşağıya bakar.
Zemin mesafesi CUKUR_ESIK_CM değerini aşarsa çukur/basamak var demektir.
"""
with ultrasonik_lock:
mesafe = ultrasonik_mesafe[0]
if mesafe is None:
return False
return mesafe > CUKUR_ESIK_CM
# ─── UYARI FONKSİYONLARI ──────────────────────────────────────────────────────
def cukur_uyari_ver():
"""Çukur tespit edildiğinde sesli uyarı verir."""
if not cukur_var_mi():
return
anahtar = "cukur"
simdi = time.time()
if simdi - son_uyari_zamani.get(anahtar, 0) < CUKUR_UYARI_ARALIK:
return
with ultrasonik_lock:
mesafe = ultrasonik_mesafe[0]
son_uyari_zamani[anahtar] = simdi
metin = "İleride çukur var"
print(f"[ÇUKUR] {metin} ({mesafe:.0f} cm)")
if not ses_kuyrugu.full():
ses_kuyrugu.put(metin)
def uyari_ver(sinif_adi: str, cx: float, nesne_h: float):
"""Kamera tespitine göre nesne uyarısı verir."""
if sinif_adi not in HEDEF_NESNELER:
return
MESAFESIZ_UYARI = {"traffic light", "stop sign"}
if sinif_adi not in MESAFESIZ_UYARI and not yakin_mi_kamera(sinif_adi, nesne_h):
return
ad, seviye, temel = HEDEF_NESNELER[sinif_adi]
yon = yon_hesapla(cx)
anahtar = f"{sinif_adi}_{yon}"
simdi = time.time()
aralik = UYARI_ARALIK.get(seviye, 6.0)
if simdi - son_uyari_zamani.get(anahtar, 0) < aralik:
return
son_uyari_zamani[anahtar] = simdi
metin = f"{temel} {yon}"
print(f"[SES] {metin}")
if not ses_kuyrugu.full():
ses_kuyrugu.put(metin)
# ─── SES THREAD ───────────────────────────────────────────────────────────────
def tts_worker():
while True:
metin = ses_kuyrugu.get()
if metin is None:
break
try:
subprocess.run(
["espeak-ng", "-v", "tr", "-s", "150", "-a", "200", metin],
capture_output=True, timeout=5
)
except Exception as e:
print(f"[SES HATA] {e}")
finally:
ses_kuyrugu.task_done()
# ─── ARDUİNO THREAD ───────────────────────────────────────────────────────────
def arduino_worker():
"""
Arduino'dan seri port üzerinden zemin mesafesini okur (sensör aşağıya bakar).
Her satırda float cm değeri beklenir.
Bağlantı kopuksa 3 saniye bekleyip yeniden bağlanır.
"""
while running[0]:
try:
print(f"[*] Arduino bağlanıyor: {ARDUINO_PORT} @ {ARDUINO_BAUD} baud...")
with serial.Serial(ARDUINO_PORT, ARDUINO_BAUD, timeout=2) as arduino:
time.sleep(2) # Arduino reset süresi
print(f"[✓] Arduino bağlandı. Zemin normal: ~{ZEMIN_NORMAL_CM} cm | Çukur eşiği: >{CUKUR_ESIK_CM} cm")
while running[0]:
satir = arduino.readline().decode("utf-8", errors="ignore").strip()
if not satir:
continue
try:
mesafe_cm = float(satir)
with ultrasonik_lock:
ultrasonik_mesafe[0] = mesafe_cm
if mesafe_cm > CUKUR_ESIK_CM:
print(f"[US] Mesafe: {mesafe_cm:.1f} cm ← ÇUKUR!")
# else: normal zemin, log basma (gürültüyü azalt)
# Çukur uyarısını her okumada kontrol et
cukur_uyari_ver()
except ValueError:
pass # sayısal olmayan satırları atla
except serial.SerialException as e:
print(f"[!] Arduino bağlantı hatası: {e}. 3 sn sonra yeniden deneniyor...")
with ultrasonik_lock:
ultrasonik_mesafe[0] = None
time.sleep(3)
# ─── GSTREAMER CALLBACK ───────────────────────────────────────────────────────
def on_buffer(sink, data):
sample = sink.emit('pull-sample')
if not sample:
return Gst.FlowReturn.ERROR
buf = sample.get_buffer()
tespitler = []
try:
roi = hailo.get_roi_from_buffer(buf)
dets = roi.get_objects_typed(hailo.HAILO_DETECTION)
for d in dets:
conf = d.get_confidence()
label = d.get_label()
if label not in HEDEF_NESNELER or conf < CONF_THRESH:
continue
bbox = d.get_bbox()
cx = bbox.xmin() + bbox.width() / 2
nesne_h = bbox.height()
x1 = int(bbox.xmin() * CAM_W)
y1 = int(bbox.ymin() * CAM_H)
x2 = int((bbox.xmin() + bbox.width()) * CAM_W)
y2 = int((bbox.ymin() + bbox.height()) * CAM_H)
tespitler.append((x1, y1, x2, y2, conf, label, cx, nesne_h))
except Exception:
pass
# Tehlikeliler önce sıralanır
def seviye_sirasi(t):
s = HEDEF_NESNELER.get(t[5], (None, "z", None))[1]
return 0 if s == "tehlikeli" else 1 if s == "dikkat" else 2
for t in sorted(tespitler, key=seviye_sirasi):
uyari_ver(t[5], t[6], t[7])
with genis_frame_lock:
frame = genis_frame[0]
with tespitler_lock:
son_tespitler.clear()
son_tespitler.extend(tespitler)
global son_frame
son_frame = frame
frame_count[0] += 1
return Gst.FlowReturn.OK
# ─── ÇİZİM ────────────────────────────────────────────────────────────────────
def draw(frame, tespitler, fps):
oh, ow = frame.shape[:2]
# Nesne kutuları
for (x1, y1, x2, y2, conf, sinif_adi, cx, nesne_h) in tespitler:
ad, seviye, _ = HEDEF_NESNELER[sinif_adi]
color = RENK[seviye]
thick = 3 if seviye == "tehlikeli" else 2
yakin = yakin_mi_kamera(sinif_adi, nesne_h)
if not yakin:
color = tuple(c // 3 for c in color)
thick = 1
cv2.rectangle(frame, (x1, y1), (x2, y2), color, thick)
yon = yon_hesapla(cx)
yakin_str = "YAKIN" if yakin else "uzak"
etiket = f"{ad} {conf:.2f} | {yon} | {yakin_str}"
(tw, th), _ = cv2.getTextSize(etiket, cv2.FONT_HERSHEY_SIMPLEX, 0.55, 2)
cv2.rectangle(frame, (x1, max(0, y1 - th - 8)), (x1 + tw + 6, y1), color, -1)
cv2.putText(frame, etiket, (x1 + 3, y1 - 4),
cv2.FONT_HERSHEY_SIMPLEX, 0.55, (255, 255, 255), 2)
# ── Üst bilgi paneli ──
with ultrasonik_lock:
us_val = ultrasonik_mesafe[0]
cukur = cukur_var_mi()
us_str = f"{us_val:.0f}cm" if us_val is not None else "bagli degil"
# Çukur varsa kırmızı, normal zeminde yeşil, bağlı değilse gri
if us_val is None:
us_renk = (150, 150, 150)
elif cukur:
us_renk = (0, 0, 255)
else:
us_renk = (0, 220, 0)
yakin_n = sum(1 for t in tespitler if yakin_mi_kamera(t[5], t[7]))
cv2.rectangle(frame, (0, 0), (620, 45), (0, 0, 0), -1)
cv2.putText(frame,
f"FPS:{fps:.1f} Nesne:{yakin_n}/{len(tespitler)} Zemin:{us_str} [Hailo-8L]",
(8, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.68, us_renk, 2)
# ── Çukur uyarı bandı (altta) ──
if cukur and us_val is not None:
cv2.rectangle(frame, (0, oh - 90), (ow, oh - 50), (0, 0, 200), -1)
cv2.putText(frame, f"! CUKUR | Zemin: {us_val:.0f} cm !",
(ow // 2 - 220, oh - 60),
cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2)
# ── Kamera tabanlı tehlike bandı (en altta) ──
tehlike = any(
HEDEF_NESNELER.get(t[5], (None, "diger", None))[1] == "tehlikeli"
and yakin_mi_kamera(t[5], t[7])
for t in tespitler
)
if tehlike:
cv2.rectangle(frame, (0, oh - 50), (ow, oh), (0, 0, 180), -1)
cv2.putText(frame, "! TEHLIKE !", (ow // 2 - 130, oh - 12),
cv2.FONT_HERSHEY_SIMPLEX, 1.2, (255, 255, 255), 3)
return frame
# ─── ANA FONKSİYON ────────────────────────────────────────────────────────────
def main():
global son_frame
print("=" * 62)
print(" Akilli Baston - Hailo-8L + Çukur Tespiti (Ultrasonik)")
print(f" Zemin normal: ~{ZEMIN_NORMAL_CM} cm | Çukur eşiği: >{CUKUR_ESIK_CM} cm")
print("=" * 62)
# Ses thread
ses_thread = threading.Thread(target=tts_worker, daemon=True)
ses_thread.start()
if subprocess.run(["which", "espeak-ng"], capture_output=True).returncode == 0:
print("[✓] espeak-ng hazır.")
ses_kuyrugu.put("Akıllı Baston Başlatıldı")
else:
print("[!] espeak-ng bulunamadı. Kur: sudo apt install espeak-ng")
# Arduino thread
arduino_thread = threading.Thread(target=arduino_worker, daemon=True)
arduino_thread.start()
print(f"[*] Çukur sensörü thread başlatıldı ({ARDUINO_PORT}).")
# Kamera
print("[*] Kamera başlatılıyor...")
cam = Picamera2()
cam.configure(cam.create_video_configuration(
main={"size": (CAM_W, CAM_H), "format": "RGB888"},
controls={"FrameRate": 30}
))
cam.start()
time.sleep(1)
print(f"[✓] Kamera hazır ({CAM_W}x{CAM_H}).")
# GStreamer pipeline
print("[*] Hailo pipeline başlatılıyor...")
pipeline_str = f'''
appsrc name=src format=time is-live=true block=true
caps=video/x-raw,format=RGB,width={HAILO_W},height={HAILO_H},framerate=30/1 !
hailonet hef-path={HEF_PATH} batch-size=1 force-writable=true !
hailofilter so-path={SO_PATH} qos=false !
appsink name=sink emit-signals=true max-buffers=2 drop=false sync=false
'''
pipeline = Gst.parse_launch(pipeline_str)
src = pipeline.get_by_name('src')
sink = pipeline.get_by_name('sink')
sink.connect('new-sample', on_buffer, None)
pipeline.set_state(Gst.State.PLAYING)
time.sleep(0.5)
print("[✓] Hailo hazır.")
# Ekran
if SHOW_DISPLAY:
os.environ.setdefault("DISPLAY", ":0")
cv2.namedWindow("Akilli Baston", cv2.WINDOW_NORMAL)
cv2.resizeWindow("Akilli Baston", CAM_W, CAM_H)
# Ctrl+C
def handle_exit(sig, frame):
running[0] = False
signal.signal(signal.SIGINT, handle_exit)
# Frame push thread
def push_frames():
pts = 0
dur = int(1e9 / 30)
while running[0]:
frame = cam.capture_array()
with genis_frame_lock:
genis_frame[0] = frame.copy()
small = cv2.resize(frame, (HAILO_W, HAILO_H))
buf = Gst.Buffer.new_wrapped(bytes(small))
buf.pts = pts
buf.duration = dur
pts += dur
src.emit('push-buffer', buf)
push_thread = threading.Thread(target=push_frames, daemon=True)
push_thread.start()
print("[*] Çalışıyor. Çıkmak için Ctrl+C veya 'q'\n")
t_prev = time.time()
try:
while running[0]:
with tespitler_lock:
frame = son_frame
tespitler = list(son_tespitler)
if frame is None:
time.sleep(0.01)
continue
now = time.time()
fps = 1.0 / max(now - t_prev, 1e-6)
t_prev = now
fps_list.append(fps)
if len(fps_list) > 30:
fps_list.pop(0)
avg_fps = sum(fps_list) / len(fps_list)
frame = draw(frame.copy(), tespitler, avg_fps)
if SHOW_DISPLAY:
cv2.imshow("Akilli Baston", frame)
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
break
elif key == ord('s'):
fn = f"snap_{frame_count[0]}.jpg"
cv2.imwrite(fn, frame)
print(f"[✓] Kaydedildi: {fn}")
if frame_count[0] % 30 == 0 and frame_count[0] > 0:
with ultrasonik_lock:
us = ultrasonik_mesafe[0]
yakin_nesneler = [t[5] for t in tespitler if yakin_mi_kamera(t[5], t[7])]
cukur_str = f"ÇUKUR! ({us:.0f}cm)" if cukur_var_mi() else f"normal ({us:.0f}cm)" if us else "bagli degil"
print(f"Frame {frame_count[0]:5d} | FPS:{avg_fps:.1f} | "
f"Nesneler:{yakin_nesneler} | Zemin:{cukur_str}")
finally:
running[0] = False
src.emit('end-of-stream')
pipeline.set_state(Gst.State.NULL)
cam.stop()
ses_kuyrugu.put(None)
cv2.destroyAllWindows()
if fps_list:
print(f"\n[✓] Toplam frame : {frame_count[0]}")
print(f"[✓] Ortalama FPS : {sum(fps_list)/len(fps_list):.1f}")
if __name__ == "__main__":
main()Editor is loading...
Leave a Comment