C05

 avatar
user_1718919
plain_text
7 months ago
13 kB
4
Indexable
def process_pics_data(
    caseload_file: str,
    hours_lookup_file: str,
    contract_history: pd.DataFrame
) -> Dict[str, pd.DataFrame]:
    """
    Process PICS caseload data and hours lookup.

    Reads the hours lookup workbook (skipping two header rows) and the
    caseload export, joins monthly hours onto each student record, builds
    a month-end snapshot of active students for every month of FY23/24
    (April 2023 - March 2024) enriched with the assessor's contract
    details, and aggregates the snapshots into summary views.

    Args:
        caseload_file: Path to PICS caseload file
        hours_lookup_file: Path to hours lookup file
        contract_history: Contract history DataFrame; must provide the
            columns Staff_Name, Start_Date, End_Date, Position,
            Contract_Type and Target_Hours

    Returns:
        Dictionary of processed DataFrames keyed by:
        'detailed_monthly', 'monthly_summary', 'programme_summary',
        'standard_summary', 'hours_reference'

    Raises:
        ValueError: If either input file cannot be read.
        pandas.errors.MergeError: If the hours lookup contains duplicate
            standard titles (the merge is validated as many-to-one).
    """
    logging.info("Processing PICS data...")

    # Read hours lookup first
    logging.info("Reading hours lookup data...")
    hours_df, error = safe_read_excel(hours_lookup_file, skiprows=2)
    if error:
        logging.error(f"Failed to read hours lookup: {error}")
        raise ValueError(f"Cannot process PICS data: {error}")

    # Keep only the title and weighted-hours columns from the lookup
    hours_lookup = hours_df[[
        'Standard title', 'Weighted Monthly Hours (1.6)'
    ]].copy()

    # Convert time-formatted values to decimal hours
    hours_lookup['Monthly_Hours'] = hours_lookup['Weighted Monthly Hours (1.6)'].apply(
        convert_time_to_hours
    )

    # Surface entries whose hours converted to zero — likely bad source data
    zero_hours = hours_lookup['Monthly_Hours'].eq(0).sum()
    if zero_hours > 0:
        logging.warning(f"Found {zero_hours} entries with zero hours in lookup")
        logging.warning("Zero-hour entries:")
        for _, row in hours_lookup[hours_lookup['Monthly_Hours'] == 0].iterrows():
            logging.warning(f"  {row['Standard title']}: {row['Weighted Monthly Hours (1.6)']}")

    # Standardized title: this is the merge key used against the caseload
    hours_lookup['Standard_Title'] = hours_lookup['Standard title'].str.strip()

    logging.info(f"Processed {len(hours_lookup)} standard titles with hours")

    # Read caseload data
    logging.info("Reading PICS caseload data...")
    caseload_df, error = safe_read_excel(caseload_file)
    if error:
        logging.error(f"Failed to read caseload data: {error}")
        raise ValueError(f"Cannot process PICS data: {error}")

    # Select and rename relevant columns (source header -> working name)
    cols_to_keep = {
        'Assessor Full Name': 'Assessor_Name',
        'Programme': 'Programme_Level',
        'Apprenticeship Standard Title': 'Standard_Title',
        'Apprenticeship Achieved Date': 'Achieved_Date',
        'Start Date': 'Start_Date',
        'Learning Expected End': 'Expected_End',
        'Actual End': 'Actual_End'
    }

    # Explicit list indexer (original passed a dict_keys view)
    df = caseload_df[list(cols_to_keep)].copy()
    df.rename(columns=cols_to_keep, inplace=True)

    # Convert dates; unparseable values become NaT and are reported
    date_columns = ['Achieved_Date', 'Start_Date', 'Expected_End', 'Actual_End']
    for col in date_columns:
        df[col] = pd.to_datetime(df[col], errors='coerce')
        invalid_dates = df[col].isna().sum()
        if invalid_dates > 0:
            logging.warning(f"Found {invalid_dates} invalid dates in {col}")

    # Clean assessor names so they match contract_history['Staff_Name']
    df['Assessor_Name'] = df['Assessor_Name'].apply(clean_name)

    # Clean titles for matching with lookup
    df['Standard_Title'] = df['Standard_Title'].str.strip()

    # Add hours from lookup; validate='m:1' raises if the lookup has
    # duplicate titles, which would silently duplicate student rows
    df = df.merge(
        hours_lookup[['Standard_Title', 'Monthly_Hours']],
        on='Standard_Title',
        how='left',
        validate='m:1'
    )

    # Report standards with no matching hours entry
    unmatched = df.loc[df['Monthly_Hours'].isna(), 'Standard_Title'].unique()
    if len(unmatched) > 0:
        logging.warning(f"Found {len(unmatched)} standards without matching hours:")
        for title in unmatched:
            logging.warning(f"  {title}")

    # Unmatched standards contribute zero required hours
    df['Monthly_Hours'] = df['Monthly_Hours'].fillna(0)

    def was_active_in_month(row: pd.Series, month_start: pd.Timestamp, month_end: pd.Timestamp) -> bool:
        """Check if student was active during any part of the month.

        Uses Actual_End when present, otherwise Expected_End; rows with
        missing start or end dates are treated as inactive.
        """
        start_date = row['Start_Date']
        end_date = row['Actual_End'] if pd.notna(row['Actual_End']) else row['Expected_End']

        if pd.isna(start_date) or pd.isna(end_date):
            return False

        # Intervals overlap iff each starts before the other ends
        return (start_date <= month_end) and (end_date >= month_start)

    def get_assessor_contract(row: pd.Series, month_end: pd.Timestamp) -> pd.Series:
        """Return the assessor's contract details covering month_end.

        Hoisted out of the monthly loop (the original re-defined this
        closure every iteration); month_end is now an explicit argument.
        Multiple simultaneous contracts are joined/summed and flagged.
        """
        staff_contracts = contract_history[
            contract_history['Staff_Name'] == row['Assessor_Name']
        ]

        relevant_contracts = staff_contracts[
            (staff_contracts['Start_Date'] <= month_end) &
            (staff_contracts['End_Date'] >= month_end)
        ]

        if len(relevant_contracts) > 0:
            positions = ', '.join(relevant_contracts['Position'].unique())
            contract_types = ', '.join(relevant_contracts['Contract_Type'].unique())
            total_target = relevant_contracts['Target_Hours'].sum()

            return pd.Series({
                'Assessor_Position': positions,
                'Assessor_Contract': contract_types,
                'Assessor_Target_Hours': total_target,
                'Multiple_Contracts': len(relevant_contracts) > 1
            })
        return pd.Series({
            'Assessor_Position': None,
            'Assessor_Contract': None,
            'Assessor_Target_Hours': None,
            'Multiple_Contracts': False
        })

    # Create monthly snapshots for FY23/24 ('M' yields month-end dates;
    # NOTE(review): alias renamed to 'ME' in pandas >= 2.2 — confirm the
    # pandas version pinned for this project before changing it)
    monthly_data = []
    dates = pd.date_range(start='2023-04-01', end='2024-03-31', freq='M')

    logging.info("Generating monthly snapshots...")
    for date in dates:
        month_start = date.replace(day=1)
        month_end = date

        logging.info(f"Processing month: {month_start.strftime('%B %Y')}")

        # Filter for students active at any point in this month
        month_mask = df.apply(
            lambda row: was_active_in_month(row, month_start, month_end),
            axis=1
        )
        month_data = df[month_mask].copy()

        # Stamp the snapshot month onto each row
        month_data['Snapshot_Date'] = month_end
        month_data['Year'] = month_end.year
        month_data['Month'] = month_end.month

        # Attach the assessor's contract details as of month end
        contract_details = month_data.apply(
            lambda row: get_assessor_contract(row, month_end),
            axis=1
        )
        month_data = pd.concat([month_data, contract_details], axis=1)

        active_count = len(month_data)
        logging.info(f"Active students in {month_start.strftime('%B %Y')}: {active_count}")

        monthly_data.append(month_data)

    # Combine all monthly snapshots into one long table
    monthly_df = pd.concat(monthly_data, ignore_index=True)

    # Create summaries
    logging.info("Creating summary views...")

    # 1. Monthly summary per assessor: student count + required hours
    monthly_summary = monthly_df.groupby(
        ['Assessor_Name', 'Year', 'Month', 'Assessor_Position',
         'Assessor_Contract', 'Multiple_Contracts']
    ).agg({
        'Standard_Title': 'count',
        'Monthly_Hours': 'sum',
        'Assessor_Target_Hours': 'first'
    }).reset_index()

    monthly_summary.rename(columns={
        'Standard_Title': 'Active_Students',
        'Monthly_Hours': 'Required_Hours'
    }, inplace=True)

    # 2. Programme level summary: student count per programme
    programme_summary = monthly_df.groupby(
        ['Assessor_Name', 'Programme_Level', 'Year', 'Month',
         'Assessor_Position', 'Assessor_Contract', 'Multiple_Contracts']
    ).agg({
        'Standard_Title': 'count',
        'Assessor_Target_Hours': 'first'
    }).reset_index()

    programme_summary.rename(columns={
        'Standard_Title': 'Students_In_Programme'
    }, inplace=True)

    # 3. Standard title summary: hours + student count per standard
    standard_summary = monthly_df.groupby(
        ['Assessor_Name', 'Standard_Title', 'Year', 'Month',
         'Assessor_Position', 'Assessor_Contract', 'Multiple_Contracts']
    ).agg({
        'Monthly_Hours': 'sum',
        'Snapshot_Date': 'count',
        'Assessor_Target_Hours': 'first'
    }).reset_index()

    standard_summary.rename(columns={
        'Snapshot_Date': 'Students_In_Standard',
        'Monthly_Hours': 'Required_Hours'
    }, inplace=True)

    logging.info("PICS data processing completed!")

    return {
        'detailed_monthly': monthly_df,
        'monthly_summary': monthly_summary,
        'programme_summary': programme_summary,
        'standard_summary': standard_summary,
        'hours_reference': hours_lookup
    }

if __name__ == "__main__":
    try:
        # Configure logging before anything else so every step is captured
        setup_logging()

        # Resolve the working directories relative to the current folder
        root_dir = os.getcwd()
        reports_dir = os.path.join(root_dir, "monthly_reports")
        out_dir = os.path.join(root_dir, "processed_data")

        logging.info(f"Processing data from: {root_dir}")
        logging.info(f"Monthly reports folder: {reports_dir}")
        logging.info(f"Output folder: {out_dir}")

        # The input folder must already exist
        if not os.path.exists(reports_dir):
            raise FileNotFoundError(
                f"Monthly reports folder not found: {reports_dir}"
            )

        # Create the output folder on first run
        if not os.path.exists(out_dir):
            os.makedirs(out_dir)
            logging.info(f"Created output folder: {out_dir}")

        def save_outputs(frames, prefix=""):
            """Write each named DataFrame to out_dir as <prefix><name>.csv."""
            for name, frame in frames.items():
                output_file = os.path.join(out_dir, f'{prefix}{name}.csv')
                frame.to_csv(output_file, index=False)
                logging.info(f"Saved {name} to {output_file}")

        # --- Process Monthly Reports ---
        logging.info("\nStep 1: Processing Monthly Reports")
        staff_data, staff_summary, contract_history = process_monthly_reports(
            reports_dir
        )

        # Persist the staff-level outputs
        staff_data.to_csv(
            os.path.join(out_dir, 'staff_monthly_data.csv'), index=False
        )
        staff_summary.to_csv(
            os.path.join(out_dir, 'staff_summary.csv'), index=False
        )
        contract_history.to_csv(
            os.path.join(out_dir, 'contract_history.csv'), index=False
        )

        # --- Process Tutor Pay Report ---
        logging.info("\nStep 2: Processing Tutor Pay Report")
        tutor_outputs = process_tutor_report(
            "TutorPayReport_New 23.24 FY 22.11.24.xlsx",
            os.path.join(out_dir, 'staff_monthly_data.csv'),
            contract_history
        )

        if tutor_outputs is not None:
            save_outputs(tutor_outputs)
        else:
            logging.error("No tutor report data generated!")

        # --- Process Qualification Data ---
        logging.info("\nStep 3: Processing Qualification Data")
        qual_lists = process_qualification_lists(
            "Approved List for Terms Caseloading FY2324.xlsx",
            "Archived List for Terms Caseloading FY2324.xlsx"
        )

        # Process Terms caseloading against the qualification lists
        terms_outputs = process_terms_caseloading(
            "Terms caseloading 23.24 FY.xlsx",
            qual_lists,
            contract_history
        )
        save_outputs(terms_outputs)

        # --- Process PICS Data ---
        logging.info("\nStep 4: Processing PICS Data")
        pics_outputs = process_pics_data(
            "PICS caseload for PBI.xlsx",
            "PICS Hours for Assessor Look up.xlsx",
            contract_history
        )
        save_outputs(pics_outputs, prefix="pics_")

        logging.info("\nAll processing completed successfully!")

    except Exception as e:
        # Log the full traceback, then re-raise so the exit code is non-zero
        logging.error(f"\nError during processing: {str(e)}", exc_info=True)
        raise
Editor is loading...
Leave a Comment