sdasda

asda
mail@pastecode.io avatar
unknown
python
16 days ago
11 kB
1
Indexable
Never
import pandas as pd
import pyphen
import json
import spacy
import numpy as np
from preprocessing_corpus.feature.load_morphology import load_cls
import re
import joblib
import fasttext
import os

# get project root  path complex
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

nlp = spacy.load("it_core_news_sm")

model_path = project_root + "/dataset/it-3-7-192-1.json"
df_freq_morfemi_path = project_root + "/dataset/frequenza_morfemi_somma.csv"
sense_id_json_path = project_root + "/dataset/lemma_sense_count.json"
frequency_list_itwac_lemma_path = project_root + "/dataset/frequency-list_itwac_lemma.csv"
subtlex_it_path = project_root + "/dataset/subtlex-it.csv"
# Specify the base directory
base_directory_corpus = project_root+'/corpus_covicor'


class DataCreation:
    def __init__(self):
        self.nlp = nlp
        self.fasttext_vectors_path = project_root + "/dataset/cc.it.300.bin"

    def lemmatize_text(self, text):
        doc = nlp(text)
        lemmatized_text = ' '.join([token.lemma_ for token in doc])
        # print(token for token in doc)
        # print(lemmatized_text)

        return lemmatized_text

    def extract_pos_matrix(self, lemma):
        doc = self.nlp(lemma)

        pos_labels = [('VERB', ['VERB', 'AUX']),
                      ('NOUN', ['NOUN', 'PROPN']),
                      ('ADJ', ['ADJ']),
                      ('ADV', ['ADV']),
                      ('PRON', ['PRON']),
                      ('ADP', ['ADP']),
                      ('CONJ', ['CCONJ', 'SCONJ']),
                      ('DET', ['DET']),
                      ('NUM', ['NUM']),
                      ('INTJ', ['INTJ']),
                      ('X', ['X'])]

        pos_dict = {label: 0 for label, _ in pos_labels}  # Inizializza tutte le etichette POS a 0

        for token in doc:
            for label, pos_list in pos_labels:
                if token.pos_ in pos_list:
                    pos_dict[label] = 1
                    break
            else:
                pos_dict['X'] = 1  # Imposta 'X' a 1 se nessuna etichetta POS corrisponde

        pos_dict_sorted = dict(sorted(pos_dict.items()))

        return pos_dict_sorted

    def count_morphemes(self, model_path, parola):
        model = load_cls(model_path)
        predictions = model.predict([parola])
        flat_predictions = [item for sublist in predictions for item in sublist]
        morpheme_count = flat_predictions[0]

        return len(morpheme_count)

    def count_syllables(self, parola):
        dic_sillabazione = pyphen.Pyphen(lang='it_IT')
        sillabe = dic_sillabazione.inserted(parola)
        numero_sillabe = len(sillabe.split('-'))
        return numero_sillabe


    def frequency_lemma(self, lemma):
        df = pd.read_csv(frequency_list_itwac_lemma_path, sep=',')

        # Se il lemma è composto da più di una parola dopo la lemmatizzazione
        if len(lemma.split()) > 1:
            # Trova la parola più lunga tra quelle divise
            parole_divise = lemma.split()
            parola_piu_lunga = max(parole_divise, key=len)
            frequency = df.loc[df['lemma'] == parola_piu_lunga, 'frequency'].values
        else:
            frequency = df.loc[df['lemma'] == lemma, 'frequency'].values

        frequenza_lemma_log = np.log10(frequency) if frequency > 0 else 0

        return frequenza_lemma_log if isinstance(frequenza_lemma_log, (int, float)) else frequenza_lemma_log[0] if len(
            frequenza_lemma_log) > 0 else 0

    def frequency_lemma_subtlex(self, lemma):
        df_subtlex = pd.read_csv(subtlex_it_path, sep=',')

        # Se il lemma è composto da più di una parola dopo la lemmatizzazione
        if len(lemma.split()) > 1:
            # Trova la parola più lunga tra quelle divise
            parole_divise = lemma.split()
            parola_piu_lunga = max(parole_divise, key=len)
            lemma = parola_piu_lunga

        lemma_freq_df = df_subtlex[['dom_lemma', 'dom_lemma_freq']]

        lemma_row = lemma_freq_df.loc[lemma_freq_df['dom_lemma'] == lemma]

        if lemma_row.empty:
            return 0

        frequenza_lemma = lemma_row['dom_lemma_freq'].values[0]

        frequenza_lemma_log = np.log10(frequenza_lemma) if frequenza_lemma > 0 else 0

        return frequenza_lemma_log


    def calcola_sense_id(self, lemma, file_json):
        with open(file_json) as f:
            lemma_sense_count = json.load(f)

        if lemma in lemma_sense_count:
            return lemma_sense_count[lemma]
        else:
            return 0

    def is_stopword(self, parola):
        if parola.strip():  # Verifica se il testo non è vuoto
            return int(self.nlp(parola.lower())[0].is_stop)
        else:
            return 0

    def log_frequency_morfema_lessicale(self, parola):
        model = load_cls(model_path)
        predictions = model.predict([parola])
        morpheme_count = predictions[0][0]
        morfema_lungo = max(morpheme_count, key=len)

        df_freq_morfemi = pd.read_csv(df_freq_morfemi_path, sep=',')
        frequency_morfema = df_freq_morfemi.loc[df_freq_morfemi['Morfema'] == morfema_lungo, 'Somma'].values

        frequenza_morfema_log = np.log10(frequency_morfema[0]) if frequency_morfema and len(frequency_morfema) > 0 and \
                                                                  frequency_morfema[0] > 0 else 0

        return frequenza_morfema_log

    def vector_fasttext(self, parola):
        model = fasttext.load_model(self.fasttext_vectors_path)
        vector = model.get_word_vector(parola)
        # Converti il vettore in una lista di valori e ottieni la lunghezza del vettore
        vector_list = vector.tolist()
        vector_length = len(vector_list)
        # Genera i nomi delle colonne usando la lunghezza del vettore
        columns = [f'emb_{i}' for i in range(vector_length)]
        return dict(list(zip(columns, vector_list)))

    def estrai_features(self, parola, lemma):
        if len(parola) > 0:
            lunghezza = len(parola)
            numero_vocali = sum(1 for char in parola if char.lower() in 'aeiouàèéìòóù')
            numero_sillabe = self.count_syllables(parola)
            frequenza_lemma_val = self.frequency_lemma(lemma)
            numero_morfemi = self.count_morphemes(model_path, parola)
            senses_id = self.calcola_sense_id(lemma, sense_id_json_path)
            frequenza_lemma_subtlex = self.frequency_lemma_subtlex(lemma)
            pos_matrix = self.extract_pos_matrix(lemma)
            stopword_flag = self.is_stopword(parola)
            densita_morfologica = numero_morfemi / lunghezza if lunghezza > 0 else 0
            frequenza_morfema_lessicale_log = self.log_frequency_morfema_lessicale(parola)
            vec = self.vector_fasttext(parola)


            result = {
                'Parola': parola,
                'Lunghezza': lunghezza,
                'Numero_Vocali': numero_vocali,
                'Numero_Sillabe': numero_sillabe,
                'Numero_Morfemi': numero_morfemi,
                'Senses_ID': senses_id,
                'Frequenza_Lemma': frequenza_lemma_val,
                'Stopword': stopword_flag,
                'Densita_Morfologica': densita_morfologica,
                'Frequenza_morfema_lessicale_log': frequenza_morfema_lessicale_log,
                'Frequenza_Lemma_Subtlex': frequenza_lemma_subtlex,
                'POS_matrix': pos_matrix,
            }
            result.update(vec)
            print(result)
            return result


    def process(self, word):
        cleaned_word = re.sub(r'[^A-Za-zàèìòùáéíóú]+', '', word)

        # Chiamata al metodo lemmatize_text
        lemmatized_word = self.lemmatize_text(cleaned_word)

        # Rimani con il resto del tuo codice come era prima
        features = self.estrai_features(cleaned_word, lemmatized_word)
        # df = pd.DataFrame([features])

        return features

class Random_forest:
    def __init__(self):
        self.random_forest_model = project_root + "/models/random_forest_model_completo.pkl"
        self.model = self.load_model()
        self.data_creation = DataCreation()


    def load_model(self):
        loaded_model = joblib.load(self.random_forest_model)
        return loaded_model

    def predict(self, res: dict):
        data_dict = {"Parola": res['Parola'],
                     "Lunghezza": res['Lunghezza'],
                     "Numero_Vocali": res['Numero_Vocali'],
                     "Numero_Sillabe": res['Numero_Sillabe'],
                     "Numero_Morfemi": res['Numero_Morfemi'],
                     "Senses_ID": res['Senses_ID'],
                     "Frequenza_Lemma": res['Frequenza_Lemma'],
                     "Stopword": res['Stopword'],
                     "Densità_Morfologica": res['Densita_Morfologica'],
                     "Frequenza_morfema_lessicale_log": res['Frequenza_morfema_lessicale_log'],
                     "frequenza_lemma_subtlex_log": res['Frequenza_Lemma_Subtlex'],
                     "ADV": res.get('ADV', 0),
                    "CONJ": res.get('CONJ', 0),
                    "NOUN": res.get('NOUN', 0),
                    "SYM": res.get('SYM', 0),
                    "X": res.get('X', 0),
                    "VERB": res.get('VERB', 0),
                    "ADP": res.get('ADP', 0),
                    "NUM": res.get('NUM', 0),
                    "DET": res.get('DET', 0),
                    "ADJ": res.get('ADJ', 0),
                     "INTJ": res.get('INTJ', 0),
                     }

        vec = self.data_creation.vector_fasttext(res['Parola'])
        data_dict.update(vec)


        # Rimuovi le colonne che non ti servono
        df = pd.DataFrame([data_dict])
        df = df.drop(['Parola'], axis=1)


        # Make predictions between 0 and 1
        y_prob = self.model.predict_proba(df)[:, 1]

        # calcola la complessità della parola
        y_pred = self.model.predict(df)

        return y_pred[0], y_prob[0]


def find_txt_files(base_dir):
    txt_files = []
    for root, dirs, files in os.walk(base_dir):
        for file in files:
            if file.endswith('.txt'):
                txt_files.append(os.path.join(root, file))
    return txt_files


if __name__ == '__main__':
    dc = DataCreation()
    reg = Random_forest()
    txt_files = find_txt_files(base_directory_corpus)
    res_list = []
    for f in txt_files:
        with open(f, "r", encoding="utf-8") as file:
            frase = file.read()
            parole = frase.split()
            for word in parole:
                res = dc.process(word=word)
                if res is not None:
                    y_pred, y_prob = reg.predict(res)
                    res['pred'] = y_pred
                    res['prob'] = y_prob
                    res["file_path"] = f
                    res_list.append(res)

    df = pd.DataFrame(res_list)
    df.to_csv("output.csv", index=False)
Leave a Comment