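"""
Data preparation script for ACL staffing and caseloading reporting.

Reads the monthly ACL staff reports, the Tutor Pay Report, the Terms
caseloading workbook (with its approved/archived qualification reference
lists), and the PICS assessor lookup and caseload workbooks, then writes
cleaned CSV extracts to the processed_data/ folder.

Note: the workbook file names and the monthly_reports/ folder are the ones
hard-coded in the functions below; adjust them if your files live elsewhere.
"""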
import pandas as pd
import numpy as np
import os
from datetime import datetime
import warnings

warnings.filterwarnings('ignore')

# Create processed_data directory if it doesn't exist
if not os.path.exists('processed_data'):
    os.makedirs('processed_data')


def clean_string(s):
    """Clean strings by removing extra spaces and converting to lowercase"""
    if pd.isna(s):
        return s
    return str(s).strip().lower()


def extract_month_year(filename):
    """Extract month and year from filename format 'ACL Monthly Report DD.MM.YY.xlsx'"""
    date_str = filename.split()[-1].replace('.xlsx', '')
    return pd.to_datetime(date_str, format='%d.%m.%y')


def process_monthly_report(file_path):
    """Process a single monthly report"""
    # Read Excel file starting from row 3 (0-based index 2)
    df = pd.read_excel(file_path, skiprows=2)

    # Extract report date from filename
    report_date = extract_month_year(os.path.basename(file_path))

    # Clean column names
    df.columns = [col.strip() for col in df.columns]

    # Select and rename relevant columns
    columns_mapping = {
        'Assignment Number': 'assignment_number',
        'Title': 'title',
        'First Name': 'first_name',
        'Known As': 'known_as',
        'Last Name': 'last_name',
        'Full-Time Equivalent': 'fte',
        'Position Name': 'position',
        'Working Hours': 'working_hours',
        'Line Manager Name': 'line_manager'
    }
    df = df[columns_mapping.keys()].copy()
    df.rename(columns=columns_mapping, inplace=True)

    # Clean string columns
    string_columns = ['title', 'first_name', 'known_as', 'last_name', 'position', 'line_manager']
    for col in string_columns:
        df[col] = df[col].apply(clean_string)

    # Filter for Tutors and Learning Support Assistants
    df = df[df['position'].isin(['learning support assistant', 'tutor'])]

    # Convert numeric columns
    df['fte'] = pd.to_numeric(df['fte'], errors='coerce')
    df['working_hours'] = pd.to_numeric(df['working_hours'], errors='coerce')

    # Calculate target hours (only for salaried staff)
    df['is_salaried'] = df['working_hours'] > 3
    df['target_hours'] = np.where(df['is_salaried'], df['fte'] * 840, 0)

    # Process line manager names
    df['line_manager_first_name'] = df['line_manager'].apply(
        lambda x: x.split(',')[0].split()[0] if pd.notna(x) and ',' in x else None
    )
    df['line_manager_known_as'] = df['line_manager'].apply(
        lambda x: x.split(',')[0].split()[-1] if pd.notna(x) and ',' in x else None
    )
    df['line_manager_last_name'] = df['line_manager'].apply(
        lambda x: x.split(',')[1].strip() if pd.notna(x) and ',' in x else None
    )

    # Add report date
    df['report_date'] = report_date

    return df


def process_qualification_references(approved_file='Approved List for Terms Caseloading FY2324.xlsx',
                                     archived_file='Archived List for Terms Caseloading FY2324.xlsx'):
    """Process approved and archived qualification reference lists"""
    print("\nProcessing qualification reference lists...")

    # Read both files
    approved_df = pd.read_excel(approved_file)[['QualificationName', 'QualificationNumber']]
    archived_df = pd.read_excel(archived_file)[['QualificationName', 'QualificationNumber']]

    # Combine both dataframes
    qual_ref_df = pd.concat([approved_df, archived_df], ignore_index=True)

    # Clean data
    qual_ref_df['QualificationName'] = qual_ref_df['QualificationName'].apply(clean_string)
    qual_ref_df['QualificationNumber'] = qual_ref_df['QualificationNumber'].apply(clean_string)

    # Remove duplicates (in case same qualification appears in both files)
    qual_ref_df = qual_ref_df.drop_duplicates()

    # Save processed data
    qual_ref_df.to_csv('processed_data/qualification_references.csv', index=False)

    print(f"Total unique qualifications: {len(qual_ref_df)}")

    return qual_ref_df


def clean_qualification_number(number):
    """Remove P or N suffix from qualification numbers"""
    if pd.isna(number):
        return number
    number = str(number).strip()
    if number.endswith('P') or number.endswith('N'):
        return number[:-1]
    return number


def process_terms_caseloading(filename='Terms caseloading 23.24 FY.xlsx', qual_ref_df=None):
    """Process Terms caseloading data with qualification reference mapping"""
    print("\nProcessing Terms caseloading data...")

    # Read the Excel file
    df = pd.read_excel(filename)

    # Select and rename relevant columns
    columns_mapping = {
        'Primary Staff Name': 'first_name',
        'Primary Staff Surname': 'last_name',
        'Activity Name': 'activity_name',
        'Funding Reference': 'funding_reference',
        'Start Date': 'start_date',
        'End Date': 'end_date',
        'Level': 'level',
        'Activity Group': 'activity_group'
    }
    df = df[columns_mapping.keys()].copy()
    df.rename(columns=columns_mapping, inplace=True)

    # Clean string columns
    string_columns = ['first_name', 'last_name', 'activity_name',
                      'funding_reference', 'level', 'activity_group']
    for col in string_columns:
        df[col] = df[col].apply(clean_string)

    # Clean qualification numbers in funding reference
    df['funding_reference_clean'] = df['funding_reference'].apply(clean_qualification_number)

    # Map qualifications
    if qual_ref_df is not None:
        # Create a mapping dictionary for faster lookup
        qual_mapping = dict(zip(qual_ref_df['QualificationNumber'], qual_ref_df['QualificationName']))

        # Map qualification names, use activity name as fallback
        df['qualification_name'] = df['funding_reference_clean'].map(qual_mapping)
        df['qualification_name'] = df['qualification_name'].fillna(df['activity_name'])

    # Convert dates
    date_columns = ['start_date', 'end_date']
    for col in date_columns:
        df[col] = pd.to_datetime(df[col], errors='coerce')

    # Add academic year column (for easier filtering)
    df['academic_year'] = df['start_date'].dt.year.astype(str) + '/' + \
                          (df['start_date'].dt.year + 1).astype(str)

    # Calculate student counts per course
    course_counts = df.groupby([
        'first_name', 'last_name', 'qualification_name',
        'level', 'activity_group', 'start_date', 'end_date'
    ]).size().reset_index(name='student_count')

    # Save processed data
    df.to_csv('processed_data/terms_caseloading_processed.csv', index=False)
    course_counts.to_csv('processed_data/course_student_counts.csv', index=False)

    print("\nTerms Caseloading Processing Summary:")
    print(f"Total records: {len(df)}")
    print(f"Unique tutors: {df[['first_name', 'last_name']].drop_duplicates().shape[0]}")
    print(f"Unique qualifications: {df['qualification_name'].nunique()}")
    print(f"Date range: {df['start_date'].min().strftime('%Y-%m-%d')} to {df['end_date'].max().strftime('%Y-%m-%d')}")

    return df, course_counts


def process_assessor_lookup(filename='PICS Hours for Assessor Look up.xlsx'):
    """Process the Assessor lookup data"""
    print("\nProcessing Assessor lookup data...")

    # Read Excel file starting from row 3
    df = pd.read_excel(filename, skiprows=2)

    # Select and rename relevant columns
    columns_mapping = {
        'Standard title': 'standard_title',
        'Weighted Monthly Hours (1.6)': 'monthly_hours'
    }
    df = df[columns_mapping.keys()].copy()
    df.rename(columns=columns_mapping, inplace=True)

    # Clean string columns
    df['standard_title'] = df['standard_title'].apply(clean_string)

    # Convert time format (H:MM) to decimal hours
    def convert_time_to_hours(time_str):
        if pd.isna(time_str):
            return 0
        parts = str(time_str).split(':')
        try:
            hours = float(parts[0])
            minutes = float(parts[1]) if len(parts) > 1 else 0
            return hours + minutes / 60
        except:
            return 0

    df['monthly_hours'] = df['monthly_hours'].apply(convert_time_to_hours)

    # Save processed data
    df.to_csv('processed_data/assessor_lookup_processed.csv', index=False)

    print(f"Total unique apprenticeship standards: {len(df)}")

    return df


def process_pics_caseload(filename='PICS caseload for PBI.xlsx', assessor_lookup_df=None):
    """Process PICS caseload data with assessor lookup integration"""
    print("\nProcessing PICS caseload data...")

    # Read the Excel file
    df = pd.read_excel(filename)

    # Select and rename relevant columns
    columns_mapping = {
        'Assessor Full Name': 'assessor_name',
        'Programme': 'programme',
        'Apprenticeship Standard Title': 'standard_title',
        'Apprenticeship Achieved Date': 'achieved_date',
        'Start Date': 'start_date',
        'Learning Expected End': 'expected_end_date',
        'Actual End': 'actual_end_date'
    }
    df = df[columns_mapping.keys()].copy()
    df.rename(columns=columns_mapping, inplace=True)

    # Clean string columns
    string_columns = ['assessor_name', 'programme', 'standard_title']
    for col in string_columns:
        df[col] = df[col].apply(clean_string)

    # Convert all date columns
    date_columns = ['achieved_date', 'start_date', 'expected_end_date', 'actual_end_date']
    for col in date_columns:
        df[col] = pd.to_datetime(df[col], errors='coerce')

    # Use expected end date when actual end date is missing
    df['end_date'] = df['actual_end_date'].fillna(df['expected_end_date'])

    # Create a function to check if a student is active in a given month
    def is_active_in_month(row, month_date):
        start = row['start_date']
        end = row['end_date']
        if pd.isna(start) or pd.isna(end):
            return False
        return (start <= month_date) & (month_date <= end)

    # Generate monthly student counts for FY23/24
    monthly_dates = pd.date_range('2023-04-01', '2024-03-31', freq='M')
    monthly_counts = []

    for date in monthly_dates:
        # Create mask for active students in this month
        active_mask = df.apply(lambda row: is_active_in_month(row, date), axis=1)

        # Get counts per assessor and standard
        month_count = df[active_mask].groupby(
            ['assessor_name', 'standard_title', 'programme']
        ).size().reset_index(name='student_count')
        month_count['month'] = date
        monthly_counts.append(month_count)

    monthly_df = pd.concat(monthly_counts, ignore_index=True)

    # Add monthly hours requirement if lookup data is available
    if assessor_lookup_df is not None:
        monthly_df = monthly_df.merge(
            assessor_lookup_df[['standard_title', 'monthly_hours']],
            on='standard_title',
            how='left'
        )
        monthly_df['required_hours'] = monthly_df['student_count'] * monthly_df['monthly_hours']

    # Save processed data
    df.to_csv('processed_data/pics_caseload_processed.csv', index=False)
    monthly_df.to_csv('processed_data/pics_monthly_counts.csv', index=False)

    print("\nPICS Caseload Processing Summary:")
    print(f"Total apprentices: {len(df)}")
    print(f"Unique assessors: {df['assessor_name'].nunique()}")
    print(f"Unique standards: {df['standard_title'].nunique()}")
    print(f"Date range: {df['start_date'].min().strftime('%Y-%m-%d')} to {df['end_date'].max().strftime('%Y-%m-%d')}")

    return df, monthly_df


def process_all_monthly_reports():
    """Process all monthly reports from the monthly_reports folder"""
    monthly_reports = []

    # Process each monthly report
    for filename in os.listdir('monthly_reports'):
        if filename.startswith('ACL Monthly Report') and filename.endswith('.xlsx'):
            file_path = os.path.join('monthly_reports', filename)
            try:
                monthly_df = process_monthly_report(file_path)
                monthly_reports.append(monthly_df)
                print(f"Successfully processed {filename}")
            except Exception as e:
                print(f"Error processing {filename}: {str(e)}")

    # Combine all monthly reports
    if monthly_reports:
        combined_df = pd.concat(monthly_reports, ignore_index=True)

        # Create staff master list
        staff_master = combined_df.drop_duplicates(
            subset=['first_name', 'last_name', 'known_as', 'position']
        )[['first_name', 'last_name', 'known_as', 'position']]

        # Save processed data
        combined_df.to_csv('processed_data/monthly_reports_processed.csv', index=False)
        staff_master.to_csv('processed_data/staff_master.csv', index=False)

        print("\nProcessing Summary:")
        print(f"Total monthly reports processed: {len(monthly_reports)}")
        print(f"Total staff records: {len(combined_df)}")
        print(f"Unique staff members: {len(staff_master)}")

        return combined_df, staff_master

    return None, None


def process_tutor_pay_report(filename='TutorPayReport_New 23.24 FY 22.11.24.xlsx'):
    """Process the Tutor Pay Report"""
    print(f"\nProcessing Tutor Pay Report: {filename}")

    # Read the Excel file
    df = pd.read_excel(filename)

    # Select and rename relevant columns
    columns_mapping = {
        'PEOPLENAME': 'first_name',
        'PEOPLESURNAME': 'last_name',
        'STAFFROLE': 'staff_role',
        'CONTRACTNAME': 'contract_type',
        'ACTIVITYTYPE': 'activity_type',
        'Course Funding Group': 'funding_group',
        'Course Prov Mon C': 'curriculum_name',
        'Course Prov Mon D': 'venue_topline',
        'VENUE': 'venue_name',
        'EVENTDATE': 'event_date',
        'TIMEADJUSTEARLY': 'time_adjust_early',
        'TIMEADJUSTLATE': 'time_adjust_late',
        'Potential Hours': 'potential_hours'
    }

    # Select only needed columns and rename
    df = df[columns_mapping.keys()].copy()
    df.rename(columns=columns_mapping, inplace=True)

    # Clean string columns
    string_columns = ['first_name', 'last_name', 'staff_role', 'contract_type',
                      'activity_type', 'funding_group', 'curriculum_name',
                      'venue_topline', 'venue_name']
    for col in string_columns:
        df[col] = df[col].apply(clean_string)

    # Filter for relevant staff roles
    df = df[df['staff_role'].isin(['tutor', 'learning support'])]

    # Filter for relevant contract types
    df = df[df['contract_type'].isin(['lsa-nb', 'salaried tutor', 'sessional tutor'])]

    # Group assessment activities
    assessment_types = [
        'access assessments', 'assessments', 'beauty ol assess',
        'cfl pa vols assess', 'creative assessments', 'e&m apprenticeassess',
        'english fs assess', 'english gcse assess', 'esol f2f assess',
        'esol online assess', 'hair/barb f2f assess', 'maths assessments',
        'swis assessments'
    ]

    # Create activity group column
    df['activity_group'] = df['activity_type'].apply(
        lambda x: 'Assessment' if x.lower() in assessment_types
        else x if x.lower() in ['community engagement', 'tutorials/drop ins']
        else 'Other'
    )

    # Convert time adjustments from milliseconds to hours
    df['time_adjust_early'] = abs(df['time_adjust_early']) / (1000 * 60 * 60)
    df['time_adjust_late'] = abs(df['time_adjust_late']) / (1000 * 60 * 60)

    # Calculate actual hours
    df['actual_hours'] = df['potential_hours'] - (df['time_adjust_early'] + df['time_adjust_late'])
    df['actual_hours'] = df['actual_hours'].clip(lower=0)  # Replace negative values with 0

    # Convert event date to datetime
    df['event_date'] = pd.to_datetime(df['event_date'])

    # Add derived date columns for easier filtering
    df['week'] = df['event_date'].dt.isocalendar().week
    df['month'] = df['event_date'].dt.month
    df['year'] = df['event_date'].dt.year

    # Calculate utilization metrics
    # Note: We'll only calculate utilization percentage for salaried tutors
    df['is_salaried'] = df['contract_type'] == 'salaried tutor'
    df['hours_for_utilization'] = np.where(
        (df['is_salaried']) & (df['activity_group'] != 'Assessment'),
        df['actual_hours'],
        0
    )

    # Save processed data
    df.to_csv('processed_data/tutor_pay_report_processed.csv', index=False)

    print("\nTutor Pay Report Processing Summary:")
    print(f"Total records: {len(df)}")
    print(f"Unique tutors/LSAs: {df[['first_name', 'last_name']].drop_duplicates().shape[0]}")
    print(f"Date range: {df['event_date'].min().strftime('%Y-%m-%d')} to {df['event_date'].max().strftime('%Y-%m-%d')}")

    return df


# Execute the processing
if __name__ == "__main__":
    print("Starting monthly reports processing...")
    combined_df, staff_master = process_all_monthly_reports()

    print("\nProcessing Tutor Pay Report...")
    tutor_pay_df = process_tutor_pay_report()

    print("\nProcessing Qualification References and Terms Caseloading...")
    qual_ref_df = process_qualification_references()
    terms_df, course_counts = process_terms_caseloading(qual_ref_df=qual_ref_df)

    print("\nProcessing PICS data...")
    assessor_lookup_df = process_assessor_lookup()
    pics_df, monthly_pics_df = process_pics_caseload(assessor_lookup_df=assessor_lookup_df)
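# Usage (sketch; assumes the script is saved as e.g. process_reports.py -- the
# file name is illustrative -- and run from the directory containing the
# workbooks and the monthly_reports/ folder referenced above):
#
#   python process_reports.py
#
# All outputs are written as CSV files to the processed_data/ folder.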