Untitled
import numpy as np import cv2 as cv import glob import os import random from skimage.feature import hog from Parameters import Parameters from sklearn.svm import LinearSVC import pickle # Inițializare parametri params = Parameters() # Harta fișier-adnotări -> folder ANNOTATIONS_TO_FOLDER = { './antrenare/dad_annotations.txt': 'dad', './antrenare/deedee_annotations.txt': 'deedee', './antrenare/dexter_annotations.txt': 'dexter', './antrenare/mom_annotations.txt': 'mom' } class FacialDetector: def __init__(self, params): self.params = params self.best_model = None def load_annotations(self, file_path): """ Încarcă adnotările dintr-un fișier: image_name x_min y_min x_max y_max Returnează un dict: {image_name: [(x_min,y_min,x_max,y_max), ...]} """ annotations = {} with open(file_path, 'r') as f: for line in f: parts = line.strip().split() image_name = parts[0] x_min, y_min, x_max, y_max = map(int, parts[1:5]) if image_name not in annotations: annotations[image_name] = [] annotations[image_name].append((x_min, y_min, x_max, y_max)) return annotations def analyze_svm_training(self, positive_features, negative_features): """ Afișează scoruri, histograme, coeficienți SVM. """ if self.best_model is None: print("SVM-ul nu a fost antrenat încă!") return positive_scores = self.best_model.decision_function(positive_features) print(f"Scoruri pozitive (primele 10): {positive_scores[:10]}") negative_scores = self.best_model.decision_function(negative_features) print(f"Scoruri negative (primele 10): {negative_scores[:10]}") print("Coeficienți SVM (primele 10):") print(self.best_model.coef_[0][:10]) print(f"Bias-ul SVM: {self.best_model.intercept_[0]}") import matplotlib.pyplot as plt plt.hist(positive_scores, bins=30, alpha=0.5, label="Positive Scores") plt.hist(negative_scores, bins=30, alpha=0.5, label="Negative Scores") plt.axvline(x=0, color="red", linestyle="--", label="Decision Boundary") plt.legend() plt.title("Distribuția scorurilor SVM") plt.xlabel("Score") plt.ylabel("Count") plt.show() def train_classifier(self, positive_features, negative_features): """ Căutăm cel mai bun C pentru SVM. """ Cs = [10 ** -5, 10 ** -4, 10 ** -3, 10 ** -2, 10 ** -1, 10 ** 0, 10 ** 1] best_c = 0 best_accuracy = 0 best_model = None training_examples = np.vstack((positive_features, negative_features)) train_labels = np.hstack((np.ones(positive_features.shape[0]), np.zeros(negative_features.shape[0]))) print("Începem căutarea celui mai bun C...") for c in Cs: print(f"Antrenăm un clasificator pentru C = {c}") model = LinearSVC(C=c, max_iter=10000) model.fit(training_examples, train_labels) acc = model.score(training_examples, train_labels) print(f"Precizia pentru C = {c}: {acc}") if acc > best_accuracy: best_accuracy = acc best_c = c best_model = model self.best_model = best_model print(f"Cel mai bun C: {best_c} cu precizia: {best_accuracy}") model_path = os.path.join(self.params.dir_save_files, 'svm_model.pkl') with open(model_path, 'wb') as f: pickle.dump(best_model, f) print(f"Modelul optim SVM a fost salvat în {model_path}.") def generate_positive_patches(self, image_path, bounding_boxes): img = cv.imread(image_path, cv.IMREAD_GRAYSCALE) positive_patches = [] for (x_min, y_min, x_max, y_max) in bounding_boxes: patch = img[y_min:y_max, x_min:x_max] if patch is not None and patch.shape[0] > 0 and patch.shape[1] > 0: patch = cv.resize(patch, (self.params.dim_window, self.params.dim_window)) positive_patches.append(patch) return positive_patches def generate_negative_patches(self, image_path, bounding_boxes, num_negatives=10): """ Generează patch-uri negative. Asigurăm că nu se suprapune cu box-uri pozitive. """ img = cv.imread(image_path, cv.IMREAD_GRAYSCALE) if img is None: print(f"Eroare la încărcarea imaginii {image_path}.") return [] img_h, img_w = img.shape negative_patches = [] max_attempts = 50 for _ in range(num_negatives): attempts = 0 while attempts < max_attempts: x_min = random.randint(0, img_w - self.params.dim_window) y_min = random.randint(0, img_h - self.params.dim_window) x_max = x_min + self.params.dim_window y_max = y_min + self.params.dim_window intersects = False for (bx_min, by_min, bx_max, by_max) in bounding_boxes: if (x_min < bx_max and x_max > bx_min and y_min < by_max and y_max > by_min): intersects = True break if not intersects: patch = img[y_min:y_max, x_min:x_max] if patch is not None and patch.shape[0] > 0 and patch.shape[1] > 0: patch = cv.resize(patch, (self.params.dim_window, self.params.dim_window)) negative_patches.append(patch) break attempts += 1 return negative_patches def get_positive_descriptors(self): """ Citește imagini + adnotări => extrage patch-uri pozitive => HOG. """ annotations_files = [ './antrenare/dad_annotations.txt', './antrenare/deedee_annotations.txt', './antrenare/dexter_annotations.txt', './antrenare/mom_annotations.txt' ] positive_descriptors = [] for annotation_file in annotations_files: if not os.path.exists(annotation_file): print(f"Fișierul {annotation_file} nu există!") continue folder = ANNOTATIONS_TO_FOLDER[annotation_file] print(f"[INFO] Trecem la fișierul de adnotări: {annotation_file}, pentru folderul {folder}") annotations = self.load_annotations(annotation_file) processed_images = set() for image_name, bounding_boxes in annotations.items(): if image_name in processed_images: continue path = os.path.join('./antrenare', folder, image_name) if not os.path.exists(path): print(f"[WARN] Imaginea {image_name} nu există în folderul {folder}!") continue positive_patches = self.generate_positive_patches(path, bounding_boxes) from skimage.feature import hog for patch in positive_patches: hog_descriptor = hog( patch, pixels_per_cell=(self.params.dim_hog_cell, self.params.dim_hog_cell), cells_per_block=(2, 2), feature_vector=True ) positive_descriptors.append(hog_descriptor) processed_images.add(image_name) print(f"[INFO] Număr total de descriptori pozitivi generați: {len(positive_descriptors)}") return np.array(positive_descriptors) def get_negative_descriptors(self): """ Citește imagini + adnotări => extrage patch-uri negative => HOG. """ annotations_files = [ './antrenare/dad_annotations.txt', './antrenare/deedee_annotations.txt', './antrenare/dexter_annotations.txt', './antrenare/mom_annotations.txt' ] negative_descriptors = [] for annotation_file in annotations_files: if not os.path.exists(annotation_file): continue folder = ANNOTATIONS_TO_FOLDER[annotation_file] annotations = self.load_annotations(annotation_file) processed_images = set() for image_name, bounding_boxes in annotations.items(): if image_name in processed_images: continue path = os.path.join('./antrenare', folder, image_name) if not os.path.exists(path): continue negative_patches = self.generate_negative_patches(path, bounding_boxes, num_negatives=10) from skimage.feature import hog for patch in negative_patches: hog_descriptor = hog( patch, pixels_per_cell=(self.params.dim_hog_cell, self.params.dim_hog_cell), cells_per_block=(2, 2), feature_vector=True ) negative_descriptors.append(hog_descriptor) processed_images.add(image_name) print(f"[INFO] Număr total de descriptori negativi generați: {len(negative_descriptors)}") return np.array(negative_descriptors) def run(self): """ Detectarea fețelor folosind sliding window la MULTIPLE scale. """ test_images_path = os.path.join(self.params.dir_test_examples, '*.jpg') test_files = glob.glob(test_images_path) # liste pentru colectare detections_list = [] scores_list = [] file_names_list = [] # definim scări (exemplu) scales = [0.4, 0.6, 0.8, 1.0, 1.3, 1.6, 2.0] for test_file in test_files: img = cv.imread(test_file, cv.IMREAD_GRAYSCALE) if img is None: print(f"Imaginea {test_file} nu a putut fi încărcată.") continue original_h, original_w = img.shape for scale in scales: new_w = int(original_w * scale) new_h = int(original_h * scale) if new_w < self.params.dim_window or new_h < self.params.dim_window: continue resized_img = cv.resize(img, (new_w, new_h)) step = self.params.dim_window // 4 from skimage.feature import hog for y in range(0, new_h - self.params.dim_window, step): for x in range(0, new_w - self.params.dim_window, step): patch = resized_img[y:y + self.params.dim_window, x:x + self.params.dim_window] hog_descriptor = hog( patch, pixels_per_cell=(self.params.dim_hog_cell, self.params.dim_hog_cell), cells_per_block=(2, 2), feature_vector=True ) score = self.best_model.decision_function([hog_descriptor])[0] if score > self.params.threshold: # transformăm coordonatele x_min = int(x / scale) y_min = int(y / scale) x_max = int((x + self.params.dim_window) / scale) y_max = int((y + self.params.dim_window) / scale) detections_list.append([x_min, y_min, x_max, y_max]) scores_list.append(score) file_names_list.append(os.path.basename(test_file)) # convertim la np.array detections = np.array(detections_list) scores = np.array(scores_list) file_names = np.array(file_names_list) print("Aplicăm Non-Maximum Suppression...") final_boxes, final_scores, keep_idx = self.non_max_suppression(detections, scores, iou_threshold=0.5) final_file_names = file_names[keep_idx] print(f"Număr total de detecții după NMS: {len(final_boxes)}") return final_boxes, final_scores, final_file_names def eval_detections(self, detections, scores, file_names): """ Evaluează detectările și calculează AP. """ if not os.path.exists(self.params.path_annotations): print(f"Debug: Fișierul de adnotări nu există la calea: {self.params.path_annotations}") return else: print(f"Debug: Fișierul de adnotări a fost găsit la calea: {self.params.path_annotations}") # 1) Citim fișierul de adnotări ground_truth = {} total_gt = 0 with open(self.params.path_annotations, "r") as f: for line in f: parts = line.strip().split() file_name = parts[0] bbox = list(map(int, parts[1:5])) if file_name not in ground_truth: ground_truth[file_name] = [] ground_truth[file_name].append(bbox) total_gt += 1 # 2) Parcurgem toate detecțiile sub formă (file_name, score, x1,y1,x2,y2) all_dets = [] for i, fn in enumerate(file_names): box = detections[i] sc = scores[i] all_dets.append((fn, sc, box)) # Sortăm global după scor, descrescător all_dets = sorted(all_dets, key=lambda x: x[1], reverse=True) iou_threshold = 0.3 tp_array = [] fp_array = [] # Ținem evidența box-urilor GT deja asociate matched_gt = {} # {file_name: [False, False, ...] pt fiecare box} for fn in ground_truth: matched_gt[fn] = [False] * len(ground_truth[fn]) # 3) Marcare TP / FP for (fname, score, box_det) in all_dets: if fname not in ground_truth: # nu avem adnotări pt imaginea asta => FP tp_array.append(0) fp_array.append(1) continue gtlist = ground_truth[fname] best_iou = 0 best_idx = -1 for idx, gt_box in enumerate(gtlist): iou_val = self.compute_iou(box_det, gt_box) if iou_val > best_iou: best_iou = iou_val best_idx = idx if best_iou >= iou_threshold and best_idx != -1 and not matched_gt[fname][best_idx]: tp_array.append(1) fp_array.append(0) matched_gt[fname][best_idx] = True else: tp_array.append(0) fp_array.append(1) # 4) Calculăm precision-recall + AP tp_cum = np.cumsum(tp_array) fp_cum = np.cumsum(fp_array) recalls = tp_cum / float(total_gt) precisions = tp_cum / (tp_cum + fp_cum + 1e-10) ap = self.voc_ap(recalls, precisions) # exemplu funcție voc_ap # 5) Afișăm metrici la ultimul punct (adică la threshold fix) # (dar e un alt threshold, practic pe scor...) # Totuși, True Pos/False Pos/False Neg pot fi interpretate la final final_tp = tp_cum[-1] final_fp = fp_cum[-1] final_fn = total_gt - final_tp precision = final_tp / float(final_tp + final_fp + 1e-10) recall = final_tp / float(total_gt + 1e-10) print(f"Evaluare finală:") print(f"True Positives: {int(final_tp)}") print(f"False Positives: {int(final_fp)}") print(f"False Negatives: {int(final_fn)}") print(f"Precision: {precision:.4f}") print(f"Recall: {recall:.4f}") print(f"Average Precision: {ap:.4f}") # Salvăm results_path = os.path.join(self.params.dir_save_files, "evaluation_results.txt") with open(results_path, "w") as f: f.write(f"True Positives: {int(final_tp)}\n") f.write(f"False Positives: {int(final_fp)}\n") f.write(f"False Negatives: {int(final_fn)}\n") f.write(f"Precision: {precision:.4f}\n") f.write(f"Recall: {recall:.4f}\n") f.write(f"Average Precision: {ap:.4f}\n") print(f"Rezultatele evaluării au fost salvate în {results_path}.") def voc_ap(self, rec, prec): """ Computăm Average Precision folosind metoda "VOC2007 11-point interpolation" sau direct area sub PR. Aici facem "area sub curba" simplu: Metodă simplă: AP = sum((r[i] - r[i-1]) * p[i]) """ # Adăugăm puncte inițiale (0,1) mrec = np.concatenate(([0.0], rec, [1.0])) mpre = np.concatenate(([1.0], prec, [0.0])) # Pentru a fi robust, transformăm mpre în envelope for i in range(mpre.size - 1, 0, -1): mpre[i - 1] = max(mpre[i - 1], mpre[i]) # Apoi sumăm pe segmente unde recall crește i_idx = np.where(mrec[1:] != mrec[:-1])[0] ap = np.sum((mrec[i_idx + 1] - mrec[i_idx]) * mpre[i_idx + 1]) return ap def non_max_suppression(self, boxes, scores, iou_threshold=0.3): if len(boxes) == 0: return boxes, scores, [] order = np.argsort(scores)[::-1] keep = [] while len(order) > 0: i = order[0] keep.append(i) ious = [] for j in order[1:]: iou_ij = self.compute_iou(boxes[i], boxes[j]) ious.append(iou_ij) ious = np.array(ious) inds = np.where(ious <= iou_threshold)[0] order = order[inds + 1] keep = np.array(keep, dtype=int) final_boxes = boxes[keep] final_scores = scores[keep] return final_boxes, final_scores, keep def compute_iou(self, boxA, boxB): xA = max(boxA[0], boxB[0]) yA = max(boxA[1], boxB[1]) xB = min(boxA[2], boxB[2]) yB = min(boxA[3], boxB[3]) interW = max(0, xB - xA) interH = max(0, yB - yA) interArea = interW * interH boxAArea = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1]) boxBArea = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1]) iou = interArea / float(boxAArea + boxBArea - interArea + 1e-10) return iou # Inițializăm detectorul facial facial_detector = FacialDetector(params)
Leave a Comment