import datetime
import logging
import os
from typing import Dict

import pandas as pd

# NOTE: helper functions referenced below (setup_logging, safe_read_excel,
# clean_name, validate_processed_data, process_monthly_reports,
# process_tutor_report, process_qualification_lists,
# process_terms_caseloading) are defined elsewhere in this project.


def process_pics_data(
    caseload_file: str,
    hours_lookup_file: str,
    contract_history: pd.DataFrame
) -> Dict[str, pd.DataFrame]:
    """Process PICS caseload data and hours lookup."""
    logging.info("Processing PICS data...")

    # Read hours lookup first
    logging.info("Reading hours lookup data...")
    hours_df, error = safe_read_excel(hours_lookup_file, skiprows=2)
    if error:
        logging.error(f"Failed to read hours lookup: {error}")
        raise ValueError(f"Cannot process PICS data: {error}")

    # Process hours lookup
    hours_lookup = process_pics_hours_lookup(hours_df)
    logging.info(f"Processed {len(hours_lookup)} standard titles with hours")

    # Read and validate caseload data
    logging.info("Reading PICS caseload data...")
    caseload_df, error = safe_read_excel(caseload_file)
    if error:
        logging.error(f"Failed to read caseload data: {error}")
        raise ValueError(f"Cannot process PICS data: {error}")

    required_columns = [
        'Assessor Full Name', 'Programme', 'Apprenticeship Standard Title',
        'Apprenticeship Achieved Date', 'Start Date',
        'Learning Expected End', 'Actual End'
    ]

    # Validate and select columns
    try:
        validate_processed_data(caseload_df, required_columns, 'PICS_caseload')
        df = caseload_df[required_columns].copy()
    except ValueError as e:
        logging.error(f"Validation failed: {str(e)}")
        raise

    # Rename columns for consistency
    column_mapping = {
        'Assessor Full Name': 'Assessor_Name',
        'Programme': 'Programme_Level',
        'Apprenticeship Standard Title': 'Standard_Title',
        'Apprenticeship Achieved Date': 'Achieved_Date',
        'Start Date': 'Start_Date',
        'Learning Expected End': 'Expected_End',
        'Actual End': 'Actual_End'
    }
    df.rename(columns=column_mapping, inplace=True)

    # Clean assessor names
    df['Assessor_Name'] = df['Assessor_Name'].apply(clean_name)

    # Convert dates with validation
    date_columns = ['Achieved_Date', 'Start_Date', 'Expected_End', 'Actual_End']
    for col in date_columns:
        df[col] = pd.to_datetime(df[col], errors='coerce')
        invalid_dates = df[col].isna().sum()
        if invalid_dates > 0:
            logging.warning(f"Found {invalid_dates} invalid dates in {col}")
            if invalid_dates < 10:  # Log details only for small numbers of issues
                invalid_rows = df[df[col].isna()]
                for _, row in invalid_rows.iterrows():
                    logging.warning(
                        f"Invalid {col} for {row['Assessor_Name']}: "
                        f"{row['Standard_Title']}"
                    )

    # Clean titles and match with hours lookup
    df['Standard_Title'] = df['Standard_Title'].str.strip()

    # Add hours from lookup with validation
    merged_df = df.merge(
        hours_lookup[['Standard_Title', 'Monthly_Hours']],
        on='Standard_Title',
        how='left',
        validate='m:1'
    )

    # Check for unmatched standards
    unmatched = merged_df[merged_df['Monthly_Hours'].isna()]['Standard_Title'].unique()
    if len(unmatched) > 0:
        logging.warning(f"Found {len(unmatched)} standards without matching hours:")
        for title in unmatched:
            logging.warning(f"  {title}")
    merged_df['Monthly_Hours'] = merged_df['Monthly_Hours'].fillna(0)
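    # A student counts as "active" in a month when they started on or before
    # the month end and have not (actually or expectedly) ended before the
    # month start. For example, a student with Start_Date 2023-06-15 and no
    # Actual_End appears in the June 2023 snapshot and in every later one.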
    # Create monthly snapshots
    monthly_data = []
    # Month-end snapshot dates for FY 2023/24 ('M' = month end; newer pandas
    # versions prefer the equivalent 'ME' alias)
    dates = pd.date_range(start='2023-04-01', end='2024-03-31', freq='M')

    logging.info("Generating monthly snapshots...")
    for date in dates:
        month_start = date.replace(day=1)
        month_end = date
        logging.info(f"Processing month: {month_start.strftime('%B %Y')}")

        # Get active students for this month
        month_mask = merged_df.apply(
            lambda row: (
                row['Start_Date'] <= month_end
                and (pd.isna(row['Actual_End']) or row['Actual_End'] >= month_start)
                and (pd.isna(row['Expected_End']) or row['Expected_End'] >= month_start)
            ),
            axis=1
        )
        month_data = merged_df[month_mask].copy()

        if not month_data.empty:
            month_data['Snapshot_Date'] = month_end
            month_data['Year'] = month_end.year
            month_data['Month'] = month_end.month

            # Add contract info for this month
            def get_assessor_contract(row):
                staff_contracts = contract_history[
                    contract_history['Staff_Name'] == row['Assessor_Name']
                ]
                relevant_contracts = staff_contracts[
                    (staff_contracts['Start_Date'] <= month_end) &
                    (staff_contracts['End_Date'] >= month_end)
                ]
                if len(relevant_contracts) > 0:
                    positions = relevant_contracts['Position'].unique()
                    contract_types = relevant_contracts['Contract_Type'].unique()
                    total_target = relevant_contracts['Target_Hours'].sum()
                    return pd.Series({
                        'Assessor_Position': ' & '.join(positions),
                        'Assessor_Contract': ' & '.join(contract_types),
                        'Assessor_Target_Hours': total_target,
                        'Multiple_Contracts': len(relevant_contracts) > 1
                    })
                return pd.Series({
                    'Assessor_Position': None,
                    'Assessor_Contract': None,
                    'Assessor_Target_Hours': None,
                    'Multiple_Contracts': False
                })

            # Add contract details for the month
            contract_details = month_data.apply(get_assessor_contract, axis=1)
            month_data = pd.concat([month_data, contract_details], axis=1)
            monthly_data.append(month_data)
            logging.info(
                f"Active students in {month_start.strftime('%B %Y')}: "
                f"{len(month_data)}"
            )

    # Combine all monthly data
    monthly_df = (
        pd.concat(monthly_data, ignore_index=True) if monthly_data
        else pd.DataFrame()
    )
    if monthly_df.empty:
        logging.warning("No monthly data generated!")
        return {}

    # Create summaries
    logging.info("Creating summary views...")

    # Monthly summary per assessor (dropna=False keeps assessors with no
    # matching contract, whose Position/Contract fields are None)
    monthly_summary = monthly_df.groupby(
        ['Assessor_Name', 'Year', 'Month', 'Assessor_Position',
         'Assessor_Contract', 'Multiple_Contracts'],
        dropna=False
    ).agg({
        'Standard_Title': 'count',
        'Monthly_Hours': 'sum',
        'Assessor_Target_Hours': 'first'
    }).reset_index()
    monthly_summary.rename(columns={
        'Standard_Title': 'Active_Students',
        'Monthly_Hours': 'Required_Hours'
    }, inplace=True)

    # Programme level summary
    programme_summary = monthly_df.groupby(
        ['Assessor_Name', 'Programme_Level', 'Year', 'Month',
         'Assessor_Position', 'Assessor_Contract', 'Multiple_Contracts'],
        dropna=False
    ).agg({
        'Standard_Title': 'count',
        'Assessor_Target_Hours': 'first'
    }).reset_index()
    programme_summary.rename(columns={
        'Standard_Title': 'Students_In_Programme'
    }, inplace=True)

    # Standard title summary
    standard_summary = monthly_df.groupby(
        ['Assessor_Name', 'Standard_Title', 'Year', 'Month',
         'Assessor_Position', 'Assessor_Contract', 'Multiple_Contracts'],
        dropna=False
    ).agg({
        'Monthly_Hours': 'sum',
        'Snapshot_Date': 'count',
        'Assessor_Target_Hours': 'first'
    }).reset_index()
    standard_summary.rename(columns={
        'Snapshot_Date': 'Students_In_Standard',
        'Monthly_Hours': 'Required_Hours'
    }, inplace=True)

    logging.info("PICS data processing completed!")

    return {
        'detailed_monthly': monthly_df,
        'monthly_summary': monthly_summary,
        'programme_summary': programme_summary,
        'standard_summary': standard_summary,
        'hours_reference': hours_lookup
    }
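
# Hedged sketch, not wired into the pipeline above: the row-wise `apply` mask
# in process_pics_data could instead be expressed as a vectorized boolean
# mask. Column names follow the renaming in process_pics_data, and dates are
# assumed to already be pd.Timestamp/NaT via pd.to_datetime.
def active_month_mask(df: pd.DataFrame,
                      month_start: pd.Timestamp,
                      month_end: pd.Timestamp) -> pd.Series:
    """Return a boolean mask of students active in the given month."""
    started = df['Start_Date'] <= month_end
    # Comparisons against NaT evaluate False, so isna() explicitly keeps
    # rows with no recorded end date
    not_ended = df['Actual_End'].isna() | (df['Actual_End'] >= month_start)
    not_expected_ended = (
        df['Expected_End'].isna() | (df['Expected_End'] >= month_start)
    )
    return started & not_ended & not_expected_ended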

def process_pics_hours_lookup(hours_df: pd.DataFrame) -> pd.DataFrame:
    """Process the PICS hours lookup data."""
    # Process hours lookup
    hours_lookup = hours_df[[
        'Standard title',
        'Weighted Monthly Hours (1.6)'
    ]].copy()

    # Convert time strings to decimal hours
    def parse_time_str(time_str):
        if pd.isna(time_str):
            return 0.0
        try:
            if isinstance(time_str, datetime.time):
                return time_str.hour + (time_str.minute / 60)
            elif isinstance(time_str, str) and ':' in time_str:
                parts = time_str.split(':')
                if len(parts) == 3:
                    hours, minutes, _ = map(float, parts)  # seconds ignored
                else:
                    hours, minutes = map(float, parts)
                return hours + (minutes / 60)
            elif isinstance(time_str, (int, float)):
                return float(time_str)
        except Exception as e:
            logging.warning(f"Error parsing time value '{time_str}': {str(e)}")
            return 0.0
        return 0.0

    hours_lookup['Monthly_Hours'] = (
        hours_lookup['Weighted Monthly Hours (1.6)'].apply(parse_time_str)
    )

    # Log any zero hours entries
    zero_hours = hours_lookup['Monthly_Hours'].eq(0)
    if zero_hours.any():
        logging.warning(
            f"Found {zero_hours.sum()} entries with zero hours in lookup"
        )
        logging.warning("Zero-hour entries:")
        zero_entries = hours_lookup[zero_hours]
        for _, row in zero_entries.iterrows():
            logging.warning(
                f"  {row['Standard title']}: "
                f"{row['Weighted Monthly Hours (1.6)']}"
            )

    # Clean and standardize titles
    hours_lookup['Standard_Title'] = hours_lookup['Standard title'].str.strip()

    return hours_lookup
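
# Illustration of the conversions parse_time_str performs (example values
# assumed, derived from the branches above):
#   datetime.time(7, 30) -> 7.5
#   "07:30:00"           -> 7.5   (seconds ignored)
#   "07:30"              -> 7.5
#   7.5                  -> 7.5
#   "N/A" or NaN         -> 0.0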

# Main execution block
if __name__ == "__main__":
    try:
        # Setup logging
        setup_logging()

        # Get current directory and set up paths
        current_dir = os.getcwd()
        monthly_reports_folder = os.path.join(current_dir, "monthly_reports")
        output_folder = os.path.join(current_dir, "processed_data")

        logging.info(f"Processing data from: {current_dir}")
        logging.info(f"Monthly reports folder: {monthly_reports_folder}")
        logging.info(f"Output folder: {output_folder}")

        # Ensure monthly reports folder exists
        if not os.path.exists(monthly_reports_folder):
            raise FileNotFoundError(
                f"Monthly reports folder not found: {monthly_reports_folder}"
            )

        # Create output folder if needed
        if not os.path.exists(output_folder):
            os.makedirs(output_folder)
            logging.info(f"Created output folder: {output_folder}")

        # --- Process Monthly Reports ---
        logging.info("\nStep 1: Processing Monthly Reports")
        staff_data, staff_summary, contract_history = process_monthly_reports(
            monthly_reports_folder
        )

        # Save staff data
        staff_data.to_csv(
            os.path.join(output_folder, 'staff_monthly_data.csv'), index=False
        )
        staff_summary.to_csv(
            os.path.join(output_folder, 'staff_summary.csv'), index=False
        )
        contract_history.to_csv(
            os.path.join(output_folder, 'contract_history.csv'), index=False
        )

        # --- Process Tutor Pay Report ---
        logging.info("\nStep 2: Processing Tutor Pay Report")
        tutor_report_data = process_tutor_report(
            "TutorPayReport_New 23.24 FY 22.11.24.xlsx",
            os.path.join(output_folder, 'staff_monthly_data.csv'),
            contract_history
        )
        if tutor_report_data is not None:
            for name, df in tutor_report_data.items():
                output_file = os.path.join(output_folder, f'{name}.csv')
                df.to_csv(output_file, index=False)
                logging.info(f"Saved {name} to {output_file}")
        else:
            logging.error("No tutor report data generated!")

        # --- Process Qualification Data ---
        logging.info("\nStep 3: Processing Qualification Data")

        # Define input files
        approved_file = "Approved List for Terms Caseloading FY2324.xlsx"
        archived_file = "Archived List for Terms Caseloading FY2324.xlsx"
        terms_file = "Terms caseloading 23.24 FY.xlsx"

        # Process qualification lists
        qualification_data = process_qualification_lists(
            approved_file, archived_file
        )

        # Process Terms caseloading
        terms_data = process_terms_caseloading(
            terms_file, qualification_data, contract_history
        )

        # Save qualification data results
        for name, df in terms_data.items():
            output_file = os.path.join(output_folder, f'{name}.csv')
            df.to_csv(output_file, index=False)
            logging.info(f"Saved {name} to {output_file}")

        # --- Process PICS Data ---
        logging.info("\nStep 4: Processing PICS Data")

        # Define PICS input files
        caseload_file = "PICS caseload for PBI.xlsx"
        hours_lookup_file = "PICS Hours for Assessor Look up.xlsx"

        # Process PICS data
        pics_data = process_pics_data(
            caseload_file, hours_lookup_file, contract_history
        )

        # Save PICS data results
        for name, df in pics_data.items():
            output_file = os.path.join(output_folder, f'pics_{name}.csv')
            df.to_csv(output_file, index=False)
            logging.info(f"Saved {name} to {output_file}")

        logging.info("\nAll processing completed successfully!")

    except Exception as e:
        logging.error(f"\nError during processing: {str(e)}", exc_info=True)
        raise
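
# Expected working-directory layout, inferred from the paths above:
#   monthly_reports/                                  <- input reports folder
#   TutorPayReport_New 23.24 FY 22.11.24.xlsx
#   Approved List for Terms Caseloading FY2324.xlsx
#   Archived List for Terms Caseloading FY2324.xlsx
#   Terms caseloading 23.24 FY.xlsx
#   PICS caseload for PBI.xlsx
#   PICS Hours for Assessor Look up.xlsx
#   processed_data/                                   <- outputs (created if missing)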