Untitled

 avatar
unknown
plain_text
a month ago
19 kB
4
Indexable
import numpy as np
import cv2 as cv
import glob
import os
import random
from skimage.feature import hog
from Parameters import Parameters
from sklearn.svm import LinearSVC
import pickle

# Inițializare parametri
params = Parameters()

# Harta fișier-adnotări -> folder
ANNOTATIONS_TO_FOLDER = {
    './antrenare/dad_annotations.txt': 'dad',
    './antrenare/deedee_annotations.txt': 'deedee',
    './antrenare/dexter_annotations.txt': 'dexter',
    './antrenare/mom_annotations.txt': 'mom'
}

class FacialDetector:
    def __init__(self, params):
        self.params = params
        self.best_model = None

    def load_annotations(self, file_path):
        """
        Încarcă adnotările dintr-un fișier: image_name x_min y_min x_max y_max
        Returnează un dict: {image_name: [(x_min,y_min,x_max,y_max), ...]}
        """
        annotations = {}
        with open(file_path, 'r') as f:
            for line in f:
                parts = line.strip().split()
                image_name = parts[0]
                x_min, y_min, x_max, y_max = map(int, parts[1:5])
                if image_name not in annotations:
                    annotations[image_name] = []
                annotations[image_name].append((x_min, y_min, x_max, y_max))
        return annotations

    def analyze_svm_training(self, positive_features, negative_features):
        """
        Afișează scoruri, histograme, coeficienți SVM.
        """
        if self.best_model is None:
            print("SVM-ul nu a fost antrenat încă!")
            return

        positive_scores = self.best_model.decision_function(positive_features)
        print(f"Scoruri pozitive (primele 10): {positive_scores[:10]}")

        negative_scores = self.best_model.decision_function(negative_features)
        print(f"Scoruri negative (primele 10): {negative_scores[:10]}")

        print("Coeficienți SVM (primele 10):")
        print(self.best_model.coef_[0][:10])

        print(f"Bias-ul SVM: {self.best_model.intercept_[0]}")

        import matplotlib.pyplot as plt
        plt.hist(positive_scores, bins=30, alpha=0.5, label="Positive Scores")
        plt.hist(negative_scores, bins=30, alpha=0.5, label="Negative Scores")
        plt.axvline(x=0, color="red", linestyle="--", label="Decision Boundary")
        plt.legend()
        plt.title("Distribuția scorurilor SVM")
        plt.xlabel("Score")
        plt.ylabel("Count")
        plt.show()

    def train_classifier(self, positive_features, negative_features):
        """
        Căutăm cel mai bun C pentru SVM.
        """
        Cs = [10 ** -5, 10 ** -4, 10 ** -3, 10 ** -2, 10 ** -1, 10 ** 0, 10 ** 1]
        best_c = 0
        best_accuracy = 0
        best_model = None

        training_examples = np.vstack((positive_features, negative_features))
        train_labels = np.hstack((np.ones(positive_features.shape[0]), np.zeros(negative_features.shape[0])))

        print("Începem căutarea celui mai bun C...")

        for c in Cs:
            print(f"Antrenăm un clasificator pentru C = {c}")
            model = LinearSVC(C=c, max_iter=10000)
            model.fit(training_examples, train_labels)
            acc = model.score(training_examples, train_labels)
            print(f"Precizia pentru C = {c}: {acc}")

            if acc > best_accuracy:
                best_accuracy = acc
                best_c = c
                best_model = model

        self.best_model = best_model
        print(f"Cel mai bun C: {best_c} cu precizia: {best_accuracy}")

        model_path = os.path.join(self.params.dir_save_files, 'svm_model.pkl')
        with open(model_path, 'wb') as f:
            pickle.dump(best_model, f)
        print(f"Modelul optim SVM a fost salvat în {model_path}.")

    def generate_positive_patches(self, image_path, bounding_boxes):
        img = cv.imread(image_path, cv.IMREAD_GRAYSCALE)
        positive_patches = []
        for (x_min, y_min, x_max, y_max) in bounding_boxes:
            patch = img[y_min:y_max, x_min:x_max]
            if patch is not None and patch.shape[0] > 0 and patch.shape[1] > 0:
                patch = cv.resize(patch, (self.params.dim_window, self.params.dim_window))
                positive_patches.append(patch)
        return positive_patches

    def generate_negative_patches(self, image_path, bounding_boxes, num_negatives=10):
        """
        Generează patch-uri negative. Asigurăm că nu se suprapune cu box-uri pozitive.
        """
        img = cv.imread(image_path, cv.IMREAD_GRAYSCALE)
        if img is None:
            print(f"Eroare la încărcarea imaginii {image_path}.")
            return []

        img_h, img_w = img.shape
        negative_patches = []
        max_attempts = 50
        for _ in range(num_negatives):
            attempts = 0
            while attempts < max_attempts:
                x_min = random.randint(0, img_w - self.params.dim_window)
                y_min = random.randint(0, img_h - self.params.dim_window)
                x_max = x_min + self.params.dim_window
                y_max = y_min + self.params.dim_window

                intersects = False
                for (bx_min, by_min, bx_max, by_max) in bounding_boxes:
                    if (x_min < bx_max and x_max > bx_min and
                            y_min < by_max and y_max > by_min):
                        intersects = True
                        break
                if not intersects:
                    patch = img[y_min:y_max, x_min:x_max]
                    if patch is not None and patch.shape[0] > 0 and patch.shape[1] > 0:
                        patch = cv.resize(patch, (self.params.dim_window, self.params.dim_window))
                        negative_patches.append(patch)
                    break
                attempts += 1
        return negative_patches

    def get_positive_descriptors(self):
        """
        Citește imagini + adnotări => extrage patch-uri pozitive => HOG.
        """
        annotations_files = [
            './antrenare/dad_annotations.txt',
            './antrenare/deedee_annotations.txt',
            './antrenare/dexter_annotations.txt',
            './antrenare/mom_annotations.txt'
        ]
        positive_descriptors = []

        for annotation_file in annotations_files:
            if not os.path.exists(annotation_file):
                print(f"Fișierul {annotation_file} nu există!")
                continue

            folder = ANNOTATIONS_TO_FOLDER[annotation_file]
            print(f"[INFO] Trecem la fișierul de adnotări: {annotation_file}, pentru folderul {folder}")
            annotations = self.load_annotations(annotation_file)

            processed_images = set()
            for image_name, bounding_boxes in annotations.items():
                if image_name in processed_images:
                    continue

                path = os.path.join('./antrenare', folder, image_name)
                if not os.path.exists(path):
                    print(f"[WARN] Imaginea {image_name} nu există în folderul {folder}!")
                    continue

                positive_patches = self.generate_positive_patches(path, bounding_boxes)

                from skimage.feature import hog
                for patch in positive_patches:
                    hog_descriptor = hog(
                        patch,
                        pixels_per_cell=(self.params.dim_hog_cell, self.params.dim_hog_cell),
                        cells_per_block=(2, 2),
                        feature_vector=True
                    )
                    positive_descriptors.append(hog_descriptor)

                processed_images.add(image_name)

        print(f"[INFO] Număr total de descriptori pozitivi generați: {len(positive_descriptors)}")
        return np.array(positive_descriptors)

    def get_negative_descriptors(self):
        """
        Citește imagini + adnotări => extrage patch-uri negative => HOG.
        """
        annotations_files = [
            './antrenare/dad_annotations.txt',
            './antrenare/deedee_annotations.txt',
            './antrenare/dexter_annotations.txt',
            './antrenare/mom_annotations.txt'
        ]
        negative_descriptors = []

        for annotation_file in annotations_files:
            if not os.path.exists(annotation_file):
                continue

            folder = ANNOTATIONS_TO_FOLDER[annotation_file]
            annotations = self.load_annotations(annotation_file)

            processed_images = set()
            for image_name, bounding_boxes in annotations.items():
                if image_name in processed_images:
                    continue

                path = os.path.join('./antrenare', folder, image_name)
                if not os.path.exists(path):
                    continue

                negative_patches = self.generate_negative_patches(path, bounding_boxes, num_negatives=10)
                from skimage.feature import hog
                for patch in negative_patches:
                    hog_descriptor = hog(
                        patch,
                        pixels_per_cell=(self.params.dim_hog_cell, self.params.dim_hog_cell),
                        cells_per_block=(2, 2),
                        feature_vector=True
                    )
                    negative_descriptors.append(hog_descriptor)

                processed_images.add(image_name)

        print(f"[INFO] Număr total de descriptori negativi generați: {len(negative_descriptors)}")
        return np.array(negative_descriptors)

    def run(self):
        """
        Detectarea fețelor folosind sliding window la MULTIPLE scale.
        """
        test_images_path = os.path.join(self.params.dir_test_examples, '*.jpg')
        test_files = glob.glob(test_images_path)

        # liste pentru colectare
        detections_list = []
        scores_list = []
        file_names_list = []

        # definim scări (exemplu)
        scales = [0.4, 0.6, 0.8, 1.0, 1.3, 1.6, 2.0]

        for test_file in test_files:
            img = cv.imread(test_file, cv.IMREAD_GRAYSCALE)
            if img is None:
                print(f"Imaginea {test_file} nu a putut fi încărcată.")
                continue

            original_h, original_w = img.shape

            for scale in scales:
                new_w = int(original_w * scale)
                new_h = int(original_h * scale)
                if new_w < self.params.dim_window or new_h < self.params.dim_window:
                    continue

                resized_img = cv.resize(img, (new_w, new_h))

                step = self.params.dim_window // 4
                from skimage.feature import hog
                for y in range(0, new_h - self.params.dim_window, step):
                    for x in range(0, new_w - self.params.dim_window, step):
                        patch = resized_img[y:y + self.params.dim_window, x:x + self.params.dim_window]
                        hog_descriptor = hog(
                            patch,
                            pixels_per_cell=(self.params.dim_hog_cell, self.params.dim_hog_cell),
                            cells_per_block=(2, 2),
                            feature_vector=True
                        )
                        score = self.best_model.decision_function([hog_descriptor])[0]
                        if score > self.params.threshold:
                            # transformăm coordonatele
                            x_min = int(x / scale)
                            y_min = int(y / scale)
                            x_max = int((x + self.params.dim_window) / scale)
                            y_max = int((y + self.params.dim_window) / scale)

                            detections_list.append([x_min, y_min, x_max, y_max])
                            scores_list.append(score)
                            file_names_list.append(os.path.basename(test_file))

        # convertim la np.array
        detections = np.array(detections_list)
        scores = np.array(scores_list)
        file_names = np.array(file_names_list)

        print("Aplicăm Non-Maximum Suppression...")
        final_boxes, final_scores, keep_idx = self.non_max_suppression(detections, scores, iou_threshold=0.5)
        final_file_names = file_names[keep_idx]

        print(f"Număr total de detecții după NMS: {len(final_boxes)}")
        return final_boxes, final_scores, final_file_names

    def eval_detections(self, detections, scores, file_names):
        """
        Evaluează detectările și calculează AP.
        """
        if not os.path.exists(self.params.path_annotations):
            print(f"Debug: Fișierul de adnotări nu există la calea: {self.params.path_annotations}")
            return
        else:
            print(f"Debug: Fișierul de adnotări a fost găsit la calea: {self.params.path_annotations}")

        # 1) Citim fișierul de adnotări
        ground_truth = {}
        total_gt = 0
        with open(self.params.path_annotations, "r") as f:
            for line in f:
                parts = line.strip().split()
                file_name = parts[0]
                bbox = list(map(int, parts[1:5]))
                if file_name not in ground_truth:
                    ground_truth[file_name] = []
                ground_truth[file_name].append(bbox)
                total_gt += 1

        # 2) Parcurgem toate detecțiile sub formă (file_name, score, x1,y1,x2,y2)
        all_dets = []
        for i, fn in enumerate(file_names):
            box = detections[i]
            sc = scores[i]
            all_dets.append((fn, sc, box))

        # Sortăm global după scor, descrescător
        all_dets = sorted(all_dets, key=lambda x: x[1], reverse=True)

        iou_threshold = 0.3
        tp_array = []
        fp_array = []
        # Ținem evidența box-urilor GT deja asociate
        matched_gt = {}  # {file_name: [False, False, ...] pt fiecare box}
        for fn in ground_truth:
            matched_gt[fn] = [False] * len(ground_truth[fn])

        # 3) Marcare TP / FP
        for (fname, score, box_det) in all_dets:
            if fname not in ground_truth:
                # nu avem adnotări pt imaginea asta => FP
                tp_array.append(0)
                fp_array.append(1)
                continue
            gtlist = ground_truth[fname]
            best_iou = 0
            best_idx = -1
            for idx, gt_box in enumerate(gtlist):
                iou_val = self.compute_iou(box_det, gt_box)
                if iou_val > best_iou:
                    best_iou = iou_val
                    best_idx = idx
            if best_iou >= iou_threshold and best_idx != -1 and not matched_gt[fname][best_idx]:
                tp_array.append(1)
                fp_array.append(0)
                matched_gt[fname][best_idx] = True
            else:
                tp_array.append(0)
                fp_array.append(1)

        # 4) Calculăm precision-recall + AP
        tp_cum = np.cumsum(tp_array)
        fp_cum = np.cumsum(fp_array)
        recalls = tp_cum / float(total_gt)
        precisions = tp_cum / (tp_cum + fp_cum + 1e-10)
        ap = self.voc_ap(recalls, precisions)  # exemplu funcție voc_ap

        # 5) Afișăm metrici la ultimul punct (adică la threshold fix)
        # (dar e un alt threshold, practic pe scor...)
        # Totuși, True Pos/False Pos/False Neg pot fi interpretate la final
        final_tp = tp_cum[-1]
        final_fp = fp_cum[-1]
        final_fn = total_gt - final_tp

        precision = final_tp / float(final_tp + final_fp + 1e-10)
        recall = final_tp / float(total_gt + 1e-10)

        print(f"Evaluare finală:")
        print(f"True Positives: {int(final_tp)}")
        print(f"False Positives: {int(final_fp)}")
        print(f"False Negatives: {int(final_fn)}")
        print(f"Precision: {precision:.4f}")
        print(f"Recall: {recall:.4f}")
        print(f"Average Precision: {ap:.4f}")

        # Salvăm
        results_path = os.path.join(self.params.dir_save_files, "evaluation_results.txt")
        with open(results_path, "w") as f:
            f.write(f"True Positives: {int(final_tp)}\n")
            f.write(f"False Positives: {int(final_fp)}\n")
            f.write(f"False Negatives: {int(final_fn)}\n")
            f.write(f"Precision: {precision:.4f}\n")
            f.write(f"Recall: {recall:.4f}\n")
            f.write(f"Average Precision: {ap:.4f}\n")
        print(f"Rezultatele evaluării au fost salvate în {results_path}.")

    def voc_ap(self, rec, prec):
        """
        Computăm Average Precision folosind metoda "VOC2007 11-point interpolation"
        sau direct area sub PR. Aici facem "area sub curba" simplu:

        Metodă simplă: AP = sum((r[i] - r[i-1]) * p[i])
        """
        # Adăugăm puncte inițiale (0,1)
        mrec = np.concatenate(([0.0], rec, [1.0]))
        mpre = np.concatenate(([1.0], prec, [0.0]))

        # Pentru a fi robust, transformăm mpre în envelope
        for i in range(mpre.size - 1, 0, -1):
            mpre[i - 1] = max(mpre[i - 1], mpre[i])

        # Apoi sumăm pe segmente unde recall crește
        i_idx = np.where(mrec[1:] != mrec[:-1])[0]
        ap = np.sum((mrec[i_idx + 1] - mrec[i_idx]) * mpre[i_idx + 1])
        return ap

    def non_max_suppression(self, boxes, scores, iou_threshold=0.3):
        if len(boxes) == 0:
            return boxes, scores, []

        order = np.argsort(scores)[::-1]
        keep = []
        while len(order) > 0:
            i = order[0]
            keep.append(i)

            ious = []
            for j in order[1:]:
                iou_ij = self.compute_iou(boxes[i], boxes[j])
                ious.append(iou_ij)

            ious = np.array(ious)
            inds = np.where(ious <= iou_threshold)[0]
            order = order[inds + 1]

        keep = np.array(keep, dtype=int)
        final_boxes = boxes[keep]
        final_scores = scores[keep]
        return final_boxes, final_scores, keep

    def compute_iou(self, boxA, boxB):
        xA = max(boxA[0], boxB[0])
        yA = max(boxA[1], boxB[1])
        xB = min(boxA[2], boxB[2])
        yB = min(boxA[3], boxB[3])

        interW = max(0, xB - xA)
        interH = max(0, yB - yA)
        interArea = interW * interH

        boxAArea = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])
        boxBArea = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])
        iou = interArea / float(boxAArea + boxBArea - interArea + 1e-10)
        return iou

# Inițializăm detectorul facial
facial_detector = FacialDetector(params)
Leave a Comment