Code

 avatar
user_1718919
plain_text
4 months ago
33 kB
3
Indexable
# Import required libraries
import pandas as pd
import numpy as np
import os
from datetime import datetime

# --- UTILITY FUNCTIONS ---
def clean_name(name):
    """Normalise whitespace in a name.

    Leading/trailing whitespace is removed and every internal run of
    whitespace is collapsed to a single space. Null values (as judged by
    ``pd.isna``) are returned unchanged; any other value is converted to
    ``str`` before cleaning.
    """
    if not pd.isna(name):
        tokens = str(name).split()
        return ' '.join(tokens)
    return name

def extract_month_year(filename):
    """Parse the report date out of a monthly report filename.

    The filename is expected to look like
    'ACL Monthly Report DD.MM.YY.xlsx': the last whitespace-separated token
    carries the date, optionally followed by the '.xlsx' extension.

    Returns the date as a pandas Timestamp.
    """
    last_token = filename.rsplit(None, 1)[-1]
    date_part = last_token.replace('.xlsx', '')
    return pd.to_datetime(date_part, format='%d.%m.%y')

def clean_qualification_number(qual_num):
    """Strip a single trailing 'P' or 'N' from a qualification number.

    Null values are returned unchanged. Anything else is converted to a
    string and stripped of surrounding whitespace before the suffix check.
    """
    if not pd.isna(qual_num):
        text = str(qual_num).strip()
        return text[:-1] if text.endswith(('P', 'N')) else text
    return qual_num

def convert_time_to_hours(time_str):
    """Convert a clock-style duration to decimal hours.

    Accepted inputs:
      * strings (or objects whose ``str()`` is) in 'H:MM' or 'HH:MM:SS'
        form; the seconds field may carry a fractional part,
      * timedelta-like objects exposing ``total_seconds()`` (for example
        ``datetime.timedelta`` or ``pandas.Timedelta``).

    Returns:
        The duration in hours. Null input, an unexpected number of ':'
        separated fields, or non-numeric fields yield 0 (a message is
        printed for the latter two so bad source cells can be traced).
    """
    if pd.isna(time_str):
        return 0

    # Timedelta values stringify as e.g. '0 days 01:30:00', which the
    # colon-splitting logic below cannot parse, so handle them directly.
    if hasattr(time_str, 'total_seconds'):
        return time_str.total_seconds() / 3600

    text = str(time_str).strip()
    parts = text.split(':')

    if len(parts) not in (2, 3):
        print(f"Unexpected time format: {text}")
        return 0

    try:
        hours = int(parts[0])
        minutes = int(parts[1])
        # float() so that values such as '01:30:15.500000' are accepted.
        seconds = float(parts[2]) if len(parts) == 3 else 0
    except ValueError as e:
        print(f"Error converting time {text}: {str(e)}")
        return 0

    return hours + minutes / 60 + seconds / 3600

def track_employee_history(staff_data):
    """Build a table of contract periods for every staff member.

    For each distinct ``Staff_Standard`` value the rows are walked in
    ``Report_Date`` order. A new period opens whenever ``Contract_Type`` or
    ``Position Name`` differs from the currently open period. The period
    being closed is recorded with status 'Changed' and an end date equal to
    the report date on which the change was seen; the period still open
    after the last row is recorded with status 'Current' and ends on the
    staff member's latest report date.

    Returns:
        DataFrame with columns Staff_Name, Position, Contract_Type,
        Start_Date, End_Date, Target_Hours and Status. Target_Hours is taken
        from the first row of each period.
    """
    print("\nTracking employee contract history...")
    periods = []

    def make_period(staff, position, contract, start, end, target, status):
        # Key order here fixes the column order of the resulting DataFrame.
        return {
            'Staff_Name': staff,
            'Position': position,
            'Contract_Type': contract,
            'Start_Date': start,
            'End_Date': end,
            'Target_Hours': target,
            'Status': status
        }

    for staff in staff_data['Staff_Standard'].unique():
        is_staff = staff_data['Staff_Standard'] == staff
        timeline = staff_data[is_staff].sort_values('Report_Date')

        # State of the period that is currently open (nothing open yet).
        open_contract = None
        open_position = None
        open_start = None
        open_target = None

        for _, record in timeline.iterrows():
            changed = (open_contract != record['Contract_Type'] or
                       open_position != record['Position Name'])
            if not changed:
                continue

            # Close the period that was open before this change, if any.
            if open_start is not None:
                periods.append(make_period(
                    staff, open_position, open_contract,
                    open_start, record['Report_Date'],
                    open_target, 'Changed'
                ))

            # Open a new period starting at this report.
            open_contract = record['Contract_Type']
            open_position = record['Position Name']
            open_start = record['Report_Date']
            open_target = record['Target_Hours']

        # Whatever is still open at the end is the current contract.
        if open_start is not None:
            periods.append(make_period(
                staff, open_position, open_contract,
                open_start, timeline['Report_Date'].max(),
                open_target, 'Current'
            ))

    history_df = pd.DataFrame(periods)
    staff_count = len(staff_data['Staff_Standard'].unique())
    print(f"Tracked {len(history_df)} contract periods for {staff_count} employees")
    return history_df

def calculate_monthly_target(row, contract_history):
    """Return the target hours applying to *row*'s staff member and date.

    The first contract period in *contract_history* whose Staff_Name equals
    ``row['Staff_Standard']`` and whose Start_Date..End_Date range
    (inclusive) contains ``row['Report_Date']`` is used. Its Target_Hours is
    returned only when that period is a salaried tutor contract; in every
    other case (no matching period, other position, other contract type)
    the result is None.
    """
    report_date = row['Report_Date']

    is_staff = contract_history['Staff_Name'] == row['Staff_Standard']
    candidates = contract_history[is_staff]

    covers_date = (
        (candidates['Start_Date'] <= report_date) &
        (candidates['End_Date'] >= report_date)
    )
    matches = candidates[covers_date]

    if len(matches) == 0:
        return None

    first = matches.iloc[0]
    if first['Position'] == 'Tutor' and first['Contract_Type'] == 'Salaried':
        return first['Target_Hours']
    return None

# --- PART 1: PROCESS MONTHLY REPORTS ---
def process_monthly_reports(folder_path):
    """Load every monthly staff report in *folder_path* and derive contract data.

    Files whose name starts with 'ACL Monthly Report' and ends with '.xlsx'
    are read (skipping the first two spreadsheet rows), stamped with the
    date parsed from the filename, and filtered to the 'Tutor' and
    'Learning Support Assistant' positions.

    Args:
        folder_path: Directory containing the monthly report workbooks.

    Returns:
        A tuple ``(combined_df, staff_summary_df, contract_history)``:
          * combined_df: one row per staff record per report, sorted by
            Report_Date then Staff_Standard, with Previous_Contract and
            Contract_Changed columns added.
          * staff_summary_df: one row per staff member (first/last
            appearance, current and initial contract, change count and the
            contract periods as a list of records).
          * contract_history: output of ``track_employee_history``.

    Raises:
        ValueError: if no report file yields any Tutor / Learning Support
            Assistant rows.
    """
    print("\nProcessing Monthly Reports...")

    # Working hours at or below this value are classed as 'Sessional'.
    sessional_max_working_hours = 3
    # Target hours assigned per 1.0 Full-Time Equivalent.
    target_hours_per_fte = 840

    def calculate_target_hours(row):
        # Only tutors above the sessional threshold receive a target.
        if (row['Position Name'] == 'Tutor'
                and float(row['Working Hours']) > sessional_max_working_hours):
            return float(row['Full-Time Equivalent']) * target_hours_per_fte
        return None

    def split_manager_name(name):
        # Splits '<first> [<known>], <last>' into (first, known, last).
        # Values without exactly one comma are returned unchanged in all
        # three positions.
        if pd.isna(name):
            return pd.NA, pd.NA, pd.NA
        name = str(name)  # guard against non-string cells
        parts = name.split(',')
        if len(parts) != 2:
            return name, name, name

        first_known = parts[0].strip()
        last = parts[1].strip()

        if ' ' in first_known:
            first, known = first_known.split(' ', 1)
        else:
            first = known = first_known

        return first, known, last

    columns_to_keep = [
        'Assignment Number', 'Staff_Standard', 'Staff_Known_As',
        'Position Name', 'Contract_Type', 'Target_Hours',
        'Manager_Standard', 'Manager_Known_As', 'Report_Date'
    ]

    # Sorted so processing order does not depend on the filesystem; other
    # extensions are skipped because the filename date parser only
    # understands the '.xlsx' suffix.
    files = sorted(
        f for f in os.listdir(folder_path)
        if f.startswith('ACL Monthly Report') and f.endswith('.xlsx')
    )

    all_data = []
    for file in files:
        print(f"Processing file: {file}")
        df = pd.read_excel(os.path.join(folder_path, file), skiprows=2)

        report_date = extract_month_year(file)
        df['Report_Date'] = report_date

        # Filter for Tutors and Learning Support Assistant
        mask = df['Position Name'].isin(['Learning Support Assistant', 'Tutor'])
        df = df[mask].copy()

        # Row-wise DataFrame.apply on an empty frame returns a DataFrame
        # rather than a Series, which breaks the column assignments below.
        if df.empty:
            print(f"No Tutor or Learning Support Assistant rows in {file}; skipping")
            continue

        # Calculate Target Hours - only for salaried tutors
        df['Target_Hours'] = df.apply(calculate_target_hours, axis=1)

        # Determine Contract Type
        # NOTE(review): a missing 'Working Hours' value compares False and
        # is therefore labelled 'Salaried' - confirm this is intended.
        df['Contract_Type'] = df['Working Hours'].apply(
            lambda x: 'Sessional' if float(x) <= sessional_max_working_hours else 'Salaried'
        )

        # Create manager name formats
        df[['Manager_First', 'Manager_Known', 'Manager_Last']] = df['Line Manager Name'].apply(
            split_manager_name).apply(pd.Series)

        df['Manager_Standard'] = df.apply(
            lambda x: f"{x['Manager_First']} {x['Manager_Last']}"
            if pd.notna(x['Manager_First']) else pd.NA, axis=1)
        df['Manager_Known_As'] = df.apply(
            lambda x: f"{x['Manager_Known']}, {x['Manager_Last']}"
            if pd.notna(x['Manager_Known']) else pd.NA, axis=1)

        # Create staff name formats
        df['Staff_Standard'] = df.apply(
            lambda x: f"{x['First Name']} {x['Last Name']}", axis=1)
        df['Staff_Known_As'] = df.apply(
            lambda x: f"{x['Known As']}, {x['Last Name']}", axis=1)

        all_data.append(df[columns_to_keep])

    if not all_data:
        raise ValueError(
            f"No 'ACL Monthly Report*.xlsx' data for Tutors or Learning "
            f"Support Assistants found in {folder_path}"
        )

    combined_df = pd.concat(all_data, ignore_index=True)
    combined_df = combined_df.sort_values(['Report_Date', 'Staff_Standard'])

    # Track contract changes (the first record of each staff member has no
    # previous contract, so Contract_Changed is True there as well).
    combined_df['Previous_Contract'] = combined_df.groupby('Staff_Standard')['Contract_Type'].shift(1)
    combined_df['Contract_Changed'] = (
        combined_df['Contract_Type'] != combined_df['Previous_Contract']
    )

    # Generate detailed contract history
    contract_history = track_employee_history(combined_df)

    # Create staff summary
    staff_summary = []
    for name in combined_df['Staff_Standard'].unique():
        staff_data = combined_df[combined_df['Staff_Standard'] == name].copy()

        first_date = staff_data['Report_Date'].min()
        last_date = staff_data['Report_Date'].max()

        # Get all contract periods recorded for this staff member
        contract_changes = contract_history[contract_history['Staff_Name'] == name].copy()

        # combined_df is sorted by Report_Date, so the last row is the
        # most recent record.
        latest = staff_data.iloc[-1]
        initial = contract_changes.iloc[0]

        staff_summary.append({
            'Staff_Name': name,
            'First_Appearance': first_date,
            'Last_Appearance': last_date,
            'Current_Position': latest['Position Name'],
            'Current_Contract': latest['Contract_Type'],
            'Number_Of_Contract_Changes': len(contract_changes) - 1,  # Subtract initial contract
            'Initial_Contract': initial['Contract_Type'],
            'Initial_Position': initial['Position'],
            'Contract_History': contract_changes.to_dict('records')
        })

    staff_summary_df = pd.DataFrame(staff_summary)

    print("Monthly reports processing completed!")
    return combined_df, staff_summary_df, contract_history

# --- PART 2: PROCESS TUTOR PAY REPORT ---
def process_tutor_report(file_path, staff_data_path, contract_history):
    """Process the TutorPayReport and link it with contract history.

    Args:
        file_path: Path of the tutor pay report workbook.
        staff_data_path: Retained for backward compatibility; the staff CSV
            was previously loaded but never used, so it is no longer read.
        contract_history: Contract periods as produced by
            ``track_employee_history`` (Staff_Name, Position, Contract_Type,
            Start_Date, End_Date, Target_Hours).

    Returns:
        None when no staff member in the report has any contract history,
        otherwise a dict of DataFrames:
          * 'daily_summary': Actual_Hours per staff / event date / category
          * 'weekly_summary': Actual_Hours per staff / ISO year / ISO week /
            category
          * 'monthly_summary': Actual_Hours per staff / calendar year /
            month / category, with Position, Contract_Type, Target_Hours
            and Utilization_Percentage added
          * 'detailed_data': the filtered, enriched report rows
    """
    print("\nProcessing Tutor Pay Report...")

    df = pd.read_excel(file_path)

    df['PEOPLENAME'] = df['PEOPLENAME'].apply(clean_name)
    df['PEOPLESURNAME'] = df['PEOPLESURNAME'].apply(clean_name)
    df['Staff_Standard'] = [
        f"{first} {last}"
        for first, last in zip(df['PEOPLENAME'], df['PEOPLESURNAME'])
    ]

    df['STAFFROLE'] = df['STAFFROLE'].apply(clean_name)
    # .copy() so the column assignments below act on an independent frame.
    df = df[df['STAFFROLE'].isin(['Tutor', 'Learning Support'])].copy()

    # Adjustments are divided by the number of milliseconds in an hour;
    # the sign is discarded so both count as a reduction.
    ms_per_hour = 1000 * 60 * 60
    df['EarlyAdjust'] = (
        pd.to_numeric(df['TIMEADJUSTEARLY'], errors='coerce').fillna(0).abs() / ms_per_hour
    )
    df['LateAdjust'] = (
        pd.to_numeric(df['TIMEADJUSTLATE'], errors='coerce').fillna(0).abs() / ms_per_hour
    )

    df['Potential Hours'] = pd.to_numeric(df['Potential Hours'], errors='coerce').fillna(0)
    # Never report negative hours when adjustments exceed the potential.
    df['Actual_Hours'] = (
        df['Potential Hours'] - (df['EarlyAdjust'] + df['LateAdjust'])
    ).clip(lower=0)

    assessment_types = frozenset([
        'Access Assessments', 'Assessments', 'Beauty OL Assess',
        'CFL PA Vols Assess', 'Creative Assessments', 'E&M ApprenticeAssess',
        'English FS Assess', 'English GCSE Assess', 'ESOL F2F Assess',
        'ESOL Online Assess', 'Hair/Barb F2F Assess', 'Maths Assessments',
        'SWiS Assessments'
    ])

    def categorize_activity(activity):
        if pd.isna(activity):
            return 'Other'
        activity = str(activity).strip()
        if activity in assessment_types:
            return 'Assessment'
        elif activity in ['Community Engagement', 'Tutorials/Drop Ins']:
            return activity
        return 'Other'

    df['Activity_Category'] = df['ACTIVITYTYPE'].apply(categorize_activity)
    df['EVENTDATE'] = pd.to_datetime(df['EVENTDATE'], format='%d %b %Y', errors='coerce')

    # Create summaries
    daily_summary = df.groupby(
        ['Staff_Standard', 'EVENTDATE', 'Activity_Category']
    ).agg({
        'Actual_Hours': 'sum'
    }).reset_index()

    # ISO week numbers must be paired with the ISO year.
    df['Week'] = df['EVENTDATE'].dt.isocalendar().week
    df['Year'] = df['EVENTDATE'].dt.isocalendar().year
    weekly_summary = df.groupby(
        ['Staff_Standard', 'Year', 'Week', 'Activity_Category']
    ).agg({
        'Actual_Hours': 'sum'
    }).reset_index()

    df['Month'] = df['EVENTDATE'].dt.month
    # Calendar months must be paired with the calendar year: around New
    # Year the ISO year differs (e.g. 30 Dec 2024 is ISO year 2025), which
    # would file those hours under the wrong year. Rows without a parsable
    # date are excluded up front so Year/Month stay integer typed.
    dated = df[df['EVENTDATE'].notna()]
    monthly_summary = dated.assign(
        Year=dated['EVENTDATE'].dt.year,
        Month=dated['EVENTDATE'].dt.month,
    ).groupby(
        ['Staff_Standard', 'Year', 'Month', 'Activity_Category']
    ).agg({
        'Actual_Hours': 'sum'
    }).reset_index()

    def calculate_utilization(group_df, contract_info):
        """Calculate utilization percentage based on contract status"""
        if pd.isna(contract_info['Target_Hours']) or contract_info['Target_Hours'] == 0:
            return None

        non_assessment_hours = group_df[
            group_df['Activity_Category'] != 'Assessment'
        ]['Actual_Hours'].sum()

        # Calculate period target based on contract duration
        # NOTE(review): the target is spread over the number of months the
        # contract period spans rather than a fixed 12 - confirm intended.
        contract_start = pd.to_datetime(contract_info['Start_Date'])
        contract_end = pd.to_datetime(contract_info['End_Date'])
        contract_months = ((contract_end.year - contract_start.year) * 12 +
                           contract_end.month - contract_start.month + 1)

        period_target = contract_info['Target_Hours'] / contract_months

        utilization = (non_assessment_hours / period_target * 100) if period_target > 0 else None
        print(f"Non-assessment hours: {non_assessment_hours}")
        print(f"Period target: {period_target}")
        print(f"Calculated utilization: {utilization}%")
        return utilization

    # Add utilization to monthly summary
    monthly_utilization = []
    print("\nProcessing utilization calculations...")

    for staff in monthly_summary['Staff_Standard'].unique():
        print(f"\nProcessing staff member: {staff}")

        # Get contract history for this staff member
        staff_contracts = contract_history[
            contract_history['Staff_Name'] == staff
        ].sort_values('Start_Date')

        # Staff without any contract history are left out of the result.
        if len(staff_contracts) == 0:
            continue

        staff_monthly = monthly_summary[
            monthly_summary['Staff_Standard'] == staff
        ].copy()
        print(f"Found {len(staff_monthly)} monthly records")

        # Create the result columns up front so every staff frame has the
        # same columns in the same order, whatever the first month holds.
        for column in ('Position', 'Contract_Type', 'Target_Hours',
                       'Utilization_Percentage'):
            staff_monthly[column] = None

        # Process each month based on contract at that time
        for idx, month_data in staff_monthly.iterrows():
            # int() because the grouped keys may be NumPy/nullable integers.
            month_date = pd.Timestamp(year=int(month_data['Year']),
                                      month=int(month_data['Month']),
                                      day=1)

            # Find applicable contract
            # NOTE(review): the lookup date is the first of the month, so a
            # contract whose Start_Date falls later in that month does not
            # match its own first month - confirm intended.
            relevant_contract = staff_contracts[
                (staff_contracts['Start_Date'] <= month_date) &
                (staff_contracts['End_Date'] >= month_date)
            ]

            if len(relevant_contract) > 0:
                contract = relevant_contract.iloc[0]
                print(f"Contract for {month_date.strftime('%B %Y')}:")
                print(f"Position: {contract['Position']}")
                print(f"Contract Type: {contract['Contract_Type']}")
                print(f"Target Hours: {contract['Target_Hours']}")

                staff_monthly.at[idx, 'Position'] = contract['Position']
                staff_monthly.at[idx, 'Contract_Type'] = contract['Contract_Type']
                staff_monthly.at[idx, 'Target_Hours'] = contract['Target_Hours']
                staff_monthly.at[idx, 'Utilization_Percentage'] = calculate_utilization(
                    staff_monthly[staff_monthly.index == idx],
                    contract
                )
            else:
                print(f"No contract found for {month_date.strftime('%B %Y')}")
                staff_monthly.at[idx, 'Utilization_Percentage'] = None

        monthly_utilization.append(staff_monthly)

    if not monthly_utilization:
        print("WARNING: No monthly utilization data generated!")
        return None

    monthly_summary_with_utilization = pd.concat(monthly_utilization)

    print("Tutor pay report processing completed!")
    return {
        'daily_summary': daily_summary,
        'weekly_summary': weekly_summary,
        'monthly_summary': monthly_summary_with_utilization,
        'detailed_data': df
    }

# --- PART 3: QUALIFICATION MAPPING ---
def process_qualification_lists(approved_file, archived_file):
    """Combine the approved and archived qualification lists.

    Both workbooks must provide 'QualificationName' and
    'QualificationNumber' columns. Qualification numbers are normalised with
    ``clean_qualification_number`` and then de-duplicated; when a number
    appears in both lists the row from the approved list is kept.

    Args:
        approved_file: Path of the approved qualifications workbook.
        archived_file: Path of the archived qualifications workbook.

    Returns:
        DataFrame with columns QualificationName, QualificationNumber and
        Source ('Approved' or 'Archived'), one row per qualification number.
    """
    print("\nProcessing qualification lists...")

    def load_list(path, source):
        # Read one list, keep the two reference columns, tag its origin.
        frame = pd.read_excel(path)
        frame = frame[['QualificationName', 'QualificationNumber']].copy()
        frame['Source'] = source
        return frame

    # Approved rows are placed first on purpose: the de-duplication below
    # keeps the first occurrence of each number.
    qual_df = pd.concat(
        [load_list(approved_file, 'Approved'), load_list(archived_file, 'Archived')],
        ignore_index=True
    )

    # Clean qualification numbers
    qual_df['QualificationNumber'] = qual_df['QualificationNumber'].apply(clean_qualification_number)

    # Remove duplicates, keeping the 'Approved' version if it exists. The
    # earlier descending sort on Source put 'Archived' ahead of 'Approved'
    # and so kept the archived row; concat order already gives what we want.
    qual_df = qual_df.drop_duplicates('QualificationNumber', keep='first')

    print(f"Processed {len(qual_df)} unique qualifications")
    return qual_df

def process_terms_caseloading(terms_file, qualification_df, contract_history):
    """Process Terms caseloading data and link it with qualification information.

    Args:
        terms_file: Path of the Terms caseloading workbook.
        qualification_df: Qualification reference with 'QualificationNumber'
            and 'QualificationName' columns.
        contract_history: Contract periods (Staff_Name, Position,
            Contract_Type, Start_Date, End_Date, Target_Hours).

    Returns:
        Dict of DataFrames:
          * 'terms_detailed': the selected caseload rows with
            Qualification_Name and Staff_Position / Staff_Contract /
            Staff_Target_Hours added
          * 'learner_summary': learner counts per staff member, overall and
            per Level and Activity_Group
          * 'qualification_summary': learner counts per staff /
            qualification / level / position / contract
          * 'qualification_reference': *qualification_df* unchanged
    """
    print("\nProcessing Terms caseloading data...")

    # Read Terms caseloading data
    terms_df = pd.read_excel(terms_file)

    # Select and rename relevant columns
    cols_to_keep = {
        'Primary Staff Name': 'Staff_First_Name',
        'Primary Staff Surname': 'Staff_Last_Name',
        'Start Date': 'Start_Date',
        'End Date': 'End_Date',
        'Activity Name': 'Activity_Name',
        'Funding Reference': 'Qualification_Number',
        'Level': 'Level',
        'Activity Group': 'Activity_Group',
        'Name': 'Learner_First_Name',
        'Surname': 'Learner_Last_Name'
    }

    terms_df = terms_df[list(cols_to_keep)].copy()
    terms_df.rename(columns=cols_to_keep, inplace=True)

    # Clean qualification numbers
    terms_df['Qualification_Number'] = terms_df['Qualification_Number'].apply(clean_qualification_number)

    # Create staff standard name format
    terms_df['Staff_Standard'] = [
        f"{first} {last}"
        for first, last in zip(terms_df['Staff_First_Name'], terms_df['Staff_Last_Name'])
    ]

    # Convert dates
    for col in ('Start_Date', 'End_Date'):
        terms_df[col] = pd.to_datetime(terms_df[col], errors='coerce')

    # Look up qualification names through a dict built once, instead of
    # scanning the reference frame for every row. The first row per number
    # wins, matching a top-to-bottom scan.
    known_quals = (
        qualification_df.dropna(subset=['QualificationNumber'])
        .drop_duplicates('QualificationNumber', keep='first')
    )
    name_by_number = dict(
        zip(known_quals['QualificationNumber'], known_quals['QualificationName'])
    )

    # Fall back to the activity name when the number is missing or unknown.
    terms_df['Qualification_Name'] = [
        name_by_number[number]
        if pd.notna(number) and number in name_by_number
        else activity
        for number, activity in zip(terms_df['Qualification_Number'], terms_df['Activity_Name'])
    ]

    # Add contract information based on the activity start date.
    # Contracts are grouped per staff member once; row order inside each
    # group is the order in contract_history.
    contracts_by_staff = {
        staff: group
        for staff, group in contract_history.groupby('Staff_Name', sort=False)
    }
    contract_columns = ['Staff_Position', 'Staff_Contract', 'Staff_Target_Hours']

    def get_contract_info(staff, start_date):
        staff_contracts = contracts_by_staff.get(staff)
        if staff_contracts is not None:
            relevant_contract = staff_contracts[
                (staff_contracts['Start_Date'] <= start_date) &
                (staff_contracts['End_Date'] >= start_date)
            ]
            if len(relevant_contract) > 0:
                first = relevant_contract.iloc[0]
                return {
                    'Staff_Position': first['Position'],
                    'Staff_Contract': first['Contract_Type'],
                    'Staff_Target_Hours': first['Target_Hours']
                }
        return dict.fromkeys(contract_columns)

    # Built from a list so an empty caseload still yields the three columns.
    contract_details = pd.DataFrame(
        [
            get_contract_info(staff, start_date)
            for staff, start_date in zip(terms_df['Staff_Standard'], terms_df['Start_Date'])
        ],
        index=terms_df.index,
        columns=contract_columns
    )
    terms_df = pd.concat([terms_df, contract_details], axis=1)

    # Create summary of learners per staff member
    learner_summary = terms_df.groupby('Staff_Standard').agg({
        'Learner_First_Name': 'count'
    }).reset_index()

    # Add level counts as separate columns
    level_pivot = terms_df.pivot_table(
        index='Staff_Standard',
        columns='Level',
        values='Learner_First_Name',
        aggfunc='count',
        fill_value=0
    ).reset_index()
    level_pivot.columns = ['Staff_Standard'] + [f'Level_{col}_Count' for col in level_pivot.columns[1:]]

    # Add activity group counts as separate columns
    activity_pivot = terms_df.pivot_table(
        index='Staff_Standard',
        columns='Activity_Group',
        values='Learner_First_Name',
        aggfunc='count',
        fill_value=0
    ).reset_index()
    # str() so non-string group labels (e.g. numeric codes) do not crash.
    activity_pivot.columns = ['Staff_Standard'] + [
        f'Activity_{str(col).replace(" ", "_")}_Count' for col in activity_pivot.columns[1:]
    ]

    # Merge all summaries
    learner_summary = (
        learner_summary.merge(level_pivot, on='Staff_Standard', how='left')
        .merge(activity_pivot, on='Staff_Standard', how='left')
    )

    learner_summary.rename(columns={
        'Learner_First_Name': 'Total_Learners'
    }, inplace=True)

    # Create a qualification summary with contract information
    # NOTE(review): groupby drops rows whose key is missing by default, so
    # learners of staff without a matching contract (or without a Level) do
    # not appear in this summary - confirm intended.
    qual_summary = terms_df.groupby(
        ['Staff_Standard', 'Qualification_Name', 'Level', 'Staff_Position', 'Staff_Contract']
    ).agg({
        'Learner_First_Name': 'count',
        'Staff_Target_Hours': 'first'
    }).reset_index()

    qual_summary.rename(columns={
        'Learner_First_Name': 'Learners_In_Qualification'
    }, inplace=True)

    print("Terms caseloading processing completed!")
    return {
        'terms_detailed': terms_df,
        'learner_summary': learner_summary,
        'qualification_summary': qual_summary,
        'qualification_reference': qualification_df
    }

# --- PART 4: PICS DATA PROCESSING ---
def process_pics_data(caseload_file, hours_lookup_file, contract_history):
    """Process PICS caseload data and the assessor hours lookup.

    A snapshot is produced for each month from April 2023 to March 2024,
    containing every learner whose Start_Date .. end date range overlaps
    that month (end date is Actual_End when present, otherwise
    Expected_End). Each snapshot row carries the monthly hours looked up by
    standard title and the assessor's contract on the month's last day.

    Args:
        caseload_file: Path of the PICS caseload workbook.
        hours_lookup_file: Path of the hours lookup workbook (its first two
            rows are skipped).
        contract_history: Contract periods (Staff_Name, Position,
            Contract_Type, Start_Date, End_Date, Target_Hours).

    Returns:
        Dict of DataFrames: 'detailed_monthly', 'monthly_summary',
        'programme_summary', 'standard_summary' and 'hours_reference'.
    """
    print("\nProcessing PICS data...")

    # Read hours lookup first
    print("Reading hours lookup data...")
    hours_df = pd.read_excel(hours_lookup_file, skiprows=2)  # Skip first two rows

    # Process hours lookup
    hours_lookup = hours_df[[
        'Standard title', 'Weighted Monthly Hours (1.6)'
    ]].copy()

    print("\nSample of original hours data:")
    print(hours_lookup.head())

    # Convert time format to decimal hours
    hours_lookup['Monthly_Hours'] = hours_lookup['Weighted Monthly Hours (1.6)'].apply(
        convert_time_to_hours
    )

    print("\nSample after conversion to decimal hours:")
    print(hours_lookup.head())

    # Verify we have non-zero hours
    zero_hours = hours_lookup['Monthly_Hours'].eq(0).sum()
    if zero_hours > 0:
        print(f"\nWARNING: Found {zero_hours} entries with zero hours!")
        print("\nSample of zero-hour entries:")
        print(hours_lookup[hours_lookup['Monthly_Hours'] == 0].head())

    # Clean and standardize titles
    hours_lookup['Standard_Title'] = hours_lookup['Standard title'].str.strip()

    print(f"Processed {len(hours_lookup)} standard titles with hours")

    # Read caseload data
    print("\nReading PICS caseload data...")
    caseload_df = pd.read_excel(caseload_file)

    # Select and rename relevant columns
    cols_to_keep = {
        'Assessor Full Name': 'Assessor_Name',
        'Programme': 'Programme_Level',
        'Apprenticeship Standard Title': 'Standard_Title',
        'Apprenticeship Achieved Date': 'Achieved_Date',
        'Start Date': 'Start_Date',
        'Learning Expected End': 'Expected_End',
        'Actual End': 'Actual_End'
    }

    df = caseload_df[list(cols_to_keep)].copy()
    df.rename(columns=cols_to_keep, inplace=True)

    # Convert dates
    for col in ('Achieved_Date', 'Start_Date', 'Expected_End', 'Actual_End'):
        df[col] = pd.to_datetime(df[col], errors='coerce')

    # Clean titles for matching with lookup
    df['Standard_Title'] = df['Standard_Title'].str.strip()

    # Add hours from lookup. The merge table holds one row per title:
    # duplicate titles would duplicate learner rows, and blank titles would
    # match learners whose title is blank as well.
    merge_hours = (
        hours_lookup[['Standard_Title', 'Monthly_Hours']]
        .dropna(subset=['Standard_Title'])
        .drop_duplicates('Standard_Title', keep='first')
    )
    df = df.merge(merge_hours, on='Standard_Title', how='left')

    # Fill missing hours with 0
    df['Monthly_Hours'] = df['Monthly_Hours'].fillna(0)

    # Contracts grouped per staff member once; row order inside each group
    # is the order in contract_history.
    contracts_by_staff = {
        staff: group
        for staff, group in contract_history.groupby('Staff_Name', sort=False)
    }
    contract_columns = ['Assessor_Position', 'Assessor_Contract', 'Assessor_Target_Hours']

    def get_assessor_contract(assessor, date):
        # First contract period of *assessor* that contains *date*.
        staff_contracts = contracts_by_staff.get(assessor)
        if staff_contracts is not None:
            relevant_contract = staff_contracts[
                (staff_contracts['Start_Date'] <= date) &
                (staff_contracts['End_Date'] >= date)
            ]
            if len(relevant_contract) > 0:
                first = relevant_contract.iloc[0]
                return {
                    'Assessor_Position': first['Position'],
                    'Assessor_Contract': first['Contract_Type'],
                    'Assessor_Target_Hours': first['Target_Hours']
                }
        return dict.fromkeys(contract_columns)

    # A learner's end date is the actual end when known, else the expected
    # end. Rows missing a start or end date are never active.
    effective_end = df['Actual_End'].where(df['Actual_End'].notna(), df['Expected_End'])
    has_dates = df['Start_Date'].notna() & effective_end.notna()

    # Create monthly snapshots for FY23/24
    monthly_data = []

    # Month starts for FY23/24. 'MS' is used because the month-end alias
    # 'M' is deprecated in recent pandas releases.
    month_starts = pd.date_range(start='2023-04-01', end='2024-03-31', freq='MS')

    print("\nGenerating monthly snapshots...")
    for month_start in month_starts:
        month_end = month_start.replace(day=month_start.days_in_month)

        print(f"\nProcessing month: {month_start.strftime('%B %Y')}")
        print(f"Month range: {month_start.strftime('%Y-%m-%d')} to {month_end.strftime('%Y-%m-%d')}")

        # Active when the learner's period overlaps the month at all
        active = (
            has_dates &
            (df['Start_Date'] <= month_end) &
            (effective_end >= month_start)
        )
        month_data = df[active].copy()

        # Add month info
        month_data['Snapshot_Date'] = month_end
        month_data['Year'] = month_end.year
        month_data['Month'] = month_end.month

        # Add contract info for this month. Built from a list so a month
        # with no active learners still yields the three columns.
        contract_details = pd.DataFrame(
            [get_assessor_contract(name, month_end) for name in month_data['Assessor_Name']],
            index=month_data.index,
            columns=contract_columns
        )
        month_data = pd.concat([month_data, contract_details], axis=1)

        active_count = len(month_data)
        print(f"Active students in this month: {active_count}")

        monthly_data.append(month_data)

    monthly_df = pd.concat(monthly_data, ignore_index=True)

    # Create summaries
    # NOTE(review): groupby drops rows whose key is missing by default, so
    # assessors without a matching contract are absent from the three
    # summaries below - confirm intended.
    # 1. Monthly summary per assessor
    monthly_summary = monthly_df.groupby(
        ['Assessor_Name', 'Year', 'Month', 'Assessor_Position', 'Assessor_Contract']
    ).agg({
        'Standard_Title': 'count',  # Number of active students
        'Monthly_Hours': 'sum',     # Total hours needed
        'Assessor_Target_Hours': 'first'
    }).reset_index()

    monthly_summary.rename(columns={
        'Standard_Title': 'Active_Students'
    }, inplace=True)

    # 2. Programme level summary
    programme_summary = monthly_df.groupby(
        ['Assessor_Name', 'Programme_Level', 'Year', 'Month',
         'Assessor_Position', 'Assessor_Contract']
    ).agg({
        'Standard_Title': 'count',
        'Assessor_Target_Hours': 'first'
    }).reset_index()

    programme_summary.rename(columns={
        'Standard_Title': 'Students_In_Programme'
    }, inplace=True)

    # 3. Standard title summary
    standard_summary = monthly_df.groupby(
        ['Assessor_Name', 'Standard_Title', 'Year', 'Month',
         'Assessor_Position', 'Assessor_Contract']
    ).agg({
        'Monthly_Hours': 'sum',
        'Snapshot_Date': 'count',
        'Assessor_Target_Hours': 'first'
    }).reset_index()

    standard_summary.rename(columns={
        'Snapshot_Date': 'Students_In_Standard',
        'Monthly_Hours': 'Required_Hours'
    }, inplace=True)

    print("PICS data processing completed!")

    return {
        'detailed_monthly': monthly_df,
        'monthly_summary': monthly_summary,
        'programme_summary': programme_summary,
        'standard_summary': standard_summary,
        'hours_reference': hours_lookup
    }

# Execute processing
if __name__ == "__main__":
    # All inputs and outputs are resolved against the working directory.
    current_dir = os.getcwd()
    monthly_reports_folder = os.path.join(current_dir, "monthly_reports")
    output_folder = os.path.join(current_dir, "processed_data")

    def save_results(results, prefix=""):
        """Write every DataFrame in *results* to '<prefix><key>.csv'."""
        for key, frame in results.items():
            target = os.path.join(output_folder, f'{prefix}{key}.csv')
            frame.to_csv(target, index=False)
            print(f"Saved {key} to {target}")

    print(f"Processing data from: {current_dir}")

    try:
        # Step 1: monthly staff reports -> staff data and contract history
        print("\nStep 1: Processing Monthly Reports")
        staff_data, staff_summary, contract_history = process_monthly_reports(monthly_reports_folder)

        if not os.path.exists(output_folder):
            os.makedirs(output_folder)
        staff_csv = os.path.join(output_folder, 'staff_monthly_data.csv')
        staff_data.to_csv(staff_csv, index=False)
        staff_summary.to_csv(os.path.join(output_folder, 'staff_summary.csv'), index=False)
        contract_history.to_csv(os.path.join(output_folder, 'contract_history.csv'), index=False)

        # Step 2: tutor pay report (may produce nothing to save)
        print("\nStep 2: Processing Tutor Pay Report")
        tutor_report_data = process_tutor_report(
            "TutorPayReport_New 23.24 FY 22.11.24.xlsx",
            staff_csv,
            contract_history
        )
        if tutor_report_data is not None:
            save_results(tutor_report_data)

        # Step 3: qualification reference lists and Terms caseloading
        print("\nStep 3: Processing Qualification Data")
        approved_file = "Approved List for Terms Caseloading FY2324.xlsx"
        archived_file = "Archived List for Terms Caseloading FY2324.xlsx"
        terms_file = "Terms caseloading 23.24 FY.xlsx"

        qualification_data = process_qualification_lists(approved_file, archived_file)
        terms_data = process_terms_caseloading(terms_file, qualification_data, contract_history)
        save_results(terms_data)

        # Step 4: PICS caseload; outputs carry a 'pics_' filename prefix
        print("\nStep 4: Processing PICS Data")
        caseload_file = "PICS caseload for PBI.xlsx"
        hours_lookup_file = "PICS Hours for Assessor Look up.xlsx"

        pics_data = process_pics_data(caseload_file, hours_lookup_file, contract_history)
        save_results(pics_data, prefix='pics_')

        print("\nAll processing completed successfully!")

    except Exception as e:
        # Report the failure, then let it propagate with its traceback.
        print(f"\nError during processing: {str(e)}")
        raise
Editor is loading...
Leave a Comment