Untitled

mail@pastecode.io avatar
unknown
plain_text
a year ago
6.1 kB
1
Indexable
Never
from sklearn.metrics import pairwise_distances
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

# Create the encoder before using it; LabelEncoder is imported above but
# `label_enc` was never instantiated, which raised NameError here.
label_enc = LabelEncoder()

# NOTE(review): `df` is not created anywhere in the visible file -- it must be
# loaded (e.g. from the ticket export) before this line runs. TODO confirm.
# Replace role names with integer labels so the column can be used as a
# numeric feature in the distance computation below.
df['role_name'] = label_enc.fit_transform(df['role_name'])

# Link the X vector with index
index = df.index.values

# Columns compared when measuring how similar two tickets are.
_FEATURE_COLUMNS = [
    'ticket_category', 'ticket_type', 'ticket_item', 'ticket_summary',
    'ticket_severity', 'resolution_sla_violated', 'reopen_count',
    'owner_user_id', 'role_name', 'ticket_resolution_time',
]

# Distance metrics accepted by get_top_5_person_who_resolved.
_SUPPORTED_METRICS = ('cosine', 'euclidean', 'manhattan')


def get_top_5_person_who_resolved(df, row, distance_metric='cosine'):
    """Find the resolvers of the 5 tickets in ``df`` closest to ``row``.

    Args:
        df: DataFrame of reference tickets. It must contain every feature
            column listed in ``_FEATURE_COLUMNS`` plus 'person_who_resolved'.
        row: The query ticket, indexable by column name and by a list of
            column names (e.g. a row Series from ``DataFrame.apply(axis=1)``).
            It must carry the same columns as ``df``.
        distance_metric: One of 'cosine', 'euclidean' or 'manhattan'.

    Returns:
        A 2-tuple ``(closest, actual)``. ``closest`` is a list of
        ``(person_who_resolved, owner_user_id, role_name)`` tuples taken from
        the nearest rows of ``df``, ordered nearest first (at most 5 entries;
        fewer when ``df`` has fewer rows). ``actual`` is the same triple read
        from ``row``.

    Raises:
        ValueError: If ``distance_metric`` is not a supported metric.
    """
    # Reject an unsupported metric before doing any work on the data.
    if distance_metric not in _SUPPORTED_METRICS:
        raise ValueError('Invalid distance metric')

    # pairwise_distances expects 2-D input, so the query becomes a 1 x n matrix.
    query_vector = np.array(list(row[_FEATURE_COLUMNS])).reshape(1, -1)
    distances = pairwise_distances(
        query_vector, df[_FEATURE_COLUMNS], metric=distance_metric
    )[0]

    # argsort is ascending, so the first positions are the closest tickets.
    closest_indices = np.argsort(distances)[:5]
    nearest_tickets = df.iloc[closest_indices]

    closest = list(zip(
        nearest_tickets['person_who_resolved'],
        nearest_tickets['owner_user_id'],
        nearest_tickets['role_name'],
    ))
    actual = (row['person_who_resolved'], row['owner_user_id'], row['role_name'])

    return closest, actual


# Split the data into training and testing sets
train_data, test_data = train_test_split(df, test_size=0.2, random_state=42)
# New columns are added below; take an explicit copy so the assignments go to
# an independent frame instead of raising pandas' SettingWithCopyWarning.
test_data = test_data.copy()

# Apply the function to each row of the test data to get the recommendations
test_data['recommendations'], test_data['actual_person_who_resolved'] = zip(*test_data.apply(lambda row: get_top_5_person_who_resolved(train_data, row), axis=1))

# Remove duplicate values from recommendations.
# dict.fromkeys keeps first-seen order, so the nearest-first ranking survives;
# set() would reorder the list arbitrarily and make the rank-based MAP score
# meaningless and non-reproducible between runs.
test_data['recommendations'] = test_data['recommendations'].apply(lambda x: list(dict.fromkeys(x)))

##Printing the test data
#print(test_data.head())

# Evaluation Metrics
def calculate_map(actual, predicted):
    """
    Score one recommendation list by the rank of the true resolver.

    With a single relevant item the average precision reduces to the
    reciprocal of the position at which that item first appears.

    Args:
        actual: The value that should have been recommended.
        predicted (list): Recommended values, best candidate first.

    Returns:
        float: ``1 / rank`` of the first occurrence of ``actual`` in
        ``predicted`` (rank is 1-based), or 0.0 when it is absent.
    """
    # A miss contributes nothing to the score.
    if actual not in predicted:
        return 0.0

    rank = predicted.index(actual) + 1
    return 0.0 + 1 / rank


def calculate_topk_accuracy(actual, predicted, k):
    """
    Report whether the true resolver is among the first k recommendations.

    Args:
        actual: The value that should have been recommended.
        predicted (list): Recommended values, best candidate first.
        k (int): How many leading recommendations to inspect.

    Returns:
        float: 1.0 if ``actual`` occurs in ``predicted[:k]``, otherwise 0.0.
    """
    is_hit = actual in predicted[:k]
    return 1.0 if is_hit else 0.0

# Calculate overall MAP score for the test data
test_map_score = test_data.apply(lambda row: calculate_map(row['actual_person_who_resolved'], row['recommendations']), axis=1).mean()

# Calculate overall Top-k Accuracy score for the test data
test_topk_accuracy = test_data.apply(lambda row: calculate_topk_accuracy(row['actual_person_who_resolved'], row['recommendations'], k=5), axis=1).mean()

print("Test MAP score:", test_map_score)
print("Test Top-k Accuracy score:", test_topk_accuracy)

# Getting unique values.
# Deduplicate with dict.fromkeys so the existing order of each list is kept:
# calculate_map scores by position, and set() would reorder the entries
# arbitrarily before the list is truncated to its first 5 items.
unique_values = test_data['recommendations'].apply(lambda x: list(dict.fromkeys(x)))  # Remove duplicate values in each list
test_data['unique_top_5_person_who_resolved'] = unique_values.apply(lambda x: x[:5])  # Take only the first 5 unique values

print(test_data.head())

# Calculate overall MAP score for the updated test data
test_map_score_updated = test_data.apply(lambda row: calculate_map(row['actual_person_who_resolved'], row['unique_top_5_person_who_resolved']), axis=1).mean()

# Calculate overall Top-k Accuracy score for the updated test data
test_topk_accuracy_updated = test_data.apply(lambda row: calculate_topk_accuracy(row['actual_person_who_resolved'], row['unique_top_5_person_who_resolved'], k=5), axis=1).mean()

print("Updated Test MAP score:", test_map_score_updated)
print("Updated Test Top-k Accuracy score:", test_topk_accuracy_updated)