C
user_1718919
plain_text
4 months ago
56 kB
2
Indexable
# Cell 1: Setup and Utilities # --- Import required libraries --- import pandas as pd import numpy as np import os from datetime import datetime import logging from typing import Tuple, Dict, List, Optional, Union import warnings # --- Setup logging --- def setup_logging(log_file: str = 'staff_utilization.log') -> None: """Configure logging for the application""" logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_file), logging.StreamHandler() ] ) # Suppress pandas warning about copy operations pd.options.mode.chained_assignment = None # default='warn' warnings.filterwarnings('ignore', category=pd.errors.SettingWithCopyWarning) # --- Enhanced utility functions --- def safe_read_excel(file_path: str, **kwargs) -> Tuple[Optional[pd.DataFrame], Optional[str]]: """ Safely read Excel files with error handling """ try: df = pd.read_excel(file_path, **kwargs) return df, None except Exception as e: error_msg = f"Error reading {file_path}: {str(e)}" logging.error(error_msg) return None, error_msg def clean_name(name: Union[str, float, None]) -> Optional[str]: """ Clean and standardize names """ if pd.isna(name): return None name_str = str(name).strip() return ' '.join(name_str.split()) def standardize_names(df: pd.DataFrame, name_columns: List[str]) -> pd.DataFrame: """ Standardize names across specified columns """ for col in name_columns: if col in df.columns: df[col] = df[col].apply(clean_name) df[col] = df[col].str.replace(r'\s+', ' ', regex=True) df[col] = df[col].str.strip() return df def extract_month_year(filename: str) -> pd.Timestamp: """ Extract month and year from filename format 'ACL Monthly Report DD.MM.YY.xlsx' """ try: date_str = filename.split()[-1].replace('.xlsx', '') return pd.to_datetime(date_str, format='%d.%m.%y') except Exception as e: logging.error(f"Error extracting date from filename {filename}: {str(e)}") raise ValueError(f"Invalid filename format: {filename}") def clean_qualification_number(qual_num: Union[str, float, None]) -> Optional[str]: """ Clean qualification number by removing P or N suffix """ if pd.isna(qual_num): return None qual_str = str(qual_num).strip() if qual_str.endswith(('P', 'N')): return qual_str[:-1] return qual_str def convert_time_to_hours(time_value: Union[str, float, None, pd.Timestamp, datetime.time]) -> float: """ Convert various time formats to hours """ if pd.isna(time_value): return 0.0 try: if isinstance(time_value, datetime.time): return time_value.hour + (time_value.minute / 60) elif isinstance(time_value, str) and ':' in time_value: hours, minutes = map(float, time_value.split(':')[:2]) # Only use hours and minutes return hours + (minutes / 60) else: return abs(float(time_value)) / (1000 * 60 * 60) except Exception as e: logging.warning(f"Error converting time value {time_value}: {str(e)}") return 0.0 def validate_processed_data( df: pd.DataFrame, required_columns: List[str], name: Optional[str] = None ) -> None: """ Validate processed DataFrames for required columns and data quality """ df_name = name if name else 'dataframe' # Check for missing columns missing_cols = [col for col in required_columns if col not in df.columns] if missing_cols: raise ValueError(f"Missing required columns in {df_name}: {missing_cols}") # Check for empty dataframe if df.empty: raise ValueError(f"No data found in {df_name}") # Check for all-null columns null_cols = df.columns[df.isna().all()].tolist() if null_cols: logging.warning(f"Found columns with all null values in {df_name}: {null_cols}") # Check for mostly null columns (>90% null) high_null_cols = df.columns[df.isna().mean() > 0.9].tolist() if high_null_cols: logging.warning(f"Found columns with >90% null values in {df_name}: {high_null_cols}") # Log data quality metrics logging.info(f"Data validation for {df_name}:") logging.info(f" - Total rows: {len(df)}") logging.info(f" - Columns: {len(df.columns)}") for col in required_columns: null_count = df[col].isna().sum() if null_count > 0: logging.info(f" - {col}: {null_count} null values ({(null_count/len(df))*100:.1f}%)") def calculate_monthly_utilization( group_df: pd.DataFrame, target_hours: float, exclude_categories: Optional[List[str]] = None ) -> Optional[float]: """ Calculate monthly utilization percentage """ if pd.isna(target_hours) or target_hours == 0: return None if exclude_categories: included_df = group_df[~group_df['Activity_Category'].isin(exclude_categories)] else: included_df = group_df total_hours = included_df['Actual_Hours'].sum() monthly_target = target_hours / 12 if monthly_target > 0: utilization = (total_hours / monthly_target) * 100 logging.debug(f"Utilization calculation:") logging.debug(f" Total hours: {total_hours}") logging.debug(f" Monthly target ({target_hours}/12): {monthly_target}") logging.debug(f" Utilization: {utilization}%") return utilization return None def identify_overlapping_contracts(contract_periods: List[Dict]) -> List[Dict]: """ Identify and handle overlapping contract periods """ sorted_periods = sorted(contract_periods, key=lambda x: x['Start_Date']) active_periods = [] result_periods = [] for period in sorted_periods: active_periods = [p for p in active_periods if p['End_Date'] >= period['Start_Date']] overlaps = [] for active in active_periods: if (active['Position'] != period['Position'] or active['Contract_Type'] != period['Contract_Type']): overlaps.append(active) new_period = period.copy() if overlaps: overlap_positions = [p['Position'] for p in overlaps] + [period['Position']] overlap_contracts = [p['Contract_Type'] for p in overlaps] + [period['Contract_Type']] new_period.update({ 'Overlapping_Positions': overlap_positions, 'Overlapping_Contracts': overlap_contracts, 'Has_Overlap': True, 'Overlap_Count': len(overlaps) + 1 }) else: new_period.update({ 'Overlapping_Positions': [period['Position']], 'Overlapping_Contracts': [period['Contract_Type']], 'Has_Overlap': False, 'Overlap_Count': 1 }) result_periods.append(new_period) active_periods.append(period) return result_periods def consolidate_contract_changes(contract_df: pd.DataFrame) -> pd.DataFrame: """ Consolidate contract changes to handle overlapping periods """ consolidated = [] for staff_name in contract_df['Staff_Name'].unique(): staff_contracts = contract_df[ contract_df['Staff_Name'] == staff_name ].sort_values('Start_Date') contract_list = staff_contracts.to_dict('records') processed_contracts = identify_overlapping_contracts(contract_list) for contract in processed_contracts: row = { 'Staff_Name': staff_name, 'Start_Date': contract['Start_Date'], 'End_Date': contract['End_Date'], 'Position': ' & '.join(contract['Overlapping_Positions']), 'Contract_Type': ' & '.join(contract['Overlapping_Contracts']), 'Has_Multiple_Contracts': contract['Has_Overlap'], 'Contract_Count': contract['Overlap_Count'], 'Change_Type': contract['Change_Type'] } # Calculate combined target hours if contract['Has_Overlap']: total_target = sum( float(c.get('Target_Hours', 0) or 0) for c in contract_list if (c['Start_Date'] <= contract['End_Date'] and c['End_Date'] >= contract['Start_Date']) ) row['Target_Hours'] = total_target else: row['Target_Hours'] = contract.get('Target_Hours', 0) consolidated.append(row) return pd.DataFrame(consolidated) # Cell 2: Contract Tracking and Monthly Reports def track_contract_changes(df: pd.DataFrame) -> pd.DataFrame: """ Track staff contract changes with full history, including overlapping contracts """ logging.info("Starting contract change tracking...") changes = [] for name in df['Staff_Standard'].unique(): logging.debug(f"Processing contracts for: {name}") staff_data = df[df['Staff_Standard'] == name].sort_values('Report_Date') contract_periods = [] current_contracts = set() # Track currently active contracts for idx, row in staff_data.iterrows(): current_contract = (row['Position Name'], row['Contract_Type'], row['Target_Hours']) # Check for changes in existing contracts if current_contract not in current_contracts: # New contract type detected contract_periods.append({ 'Staff_Name': name, 'Start_Date': row['Report_Date'], 'End_Date': staff_data['Report_Date'].max(), 'Position': row['Position Name'], 'Contract_Type': row['Contract_Type'], 'Target_Hours': row['Target_Hours'], 'Change_Type': 'New Contract' if idx > staff_data.index[0] else 'Initial Contract' }) current_contracts.add(current_contract) # Check for ended contracts ended_contracts = set() for contract in current_contracts: pos, type_, target = contract if not any((staff_data.loc[idx:, 'Position Name'] == pos) & (staff_data.loc[idx:, 'Contract_Type'] == type_) & (staff_data.loc[idx:, 'Target_Hours'] == target)): # Contract has ended ended_contracts.add(contract) contract_periods.append({ 'Staff_Name': name, 'Start_Date': row['Report_Date'], 'End_Date': row['Report_Date'], 'Position': pos, 'Contract_Type': type_, 'Target_Hours': target, 'Change_Type': 'Contract End' }) current_contracts -= ended_contracts # Process contract periods to identify overlaps if contract_periods: processed_periods = identify_overlapping_contracts(contract_periods) changes.extend(processed_periods) changes_df = consolidate_contract_changes(pd.DataFrame(changes)) # Log summary statistics total_staff = len(changes_df['Staff_Name'].unique()) multi_contract_staff = len(changes_df[changes_df['Has_Multiple_Contracts']]['Staff_Name'].unique()) logging.info(f"Processed contract changes for {total_staff} staff members") logging.info(f"Found {multi_contract_staff} staff with multiple contracts") if multi_contract_staff > 0: logging.info("Staff with multiple contracts:") for staff in changes_df[changes_df['Has_Multiple_Contracts']]['Staff_Name'].unique(): staff_records = changes_df[ (changes_df['Staff_Name'] == staff) & (changes_df['Has_Multiple_Contracts']) ] logging.info(f" - {staff}:") for _, record in staff_records.iterrows(): logging.info(f" * {record['Position']} ({record['Contract_Type']}) - " f"Target Hours: {record['Target_Hours']}") return changes_df def process_monthly_reports(folder_path: str) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: """ Process all monthly reports and track staff/contract changes """ logging.info(f"Processing Monthly Reports from: {folder_path}") all_data = [] # Get list of files and sort chronologically files = [f for f in os.listdir(folder_path) if f.startswith('ACL Monthly Report')] files.sort() required_columns = [ 'Assignment Number', 'First Name', 'Known As', 'Last Name', 'Position Name', 'Working Hours', 'Full-Time Equivalent', 'Line Manager Name' ] for file in files: logging.info(f"Processing file: {file}") # Read file with error handling df, error = safe_read_excel(os.path.join(folder_path, file), skiprows=2) if error: logging.error(f"Skipping file {file} due to error: {error}") continue # Validate required columns try: validate_processed_data(df, required_columns, name=file) except ValueError as e: logging.error(f"Validation failed for {file}: {str(e)}") continue report_date = extract_month_year(file) df['Report_Date'] = report_date # Filter for Tutors and Learning Support Assistant mask = df['Position Name'].isin(['Learning Support Assistant', 'Tutor']) df = df[mask].copy() # Calculate Target Hours - only for salaried tutors def calculate_target_hours(row): try: working_hours = float(row['Working Hours']) fte = float(row['Full-Time Equivalent']) if row['Position Name'] == 'Tutor' and working_hours > 3: return fte * 840 except (ValueError, TypeError) as e: logging.warning(f"Error calculating target hours: {str(e)}") return None df['Target_Hours'] = df.apply(calculate_target_hours, axis=1) # Determine Contract Type df['Contract_Type'] = df['Working Hours'].apply( lambda x: 'Sessional' if float(x) <= 3 else 'Salaried' ) # Process Line Manager names def split_manager_name(name): if pd.isna(name): return pd.NA, pd.NA, pd.NA parts = name.split(',') if len(parts) != 2: logging.warning(f"Unexpected manager name format: {name}") return name, name, name first_known = parts[0].strip() last = parts[1].strip() if ' ' in first_known: first, known = first_known.split(' ', 1) else: first = known = first_known return first, known, last # Create manager name formats df[['Manager_First', 'Manager_Known', 'Manager_Last']] = df['Line Manager Name'].apply( split_manager_name).apply(pd.Series) df['Manager_Standard'] = df.apply( lambda x: f"{x['Manager_First']} {x['Manager_Last']}" if pd.notna(x['Manager_First']) else pd.NA, axis=1) df['Manager_Known_As'] = df.apply( lambda x: f"{x['Manager_Known']}, {x['Manager_Last']}" if pd.notna(x['Manager_Known']) else pd.NA, axis=1) # Create staff name formats df['Staff_Standard'] = df.apply( lambda x: f"{x['First Name']} {x['Last Name']}", axis=1) df['Staff_Known_As'] = df.apply( lambda x: f"{x['Known As']}, {x['Last Name']}", axis=1) # Clean all name columns name_columns = [ 'Staff_Standard', 'Staff_Known_As', 'Manager_Standard', 'Manager_Known_As' ] df = standardize_names(df, name_columns) columns_to_keep = [ 'Assignment Number', 'Staff_Standard', 'Staff_Known_As', 'Position Name', 'Contract_Type', 'Target_Hours', 'Manager_Standard', 'Manager_Known_As', 'Report_Date' ] all_data.append(df[columns_to_keep]) logging.info(f"Processed {len(df)} records from {file}") if not all_data: raise ValueError("No data was successfully processed from monthly reports") # Combine all monthly data combined_df = pd.concat(all_data, ignore_index=True) combined_df = combined_df.sort_values(['Report_Date', 'Staff_Standard']) # Track contract history with enhanced tracking contract_history = track_contract_changes(combined_df) # Create staff summary with contract history staff_summary = [] for name in combined_df['Staff_Standard'].unique(): staff_data = combined_df[combined_df['Staff_Standard'] == name].copy() contract_data = contract_history[contract_history['Staff_Name'] == name] latest_contracts = contract_data[ contract_data['End_Date'] == staff_data['Report_Date'].max() ] staff_summary.append({ 'Staff_Name': name, 'First_Appearance': staff_data['Report_Date'].min(), 'Last_Appearance': staff_data['Report_Date'].max(), 'Current_Position': latest_contracts.iloc[0]['Position'], 'Current_Contract': latest_contracts.iloc[0]['Contract_Type'], 'Current_Target': latest_contracts.iloc[0]['Target_Hours'], 'Has_Multiple_Contracts': any(latest_contracts['Has_Multiple_Contracts']), 'Number_Of_Changes': len(contract_data), 'Contract_History': contract_data.to_dict('records') }) staff_summary_df = pd.DataFrame(staff_summary) # Final validation validate_processed_data( combined_df, ['Staff_Standard', 'Position Name', 'Contract_Type', 'Report_Date'], 'final_staff_data' ) logging.info("Monthly reports processing completed!") logging.info(f"Processed data for {len(staff_summary_df)} staff members") logging.info(f"Generated {len(contract_history)} contract records") return combined_df, staff_summary_df, contract_history #Cell 3: Tutor Pay Report Processing def calculate_activity_hours(row: pd.Series) -> Tuple[str, float]: """ Calculate and categorize activity hours Args: row: DataFrame row containing activity information Returns: Tuple of (activity_category, actual_hours) """ assessment_types = [ 'Access Assessments', 'Assessments', 'Beauty OL Assess', 'CFL PA Vols Assess', 'Creative Assessments', 'E&M ApprenticeAssess', 'English FS Assess', 'English GCSE Assess', 'ESOL F2F Assess', 'ESOL Online Assess', 'Hair/Barb F2F Assess', 'Maths Assessments', 'SWiS Assessments' ] if pd.isna(row['ACTIVITYTYPE']): return 'Other', row['Actual_Hours'] activity = str(row['ACTIVITYTYPE']).strip() hours = float(row['Actual_Hours']) if activity in assessment_types: return 'Assessment', hours elif activity in ['Community Engagement', 'Tutorials/Drop Ins']: return activity, hours return 'Other', hours def process_tutor_report( file_path: str, staff_data_path: str, contract_history: pd.DataFrame ) -> Optional[Dict[str, pd.DataFrame]]: """ Process the TutorPayReport and link with staff data Args: file_path: Path to tutor pay report file staff_data_path: Path to processed staff data contract_history: Contract history DataFrame Returns: Dictionary of processed DataFrames or None if processing fails """ logging.info("Processing Tutor Pay Report...") # Read files with error handling df, error = safe_read_excel(file_path) if error: logging.error(f"Failed to read tutor pay report: {error}") return None staff_df = pd.read_csv(staff_data_path) required_columns = [ 'PEOPLENAME', 'PEOPLESURNAME', 'STAFFROLE', 'CONTRACTNAME', 'ACTIVITYTYPE', 'Potential Hours', 'TIMEADJUSTEARLY', 'TIMEADJUSTLATE', 'EVENTDATE', 'Course Funding Group', 'Course Prov Mon C', 'Course Prov Mon D', 'VENUE' ] try: validate_processed_data(df, required_columns, 'tutor_pay_report') except ValueError as e: logging.error(f"Validation failed: {str(e)}") return None # Clean and standardize names name_columns = ['PEOPLENAME', 'PEOPLESURNAME'] df = standardize_names(df, name_columns) df['Staff_Standard'] = df.apply( lambda x: f"{x['PEOPLENAME']} {x['PEOPLESURNAME']}", axis=1 ) # Filter staff roles and clean df['STAFFROLE'] = df['STAFFROLE'].apply(clean_name) role_mask = df['STAFFROLE'].isin(['Tutor', 'Learning Support']) invalid_roles = df[~role_mask]['STAFFROLE'].unique() if len(invalid_roles) > 0: logging.warning(f"Found invalid staff roles: {invalid_roles}") df = df[role_mask].copy() # Calculate actual hours with time adjustments logging.info("Calculating actual hours...") # Convert time adjustments for col in ['TIMEADJUSTEARLY', 'TIMEADJUSTLATE']: df[f'{col}_Hours'] = pd.to_numeric(df[col], errors='coerce').fillna(0).apply( lambda x: convert_time_to_hours(x) if x != 0 else 0 ) # Calculate actual hours df['Potential Hours'] = pd.to_numeric(df['Potential Hours'], errors='coerce').fillna(0) df['Actual_Hours'] = df.apply( lambda x: max(0, x['Potential Hours'] - (x['TIMEADJUSTEARLY_Hours'] + x['TIMEADJUSTLATE_Hours'])), axis=1 ) # Verify hour calculations zero_hours = df['Actual_Hours'].eq(0).sum() if zero_hours > 0: logging.warning(f"Found {zero_hours} entries with zero actual hours") logging.debug("Sample of zero-hour entries:") sample_zero = df[df['Actual_Hours'] == 0].head() for _, row in sample_zero.iterrows(): logging.debug(f" Activity: {row['ACTIVITYTYPE']}, " f"Potential: {row['Potential Hours']}, " f"Adjustments: {row['TIMEADJUSTEARLY_Hours'] + row['TIMEADJUSTLATE_Hours']}") # Process dates and create time periods df['EVENTDATE'] = pd.to_datetime(df['EVENTDATE'], format='%d %b %Y', errors='coerce') invalid_dates = df['EVENTDATE'].isna().sum() if invalid_dates > 0: logging.warning(f"Found {invalid_dates} invalid event dates") logging.debug("Sample of invalid dates:") sample_invalid = df[df['EVENTDATE'].isna()]['EVENTDATE'].head() for date in sample_invalid: logging.debug(f" Invalid date value: {date}") df['Year'] = df['EVENTDATE'].dt.year df['Month'] = df['EVENTDATE'].dt.month df['Week'] = df['EVENTDATE'].dt.isocalendar().week # Categorize activities and calculate hours logging.info("Categorizing activities...") df['Activity_Category'], df['Category_Hours'] = zip(*df.apply(calculate_activity_hours, axis=1)) # Keep required columns columns_to_keep = [ 'Staff_Standard', 'STAFFROLE', 'CONTRACTNAME', 'Activity_Category', 'Actual_Hours', 'Category_Hours', 'EVENTDATE', 'Year', 'Month', 'Week', 'Course Funding Group', 'Course Prov Mon C', 'Course Prov Mon D', 'VENUE' ] df = df[columns_to_keep] # Create monthly summaries logging.info("Creating monthly summaries...") monthly_hours = df.groupby( ['Staff_Standard', 'Year', 'Month', 'Activity_Category'] ).agg({ 'Actual_Hours': 'sum', 'Category_Hours': 'sum' }).reset_index() # Get contract details for each month def get_monthly_contract(row): month_date = pd.Timestamp(year=int(row['Year']), month=int(row['Month']), day=1) staff_contracts = contract_history[ contract_history['Staff_Name'] == row['Staff_Standard'] ] # Find applicable contracts for the month relevant_contracts = staff_contracts[ (staff_contracts['Start_Date'] <= month_date) & (staff_contracts['End_Date'] >= month_date) ] if len(relevant_contracts) > 0: # If multiple contracts, combine information positions = relevant_contracts['Position'].unique() contract_types = relevant_contracts['Contract_Type'].unique() total_target = relevant_contracts['Target_Hours'].sum() return pd.Series({ 'Position': ' & '.join(positions), 'Contract_Type': ' & '.join(contract_types), 'Target_Hours': total_target, 'Multiple_Contracts': len(relevant_contracts) > 1 }) return pd.Series({ 'Position': None, 'Contract_Type': None, 'Target_Hours': None, 'Multiple_Contracts': False }) # Add contract info to monthly summary logging.info("Adding contract information...") contract_details = monthly_hours.apply(get_monthly_contract, axis=1) monthly_hours = pd.concat([monthly_hours, contract_details], axis=1) # Calculate utilization logging.info("Calculating monthly utilization...") monthly_utilization = [] for name in monthly_hours['Staff_Standard'].unique(): staff_monthly = monthly_hours[ monthly_hours['Staff_Standard'] == name ].copy() # Process each month for month_idx, month_group in staff_monthly.groupby(['Year', 'Month']): year, month = month_idx # Only calculate utilization if we have target hours if pd.notna(month_group['Target_Hours'].iloc[0]): utilization = calculate_monthly_utilization( month_group, month_group['Target_Hours'].iloc[0], exclude_categories=['Assessment'] ) staff_monthly.loc[ (staff_monthly['Year'] == year) & (staff_monthly['Month'] == month), 'Utilization_Percentage' ] = utilization monthly_utilization.append(staff_monthly) if not monthly_utilization: logging.warning("No monthly utilization data generated!") return None monthly_summary = pd.concat(monthly_utilization) # Create additional summaries logging.info("Creating additional summaries...") # Daily summary daily_summary = df.groupby( ['Staff_Standard', 'EVENTDATE', 'Activity_Category'] ).agg({ 'Actual_Hours': 'sum', 'Category_Hours': 'sum' }).reset_index() # Weekly summary weekly_summary = df.groupby( ['Staff_Standard', 'Year', 'Week', 'Activity_Category'] ).agg({ 'Actual_Hours': 'sum', 'Category_Hours': 'sum' }).reset_index() # Add total hours columns for PowerBI monthly_summary['Total_Hours'] = monthly_summary['Actual_Hours'] monthly_summary['Assessment_Hours'] = monthly_summary.apply( lambda x: x['Category_Hours'] if x['Activity_Category'] == 'Assessment' else 0, axis=1 ) monthly_summary['Non_Assessment_Hours'] = monthly_summary.apply( lambda x: x['Category_Hours'] if x['Activity_Category'] != 'Assessment' else 0, axis=1 ) # Venue and curriculum summaries venue_summary = df.groupby( ['Staff_Standard', 'Year', 'Month', 'VENUE'] ).agg({ 'Actual_Hours': 'sum', 'Category_Hours': 'sum' }).reset_index() curriculum_summary = df.groupby( ['Staff_Standard', 'Year', 'Month', 'Course Prov Mon C'] ).agg({ 'Actual_Hours': 'sum', 'Category_Hours': 'sum' }).reset_index() logging.info("Tutor pay report processing completed!") # Return all processed data return { 'daily_summary': daily_summary, 'weekly_summary': weekly_summary, 'monthly_summary': monthly_summary, 'venue_summary': venue_summary, 'curriculum_summary': curriculum_summary, 'detailed_data': df } #Cell 4: Qualification Data Processing def process_qualification_lists( approved_file: str, archived_file: str ) -> pd.DataFrame: """ Process approved and archived qualification lists """ logging.info("Processing qualification lists...") required_columns = ['QualificationName', 'QualificationNumber'] # Read both files with validation approved_df, error = safe_read_excel(approved_file) if error: logging.error(f"Failed to read approved qualifications: {error}") return pd.DataFrame(columns=required_columns + ['Source']) archived_df, error = safe_read_excel(archived_file) if error: logging.error(f"Failed to read archived qualifications: {error}") return pd.DataFrame(columns=required_columns + ['Source']) # Process approved list approved_df = approved_df[required_columns].copy() approved_df['Source'] = 'Approved' # Process archived list archived_df = archived_df[required_columns].copy() archived_df['Source'] = 'Archived' # Validate data try: validate_processed_data(approved_df, required_columns, 'approved_qualifications') validate_processed_data(archived_df, required_columns, 'archived_qualifications') except ValueError as e: logging.error(f"Validation failed: {str(e)}") return pd.DataFrame(columns=required_columns + ['Source']) # Clean qualification numbers in both datasets for df in [approved_df, archived_df]: df['QualificationNumber'] = df['QualificationNumber'].apply(clean_qualification_number) df['QualificationName'] = df['QualificationName'].str.strip() # Check for duplicates all_quals = pd.concat([approved_df, archived_df], ignore_index=True) duplicates = all_quals.groupby('QualificationNumber').size() duplicate_quals = duplicates[duplicates > 1] if len(duplicate_quals) > 0: logging.warning(f"Found {len(duplicate_quals)} duplicate qualification numbers") for qual_num in duplicate_quals.index: dupes = all_quals[all_quals['QualificationNumber'] == qual_num] logging.warning(f"Duplicate entries for {qual_num}:") for _, row in dupes.iterrows(): logging.warning(f" {row['Source']}: {row['QualificationName']}") # Remove duplicates, keeping 'Approved' version if exists qual_df = all_quals.sort_values( ['Source', 'QualificationNumber'], ascending=[False, True] # False for Source puts 'Approved' first ).drop_duplicates('QualificationNumber', keep='first') # Log summary statistics logging.info(f"Processed {len(qual_df)} unique qualifications:") logging.info(f" - Approved: {len(qual_df[qual_df['Source'] == 'Approved'])}") logging.info(f" - Archived: {len(qual_df[qual_df['Source'] == 'Archived'])}") return qual_df def process_terms_caseloading( terms_file: str, qualification_df: pd.DataFrame, contract_history: pd.DataFrame ) -> Dict[str, pd.DataFrame]: """ Process Terms caseloading data and link with qualification information """ logging.info("Processing Terms caseloading data...") # Read Terms caseloading data with error handling terms_df, error = safe_read_excel(terms_file) if error: logging.error(f"Failed to read Terms caseloading data: {error}") raise ValueError(f"Cannot process Terms caseloading data: {error}") # Select and rename relevant columns cols_to_keep = { 'Primary Staff Name': 'Staff_First_Name', 'Primary Staff Surname': 'Staff_Last_Name', 'Start Date': 'Start_Date', 'End Date': 'End_Date', 'Activity Name': 'Activity_Name', 'Funding Reference': 'Qualification_Number', 'Level': 'Level', 'Activity Group': 'Activity_Group', 'Name': 'Learner_First_Name', 'Surname': 'Learner_Last_Name' } try: terms_df = terms_df[cols_to_keep.keys()].copy() terms_df.rename(columns=cols_to_keep, inplace=True) except KeyError as e: logging.error(f"Missing required columns: {str(e)}") raise ValueError(f"Required columns missing from Terms caseloading data: {str(e)}") # Clean qualification numbers and create staff names terms_df['Qualification_Number'] = terms_df['Qualification_Number'].apply(clean_qualification_number) # Clean name columns name_columns = ['Staff_First_Name', 'Staff_Last_Name', 'Learner_First_Name', 'Learner_Last_Name'] terms_df = standardize_names(terms_df, name_columns) terms_df['Staff_Standard'] = terms_df.apply( lambda x: f"{x['Staff_First_Name']} {x['Staff_Last_Name']}", axis=1 ) # Convert dates with validation date_columns = ['Start_Date', 'End_Date'] for col in date_columns: terms_df[col] = pd.to_datetime(terms_df[col], errors='coerce') invalid_dates = terms_df[col].isna().sum() if invalid_dates > 0: logging.warning(f"Found {invalid_dates} invalid dates in {col}") logging.debug("Sample of rows with invalid dates:") sample_invalid = terms_df[terms_df[col].isna()].head() for _, row in sample_invalid.iterrows(): logging.debug(f" Staff: {row['Staff_Standard']}, Activity: {row['Activity_Name']}") # Look up qualification names def get_qualification_name(row): if pd.isna(row['Qualification_Number']): logging.debug(f"No qualification number for activity: {row['Activity_Name']}") return row['Activity_Name'] qual_match = qualification_df[ qualification_df['QualificationNumber'] == row['Qualification_Number'] ] if len(qual_match) > 0: source = qual_match.iloc[0]['Source'] qual_name = qual_match.iloc[0]['QualificationName'] logging.debug(f"Found qualification {row['Qualification_Number']} in {source} list: {qual_name}") return qual_name logging.debug(f"No match found for qualification {row['Qualification_Number']}, " f"using activity name: {row['Activity_Name']}") return row['Activity_Name'] terms_df['Qualification_Name'] = terms_df.apply(get_qualification_name, axis=1) # Add contract info based on start date with handling of multiple contracts def get_contract_info(row): if pd.isna(row['Start_Date']): return pd.Series({ 'Staff_Position': None, 'Staff_Contract': None, 'Staff_Target_Hours': None, 'Multiple_Contracts': False }) staff_contracts = contract_history[ contract_history['Staff_Name'] == row['Staff_Standard'] ] relevant_contracts = staff_contracts[ (staff_contracts['Start_Date'] <= row['Start_Date']) & (staff_contracts['End_Date'] >= row['Start_Date']) ] if len(relevant_contracts) > 0: positions = relevant_contracts['Position'].unique() contract_types = relevant_contracts['Contract_Type'].unique() total_target = relevant_contracts['Target_Hours'].sum() return pd.Series({ 'Staff_Position': ' & '.join(positions), 'Staff_Contract': ' & '.join(contract_types), 'Staff_Target_Hours': total_target, 'Multiple_Contracts': len(relevant_contracts) > 1 }) return pd.Series({ 'Staff_Position': None, 'Staff_Contract': None, 'Staff_Target_Hours': None, 'Multiple_Contracts': False }) # Add contract details logging.info("Adding contract information...") contract_details = terms_df.apply(get_contract_info, axis=1) terms_df = pd.concat([terms_df, contract_details], axis=1) # Create monthly learner counts logging.info("Calculating monthly learner counts...") monthly_data = [] dates = pd.date_range(start='2023-04-01', end='2024-03-31', freq='M') for date in dates: month_start = date.replace(day=1) month_end = date # Get active learners for this month month_mask = ( (terms_df['Start_Date'] <= month_end) & ((terms_df['End_Date'].isna()) | (terms_df['End_Date'] >= month_start)) ) month_data = terms_df[month_mask].copy() if not month_data.empty: month_data['Year'] = date.year month_data['Month'] = date.month monthly_data.append(month_data) logging.debug( f"Month {date.strftime('%Y-%m')}: {len(month_data)} active learners" ) # Combine monthly data with empty DataFrame handling monthly_df = pd.concat(monthly_data, ignore_index=True) if monthly_data else pd.DataFrame() if monthly_df.empty: logging.warning("No monthly data generated!") return {} # Create summary dataframes logging.info("Creating summary dataframes...") # Learner summary learner_summary = monthly_df.groupby( ['Staff_Standard', 'Year', 'Month'] ).agg({ 'Learner_First_Name': 'count', 'Staff_Position': 'first', 'Staff_Contract': 'first', 'Staff_Target_Hours': 'first', 'Multiple_Contracts': 'first' }).reset_index() learner_summary.rename(columns={ 'Learner_First_Name': 'Total_Learners' }, inplace=True) # Qualification summary qual_summary = monthly_df.groupby( ['Staff_Standard', 'Year', 'Month', 'Qualification_Name', 'Level', 'Staff_Position', 'Staff_Contract', 'Activity_Group'] ).agg({ 'Learner_First_Name': 'count', 'Staff_Target_Hours': 'first', 'Multiple_Contracts': 'first' }).reset_index() qual_summary.rename(columns={ 'Learner_First_Name': 'Learners_In_Qualification' }, inplace=True) # Level summary level_summary = monthly_df.groupby( ['Staff_Standard', 'Year', 'Month', 'Level'] ).agg({ 'Learner_First_Name': 'count', 'Multiple_Contracts': 'first' }).reset_index() level_summary.rename(columns={ 'Learner_First_Name': 'Learners_In_Level' }, inplace=True) # Activity group summary activity_summary = monthly_df.groupby( ['Staff_Standard', 'Year', 'Month', 'Activity_Group'] ).agg({ 'Learner_First_Name': 'count', 'Multiple_Contracts': 'first' }).reset_index() activity_summary.rename(columns={ 'Learner_First_Name': 'Learners_In_Activity' }, inplace=True) logging.info("Terms caseloading processing completed!") return { 'terms_detailed': monthly_df, 'learner_summary': learner_summary, 'qualification_summary': qual_summary, 'level_summary': level_summary, 'activity_summary': activity_summary, 'qualification_reference': qualification_df } # Cell 5: PICS Data Processing and Main Execution def process_pics_data( caseload_file: str, hours_lookup_file: str, contract_history: pd.DataFrame ) -> Dict[str, pd.DataFrame]: """ Process PICS caseload data and hours lookup """ logging.info("Processing PICS data...") # Read hours lookup first logging.info("Reading hours lookup data...") hours_df, error = safe_read_excel(hours_lookup_file, skiprows=2) if error: logging.error(f"Failed to read hours lookup: {error}") raise ValueError(f"Cannot process PICS data: {error}") # Process hours lookup hours_lookup = process_pics_hours_lookup(hours_df) logging.info(f"Processed {len(hours_lookup)} standard titles with hours") # Read and validate caseload data logging.info("Reading PICS caseload data...") caseload_df, error = safe_read_excel(caseload_file) if error: logging.error(f"Failed to read caseload data: {error}") raise ValueError(f"Cannot process PICS data: {error}") required_columns = [ 'Assessor Full Name', 'Programme', 'Apprenticeship Standard Title', 'Apprenticeship Achieved Date', 'Start Date', 'Learning Expected End', 'Actual End' ] # Validate and select columns try: validate_processed_data(caseload_df, required_columns, 'PICS_caseload') df = caseload_df[required_columns].copy() except ValueError as e: logging.error(f"Validation failed: {str(e)}") raise # Rename columns for consistency column_mapping = { 'Assessor Full Name': 'Assessor_Name', 'Programme': 'Programme_Level', 'Apprenticeship Standard Title': 'Standard_Title', 'Apprenticeship Achieved Date': 'Achieved_Date', 'Start Date': 'Start_Date', 'Learning Expected End': 'Expected_End', 'Actual End': 'Actual_End' } df.rename(columns=column_mapping, inplace=True) # Clean assessor names df['Assessor_Name'] = df['Assessor_Name'].apply(clean_name) # Convert dates with validation date_columns = ['Achieved_Date', 'Start_Date', 'Expected_End', 'Actual_End'] for col in date_columns: df[col] = pd.to_datetime(df[col], errors='coerce') invalid_dates = df[col].isna().sum() if invalid_dates > 0: logging.warning(f"Found {invalid_dates} invalid dates in {col}") if invalid_dates < 10: # Log details only for small numbers of issues invalid_rows = df[df[col].isna()] for _, row in invalid_rows.iterrows(): logging.warning(f"Invalid {col} for {row['Assessor_Name']}: {row['Standard_Title']}") # Clean titles and match with hours lookup df['Standard_Title'] = df['Standard_Title'].str.strip() # Add hours from lookup with validation merged_df = df.merge( hours_lookup[['Standard_Title', 'Monthly_Hours']], on='Standard_Title', how='left', validate='m:1' ) # Check for unmatched standards unmatched = merged_df[merged_df['Monthly_Hours'].isna()]['Standard_Title'].unique() if len(unmatched) > 0: logging.warning(f"Found {len(unmatched)} standards without matching hours:") for title in unmatched: logging.warning(f" {title}") merged_df['Monthly_Hours'] = merged_df['Monthly_Hours'].fillna(0) # Create monthly snapshots monthly_data = [] dates = pd.date_range(start='2023-04-01', end='2024-03-31', freq='M') logging.info("Generating monthly snapshots...") for date in dates: month_start = date.replace(day=1) month_end = date logging.info(f"Processing month: {month_start.strftime('%B %Y')}") # Get active students for this month month_mask = merged_df.apply( lambda row: ( row['Start_Date'] <= month_end and (pd.isna(row['Actual_End']) or row['Actual_End'] >= month_start) and (pd.isna(row['Expected_End']) or row['Expected_End'] >= month_start) ), axis=1 ) month_data = merged_df[month_mask].copy() if not month_data.empty: month_data['Snapshot_Date'] = month_end month_data['Year'] = month_end.year month_data['Month'] = month_end.month # Add contract info for this month def get_assessor_contract(row): staff_contracts = contract_history[ contract_history['Staff_Name'] == row['Assessor_Name'] ] relevant_contracts = staff_contracts[ (staff_contracts['Start_Date'] <= month_end) & (staff_contracts['End_Date'] >= month_end) ] if len(relevant_contracts) > 0: positions = relevant_contracts['Position'].unique() contract_types = relevant_contracts['Contract_Type'].unique() total_target = relevant_contracts['Target_Hours'].sum() return pd.Series({ 'Assessor_Position': ' & '.join(positions), 'Assessor_Contract': ' & '.join(contract_types), 'Assessor_Target_Hours': total_target, 'Multiple_Contracts': len(relevant_contracts) > 1 }) return pd.Series({ 'Assessor_Position': None, 'Assessor_Contract': None, 'Assessor_Target_Hours': None, 'Multiple_Contracts': False }) # Add contract details for the month contract_details = month_data.apply(get_assessor_contract, axis=1) month_data = pd.concat([month_data, contract_details], axis=1) monthly_data.append(month_data) logging.info(f"Active students in {month_start.strftime('%B %Y')}: {len(month_data)}") # Combine all monthly data monthly_df = pd.concat(monthly_data, ignore_index=True) if monthly_data else pd.DataFrame() if monthly_df.empty: logging.warning("No monthly data generated!") return {} # Create summaries logging.info("Creating summary views...") # Monthly summary per assessor monthly_summary = monthly_df.groupby( ['Assessor_Name', 'Year', 'Month', 'Assessor_Position', 'Assessor_Contract', 'Multiple_Contracts'] ).agg({ 'Standard_Title': 'count', 'Monthly_Hours': 'sum', 'Assessor_Target_Hours': 'first' }).reset_index() monthly_summary.rename(columns={ 'Standard_Title': 'Active_Students', 'Monthly_Hours': 'Required_Hours' }, inplace=True) # Programme level summary programme_summary = monthly_df.groupby( ['Assessor_Name', 'Programme_Level', 'Year', 'Month', 'Assessor_Position', 'Assessor_Contract', 'Multiple_Contracts'] ).agg({ 'Standard_Title': 'count', 'Assessor_Target_Hours': 'first' }).reset_index() programme_summary.rename(columns={ 'Standard_Title': 'Students_In_Programme' }, inplace=True) # Standard title summary standard_summary = monthly_df.groupby( ['Assessor_Name', 'Standard_Title', 'Year', 'Month', 'Assessor_Position', 'Assessor_Contract', 'Multiple_Contracts'] ).agg({ 'Monthly_Hours': 'sum', 'Snapshot_Date': 'count', 'Assessor_Target_Hours': 'first' }).reset_index() standard_summary.rename(columns={ 'Snapshot_Date': 'Students_In_Standard', 'Monthly_Hours': 'Required_Hours' }, inplace=True) logging.info("PICS data processing completed!") return { 'detailed_monthly': monthly_df, 'monthly_summary': monthly_summary, 'programme_summary': programme_summary, 'standard_summary': standard_summary, 'hours_reference': hours_lookup } def process_pics_hours_lookup(hours_df: pd.DataFrame) -> pd.DataFrame: """Process the PICS hours lookup data""" # Process hours lookup hours_lookup = hours_df[[ 'Standard title', 'Weighted Monthly Hours (1.6)' ]].copy() # Convert time strings to decimal hours def parse_time_str(time_str): if pd.isna(time_str): return 0.0 try: if isinstance(time_str, datetime.time): return time_str.hour + (time_str.minute / 60) elif isinstance(time_str, str) and ':' in time_str: parts = time_str.split(':') if len(parts) == 3: hours, minutes, _ = map(float, parts) else: hours, minutes = map(float, parts) return hours + (minutes / 60) elif isinstance(time_str, (int, float)): return float(time_str) except Exception as e: logging.warning(f"Error parsing time value '{time_str}': {str(e)}") return 0.0 return 0.0 hours_lookup['Monthly_Hours'] = hours_lookup['Weighted Monthly Hours (1.6)'].apply(parse_time_str) # Log any zero hours entries zero_hours = hours_lookup['Monthly_Hours'].eq(0) if zero_hours.any(): logging.warning(f"Found {zero_hours.sum()} entries with zero hours in lookup") logging.warning("Zero-hour entries:") zero_entries = hours_lookup[zero_hours] for _, row in zero_entries.iterrows(): logging.warning(f" {row['Standard title']}: {row['Weighted Monthly Hours (1.6)']}") # Clean and standardize titles hours_lookup['Standard_Title'] = hours_lookup['Standard title'].str.strip() return hours_lookup # Main execution block if __name__ == "__main__": try: # Setup logging setup_logging() # Get current directory and setup paths current_dir = os.getcwd() monthly_reports_folder = os.path.join(current_dir, "monthly_reports") output_folder = os.path.join(current_dir, "processed_data") logging.info(f"Processing data from: {current_dir}") logging.info(f"Monthly reports folder: {monthly_reports_folder}") logging.info(f"Output folder: {output_folder}") # Ensure monthly reports folder exists if not os.path.exists(monthly_reports_folder): raise FileNotFoundError( f"Monthly reports folder not found: {monthly_reports_folder}" ) # Create output folder if needed if not os.path.exists(output_folder): os.makedirs(output_folder) logging.info(f"Created output folder: {output_folder}") # --- Process Monthly Reports --- logging.info("\nStep 1: Processing Monthly Reports") staff_data, staff_summary, contract_history = process_monthly_reports( monthly_reports_folder ) # Save staff data staff_data.to_csv( os.path.join(output_folder, 'staff_monthly_data.csv'), index=False ) staff_summary.to_csv( os.path.join(output_folder, 'staff_summary.csv'), index=False ) contract_history.to_csv( os.path.join(output_folder, 'contract_history.csv'), index=False ) # --- Process Tutor Pay Report --- logging.info("\nStep 2: Processing Tutor Pay Report") tutor_report_data = process_tutor_report( "TutorPayReport_New 23.24 FY 22.11.24.xlsx", os.path.join(output_folder, 'staff_monthly_data.csv'), contract_history ) if tutor_report_data is not None: for name, df in tutor_report_data.items(): output_file = os.path.join(output_folder, f'{name}.csv') df.to_csv(output_file, index=False) logging.info(f"Saved {name} to {output_file}") else: logging.error("No tutor report data generated!") # --- Process Qualification Data --- logging.info("\nStep 3: Processing Qualification Data") # Define input files approved_file = "Approved List for Terms Caseloading FY2324.xlsx" archived_file = "Archived List for Terms Caseloading FY2324.xlsx" terms_file = "Terms caseloading 23.24 FY.xlsx" # Process qualification lists qualification_data = process_qualification_lists( approved_file, archived_file ) # Process Terms caseloading terms_data = process_terms_caseloading( terms_file, qualification_data, contract_history ) # Save qualification data results for name, df in terms_data.items(): output_file = os.path.join(output_folder, f'{name}.csv') df.to_csv(output_file, index=False) logging.info(f"Saved {name} to {output_file}") # --- Process PICS Data --- logging.info("\nStep 4: Processing PICS Data") # Define PICS input files caseload_file = "PICS caseload for PBI.xlsx" hours_lookup_file = "PICS Hours for Assessor Look up.xlsx" # Process PICS data pics_data = process_pics_data( caseload_file, hours_lookup_file, contract_history ) # Save PICS data results for name, df in pics_data.items(): output_file = os.path.join(output_folder, f'pics_{name}.csv') df.to_csv(output_file, index=False) logging.info(f"Saved {name} to {output_file}") logging.info("\nAll processing completed successfully!") except Exception as e: logging.error(f"\nError during processing: {str(e)}", exc_info=True) raise
Editor is loading...
Leave a Comment