import datetime
import logging
import os
from typing import Dict

import pandas as pd

# safe_read_excel, validate_processed_data, clean_name, setup_logging and the
# process_* pipeline functions called in the main block are assumed to be
# defined elsewhere in this module.


def process_pics_data(
    caseload_file: str,
    hours_lookup_file: str,
    contract_history: pd.DataFrame
) -> Dict[str, pd.DataFrame]:
    """
    Process PICS caseload data and the standard-hours lookup into monthly
    caseload snapshots and per-assessor summary views.
    """
logging.info("Processing PICS data...")
# Read hours lookup first
logging.info("Reading hours lookup data...")
hours_df, error = safe_read_excel(hours_lookup_file, skiprows=2)
if error:
logging.error(f"Failed to read hours lookup: {error}")
raise ValueError(f"Cannot process PICS data: {error}")
# Process hours lookup
hours_lookup = process_pics_hours_lookup(hours_df)
logging.info(f"Processed {len(hours_lookup)} standard titles with hours")
# Read and validate caseload data
logging.info("Reading PICS caseload data...")
caseload_df, error = safe_read_excel(caseload_file)
if error:
logging.error(f"Failed to read caseload data: {error}")
raise ValueError(f"Cannot process PICS data: {error}")
required_columns = [
'Assessor Full Name', 'Programme', 'Apprenticeship Standard Title',
'Apprenticeship Achieved Date', 'Start Date', 'Learning Expected End',
'Actual End'
]
# Validate and select columns
try:
validate_processed_data(caseload_df, required_columns, 'PICS_caseload')
df = caseload_df[required_columns].copy()
except ValueError as e:
logging.error(f"Validation failed: {str(e)}")
raise
# Rename columns for consistency
column_mapping = {
'Assessor Full Name': 'Assessor_Name',
'Programme': 'Programme_Level',
'Apprenticeship Standard Title': 'Standard_Title',
'Apprenticeship Achieved Date': 'Achieved_Date',
'Start Date': 'Start_Date',
'Learning Expected End': 'Expected_End',
'Actual End': 'Actual_End'
}
df.rename(columns=column_mapping, inplace=True)
# Clean assessor names
df['Assessor_Name'] = df['Assessor_Name'].apply(clean_name)
    # Convert dates, distinguishing values that fail to parse from blanks
    # (Actual_End is legitimately empty for students still on programme)
    date_columns = ['Achieved_Date', 'Start_Date', 'Expected_End', 'Actual_End']
    for col in date_columns:
        originally_blank = df[col].isna()
        df[col] = pd.to_datetime(df[col], errors='coerce')
        invalid_dates = (df[col].isna() & ~originally_blank).sum()
        if invalid_dates > 0:
            logging.warning(f"Found {invalid_dates} invalid dates in {col}")
            if invalid_dates < 10:  # Log details only for small numbers of issues
                invalid_rows = df[df[col].isna() & ~originally_blank]
                for _, row in invalid_rows.iterrows():
                    logging.warning(f"Invalid {col} for {row['Assessor_Name']}: {row['Standard_Title']}")
# Clean titles and match with hours lookup
df['Standard_Title'] = df['Standard_Title'].str.strip()
# Add hours from lookup with validation
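    # validate='m:1' makes pandas raise MergeError if any Standard_Title is
    # duplicated in the lookup, so dirty lookup data fails loudly here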
merged_df = df.merge(
hours_lookup[['Standard_Title', 'Monthly_Hours']],
on='Standard_Title',
how='left',
validate='m:1'
)
# Check for unmatched standards
unmatched = merged_df[merged_df['Monthly_Hours'].isna()]['Standard_Title'].unique()
if len(unmatched) > 0:
logging.warning(f"Found {len(unmatched)} standards without matching hours:")
for title in unmatched:
logging.warning(f" {title}")
    merged_df['Monthly_Hours'] = merged_df['Monthly_Hours'].fillna(0)  # unmatched standards count as zero hours
    # Create monthly snapshots for FY 2023/24
    monthly_data = []
    # month-end snapshot dates; pandas >= 2.2 spells this frequency 'ME'
    dates = pd.date_range(start='2023-04-01', end='2024-03-31', freq='M')
logging.info("Generating monthly snapshots...")
for date in dates:
month_start = date.replace(day=1)
month_end = date
logging.info(f"Processing month: {month_start.strftime('%B %Y')}")
        # Get active students for this month: started on or before the month
        # end, and neither end date falls before the month start (NaT start
        # dates compare False and are excluded)
        month_mask = (
            (merged_df['Start_Date'] <= month_end)
            & (merged_df['Actual_End'].isna() | (merged_df['Actual_End'] >= month_start))
            & (merged_df['Expected_End'].isna() | (merged_df['Expected_End'] >= month_start))
        )
month_data = merged_df[month_mask].copy()
if not month_data.empty:
month_data['Snapshot_Date'] = month_end
month_data['Year'] = month_end.year
month_data['Month'] = month_end.month
# Add contract info for this month
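            # (redefined each iteration because it closes over month_end)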
def get_assessor_contract(row):
staff_contracts = contract_history[
contract_history['Staff_Name'] == row['Assessor_Name']
]
relevant_contracts = staff_contracts[
(staff_contracts['Start_Date'] <= month_end) &
(staff_contracts['End_Date'] >= month_end)
]
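                # Assumes contract End_Date is populated; a NaT End_Date
                # compares False here, so an open-ended contract would be
                # treated as not current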
if len(relevant_contracts) > 0:
positions = relevant_contracts['Position'].unique()
contract_types = relevant_contracts['Contract_Type'].unique()
total_target = relevant_contracts['Target_Hours'].sum()
return pd.Series({
'Assessor_Position': ' & '.join(positions),
'Assessor_Contract': ' & '.join(contract_types),
'Assessor_Target_Hours': total_target,
'Multiple_Contracts': len(relevant_contracts) > 1
})
return pd.Series({
'Assessor_Position': None,
'Assessor_Contract': None,
'Assessor_Target_Hours': None,
'Multiple_Contracts': False
})
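            # Note: this rescans contract_history for every student row each
            # month; fine for modest data volumes, though a lookup keyed by
            # staff name would scale better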
# Add contract details for the month
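            # apply preserves month_data's row index, so concat(axis=1) below
            # aligns contract details with the right students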
contract_details = month_data.apply(get_assessor_contract, axis=1)
month_data = pd.concat([month_data, contract_details], axis=1)
monthly_data.append(month_data)
logging.info(f"Active students in {month_start.strftime('%B %Y')}: {len(month_data)}")
# Combine all monthly data
monthly_df = pd.concat(monthly_data, ignore_index=True) if monthly_data else pd.DataFrame()
if monthly_df.empty:
logging.warning("No monthly data generated!")
return {}
# Create summaries
logging.info("Creating summary views...")
    # Monthly summary per assessor (dropna=False keeps assessors with no
    # matching contract, whose contract fields are None)
    monthly_summary = monthly_df.groupby(
        ['Assessor_Name', 'Year', 'Month', 'Assessor_Position',
         'Assessor_Contract', 'Multiple_Contracts'],
        dropna=False
    ).agg({
        'Standard_Title': 'count',
        'Monthly_Hours': 'sum',
        'Assessor_Target_Hours': 'first'
    }).reset_index()
monthly_summary.rename(columns={
'Standard_Title': 'Active_Students',
'Monthly_Hours': 'Required_Hours'
}, inplace=True)
    # Programme level summary (dropna=False for the same reason)
    programme_summary = monthly_df.groupby(
        ['Assessor_Name', 'Programme_Level', 'Year', 'Month',
         'Assessor_Position', 'Assessor_Contract', 'Multiple_Contracts'],
        dropna=False
    ).agg({
        'Standard_Title': 'count',
        'Assessor_Target_Hours': 'first'
    }).reset_index()
programme_summary.rename(columns={
'Standard_Title': 'Students_In_Programme'
}, inplace=True)
    # Standard title summary (dropna=False for the same reason)
    standard_summary = monthly_df.groupby(
        ['Assessor_Name', 'Standard_Title', 'Year', 'Month',
         'Assessor_Position', 'Assessor_Contract', 'Multiple_Contracts'],
        dropna=False
    ).agg({
        'Monthly_Hours': 'sum',
        'Snapshot_Date': 'count',
        'Assessor_Target_Hours': 'first'
    }).reset_index()
standard_summary.rename(columns={
'Snapshot_Date': 'Students_In_Standard',
'Monthly_Hours': 'Required_Hours'
}, inplace=True)
logging.info("PICS data processing completed!")
return {
'detailed_monthly': monthly_df,
'monthly_summary': monthly_summary,
'programme_summary': programme_summary,
'standard_summary': standard_summary,
'hours_reference': hours_lookup
}
def process_pics_hours_lookup(hours_df: pd.DataFrame) -> pd.DataFrame:
    """Process the PICS hours lookup: parse weighted monthly hours and clean titles"""
hours_lookup = hours_df[[
'Standard title', 'Weighted Monthly Hours (1.6)'
]].copy()
    # Convert to decimal hours; values may arrive as datetime.time objects,
    # 'H:MM' / 'H:MM:SS' strings, or plain numbers depending on cell formatting
    def parse_time_str(time_str):
if pd.isna(time_str):
return 0.0
try:
if isinstance(time_str, datetime.time):
return time_str.hour + (time_str.minute / 60)
elif isinstance(time_str, str) and ':' in time_str:
parts = time_str.split(':')
if len(parts) == 3:
                    hours, minutes, _ = map(float, parts)  # seconds discarded
else:
hours, minutes = map(float, parts)
return hours + (minutes / 60)
elif isinstance(time_str, (int, float)):
return float(time_str)
except Exception as e:
logging.warning(f"Error parsing time value '{time_str}': {str(e)}")
return 0.0
return 0.0
hours_lookup['Monthly_Hours'] = hours_lookup['Weighted Monthly Hours (1.6)'].apply(parse_time_str)
# Log any zero hours entries
zero_hours = hours_lookup['Monthly_Hours'].eq(0)
if zero_hours.any():
logging.warning(f"Found {zero_hours.sum()} entries with zero hours in lookup")
logging.warning("Zero-hour entries:")
zero_entries = hours_lookup[zero_hours]
for _, row in zero_entries.iterrows():
logging.warning(f" {row['Standard title']}: {row['Weighted Monthly Hours (1.6)']}")
# Clean and standardize titles
hours_lookup['Standard_Title'] = hours_lookup['Standard title'].str.strip()
return hours_lookup
# Main execution block
if __name__ == "__main__":
try:
# Setup logging
setup_logging()
# Get current directory and setup paths
current_dir = os.getcwd()
monthly_reports_folder = os.path.join(current_dir, "monthly_reports")
output_folder = os.path.join(current_dir, "processed_data")
logging.info(f"Processing data from: {current_dir}")
logging.info(f"Monthly reports folder: {monthly_reports_folder}")
logging.info(f"Output folder: {output_folder}")
# Ensure monthly reports folder exists
if not os.path.exists(monthly_reports_folder):
raise FileNotFoundError(
f"Monthly reports folder not found: {monthly_reports_folder}"
)
# Create output folder if needed
if not os.path.exists(output_folder):
os.makedirs(output_folder)
logging.info(f"Created output folder: {output_folder}")
# --- Process Monthly Reports ---
logging.info("\nStep 1: Processing Monthly Reports")
staff_data, staff_summary, contract_history = process_monthly_reports(
monthly_reports_folder
)
# Save staff data
staff_data.to_csv(
os.path.join(output_folder, 'staff_monthly_data.csv'),
index=False
)
staff_summary.to_csv(
os.path.join(output_folder, 'staff_summary.csv'),
index=False
)
contract_history.to_csv(
os.path.join(output_folder, 'contract_history.csv'),
index=False
)
# --- Process Tutor Pay Report ---
logging.info("\nStep 2: Processing Tutor Pay Report")
tutor_report_data = process_tutor_report(
"TutorPayReport_New 23.24 FY 22.11.24.xlsx",
os.path.join(output_folder, 'staff_monthly_data.csv'),
contract_history
)
if tutor_report_data is not None:
for name, df in tutor_report_data.items():
output_file = os.path.join(output_folder, f'{name}.csv')
df.to_csv(output_file, index=False)
logging.info(f"Saved {name} to {output_file}")
else:
logging.error("No tutor report data generated!")
# --- Process Qualification Data ---
logging.info("\nStep 3: Processing Qualification Data")
# Define input files
approved_file = "Approved List for Terms Caseloading FY2324.xlsx"
archived_file = "Archived List for Terms Caseloading FY2324.xlsx"
terms_file = "Terms caseloading 23.24 FY.xlsx"
# Process qualification lists
qualification_data = process_qualification_lists(
approved_file,
archived_file
)
# Process Terms caseloading
terms_data = process_terms_caseloading(
terms_file,
qualification_data,
contract_history
)
# Save qualification data results
for name, df in terms_data.items():
output_file = os.path.join(output_folder, f'{name}.csv')
df.to_csv(output_file, index=False)
logging.info(f"Saved {name} to {output_file}")
# --- Process PICS Data ---
logging.info("\nStep 4: Processing PICS Data")
# Define PICS input files
caseload_file = "PICS caseload for PBI.xlsx"
hours_lookup_file = "PICS Hours for Assessor Look up.xlsx"
# Process PICS data
pics_data = process_pics_data(
caseload_file,
hours_lookup_file,
contract_history
)
# Save PICS data results
for name, df in pics_data.items():
output_file = os.path.join(output_folder, f'pics_{name}.csv')
df.to_csv(output_file, index=False)
logging.info(f"Saved {name} to {output_file}")
logging.info("\nAll processing completed successfully!")
except Exception as e:
logging.error(f"\nError during processing: {str(e)}", exc_info=True)
        raise