Untitled

mail@pastecode.io avatar
unknown
plain_text
8 months ago
17 kB
3
Indexable
Never
#!/usr/bin/env python
# -*- coding: utf-8 -*-

'''
Implements a function, `duration_search`, which can identify various
kinds of date span expressions and fine/fee/restitution amounts in a string
'''

from __future__ import annotations
import re
import string
import pickle
import pkgutil
import numpy as np


# Load the pickled pair unpacked below as (VECTORIZER, SPAN_CLASSIFIER); both
# are used by get_duration_classification to label the text around a duration.
# NOTE(review): pickle.loads can execute arbitrary code from its payload, so
# this is only safe while the .pkl file is a trusted package resource.
# NOTE(review): pkgutil.get_data returns None when the package loader cannot
# supply the resource; pickle.loads would then fail at import time.
model_data = pkgutil.get_data(__name__, "data/duration_svm_model_20230627.pkl")
VECTORIZER, SPAN_CLASSIFIER = pickle.loads(model_data)


# Number of days represented by one of each supported time unit; used by
# calculate_days to convert unit amounts into a day total.
MULTIPLIERS = {
    'YEARS': 365.25,
    'MONTHS': 30.4375,
    'WEEKS': 7.0,
    'DAYS': 1.0,
    'HOURS': 0.04167,
}


# Duration phrase pattern (verbose, case-insensitive).
#
# First alternative: up to five "<1-3 digits><unit>" pieces in the fixed order
# YEARS, MONTHS, WEEKS, DAYS, HOURS, each optionally followed by separators
# (whitespace, ",", "&", "+" or the word "and").  A unit is recognised by its
# first letter plus a run of letters from a character class, so abbreviations
# such as "YRS" or "MOS" match as well as the full word.
# Second alternative ("ALT MO/DY"): the literal "11", a run of spaces, "-" or
# "/", then a one- or two-digit day count (groups MONTHS2 / DAYS2).
#
# NOTE(review): every piece of the first alternative is optional, so the
# pattern can match an empty string or separators only (e.g. the "AND" inside
# "HANDLING"); callers have to discard matches whose groups are all None.
RGX = re.compile(r"""
     (?:(?P<YEARS>\d{1,3}\s?Y[EARS]*)?(?:[\s,\&\+]|and)*)?  # YEARS
     (?:(?P<MONTHS>\d{1,3}\s?M[ONTHS]*)?(?:[\s,\&\+]|and)*)?  # MONTHS without "-" in front
     (?:(?P<WEEKS>\d{1,3}\s?W[EKS]+)?(?:[\s,\&\+]|and)*)?    # WEEKS
     (?:(?P<DAYS>\d{1,3}\s?D[AYS]*)?(?:[\s,\&\+]|and)*)?    # DAYS
     (?:(?P<HOURS>\d{1,3}\s?H[OURS]*)?(?:[\s,\&\+]|and)*)?  #HOUR
     |(?<![\$\d]\s)(?<![\$\d])(?P<MONTHS2>11)(?:[\s\-\/]+)(?P<DAYS2>[123]?\d)\s(?!YE|MO|DA)  #ALT MO/DY
    """, re.VERBOSE+re.IGNORECASE)

# Pattern for monetary amounts: an optional "$", one to seven leading digits,
# any number of ",ddd" thousands groups and an optional two-digit decimal
# part.  Whitespace is tolerated after the "$", after the leading digits and
# after each comma.
MONEY_RGX = re.compile(
    r"\$?\s*\d{1,7}\s*(,\s*\d{3})*(\.\d{2})?",
    re.IGNORECASE,
)

# Define the labels and their associated keywords.
# Temporary label assignment until a classification model can be applied.
# Keywords are compared against upper-cased text directly before or after a
# monetary amount (see get_fine_fees_restitution).  Label order matters: that
# function walks this dict in insertion order when deciding which label wins.
LABELS = {
    "Fine": ["FINE", "FINES", "FI", "FN", "FND", "FINED"],
    "Restitution": ["RESTIT", "RESTITUTION", "REST", "RESTI"],
    "Fee": [
        "CC", "CST", "COURT COST", "FEE", "FEES", "COST", "COSTS", "JAIL FEE"
    ],
    "Other": [
        "TO BE PAID", "UN", "UNDER", "OVER", "PAT", "DRUG FND",
        "DRUG FUND", "SEX OFFENDER TAX"
    ]
}


def cleanup_phrase(text):
    """Remove punctuation and spaces from edges of string.

    Strips whitespace together with backslash, slash, ampersand and comma
    from both ends of ``text``; characters inside the string are untouched.
    """
    # str.strip() takes a literal set of characters, not a regex.  The former
    # argument r'\/\s&,' therefore stripped the letter "s" and left
    # whitespace in place.
    return text.strip(string.whitespace + '\\/&,')


# Number words understood by convert_to_digits, grouped by how they combine.
_TENS_WORDS = {
    'twenty': 20, 'thirty': 30, 'forty': 40, 'fifty': 50,
    'sixty': 60, 'seventy': 70, 'eighty': 80, 'ninety': 90,
}
_UNIT_WORDS = {
    'one': 1, 'two': 2, 'three': 3, 'four': 4, 'five': 5,
    'six': 6, 'seven': 7, 'eight': 8, 'nine': 9,
}
_STANDALONE_WORDS = {
    'zero': 0, 'ten': 10, 'eleven': 11, 'twelve': 12, 'thirteen': 13,
    'fourteen': 14, 'fifteen': 15, 'sixteen': 16, 'seventeen': 17,
    'eighteen': 18, 'nineteen': 19,
    **_UNIT_WORDS,
}

# Compiled once at import: either a tens word optionally followed by a unit
# word ("twenty", "twenty one", "twenty-one") or any other single number word.
_NUMBER_WORD_RGX = re.compile(
    r'\b(?:'
    r'(?P<tens>' + '|'.join(_TENS_WORDS) + r')'
    r'(?:(?:-|\s+)(?P<unit>' + '|'.join(_UNIT_WORDS) + r'))?'
    r'|(?P<single>' + '|'.join(_STANDALONE_WORDS) + r')'
    r')\b',
    re.IGNORECASE,
)


def _number_word_to_digits(match):
    """Return the digit string for one _NUMBER_WORD_RGX match."""
    single = match.group('single')
    if single is not None:
        return str(_STANDALONE_WORDS[single.lower()])
    total = _TENS_WORDS[match.group('tens').lower()]
    unit = match.group('unit')
    if unit is not None:
        total += _UNIT_WORDS[unit.lower()]
    return str(total)


def convert_to_digits(text):
    """Takes string and finds/converts alpha integers into digits.

    Whole-word number names from "zero" to "ninety" are replaced
    case-insensitively.  A tens word directly followed by a unit word,
    separated by whitespace or a single hyphen, is combined into one number
    ("twenty one" / "Forty-Five" -> "21" / "45") rather than being emitted
    as two separate numbers.  Words that merely contain a number name
    ("attend", "someone") are left alone.
    """
    return _NUMBER_WORD_RGX.sub(_number_word_to_digits, text)


def extract_range(value, pre_text, post_text, stripped_unit):
    """Build a range result for a duration whose lower bound precedes it.

    ``value`` is the matched upper-bound phrase (e.g. ``"10 YEARS"``),
    ``pre_text`` / ``post_text`` are the text around it and ``stripped_unit``
    is the unit key of ``value`` (e.g. ``"YEARS"``).  Three shapes of lower
    bound are recognised in ``pre_text``:

    * ``"11/29 TO"`` -- slash form, always reported as 364 DAYS;
    * ``"5 YEARS TO"`` -- a number carrying its own unit;
    * ``"5 -"`` / ``"5 TO"`` at the end of ``pre_text`` -- a bare number
      that shares the unit of ``value``.

    Returns the dictionary produced by ``build_result`` (which swaps the
    bounds when the lower one is larger), or ``None`` when ``value`` holds no
    digits or no usable lower bound is found.

    NOTE(review): all digits of ``value`` are concatenated, so a multi-unit
    phrase such as "5 YEARS 6 MONTHS" is read as 56 of the last unit.
    """
    max_value = re.sub(r'[^\d]', '', value)
    if not max_value:
        return None
    max_unit = stripped_unit
    unit = {max_unit: int(max_value)}
    max_days = calculate_days(unit)

    lower_match = re.search(
        r"(\d+\s*/\s*\d+\s*(?:DAYS|DYS)?)\s*([a-zA-Z]*)?\s*(TO|-)"
        r"|\b(\d{1,3}\s*[a-zA-Z]+)\s*([a-zA-Z]*)?\s*(TO|-)\s*",
        pre_text,
    )

    if lower_match:
        slash_form = lower_match.group(1)
        if slash_form:
            # The slash form is treated as 11 months 29 days, i.e. 364 days,
            # regardless of the digits actually written.
            return build_result(
                slash_form + " TO " + max_value + " " + max_unit,
                "",
                post_text,
                max_days,
                unit,
                364,
                max_value,
                "DAYS",
                max_unit,
                364,
                max_days,
            )

        # Lower bound with its own unit: identify that unit with RGX.
        lower_text = lower_match.group().strip()
        unit_match = RGX.search(lower_text)
        min_key = None
        if unit_match:
            min_key = next(
                (k for k, v in unit_match.groupdict().items() if v is not None),
                None,
            )
        if min_key is None:
            # A number followed by a word that is not a time unit
            # (e.g. "2 SENTENCED TO") is not a range.
            return None
        min_value = int(re.search(r'\d+', unit_match[min_key]).group())
        min_unit = re.sub(r'\d', '', min_key)
        min_days = calculate_days({min_unit: min_value})

        # Drop an echo of the upper bound from the trailing context.
        post_text = post_text.replace(f"{max_value} {max_unit}", "", 1).strip()
        return build_result(
            f"{lower_text} {max_value} {max_unit} ",
            "",  # this branch has always passed empty leading context
            post_text,
            max_days,
            unit,
            min_value,
            int(max_value),
            min_unit,
            max_unit,
            min_days,
            max_days,
        )

    # Bare lower bound ("5 -" / "5 TO") immediately before the value.
    bare_match = re.search(r'\b(\d{1,3})\s*(?:TO|-)\s*$', pre_text)
    if bare_match is None:
        return None
    min_value = int(bare_match.group(1))
    min_days = calculate_days({max_unit: min_value})
    # Leading context is whatever precedes the lower bound, for both
    # the "-" and the "TO" spelling.
    leading_text = pre_text[:bare_match.start()].rstrip() + " "
    return build_result(
        bare_match.group().strip() + " " + value.strip(),
        leading_text,
        post_text,
        max_days,
        unit,
        min_value,
        int(max_value),
        max_unit,
        max_unit,
        min_days,
        max_days,
    )


def calculate_days(units):
    """Convert a mapping of time unit -> amount into a whole number of days.

    Unit names are matched case-insensitively against ``MULTIPLIERS``; names
    that are not listed there are ignored.  Returns ``None`` for an empty or
    missing mapping, otherwise the rounded total as an ``int``.
    """
    if not units:
        return None
    total = sum(
        MULTIPLIERS[name.upper()] * float(amount)
        for name, amount in units.items()
        if name.upper() in MULTIPLIERS
    )
    return int(round(total))


def calculate_min_max_days(units, values):
    """Return ``(min_days, max_days)`` for a two-element range.

    ``units[0]`` / ``values[0]`` describe the lower bound and ``units[1]`` /
    ``values[1]`` the upper bound; each pair is converted with
    ``calculate_days``.
    """
    lower_days = calculate_days({units[0]: values[0]})
    upper_days = calculate_days({units[1]: values[1]})
    return lower_days, upper_days


def build_result(value, pre_text, post_text, days, units,
                 min_value, max_value, min_unit, max_unit, min_days, max_days):
    """Assemble the result dictionary for one matched duration.

    The dictionary always holds ``text`` (the cleaned phrase), ``pre_text``,
    ``post_text``, ``days``, ``classification`` and ``confidence``.  When
    ``min_value`` is ``None`` it also holds ``units``; otherwise it holds a
    ``range`` entry with ``min`` / ``max`` days and units.

    If both bounds are known and the lower one is larger, the bounds are
    swapped and the result is labelled "Suspended" with confidence 1;
    otherwise the label comes from ``get_duration_classification``.
    """
    bounds_inverted = (
        min_days is not None
        and max_days is not None
        and int(min_days) > int(max_days)
    )
    if bounds_inverted:
        # Present the smaller bound as the minimum.
        min_days, max_days = max_days, min_days
        min_value, max_value = max_value, min_value
        min_unit, max_unit = max_unit, min_unit

    phrase = cleanup_phrase(value) if isinstance(value, str) else str(value)
    result = {
        'text': phrase,
        'pre_text': pre_text,
        'post_text': post_text,
        'days': days,
    }

    if min_value is None:
        result['units'] = units
    else:
        result['range'] = {
            'min': {
                'days': min_days,
                'units': {min_unit: int(min_value)}
            },
            'max': {
                'days': max_days,
                'units': {max_unit: int(max_value)}
            }
        }

    prediction = get_duration_classification(pre_text, post_text)
    if bounds_inverted:
        label, confidence = "Suspended", 1
    else:
        label = prediction['classification']
        confidence = prediction['confidence']
    result['classification'] = label
    result['confidence'] = confidence

    return result


def get_duration_classification(pre_text, post_text):
    """Predict a classification label for a duration from its context.

    The text before and after the duration is joined around an ``X``
    placeholder, vectorized with ``VECTORIZER`` and passed to
    ``SPAN_CLASSIFIER``.  The gap between the two highest decision-function
    scores decides how much the prediction is trusted: the label falls back
    to 'Unknown' at a margin of 0.0001 or less, and ``confidence`` is 1 only
    above 0.0002 (``None`` otherwise).

    Assumes the classifier's decision function yields at least two scores
    per sample -- TODO confirm against the pickled model.
    """
    features = VECTORIZER.transform([pre_text + ' X ' + post_text])
    predicted_label = SPAN_CLASSIFIER.predict(features)[0]

    # Margin between the best and second-best class scores.
    ranked_scores = sorted(
        SPAN_CLASSIFIER.decision_function(features)[0], reverse=True)
    margin = ranked_scores[0] - ranked_scores[1]

    label = predicted_label if np.max(margin) > 0.0001 else 'Unknown'
    confidence = 1 if margin > 0.0002 else None
    return {
        'classification': label,
        'confidence': confidence,
        'pre_text': pre_text,
        'post_text': post_text
    }


def process_fractional_sentence_match(match, result, results):
    """Split a duration served "AT n%" into a served and a suspended entry.

    ``match`` must expose the symbol ("AT" or "@") as group 1 and the
    percentage digits as group 2.  A copy of ``result`` scaled by that
    percentage is appended to ``results`` as "Confinement"; the original
    ``result`` is then relabelled "Suspended" in place and appended as well.
    Returns ``results``.
    """
    symbol = match.group(1).strip()
    percent_text = match.group(2)
    percentage = int(percent_text) / 100.0

    modified_result = result.copy()
    base_days = result['days']
    # A duration without a day total cannot be scaled; keep it as None.
    modified_result['days'] = (
        base_days * percentage if base_days is not None else None
    )
    # Join with single spaces whether or not the phrase kept a trailing
    # space, so "5 YEARS" + "@" reads "5 YEARS @ 50%", not "5 YEARS@ 50%".
    modified_result['text'] = (
        f"{result['text'].rstrip()} {symbol} {percent_text}%"
    )
    modified_result['post_text'] = ''
    modified_result['units'] = {
        unit: value * percentage
        for unit, value in (result.get('units') or {}).items()
    }
    modified_result['classification'] = "Confinement"
    modified_result['confidence'] = 1
    results.append(modified_result)

    result['classification'] = "Suspended"
    result['confidence'] = 1
    results.append(result)
    return results


def process_surrounding_text(text, position='pre', max_length=12):
    """Return the cleaned text adjacent to a duration.

    ``text`` is cut at every character that is neither a word character nor
    whitespace.  For ``position='pre'`` the last ``max_length`` characters of
    the final piece are kept; for any other value the first ``max_length``
    characters of the first piece.  The kept text is trimmed, stripped of
    ``string.punctuation``, has whitespace runs collapsed to one space and is
    finally passed through ``cleanup_phrase``.
    """
    # re.split always returns at least one piece, so indexing is safe.
    pieces = re.split(r'[^\w\s]', text)
    if position == 'pre':
        window = pieces[-1][-max_length:]
    else:
        window = pieces[0][0:max_length]
    window = window.strip()

    # remove punctuation and extra spaces
    punctuation_free = window.translate(
        str.maketrans('', '', string.punctuation))
    collapsed = re.sub(r'\s+', ' ', punctuation_free)
    return cleanup_phrase(collapsed)


def _label_from_keywords(context, at_end):
    """Return the label owning the longest LABELS keyword that touches an
    amount: a suffix of ``context`` when ``at_end`` is true, else a prefix.
    Returns ``None`` when no keyword matches."""
    best_label = None
    best_length = 0
    for label, keywords in LABELS.items():
        for keyword in keywords:
            if at_end:
                touches = context.endswith(keyword)
            else:
                touches = context.startswith(keyword)
            # Longest keyword wins so that "DRUG FND" is not claimed by "FND".
            if touches and len(keyword) > best_length:
                best_label, best_length = label, len(keyword)
    return best_label


def get_fine_fees_restitution(text):
    """Takes a string and returns list of dictionaries describing
    monetary amounts and their labels.

    Amounts are located with ``MONEY_RGX`` in an upper-cased copy of ``text``
    from which "-" and "=" are removed and spaces around commas collapsed.
    Each dictionary holds:

    * ``amount`` -- the matched amount with surrounding spaces removed;
    * ``classification`` -- the ``LABELS`` key whose keyword ends the text
      before the amount or starts the text after it (earliest label in
      ``LABELS`` order when both sides match), or "Unknown";
    * ``text`` -- the amount as written in the un-normalized input when the
      matches line up one-to-one, otherwise the normalized match;
    * ``pre_text`` / ``post_text`` -- up to 12 characters of context on
      either side, stripped.

    Returns an empty list when ``text`` contains no amount.
    """
    raw_amounts = [m.group() for m in MONEY_RGX.finditer(text.upper())]
    if not raw_amounts:
        return []

    normalized = re.sub(r'\s*,\s*', ',', text)
    normalized = normalized.replace("-", "").replace("=", "").upper()
    matches = list(MONEY_RGX.finditer(normalized))
    # Normalization can merge or split amounts (e.g. "5-10" becomes "510");
    # only pair results with the raw matches when the counts still agree.
    texts_align = len(matches) == len(raw_amounts)

    results = []
    for index, match in enumerate(matches):
        start, end = match.span()
        pre_text = normalized[max(0, start - 12):start].strip()
        post_text = normalized[end:end + 12].strip()

        # Keywords are checked against the full text on each side rather than
        # the 12-character display window, otherwise keywords longer than the
        # window ("SEX OFFENDER TAX") could never match.
        found = {
            _label_from_keywords(normalized[:start].rstrip(), True),
            _label_from_keywords(normalized[end:].lstrip(), False),
        }
        classification = next(
            (label for label in LABELS if label in found), "Unknown")

        results.append({
            'amount': match.group().strip(),
            'classification': classification,
            'text': raw_amounts[index] if texts_align else match.group(),
            'pre_text': pre_text,
            'post_text': post_text,
        })

    return results


def get_durations(text):
    """Takes a string and returns a list of dictionaries describing durations.

    Every ``RGX`` match in the upper-cased text that carries at least one
    unit yields one result built by ``build_result``, except that:

    * a phrase already seen earlier in the text is skipped;
    * a match directly followed by "-" / "TO" and a number is skipped, since
      it is the lower bound of a range reported with the upper bound;
    * a match preceded by a number and "-" / "TO" is handed to
      ``extract_range``; if that returns ``None`` the match is reported as a
      plain duration;
    * a match followed by "AT n%" / "@ n%" is split into two entries by
      ``process_fractional_sentence_match``.
    """
    results = []
    phrases = set()
    upper_text = text.upper()

    for match in RGX.finditer(upper_text):
        cleaned_phrase = cleanup_phrase(match.group())
        if not cleaned_phrase.strip() or cleaned_phrase in phrases:
            continue
        phrases.add(cleaned_phrase)

        # Collect amounts under the base unit name (MONTHS2 -> MONTHS).
        units = {}
        last_unit = None
        for key, value in match.groupdict().items():
            if value is None:
                continue
            digits = re.search(r'\d+', value)
            if digits is None:
                continue
            last_unit = re.sub(r'\d', '', key)
            units[last_unit] = int(digits.group())
        if not units:
            # RGX also matches bare separators such as the "AND" inside
            # "HANDLING"; those are not durations.
            continue

        start, end = match.span()
        pre_text = upper_text[max(0, start - 16):start].strip()
        post_text = upper_text[end:end + 12].strip()

        if re.search(r'(-|TO)\s*\d+', post_text):
            continue

        # Only treat the match as the upper bound of a range when a number
        # precedes the "-" / "TO"; a plain "SENTENCED TO" or the "TO" inside
        # "CUSTODY" is not a range marker.
        if re.search(r'\d.*?(?:-|(?<![A-Z])TO(?![A-Z]))', pre_text):
            processed_range = extract_range(
                match.group(), pre_text, post_text, last_unit)
            if processed_range is not None:
                results.append(processed_range)
                continue

        result = build_result(
            match.group(),
            pre_text,
            post_text,
            calculate_days(units),
            units,
            None,
            None,
            None,
            None,
            None,
            None,
        )
        # The phrase is literal text: escape it so characters such as "+"
        # are not interpreted as regex operators.
        fraction_match = re.search(
            re.escape(result["text"]) + r'\s*(AT|@)\s*(\d+)\s*%', upper_text)
        if fraction_match:
            results = process_fractional_sentence_match(
                fraction_match, result, results)
        else:
            results.append(result)
    return results


def duration_search(text: str = '') -> list[dict]:
    """Public function returns a list of durations from a string.

    Number words in ``text`` are first converted to digits; the returned list
    holds the duration results from ``get_durations`` followed by the
    monetary results from ``get_fine_fees_restitution``.  An empty or
    ``None`` input yields an empty list.
    """
    if not text:
        return []
    text = convert_to_digits(text)
    return get_durations(text) + get_fine_fees_restitution(text)