Untitled

 avatar
unknown
plain_text
a year ago
25 kB
10
Indexable
from nltk.corpus import brown
from sklearn.model_selection import train_test_split
import re
from collections import defaultdict


def clean_corpus(corpus):
    """
    this function cleans a given corpus from its complex tags
    :param corpus: represents a given corpus to be cleaned
    :return: a cleaned corpus, with fixed complex tags
    """
    cleaned_corpus = []
    clean_tag_format = re.compile(r'[*]|--|[^+*-]+')
    for sentence in corpus:
        for i in range(len(sentence)):
            word, tag = sentence[i]
            clean_tag = clean_tag_format.match(tag).group()
            sentence[i] = (word, clean_tag)
        cleaned_corpus += [sentence]
    return cleaned_corpus


class Tagger:
    UNKNOWN_WORD_TAG = "NN"
    START_TAG = "START TAG"
    STOP_TAG = "STOP TAG"
    # pseudo constants
    PSEUDO_THRESHOLD = 5
    # common prefixes in the english language
    PREFIX_2 = {"be", "en", "em", "co", "im", "un", "in", "ir", "il", "de", "ab", "ex", "re", "bi"}
    PREFIX_3 = {"non", "dis", "pro", "tri", "uni", "sub", "out", "uni", "pre", "mal", "mis"}
    PREFIX_4 = {"ambi", "auto", "over", "circ", "anti", "hypo", "phil", "mono", "fore", "bene"}

    # common suffixes in the english language
    SUFFIX_2 = {"th", "or", "el", "en", "al", "ly", "es", "ed", "er", "al", "ic"}
    SUFFIX_3 = {"ies", "ful", "ble", "ous", "ent", "ant", "ist", "ise", "ize", "age", "ogy", "ers", "ale", "ine", "ows",
                "ine", "ile", "ing", "ish", "est", "dom", "ity", "ive", "ify"}
    SUFFIX_4 = {"ship", "tion", "ance", "hood", "port", "tude", "sion", "ence", "ness", "less", "ment", "able", "ible",
                "wood", "main", "ions"}

    def __init__(self):
        self.tag_count = defaultdict(int)
        self.word_tags_count = defaultdict(lambda: defaultdict(int))
        self.word_predicted_tag = dict()
        self.transition_probabilities = defaultdict(float)
        self.emission_probabilities = defaultdict(float)
        self.tag_bigram = defaultdict(lambda: defaultdict(int))
        self.pseudo_words_tag = defaultdict(lambda: defaultdict(int))
        self.confusion_matrix = defaultdict(lambda: defaultdict(int))

    def fit(self, training_set):
        """
        this function will fit a Tagger HMM, with a given training set.
        the function will initialize self.tag_count, which represents a {tag: tag count in the corpus},
        and will initialize self.word_tags_count, which represents a {word: {tag: tag count as a the word's tag}
        :param training_set: represents a given training set.
        :return:
        """
        for sentence in training_set:
            cur_tag = Tagger.START_TAG
            self.tag_count[cur_tag] += 1
            for word, tag in sentence:
                self.tag_count[tag] += 1
                self.tag_bigram[cur_tag][tag] += 1
                self.word_tags_count[word][tag] += 1
                cur_tag = tag
            self.tag_count[Tagger.STOP_TAG] += 1
            self.tag_bigram[cur_tag][Tagger.STOP_TAG] += 1

    def fit_pseudo_train(self):
        """
        this function will fit our tagger, using pseudo words for low-frequency words in the training data
        :return:
        """
        # fits the training data
        word_freq = lambda w: sum(self.word_tags_count[w].values())
        for word in self.word_tags_count:
            # checking if the word has less than 5 appearances in the training data
            if word_freq(word) < Tagger.PSEUDO_THRESHOLD:
                for tag in self.word_tags_count[word]:
                    pseudo_word = self.convert_to_pseudo(word)
                    self.pseudo_words_tag[pseudo_word][tag] += 1
            else:
                self.pseudo_words_tag[word] = self.word_tags_count[word]

    def compute_most_likely_tag(self):
        """
        this function computes for each word w, the most likely tag to be the w's tag.
        the function will use the MLE principle.
        :return:
        """
        for word in self.word_tags_count:
            w_tags_count_dict = self.word_tags_count[word]
            values = list(w_tags_count_dict.values())
            keys = list(w_tags_count_dict.keys())
            # getting the tag with the most appearances as w's tag
            self.word_predicted_tag[word] = keys[values.index(max(values))]

    def error_rate(self, test_set, title, calculate_confusion_matrix=False, is_pseudo=False,
                   is_mle=False):
        """
        this function will calculate and print the known words error rate,
        the unknown words error rate, and the total error rate, for a given test set.
        :param test_set: represents a given test set which we calculate the error rate for.
        :return:
        """
        known_words_counter, unknown_words_counter = 0, 0
        known_words_errors, unknown_words_errors = 0, 0
        # iterating on each word, and it's true_tag assigment in the test set
        for test_sentence in test_set:
            for word, true_tag in test_sentence:
                # checking if the word is a known word
                if word in self.word_tags_count:
                    known_words_counter += 1
                    word = self.convert_to_pseudo(word) if is_pseudo else word
                    predicted_tag = self.word_predicted_tag[word]
                    # the prediction we make for a known word, is what we stored in word_predicted_tag dictionary
                    known_words_errors = Tagger.check_tag_prediction(predicted_tag=predicted_tag,
                                                                     true_tag=true_tag,
                                                                     error_counter=known_words_errors)

                else:
                    # if we got here, the word is an unknown word
                    unknown_words_counter += 1
                    word = self.convert_to_pseudo(word) if is_pseudo else word
                    # as we are assuming in (b), the tag we predict for an unknown word is "NN"
                    predicted_tag = Tagger.UNKNOWN_WORD_TAG if is_mle else self.word_predicted_tag[word]
                    unknown_words_errors = Tagger.check_tag_prediction(predicted_tag=predicted_tag,
                                                                       true_tag=true_tag,
                                                                       error_counter=unknown_words_errors)
                if calculate_confusion_matrix:
                    self.confusion_matrix[true_tag][predicted_tag] += 1
        # printing the results
        print("----- Results From " + title + " -----")
        print("The error rate for known words is: " + str(known_words_errors / known_words_counter))
        print("The error rate for unknown words is: " + str(unknown_words_errors / unknown_words_counter))
        print("The total error rate is: " + str((known_words_errors + unknown_words_errors) /
                                                (known_words_counter + unknown_words_counter)))
        if calculate_confusion_matrix:
            self.print_confusion_matrix()

    def print_confusion_matrix(self):
        """
        this function will print the significant errors - for each true tag, we print the
        tag (different from the true tag) with the maximal amount of predictions to be the true tag.
        :return:
        """
        print_task_format("Confusion Matrix")
        print("printing significant errors")
        for true_tag in self.confusion_matrix:
            # we want to investigate only the prediction errors, i.e the values that not located in the matrix diagonal
            # in addition, we want to print the significant errors - the tag with the maximal amount of
            # predictions to be the true tag.
            predictions_amount = list(self.confusion_matrix[true_tag].values())
            predicted_tags = list(self.confusion_matrix[true_tag].keys())
            maximal_predicted_tag = predicted_tags[predictions_amount.index(max(predictions_amount))]

            if true_tag != maximal_predicted_tag:
                print("The true tag was " + true_tag + " and we predicted " + maximal_predicted_tag + " " +
                      str(self.confusion_matrix[true_tag][maximal_predicted_tag]) + " times")

    def compute_emission_probabilities(self, smoothed_delta=0, is_pseudo=False):
        """
        this function calculates the emission probabilities of a bigram HMM tagger.
        :return:
        """
        self.emission_probabilities.clear()
        vocabulary_size = len(self.word_tags_count)
        words_and_tags = self.pseudo_words_tag if is_pseudo else self.word_tags_count
        for w in words_and_tags:
            for w_tag in words_and_tags[w]:
                word_tag_amount = words_and_tags[w][w_tag] + smoothed_delta
                tag_amount = self.tag_count[w_tag] + (smoothed_delta * vocabulary_size)
                self.emission_probabilities[(w, w_tag)] = word_tag_amount / tag_amount

    def compute_transition_probabilities(self):
        """
        this function calculates the transition probabilities of a bigram HMM tagger.
        :return:
        """
        self.transition_probabilities.clear()
        for prev_tag in self.tag_bigram:
            for tag in self.tag_bigram[prev_tag]:
                count_tag_after = self.tag_bigram[prev_tag][tag]
                count_prev_tag_in_corpus = self.tag_count[prev_tag]
                self.transition_probabilities[(prev_tag, tag)] = count_tag_after / count_prev_tag_in_corpus

    def handling_unknown_words_emission_probabilities(self, sentence, words_and_tags, is_smoothing,
                                                      smoothed_delta, is_pseudo):
        """
        this function will handle with the unknown words - in case we use smoothing, the function will set the
        emission probability dictionary according to add-1 laplace definiton,
        otherwise, setting their emission probability with a "NN" tag to be 1,
        and the probability for the rest of the tags (p(word|tag)) is 0
        :param sentence: represents a given sentence
        :param words_and_tags: represents the word and tags training data
        :param is_smoothing: represents an indicator for using smoothing
        :param smoothed_delta: represents the delta when using smoothing
        :return:
        """
        vocabulary_size = len(words_and_tags)
        for word, word_tag in sentence:
            # checking if the word is a known word in our model
            if word not in words_and_tags:
                # handling the case where we use pseudo words and smoothing
                if is_pseudo and is_smoothing:
                    pseudo_word = self.convert_to_pseudo(word)
                    word_tag_amount = self.pseudo_words_tag[pseudo_word][word_tag] + smoothed_delta
                    tag_amount = self.tag_count[word_tag] + smoothed_delta * vocabulary_size
                    self.emission_probabilities[(pseudo_word, word_tag)] = word_tag_amount / tag_amount

                # handling the case where we use smoothing, without pseudo words
                elif is_smoothing and not is_pseudo:
                    self.emission_probabilities[(word, word_tag)] = smoothed_delta / \
                                                                    (self.tag_count[word_tag] +
                                                                     smoothed_delta * vocabulary_size)
                # handling the case where we use pseudo words, without smoothing
                elif is_pseudo and not is_smoothing:
                    pseudo_word = self.convert_to_pseudo(word)
                    word_tag_amount = self.pseudo_words_tag[pseudo_word][word_tag]
                    tag_amount = self.tag_count[word_tag]
                    self.emission_probabilities[(pseudo_word, word_tag)] = word_tag_amount / tag_amount
                # handling the case where we don't use smoothing and don't use pseudo words
                elif not is_smoothing and not is_pseudo:
                    self.emission_probabilities[(word, Tagger.UNKNOWN_WORD_TAG)] = 1

    def viterbi_algorithm_per_sentence(self, sentence, is_smoothing=False, is_pseudo=False, smoothed_delta=1):
        """
        this function will run the viterbi algorithm on the given sentence
        :param sentence: represents a given sentence
        :return: the most probable assigment for the given sentence,
        in the format of [(word, most_probable_tag) for word in sentence]
        """
        dp_pi = {}
        # creating a set of all the tags, except START_TAG and STOP_TAG
        state_tags = set(tag for tag in self.tag_count.keys() if tag != Tagger.START_TAG and tag != Tagger.STOP_TAG)
        self.handling_unknown_words_emission_probabilities(sentence, self.word_tags_count, is_smoothing,
                                                           smoothed_delta, is_pseudo)
        # initializing our dynamic programing data structure
        for tag in state_tags:
            first_word = self.convert_to_pseudo(sentence[0][0]) if is_pseudo else sentence[0][0]
            dp_pi[tag, 0] = self.transition_probabilities[(Tagger.START_TAG, tag)] * \
                            self.emission_probabilities[(first_word, tag)]
        for i in range(1, len(sentence)):
            for tag in state_tags:
                cur_word = self.convert_to_pseudo(sentence[i][0]) if is_pseudo else sentence[i][0]
                cur_pi = [(dp_pi[k_tag, i - 1] * self.transition_probabilities[k_tag, tag] *
                           self.emission_probabilities[(cur_word, tag)], k_tag) for k_tag in state_tags]
                # getting the maximal probability
                sorted_pi = sorted(cur_pi)
                maximal_tag = sorted_pi[-1][1]
                dp_pi[tag, i] = dp_pi[maximal_tag, i - 1] * self.transition_probabilities[maximal_tag, tag] * \
                                self.emission_probabilities[(cur_word, tag)]
        most_probable_assigment = []
        for i in range(len(sentence) - 1, -1, -1):
            maximal_tag = sorted([(dp_pi[maximal_tag, i], maximal_tag) for maximal_tag in state_tags])[-1][1]
            most_probable_assigment.append((sentence[i][0], maximal_tag))
        return most_probable_assigment[::-1]

    def viterbi_algorithm(self, test_set, is_smoothed=False, is_pseudo=False, smoothed_delta=0):
        """
        this function will implement the viterbi algorithm.
        it will run the algorithm on each sentence in the given test set, and will map the words to their
        viterbi predicted tag.
        :param test_set: represents a given test set
        :return:
        """
        for sentence in test_set:
            # calculating the most probable assigment for the current sentence
            sentence_viterbi_assigment = self.viterbi_algorithm_per_sentence(sentence, is_smoothed, is_pseudo,
                                                                             smoothed_delta)
            # updating the word's most probable tag, according to the viterbi algorithm
            for word, tag in sentence_viterbi_assigment:
                # checking if we need to convert the current word to it's pseudo version
                word = self.convert_to_pseudo(word) if is_pseudo else word
                self.word_predicted_tag[word] = tag

    def convert_to_pseudo(self, word):
        """
        this function will convert a given word to a pseudo word, according to its context
        :param word: represents a given word to be converted to a pseudo
        :return:
        """
        if word in self.word_tags_count:
            if sum(self.word_tags_count[word].values()) >= Tagger.PSEUDO_THRESHOLD:
                return word
                # handling word with digits
        if len(word) == 2 and word.isdigit():
            return "twoDigitNum"
        if len(word) == 4 and word.isdigit():
            return "fourDigitNum"
        if Tagger.check_pseudo_contains_digit(word):
            if Tagger.check_pseudo_contains_alpha(word):
                return "containsDigitAndAlpha"
            if "-" in word:
                return "containsDigitAndDash"
            if "/" in word:
                return "containsDigitAndSlash"
            if "," in word:
                return "containsDigitAndComma"
            if "." in word:
                return "containsDigitAndPeriod"
            return "otherNum"
        # handling word with capital letters
        if Tagger.check_all_capitals(word):
            return "allCaps"
        if Tagger.check_pseudo_first_capital(word):
            return "initCap"
        if "$" in word:
            return "MONEY"
        # handling prefix and suffix

        if len(word) >= 2:
            prefix_tag, suffix_tag = Tagger.pseudo_prefix(word), Tagger.pseudo_suffix(word)
            if prefix_tag != "OTHER" and suffix_tag != "OTHER":
                return prefix_tag + "-" + suffix_tag
            result = suffix_tag if suffix_tag != "OTHER" else prefix_tag
            return "lowerCase" if result == "OTHER" and word.islower() else result

        return "OTHER"

    @staticmethod
    def check_tag_prediction(predicted_tag, true_tag, error_counter):
        """
        this function checks if the model classify the tags correctly.
        if it doesn't, the function will increment the error count.
        :param predicted_tag: represents the model predicted tag
        :param true_tag: represents the true tag
        :param error_counter: represents the current error tag
        :return:
        """
        return error_counter + 1 if predicted_tag != true_tag else error_counter

    @staticmethod
    def check_pseudo_first_capital(word):
        """
        this function checks if the first letter of the given word is a char
        :param word: represents a given word to be converted to a pseudo
        :return:
        """
        return word[0].isupper()

    @staticmethod
    def check_all_capitals(word):
        """
        this function checks if a given world contains only capital letters
        :param word: represents a given word to be converted to a pseudo
        :return:
        """
        return word.isupper()

    @staticmethod
    def check_pseudo_contains_digit(word):
        """
        this funtion check it a given world contains a digit
        :param word: represents a given word to be converted to a pseudo
        :return: true if it is, false otherwise
        """
        return any(char.isdigit() for char in word)

    @staticmethod
    def check_pseudo_contains_alpha(word):
        """
        this function checks if a given world contains a char
        :param word: represents a given word to be converted to a pseudo
        :return: true if it is, false otherwise
        """
        return any(char.isalpha() for char in word)

    @staticmethod
    def pseudo_prefix(word):
        """
        this function returns a pseudo prefix or "OTHER" pseudo
        :param word: represents a given word
        :return: pseudo prefix or "OTHER" pseudo
        """
        return Tagger.get_pseudo_tag_prefix_suffix(word, [Tagger.PREFIX_4, Tagger.PREFIX_3, Tagger.PREFIX_2])

    @staticmethod
    def pseudo_suffix(word):
        """
        this function returns a pseudo suffix or "OTHER" pseudo
        :param word: represents a given word
        :return: pseudo suffix or "OTHER" pseudo
        """
        return Tagger.get_pseudo_tag_prefix_suffix(word, [Tagger.SUFFIX_4, Tagger.SUFFIX_3, Tagger.SUFFIX_2], True)

    @staticmethod
    def get_pseudo_tag_prefix_suffix(word, pos_set, is_suffix=False):
        """
        this function checks if the word prefix or suffix is in the common language prefixes and suffixes.
        the function will prefer to capture longer prefix/ suffix length rather then shorter.
        :param word: represents a given word
        :param pos_set: represents the given common prefixes or suffixes sets
        :param is_suffix: indicator for checking suffixes or prefixes
        :return:
        """
        for pos, word_length in zip(pos_set, [4, 3, 2]):
            convert_to_prefix = lambda text: text.upper()
            if len(word) >= word_length:
                word_part = word[:word_length] if not is_suffix else word[-word_length:]
                if word_part in pos:
                    return convert_to_prefix(word_part)
        return "OTHER"


def print_task_format(task_title):
    """
    this function prints a title format to the console, for convenience purposes.
    :param task_title: represents the title
    :return:
    """
    print()
    print("====================================")
    print("||            " + task_title + "            ||")
    print("====================================")


def task_b(pos_tagger, test_set):
    """
    this function will implement the relevant parts in order to complete task b
    :param pos_tagger: represents our Tagger instance
    :param test_set: represents a given test set
    :return:
    """
    print_task_format("Task (b)")
    pos_tagger.compute_most_likely_tag()
    pos_tagger.error_rate(test_set, "Task (b)ii - error rates using MLE", is_mle=True)


def task_c(pos_tagger, test_set):
    """
    this function will implement the relevant parts in order to complete task c
    :param pos_tagger: represents our Tagger instance
    :param test_set: represents a given test set
    :return:
    """
    print_task_format("Task (c)")
    pos_tagger.compute_emission_probabilities()
    pos_tagger.compute_transition_probabilities()
    pos_tagger.viterbi_algorithm(test_corpus)
    pos_tagger.error_rate(test_set, "Task (c)iii - error rates using Viterbi algorithm")


def task_d(pos_tagger, test_set):
    """
    this function will implement the relevant parts in order to complete task d
    :param pos_tagger: represents our Tagger instance
    :param test_set: represents a given test set
    :return:
    """
    print_task_format("Task (d)")
    pos_tagger.compute_emission_probabilities(smoothed_delta=1)
    pos_tagger.compute_transition_probabilities()
    pos_tagger.viterbi_algorithm(test_corpus, is_smoothed=True, is_pseudo=False, smoothed_delta=1)
    pos_tagger.error_rate(test_set, "Task (d)iii - error rates using Viterbi algorithm with add-1 smoothing")


def task_e(pos_tagger, test_set):
    """
    this function will implement the relevant parts in order to complete task e
    :param pos_tagger: represents our Tagger instance
    :param test_set: represents a given test set
    :return:
    """
    print_task_format("Task (e)")
    # designing a set of pseudo words from the low frequency words in the training data,
    # and from unknown words from the test set
    pos_tagger.fit_pseudo_train()
    # using pseudo words as well as mle
    pos_tagger.compute_emission_probabilities(smoothed_delta=0, is_pseudo=True)
    pos_tagger.compute_transition_probabilities()
    pos_tagger.viterbi_algorithm(test_set, is_pseudo=True)
    pos_tagger.error_rate(test_set, "Task (e)ii - error rates using Viterbi algorithm with pseudo words",
                          is_pseudo=True)
    # using pseudo words as well as Add-1 smoothing
    pos_tagger.compute_emission_probabilities(smoothed_delta=1, is_pseudo=True)
    pos_tagger.viterbi_algorithm(test_set, is_smoothed=True, is_pseudo=True, smoothed_delta=1)
    pos_tagger.error_rate(test_set, "Task (e)iii - error rates using Viterbi algorithm with"
                                    " pseudo words and add-1 smoothing",
                          calculate_confusion_matrix=True, is_pseudo=True)


if __name__ == '__main__':
    # getting ntlk.brown corpus, and cleaning it
    brown_corpus = clean_corpus(brown.tagged_sents(categories="news"))
    # splitting the corpus to train and test sets
    training_corpus, test_corpus = train_test_split(brown_corpus, test_size=1.0 / 10, shuffle=False)
    tagger = Tagger()
    # fitting the tagger with the training data
    tagger.fit(training_corpus)
    # implementing the different tasks
    task_b(tagger, test_corpus)
    task_c(tagger, test_corpus)
    task_d(tagger, test_corpus)
    task_e(tagger, test_corpus)
Editor is loading...
Leave a Comment