Untitled
plain_text
12 days ago
17 kB
3
Indexable
Never
#!/usr/bin/env python # -*- coding: utf-8 -*- ''' Implements a function, `duration_search`, which can identify various kinds of date span expressions and fine/fee/restitution amounts in a string ''' from __future__ import annotations import re import string import pickle import pkgutil import numpy as np model_data = pkgutil.get_data(__name__, "data/duration_svm_model_20230627.pkl") VECTORIZER, SPAN_CLASSIFIER = pickle.loads(model_data) MULTIPLIERS = dict([('YEARS', 365.25), ('MONTHS', 30.4375), ('WEEKS', 7.0), ('DAYS', 1.0), ('HOURS', 0.04167)]) RGX = re.compile(r""" (?:(?P<YEARS>\d{1,3}\s?Y[EARS]*)?(?:[\s,\&\+]|and)*)? # YEARS (?:(?P<MONTHS>\d{1,3}\s?M[ONTHS]*)?(?:[\s,\&\+]|and)*)? # MONTHS without "-" in front (?:(?P<WEEKS>\d{1,3}\s?W[EKS]+)?(?:[\s,\&\+]|and)*)? # WEEKS (?:(?P<DAYS>\d{1,3}\s?D[AYS]*)?(?:[\s,\&\+]|and)*)? # DAYS (?:(?P<HOURS>\d{1,3}\s?H[OURS]*)?(?:[\s,\&\+]|and)*)? #HOUR |(?<![\$\d]\s)(?<![\$\d])(?P<MONTHS2>11)(?:[\s\-\/]+)(?P<DAYS2>[123]?\d)\s(?!YE|MO|DA) #ALT MO/DY """, re.VERBOSE+re.IGNORECASE) # Define the regular expression pattern for monetary amounts MONEY_RGX = re.compile( r"\$?\s*\d{1,7}\s*" r"(,\s*\d{3})*" r"(\.\d{2})?", re.IGNORECASE ) # Define the labels and their associated keywords # temporary label assisgnment until classification model can be applied LABELS = { "Fine": ["FINE", "FINES", "FI", "FN", "FND", "FINED"], "Restitution": ["RESTIT", "RESTITUTION", "REST", "RESTI"], "Fee": [ "CC", "CST", "COURT COST", "FEE", "FEES", "COST", "COSTS", "JAIL FEE" ], "Other": [ "TO BE PAID", "UN", "UNDER", "OVER", "PAT", "DRUG FND", "DRUG FUND", "SEX OFFENDER TAX" ] } def cleanup_phrase(text): """Remove punctuation and spaces from edges of string""" return text.strip(r'\/\s&,') def convert_to_digits(text): """Takes string and finds/converts alpha integers into digits""" words_to_numbers = { 'ninety': '90', 'eighty': '80', 'seventy': '70', 'sixty': '60', 'fifty': '50', 'forty': '40', 'thirty': '30', 'twenty': '20', 'nineteen': '19', 'eighteen': '18', 'seventeen': '17', 'sixteen': '16', 'fifteen': '15', 'fourteen': '14', 'thirteen': '13', 'twelve': '12', 'eleven': '11', 'ten': '10', 'one': '1', 'two': '2', 'three': '3', 'four': '4', 'five': '5', 'six': '6', 'seven': '7', 'eight': '8', 'nine': '9', 'zero': '0' } pattern = re.compile( r'\b(' + '|'.join(words_to_numbers.keys()) + r')\b', re.IGNORECASE) return re.sub(pattern, lambda x: words_to_numbers[x.group().lower()], text) def extract_range(value, pre_text, post_text, stripped_unit): """Extracts and returns range values and units from a given string. If no range is found in the string, or if the first value is greater than the second, it returns None.""" stripped_value = re.sub(r'[^\d]', '', value) stripped_pre_text_value_match = re.search(r"(\d+\s*/\s*\d+\s*(?:DAYS|DYS)?)\s*([a-zA-Z]*)?\s*(TO|-)|\b(\d{1,3}\s*[a-zA-Z]+)\s*([a-zA-Z]*)?\s*(TO|-)\s*", pre_text) if stripped_pre_text_value_match: unit = {stripped_unit: int(stripped_value)} days = calculate_days(unit) max_days = calculate_days({stripped_unit: int(stripped_value)}) if stripped_value else None if stripped_pre_text_value_match.group(1): result = build_result( stripped_pre_text_value_match.group(1) + " TO " + stripped_value + " " + stripped_unit, "", post_text, days, unit, 364, stripped_value, "DAYS", stripped_unit, 364, max_days, ) return result if stripped_pre_text_value_match.group: min_value_match = re.search(r'(\d+)\s*([a-zA-Z]*\s*)?(TO|-)', pre_text) if min_value_match: min_value = min_value_match.group(1) min_unit_match = re.search(r'([a-zA-Z]+)\s*(TO|-)', pre_text) min_unit = min_unit_match.group(1).upper().replace("TO", "").replace("-", "") if min_unit_match else None stripped_pre_text_value = stripped_pre_text_value_match.group().strip() if stripped_pre_text_value_match else "" max_unit = stripped_unit unit = {stripped_unit: int(stripped_value)} days = calculate_days(unit) min_days = 0 max_days = calculate_days({max_unit: int(stripped_value)}) if stripped_value else None days = max_days if max_days is not None else days consolidated_text = f"{stripped_pre_text_value} {stripped_value} {stripped_unit} " rgx_match = RGX.search(stripped_pre_text_value) if rgx_match: min_unit = next((k for k, v in rgx_match.groupdict().items() if v is not None), None) if min_unit and rgx_match[min_unit]: max_value = stripped_value min_days = calculate_days({min_unit: int(min_value)}) if min_value else None max_unit = stripped_unit matched_string = f"{max_value} {max_unit}" post_text = post_text.replace(matched_string, "", 1).strip() min_value_int = int(min_value) if min_value and min_value.isdigit() else None max_value_int = int(max_value) if max_value and max_value.isdigit() else None stripped_pre_text = stripped_pre_text_value.replace(stripped_pre_text_value, "") result = build_result( consolidated_text, stripped_pre_text, post_text, days, unit, min_value_int, max_value_int, min_unit, max_unit, min_days, max_days ) return result else: stripped_pre_text = re.split(r'\s*\b\d{1,3}\s*-\s*', pre_text, maxsplit=1)[0] + " " stripped_pre_text_value = re.search(r'\b\d{1,3}\s*(TO|-)\s*', pre_text).group().strip() min_value = re.sub(r'^[^\d]*', '', pre_text).replace("-", "").replace("TO", "").strip() unit = {stripped_unit: int(stripped_value)} days = calculate_days(unit) min_unit = stripped_unit min_days = calculate_days({min_unit: int(min_value)}) result = build_result( stripped_pre_text_value + " " + value.strip(), stripped_pre_text, post_text, days, unit, min_value, stripped_value, stripped_unit, stripped_unit, min_days, days, ) return result def calculate_days(units): """Takes a dictionary with time units as keys and amounts as values and returns the total number of days.""" if not units: return None days = 0 for key, val in units.items(): key = key.upper() if key not in MULTIPLIERS: continue days = days + (MULTIPLIERS[key] * float(val)) return int(round(days)) def calculate_min_max_days(units, values): """Calculates and returns the number of days corresponding to the minimum and maximum values of the given units. """ return ( calculate_days({units[0]: values[0]}), calculate_days({units[1]: values[1]}) ) def build_result(value, pre_text, post_text, days, units, min_value, max_value, min_unit, max_unit, min_days, max_days): """Builds and returns a dictionary containing the matched phrase and various details about it such as pre_text, post_text, total number of days, units, minimum and maximum confinement values and days. """ result = {} is_min_greater_than_max = False # Swap min and max if min_days is greater than max_days if min_days is not None and max_days is not None and int(min_days) > int(max_days): min_days, max_days = max_days, min_days min_value, max_value = max_value, min_value min_unit, max_unit = max_unit, min_unit is_min_greater_than_max = True if isinstance(value, str): result['text'] = cleanup_phrase(value) else: result['text'] = str(value) result['pre_text'] = pre_text result['post_text'] = post_text result['days'] = days if min_value is None: result['units'] = units if min_value is not None: result['range'] = { 'min': { 'days': min_days, 'units': {min_unit: int(min_value)} }, 'max': { 'days': max_days, 'units': {max_unit: int(max_value)} } } prediction = get_duration_classification(pre_text, post_text) if is_min_greater_than_max: result['classification'] = "Suspended" result['confidence'] = 1 else: result['classification'] = prediction['classification'] result['confidence'] = prediction['confidence'] return result def get_duration_classification(pre_text, post_text): """Returns details of predicted classification label for duration""" # pre_text = process_surrounding_text(text.upper()[max(0,match.span()[0]-12):match.span()[0]], 'pre') # post_text = process_surrounding_text(text.upper()[match.span()[1]:min(len(text.upper()),match.span()[1]+12)],'post') # vectorize text and predict label text_vectorized = VECTORIZER.transform([pre_text + ' X ' + post_text]) classification = SPAN_CLASSIFIER.predict(text_vectorized)[0] # predict_prob = SPAN_CLASSIFIER.predict_proba(text_vectorized) # decision function, estimate of delta between top two labels dec_func_results = SPAN_CLASSIFIER.decision_function(text_vectorized) dec_func_margin = sorted(dec_func_results[0], reverse=True)[0] - sorted(dec_func_results[0], reverse=True)[1] return { 'classification': classification if np.max(dec_func_margin) > 0.0001 else 'Unknown', 'confidence': 1 if dec_func_margin > 0.0002 else None, 'pre_text': pre_text, 'post_text': post_text } def process_fractional_sentence_match(match, result, results): """Takes a match object, a result dictionary, and a list of results. Modifies the result based on the match, appends the modified result and the original result to the results list with appropriate labels and returns the updated results list.""" symbol = match.group(1).strip() percentage = int(match.group(2)) / 100.0 modified_result = result.copy() modified_result['days'] = result['days'] * percentage modified_result['text'] = ( f"{result['text']}" f"{symbol} " f"{match.group(2)}%" ) modified_result['post_text'] = '' modified_result['units'] = { unit: value * percentage for unit, value in result['units'].items() } modified_result['classification'] = "Confinement" modified_result['confidence'] = 1 results.append(modified_result) result['classification'] = "Suspended" result['confidence'] = 1 results.append(result) return results def process_surrounding_text(text, position='pre', max_length=12): """returns a string of adjacent text before or after a duration""" split_text = re.split(r'[^\w\s]', text) if position == 'pre': text = split_text[-1][-max_length:].strip() if len(split_text) > 0 else text[-max_length:] else: text = split_text[0][0:max_length].strip() if len(split_text) > 0 else text[0:max_length].strip() # remove punctuation and extra spaces return cleanup_phrase( re.sub(r'\s+', ' ', text.translate(str.maketrans('', '', string.punctuation))) ) def get_fine_fees_restitution(text): """Takes a string and returns list of dictionaries describing monetary amounts and their labels""" original_matches = MONEY_RGX.finditer(text.upper()) original_match_list = [match.group() for match in original_matches] if not original_match_list: return [] original_amount = original_match_list[0] text = re.sub(r'\s*,\s*', ',', text) text = text.replace("-", "").replace("=", "") matches = MONEY_RGX.finditer(text.upper()) results = [] for i in matches: # strip leading/trailing spaces pre_text = text.upper()[max(0, i.span()[0]-12):i.span()[0]].strip() post_text = text.upper()[i.span()[1]:min( len(text.upper()), i.span()[1]+12)].strip() # strip leading/trailing spaces # Determine the label based on the pre_text or post_text classification = None for lbl, keywords in LABELS.items(): for keyword in keywords: # Check if keyword is at the end of pre_text or # end of post_text if pre_text.endswith(keyword) or post_text.startswith(keyword): classification = lbl break if classification is not None: break # Assign "Unknown" label if no labels were found if classification is None: classification = "Unknown" result = { 'amount': i.group().strip(), 'classification': classification, 'text': original_amount, # Only keep the last 20 characters for display 'pre_text': pre_text[-20:], # Only keep the first 20 characters for display 'post_text': post_text[:20], } results.append(result) return results def get_durations(text): """Takes a string and returns a list of dictionaries describing durations""" results = [] phrases = [] upper_text = text.upper() matches = RGX.finditer(upper_text) for match in matches: cleaned_phrase = cleanup_phrase(match.group()) if cleaned_phrase.strip() and cleaned_phrase not in phrases: phrases.append(cleaned_phrase) units = {} pre_text = upper_text[max(0, match.span()[0] - 16): match.span()[0]].strip() post_text = upper_text[match.span()[1]: min(len(upper_text), match.span()[1] + 12)].strip() min_value = max_value = None min_unit = max_unit = None min_days = max_days = None for key, value in match.groupdict().items(): if value is not None: int_match = re.search(r'\d+', value) if int_match: group_value = int_match.group(0) units[key] = int(group_value) else: group_value = 0 int_value = int(group_value) stripped_key = re.sub(r'[\d]', '', key) units[stripped_key] = int_value range_match = re.search(r'.+?\s*(-|TO)', pre_text) post_range_match = re.search(r'(-|TO)\s*\d+', post_text) if post_range_match: continue if range_match: processed_range = extract_range(match.group(), pre_text, post_text, stripped_key) text = text.replace(match.group(), "") results.append(processed_range) continue days = calculate_days(units) result = build_result( match.group(), pre_text, post_text, days, units, min_value, max_value, min_unit, max_unit, min_days, max_days ) pattern = result["text"] + r'\s*(AT|@)\s*(\d+)\s*%' match = re.search(pattern, upper_text) if match: results = process_fractional_sentence_match(match, result, results) else: results.append(result) return results def duration_search(text: str = '') -> str: """Public function returns a list of durations from a string""" final_results = [] text = convert_to_digits(text) duration_results = get_durations(text) fines_restitution_results = get_fine_fees_restitution(text) final_results = duration_results + fines_restitution_results return final_results