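# Bigram HMM part-of-speech tagger over the Brown corpus (news category):
# an MLE baseline, Viterbi decoding, add-1 smoothing, and pseudo words for
# rare and unknown tokens.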
from nltk.corpus import brown
from sklearn.model_selection import train_test_split
import re
from collections import defaultdict
def clean_corpus(corpus):
    """
    this function cleans a given corpus from its complex tags,
    reducing each complex tag to its base form (e.g. "NN-TL" becomes "NN")
    :param corpus: represents a given corpus to be cleaned
    :return: the cleaned corpus, with complex tags reduced to their base form
    """
    cleaned_corpus = []
    clean_tag_format = re.compile(r'[*]|--|[^+*-]+')
    for sentence in corpus:
        for i in range(len(sentence)):
            word, tag = sentence[i]
            clean_tag = clean_tag_format.match(tag).group()
            sentence[i] = (word, clean_tag)
        cleaned_corpus.append(sentence)
    return cleaned_corpus
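# a quick sanity check of the tag-cleaning regex (illustrative sketch only, not part
# of the tasks below; the example tags are standard complex Brown tags):
def _demo_clean_tag_format():
    clean_tag_format = re.compile(r'[*]|--|[^+*-]+')
    assert clean_tag_format.match("NN-TL").group() == "NN"   # title-case noun -> base tag
    assert clean_tag_format.match("NP+BEZ").group() == "NP"  # merged form -> base tag
    assert clean_tag_format.match("*").group() == "*"        # the negator tag is kept as-is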
class Tagger:
UNKNOWN_WORD_TAG = "NN"
START_TAG = "START TAG"
STOP_TAG = "STOP TAG"
# pseudo constants
PSEUDO_THRESHOLD = 5
# common prefixes in the english language
    PREFIX_2 = {"be", "en", "em", "co", "im", "un", "in", "ir", "il", "de", "ab", "ex", "re", "bi"}
    PREFIX_3 = {"non", "dis", "pro", "tri", "uni", "sub", "out", "pre", "mal", "mis"}
    PREFIX_4 = {"ambi", "auto", "over", "circ", "anti", "hypo", "phil", "mono", "fore", "bene"}
    # common suffixes in the english language
    SUFFIX_2 = {"th", "or", "el", "en", "al", "ly", "es", "ed", "er", "ic"}
    SUFFIX_3 = {"ies", "ful", "ble", "ous", "ent", "ant", "ist", "ise", "ize", "age", "ogy", "ers", "ale", "ine",
                "ows", "ile", "ing", "ish", "est", "dom", "ity", "ive", "ify"}
    SUFFIX_4 = {"ship", "tion", "ance", "hood", "port", "tude", "sion", "ence", "ness", "less", "ment", "able",
                "ible", "wood", "main", "ions"}
def __init__(self):
self.tag_count = defaultdict(int)
self.word_tags_count = defaultdict(lambda: defaultdict(int))
self.word_predicted_tag = dict()
self.transition_probabilities = defaultdict(float)
self.emission_probabilities = defaultdict(float)
self.tag_bigram = defaultdict(lambda: defaultdict(int))
self.pseudo_words_tag = defaultdict(lambda: defaultdict(int))
self.confusion_matrix = defaultdict(lambda: defaultdict(int))
    def fit(self, training_set):
        """
        this function will fit a Tagger HMM, with a given training set.
        the function will initialize self.tag_count, which represents {tag: tag count in the corpus},
        and self.word_tags_count, which represents {word: {tag: count of tag as the word's tag}}
        :param training_set: represents a given training set.
        :return:
        """
for sentence in training_set:
cur_tag = Tagger.START_TAG
self.tag_count[cur_tag] += 1
for word, tag in sentence:
self.tag_count[tag] += 1
self.tag_bigram[cur_tag][tag] += 1
self.word_tags_count[word][tag] += 1
cur_tag = tag
self.tag_count[Tagger.STOP_TAG] += 1
self.tag_bigram[cur_tag][Tagger.STOP_TAG] += 1
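    # note: after fit() the model holds all the raw counts the HMM estimates need -
    # tag unigram counts (tag_count), tag bigram counts including the START/STOP
    # transitions (tag_bigram), and per-word tag counts (word_tags_count).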
    def fit_pseudo_train(self):
        """
        this function will fit our tagger, using pseudo words for low-frequency words in the training data
        :return:
        """
        # fits the training data
        for word in self.word_tags_count:
            word_freq = sum(self.word_tags_count[word].values())
            # checking if the word has fewer than PSEUDO_THRESHOLD appearances in the training data
            if word_freq < Tagger.PSEUDO_THRESHOLD:
                pseudo_word = self.convert_to_pseudo(word)
                # pooling the low-frequency word's full tag counts under its pseudo word
                for tag in self.word_tags_count[word]:
                    self.pseudo_words_tag[pseudo_word][tag] += self.word_tags_count[word][tag]
            else:
                self.pseudo_words_tag[word] = self.word_tags_count[word]
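    # note: frequent words keep their own counts, while all low-frequency words that
    # share an orthographic class (e.g. "fourDigitNum") have their counts pooled
    # under that pseudo word.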
    def compute_most_likely_tag(self):
        """
        this function computes, for each word w, the most likely tag to be w's tag.
        the function uses the MLE principle.
        :return:
        """
        for word in self.word_tags_count:
            w_tags_count_dict = self.word_tags_count[word]
            # getting the tag with the most appearances as w's tag
            self.word_predicted_tag[word] = max(w_tags_count_dict, key=w_tags_count_dict.get)
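    # in MLE terms, the baseline above picks predicted_tag(w) = argmax_t count(w, t),
    # which also maximizes p(t | w), since p(t | w) = count(w, t) / count(w).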
    def error_rate(self, test_set, title, calculate_confusion_matrix=False, is_pseudo=False,
                   is_mle=False):
        """
        this function will calculate and print the known words error rate,
        the unknown words error rate, and the total error rate, for a given test set.
        :param test_set: represents a given test set which we calculate the error rate for.
        :param title: represents the title to print alongside the results
        :param calculate_confusion_matrix: indicator for collecting and printing the confusion matrix
        :param is_pseudo: indicator for converting words to their pseudo versions
        :param is_mle: indicator for the MLE baseline, which tags every unknown word as "NN"
        :return:
        """
known_words_counter, unknown_words_counter = 0, 0
known_words_errors, unknown_words_errors = 0, 0
        # iterating on each word, and its true_tag assignment, in the test set
for test_sentence in test_set:
for word, true_tag in test_sentence:
# checking if the word is a known word
if word in self.word_tags_count:
known_words_counter += 1
word = self.convert_to_pseudo(word) if is_pseudo else word
predicted_tag = self.word_predicted_tag[word]
# the prediction we make for a known word, is what we stored in word_predicted_tag dictionary
known_words_errors = Tagger.check_tag_prediction(predicted_tag=predicted_tag,
true_tag=true_tag,
error_counter=known_words_errors)
else:
# if we got here, the word is an unknown word
unknown_words_counter += 1
word = self.convert_to_pseudo(word) if is_pseudo else word
# as we are assuming in (b), the tag we predict for an unknown word is "NN"
predicted_tag = Tagger.UNKNOWN_WORD_TAG if is_mle else self.word_predicted_tag[word]
unknown_words_errors = Tagger.check_tag_prediction(predicted_tag=predicted_tag,
true_tag=true_tag,
error_counter=unknown_words_errors)
if calculate_confusion_matrix:
self.confusion_matrix[true_tag][predicted_tag] += 1
# printing the results
print("----- Results From " + title + " -----")
print("The error rate for known words is: " + str(known_words_errors / known_words_counter))
print("The error rate for unknown words is: " + str(unknown_words_errors / unknown_words_counter))
print("The total error rate is: " + str((known_words_errors + unknown_words_errors) /
(known_words_counter + unknown_words_counter)))
if calculate_confusion_matrix:
self.print_confusion_matrix()
def print_confusion_matrix(self):
"""
        this function will print the significant errors - for each true tag, we print the
        tag (different from the true tag) that was most often predicted in its place.
:return:
"""
print_task_format("Confusion Matrix")
print("printing significant errors")
        for true_tag in self.confusion_matrix:
            # we only investigate the prediction errors, i.e. the off-diagonal values of the
            # confusion matrix. for each true tag, the significant error is the wrong tag
            # with the maximal amount of predictions.
            errors = {predicted_tag: amount for predicted_tag, amount in self.confusion_matrix[true_tag].items()
                      if predicted_tag != true_tag}
            if errors:
                maximal_predicted_tag = max(errors, key=errors.get)
                print("The true tag was " + true_tag + " and we predicted " + maximal_predicted_tag + " " +
                      str(errors[maximal_predicted_tag]) + " times")
    def compute_emission_probabilities(self, smoothed_delta=0, is_pseudo=False):
        """
        this function calculates the emission probabilities of a bigram HMM tagger.
        :param smoothed_delta: represents the delta for add-delta smoothing (0 means no smoothing)
        :param is_pseudo: indicator for computing the probabilities over the pseudo-word counts
        :return:
        """
self.emission_probabilities.clear()
vocabulary_size = len(self.word_tags_count)
words_and_tags = self.pseudo_words_tag if is_pseudo else self.word_tags_count
for w in words_and_tags:
for w_tag in words_and_tags[w]:
word_tag_amount = words_and_tags[w][w_tag] + smoothed_delta
tag_amount = self.tag_count[w_tag] + (smoothed_delta * vocabulary_size)
self.emission_probabilities[(w, w_tag)] = word_tag_amount / tag_amount
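    # the estimate above is the (optionally smoothed) emission probability (sketch):
    #   e(w | t) = (count(w, t) + delta) / (count(t) + delta * |V|)
    # where |V| is the vocabulary size, and delta = 0 recovers the plain MLE estimate.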
def compute_transition_probabilities(self):
"""
this function calculates the transition probabilities of a bigram HMM tagger.
:return:
"""
self.transition_probabilities.clear()
for prev_tag in self.tag_bigram:
for tag in self.tag_bigram[prev_tag]:
count_tag_after = self.tag_bigram[prev_tag][tag]
count_prev_tag_in_corpus = self.tag_count[prev_tag]
self.transition_probabilities[(prev_tag, tag)] = count_tag_after / count_prev_tag_in_corpus
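    # likewise, the transition estimate above is the bigram MLE (sketch):
    #   q(t | t') = count(t', t) / count(t')
    # with START and STOP treated as ordinary states at the sentence boundaries.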
def handling_unknown_words_emission_probabilities(self, sentence, words_and_tags, is_smoothing,
smoothed_delta, is_pseudo):
"""
this function will handle with the unknown words - in case we use smoothing, the function will set the
emission probability dictionary according to add-1 laplace definiton,
otherwise, setting their emission probability with a "NN" tag to be 1,
and the probability for the rest of the tags (p(word|tag)) is 0
:param sentence: represents a given sentence
:param words_and_tags: represents the word and tags training data
:param is_smoothing: represents an indicator for using smoothing
:param smoothed_delta: represents the delta when using smoothing
:return:
"""
vocabulary_size = len(words_and_tags)
for word, word_tag in sentence:
# checking if the word is a known word in our model
if word not in words_and_tags:
# handling the case where we use pseudo words and smoothing
if is_pseudo and is_smoothing:
pseudo_word = self.convert_to_pseudo(word)
word_tag_amount = self.pseudo_words_tag[pseudo_word][word_tag] + smoothed_delta
tag_amount = self.tag_count[word_tag] + smoothed_delta * vocabulary_size
self.emission_probabilities[(pseudo_word, word_tag)] = word_tag_amount / tag_amount
# handling the case where we use smoothing, without pseudo words
elif is_smoothing and not is_pseudo:
self.emission_probabilities[(word, word_tag)] = smoothed_delta / \
(self.tag_count[word_tag] +
smoothed_delta * vocabulary_size)
# handling the case where we use pseudo words, without smoothing
elif is_pseudo and not is_smoothing:
pseudo_word = self.convert_to_pseudo(word)
word_tag_amount = self.pseudo_words_tag[pseudo_word][word_tag]
tag_amount = self.tag_count[word_tag]
self.emission_probabilities[(pseudo_word, word_tag)] = word_tag_amount / tag_amount
# handling the case where we don't use smoothing and don't use pseudo words
elif not is_smoothing and not is_pseudo:
self.emission_probabilities[(word, Tagger.UNKNOWN_WORD_TAG)] = 1
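    # note: for an unseen (word, tag) pair we have count(word, tag) = 0, so the smoothed
    # emission reduces to delta / (count(tag) + delta * |V|), which is exactly the
    # smoothing-without-pseudo branch above.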
    def viterbi_algorithm_per_sentence(self, sentence, is_smoothing=False, is_pseudo=False, smoothed_delta=1):
        """
        this function will run the viterbi algorithm on the given sentence
        :param sentence: represents a given sentence
        :param is_smoothing: indicator for using add-delta smoothing
        :param is_pseudo: indicator for using pseudo words
        :param smoothed_delta: represents the delta when using smoothing
        :return: the most probable assignment for the given sentence,
        in the format of [(word, most_probable_tag) for word in sentence]
        """
        dp_pi, back_pointers = {}, {}
        # creating a set of all the tags, except START_TAG and STOP_TAG
        state_tags = set(tag for tag in self.tag_count.keys() if tag != Tagger.START_TAG and tag != Tagger.STOP_TAG)
        self.handling_unknown_words_emission_probabilities(sentence, self.word_tags_count, is_smoothing,
                                                           smoothed_delta, is_pseudo)
        # initializing our dynamic programming data structure
        first_word = self.convert_to_pseudo(sentence[0][0]) if is_pseudo else sentence[0][0]
        for tag in state_tags:
            dp_pi[tag, 0] = self.transition_probabilities[(Tagger.START_TAG, tag)] * \
                            self.emission_probabilities[(first_word, tag)]
        for i in range(1, len(sentence)):
            cur_word = self.convert_to_pseudo(sentence[i][0]) if is_pseudo else sentence[i][0]
            for tag in state_tags:
                # the best previous tag maximizes pi(i - 1, k_tag) * q(tag | k_tag)
                best_prob, best_prev_tag = max((dp_pi[k_tag, i - 1] * self.transition_probabilities[k_tag, tag],
                                                k_tag) for k_tag in state_tags)
                dp_pi[tag, i] = best_prob * self.emission_probabilities[(cur_word, tag)]
                back_pointers[tag, i] = best_prev_tag
        # backtracking - choosing the best tag for the last word, then following the back pointers
        last = len(sentence) - 1
        cur_tag = max(state_tags, key=lambda tag: dp_pi[tag, last])
        most_probable_assignment = [(sentence[last][0], cur_tag)]
        for i in range(last, 0, -1):
            cur_tag = back_pointers[cur_tag, i]
            most_probable_assignment.append((sentence[i - 1][0], cur_tag))
        return most_probable_assignment[::-1]
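    # the recurrence implemented above is the standard bigram Viterbi (sketch):
    #   pi(0, t) = q(t | START) * e(w_0 | t)
    #   pi(i, t) = max over t' of pi(i - 1, t') * q(t | t') * e(w_i | t)
    # with bp(i, t) holding the maximizing t', so the best tag sequence is recovered
    # by following the back pointers from the best final tag.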
    def viterbi_algorithm(self, test_set, is_smoothed=False, is_pseudo=False, smoothed_delta=0):
        """
        this function will implement the viterbi algorithm.
        it will run the algorithm on each sentence in the given test set, and will map the words to their
        viterbi predicted tag.
        :param test_set: represents a given test set
        :param is_smoothed: indicator for using add-delta smoothing
        :param is_pseudo: indicator for using pseudo words
        :param smoothed_delta: represents the delta when using smoothing
        :return:
        """
        for sentence in test_set:
            # calculating the most probable assignment for the current sentence
            sentence_viterbi_assignment = self.viterbi_algorithm_per_sentence(sentence, is_smoothed, is_pseudo,
                                                                              smoothed_delta)
            # updating the word's most probable tag, according to the viterbi algorithm
            for word, tag in sentence_viterbi_assignment:
                # checking if we need to convert the current word to its pseudo version
                word = self.convert_to_pseudo(word) if is_pseudo else word
                self.word_predicted_tag[word] = tag
    def convert_to_pseudo(self, word):
        """
        this function will convert a given word to a pseudo word, according to its orthographic form
        :param word: represents a given word to be converted to a pseudo word
        :return: the pseudo word for low-frequency and unknown words, or the word itself otherwise
        """
if word in self.word_tags_count:
if sum(self.word_tags_count[word].values()) >= Tagger.PSEUDO_THRESHOLD:
return word
# handling word with digits
if len(word) == 2 and word.isdigit():
return "twoDigitNum"
if len(word) == 4 and word.isdigit():
return "fourDigitNum"
if Tagger.check_pseudo_contains_digit(word):
if Tagger.check_pseudo_contains_alpha(word):
return "containsDigitAndAlpha"
if "-" in word:
return "containsDigitAndDash"
if "/" in word:
return "containsDigitAndSlash"
if "," in word:
return "containsDigitAndComma"
if "." in word:
return "containsDigitAndPeriod"
return "otherNum"
# handling word with capital letters
if Tagger.check_all_capitals(word):
return "allCaps"
if Tagger.check_pseudo_first_capital(word):
return "initCap"
if "$" in word:
return "MONEY"
# handling prefix and suffix
if len(word) >= 2:
prefix_tag, suffix_tag = Tagger.pseudo_prefix(word), Tagger.pseudo_suffix(word)
if prefix_tag != "OTHER" and suffix_tag != "OTHER":
return prefix_tag + "-" + suffix_tag
result = suffix_tag if suffix_tag != "OTHER" else prefix_tag
return "lowerCase" if result == "OTHER" and word.islower() else result
return "OTHER"
@staticmethod
def check_tag_prediction(predicted_tag, true_tag, error_counter):
"""
        this function checks if the model classified the tag correctly.
        if it didn't, the function returns an incremented error count.
        :param predicted_tag: represents the model's predicted tag
        :param true_tag: represents the true tag
        :param error_counter: represents the current error count
        :return: the updated error count
"""
return error_counter + 1 if predicted_tag != true_tag else error_counter
@staticmethod
def check_pseudo_first_capital(word):
"""
        this function checks if the first letter of the given word is a capital letter
        :param word: represents a given word to be converted to a pseudo word
        :return: true if it is, false otherwise
"""
return word[0].isupper()
@staticmethod
def check_all_capitals(word):
"""
        this function checks if a given word contains only capital letters
        :param word: represents a given word to be converted to a pseudo word
        :return: true if it is, false otherwise
"""
return word.isupper()
@staticmethod
def check_pseudo_contains_digit(word):
"""
        this function checks if a given word contains a digit
        :param word: represents a given word to be converted to a pseudo word
:return: true if it is, false otherwise
"""
return any(char.isdigit() for char in word)
@staticmethod
def check_pseudo_contains_alpha(word):
"""
        this function checks if a given word contains an alphabetic character
        :param word: represents a given word to be converted to a pseudo word
:return: true if it is, false otherwise
"""
return any(char.isalpha() for char in word)
@staticmethod
def pseudo_prefix(word):
"""
this function returns a pseudo prefix or "OTHER" pseudo
:param word: represents a given word
:return: pseudo prefix or "OTHER" pseudo
"""
return Tagger.get_pseudo_tag_prefix_suffix(word, [Tagger.PREFIX_4, Tagger.PREFIX_3, Tagger.PREFIX_2])
@staticmethod
def pseudo_suffix(word):
"""
this function returns a pseudo suffix or "OTHER" pseudo
:param word: represents a given word
:return: pseudo suffix or "OTHER" pseudo
"""
return Tagger.get_pseudo_tag_prefix_suffix(word, [Tagger.SUFFIX_4, Tagger.SUFFIX_3, Tagger.SUFFIX_2], True)
@staticmethod
def get_pseudo_tag_prefix_suffix(word, pos_set, is_suffix=False):
"""
this function checks if the word prefix or suffix is in the common language prefixes and suffixes.
the function will prefer to capture longer prefix/ suffix length rather then shorter.
:param word: represents a given word
:param pos_set: represents the given common prefixes or suffixes sets
:param is_suffix: indicator for checking suffixes or prefixes
:return:
"""
for pos, word_length in zip(pos_set, [4, 3, 2]):
convert_to_prefix = lambda text: text.upper()
if len(word) >= word_length:
word_part = word[:word_length] if not is_suffix else word[-word_length:]
if word_part in pos:
return convert_to_prefix(word_part)
return "OTHER"
def print_task_format(task_title):
"""
this function prints a title format to the console, for convenience purposes.
:param task_title: represents the title
:return:
"""
print()
print("====================================")
print("|| " + task_title + " ||")
print("====================================")
def task_b(pos_tagger, test_set):
"""
this function will implement the relevant parts in order to complete task b
:param pos_tagger: represents our Tagger instance
:param test_set: represents a given test set
:return:
"""
print_task_format("Task (b)")
pos_tagger.compute_most_likely_tag()
pos_tagger.error_rate(test_set, "Task (b)ii - error rates using MLE", is_mle=True)
def task_c(pos_tagger, test_set):
"""
this function will implement the relevant parts in order to complete task c
:param pos_tagger: represents our Tagger instance
:param test_set: represents a given test set
:return:
"""
print_task_format("Task (c)")
pos_tagger.compute_emission_probabilities()
pos_tagger.compute_transition_probabilities()
    pos_tagger.viterbi_algorithm(test_set)
pos_tagger.error_rate(test_set, "Task (c)iii - error rates using Viterbi algorithm")
def task_d(pos_tagger, test_set):
"""
this function will implement the relevant parts in order to complete task d
:param pos_tagger: represents our Tagger instance
:param test_set: represents a given test set
:return:
"""
print_task_format("Task (d)")
pos_tagger.compute_emission_probabilities(smoothed_delta=1)
pos_tagger.compute_transition_probabilities()
    pos_tagger.viterbi_algorithm(test_set, is_smoothed=True, is_pseudo=False, smoothed_delta=1)
pos_tagger.error_rate(test_set, "Task (d)iii - error rates using Viterbi algorithm with add-1 smoothing")
def task_e(pos_tagger, test_set):
"""
this function will implement the relevant parts in order to complete task e
:param pos_tagger: represents our Tagger instance
:param test_set: represents a given test set
:return:
"""
print_task_format("Task (e)")
    # mapping the low-frequency words in the training data to pseudo words
    # (unknown words from the test set are converted to pseudo words on the fly)
pos_tagger.fit_pseudo_train()
# using pseudo words as well as mle
pos_tagger.compute_emission_probabilities(smoothed_delta=0, is_pseudo=True)
pos_tagger.compute_transition_probabilities()
pos_tagger.viterbi_algorithm(test_set, is_pseudo=True)
pos_tagger.error_rate(test_set, "Task (e)ii - error rates using Viterbi algorithm with pseudo words",
is_pseudo=True)
# using pseudo words as well as Add-1 smoothing
pos_tagger.compute_emission_probabilities(smoothed_delta=1, is_pseudo=True)
pos_tagger.viterbi_algorithm(test_set, is_smoothed=True, is_pseudo=True, smoothed_delta=1)
pos_tagger.error_rate(test_set, "Task (e)iii - error rates using Viterbi algorithm with"
" pseudo words and add-1 smoothing",
calculate_confusion_matrix=True, is_pseudo=True)
if __name__ == '__main__':
    # getting the nltk brown corpus, and cleaning it
brown_corpus = clean_corpus(brown.tagged_sents(categories="news"))
# splitting the corpus to train and test sets
training_corpus, test_corpus = train_test_split(brown_corpus, test_size=1.0 / 10, shuffle=False)
tagger = Tagger()
# fitting the tagger with the training data
tagger.fit(training_corpus)
# implementing the different tasks
task_b(tagger, test_corpus)
task_c(tagger, test_corpus)
task_d(tagger, test_corpus)
task_e(tagger, test_corpus)