from nltk.corpus import brown
from sklearn.model_selection import train_test_split
import re
from collections import defaultdict


def clean_corpus(corpus):
    """
    this function cleans a given corpus from its complex tags
    :param corpus: represents a given corpus to be cleaned
    :return: a cleaned corpus, with fixed complex tags
    """
    cleaned_corpus = []
    clean_tag_format = re.compile(r'[*]|--|[^+*-]+')
    for sentence in corpus:
        for i in range(len(sentence)):
            word, tag = sentence[i]
            clean_tag = clean_tag_format.match(tag).group()
            sentence[i] = (word, clean_tag)
        cleaned_corpus += [sentence]
    return cleaned_corpus
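
# A minimal sketch of how clean_corpus strips Brown's composite tags
# (the inputs are illustrative, not drawn from the corpus itself):
#
#   >>> clean_corpus([[("the", "AT"), ("Fulton", "NP-TL"), ("said", "VBD+PPO")]])
#   [[('the', 'AT'), ('Fulton', 'NP'), ('said', 'VBD')]]
#
# The regex keeps a leading "*" or "--" as-is, and otherwise keeps everything
# up to the first "+", "*" or "-" of the tag.
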
class Tagger:
    UNKNOWN_WORD_TAG = "NN"
    START_TAG = "START TAG"
    STOP_TAG = "STOP TAG"

    # pseudo-word constants
    PSEUDO_THRESHOLD = 5

    # common prefixes in the English language
    PREFIX_2 = {"be", "en", "em", "co", "im", "un", "in", "ir", "il", "de", "ab", "ex", "re", "bi"}
    PREFIX_3 = {"non", "dis", "pro", "tri", "uni", "sub", "out", "pre", "mal", "mis"}
    PREFIX_4 = {"ambi", "auto", "over", "circ", "anti", "hypo", "phil", "mono", "fore", "bene"}

    # common suffixes in the English language
    SUFFIX_2 = {"th", "or", "el", "en", "al", "ly", "es", "ed", "er", "ic"}
    SUFFIX_3 = {"ies", "ful", "ble", "ous", "ent", "ant", "ist", "ise", "ize", "age", "ogy", "ers",
                "ale", "ine", "ows", "ile", "ing", "ish", "est", "dom", "ity", "ive", "ify"}
    SUFFIX_4 = {"ship", "tion", "ance", "hood", "port", "tude", "sion", "ence", "ness", "less",
                "ment", "able", "ible", "wood", "main", "ions"}

    def __init__(self):
        self.tag_count = defaultdict(int)
        self.word_tags_count = defaultdict(lambda: defaultdict(int))
        self.word_predicted_tag = dict()
        self.transition_probabilities = defaultdict(float)
        self.emission_probabilities = defaultdict(float)
        self.tag_bigram = defaultdict(lambda: defaultdict(int))
        self.pseudo_words_tag = defaultdict(lambda: defaultdict(int))
        self.confusion_matrix = defaultdict(lambda: defaultdict(int))

    def fit(self, training_set):
        """
        this function fits the HMM tagger on a given training set.
        it initializes self.tag_count, which maps {tag: tag count in the corpus},
        and self.word_tags_count, which maps {word: {tag: count of the tag as the word's tag}}.
        :param training_set: represents a given training set.
        :return:
        """
        for sentence in training_set:
            cur_tag = Tagger.START_TAG
            self.tag_count[cur_tag] += 1
            for word, tag in sentence:
                self.tag_count[tag] += 1
                self.tag_bigram[cur_tag][tag] += 1
                self.word_tags_count[word][tag] += 1
                cur_tag = tag
            self.tag_count[Tagger.STOP_TAG] += 1
            self.tag_bigram[cur_tag][Tagger.STOP_TAG] += 1

    def fit_pseudo_train(self):
        """
        this function fits our tagger using pseudo words for the low-frequency
        words in the training data
        :return:
        """
        word_freq = lambda w: sum(self.word_tags_count[w].values())
        for word in self.word_tags_count:
            # checking if the word appears fewer than PSEUDO_THRESHOLD times in the training data
            if word_freq(word) < Tagger.PSEUDO_THRESHOLD:
                pseudo_word = self.convert_to_pseudo(word)
                for tag in self.word_tags_count[word]:
                    # accumulating the word's full tag counts under its pseudo-word class
                    self.pseudo_words_tag[pseudo_word][tag] += self.word_tags_count[word][tag]
            else:
                self.pseudo_words_tag[word] = self.word_tags_count[word]

    def compute_most_likely_tag(self):
        """
        this function computes, for each word w, the most likely tag to be w's tag,
        using the MLE principle.
        :return:
        """
        for word in self.word_tags_count:
            w_tags_count_dict = self.word_tags_count[word]
            # getting the tag with the most appearances as w's tag
            self.word_predicted_tag[word] = max(w_tags_count_dict, key=w_tags_count_dict.get)
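
    # Under the MLE rule above, the predicted tag of a known word w is simply
    #   argmax_t count(w, t)
    # e.g. if "run" were tagged VB 12 times and NN 3 times in training, the
    # tagger would always predict VB for "run" (counts made up for illustration).
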
    def error_rate(self, test_set, title, calculate_confusion_matrix=False, is_pseudo=False, is_mle=False):
        """
        this function calculates and prints the known-words error rate, the
        unknown-words error rate, and the total error rate, for a given test set.
        :param test_set: represents a given test set which we calculate the error rate for.
        :param title: represents the title to print with the results.
        :param calculate_confusion_matrix: indicator for accumulating and printing a confusion matrix.
        :param is_pseudo: indicator for converting words to their pseudo versions.
        :param is_mle: indicator for predicting the "NN" tag for unknown words, as in task (b).
        :return:
        """
        known_words_counter, unknown_words_counter = 0, 0
        known_words_errors, unknown_words_errors = 0, 0
        # iterating over each word and its true-tag assignment in the test set
        for test_sentence in test_set:
            for word, true_tag in test_sentence:
                # checking if the word is a known word
                if word in self.word_tags_count:
                    known_words_counter += 1
                    word = self.convert_to_pseudo(word) if is_pseudo else word
                    # the prediction we make for a known word is what we stored
                    # in the word_predicted_tag dictionary
                    predicted_tag = self.word_predicted_tag[word]
                    known_words_errors = Tagger.check_tag_prediction(predicted_tag=predicted_tag,
                                                                     true_tag=true_tag,
                                                                     error_counter=known_words_errors)
                else:
                    # if we got here, the word is an unknown word
                    unknown_words_counter += 1
                    word = self.convert_to_pseudo(word) if is_pseudo else word
                    # as we assume in (b), the tag we predict for an unknown word is "NN"
                    predicted_tag = Tagger.UNKNOWN_WORD_TAG if is_mle else self.word_predicted_tag[word]
                    unknown_words_errors = Tagger.check_tag_prediction(predicted_tag=predicted_tag,
                                                                       true_tag=true_tag,
                                                                       error_counter=unknown_words_errors)
                if calculate_confusion_matrix:
                    self.confusion_matrix[true_tag][predicted_tag] += 1
        # printing the results
        print("----- Results From " + title + " -----")
        print("The error rate for known words is: " + str(known_words_errors / known_words_counter))
        print("The error rate for unknown words is: " + str(unknown_words_errors / unknown_words_counter))
        print("The total error rate is: " + str((known_words_errors + unknown_words_errors) /
                                                (known_words_counter + unknown_words_counter)))
        if calculate_confusion_matrix:
            self.print_confusion_matrix()

    def print_confusion_matrix(self):
        """
        this function prints the significant errors - for each true tag, we print
        the tag (different from the true tag) that was predicted for it the most times.
        :return:
        """
        print_task_format("Confusion Matrix")
        print("printing significant errors")
        for true_tag in self.confusion_matrix:
            # we only want to investigate the prediction errors, i.e. the values that are
            # not located on the matrix diagonal. in addition, we want to print the
            # significant errors - the tag predicted the most times for the true tag.
            predictions_amount = list(self.confusion_matrix[true_tag].values())
            predicted_tags = list(self.confusion_matrix[true_tag].keys())
            maximal_predicted_tag = predicted_tags[predictions_amount.index(max(predictions_amount))]
            if true_tag != maximal_predicted_tag:
                print("The true tag was " + true_tag + " and we predicted " + maximal_predicted_tag +
                      " " + str(self.confusion_matrix[true_tag][maximal_predicted_tag]) + " times")

    def compute_emission_probabilities(self, smoothed_delta=0, is_pseudo=False):
        """
        this function calculates the emission probabilities of a bigram HMM tagger.
        :param smoothed_delta: represents the delta for add-delta smoothing (0 means no smoothing).
        :param is_pseudo: indicator for using the pseudo-word counts.
        :return:
        """
        self.emission_probabilities.clear()
        vocabulary_size = len(self.word_tags_count)
        words_and_tags = self.pseudo_words_tag if is_pseudo else self.word_tags_count
        for w in words_and_tags:
            for w_tag in words_and_tags[w]:
                word_tag_amount = words_and_tags[w][w_tag] + smoothed_delta
                tag_amount = self.tag_count[w_tag] + (smoothed_delta * vocabulary_size)
                self.emission_probabilities[(w, w_tag)] = word_tag_amount / tag_amount

    def compute_transition_probabilities(self):
        """
        this function calculates the transition probabilities of a bigram HMM tagger.
        :return:
        """
        self.transition_probabilities.clear()
        for prev_tag in self.tag_bigram:
            for tag in self.tag_bigram[prev_tag]:
                count_tag_after = self.tag_bigram[prev_tag][tag]
                count_prev_tag_in_corpus = self.tag_count[prev_tag]
                self.transition_probabilities[(prev_tag, tag)] = count_tag_after / count_prev_tag_in_corpus
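
    # The two estimates above follow the standard bigram HMM definitions, with
    # add-delta smoothing applied to the emissions only:
    #
    #   e(w | t)  = (count(w, t) + delta) / (count(t) + delta * |V|)
    #   q(t | t') = count(t', t) / count(t')
    #
    # where |V| is the size of the training vocabulary. With delta = 0 these
    # reduce to the plain MLE estimates; with delta = 1 we get add-1 (Laplace)
    # smoothing.
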
    def handling_unknown_words_emission_probabilities(self, sentence, words_and_tags, is_smoothing,
                                                      smoothed_delta, is_pseudo):
        """
        this function handles the unknown words - in case we use smoothing, it sets
        their emission probabilities according to the add-1 Laplace definition;
        otherwise, it sets their emission probability with the "NN" tag to be 1, and
        the probability for the rest of the tags (p(word|tag)) to be 0
        :param sentence: represents a given sentence
        :param words_and_tags: represents the word and tags training data
        :param is_smoothing: represents an indicator for using smoothing
        :param smoothed_delta: represents the delta when using smoothing
        :param is_pseudo: represents an indicator for using pseudo words
        :return:
        """
        vocabulary_size = len(words_and_tags)
        for word, word_tag in sentence:
            # checking if the word is a known word in our model
            if word not in words_and_tags:
                # handling the case where we use pseudo words and smoothing
                if is_pseudo and is_smoothing:
                    pseudo_word = self.convert_to_pseudo(word)
                    word_tag_amount = self.pseudo_words_tag[pseudo_word][word_tag] + smoothed_delta
                    tag_amount = self.tag_count[word_tag] + smoothed_delta * vocabulary_size
                    self.emission_probabilities[(pseudo_word, word_tag)] = word_tag_amount / tag_amount
                # handling the case where we use smoothing, without pseudo words
                elif is_smoothing and not is_pseudo:
                    self.emission_probabilities[(word, word_tag)] = \
                        smoothed_delta / (self.tag_count[word_tag] + smoothed_delta * vocabulary_size)
                # handling the case where we use pseudo words, without smoothing
                elif is_pseudo and not is_smoothing:
                    pseudo_word = self.convert_to_pseudo(word)
                    word_tag_amount = self.pseudo_words_tag[pseudo_word][word_tag]
                    tag_amount = self.tag_count[word_tag]
                    self.emission_probabilities[(pseudo_word, word_tag)] = word_tag_amount / tag_amount
                # handling the case where we use neither smoothing nor pseudo words
                elif not is_smoothing and not is_pseudo:
                    self.emission_probabilities[(word, Tagger.UNKNOWN_WORD_TAG)] = 1

    def viterbi_algorithm_per_sentence(self, sentence, is_smoothing=False, is_pseudo=False, smoothed_delta=1):
        """
        this function runs the Viterbi algorithm on the given sentence
        :param sentence: represents a given sentence
        :param is_smoothing: represents an indicator for using smoothing
        :param is_pseudo: represents an indicator for using pseudo words
        :param smoothed_delta: represents the delta when using smoothing
        :return: the most probable assignment for the given sentence, in the format of
                 [(word, most_probable_tag) for word in sentence]
        """
        dp_pi, backpointers = {}, {}
        # creating a set of all the tags, except START_TAG and STOP_TAG
        state_tags = set(tag for tag in self.tag_count.keys()
                         if tag != Tagger.START_TAG and tag != Tagger.STOP_TAG)
        self.handling_unknown_words_emission_probabilities(sentence, self.word_tags_count,
                                                           is_smoothing, smoothed_delta, is_pseudo)
        # initializing our dynamic programming data structure
        first_word = self.convert_to_pseudo(sentence[0][0]) if is_pseudo else sentence[0][0]
        for tag in state_tags:
            dp_pi[tag, 0] = self.transition_probabilities[(Tagger.START_TAG, tag)] * \
                            self.emission_probabilities[(first_word, tag)]
        for i in range(1, len(sentence)):
            cur_word = self.convert_to_pseudo(sentence[i][0]) if is_pseudo else sentence[i][0]
            for tag in state_tags:
                # taking the maximum over all possible previous tags, and remembering
                # which previous tag achieved it, so we can reconstruct the best path
                dp_pi[tag, i], backpointers[tag, i] = max(
                    (dp_pi[k_tag, i - 1] * self.transition_probabilities[(k_tag, tag)] *
                     self.emission_probabilities[(cur_word, tag)], k_tag)
                    for k_tag in state_tags)
        # reconstructing the most probable assignment by following the backpointers
        last_index = len(sentence) - 1
        cur_tag = max(state_tags, key=lambda tag: dp_pi[tag, last_index])
        most_probable_assignment = [(sentence[last_index][0], cur_tag)]
        for i in range(last_index, 0, -1):
            cur_tag = backpointers[cur_tag, i]
            most_probable_assignment.append((sentence[i - 1][0], cur_tag))
        return most_probable_assignment[::-1]

    def viterbi_algorithm(self, test_set, is_smoothed=False, is_pseudo=False, smoothed_delta=0):
        """
        this function runs the Viterbi algorithm on each sentence in the given
        test set, and maps each word to its Viterbi-predicted tag.
        :param test_set: represents a given test set
        :param is_smoothed: represents an indicator for using smoothing
        :param is_pseudo: represents an indicator for using pseudo words
        :param smoothed_delta: represents the delta when using smoothing
        :return:
        """
        for sentence in test_set:
            # calculating the most probable assignment for the current sentence
            sentence_viterbi_assignment = self.viterbi_algorithm_per_sentence(sentence, is_smoothed,
                                                                              is_pseudo, smoothed_delta)
            # updating each word's most probable tag, according to the Viterbi algorithm
            for word, tag in sentence_viterbi_assignment:
                # checking if we need to convert the current word to its pseudo version
                word = self.convert_to_pseudo(word) if is_pseudo else word
                self.word_predicted_tag[word] = tag
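
    # The recurrence implemented above is the standard bigram Viterbi update:
    #
    #   pi(i, t) = max_{t'} pi(i - 1, t') * q(t | t') * e(w_i | t)
    #
    # computed in O(n * |T|^2) time for a sentence of n words over a tag set T.
    # Probabilities are multiplied directly rather than summed in log-space,
    # so very long sentences could in principle underflow to 0.
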
    def convert_to_pseudo(self, word):
        """
        this function converts a given word to a pseudo word, according to its form
        :param word: represents a given word to be converted to a pseudo word
        :return:
        """
        if word in self.word_tags_count:
            if sum(self.word_tags_count[word].values()) >= Tagger.PSEUDO_THRESHOLD:
                return word
        # handling words with digits
        if len(word) == 2 and word.isdigit():
            return "twoDigitNum"
        if len(word) == 4 and word.isdigit():
            return "fourDigitNum"
        if Tagger.check_pseudo_contains_digit(word):
            if Tagger.check_pseudo_contains_alpha(word):
                return "containsDigitAndAlpha"
            if "-" in word:
                return "containsDigitAndDash"
            if "/" in word:
                return "containsDigitAndSlash"
            if "," in word:
                return "containsDigitAndComma"
            if "." in word:
                return "containsDigitAndPeriod"
            return "otherNum"
        # handling words with capital letters
        if Tagger.check_all_capitals(word):
            return "allCaps"
        if Tagger.check_pseudo_first_capital(word):
            return "initCap"
        if "$" in word:
            return "MONEY"
        # handling prefixes and suffixes
        if len(word) >= 2:
            prefix_tag, suffix_tag = Tagger.pseudo_prefix(word), Tagger.pseudo_suffix(word)
            if prefix_tag != "OTHER" and suffix_tag != "OTHER":
                return prefix_tag + "-" + suffix_tag
            result = suffix_tag if suffix_tag != "OTHER" else prefix_tag
            return "lowerCase" if result == "OTHER" and word.islower() else result
        return "OTHER"

    @staticmethod
    def check_tag_prediction(predicted_tag, true_tag, error_counter):
        """
        this function checks if the model classified the tag correctly.
        if it didn't, the function returns an incremented error count.
        :param predicted_tag: represents the model's predicted tag
        :param true_tag: represents the true tag
        :param error_counter: represents the current error count
        :return:
        """
        return error_counter + 1 if predicted_tag != true_tag else error_counter

    @staticmethod
    def check_pseudo_first_capital(word):
        """
        this function checks if the first letter of the given word is a capital letter
        :param word: represents a given word to be converted to a pseudo word
        :return:
        """
        return word[0].isupper()

    @staticmethod
    def check_all_capitals(word):
        """
        this function checks if a given word contains only capital letters
        :param word: represents a given word to be converted to a pseudo word
        :return:
        """
        return word.isupper()

    @staticmethod
    def check_pseudo_contains_digit(word):
        """
        this function checks if a given word contains a digit
        :param word: represents a given word to be converted to a pseudo word
        :return: true if it does, false otherwise
        """
        return any(char.isdigit() for char in word)

    @staticmethod
    def check_pseudo_contains_alpha(word):
        """
        this function checks if a given word contains an alphabetic character
        :param word: represents a given word to be converted to a pseudo word
        :return: true if it does, false otherwise
        """
        return any(char.isalpha() for char in word)

    @staticmethod
    def pseudo_prefix(word):
        """
        this function returns a pseudo prefix or the "OTHER" pseudo word
        :param word: represents a given word
        :return: a pseudo prefix or the "OTHER" pseudo word
        """
        return Tagger.get_pseudo_tag_prefix_suffix(word, [Tagger.PREFIX_4, Tagger.PREFIX_3, Tagger.PREFIX_2])

    @staticmethod
    def pseudo_suffix(word):
        """
        this function returns a pseudo suffix or the "OTHER" pseudo word
        :param word: represents a given word
        :return: a pseudo suffix or the "OTHER" pseudo word
        """
        return Tagger.get_pseudo_tag_prefix_suffix(word, [Tagger.SUFFIX_4, Tagger.SUFFIX_3, Tagger.SUFFIX_2], True)

    @staticmethod
    def get_pseudo_tag_prefix_suffix(word, pos_set, is_suffix=False):
        """
        this function checks if the word's prefix or suffix is in the common English
        prefix and suffix sets. the function prefers to capture a longer
        prefix/suffix rather than a shorter one.
        :param word: represents a given word
        :param pos_set: represents the given common prefix or suffix sets
        :param is_suffix: indicator for checking suffixes rather than prefixes
        :return:
        """
        for pos, word_length in zip(pos_set, [4, 3, 2]):
            if len(word) >= word_length:
                word_part = word[:word_length] if not is_suffix else word[-word_length:]
                if word_part in pos:
                    return word_part.upper()
        return "OTHER"
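
# A few illustrative mappings for convert_to_pseudo (assuming the words are
# unknown, or appear fewer than PSEUDO_THRESHOLD times in training):
#
#   "1996"        -> "fourDigitNum"
#   "3/4"         -> "containsDigitAndSlash"
#   "NASA"        -> "allCaps"
#   "Jerusalem"   -> "initCap"
#   "unhappiness" -> "UN-NESS"     (known prefix "un" + known suffix "ness")
#   "running"     -> "ING"         (known suffix only)
#   "the"         -> "lowerCase"   (no known prefix/suffix, all lowercase)
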
def print_task_format(task_title):
    """
    this function prints a title format to the console, for convenience purposes.
    :param task_title: represents the title
    :return:
    """
    print()
    print("====================================")
    print("|| " + task_title + " ||")
    print("====================================")


def task_b(pos_tagger, test_set):
    """
    this function implements the relevant parts needed to complete task b
    :param pos_tagger: represents our Tagger instance
    :param test_set: represents a given test set
    :return:
    """
    print_task_format("Task (b)")
    pos_tagger.compute_most_likely_tag()
    pos_tagger.error_rate(test_set, "Task (b)ii - error rates using MLE", is_mle=True)


def task_c(pos_tagger, test_set):
    """
    this function implements the relevant parts needed to complete task c
    :param pos_tagger: represents our Tagger instance
    :param test_set: represents a given test set
    :return:
    """
    print_task_format("Task (c)")
    pos_tagger.compute_emission_probabilities()
    pos_tagger.compute_transition_probabilities()
    pos_tagger.viterbi_algorithm(test_set)
    pos_tagger.error_rate(test_set, "Task (c)iii - error rates using Viterbi algorithm")


def task_d(pos_tagger, test_set):
    """
    this function implements the relevant parts needed to complete task d
    :param pos_tagger: represents our Tagger instance
    :param test_set: represents a given test set
    :return:
    """
    print_task_format("Task (d)")
    pos_tagger.compute_emission_probabilities(smoothed_delta=1)
    pos_tagger.compute_transition_probabilities()
    pos_tagger.viterbi_algorithm(test_set, is_smoothed=True, is_pseudo=False, smoothed_delta=1)
    pos_tagger.error_rate(test_set, "Task (d)iii - error rates using Viterbi algorithm with add-1 smoothing")


def task_e(pos_tagger, test_set):
    """
    this function implements the relevant parts needed to complete task e
    :param pos_tagger: represents our Tagger instance
    :param test_set: represents a given test set
    :return:
    """
    print_task_format("Task (e)")
    # designing a set of pseudo words from the low-frequency words in the training data,
    # and from unknown words from the test set
    pos_tagger.fit_pseudo_train()
    # using pseudo words as well as MLE
    pos_tagger.compute_emission_probabilities(smoothed_delta=0, is_pseudo=True)
    pos_tagger.compute_transition_probabilities()
    pos_tagger.viterbi_algorithm(test_set, is_pseudo=True)
    pos_tagger.error_rate(test_set, "Task (e)ii - error rates using Viterbi algorithm with pseudo words",
                          is_pseudo=True)
    # using pseudo words as well as add-1 smoothing
    pos_tagger.compute_emission_probabilities(smoothed_delta=1, is_pseudo=True)
    pos_tagger.viterbi_algorithm(test_set, is_smoothed=True, is_pseudo=True, smoothed_delta=1)
    pos_tagger.error_rate(test_set, "Task (e)iii - error rates using Viterbi algorithm with"
                                    " pseudo words and add-1 smoothing",
                          calculate_confusion_matrix=True, is_pseudo=True)


if __name__ == '__main__':
    # getting the nltk Brown corpus, and cleaning it
    brown_corpus = clean_corpus(brown.tagged_sents(categories="news"))
    # splitting the corpus into train and test sets
    training_corpus, test_corpus = train_test_split(brown_corpus, test_size=1.0 / 10, shuffle=False)
    tagger = Tagger()
    # fitting the tagger with the training data
    tagger.fit(training_corpus)
    # implementing the different tasks
    task_b(tagger, test_corpus)
    task_c(tagger, test_corpus)
    task_d(tagger, test_corpus)
    task_e(tagger, test_corpus)
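
# Note: running this script requires the Brown corpus to be available locally
# (a one-time `python -m nltk.downloader brown`), as well as scikit-learn for
# train_test_split.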