import pandas as pd
import numpy as np
import os
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')
# Create processed_data directory if it doesn't exist
if not os.path.exists('processed_data'):
    os.makedirs('processed_data')
def clean_string(s):
    """Clean strings by removing extra spaces and converting to lowercase"""
    if pd.isna(s):
        return s
    return str(s).strip().lower()
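# Illustrative behaviour (the input value is made up): clean_string('  Salaried Tutor ')
# returns 'salaried tutor'; NaN passes through unchanged so later pd.isna checks still work.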
def extract_month_year(filename):
    """Extract month and year from filename format 'ACL Monthly Report DD.MM.YY.xlsx'"""
    date_str = filename.split()[-1].replace('.xlsx', '')
    return pd.to_datetime(date_str, format='%d.%m.%y')
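# Illustrative call, using a hypothetical file name that follows the documented pattern:
# extract_month_year('ACL Monthly Report 30.04.23.xlsx') -> Timestamp('2023-04-30')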
def process_monthly_report(file_path):
    """Process a single monthly report"""
    # Read Excel file starting from row 3 (0-based index 2)
    df = pd.read_excel(file_path, skiprows=2)
    # Extract report date from filename
    report_date = extract_month_year(os.path.basename(file_path))
    # Clean column names
    df.columns = [col.strip() for col in df.columns]
    # Select and rename relevant columns
    columns_mapping = {
        'Assignment Number': 'assignment_number',
        'Title': 'title',
        'First Name': 'first_name',
        'Known As': 'known_as',
        'Last Name': 'last_name',
        'Full-Time Equivalent': 'fte',
        'Position Name': 'position',
        'Working Hours': 'working_hours',
        'Line Manager Name': 'line_manager'
    }
    df = df[list(columns_mapping.keys())].copy()
    df.rename(columns=columns_mapping, inplace=True)
    # Clean string columns
    string_columns = ['title', 'first_name', 'known_as', 'last_name', 'position', 'line_manager']
    for col in string_columns:
        df[col] = df[col].apply(clean_string)
    # Filter for Tutors and Learning Support Assistants
    df = df[df['position'].isin(['learning support assistant', 'tutor'])]
    # Convert numeric columns
    df['fte'] = pd.to_numeric(df['fte'], errors='coerce')
    df['working_hours'] = pd.to_numeric(df['working_hours'], errors='coerce')
    # Calculate target hours (only for salaried staff, i.e. working hours above 3)
    df['is_salaried'] = df['working_hours'] > 3
    df['target_hours'] = np.where(df['is_salaried'], df['fte'] * 840, 0)
    # Process line manager names (the parsing assumes a 'First KnownAs, Surname' pattern)
    df['line_manager_first_name'] = df['line_manager'].apply(
        lambda x: x.split(',')[0].split()[0] if pd.notna(x) and ',' in x else None
    )
    df['line_manager_known_as'] = df['line_manager'].apply(
        lambda x: x.split(',')[0].split()[-1] if pd.notna(x) and ',' in x else None
    )
    df['line_manager_last_name'] = df['line_manager'].apply(
        lambda x: x.split(',')[1].strip() if pd.notna(x) and ',' in x else None
    )
    # Add report date
    df['report_date'] = report_date
    return df
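# Example usage (hypothetical path, same naming pattern as above):
# april_df = process_monthly_report('monthly_reports/ACL Monthly Report 30.04.23.xlsx')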
def process_qualification_references(approved_file='Approved List for Terms Caseloading FY2324.xlsx',
                                     archived_file='Archived List for Terms Caseloading FY2324.xlsx'):
    """Process approved and archived qualification reference lists"""
    print("\nProcessing qualification reference lists...")
    # Read both files
    approved_df = pd.read_excel(approved_file)[['QualificationName', 'QualificationNumber']]
    archived_df = pd.read_excel(archived_file)[['QualificationName', 'QualificationNumber']]
    # Combine both dataframes
    qual_ref_df = pd.concat([approved_df, archived_df], ignore_index=True)
    # Clean data
    qual_ref_df['QualificationName'] = qual_ref_df['QualificationName'].apply(clean_string)
    qual_ref_df['QualificationNumber'] = qual_ref_df['QualificationNumber'].apply(clean_string)
    # Remove duplicates (in case the same qualification appears in both files)
    qual_ref_df = qual_ref_df.drop_duplicates()
    # Save processed data
    qual_ref_df.to_csv('processed_data/qualification_references.csv', index=False)
    print(f"Total unique qualifications: {len(qual_ref_df)}")
    return qual_ref_df
def clean_qualification_number(number):
    """Remove a trailing P or N suffix from qualification numbers"""
    if pd.isna(number):
        return number
    number = str(number).strip()
    # Suffix check is case-insensitive because clean_string lower-cases references upstream
    if number.upper().endswith(('P', 'N')):
        return number[:-1]
    return number
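# Illustrative (the number itself is invented): clean_qualification_number('60012345p')
# returns '60012345', while values without a trailing P/N are returned unchanged.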
def process_terms_caseloading(filename='Terms caseloading 23.24 FY.xlsx', qual_ref_df=None):
    """Process Terms caseloading data with qualification reference mapping"""
    print("\nProcessing Terms caseloading data...")
    # Read the Excel file
    df = pd.read_excel(filename)
    # Select and rename relevant columns
    columns_mapping = {
        'Primary Staff Name': 'first_name',
        'Primary Staff Surname': 'last_name',
        'Activity Name': 'activity_name',
        'Funding Reference': 'funding_reference',
        'Start Date': 'start_date',
        'End Date': 'end_date',
        'Level': 'level',
        'Activity Group': 'activity_group'
    }
    df = df[list(columns_mapping.keys())].copy()
    df.rename(columns=columns_mapping, inplace=True)
    # Clean string columns
    string_columns = ['first_name', 'last_name', 'activity_name', 'funding_reference',
                      'level', 'activity_group']
    for col in string_columns:
        df[col] = df[col].apply(clean_string)
    # Clean qualification numbers in funding reference
    df['funding_reference_clean'] = df['funding_reference'].apply(clean_qualification_number)
    # Map qualifications
    if qual_ref_df is not None:
        # Create a mapping dictionary for faster lookup
        qual_mapping = dict(zip(qual_ref_df['QualificationNumber'],
                                qual_ref_df['QualificationName']))
        # Map qualification names, use activity name as fallback
        df['qualification_name'] = df['funding_reference_clean'].map(qual_mapping)
        df['qualification_name'] = df['qualification_name'].fillna(df['activity_name'])
    else:
        # Without a reference list, fall back to the activity name throughout
        df['qualification_name'] = df['activity_name']
    # Convert dates
    date_columns = ['start_date', 'end_date']
    for col in date_columns:
        df[col] = pd.to_datetime(df[col], errors='coerce')
    # Add academic year column (for easier filtering)
    df['academic_year'] = df['start_date'].dt.year.astype(str) + '/' + \
                          (df['start_date'].dt.year + 1).astype(str)
    # Calculate student counts per course
    course_counts = df.groupby([
        'first_name', 'last_name', 'qualification_name',
        'level', 'activity_group', 'start_date', 'end_date'
    ]).size().reset_index(name='student_count')
    # Save processed data
    df.to_csv('processed_data/terms_caseloading_processed.csv', index=False)
    course_counts.to_csv('processed_data/course_student_counts.csv', index=False)
    print("\nTerms Caseloading Processing Summary:")
    print(f"Total records: {len(df)}")
    print(f"Unique tutors: {df[['first_name', 'last_name']].drop_duplicates().shape[0]}")
    print(f"Unique qualifications: {df['qualification_name'].nunique()}")
    print(f"Date range: {df['start_date'].min().strftime('%Y-%m-%d')} to {df['end_date'].max().strftime('%Y-%m-%d')}")
    return df, course_counts
def process_assessor_lookup(filename='PICS Hours for Assessor Look up.xlsx'):
    """Process the Assessor lookup data"""
    print("\nProcessing Assessor lookup data...")
    # Read Excel file starting from row 3
    df = pd.read_excel(filename, skiprows=2)
    # Select and rename relevant columns
    columns_mapping = {
        'Standard title': 'standard_title',
        'Weighted Monthly Hours (1.6)': 'monthly_hours'
    }
    df = df[list(columns_mapping.keys())].copy()
    df.rename(columns=columns_mapping, inplace=True)
    # Clean string columns
    df['standard_title'] = df['standard_title'].apply(clean_string)

    # Convert time format (H:MM) to decimal hours
    def convert_time_to_hours(time_str):
        if pd.isna(time_str):
            return 0
        parts = str(time_str).split(':')
        try:
            hours = float(parts[0])
            minutes = float(parts[1]) if len(parts) > 1 else 0
            return hours + minutes / 60
        except (ValueError, IndexError):
            # Unparseable entries default to zero hours
            return 0

    df['monthly_hours'] = df['monthly_hours'].apply(convert_time_to_hours)
    # Save processed data
    df.to_csv('processed_data/assessor_lookup_processed.csv', index=False)
    print(f"Total unique apprenticeship standards: {len(df)}")
    return df
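# Illustrative conversion (value invented): a 'Weighted Monthly Hours (1.6)' cell of
# '7:30' becomes 7.5 decimal hours; blank or unparseable cells fall back to 0.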
def process_pics_caseload(filename='PICS caseload for PBI.xlsx', assessor_lookup_df=None):
    """Process PICS caseload data with assessor lookup integration"""
    print("\nProcessing PICS caseload data...")
    # Read the Excel file
    df = pd.read_excel(filename)
    # Select and rename relevant columns
    columns_mapping = {
        'Assessor Full Name': 'assessor_name',
        'Programme': 'programme',
        'Apprenticeship Standard Title': 'standard_title',
        'Apprenticeship Achieved Date': 'achieved_date',
        'Start Date': 'start_date',
        'Learning Expected End': 'expected_end_date',
        'Actual End': 'actual_end_date'
    }
    df = df[list(columns_mapping.keys())].copy()
    df.rename(columns=columns_mapping, inplace=True)
    # Clean string columns
    string_columns = ['assessor_name', 'programme', 'standard_title']
    for col in string_columns:
        df[col] = df[col].apply(clean_string)
    # Convert all date columns
    date_columns = ['achieved_date', 'start_date', 'expected_end_date', 'actual_end_date']
    for col in date_columns:
        df[col] = pd.to_datetime(df[col], errors='coerce')
    # Use expected end date when actual end date is missing
    df['end_date'] = df['actual_end_date'].fillna(df['expected_end_date'])

    # Create a function to check if a student is active in a given month
    def is_active_in_month(row, month_date):
        start = row['start_date']
        end = row['end_date']
        if pd.isna(start) or pd.isna(end):
            return False
        return (start <= month_date) & (month_date <= end)

    # Generate monthly student counts for FY23/24
    # (freq='M' yields month-end dates, 2023-04-30 through 2024-03-31)
    monthly_dates = pd.date_range('2023-04-01', '2024-03-31', freq='M')
    monthly_counts = []
    for date in monthly_dates:
        # Create mask for active students in this month
        active_mask = df.apply(lambda row: is_active_in_month(row, date), axis=1)
        # Get counts per assessor and standard
        month_count = df[active_mask].groupby(
            ['assessor_name', 'standard_title', 'programme']
        ).size().reset_index(name='student_count')
        month_count['month'] = date
        monthly_counts.append(month_count)
    monthly_df = pd.concat(monthly_counts, ignore_index=True)
    # Add monthly hours requirement if lookup data is available
    if assessor_lookup_df is not None:
        monthly_df = monthly_df.merge(
            assessor_lookup_df[['standard_title', 'monthly_hours']],
            on='standard_title',
            how='left'
        )
        monthly_df['required_hours'] = monthly_df['student_count'] * monthly_df['monthly_hours']
    # Save processed data
    df.to_csv('processed_data/pics_caseload_processed.csv', index=False)
    monthly_df.to_csv('processed_data/pics_monthly_counts.csv', index=False)
    print("\nPICS Caseload Processing Summary:")
    print(f"Total apprentices: {len(df)}")
    print(f"Unique assessors: {df['assessor_name'].nunique()}")
    print(f"Unique standards: {df['standard_title'].nunique()}")
    print(f"Date range: {df['start_date'].min().strftime('%Y-%m-%d')} to {df['end_date'].max().strftime('%Y-%m-%d')}")
    return df, monthly_df
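# Reading the output (column names as written above): pics_monthly_counts.csv holds one
# row per assessor / standard / programme / month-end date with student_count and, when
# the lookup merge succeeds, required_hours = student_count * monthly_hours.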
def process_all_monthly_reports():
    """Process all monthly reports from the monthly_reports folder"""
    monthly_reports = []
    # Process each monthly report
    for filename in os.listdir('monthly_reports'):
        if filename.startswith('ACL Monthly Report') and filename.endswith('.xlsx'):
            file_path = os.path.join('monthly_reports', filename)
            try:
                monthly_df = process_monthly_report(file_path)
                monthly_reports.append(monthly_df)
                print(f"Successfully processed {filename}")
            except Exception as e:
                print(f"Error processing {filename}: {str(e)}")
    # Combine all monthly reports
    if monthly_reports:
        combined_df = pd.concat(monthly_reports, ignore_index=True)
        # Create staff master list
        staff_master = combined_df.drop_duplicates(
            subset=['first_name', 'last_name', 'known_as', 'position']
        )[['first_name', 'last_name', 'known_as', 'position']]
        # Save processed data
        combined_df.to_csv('processed_data/monthly_reports_processed.csv', index=False)
        staff_master.to_csv('processed_data/staff_master.csv', index=False)
        print("\nProcessing Summary:")
        print(f"Total monthly reports processed: {len(monthly_reports)}")
        print(f"Total staff records: {len(combined_df)}")
        print(f"Unique staff members: {len(staff_master)}")
        return combined_df, staff_master
    return None, None
def process_tutor_pay_report(filename='TutorPayReport_New 23.24 FY 22.11.24.xlsx'):
    """Process the Tutor Pay Report"""
    print(f"\nProcessing Tutor Pay Report: {filename}")
    # Read the Excel file
    df = pd.read_excel(filename)
    # Select and rename relevant columns
    columns_mapping = {
        'PEOPLENAME': 'first_name',
        'PEOPLESURNAME': 'last_name',
        'STAFFROLE': 'staff_role',
        'CONTRACTNAME': 'contract_type',
        'ACTIVITYTYPE': 'activity_type',
        'Course Funding Group': 'funding_group',
        'Course Prov Mon C': 'curriculum_name',
        'Course Prov Mon D': 'venue_topline',
        'VENUE': 'venue_name',
        'EVENTDATE': 'event_date',
        'TIMEADJUSTEARLY': 'time_adjust_early',
        'TIMEADJUSTLATE': 'time_adjust_late',
        'Potential Hours': 'potential_hours'
    }
    # Select only needed columns and rename
    df = df[list(columns_mapping.keys())].copy()
    df.rename(columns=columns_mapping, inplace=True)
    # Clean string columns
    string_columns = ['first_name', 'last_name', 'staff_role', 'contract_type',
                      'activity_type', 'funding_group', 'curriculum_name',
                      'venue_topline', 'venue_name']
    for col in string_columns:
        df[col] = df[col].apply(clean_string)
    # Filter for relevant staff roles
    df = df[df['staff_role'].isin(['tutor', 'learning support'])]
    # Filter for relevant contract types
    df = df[df['contract_type'].isin(['lsa-nb', 'salaried tutor', 'sessional tutor'])]
    # Group assessment activities
    assessment_types = [
        'access assessments', 'assessments', 'beauty ol assess',
        'cfl pa vols assess', 'creative assessments', 'e&m apprenticeassess',
        'english fs assess', 'english gcse assess', 'esol f2f assess',
        'esol online assess', 'hair/barb f2f assess', 'maths assessments',
        'swis assessments'
    ]
    # Create activity group column (missing activity types fall into 'Other')
    df['activity_group'] = df['activity_type'].apply(
        lambda x: 'Other' if pd.isna(x)
        else 'Assessment' if x.lower() in assessment_types
        else x if x.lower() in ['community engagement', 'tutorials/drop ins']
        else 'Other'
    )
    # Convert time adjustments from milliseconds to hours
    df['time_adjust_early'] = abs(df['time_adjust_early']) / (1000 * 60 * 60)
    df['time_adjust_late'] = abs(df['time_adjust_late']) / (1000 * 60 * 60)
    # Calculate actual hours
    df['actual_hours'] = df['potential_hours'] - (df['time_adjust_early'] + df['time_adjust_late'])
    df['actual_hours'] = df['actual_hours'].clip(lower=0)  # Replace negative values with 0
    # Convert event date to datetime
    df['event_date'] = pd.to_datetime(df['event_date'])
    # Add derived date columns for easier filtering
    df['week'] = df['event_date'].dt.isocalendar().week
    df['month'] = df['event_date'].dt.month
    df['year'] = df['event_date'].dt.year
    # Calculate utilization metrics
    # Note: We'll only calculate utilization percentage for salaried tutors
    df['is_salaried'] = df['contract_type'] == 'salaried tutor'
    df['hours_for_utilization'] = np.where(
        (df['is_salaried']) & (df['activity_group'] != 'Assessment'),
        df['actual_hours'],
        0
    )
    # Save processed data
    df.to_csv('processed_data/tutor_pay_report_processed.csv', index=False)
    print("\nTutor Pay Report Processing Summary:")
    print(f"Total records: {len(df)}")
    print(f"Unique tutors/LSAs: {df[['first_name', 'last_name']].drop_duplicates().shape[0]}")
    print(f"Date range: {df['event_date'].min().strftime('%Y-%m-%d')} to {df['event_date'].max().strftime('%Y-%m-%d')}")
    return df
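# Worked example of the millisecond conversion (value invented): a TIMEADJUSTEARLY of
# 5400000 ms becomes 5400000 / (1000 * 60 * 60) = 1.5 hours deducted from potential_hours.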
# Execute the processing
if __name__ == "__main__":
    print("Starting monthly reports processing...")
    combined_df, staff_master = process_all_monthly_reports()

    print("\nProcessing Tutor Pay Report...")
    tutor_pay_df = process_tutor_pay_report()

    print("\nProcessing Qualification References and Terms Caseloading...")
    qual_ref_df = process_qualification_references()
    terms_df, course_counts = process_terms_caseloading(qual_ref_df=qual_ref_df)

    print("\nProcessing PICS data...")
    assessor_lookup_df = process_assessor_lookup()
    pics_df, monthly_pics_df = process_pics_caseload(assessor_lookup_df=assessor_lookup_df)
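# Expected inputs when run as a script (paths taken from the defaults above): a
# 'monthly_reports/' folder of 'ACL Monthly Report DD.MM.YY.xlsx' files, plus
# 'TutorPayReport_New 23.24 FY 22.11.24.xlsx', the approved/archived caseloading lists,
# 'Terms caseloading 23.24 FY.xlsx', 'PICS Hours for Assessor Look up.xlsx' and
# 'PICS caseload for PBI.xlsx' in the working directory; all outputs are written to
# 'processed_data/'.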