# C8
#
# avatar
# user_1718919
# plain_text
# 4 months ago
# 7.6 kB
# 4
# Indexable
import pandas as pd
import numpy as np
import os
from datetime import datetime

def clean_name(name):
    """Collapse every run of whitespace in *name* into a single space.

    Leading and trailing whitespace is removed as a side effect of the
    split/join.  Values that ``pd.isna`` reports as missing are handed back
    unchanged; anything else is converted with ``str`` first.
    """
    if not pd.isna(name):
        tokens = str(name).split()
        return ' '.join(tokens)
    return name

# Activity types that are reported under the single 'Assessment' category.
_ASSESSMENT_TYPES = frozenset({
    'Access Assessments', 'Assessments', 'Beauty OL Assess',
    'CFL PA Vols Assess', 'Creative Assessments', 'E&M ApprenticeAssess',
    'English FS Assess', 'English GCSE Assess', 'ESOL F2F Assess',
    'ESOL Online Assess', 'Hair/Barb F2F Assess', 'Maths Assessments',
    'SWiS Assessments',
})

# Activity types that keep their own name as the category.
_NAMED_CATEGORIES = frozenset({'Community Engagement', 'Tutorials/Drop Ins'})

_MS_PER_HOUR = 1000 * 60 * 60


def _categorize_activity(activity):
    """Map one raw ACTIVITYTYPE value onto its reporting category."""
    if pd.isna(activity):
        return 'Other'
    activity = str(activity).strip()
    if activity in _ASSESSMENT_TYPES:
        return 'Assessment'
    if activity in _NAMED_CATEGORIES:
        return activity
    return 'Other'


def _adjustment_hours(raw_ms):
    """Convert a column of millisecond adjustments to non-negative hours."""
    return pd.to_numeric(raw_ms, errors='coerce').fillna(0).abs() / _MS_PER_HOUR


def _sum_hours(frame, keys):
    """Total Actual_Hours for every combination of the *keys* columns."""
    return frame.groupby(keys).agg({'Actual_Hours': 'sum'}).reset_index()


def _add_monthly_utilization(monthly_summary, staff_df):
    """Return the monthly rows of staff with a target, plus their utilization."""
    # The last row listed for a staff member supplies the target.
    targets = pd.to_numeric(
        staff_df.drop_duplicates('Staff_Standard', keep='last')
        .set_index('Staff_Standard')['Target_Hours'],
        errors='coerce',
    )

    known = monthly_summary['Staff_Standard'].isin(targets.index)
    matched = monthly_summary[known].copy()
    if matched.empty:
        return matched

    # Assessment hours do not count towards utilization.
    countable = matched['Actual_Hours'].where(
        matched['Activity_Category'] != 'Assessment', 0.0
    )
    # Hours are totalled per staff member *per month*; every category row of
    # that month carries the same monthly figure.
    period_hours = (
        matched.assign(_countable=countable)
        .groupby(['Staff_Standard', 'Year', 'Month'])['_countable']
        .transform('sum')
    )
    # Target_Hours is treated as an annual figure (divided by 12), as before.
    # NOTE(review): confirm against how staff_monthly_data.csv is produced.
    period_target = matched['Staff_Standard'].map(targets) / 12
    utilization = period_hours / period_target * 100
    # A missing or non-positive target yields 0 rather than NaN/inf.
    matched['Utilization_Percentage'] = utilization.where(period_target > 0, 0.0)
    return matched


def process_tutor_report(file_path, staff_data_path):
    """Process the TutorPayReport and link it with the staff data.

    Reads the tutor report workbook and the processed staff CSV, keeps the
    'Tutor' and 'Learning Support' rows, derives the hours actually worked
    (potential hours minus early/late adjustments, never below zero),
    classifies each activity and aggregates the hours per day, ISO week and
    calendar month.  Monthly rows additionally get a utilization percentage:
    that month's non-assessment hours relative to one twelfth of the staff
    member's Target_Hours.

    Args:
        file_path: Path of the Excel tutor report.  Columns read:
            PEOPLENAME, PEOPLESURNAME, STAFFROLE, TIMEADJUSTEARLY,
            TIMEADJUSTLATE, 'Potential Hours', ACTIVITYTYPE, EVENTDATE.
        staff_data_path: Path of the staff CSV.  Columns read:
            Staff_Standard, Target_Hours.

    Returns:
        A dict with the dataframes 'daily_summary', 'weekly_summary',
        'monthly_summary' (with Utilization_Percentage) and 'detailed_data',
        or None when no tutor could be matched to the staff data.
        In 'weekly_summary' and 'detailed_data' the Year column is the ISO
        year belonging to Week; in 'monthly_summary' it is the calendar year
        belonging to Month.
    """
    print(f"\nReading tutor report from: {file_path}")
    print(f"Reading staff data from: {staff_data_path}")

    # Read the TutorPayReport
    df = pd.read_excel(file_path)
    print(f"\nTutor report shape: {df.shape}")

    # Clean name fields
    df['PEOPLENAME'] = df['PEOPLENAME'].apply(clean_name)
    df['PEOPLESURNAME'] = df['PEOPLESURNAME'].apply(clean_name)

    # Read the processed staff data for name matching
    staff_df = pd.read_csv(staff_data_path)
    print(f"\nStaff data shape: {staff_df.shape}")

    # Standardized "<first> <surname>" key for matching.  A missing part is
    # left out instead of becoming the literal text "nan".
    df['Staff_Standard'] = (
        df['PEOPLENAME'].fillna('').astype(str)
        + ' '
        + df['PEOPLESURNAME'].fillna('').astype(str)
    ).str.strip()

    print("\nSample of cleaned names from tutor report:")
    print(df[['Staff_Standard']].head())
    print("\nSample of names from staff data:")
    print(staff_df[['Staff_Standard']].head())

    # Filter for relevant staff roles; copy so the column assignments below
    # write to an independent frame rather than to a slice of the original.
    df = df[df['STAFFROLE'].isin(['Tutor', 'Learning Support'])].copy()

    # Convert time adjustments to hours, handling potential string values
    df['EarlyAdjust'] = _adjustment_hours(df['TIMEADJUSTEARLY'])
    df['LateAdjust'] = _adjustment_hours(df['TIMEADJUSTLATE'])

    # Calculate actual hours (never negative)
    df['Potential Hours'] = pd.to_numeric(
        df['Potential Hours'], errors='coerce'
    ).fillna(0)
    df['Actual_Hours'] = (
        df['Potential Hours'] - (df['EarlyAdjust'] + df['LateAdjust'])
    ).clip(lower=0)

    # Group activity types
    df['Activity_Category'] = df['ACTIVITYTYPE'].map(_categorize_activity)

    # Convert event date to datetime
    df['EVENTDATE'] = pd.to_datetime(
        df['EVENTDATE'], format='%d %b %Y', errors='coerce'
    )
    undated = int(df['EVENTDATE'].isna().sum())
    if undated:
        print(f"\nWARNING: {undated} rows have a missing or unparseable "
              "EVENTDATE and are left out of the summaries")

    # Period columns kept on the detailed data (Year is the ISO year of Week).
    iso = df['EVENTDATE'].dt.isocalendar()
    df['Week'] = iso.week
    df['Year'] = iso.year
    df['Month'] = df['EVENTDATE'].dt.month

    print("\nProcessing summaries...")

    # Rows without a date cannot be placed in any period.
    dated = df[df['EVENTDATE'].notna()]

    # 1. Daily summary
    daily_summary = _sum_hours(
        dated, ['Staff_Standard', 'EVENTDATE', 'Activity_Category']
    )
    print(f"Daily summary records: {len(daily_summary)}")

    # 2. Weekly summary (ISO year + ISO week)
    weekly_summary = _sum_hours(
        dated, ['Staff_Standard', 'Year', 'Week', 'Activity_Category']
    )
    print(f"Weekly summary records: {len(weekly_summary)}")

    # 3. Monthly summary.  The calendar year is used here: pairing the ISO
    # year with the calendar month misfiles dates around New Year
    # (e.g. 30 Dec 2024 has ISO year 2025).
    monthly_source = dated.assign(
        Year=dated['EVENTDATE'].dt.year,
        Month=dated['EVENTDATE'].dt.month,
    )
    monthly_summary = _sum_hours(
        monthly_source, ['Staff_Standard', 'Year', 'Month', 'Activity_Category']
    )
    print(f"Monthly summary records: {len(monthly_summary)}")

    # Add utilization to monthly summary
    monthly_summary_with_utilization = _add_monthly_utilization(
        monthly_summary, staff_df
    )

    if monthly_summary_with_utilization.empty:
        print("\nWARNING: No monthly utilization data was generated!")
        return None

    unmatched = (
        monthly_summary['Staff_Standard'].nunique()
        - monthly_summary_with_utilization['Staff_Standard'].nunique()
    )
    if unmatched:
        print(f"\nWARNING: {unmatched} staff names were not found in the "
              "staff data and are left out of the monthly summary")

    print("\nProcessing completed successfully!")

    return {
        'daily_summary': daily_summary,
        'weekly_summary': weekly_summary,
        'monthly_summary': monthly_summary_with_utilization,
        'detailed_data': df
    }

def save_processed_data(data_dict, output_folder):
    """Save all processed dataframes to CSV files.

    Each entry of *data_dict* is written to ``<output_folder>/<name>.csv``
    without the index column, and a confirmation line is printed per file.

    Args:
        data_dict: Mapping of output name to dataframe.
        output_folder: Target directory; created (including parents) when
            it does not exist yet.
    """
    # exist_ok=True replaces the separate existence check, so there is no
    # window in which the folder can appear between the check and the create.
    os.makedirs(output_folder, exist_ok=True)

    for name, frame in data_dict.items():
        output_file = os.path.join(output_folder, f'{name}.csv')
        frame.to_csv(output_file, index=False)
        print(f"Saved {name} to {output_file}")

# Usage: run as a script (or notebook cell) from the folder that holds the
# TutorPayReport workbook and the processed_data folder.
if __name__ == "__main__":
    # Print current directory and files for debugging
    current_dir = os.getcwd()
    print(f"Current working directory: {current_dir}")
    print("\nFiles in current directory:")
    for file in os.listdir(current_dir):
        print(f"- {file}")

    # Define paths
    input_file = "TutorPayReport_New 23.24 FY 22.11.24.xlsx"
    staff_data = os.path.join("processed_data", "staff_monthly_data.csv")
    output_folder = "processed_data"

    # Verify files exist.  SystemExit is raised directly: the bare `exit`
    # helper is injected by the `site` module for interactive use and is not
    # guaranteed to exist, and a failure should end with a non-zero status.
    if not os.path.exists(input_file):
        print(f"\nERROR: Could not find {input_file}")
        print("Please make sure the TutorPayReport file is in the same folder as this notebook")
        raise SystemExit(1)

    if not os.path.exists(staff_data):
        print(f"\nERROR: Could not find {staff_data}")
        print("Please make sure the processed_data folder contains staff_monthly_data.csv")
        raise SystemExit(1)

    # Process the data
    processed_data = process_tutor_report(input_file, staff_data)

    if processed_data is not None:
        # Save the results
        save_processed_data(processed_data, output_folder)

        print("\nSummary stats:")
        for name, df in processed_data.items():
            print(f"{name}: {len(df)} records")
    else:
        print("\nERROR: Data processing failed!")
        # Report the failure to the calling shell as well.
        raise SystemExit(1)
# Editor is loading...
# Leave a Comment