Code
user_1718919
plain_text
4 months ago
33 kB
3
Indexable
# Import required libraries
import os
from datetime import datetime, timedelta

import numpy as np
import pandas as pd


# --- CONSTANTS ---

# Multiplier applied to 'Full-Time Equivalent' to obtain Target_Hours.
# NOTE(review): presumably annual contact hours for 1.0 FTE - confirm.
TARGET_HOURS_PER_FTE = 840

# 'Working Hours' at or below this value marks a contract as 'Sessional'.
SESSIONAL_MAX_WORKING_HOURS = 3

MS_PER_HOUR = 1000 * 60 * 60

# Column layout of the contract history frame; fixed so that an empty history
# still exposes the columns downstream lookups rely on.
CONTRACT_HISTORY_COLUMNS = [
    'Staff_Name', 'Position', 'Contract_Type',
    'Start_Date', 'End_Date', 'Target_Hours', 'Status',
]

ASSESSMENT_TYPES = frozenset([
    'Access Assessments', 'Assessments', 'Beauty OL Assess',
    'CFL PA Vols Assess', 'Creative Assessments', 'E&M ApprenticeAssess',
    'English FS Assess', 'English GCSE Assess', 'ESOL F2F Assess',
    'ESOL Online Assess', 'Hair/Barb F2F Assess', 'Maths Assessments',
    'SWiS Assessments',
])

PASSTHROUGH_ACTIVITIES = frozenset(['Community Engagement', 'Tutorials/Drop Ins'])


# --- UTILITY FUNCTIONS ---

def clean_name(name):
    """Collapse runs of whitespace in a name; missing values pass through."""
    if pd.isna(name):
        return name
    return ' '.join(str(name).split())


def extract_month_year(filename):
    """Parse the report date from 'ACL Monthly Report DD.MM.YY.xlsx'.

    The last whitespace-separated token of the filename, minus a trailing
    Excel extension (any letter case), is parsed as DD.MM.YY.

    Raises:
        ValueError: if the token is not a DD.MM.YY date.
    """
    date_str = filename.split()[-1]
    for ext in ('.xlsx', '.xlsm', '.xls'):
        if date_str.lower().endswith(ext):
            date_str = date_str[:-len(ext)]
            break
    return pd.to_datetime(date_str, format='%d.%m.%y')


def clean_qualification_number(qual_num):
    """Strip whitespace and one trailing 'P' or 'N' from a qualification number."""
    if pd.isna(qual_num):
        return qual_num
    qual_num = str(qual_num).strip()
    if qual_num.endswith(('P', 'N')):
        return qual_num[:-1]
    return qual_num


def convert_time_to_hours(time_str):
    """Convert a duration to decimal hours.

    Accepts 'H:MM' or 'HH:MM:SS' text, or a timedelta-like value (Excel
    duration cells can be read as timedeltas). Missing or unparseable input
    yields 0 and, for unparseable input, a printed diagnostic.
    """
    if pd.isna(time_str):
        return 0
    if isinstance(time_str, (timedelta, np.timedelta64)):
        return pd.Timedelta(time_str).total_seconds() / 3600

    text = str(time_str).strip()
    parts = text.split(':')
    if len(parts) not in (2, 3):
        print(f"Unexpected time format: {text}")
        return 0
    try:
        hours = int(parts[0])
        minutes = int(parts[1])
        seconds = float(parts[2]) if len(parts) == 3 else 0
    except ValueError as e:
        print(f"Error converting time {text}: {str(e)}")
        return 0
    return hours + minutes / 60 + seconds / 3600


def _standard_names(first, last):
    """Build whitespace-normalised 'First Last' names from two Series."""
    return (first.astype(str) + ' ' + last.astype(str)).map(clean_name)


def _find_contract(contract_history, staff_name, when):
    """Return the first contract period of staff_name that covers `when`.

    A period covers `when` if Start_Date <= when <= End_Date. Returns the
    matching row as a Series, or None if nothing matches or `when` is missing.
    Consecutive periods share their boundary date, so on that date the earlier
    period wins (first row in contract_history order).
    """
    if pd.isna(when):
        return None
    periods = contract_history[contract_history['Staff_Name'] == staff_name]
    covering = periods[
        (periods['Start_Date'] <= when) & (periods['End_Date'] >= when)
    ]
    return None if covering.empty else covering.iloc[0]


def _contract_fields(contract, prefix):
    """Map a contract row (or None) to '<prefix>_Position/Contract/Target_Hours'."""
    if contract is None:
        return {
            f'{prefix}_Position': None,
            f'{prefix}_Contract': None,
            f'{prefix}_Target_Hours': None,
        }
    return {
        f'{prefix}_Position': contract['Position'],
        f'{prefix}_Contract': contract['Contract_Type'],
        f'{prefix}_Target_Hours': contract['Target_Hours'],
    }


def track_employee_history(staff_data):
    """Derive contract periods per employee from the monthly staff rows.

    A new period starts whenever Contract_Type or 'Position Name' differs
    from the previous report for that employee. A closed period ends on the
    report date of the row that triggered the change (so adjacent periods
    share that date) and has Status 'Changed'; the last period ends on the
    employee's latest report date and has Status 'Current'.

    Args:
        staff_data: frame with Staff_Standard, Report_Date, Contract_Type,
            'Position Name' and Target_Hours columns.

    Returns:
        DataFrame with CONTRACT_HISTORY_COLUMNS (possibly zero rows).
    """
    history = []

    print("\nTracking employee contract history...")

    staff_names = staff_data['Staff_Standard'].unique()
    for name in staff_names:
        employee_data = staff_data[
            staff_data['Staff_Standard'] == name
        ].sort_values('Report_Date')

        current_contract = None
        current_position = None
        period_start = None
        previous_target = None

        for _, row in employee_data.iterrows():
            unchanged = (current_contract == row['Contract_Type']
                         and current_position == row['Position Name'])
            if unchanged:
                continue

            # Close the period that just ended, if there was one.
            if period_start is not None:
                history.append({
                    'Staff_Name': name,
                    'Position': current_position,
                    'Contract_Type': current_contract,
                    'Start_Date': period_start,
                    'End_Date': row['Report_Date'],
                    'Target_Hours': previous_target,
                    'Status': 'Changed',
                })

            current_contract = row['Contract_Type']
            current_position = row['Position Name']
            period_start = row['Report_Date']
            previous_target = row['Target_Hours']

        if period_start is not None:
            history.append({
                'Staff_Name': name,
                'Position': current_position,
                'Contract_Type': current_contract,
                'Start_Date': period_start,
                'End_Date': employee_data['Report_Date'].max(),
                'Target_Hours': previous_target,
                'Status': 'Current',
            })

    history_df = pd.DataFrame(history, columns=CONTRACT_HISTORY_COLUMNS)
    print(f"Tracked {len(history_df)} contract periods for {len(staff_names)} employees")
    return history_df


def calculate_monthly_target(row, contract_history):
    """Return Target_Hours for the contract covering row['Report_Date'].

    Only a contract with Position 'Tutor' and Contract_Type 'Salaried'
    yields a target; every other case returns None.
    """
    contract = _find_contract(
        contract_history, row['Staff_Standard'], row['Report_Date']
    )
    if (contract is not None
            and contract['Position'] == 'Tutor'
            and contract['Contract_Type'] == 'Salaried'):
        return contract['Target_Hours']
    return None


# --- PART 1: PROCESS MONTHLY REPORTS ---

def _split_manager_name(name):
    """Split a line-manager name into (first, known, last).

    The text before the single comma is split at its first space into
    `first` and `known` (both equal if there is no space); the text after
    the comma is `last`. Names without exactly one comma are returned
    unchanged in all three positions; missing names give three pd.NA.
    """
    if pd.isna(name):
        return pd.NA, pd.NA, pd.NA
    name = str(name)
    parts = name.split(',')
    if len(parts) != 2:
        return name, name, name
    first_known = parts[0].strip()
    last = parts[1].strip()
    if ' ' in first_known:
        first, known = first_known.split(' ', 1)
    else:
        first = known = first_known
    return first, known, last


def _prepare_monthly_report(df, report_date):
    """Filter one monthly report to tutors/LSAs and derive the output columns."""
    df = df[
        df['Position Name'].isin(['Learning Support Assistant', 'Tutor'])
    ].copy()
    df['Report_Date'] = report_date

    # Raises ValueError on non-numeric values, as float() did before.
    working_hours = pd.to_numeric(df['Working Hours'])

    # Target hours exist only for tutors above the sessional threshold.
    salaried_tutor = (
        (df['Position Name'] == 'Tutor')
        & (working_hours > SESSIONAL_MAX_WORKING_HOURS)
    )
    df['Target_Hours'] = np.nan
    df.loc[salaried_tutor, 'Target_Hours'] = (
        pd.to_numeric(df.loc[salaried_tutor, 'Full-Time Equivalent'])
        * TARGET_HOURS_PER_FTE
    )

    # Missing working hours compare False and therefore count as 'Salaried'.
    df['Contract_Type'] = np.where(
        working_hours <= SESSIONAL_MAX_WORKING_HOURS, 'Sessional', 'Salaried'
    )

    manager_cols = ['Manager_First', 'Manager_Known', 'Manager_Last']
    manager_parts = pd.DataFrame(
        [_split_manager_name(n) for n in df['Line Manager Name']],
        columns=manager_cols,
        index=df.index,
    )
    df[manager_cols] = manager_parts
    df['Manager_Standard'] = [
        f"{first} {last}" if pd.notna(first) else pd.NA
        for first, last in zip(df['Manager_First'], df['Manager_Last'])
    ]
    df['Manager_Known_As'] = [
        f"{known}, {last}" if pd.notna(known) else pd.NA
        for known, last in zip(df['Manager_Known'], df['Manager_Last'])
    ]

    # Normalise whitespace so names join cleanly with the other data sources.
    df['Staff_Standard'] = _standard_names(df['First Name'], df['Last Name'])
    df['Staff_Known_As'] = (
        df['Known As'].astype(str) + ', ' + df['Last Name'].astype(str)
    )

    columns_to_keep = [
        'Assignment Number', 'Staff_Standard', 'Staff_Known_As',
        'Position Name', 'Contract_Type', 'Target_Hours',
        'Manager_Standard', 'Manager_Known_As', 'Report_Date',
    ]
    return df[columns_to_keep]


def process_monthly_reports(folder_path):
    """Process every 'ACL Monthly Report*' workbook in folder_path.

    Returns:
        (combined_df, staff_summary_df, contract_history): the stacked
        per-month staff rows, one summary row per staff member, and the
        contract periods from track_employee_history.

    Raises:
        ValueError: if the folder contains no matching report files.
    """
    print("\nProcessing Monthly Reports...")

    files = sorted(
        f for f in os.listdir(folder_path) if f.startswith('ACL Monthly Report')
    )
    if not files:
        raise ValueError(f"No 'ACL Monthly Report' files found in {folder_path}")

    all_data = []
    for file in files:
        print(f"Processing file: {file}")
        raw = pd.read_excel(os.path.join(folder_path, file), skiprows=2)
        all_data.append(_prepare_monthly_report(raw, extract_month_year(file)))

    combined_df = pd.concat(all_data, ignore_index=True)
    combined_df = combined_df.sort_values(['Report_Date', 'Staff_Standard'])

    # Flag rows whose contract type differs from the previous report; the
    # first row per person has no predecessor and is therefore flagged too.
    combined_df['Previous_Contract'] = (
        combined_df.groupby('Staff_Standard')['Contract_Type'].shift(1)
    )
    combined_df['Contract_Changed'] = (
        combined_df['Contract_Type'] != combined_df['Previous_Contract']
    )

    contract_history = track_employee_history(combined_df)

    staff_summary = []
    for name, staff_rows in combined_df.groupby('Staff_Standard', sort=False):
        contract_changes = contract_history[
            contract_history['Staff_Name'] == name
        ].copy()
        latest = staff_rows.iloc[-1]  # rows are already in Report_Date order
        staff_summary.append({
            'Staff_Name': name,
            'First_Appearance': staff_rows['Report_Date'].min(),
            'Last_Appearance': staff_rows['Report_Date'].max(),
            'Current_Position': latest['Position Name'],
            'Current_Contract': latest['Contract_Type'],
            # The initial contract is a period but not a change.
            'Number_Of_Contract_Changes': len(contract_changes) - 1,
            'Initial_Contract': contract_changes.iloc[0]['Contract_Type'],
            'Initial_Position': contract_changes.iloc[0]['Position'],
            'Contract_History': contract_changes.to_dict('records'),
        })

    staff_summary_df = pd.DataFrame(staff_summary)

    print("Monthly reports processing completed!")
    return combined_df, staff_summary_df, contract_history


# --- PART 2: PROCESS TUTOR PAY REPORT ---

def _categorize_activity(activity):
    """Map an ACTIVITYTYPE value to 'Assessment', a passthrough name, or 'Other'."""
    if pd.isna(activity):
        return 'Other'
    activity = str(activity).strip()
    if activity in ASSESSMENT_TYPES:
        return 'Assessment'
    if activity in PASSTHROUGH_ACTIVITIES:
        return activity
    return 'Other'


def _calculate_utilization(group_df, contract_info):
    """Return non-assessment hours as a percentage of the period target.

    The period target is Target_Hours divided by the number of calendar
    months the contract period spans (inclusive). Returns None when the
    contract has no target.

    NOTE(review): dividing by the contract's observed span rather than a
    fixed 12 months makes the target depend on how many reports exist -
    confirm this is intended.
    """
    target_hours = contract_info['Target_Hours']
    if pd.isna(target_hours) or target_hours == 0:
        return None

    non_assessment_hours = group_df.loc[
        group_df['Activity_Category'] != 'Assessment', 'Actual_Hours'
    ].sum()

    contract_start = pd.to_datetime(contract_info['Start_Date'])
    contract_end = pd.to_datetime(contract_info['End_Date'])
    contract_months = (
        (contract_end.year - contract_start.year) * 12
        + contract_end.month - contract_start.month + 1
    )

    period_target = target_hours / contract_months
    utilization = (
        (non_assessment_hours / period_target * 100) if period_target > 0 else None
    )

    print(f"Non-assessment hours: {non_assessment_hours}")
    print(f"Period target: {period_target}")
    print(f"Calculated utilization: {utilization}%")

    return utilization


def process_tutor_report(file_path, staff_data_path, contract_history):
    """Process the TutorPayReport and attach contract/utilisation data.

    Args:
        file_path: path of the tutor pay report workbook.
        staff_data_path: unused; kept so existing callers keep working.
        contract_history: frame produced by track_employee_history.

    Returns:
        dict with 'daily_summary', 'weekly_summary', 'monthly_summary' and
        'detailed_data' frames, or None if no staff member in the report has
        any contract history. Staff without contract history are omitted
        from 'monthly_summary'.
    """
    print("\nProcessing Tutor Pay Report...")

    df = pd.read_excel(file_path)

    for col in ('PEOPLENAME', 'PEOPLESURNAME', 'STAFFROLE'):
        df[col] = df[col].apply(clean_name)
    df['Staff_Standard'] = _standard_names(df['PEOPLENAME'], df['PEOPLESURNAME'])

    # .copy() so the column assignments below do not target a slice.
    df = df[df['STAFFROLE'].isin(['Tutor', 'Learning Support'])].copy()

    # Adjustments arrive in milliseconds; sign is ignored.
    for source, target in (('TIMEADJUSTEARLY', 'EarlyAdjust'),
                           ('TIMEADJUSTLATE', 'LateAdjust')):
        df[target] = (
            pd.to_numeric(df[source], errors='coerce').fillna(0).abs() / MS_PER_HOUR
        )
    df['Potential Hours'] = pd.to_numeric(
        df['Potential Hours'], errors='coerce'
    ).fillna(0)
    df['Actual_Hours'] = (
        df['Potential Hours'] - (df['EarlyAdjust'] + df['LateAdjust'])
    ).clip(lower=0)

    df['Activity_Category'] = df['ACTIVITYTYPE'].apply(_categorize_activity)
    df['EVENTDATE'] = pd.to_datetime(
        df['EVENTDATE'], format='%d %b %Y', errors='coerce'
    )

    # Create summaries
    daily_summary = df.groupby(
        ['Staff_Standard', 'EVENTDATE', 'Activity_Category']
    ).agg({'Actual_Hours': 'sum'}).reset_index()

    # 'Year' is the ISO year so that it pairs correctly with the ISO week.
    iso = df['EVENTDATE'].dt.isocalendar()
    df['Week'] = iso.week
    df['Year'] = iso.year
    weekly_summary = df.groupby(
        ['Staff_Standard', 'Year', 'Week', 'Activity_Category']
    ).agg({'Actual_Hours': 'sum'}).reset_index()

    df['Month'] = df['EVENTDATE'].dt.month
    # The monthly summary must pair the month with the CALENDAR year: the ISO
    # year differs around New Year (e.g. 2024-12-30 has ISO year 2025).
    dated = df.dropna(subset=['EVENTDATE'])
    monthly_summary = dated.assign(
        Year=dated['EVENTDATE'].dt.year,
        Month=dated['EVENTDATE'].dt.month,
    ).groupby(
        ['Staff_Standard', 'Year', 'Month', 'Activity_Category']
    ).agg({'Actual_Hours': 'sum'}).reset_index()

    print("\nProcessing utilization calculations...")

    contract_history = contract_history.sort_values('Start_Date')
    staff_with_contracts = set(contract_history['Staff_Name'])

    records = []
    for staff, staff_monthly in monthly_summary.groupby('Staff_Standard', sort=False):
        print(f"\nProcessing staff member: {staff}")
        if staff not in staff_with_contracts:
            continue
        print(f"Found {len(staff_monthly)} monthly records")

        for idx, month_data in staff_monthly.iterrows():
            record = month_data.to_dict()
            # NOTE(review): matching on the first of the month means a
            # contract whose first report is dated later in that month does
            # not cover it - confirm this is intended.
            month_date = pd.Timestamp(
                year=int(month_data['Year']),
                month=int(month_data['Month']),
                day=1,
            )
            contract = _find_contract(contract_history, staff, month_date)

            if contract is not None:
                print(f"Contract for {month_date.strftime('%B %Y')}:")
                print(f"Position: {contract['Position']}")
                print(f"Contract Type: {contract['Contract_Type']}")
                print(f"Target Hours: {contract['Target_Hours']}")
                record['Position'] = contract['Position']
                record['Contract_Type'] = contract['Contract_Type']
                record['Target_Hours'] = contract['Target_Hours']
                # Utilisation is computed per summary row, i.e. per activity
                # category within the month.
                record['Utilization_Percentage'] = _calculate_utilization(
                    staff_monthly.loc[[idx]], contract
                )
            else:
                print(f"No contract found for {month_date.strftime('%B %Y')}")
                record['Utilization_Percentage'] = None
            records.append(record)

    if not records:
        print("WARNING: No monthly utilization data generated!")
        return None

    monthly_summary_with_utilization = pd.DataFrame(
        records,
        columns=list(monthly_summary.columns) + [
            'Position', 'Contract_Type', 'Target_Hours', 'Utilization_Percentage',
        ],
    )

    print("Tutor pay report processing completed!")

    return {
        'daily_summary': daily_summary,
        'weekly_summary': weekly_summary,
        'monthly_summary': monthly_summary_with_utilization,
        'detailed_data': df,
    }


# --- PART 3: QUALIFICATION MAPPING ---

def combine_qualification_lists(approved_df, archived_df):
    """Merge approved and archived qualification frames into one reference.

    Qualification numbers are cleaned with clean_qualification_number and
    de-duplicated; when a number appears in both lists the 'Approved' row is
    kept.

    Returns:
        DataFrame with QualificationName, QualificationNumber and Source.
    """
    columns = ['QualificationName', 'QualificationNumber']
    approved = approved_df[columns].copy()
    approved['Source'] = 'Approved'
    archived = archived_df[columns].copy()
    archived['Source'] = 'Archived'

    # Approved rows come first, so keep='first' prefers them on duplicates.
    qual_df = pd.concat([approved, archived], ignore_index=True)
    qual_df['QualificationNumber'] = qual_df['QualificationNumber'].apply(
        clean_qualification_number
    )
    return qual_df.drop_duplicates('QualificationNumber', keep='first')


def process_qualification_lists(approved_file, archived_file):
    """Read the approved and archived qualification workbooks and merge them."""
    print("\nProcessing qualification lists...")

    qual_df = combine_qualification_lists(
        pd.read_excel(approved_file),
        pd.read_excel(archived_file),
    )

    print(f"Processed {len(qual_df)} unique qualifications")
    return qual_df


def process_terms_caseloading(terms_file, qualification_df, contract_history):
    """Process Terms caseloading data and link qualification/contract info.

    Returns:
        dict with 'terms_detailed', 'learner_summary',
        'qualification_summary' and 'qualification_reference' frames.
    """
    print("\nProcessing Terms caseloading data...")

    terms_df = pd.read_excel(terms_file)

    cols_to_keep = {
        'Primary Staff Name': 'Staff_First_Name',
        'Primary Staff Surname': 'Staff_Last_Name',
        'Start Date': 'Start_Date',
        'End Date': 'End_Date',
        'Activity Name': 'Activity_Name',
        'Funding Reference': 'Qualification_Number',
        'Level': 'Level',
        'Activity Group': 'Activity_Group',
        'Name': 'Learner_First_Name',
        'Surname': 'Learner_Last_Name',
    }
    terms_df = terms_df[list(cols_to_keep)].rename(columns=cols_to_keep)

    terms_df['Qualification_Number'] = terms_df['Qualification_Number'].apply(
        clean_qualification_number
    )
    terms_df['Staff_Standard'] = _standard_names(
        terms_df['Staff_First_Name'], terms_df['Staff_Last_Name']
    )

    for col in ('Start_Date', 'End_Date'):
        terms_df[col] = pd.to_datetime(terms_df[col], errors='coerce')

    # Qualification name from the reference list, else the activity name.
    name_by_number = (
        qualification_df.dropna(subset=['QualificationNumber'])
        .drop_duplicates('QualificationNumber')
        .set_index('QualificationNumber')['QualificationName']
    )
    terms_df['Qualification_Name'] = (
        terms_df['Qualification_Number']
        .map(name_by_number)
        .fillna(terms_df['Activity_Name'])
    )

    # Contract in force on each learner's start date.
    contract_details = pd.DataFrame(
        [
            _contract_fields(_find_contract(contract_history, staff, start), 'Staff')
            for staff, start in zip(terms_df['Staff_Standard'], terms_df['Start_Date'])
        ],
        index=terms_df.index,
        columns=['Staff_Position', 'Staff_Contract', 'Staff_Target_Hours'],
    )
    terms_df = pd.concat([terms_df, contract_details], axis=1)

    learner_summary = terms_df.groupby('Staff_Standard').agg({
        'Learner_First_Name': 'count'
    }).reset_index()

    level_pivot = terms_df.pivot_table(
        index='Staff_Standard',
        columns='Level',
        values='Learner_First_Name',
        aggfunc='count',
        fill_value=0,
    ).reset_index()
    level_pivot.columns = ['Staff_Standard'] + [
        f'Level_{col}_Count' for col in level_pivot.columns[1:]
    ]

    activity_pivot = terms_df.pivot_table(
        index='Staff_Standard',
        columns='Activity_Group',
        values='Learner_First_Name',
        aggfunc='count',
        fill_value=0,
    ).reset_index()
    activity_pivot.columns = ['Staff_Standard'] + [
        f'Activity_{str(col).replace(" ", "_")}_Count'
        for col in activity_pivot.columns[1:]
    ]

    learner_summary = (
        learner_summary
        .merge(level_pivot, on='Staff_Standard', how='left')
        .merge(activity_pivot, on='Staff_Standard', how='left')
        .rename(columns={'Learner_First_Name': 'Total_Learners'})
    )

    # dropna=False keeps staff without a matching contract (and rows without
    # a Level) instead of silently dropping them from the summary.
    qual_summary = terms_df.groupby(
        ['Staff_Standard', 'Qualification_Name', 'Level',
         'Staff_Position', 'Staff_Contract'],
        dropna=False,
    ).agg({
        'Learner_First_Name': 'count',
        'Staff_Target_Hours': 'first',
    }).reset_index().rename(
        columns={'Learner_First_Name': 'Learners_In_Qualification'}
    )

    print("Terms caseloading processing completed!")

    return {
        'terms_detailed': terms_df,
        'learner_summary': learner_summary,
        'qualification_summary': qual_summary,
        'qualification_reference': qualification_df,
    }


# --- PART 4: PICS DATA PROCESSING ---

def process_pics_data(caseload_file, hours_lookup_file, contract_history,
                      fy_start='2023-04-01', fy_end='2024-03-31'):
    """Process PICS caseload data into monthly snapshots and summaries.

    Args:
        caseload_file: PICS caseload workbook.
        hours_lookup_file: workbook mapping standard titles to monthly hours
            (two header rows are skipped).
        contract_history: frame produced by track_employee_history.
        fy_start, fy_end: first and last date of the period to snapshot;
            one snapshot is produced per calendar month in that range.

    Returns:
        dict with 'detailed_monthly', 'monthly_summary', 'programme_summary',
        'standard_summary' and 'hours_reference' frames.
    """
    print("\nProcessing PICS data...")

    print("Reading hours lookup data...")
    hours_df = pd.read_excel(hours_lookup_file, skiprows=2)

    hours_lookup = hours_df[[
        'Standard title', 'Weighted Monthly Hours (1.6)'
    ]].copy()

    print("\nSample of original hours data:")
    print(hours_lookup.head())

    hours_lookup['Monthly_Hours'] = hours_lookup[
        'Weighted Monthly Hours (1.6)'
    ].apply(convert_time_to_hours)

    print("\nSample after conversion to decimal hours:")
    print(hours_lookup.head())

    zero_hours = hours_lookup['Monthly_Hours'].eq(0).sum()
    if zero_hours > 0:
        print(f"\nWARNING: Found {zero_hours} entries with zero hours!")
        print("\nSample of zero-hour entries:")
        print(hours_lookup[hours_lookup['Monthly_Hours'] == 0].head())

    hours_lookup['Standard_Title'] = hours_lookup['Standard title'].str.strip()

    print(f"Processed {len(hours_lookup)} standard titles with hours")

    print("\nReading PICS caseload data...")
    caseload_df = pd.read_excel(caseload_file)

    cols_to_keep = {
        'Assessor Full Name': 'Assessor_Name',
        'Programme': 'Programme_Level',
        'Apprenticeship Standard Title': 'Standard_Title',
        'Apprenticeship Achieved Date': 'Achieved_Date',
        'Start Date': 'Start_Date',
        'Learning Expected End': 'Expected_End',
        'Actual End': 'Actual_End',
    }
    df = caseload_df[list(cols_to_keep)].rename(columns=cols_to_keep)

    for col in ('Achieved_Date', 'Start_Date', 'Expected_End', 'Actual_End'):
        df[col] = pd.to_datetime(df[col], errors='coerce')

    df['Standard_Title'] = df['Standard_Title'].str.strip()

    # De-duplicate the lookup first: a repeated title would otherwise
    # multiply learner rows in the merge and inflate every count.
    df = df.merge(
        hours_lookup[['Standard_Title', 'Monthly_Hours']]
        .drop_duplicates('Standard_Title'),
        on='Standard_Title',
        how='left',
    )
    df['Monthly_Hours'] = df['Monthly_Hours'].fillna(0)

    # A learner's end is the actual end if recorded, else the expected end.
    # Rows missing a start or an end never count as active (NaT compares
    # False).
    effective_end = df['Actual_End'].fillna(df['Expected_End'])

    contract_columns = [
        'Assessor_Position', 'Assessor_Contract', 'Assessor_Target_Hours'
    ]
    monthly_data = []

    print("\nGenerating monthly snapshots...")
    for period in pd.period_range(start=fy_start, end=fy_end, freq='M'):
        month_start = period.start_time
        month_end = period.end_time.normalize()

        print(f"\nProcessing month: {month_start.strftime('%B %Y')}")
        print(f"Month range: {month_start.strftime('%Y-%m-%d')} to {month_end.strftime('%Y-%m-%d')}")

        # Active if the learner's period overlaps the month at all.
        active = (df['Start_Date'] <= month_end) & (effective_end >= month_start)
        month_data = df[active].copy()

        month_data['Snapshot_Date'] = month_end
        month_data['Year'] = month_end.year
        month_data['Month'] = month_end.month

        # The contract depends only on (assessor, month), so resolve it once
        # per assessor. Building the frame explicitly also keeps months with
        # no active learners well-formed.
        contract_cache = {}
        details = []
        for assessor in month_data['Assessor_Name']:
            key = None if pd.isna(assessor) else assessor
            if key not in contract_cache:
                contract_cache[key] = _contract_fields(
                    _find_contract(contract_history, key, month_end), 'Assessor'
                )
            details.append(contract_cache[key])
        contract_details = pd.DataFrame(
            details, index=month_data.index, columns=contract_columns
        )
        month_data = pd.concat([month_data, contract_details], axis=1)

        print(f"Active students in this month: {len(month_data)}")

        monthly_data.append(month_data)

    monthly_df = pd.concat(monthly_data, ignore_index=True)

    # dropna=False in the groupbys below keeps assessors that have no
    # matching contract instead of silently dropping them.

    # 1. Monthly summary per assessor
    monthly_summary = monthly_df.groupby(
        ['Assessor_Name', 'Year', 'Month', 'Assessor_Position', 'Assessor_Contract'],
        dropna=False,
    ).agg({
        'Standard_Title': 'count',   # active learners with a standard title
        'Monthly_Hours': 'sum',
        'Assessor_Target_Hours': 'first',
    }).reset_index().rename(columns={'Standard_Title': 'Active_Students'})

    # 2. Programme level summary
    programme_summary = monthly_df.groupby(
        ['Assessor_Name', 'Programme_Level', 'Year', 'Month',
         'Assessor_Position', 'Assessor_Contract'],
        dropna=False,
    ).agg({
        'Standard_Title': 'count',
        'Assessor_Target_Hours': 'first',
    }).reset_index().rename(columns={'Standard_Title': 'Students_In_Programme'})

    # 3. Standard title summary
    standard_summary = monthly_df.groupby(
        ['Assessor_Name', 'Standard_Title', 'Year', 'Month',
         'Assessor_Position', 'Assessor_Contract'],
        dropna=False,
    ).agg({
        'Monthly_Hours': 'sum',
        'Snapshot_Date': 'count',
        'Assessor_Target_Hours': 'first',
    }).reset_index().rename(columns={
        'Snapshot_Date': 'Students_In_Standard',
        'Monthly_Hours': 'Required_Hours',
    })

    print("PICS data processing completed!")

    return {
        'detailed_monthly': monthly_df,
        'monthly_summary': monthly_summary,
        'programme_summary': programme_summary,
        'standard_summary': standard_summary,
        'hours_reference': hours_lookup,
    }


def _save_frames(frames, output_folder, prefix=''):
    """Write each named frame to '<output_folder>/<prefix><name>.csv'."""
    for name, frame in frames.items():
        output_file = os.path.join(output_folder, f'{prefix}{name}.csv')
        frame.to_csv(output_file, index=False)
        print(f"Saved {name} to {output_file}")


def main():
    """Run all four processing steps against the current working directory."""
    current_dir = os.getcwd()
    monthly_reports_folder = os.path.join(current_dir, "monthly_reports")
    output_folder = os.path.join(current_dir, "processed_data")

    print(f"Processing data from: {current_dir}")

    try:
        # --- Process Monthly Reports ---
        print("\nStep 1: Processing Monthly Reports")
        staff_data, staff_summary, contract_history = process_monthly_reports(
            monthly_reports_folder
        )

        os.makedirs(output_folder, exist_ok=True)
        staff_data_path = os.path.join(output_folder, 'staff_monthly_data.csv')
        staff_data.to_csv(staff_data_path, index=False)
        staff_summary.to_csv(
            os.path.join(output_folder, 'staff_summary.csv'), index=False
        )
        contract_history.to_csv(
            os.path.join(output_folder, 'contract_history.csv'), index=False
        )

        # --- Process Tutor Pay Report ---
        print("\nStep 2: Processing Tutor Pay Report")
        tutor_report_data = process_tutor_report(
            "TutorPayReport_New 23.24 FY 22.11.24.xlsx",
            staff_data_path,
            contract_history,
        )
        if tutor_report_data is not None:
            _save_frames(tutor_report_data, output_folder)

        # --- Process Qualification Data ---
        print("\nStep 3: Processing Qualification Data")
        approved_file = "Approved List for Terms Caseloading FY2324.xlsx"
        archived_file = "Archived List for Terms Caseloading FY2324.xlsx"
        terms_file = "Terms caseloading 23.24 FY.xlsx"

        qualification_data = process_qualification_lists(approved_file, archived_file)
        terms_data = process_terms_caseloading(
            terms_file, qualification_data, contract_history
        )
        _save_frames(terms_data, output_folder)

        # --- Process PICS Data ---
        print("\nStep 4: Processing PICS Data")
        caseload_file = "PICS caseload for PBI.xlsx"
        hours_lookup_file = "PICS Hours for Assessor Look up.xlsx"

        pics_data = process_pics_data(
            caseload_file, hours_lookup_file, contract_history
        )
        _save_frames(pics_data, output_folder, prefix='pics_')

        print("\nAll processing completed successfully!")

    except Exception as e:
        # Top-level boundary: report, then re-raise so the failure is visible.
        print(f"\nError during processing: {str(e)}")
        raise


# Execute processing
if __name__ == "__main__":
    main()
Editor is loading...
Leave a Comment