# Paste-site metadata (not part of the program): "Code", user_1718919, plain_text, a year ago, 33 kB, 4, Indexable
# Import required libraries
import pandas as pd
import numpy as np
import os
from datetime import datetime
# --- UTILITY FUNCTIONS ---
def clean_name(name):
    """Collapse every run of whitespace in ``name`` to a single space.

    Leading and trailing whitespace is removed as a side effect of the
    split/join. Values that ``pd.isna`` reports as missing are returned
    unchanged; anything else is converted with ``str`` first.
    """
    if pd.isna(name):
        return name
    return ' '.join(str(name).split())
def extract_month_year(filename):
    """Parse the report date out of 'ACL Monthly Report DD.MM.YY.xlsx'.

    The date is taken from the last whitespace-separated token of the
    filename. A trailing Excel extension (``.xlsx``, ``.xlsm`` or ``.xls``,
    in any letter case) is removed before parsing; a name without an
    extension is parsed as-is.

    Returns:
        The parsed ``pandas.Timestamp``.

    Raises:
        ValueError: if the token is not a ``DD.MM.YY`` date (raised by
            ``pd.to_datetime``).
    """
    date_token = filename.split()[-1]
    lowered = date_token.lower()
    # Longest extension first so '.xlsx' is not mistaken for '.xls' + 'x'.
    for extension in ('.xlsx', '.xlsm', '.xls'):
        if lowered.endswith(extension):
            date_token = date_token[:-len(extension)]
            break
    return pd.to_datetime(date_token, format='%d.%m.%y')
def clean_qualification_number(qual_num):
    """Strip surrounding whitespace and one trailing 'P' or 'N' suffix.

    Missing values (per ``pd.isna``) are returned unchanged. Any other value
    is converted to ``str`` and stripped; if the result ends in an upper-case
    ``P`` or ``N`` that single character is removed.
    """
    if pd.isna(qual_num):
        return qual_num
    text = str(qual_num).strip()
    if text.endswith(('P', 'N')):
        return text[:-1]
    return text
def convert_time_to_hours(time_str):
    """Convert a duration to decimal hours.

    Accepted inputs:
      * text (or anything whose ``str`` form is) ``H:MM`` or ``HH:MM:SS``;
      * objects exposing ``total_seconds()`` such as ``datetime.timedelta``
        or ``pandas.Timedelta`` - their string form ("1 days 02:00:00")
        cannot be parsed by the colon split, so they are handled directly.

    Returns:
        Hours as a number. Missing values, unparseable text and unexpected
        formats yield ``0`` (a message is printed for the latter two).
    """
    if pd.isna(time_str):
        return 0
    if hasattr(time_str, 'total_seconds'):
        return time_str.total_seconds() / 3600
    try:
        text = str(time_str).strip()
        parts = text.split(':')
        if len(parts) == 2:  # H:MM format
            return int(parts[0]) + int(parts[1]) / 60
        if len(parts) == 3:  # HH:MM:SS format
            return int(parts[0]) + int(parts[1]) / 60 + int(parts[2]) / 3600
        print(f"Unexpected time format: {text}")
        return 0
    except (TypeError, ValueError) as e:
        # Best effort: a bad cell must not abort the whole lookup table.
        print(f"Error converting time {time_str}: {str(e)}")
        return 0
def track_employee_history(staff_data):
    """Build one row per contract period for every employee.

    ``staff_data`` must provide the columns ``Staff_Standard``,
    ``Report_Date``, ``Contract_Type``, ``Position Name`` and
    ``Target_Hours``. For each employee the rows are walked in
    ``Report_Date`` order; a new period starts whenever the contract type or
    the position differs from the previous row.

    A superseded period is closed with ``Status='Changed'`` and an
    ``End_Date`` equal to the report date on which the change was first seen,
    so consecutive periods share that boundary date. The last period of each
    employee has ``Status='Current'`` and ends on the employee's latest
    report date. ``Target_Hours`` is the value from the first row of the
    period.

    Returns:
        DataFrame with columns Staff_Name, Position, Contract_Type,
        Start_Date, End_Date, Target_Hours, Status. The columns are present
        even when no period was found, so callers can always filter on them.
    """
    history_columns = [
        'Staff_Name', 'Position', 'Contract_Type',
        'Start_Date', 'End_Date', 'Target_Hours', 'Status',
    ]
    history = []
    print("\nTracking employee contract history...")

    # sort=False keeps employees in order of first appearance.
    for name, employee_rows in staff_data.groupby('Staff_Standard', sort=False):
        employee_data = employee_rows.sort_values('Report_Date')

        current_contract = None
        current_position = None
        period_start = None
        period_target = None

        for _, row in employee_data.iterrows():
            if (current_contract != row['Contract_Type'] or
                    current_position != row['Position Name']):
                # Close the period that just ended, if there was one.
                if period_start is not None:
                    history.append({
                        'Staff_Name': name,
                        'Position': current_position,
                        'Contract_Type': current_contract,
                        'Start_Date': period_start,
                        'End_Date': row['Report_Date'],
                        'Target_Hours': period_target,
                        'Status': 'Changed'
                    })
                # Open the new period.
                current_contract = row['Contract_Type']
                current_position = row['Position Name']
                period_start = row['Report_Date']
                period_target = row['Target_Hours']

        # The period still open after the last row is the current one.
        if period_start is not None:
            history.append({
                'Staff_Name': name,
                'Position': current_position,
                'Contract_Type': current_contract,
                'Start_Date': period_start,
                'End_Date': employee_data['Report_Date'].max(),
                'Target_Hours': period_target,
                'Status': 'Current'
            })

    history_df = pd.DataFrame(history, columns=history_columns)
    print(f"Tracked {len(history_df)} contract periods for {len(staff_data['Staff_Standard'].unique())} employees")
    return history_df
def calculate_monthly_target(row, contract_history):
    """Return the target hours that apply to ``row``'s report date.

    Args:
        row: mapping/Series with ``Staff_Standard`` and ``Report_Date``.
        contract_history: DataFrame with ``Staff_Name``, ``Position``,
            ``Contract_Type``, ``Start_Date``, ``End_Date`` and
            ``Target_Hours`` columns.

    Returns:
        ``Target_Hours`` of the contract in force on the report date when
        that contract is a salaried tutor contract, otherwise ``None``
        (also ``None`` when no contract covers the date).

    When several periods cover the date - a closed period and its successor
    share the date on which the change was reported - the period with the
    latest ``Start_Date`` wins, because the report on that date already
    describes the new contract.
    """
    staff_contracts = contract_history[
        contract_history['Staff_Name'] == row['Staff_Standard']
    ]
    in_force = staff_contracts[
        (staff_contracts['Start_Date'] <= row['Report_Date']) &
        (staff_contracts['End_Date'] >= row['Report_Date'])
    ]
    if in_force.empty:
        return None

    # mergesort is stable, so ties keep their history order.
    contract = in_force.sort_values('Start_Date', kind='mergesort').iloc[-1]
    if contract['Position'] == 'Tutor' and contract['Contract_Type'] == 'Salaried':
        return contract['Target_Hours']
    return None
# --- PART 1: PROCESS MONTHLY REPORTS ---
def process_monthly_reports(folder_path):
    """Load every 'ACL Monthly Report*' workbook in ``folder_path``.

    Each workbook is read with its first two rows skipped, stamped with the
    report date parsed from its filename, and reduced to rows whose
    'Position Name' is 'Tutor' or 'Learning Support Assistant'. Contract
    type, target hours and standardised staff/manager names are derived per
    row, then all files are combined.

    Returns:
        (combined_df, staff_summary_df, contract_history) where
        ``combined_df`` is the per-report staff table sorted by report date
        and staff name, ``staff_summary_df`` has one row per staff member,
        and ``contract_history`` is the output of ``track_employee_history``.

    Raises:
        ValueError: if the folder contains no matching report file.
    """
    print("\nProcessing Monthly Reports...")

    def calculate_target_hours(position, working_hours, fte):
        # Only tutors above the 3-hour "Salaried" threshold get a target.
        # NOTE(review): 840 looks like the annual hours for 1.0 FTE - confirm.
        if position == 'Tutor' and float(working_hours) > 3:
            return float(fte) * 840
        return None

    def split_manager_name(name):
        # "<first> <known>, <last>" -> (first, known, last). Names without
        # exactly one comma are returned unchanged in all three slots.
        if pd.isna(name):
            return pd.NA, pd.NA, pd.NA
        parts = str(name).split(',')
        if len(parts) != 2:
            return name, name, name
        first_known = parts[0].strip()
        last = parts[1].strip()
        if ' ' in first_known:
            first, known = first_known.split(' ', 1)
        else:
            first = known = first_known
        return first, known, last

    columns_to_keep = [
        'Assignment Number', 'Staff_Standard', 'Staff_Known_As',
        'Position Name', 'Contract_Type', 'Target_Hours',
        'Manager_Standard', 'Manager_Known_As', 'Report_Date'
    ]
    manager_columns = ['Manager_First', 'Manager_Known', 'Manager_Last']

    # os.listdir order is arbitrary; sorting makes runs reproducible.
    files = sorted(
        f for f in os.listdir(folder_path) if f.startswith('ACL Monthly Report')
    )
    if not files:
        raise ValueError(
            f"No 'ACL Monthly Report' files found in {folder_path}"
        )

    all_data = []
    for file in files:
        print(f"Processing file: {file}")
        df = pd.read_excel(os.path.join(folder_path, file), skiprows=2)
        df['Report_Date'] = extract_month_year(file)

        # Keep only Tutors and Learning Support Assistants.
        mask = df['Position Name'].isin(['Learning Support Assistant', 'Tutor'])
        df = df[mask].copy()

        # Built from plain lists so a file with no matching rows still works.
        df['Target_Hours'] = pd.Series(
            [
                calculate_target_hours(position, hours, fte)
                for position, hours, fte in zip(
                    df['Position Name'], df['Working Hours'],
                    df['Full-Time Equivalent'])
            ],
            index=df.index,
            dtype='float64',
        )
        df['Contract_Type'] = [
            'Sessional' if float(hours) <= 3 else 'Salaried'
            for hours in df['Working Hours']
        ]

        # Manager name formats.
        manager_parts = pd.DataFrame(
            [split_manager_name(name) for name in df['Line Manager Name']],
            columns=manager_columns,
            index=df.index,
        )
        df[manager_columns] = manager_parts
        df['Manager_Standard'] = [
            f"{first} {last}" if pd.notna(first) else pd.NA
            for first, last in zip(df['Manager_First'], df['Manager_Last'])
        ]
        df['Manager_Known_As'] = [
            f"{known}, {last}" if pd.notna(known) else pd.NA
            for known, last in zip(df['Manager_Known'], df['Manager_Last'])
        ]

        # Staff name formats. Staff_Standard is whitespace-normalised the
        # same way the tutor pay report normalises its names, so the two
        # sources can be matched on it.
        df['Staff_Standard'] = [
            clean_name(f"{first} {last}")
            for first, last in zip(df['First Name'], df['Last Name'])
        ]
        df['Staff_Known_As'] = [
            f"{known}, {last}"
            for known, last in zip(df['Known As'], df['Last Name'])
        ]

        all_data.append(df[columns_to_keep])

    combined_df = pd.concat(all_data, ignore_index=True)
    combined_df = combined_df.sort_values(['Report_Date', 'Staff_Standard'])

    # Track contract changes. The first record of each staff member has no
    # previous contract, so it compares unequal and is flagged True.
    combined_df['Previous_Contract'] = combined_df.groupby('Staff_Standard')['Contract_Type'].shift(1)
    combined_df['Contract_Changed'] = (
        combined_df['Contract_Type'] != combined_df['Previous_Contract']
    )

    # Detailed contract history.
    contract_history = track_employee_history(combined_df)

    # One summary row per staff member.
    staff_summary = []
    for name, staff_rows in combined_df.groupby('Staff_Standard', sort=False):
        contract_changes = contract_history[contract_history['Staff_Name'] == name].copy()
        # combined_df is sorted by date, so the last row is the latest report.
        latest = staff_rows.iloc[-1]
        staff_summary.append({
            'Staff_Name': name,
            'First_Appearance': staff_rows['Report_Date'].min(),
            'Last_Appearance': staff_rows['Report_Date'].max(),
            'Current_Position': latest['Position Name'],
            'Current_Contract': latest['Contract_Type'],
            'Number_Of_Contract_Changes': len(contract_changes) - 1,  # Subtract initial contract
            'Initial_Contract': contract_changes.iloc[0]['Contract_Type'],
            'Initial_Position': contract_changes.iloc[0]['Position'],
            'Contract_History': contract_changes.to_dict('records')
        })
    staff_summary_df = pd.DataFrame(staff_summary)

    print("Monthly reports processing completed!")
    return combined_df, staff_summary_df, contract_history
# --- PART 2: PROCESS TUTOR PAY REPORT ---
def process_tutor_report(file_path, staff_data_path, contract_history):
    """Summarise the TutorPayReport workbook and attach contract details.

    Args:
        file_path: path of the TutorPayReport Excel workbook.
        staff_data_path: kept for backward compatibility; the staff CSV was
            previously loaded here but its contents were never used, so it
            is no longer read.
        contract_history: DataFrame as produced by
            ``track_employee_history``.

    Returns:
        ``None`` when no staff member in the report has a contract history;
        otherwise a dict with ``daily_summary``, ``weekly_summary``,
        ``monthly_summary`` (with Position, Contract_Type, Target_Hours and
        Utilization_Percentage added) and ``detailed_data``.
    """
    print("\nProcessing Tutor Pay Report...")
    df = pd.read_excel(file_path)

    df['PEOPLENAME'] = df['PEOPLENAME'].apply(clean_name)
    df['PEOPLESURNAME'] = df['PEOPLESURNAME'].apply(clean_name)
    df['Staff_Standard'] = [
        f"{first} {last}"
        for first, last in zip(df['PEOPLENAME'], df['PEOPLESURNAME'])
    ]
    df['STAFFROLE'] = df['STAFFROLE'].apply(clean_name)
    # .copy() so the column assignments below do not write into a view.
    df = df[df['STAFFROLE'].isin(['Tutor', 'Learning Support'])].copy()

    # Adjustments are read as milliseconds; their magnitude is converted to
    # hours (the sign is discarded).
    ms_per_hour = 1000 * 60 * 60
    for source_col, hours_col in (('TIMEADJUSTEARLY', 'EarlyAdjust'),
                                  ('TIMEADJUSTLATE', 'LateAdjust')):
        adjustment_ms = pd.to_numeric(df[source_col], errors='coerce').fillna(0)
        df[hours_col] = adjustment_ms.abs() / ms_per_hour

    df['Potential Hours'] = pd.to_numeric(df['Potential Hours'], errors='coerce').fillna(0)
    # Never report negative hours.
    df['Actual_Hours'] = (
        df['Potential Hours'] - (df['EarlyAdjust'] + df['LateAdjust'])
    ).clip(lower=0)

    assessment_types = {
        'Access Assessments', 'Assessments', 'Beauty OL Assess',
        'CFL PA Vols Assess', 'Creative Assessments', 'E&M ApprenticeAssess',
        'English FS Assess', 'English GCSE Assess', 'ESOL F2F Assess',
        'ESOL Online Assess', 'Hair/Barb F2F Assess', 'Maths Assessments',
        'SWiS Assessments'
    }

    def categorize_activity(activity):
        if pd.isna(activity):
            return 'Other'
        activity = str(activity).strip()
        if activity in assessment_types:
            return 'Assessment'
        if activity in ('Community Engagement', 'Tutorials/Drop Ins'):
            return activity
        return 'Other'

    df['Activity_Category'] = df['ACTIVITYTYPE'].apply(categorize_activity)
    df['EVENTDATE'] = pd.to_datetime(df['EVENTDATE'], format='%d %b %Y', errors='coerce')

    # Create summaries
    daily_summary = df.groupby(
        ['Staff_Standard', 'EVENTDATE', 'Activity_Category']
    ).agg({
        'Actual_Hours': 'sum'
    }).reset_index()

    iso_calendar = df['EVENTDATE'].dt.isocalendar()
    df['Week'] = iso_calendar.week
    df['Year'] = iso_calendar.year
    weekly_summary = df.groupby(
        ['Staff_Standard', 'Year', 'Week', 'Activity_Category']
    ).agg({
        'Actual_Hours': 'sum'
    }).reset_index()

    df['Month'] = df['EVENTDATE'].dt.month
    # The ISO year stored in df['Year'] belongs with ISO weeks; around New
    # Year it differs from the calendar year (1 Jan 2023 is ISO year 2022),
    # which would put those events in the wrong month bucket. The monthly
    # summary therefore groups on the calendar year.
    monthly_summary = (
        df.assign(Year=df['EVENTDATE'].dt.year)
        .groupby(['Staff_Standard', 'Year', 'Month', 'Activity_Category'])
        .agg({'Actual_Hours': 'sum'})
        .reset_index()
    )
    # Unparseable dates make Year/Month float; groupby has dropped those
    # rows, so the remaining keys convert back to int safely.
    monthly_summary[['Year', 'Month']] = monthly_summary[['Year', 'Month']].astype(int)

    def calculate_utilization(group_df, contract_info):
        """Calculate utilization percentage based on contract status"""
        if pd.isna(contract_info['Target_Hours']) or contract_info['Target_Hours'] == 0:
            return None
        non_assessment_hours = group_df[
            group_df['Activity_Category'] != 'Assessment'
        ]['Actual_Hours'].sum()
        # Period target: the contract's target spread evenly over the number
        # of calendar months the contract period touches.
        # NOTE(review): if Target_Hours is an annual figure this should
        # probably divide by 12 instead - confirm with the data owner.
        contract_start = pd.to_datetime(contract_info['Start_Date'])
        contract_end = pd.to_datetime(contract_info['End_Date'])
        contract_months = ((contract_end.year - contract_start.year) * 12 +
                           contract_end.month - contract_start.month + 1)
        period_target = contract_info['Target_Hours'] / contract_months
        utilization = (non_assessment_hours / period_target * 100) if period_target > 0 else None
        print(f"Non-assessment hours: {non_assessment_hours}")
        print(f"Period target: {period_target}")
        print(f"Calculated utilization: {utilization}%")
        return utilization

    # Add utilization to monthly summary
    monthly_utilization = []
    print("\nProcessing utilization calculations...")
    for staff in monthly_summary['Staff_Standard'].unique():
        print(f"\nProcessing staff member: {staff}")
        staff_contracts = contract_history[
            contract_history['Staff_Name'] == staff
        ].sort_values('Start_Date', kind='mergesort')
        if len(staff_contracts) == 0:
            continue
        staff_monthly = monthly_summary[
            monthly_summary['Staff_Standard'] == staff
        ].copy()
        if len(staff_monthly) == 0:
            continue
        print(f"Found {len(staff_monthly)} monthly records")

        # Contracts are matched on calendar month, not on an exact day:
        # contract dates are report dates that can fall anywhere in a month,
        # so comparing against the 1st would miss the first reporting month.
        start_months = pd.to_datetime(staff_contracts['Start_Date']).dt.to_period('M')
        end_months = pd.to_datetime(staff_contracts['End_Date']).dt.to_period('M')

        positions, contract_types, targets, utilizations = [], [], [], []
        for idx, month_data in staff_monthly.iterrows():
            month_date = pd.Timestamp(year=int(month_data['Year']),
                                      month=int(month_data['Month']),
                                      day=1)
            month_period = month_date.to_period('M')
            relevant_contract = staff_contracts[
                (start_months <= month_period) & (end_months >= month_period)
            ]
            if len(relevant_contract) > 0:
                # Sorted by start date: the last match is the newest
                # contract in force during the month.
                contract = relevant_contract.iloc[-1]
                print(f"Contract for {month_date.strftime('%B %Y')}:")
                print(f"Position: {contract['Position']}")
                print(f"Contract Type: {contract['Contract_Type']}")
                print(f"Target Hours: {contract['Target_Hours']}")
                positions.append(contract['Position'])
                contract_types.append(contract['Contract_Type'])
                targets.append(contract['Target_Hours'])
                utilizations.append(
                    calculate_utilization(staff_monthly.loc[[idx]], contract)
                )
            else:
                print(f"No contract found for {month_date.strftime('%B %Y')}")
                positions.append(None)
                contract_types.append(None)
                targets.append(None)
                utilizations.append(None)

        staff_monthly['Position'] = positions
        staff_monthly['Contract_Type'] = contract_types
        staff_monthly['Target_Hours'] = pd.Series(
            targets, index=staff_monthly.index, dtype='float64')
        staff_monthly['Utilization_Percentage'] = pd.Series(
            utilizations, index=staff_monthly.index, dtype='float64')
        monthly_utilization.append(staff_monthly)

    if not monthly_utilization:
        print("WARNING: No monthly utilization data generated!")
        return None

    monthly_summary_with_utilization = pd.concat(monthly_utilization)
    print("Tutor pay report processing completed!")
    return {
        'daily_summary': daily_summary,
        'weekly_summary': weekly_summary,
        'monthly_summary': monthly_summary_with_utilization,
        'detailed_data': df
    }
# --- PART 3: QUALIFICATION MAPPING ---
def process_qualification_lists(approved_file, archived_file):
    """Combine the approved and archived qualification workbooks.

    Both workbooks must contain ``QualificationName`` and
    ``QualificationNumber`` columns. Each row is tagged with its ``Source``
    ('Approved' or 'Archived'), qualification numbers are cleaned with
    ``clean_qualification_number``, and duplicates on the cleaned number are
    removed.

    Returns:
        DataFrame with one row per qualification number. When a number is in
        both lists the 'Approved' row is kept.
    """
    print("\nProcessing qualification lists...")
    wanted_columns = ['QualificationName', 'QualificationNumber']

    approved_df = pd.read_excel(approved_file)[wanted_columns].copy()
    approved_df['Source'] = 'Approved'

    archived_df = pd.read_excel(archived_file)[wanted_columns].copy()
    archived_df['Source'] = 'Archived'

    # Approved rows are placed first on purpose: see drop_duplicates below.
    qual_df = pd.concat([approved_df, archived_df], ignore_index=True)
    qual_df['QualificationNumber'] = qual_df['QualificationNumber'].apply(clean_qualification_number)

    # keep='first' on the concatenation order keeps the Approved version.
    # (Sorting 'Source' in descending order would put 'Archived' first,
    # because 'Ar' sorts after 'Ap', and keep the wrong row.)
    qual_df = qual_df.drop_duplicates('QualificationNumber', keep='first')

    print(f"Processed {len(qual_df)} unique qualifications")
    return qual_df
def process_terms_caseloading(terms_file, qualification_df, contract_history):
    """Process the Terms caseloading workbook.

    Args:
        terms_file: path of the Terms caseloading Excel workbook.
        qualification_df: DataFrame with ``QualificationNumber`` and
            ``QualificationName`` columns (see
            ``process_qualification_lists``).
        contract_history: DataFrame as produced by
            ``track_employee_history``.

    Returns:
        dict with ``terms_detailed`` (one row per workbook row, with
        qualification name and staff contract columns added),
        ``learner_summary`` (row counts per staff member, by level and by
        activity group), ``qualification_summary`` and
        ``qualification_reference`` (``qualification_df`` unchanged).
    """
    print("\nProcessing Terms caseloading data...")
    terms_df = pd.read_excel(terms_file)

    # Select and rename relevant columns
    cols_to_keep = {
        'Primary Staff Name': 'Staff_First_Name',
        'Primary Staff Surname': 'Staff_Last_Name',
        'Start Date': 'Start_Date',
        'End Date': 'End_Date',
        'Activity Name': 'Activity_Name',
        'Funding Reference': 'Qualification_Number',
        'Level': 'Level',
        'Activity Group': 'Activity_Group',
        'Name': 'Learner_First_Name',
        'Surname': 'Learner_Last_Name'
    }
    terms_df = terms_df[list(cols_to_keep)].rename(columns=cols_to_keep)

    terms_df['Qualification_Number'] = terms_df['Qualification_Number'].apply(clean_qualification_number)
    terms_df['Staff_Standard'] = [
        f"{first} {last}"
        for first, last in zip(terms_df['Staff_First_Name'], terms_df['Staff_Last_Name'])
    ]
    for col in ('Start_Date', 'End_Date'):
        terms_df[col] = pd.to_datetime(terms_df[col], errors='coerce')

    # Qualification name lookup: the first reference row for a number wins;
    # rows with a missing or unknown number fall back to the activity name.
    name_by_number = (
        qualification_df.dropna(subset=['QualificationNumber'])
        .drop_duplicates('QualificationNumber')
        .set_index('QualificationNumber')['QualificationName']
    )
    has_reference = terms_df['Qualification_Number'].isin(name_by_number.index)
    terms_df['Qualification_Name'] = (
        terms_df['Qualification_Number'].map(name_by_number)
        .where(has_reference, terms_df['Activity_Name'])
    )

    # Contract in force on each row's start date.
    contracts_by_staff = {
        staff: contracts
        for staff, contracts in contract_history.groupby('Staff_Name')
    }

    def get_contract_info(staff_name, on_date):
        staff_contracts = contracts_by_staff.get(staff_name)
        if staff_contracts is None:
            return None, None, None
        relevant_contract = staff_contracts[
            (staff_contracts['Start_Date'] <= on_date) &
            (staff_contracts['End_Date'] >= on_date)
        ]
        if relevant_contract.empty:
            return None, None, None
        # Consecutive periods share their boundary date; prefer the period
        # that starts latest (stable sort keeps history order on ties).
        contract = relevant_contract.sort_values('Start_Date', kind='mergesort').iloc[-1]
        return contract['Position'], contract['Contract_Type'], contract['Target_Hours']

    contract_details = pd.DataFrame(
        [
            get_contract_info(staff, start)
            for staff, start in zip(terms_df['Staff_Standard'], terms_df['Start_Date'])
        ],
        columns=['Staff_Position', 'Staff_Contract', 'Staff_Target_Hours'],
        index=terms_df.index,
    )
    terms_df = pd.concat([terms_df, contract_details], axis=1)

    # Summary of learners per staff member
    learner_summary = terms_df.groupby('Staff_Standard').agg({
        'Learner_First_Name': 'count'
    }).reset_index()

    # Level counts as separate columns
    level_pivot = terms_df.pivot_table(
        index='Staff_Standard',
        columns='Level',
        values='Learner_First_Name',
        aggfunc='count',
        fill_value=0
    ).reset_index()
    level_pivot.columns = ['Staff_Standard'] + [
        f'Level_{col}_Count' for col in level_pivot.columns[1:]
    ]

    # Activity group counts as separate columns
    activity_pivot = terms_df.pivot_table(
        index='Staff_Standard',
        columns='Activity_Group',
        values='Learner_First_Name',
        aggfunc='count',
        fill_value=0
    ).reset_index()
    # str() so a non-text group label (e.g. a number) cannot break .replace.
    activity_pivot.columns = ['Staff_Standard'] + [
        f'Activity_{str(col).replace(" ", "_")}_Count'
        for col in activity_pivot.columns[1:]
    ]

    # Merge all summaries
    learner_summary = (
        learner_summary.merge(level_pivot, on='Staff_Standard', how='left')
        .merge(activity_pivot, on='Staff_Standard', how='left')
    )
    learner_summary.rename(columns={
        'Learner_First_Name': 'Total_Learners'
    }, inplace=True)

    # Qualification summary with contract information. dropna=False keeps
    # staff without a matching contract (or rows without a level); the
    # default would silently drop every such row from the summary.
    qual_summary = terms_df.groupby(
        ['Staff_Standard', 'Qualification_Name', 'Level', 'Staff_Position', 'Staff_Contract'],
        dropna=False
    ).agg({
        'Learner_First_Name': 'count',
        'Staff_Target_Hours': 'first'
    }).reset_index()
    qual_summary.rename(columns={
        'Learner_First_Name': 'Learners_In_Qualification'
    }, inplace=True)

    print("Terms caseloading processing completed!")
    return {
        'terms_detailed': terms_df,
        'learner_summary': learner_summary,
        'qualification_summary': qual_summary,
        'qualification_reference': qualification_df
    }
# --- PART 4: PICS DATA PROCESSING ---
def process_pics_data(caseload_file, hours_lookup_file, contract_history):
    """Process the PICS caseload workbook and the assessor hours lookup.

    Args:
        caseload_file: path of the PICS caseload Excel workbook.
        hours_lookup_file: path of the hours lookup workbook (its first two
            rows are skipped); must provide 'Standard title' and
            'Weighted Monthly Hours (1.6)'.
        contract_history: DataFrame as produced by
            ``track_employee_history``.

    Returns:
        dict with ``detailed_monthly`` (one row per learner per month in
        which the learner was active, April 2023 to March 2024),
        ``monthly_summary``, ``programme_summary``, ``standard_summary`` and
        ``hours_reference`` (the processed lookup table).
    """
    print("\nProcessing PICS data...")

    # Read hours lookup first
    print("Reading hours lookup data...")
    hours_df = pd.read_excel(hours_lookup_file, skiprows=2)  # Skip first two rows
    hours_lookup = hours_df[[
        'Standard title', 'Weighted Monthly Hours (1.6)'
    ]].copy()
    print("\nSample of original hours data:")
    print(hours_lookup.head())

    # Convert time format to decimal hours
    hours_lookup['Monthly_Hours'] = hours_lookup['Weighted Monthly Hours (1.6)'].apply(
        convert_time_to_hours
    )
    print("\nSample after conversion to decimal hours:")
    print(hours_lookup.head())

    # Verify we have non-zero hours
    zero_hours = hours_lookup['Monthly_Hours'].eq(0).sum()
    if zero_hours > 0:
        print(f"\nWARNING: Found {zero_hours} entries with zero hours!")
        print("\nSample of zero-hour entries:")
        print(hours_lookup[hours_lookup['Monthly_Hours'] == 0].head())

    # Clean and standardize titles
    hours_lookup['Standard_Title'] = hours_lookup['Standard title'].str.strip()
    print(f"Processed {len(hours_lookup)} standard titles with hours")

    # Read caseload data
    print("\nReading PICS caseload data...")
    caseload_df = pd.read_excel(caseload_file)
    cols_to_keep = {
        'Assessor Full Name': 'Assessor_Name',
        'Programme': 'Programme_Level',
        'Apprenticeship Standard Title': 'Standard_Title',
        'Apprenticeship Achieved Date': 'Achieved_Date',
        'Start Date': 'Start_Date',
        'Learning Expected End': 'Expected_End',
        'Actual End': 'Actual_End'
    }
    df = caseload_df[list(cols_to_keep)].rename(columns=cols_to_keep)

    for col in ('Achieved_Date', 'Start_Date', 'Expected_End', 'Actual_End'):
        df[col] = pd.to_datetime(df[col], errors='coerce')

    # Clean titles for matching with lookup
    df['Standard_Title'] = df['Standard_Title'].str.strip()

    # Add hours from lookup. The lookup is de-duplicated on the title first:
    # a repeated title would otherwise duplicate every matching learner row
    # in the merge and inflate all counts.
    df = df.merge(
        hours_lookup[['Standard_Title', 'Monthly_Hours']].drop_duplicates('Standard_Title'),
        on='Standard_Title',
        how='left'
    )
    df['Monthly_Hours'] = df['Monthly_Hours'].fillna(0)

    contract_columns = ['Assessor_Position', 'Assessor_Contract', 'Assessor_Target_Hours']
    contracts_by_staff = {
        staff: contracts
        for staff, contracts in contract_history.groupby('Staff_Name')
    }

    def get_assessor_contract(assessor_name, date):
        # Contract details in force for the assessor on `date`, or Nones.
        staff_contracts = contracts_by_staff.get(assessor_name)
        if staff_contracts is None:
            return None, None, None
        relevant_contract = staff_contracts[
            (staff_contracts['Start_Date'] <= date) &
            (staff_contracts['End_Date'] >= date)
        ]
        if relevant_contract.empty:
            return None, None, None
        # Consecutive periods share their boundary date; prefer the period
        # that starts latest (stable sort keeps history order on ties).
        contract = relevant_contract.sort_values('Start_Date', kind='mergesort').iloc[-1]
        return contract['Position'], contract['Contract_Type'], contract['Target_Hours']

    # A learner's end is the actual end when known, else the expected end.
    # Rows with a missing start or end compare False below and so are never
    # counted as active.
    effective_end = df['Actual_End'].fillna(df['Expected_End'])

    # Create monthly snapshots for FY23/24 (April 2023 - March 2024).
    # period_range is used because the 'M' month-end alias of date_range is
    # deprecated in recent pandas releases.
    monthly_data = []
    print("\nGenerating monthly snapshots...")
    for period in pd.period_range(start='2023-04', end='2024-03', freq='M'):
        month_start = period.start_time
        month_end = period.end_time.normalize()
        print(f"\nProcessing month: {month_start.strftime('%B %Y')}")
        print(f"Month range: {month_start.strftime('%Y-%m-%d')} to {month_end.strftime('%Y-%m-%d')}")

        # Active = the learner's period overlaps the month at all.
        month_mask = (df['Start_Date'] <= month_end) & (effective_end >= month_start)
        month_data = df[month_mask].copy()

        month_data['Snapshot_Date'] = month_end
        month_data['Year'] = month_end.year
        month_data['Month'] = month_end.month

        # Built from a list so a month without active learners still yields
        # the three contract columns.
        contract_details = pd.DataFrame(
            [get_assessor_contract(name, month_end) for name in month_data['Assessor_Name']],
            columns=contract_columns,
            index=month_data.index,
        )
        month_data = pd.concat([month_data, contract_details], axis=1)

        active_count = len(month_data)
        print(f"Active students in this month: {active_count}")
        monthly_data.append(month_data)

    monthly_df = pd.concat(monthly_data, ignore_index=True)

    # Summaries. dropna=False keeps assessors for whom no contract was found;
    # the default would drop every row whose position/contract is missing.

    # 1. Monthly summary per assessor
    monthly_summary = monthly_df.groupby(
        ['Assessor_Name', 'Year', 'Month', 'Assessor_Position', 'Assessor_Contract'],
        dropna=False
    ).agg({
        'Standard_Title': 'count',  # Number of active students
        'Monthly_Hours': 'sum',  # Total hours needed
        'Assessor_Target_Hours': 'first'
    }).reset_index()
    monthly_summary.rename(columns={
        'Standard_Title': 'Active_Students'
    }, inplace=True)

    # 2. Programme level summary
    programme_summary = monthly_df.groupby(
        ['Assessor_Name', 'Programme_Level', 'Year', 'Month',
         'Assessor_Position', 'Assessor_Contract'],
        dropna=False
    ).agg({
        'Standard_Title': 'count',
        'Assessor_Target_Hours': 'first'
    }).reset_index()
    programme_summary.rename(columns={
        'Standard_Title': 'Students_In_Programme'
    }, inplace=True)

    # 3. Standard title summary
    standard_summary = monthly_df.groupby(
        ['Assessor_Name', 'Standard_Title', 'Year', 'Month',
         'Assessor_Position', 'Assessor_Contract'],
        dropna=False
    ).agg({
        'Monthly_Hours': 'sum',
        'Snapshot_Date': 'count',
        'Assessor_Target_Hours': 'first'
    }).reset_index()
    standard_summary.rename(columns={
        'Snapshot_Date': 'Students_In_Standard',
        'Monthly_Hours': 'Required_Hours'
    }, inplace=True)

    print("PICS data processing completed!")
    return {
        'detailed_monthly': monthly_df,
        'monthly_summary': monthly_summary,
        'programme_summary': programme_summary,
        'standard_summary': standard_summary,
        'hours_reference': hours_lookup
    }
# Execute processing
if __name__ == "__main__":
    # Inputs are resolved against the current working directory; all
    # outputs are written as CSV files into ./processed_data.
    current_dir = os.getcwd()
    monthly_reports_folder = os.path.join(current_dir, "monthly_reports")
    output_folder = os.path.join(current_dir, "processed_data")
    print(f"Processing data from: {current_dir}")
    try:
        # --- Process Monthly Reports ---
        print("\nStep 1: Processing Monthly Reports")
        staff_data, staff_summary, contract_history = process_monthly_reports(monthly_reports_folder)

        # Save staff data
        os.makedirs(output_folder, exist_ok=True)
        staff_data.to_csv(os.path.join(output_folder, 'staff_monthly_data.csv'), index=False)
        staff_summary.to_csv(os.path.join(output_folder, 'staff_summary.csv'), index=False)
        contract_history.to_csv(os.path.join(output_folder, 'contract_history.csv'), index=False)

        # --- Process Tutor Pay Report ---
        print("\nStep 2: Processing Tutor Pay Report")
        tutor_report_data = process_tutor_report(
            "TutorPayReport_New 23.24 FY 22.11.24.xlsx",
            os.path.join(output_folder, 'staff_monthly_data.csv'),
            contract_history
        )
        # process_tutor_report returns None when nothing could be matched.
        if tutor_report_data is not None:
            for name, df in tutor_report_data.items():
                output_file = os.path.join(output_folder, f'{name}.csv')
                df.to_csv(output_file, index=False)
                print(f"Saved {name} to {output_file}")

        # --- Process Qualification Data ---
        print("\nStep 3: Processing Qualification Data")
        approved_file = "Approved List for Terms Caseloading FY2324.xlsx"
        archived_file = "Archived List for Terms Caseloading FY2324.xlsx"
        terms_file = "Terms caseloading 23.24 FY.xlsx"
        qualification_data = process_qualification_lists(approved_file, archived_file)
        terms_data = process_terms_caseloading(terms_file, qualification_data, contract_history)
        for name, df in terms_data.items():
            output_file = os.path.join(output_folder, f'{name}.csv')
            df.to_csv(output_file, index=False)
            print(f"Saved {name} to {output_file}")

        # --- Process PICS Data ---
        print("\nStep 4: Processing PICS Data")
        caseload_file = "PICS caseload for PBI.xlsx"
        hours_lookup_file = "PICS Hours for Assessor Look up.xlsx"
        pics_data = process_pics_data(caseload_file, hours_lookup_file, contract_history)
        for name, df in pics_data.items():
            output_file = os.path.join(output_folder, f'pics_{name}.csv')
            df.to_csv(output_file, index=False)
            print(f"Saved {name} to {output_file}")

        print("\nAll processing completed successfully!")
    except Exception as e:
        # Report the failure, then re-raise so the traceback and a non-zero
        # exit status are preserved.
        print(f"\nError during processing: {str(e)}")
        raise