C5

 avatar
user_1718919
plain_text
4 months ago
15 kB
4
Indexable
def process_pics_data(
    caseload_file: str,
    hours_lookup_file: str,
    contract_history: pd.DataFrame
) -> Dict[str, pd.DataFrame]:
    """
    Process PICS caseload data and hours lookup
    """
    logging.info("Processing PICS data...")
    
    # Read hours lookup first
    logging.info("Reading hours lookup data...")
    hours_df, error = safe_read_excel(hours_lookup_file, skiprows=2)
    if error:
        logging.error(f"Failed to read hours lookup: {error}")
        raise ValueError(f"Cannot process PICS data: {error}")
        
    # Process hours lookup
    hours_lookup = process_pics_hours_lookup(hours_df)
    logging.info(f"Processed {len(hours_lookup)} standard titles with hours")
    
    # Read and validate caseload data
    logging.info("Reading PICS caseload data...")
    caseload_df, error = safe_read_excel(caseload_file)
    if error:
        logging.error(f"Failed to read caseload data: {error}")
        raise ValueError(f"Cannot process PICS data: {error}")
        
    required_columns = [
        'Assessor Full Name', 'Programme', 'Apprenticeship Standard Title',
        'Apprenticeship Achieved Date', 'Start Date', 'Learning Expected End',
        'Actual End'
    ]
    
    # Validate and select columns
    try:
        validate_processed_data(caseload_df, required_columns, 'PICS_caseload')
        df = caseload_df[required_columns].copy()
    except ValueError as e:
        logging.error(f"Validation failed: {str(e)}")
        raise
    
    # Rename columns for consistency
    column_mapping = {
        'Assessor Full Name': 'Assessor_Name',
        'Programme': 'Programme_Level',
        'Apprenticeship Standard Title': 'Standard_Title',
        'Apprenticeship Achieved Date': 'Achieved_Date',
        'Start Date': 'Start_Date',
        'Learning Expected End': 'Expected_End',
        'Actual End': 'Actual_End'
    }
    df.rename(columns=column_mapping, inplace=True)
    
    # Clean assessor names
    df['Assessor_Name'] = df['Assessor_Name'].apply(clean_name)
    
    # Convert dates with validation
    date_columns = ['Achieved_Date', 'Start_Date', 'Expected_End', 'Actual_End']
    for col in date_columns:
        df[col] = pd.to_datetime(df[col], errors='coerce')
        invalid_dates = df[col].isna().sum()
        if invalid_dates > 0:
            logging.warning(f"Found {invalid_dates} invalid dates in {col}")
            if invalid_dates < 10:  # Log details only for small numbers of issues
                invalid_rows = df[df[col].isna()]
                for _, row in invalid_rows.iterrows():
                    logging.warning(f"Invalid {col} for {row['Assessor_Name']}: {row['Standard_Title']}")
    
    # Clean titles and match with hours lookup
    df['Standard_Title'] = df['Standard_Title'].str.strip()
    
    # Add hours from lookup with validation
    merged_df = df.merge(
        hours_lookup[['Standard_Title', 'Monthly_Hours']],
        on='Standard_Title',
        how='left',
        validate='m:1'
    )
    
    # Check for unmatched standards
    unmatched = merged_df[merged_df['Monthly_Hours'].isna()]['Standard_Title'].unique()
    if len(unmatched) > 0:
        logging.warning(f"Found {len(unmatched)} standards without matching hours:")
        for title in unmatched:
            logging.warning(f"  {title}")
    
    merged_df['Monthly_Hours'] = merged_df['Monthly_Hours'].fillna(0)
    
    # Create monthly snapshots
    monthly_data = []
    dates = pd.date_range(start='2023-04-01', end='2024-03-31', freq='M')
    
    logging.info("Generating monthly snapshots...")
    for date in dates:
        month_start = date.replace(day=1)
        month_end = date
        
        logging.info(f"Processing month: {month_start.strftime('%B %Y')}")
        
        # Get active students for this month
        month_mask = merged_df.apply(
            lambda row: (
                row['Start_Date'] <= month_end and
                (pd.isna(row['Actual_End']) or row['Actual_End'] >= month_start) and
                (pd.isna(row['Expected_End']) or row['Expected_End'] >= month_start)
            ),
            axis=1
        )
        
        month_data = merged_df[month_mask].copy()
        
        if not month_data.empty:
            month_data['Snapshot_Date'] = month_end
            month_data['Year'] = month_end.year
            month_data['Month'] = month_end.month
            
            # Add contract info for this month
            def get_assessor_contract(row):
                staff_contracts = contract_history[
                    contract_history['Staff_Name'] == row['Assessor_Name']
                ]
                
                relevant_contracts = staff_contracts[
                    (staff_contracts['Start_Date'] <= month_end) &
                    (staff_contracts['End_Date'] >= month_end)
                ]
                
                if len(relevant_contracts) > 0:
                    positions = relevant_contracts['Position'].unique()
                    contract_types = relevant_contracts['Contract_Type'].unique()
                    total_target = relevant_contracts['Target_Hours'].sum()
                    
                    return pd.Series({
                        'Assessor_Position': ' & '.join(positions),
                        'Assessor_Contract': ' & '.join(contract_types),
                        'Assessor_Target_Hours': total_target,
                        'Multiple_Contracts': len(relevant_contracts) > 1
                    })
                
                return pd.Series({
                    'Assessor_Position': None,
                    'Assessor_Contract': None,
                    'Assessor_Target_Hours': None,
                    'Multiple_Contracts': False
                })
            
            # Add contract details for the month
            contract_details = month_data.apply(get_assessor_contract, axis=1)
            month_data = pd.concat([month_data, contract_details], axis=1)
            
            monthly_data.append(month_data)
            logging.info(f"Active students in {month_start.strftime('%B %Y')}: {len(month_data)}")
    
    # Combine all monthly data
    monthly_df = pd.concat(monthly_data, ignore_index=True) if monthly_data else pd.DataFrame()
    
    if monthly_df.empty:
        logging.warning("No monthly data generated!")
        return {}
    
    # Create summaries
    logging.info("Creating summary views...")
    
    # Monthly summary per assessor
    monthly_summary = monthly_df.groupby(
        ['Assessor_Name', 'Year', 'Month', 'Assessor_Position', 
         'Assessor_Contract', 'Multiple_Contracts']
    ).agg({
        'Standard_Title': 'count',
        'Monthly_Hours': 'sum',
        'Assessor_Target_Hours': 'first'
    }).reset_index()
    
    monthly_summary.rename(columns={
        'Standard_Title': 'Active_Students',
        'Monthly_Hours': 'Required_Hours'
    }, inplace=True)
    
    # Programme level summary
    programme_summary = monthly_df.groupby(
        ['Assessor_Name', 'Programme_Level', 'Year', 'Month',
         'Assessor_Position', 'Assessor_Contract', 'Multiple_Contracts']
    ).agg({
        'Standard_Title': 'count',
        'Assessor_Target_Hours': 'first'
    }).reset_index()
    
    programme_summary.rename(columns={
        'Standard_Title': 'Students_In_Programme'
    }, inplace=True)
    
    # Standard title summary
    standard_summary = monthly_df.groupby(
        ['Assessor_Name', 'Standard_Title', 'Year', 'Month',
         'Assessor_Position', 'Assessor_Contract', 'Multiple_Contracts']
    ).agg({
        'Monthly_Hours': 'sum',
        'Snapshot_Date': 'count',
        'Assessor_Target_Hours': 'first'
    }).reset_index()
    
    standard_summary.rename(columns={
        'Snapshot_Date': 'Students_In_Standard',
        'Monthly_Hours': 'Required_Hours'
    }, inplace=True)
    
    logging.info("PICS data processing completed!")
    
    return {
        'detailed_monthly': monthly_df,
        'monthly_summary': monthly_summary,
        'programme_summary': programme_summary,
        'standard_summary': standard_summary,
        'hours_reference': hours_lookup
    }

def process_pics_hours_lookup(hours_df: pd.DataFrame) -> pd.DataFrame:
    """Process the PICS hours lookup data"""
    # Process hours lookup
    hours_lookup = hours_df[[
        'Standard title', 'Weighted Monthly Hours (1.6)'
    ]].copy()
    
    # Convert time strings to decimal hours
    def parse_time_str(time_str):
        if pd.isna(time_str):
            return 0.0
        try:
            if isinstance(time_str, datetime.time):
                return time_str.hour + (time_str.minute / 60)
            elif isinstance(time_str, str) and ':' in time_str:
                parts = time_str.split(':')
                if len(parts) == 3:
                    hours, minutes, _ = map(float, parts)
                else:
                    hours, minutes = map(float, parts)
                return hours + (minutes / 60)
            elif isinstance(time_str, (int, float)):
                return float(time_str)
        except Exception as e:
            logging.warning(f"Error parsing time value '{time_str}': {str(e)}")
            return 0.0
        return 0.0
    
    hours_lookup['Monthly_Hours'] = hours_lookup['Weighted Monthly Hours (1.6)'].apply(parse_time_str)
    
    # Log any zero hours entries
    zero_hours = hours_lookup['Monthly_Hours'].eq(0)
    if zero_hours.any():
        logging.warning(f"Found {zero_hours.sum()} entries with zero hours in lookup")
        logging.warning("Zero-hour entries:")
        zero_entries = hours_lookup[zero_hours]
        for _, row in zero_entries.iterrows():
            logging.warning(f"  {row['Standard title']}: {row['Weighted Monthly Hours (1.6)']}")
    
    # Clean and standardize titles
    hours_lookup['Standard_Title'] = hours_lookup['Standard title'].str.strip()
    
    return hours_lookup

# Main execution block
if __name__ == "__main__":
    try:
        # Setup logging
        setup_logging()
        
        # Get current directory and setup paths
        current_dir = os.getcwd()
        monthly_reports_folder = os.path.join(current_dir, "monthly_reports")
        output_folder = os.path.join(current_dir, "processed_data")
        
        logging.info(f"Processing data from: {current_dir}")
        logging.info(f"Monthly reports folder: {monthly_reports_folder}")
        logging.info(f"Output folder: {output_folder}")
        
        # Ensure monthly reports folder exists
        if not os.path.exists(monthly_reports_folder):
            raise FileNotFoundError(
                f"Monthly reports folder not found: {monthly_reports_folder}"
            )
        
        # Create output folder if needed
        if not os.path.exists(output_folder):
            os.makedirs(output_folder)
            logging.info(f"Created output folder: {output_folder}")
        
        # --- Process Monthly Reports ---
        logging.info("\nStep 1: Processing Monthly Reports")
        staff_data, staff_summary, contract_history = process_monthly_reports(
            monthly_reports_folder
        )
        
        # Save staff data
        staff_data.to_csv(
            os.path.join(output_folder, 'staff_monthly_data.csv'),
            index=False
        )
        staff_summary.to_csv(
            os.path.join(output_folder, 'staff_summary.csv'),
            index=False
        )
        contract_history.to_csv(
            os.path.join(output_folder, 'contract_history.csv'),
            index=False
        )
        
        # --- Process Tutor Pay Report ---
        logging.info("\nStep 2: Processing Tutor Pay Report")
        tutor_report_data = process_tutor_report(
            "TutorPayReport_New 23.24 FY 22.11.24.xlsx",
            os.path.join(output_folder, 'staff_monthly_data.csv'),
            contract_history
        )
        
        if tutor_report_data is not None:
            for name, df in tutor_report_data.items():
                output_file = os.path.join(output_folder, f'{name}.csv')
                df.to_csv(output_file, index=False)
                logging.info(f"Saved {name} to {output_file}")
        else:
            logging.error("No tutor report data generated!")
        
        # --- Process Qualification Data ---
        logging.info("\nStep 3: Processing Qualification Data")
        # Define input files
        approved_file = "Approved List for Terms Caseloading FY2324.xlsx"
        archived_file = "Archived List for Terms Caseloading FY2324.xlsx"
        terms_file = "Terms caseloading 23.24 FY.xlsx"
        
        # Process qualification lists
        qualification_data = process_qualification_lists(
            approved_file,
            archived_file
        )
        
        # Process Terms caseloading
        terms_data = process_terms_caseloading(
            terms_file,
            qualification_data,
            contract_history
        )
        
        # Save qualification data results
        for name, df in terms_data.items():
            output_file = os.path.join(output_folder, f'{name}.csv')
            df.to_csv(output_file, index=False)
            logging.info(f"Saved {name} to {output_file}")
        
        # --- Process PICS Data ---
        logging.info("\nStep 4: Processing PICS Data")
        # Define PICS input files
        caseload_file = "PICS caseload for PBI.xlsx"
        hours_lookup_file = "PICS Hours for Assessor Look up.xlsx"
        
        # Process PICS data
        pics_data = process_pics_data(
            caseload_file,
            hours_lookup_file,
            contract_history
        )
        
        # Save PICS data results
        for name, df in pics_data.items():
            output_file = os.path.join(output_folder, f'pics_{name}.csv')
            df.to_csv(output_file, index=False)
            logging.info(f"Saved {name} to {output_file}")
        
        logging.info("\nAll processing completed successfully!")
        
    except Exception as e:
        logging.error(f"\nError during processing: {str(e)}", exc_info=True)
        raise
Editor is loading...
Leave a Comment