# Cell 1: Setup and Utilities
# --- Import required libraries ---
import pandas as pd
import numpy as np
import os
from datetime import datetime, time
import logging
from typing import Tuple, Dict, List, Optional, Union
import warnings
# --- Setup logging ---
def setup_logging(log_file: str = 'staff_utilization.log') -> None:
"""Configure logging for the application"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler()
]
)
# Suppress pandas warning about copy operations
pd.options.mode.chained_assignment = None # default='warn'
warnings.filterwarnings('ignore', category=pd.errors.SettingWithCopyWarning)
# --- Enhanced utility functions ---
def safe_read_excel(file_path: str, **kwargs) -> Tuple[Optional[pd.DataFrame], Optional[str]]:
"""
Safely read Excel files with error handling
"""
try:
df = pd.read_excel(file_path, **kwargs)
return df, None
except Exception as e:
error_msg = f"Error reading {file_path}: {str(e)}"
logging.error(error_msg)
return None, error_msg
def clean_name(name: Union[str, float, None]) -> Optional[str]:
"""
Clean and standardize names
"""
if pd.isna(name):
return None
name_str = str(name).strip()
return ' '.join(name_str.split())
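# Illustrative examples (hypothetical inputs, not from the source files):
#   clean_name("  Jane   Doe ") -> "Jane Doe"
#   clean_name(np.nan)          -> None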
def standardize_names(df: pd.DataFrame, name_columns: List[str]) -> pd.DataFrame:
"""
Standardize names across specified columns
"""
for col in name_columns:
if col in df.columns:
df[col] = df[col].apply(clean_name)  # clean_name already collapses whitespace and strips
return df
def extract_month_year(filename: str) -> pd.Timestamp:
"""
Extract month and year from filename format 'ACL Monthly Report DD.MM.YY.xlsx'
"""
try:
date_str = filename.split()[-1].replace('.xlsx', '')
return pd.to_datetime(date_str, format='%d.%m.%y')
except Exception as e:
logging.error(f"Error extracting date from filename {filename}: {str(e)}")
raise ValueError(f"Invalid filename format: {filename}")
def clean_qualification_number(qual_num: Union[str, float, None]) -> Optional[str]:
"""
Clean qualification number by removing P or N suffix
"""
if pd.isna(qual_num):
return None
qual_str = str(qual_num).strip()
if qual_str.endswith(('P', 'N')):
return qual_str[:-1]
return qual_str
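# Illustrative examples (hypothetical qualification numbers):
#   clean_qualification_number('6001234XP') -> '6001234X'
#   clean_qualification_number('6001234X')  -> '6001234X' (no P/N suffix, unchanged)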
def convert_time_to_hours(time_value: Union[str, float, None, pd.Timestamp, time]) -> float:
"""
Convert various time formats to hours
"""
if pd.isna(time_value):
return 0.0
try:
if isinstance(time_value, time):
return time_value.hour + (time_value.minute / 60)
elif isinstance(time_value, str) and ':' in time_value:
hours, minutes = map(float, time_value.split(':')[:2]) # Only use hours and minutes
return hours + (minutes / 60)
else:
# Plain numeric values are assumed to represent millisecond durations
return abs(float(time_value)) / (1000 * 60 * 60)
except Exception as e:
logging.warning(f"Error converting time value {time_value}: {str(e)}")
return 0.0
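# Illustrative examples:
#   convert_time_to_hours('7:30')      -> 7.5  (hh:mm string)
#   convert_time_to_hours(time(7, 30)) -> 7.5  (datetime.time object)
#   convert_time_to_hours(5400000)     -> 1.5  (plain number, assumed milliseconds)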
def validate_processed_data(
df: pd.DataFrame,
required_columns: List[str],
name: Optional[str] = None
) -> None:
"""
Validate processed DataFrames for required columns and data quality
"""
df_name = name if name else 'dataframe'
# Check for missing columns
missing_cols = [col for col in required_columns if col not in df.columns]
if missing_cols:
raise ValueError(f"Missing required columns in {df_name}: {missing_cols}")
# Check for empty dataframe
if df.empty:
raise ValueError(f"No data found in {df_name}")
# Check for all-null columns
null_cols = df.columns[df.isna().all()].tolist()
if null_cols:
logging.warning(f"Found columns with all null values in {df_name}: {null_cols}")
# Check for mostly null columns (>90% null)
high_null_cols = df.columns[df.isna().mean() > 0.9].tolist()
if high_null_cols:
logging.warning(f"Found columns with >90% null values in {df_name}: {high_null_cols}")
# Log data quality metrics
logging.info(f"Data validation for {df_name}:")
logging.info(f" - Total rows: {len(df)}")
logging.info(f" - Columns: {len(df.columns)}")
for col in required_columns:
null_count = df[col].isna().sum()
if null_count > 0:
logging.info(f" - {col}: {null_count} null values ({(null_count/len(df))*100:.1f}%)")
def calculate_monthly_utilization(
group_df: pd.DataFrame,
target_hours: float,
exclude_categories: Optional[List[str]] = None
) -> Optional[float]:
"""
Calculate monthly utilization percentage
"""
if pd.isna(target_hours) or target_hours == 0:
return None
if exclude_categories:
included_df = group_df[~group_df['Activity_Category'].isin(exclude_categories)]
else:
included_df = group_df
total_hours = included_df['Actual_Hours'].sum()
monthly_target = target_hours / 12
if monthly_target > 0:
utilization = (total_hours / monthly_target) * 100
logging.debug(f"Utilization calculation:")
logging.debug(f" Total hours: {total_hours}")
logging.debug(f" Monthly target ({target_hours}/12): {monthly_target}")
logging.debug(f" Utilization: {utilization}%")
return utilization
return None
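# Illustrative example: an annual target of 840 hours gives a monthly target
# of 840 / 12 = 70; a month with 70 non-excluded Actual_Hours therefore
# yields (70 / 70) * 100 = 100.0 (percent).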
def identify_overlapping_contracts(contract_periods: List[Dict]) -> List[Dict]:
"""
Identify and handle overlapping contract periods
"""
sorted_periods = sorted(contract_periods, key=lambda x: x['Start_Date'])
active_periods = []
result_periods = []
for period in sorted_periods:
active_periods = [p for p in active_periods if p['End_Date'] >= period['Start_Date']]
overlaps = []
for active in active_periods:
if (active['Position'] != period['Position'] or
active['Contract_Type'] != period['Contract_Type']):
overlaps.append(active)
new_period = period.copy()
if overlaps:
overlap_positions = [p['Position'] for p in overlaps] + [period['Position']]
overlap_contracts = [p['Contract_Type'] for p in overlaps] + [period['Contract_Type']]
new_period.update({
'Overlapping_Positions': overlap_positions,
'Overlapping_Contracts': overlap_contracts,
'Has_Overlap': True,
'Overlap_Count': len(overlaps) + 1
})
else:
new_period.update({
'Overlapping_Positions': [period['Position']],
'Overlapping_Contracts': [period['Contract_Type']],
'Has_Overlap': False,
'Overlap_Count': 1
})
result_periods.append(new_period)
active_periods.append(period)
return result_periods
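# Illustrative example (hypothetical periods): for a 'Tutor' contract running
# Jan-Jun and a 'Learning Support Assistant' contract running Mar-Dec, the
# later period is returned with Has_Overlap=True, Overlap_Count=2 and both
# positions in Overlapping_Positions; the earlier period, processed before
# any overlap exists, keeps Has_Overlap=False.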
def consolidate_contract_changes(contract_df: pd.DataFrame) -> pd.DataFrame:
"""
Consolidate contract changes to handle overlapping periods
"""
consolidated = []
for staff_name in contract_df['Staff_Name'].unique():
staff_contracts = contract_df[
contract_df['Staff_Name'] == staff_name
].sort_values('Start_Date')
contract_list = staff_contracts.to_dict('records')
processed_contracts = identify_overlapping_contracts(contract_list)
for contract in processed_contracts:
row = {
'Staff_Name': staff_name,
'Start_Date': contract['Start_Date'],
'End_Date': contract['End_Date'],
'Position': ' & '.join(contract['Overlapping_Positions']),
'Contract_Type': ' & '.join(contract['Overlapping_Contracts']),
'Has_Multiple_Contracts': contract['Has_Overlap'],
'Contract_Count': contract['Overlap_Count'],
'Change_Type': contract['Change_Type']
}
# Calculate combined target hours
if contract['Has_Overlap']:
total_target = sum(
float(c.get('Target_Hours', 0) or 0)
for c in contract_list
if (c['Start_Date'] <= contract['End_Date'] and
c['End_Date'] >= contract['Start_Date'])
)
row['Target_Hours'] = total_target
else:
row['Target_Hours'] = contract.get('Target_Hours', 0)
consolidated.append(row)
return pd.DataFrame(consolidated)
# Cell 2: Contract Tracking and Monthly Reports
def track_contract_changes(df: pd.DataFrame) -> pd.DataFrame:
"""
Track staff contract changes with full history, including overlapping contracts
"""
logging.info("Starting contract change tracking...")
changes = []
for name in df['Staff_Standard'].unique():
logging.debug(f"Processing contracts for: {name}")
staff_data = df[df['Staff_Standard'] == name].sort_values('Report_Date')
contract_periods = []
current_contracts = set() # Track currently active contracts
for idx, row in staff_data.iterrows():
current_contract = (row['Position Name'], row['Contract_Type'], row['Target_Hours'])
# Check for changes in existing contracts
if current_contract not in current_contracts:
# New contract type detected
contract_periods.append({
'Staff_Name': name,
'Start_Date': row['Report_Date'],
'End_Date': staff_data['Report_Date'].max(),
'Position': row['Position Name'],
'Contract_Type': row['Contract_Type'],
'Target_Hours': row['Target_Hours'],
'Change_Type': 'Initial Contract' if row['Report_Date'] == staff_data['Report_Date'].min() else 'New Contract'
})
current_contracts.add(current_contract)
# Check for ended contracts
ended_contracts = set()
for contract in current_contracts:
pos, type_, target = contract
if not any((staff_data.loc[idx:, 'Position Name'] == pos) &
(staff_data.loc[idx:, 'Contract_Type'] == type_) &
(staff_data.loc[idx:, 'Target_Hours'] == target)):
# Contract has ended
ended_contracts.add(contract)
contract_periods.append({
'Staff_Name': name,
'Start_Date': row['Report_Date'],
'End_Date': row['Report_Date'],
'Position': pos,
'Contract_Type': type_,
'Target_Hours': target,
'Change_Type': 'Contract End'
})
current_contracts -= ended_contracts
# Process contract periods to identify overlaps
if contract_periods:
processed_periods = identify_overlapping_contracts(contract_periods)
changes.extend(processed_periods)
changes_df = consolidate_contract_changes(pd.DataFrame(changes))
# Log summary statistics
total_staff = len(changes_df['Staff_Name'].unique())
multi_contract_staff = len(changes_df[changes_df['Has_Multiple_Contracts']]['Staff_Name'].unique())
logging.info(f"Processed contract changes for {total_staff} staff members")
logging.info(f"Found {multi_contract_staff} staff with multiple contracts")
if multi_contract_staff > 0:
logging.info("Staff with multiple contracts:")
for staff in changes_df[changes_df['Has_Multiple_Contracts']]['Staff_Name'].unique():
staff_records = changes_df[
(changes_df['Staff_Name'] == staff) &
(changes_df['Has_Multiple_Contracts'])
]
logging.info(f" - {staff}:")
for _, record in staff_records.iterrows():
logging.info(f" * {record['Position']} ({record['Contract_Type']}) - "
f"Target Hours: {record['Target_Hours']}")
return changes_df
def process_monthly_reports(folder_path: str) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""
Process all monthly reports and track staff/contract changes
"""
logging.info(f"Processing Monthly Reports from: {folder_path}")
all_data = []
# Get list of files and sort chronologically by the date embedded in the filename
# (a plain lexical sort on 'DD.MM.YY' names would order by day, not by month/year)
files = [f for f in os.listdir(folder_path) if f.startswith('ACL Monthly Report')]
files.sort(key=extract_month_year)
required_columns = [
'Assignment Number', 'First Name', 'Known As', 'Last Name',
'Position Name', 'Working Hours', 'Full-Time Equivalent',
'Line Manager Name'
]
for file in files:
logging.info(f"Processing file: {file}")
# Read file with error handling
df, error = safe_read_excel(os.path.join(folder_path, file), skiprows=2)
if error:
logging.error(f"Skipping file {file} due to error: {error}")
continue
# Validate required columns
try:
validate_processed_data(df, required_columns, name=file)
except ValueError as e:
logging.error(f"Validation failed for {file}: {str(e)}")
continue
report_date = extract_month_year(file)
df['Report_Date'] = report_date
# Filter for Tutors and Learning Support Assistant
mask = df['Position Name'].isin(['Learning Support Assistant', 'Tutor'])
df = df[mask].copy()
# Calculate Target Hours - only for salaried tutors
def calculate_target_hours(row):
try:
working_hours = float(row['Working Hours'])
fte = float(row['Full-Time Equivalent'])
if row['Position Name'] == 'Tutor' and working_hours > 3:
return fte * 840
except (ValueError, TypeError) as e:
logging.warning(f"Error calculating target hours: {str(e)}")
return None
df['Target_Hours'] = df.apply(calculate_target_hours, axis=1)
# Determine Contract Type (non-numeric Working Hours are treated as Salaried)
df['Contract_Type'] = pd.to_numeric(df['Working Hours'], errors='coerce').apply(
lambda x: 'Sessional' if pd.notna(x) and x <= 3 else 'Salaried'
)
# Process Line Manager names
def split_manager_name(name):
if pd.isna(name):
return pd.NA, pd.NA, pd.NA
parts = name.split(',')
if len(parts) != 2:
logging.warning(f"Unexpected manager name format: {name}")
return name, name, name
first_known = parts[0].strip()
last = parts[1].strip()
if ' ' in first_known:
first, known = first_known.split(' ', 1)
else:
first = known = first_known
return first, known, last
# Create manager name formats
df[['Manager_First', 'Manager_Known', 'Manager_Last']] = df['Line Manager Name'].apply(
split_manager_name).apply(pd.Series)
df['Manager_Standard'] = df.apply(
lambda x: f"{x['Manager_First']} {x['Manager_Last']}"
if pd.notna(x['Manager_First']) else pd.NA, axis=1)
df['Manager_Known_As'] = df.apply(
lambda x: f"{x['Manager_Known']}, {x['Manager_Last']}"
if pd.notna(x['Manager_Known']) else pd.NA, axis=1)
# Create staff name formats
df['Staff_Standard'] = df.apply(
lambda x: f"{x['First Name']} {x['Last Name']}", axis=1)
df['Staff_Known_As'] = df.apply(
lambda x: f"{x['Known As']}, {x['Last Name']}", axis=1)
# Clean all name columns
name_columns = [
'Staff_Standard', 'Staff_Known_As',
'Manager_Standard', 'Manager_Known_As'
]
df = standardize_names(df, name_columns)
columns_to_keep = [
'Assignment Number', 'Staff_Standard', 'Staff_Known_As',
'Position Name', 'Contract_Type', 'Target_Hours',
'Manager_Standard', 'Manager_Known_As', 'Report_Date'
]
all_data.append(df[columns_to_keep])
logging.info(f"Processed {len(df)} records from {file}")
if not all_data:
raise ValueError("No data was successfully processed from monthly reports")
# Combine all monthly data
combined_df = pd.concat(all_data, ignore_index=True)
combined_df = combined_df.sort_values(['Report_Date', 'Staff_Standard'])
# Track contract history with enhanced tracking
contract_history = track_contract_changes(combined_df)
# Create staff summary with contract history
staff_summary = []
for name in combined_df['Staff_Standard'].unique():
staff_data = combined_df[combined_df['Staff_Standard'] == name].copy()
contract_data = contract_history[contract_history['Staff_Name'] == name]
latest_contracts = contract_data[
contract_data['End_Date'] == staff_data['Report_Date'].max()
]
# Fall back to the most recent record if no contract ends on the final report date
if latest_contracts.empty:
latest_contracts = contract_data.tail(1)
staff_summary.append({
'Staff_Name': name,
'First_Appearance': staff_data['Report_Date'].min(),
'Last_Appearance': staff_data['Report_Date'].max(),
'Current_Position': latest_contracts.iloc[0]['Position'],
'Current_Contract': latest_contracts.iloc[0]['Contract_Type'],
'Current_Target': latest_contracts.iloc[0]['Target_Hours'],
'Has_Multiple_Contracts': any(latest_contracts['Has_Multiple_Contracts']),
'Number_Of_Changes': len(contract_data),
'Contract_History': contract_data.to_dict('records')
})
staff_summary_df = pd.DataFrame(staff_summary)
# Final validation
validate_processed_data(
combined_df,
['Staff_Standard', 'Position Name', 'Contract_Type', 'Report_Date'],
'final_staff_data'
)
logging.info("Monthly reports processing completed!")
logging.info(f"Processed data for {len(staff_summary_df)} staff members")
logging.info(f"Generated {len(contract_history)} contract records")
return combined_df, staff_summary_df, contract_history
# Cell 3: Tutor Pay Report Processing
def calculate_activity_hours(row: pd.Series) -> Tuple[str, float]:
"""
Calculate and categorize activity hours
Args:
row: DataFrame row containing activity information
Returns:
Tuple of (activity_category, actual_hours)
"""
assessment_types = [
'Access Assessments', 'Assessments', 'Beauty OL Assess',
'CFL PA Vols Assess', 'Creative Assessments', 'E&M ApprenticeAssess',
'English FS Assess', 'English GCSE Assess', 'ESOL F2F Assess',
'ESOL Online Assess', 'Hair/Barb F2F Assess', 'Maths Assessments',
'SWiS Assessments'
]
if pd.isna(row['ACTIVITYTYPE']):
return 'Other', row['Actual_Hours']
activity = str(row['ACTIVITYTYPE']).strip()
hours = float(row['Actual_Hours'])
if activity in assessment_types:
return 'Assessment', hours
elif activity in ['Community Engagement', 'Tutorials/Drop Ins']:
return activity, hours
return 'Other', hours
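# Illustrative examples (hypothetical rows):
#   {'ACTIVITYTYPE': 'Maths Assessments', 'Actual_Hours': 2.0}    -> ('Assessment', 2.0)
#   {'ACTIVITYTYPE': 'Community Engagement', 'Actual_Hours': 1.5} -> ('Community Engagement', 1.5)
#   {'ACTIVITYTYPE': 'Team Meeting', 'Actual_Hours': 1.0}         -> ('Other', 1.0)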
def process_tutor_report(
file_path: str,
staff_data_path: str,
contract_history: pd.DataFrame
) -> Optional[Dict[str, pd.DataFrame]]:
"""
Process the TutorPayReport and link with staff data
Args:
file_path: Path to tutor pay report file
staff_data_path: Path to processed staff data
contract_history: Contract history DataFrame
Returns:
Dictionary of processed DataFrames or None if processing fails
"""
logging.info("Processing Tutor Pay Report...")
# Read files with error handling
df, error = safe_read_excel(file_path)
if error:
logging.error(f"Failed to read tutor pay report: {error}")
return None
# Note: staff_df is loaded here but not referenced below; contract linkage uses contract_history
staff_df = pd.read_csv(staff_data_path)
required_columns = [
'PEOPLENAME', 'PEOPLESURNAME', 'STAFFROLE', 'CONTRACTNAME',
'ACTIVITYTYPE', 'Potential Hours', 'TIMEADJUSTEARLY',
'TIMEADJUSTLATE', 'EVENTDATE', 'Course Funding Group',
'Course Prov Mon C', 'Course Prov Mon D', 'VENUE'
]
try:
validate_processed_data(df, required_columns, 'tutor_pay_report')
except ValueError as e:
logging.error(f"Validation failed: {str(e)}")
return None
# Clean and standardize names
name_columns = ['PEOPLENAME', 'PEOPLESURNAME']
df = standardize_names(df, name_columns)
df['Staff_Standard'] = df.apply(
lambda x: f"{x['PEOPLENAME']} {x['PEOPLESURNAME']}", axis=1
)
# Filter staff roles and clean
df['STAFFROLE'] = df['STAFFROLE'].apply(clean_name)
role_mask = df['STAFFROLE'].isin(['Tutor', 'Learning Support'])
invalid_roles = df[~role_mask]['STAFFROLE'].unique()
if len(invalid_roles) > 0:
logging.warning(f"Found invalid staff roles: {invalid_roles}")
df = df[role_mask].copy()
# Calculate actual hours with time adjustments
logging.info("Calculating actual hours...")
# Convert time adjustments
for col in ['TIMEADJUSTEARLY', 'TIMEADJUSTLATE']:
df[f'{col}_Hours'] = pd.to_numeric(df[col], errors='coerce').fillna(0).apply(
lambda x: convert_time_to_hours(x) if x != 0 else 0
)
# Calculate actual hours
df['Potential Hours'] = pd.to_numeric(df['Potential Hours'], errors='coerce').fillna(0)
df['Actual_Hours'] = df.apply(
lambda x: max(0, x['Potential Hours'] -
(x['TIMEADJUSTEARLY_Hours'] + x['TIMEADJUSTLATE_Hours'])),
axis=1
)
# Verify hour calculations
zero_hours = df['Actual_Hours'].eq(0).sum()
if zero_hours > 0:
logging.warning(f"Found {zero_hours} entries with zero actual hours")
logging.debug("Sample of zero-hour entries:")
sample_zero = df[df['Actual_Hours'] == 0].head()
for _, row in sample_zero.iterrows():
logging.debug(f" Activity: {row['ACTIVITYTYPE']}, "
f"Potential: {row['Potential Hours']}, "
f"Adjustments: {row['TIMEADJUSTEARLY_Hours'] + row['TIMEADJUSTLATE_Hours']}")
# Process dates and create time periods
# Keep the raw values so invalid dates can still be reported after coercion to NaT
raw_event_dates = df['EVENTDATE'].copy()
df['EVENTDATE'] = pd.to_datetime(df['EVENTDATE'], format='%d %b %Y', errors='coerce')
invalid_dates = df['EVENTDATE'].isna().sum()
if invalid_dates > 0:
logging.warning(f"Found {invalid_dates} invalid event dates")
logging.debug("Sample of invalid dates:")
sample_invalid = raw_event_dates[df['EVENTDATE'].isna()].head()
for date in sample_invalid:
logging.debug(f" Invalid date value: {date}")
df['Year'] = df['EVENTDATE'].dt.year
df['Month'] = df['EVENTDATE'].dt.month
df['Week'] = df['EVENTDATE'].dt.isocalendar().week
# Categorize activities and calculate hours
logging.info("Categorizing activities...")
df['Activity_Category'], df['Category_Hours'] = zip(*df.apply(calculate_activity_hours, axis=1))
# Keep required columns
columns_to_keep = [
'Staff_Standard', 'STAFFROLE', 'CONTRACTNAME',
'Activity_Category', 'Actual_Hours', 'Category_Hours',
'EVENTDATE', 'Year', 'Month', 'Week',
'Course Funding Group', 'Course Prov Mon C',
'Course Prov Mon D', 'VENUE'
]
df = df[columns_to_keep]
# Create monthly summaries
logging.info("Creating monthly summaries...")
monthly_hours = df.groupby(
['Staff_Standard', 'Year', 'Month', 'Activity_Category']
).agg({
'Actual_Hours': 'sum',
'Category_Hours': 'sum'
}).reset_index()
# Get contract details for each month
def get_monthly_contract(row):
month_date = pd.Timestamp(year=int(row['Year']), month=int(row['Month']), day=1)
staff_contracts = contract_history[
contract_history['Staff_Name'] == row['Staff_Standard']
]
# Find applicable contracts for the month
relevant_contracts = staff_contracts[
(staff_contracts['Start_Date'] <= month_date) &
(staff_contracts['End_Date'] >= month_date)
]
if len(relevant_contracts) > 0:
# If multiple contracts, combine information
positions = relevant_contracts['Position'].unique()
contract_types = relevant_contracts['Contract_Type'].unique()
total_target = relevant_contracts['Target_Hours'].sum()
return pd.Series({
'Position': ' & '.join(positions),
'Contract_Type': ' & '.join(contract_types),
'Target_Hours': total_target,
'Multiple_Contracts': len(relevant_contracts) > 1
})
return pd.Series({
'Position': None,
'Contract_Type': None,
'Target_Hours': None,
'Multiple_Contracts': False
})
# Add contract info to monthly summary
logging.info("Adding contract information...")
contract_details = monthly_hours.apply(get_monthly_contract, axis=1)
monthly_hours = pd.concat([monthly_hours, contract_details], axis=1)
# Calculate utilization
logging.info("Calculating monthly utilization...")
monthly_utilization = []
for name in monthly_hours['Staff_Standard'].unique():
staff_monthly = monthly_hours[
monthly_hours['Staff_Standard'] == name
].copy()
# Process each month
for month_idx, month_group in staff_monthly.groupby(['Year', 'Month']):
year, month = month_idx
# Only calculate utilization if we have target hours
if pd.notna(month_group['Target_Hours'].iloc[0]):
utilization = calculate_monthly_utilization(
month_group,
month_group['Target_Hours'].iloc[0],
exclude_categories=['Assessment']
)
staff_monthly.loc[
(staff_monthly['Year'] == year) &
(staff_monthly['Month'] == month),
'Utilization_Percentage'
] = utilization
monthly_utilization.append(staff_monthly)
if not monthly_utilization:
logging.warning("No monthly utilization data generated!")
return None
monthly_summary = pd.concat(monthly_utilization)
# Create additional summaries
logging.info("Creating additional summaries...")
# Daily summary
daily_summary = df.groupby(
['Staff_Standard', 'EVENTDATE', 'Activity_Category']
).agg({
'Actual_Hours': 'sum',
'Category_Hours': 'sum'
}).reset_index()
# Weekly summary
weekly_summary = df.groupby(
['Staff_Standard', 'Year', 'Week', 'Activity_Category']
).agg({
'Actual_Hours': 'sum',
'Category_Hours': 'sum'
}).reset_index()
# Add total hours columns for PowerBI
monthly_summary['Total_Hours'] = monthly_summary['Actual_Hours']
monthly_summary['Assessment_Hours'] = monthly_summary.apply(
lambda x: x['Category_Hours'] if x['Activity_Category'] == 'Assessment' else 0,
axis=1
)
monthly_summary['Non_Assessment_Hours'] = monthly_summary.apply(
lambda x: x['Category_Hours'] if x['Activity_Category'] != 'Assessment' else 0,
axis=1
)
# Venue and curriculum summaries
venue_summary = df.groupby(
['Staff_Standard', 'Year', 'Month', 'VENUE']
).agg({
'Actual_Hours': 'sum',
'Category_Hours': 'sum'
}).reset_index()
curriculum_summary = df.groupby(
['Staff_Standard', 'Year', 'Month', 'Course Prov Mon C']
).agg({
'Actual_Hours': 'sum',
'Category_Hours': 'sum'
}).reset_index()
logging.info("Tutor pay report processing completed!")
# Return all processed data
return {
'daily_summary': daily_summary,
'weekly_summary': weekly_summary,
'monthly_summary': monthly_summary,
'venue_summary': venue_summary,
'curriculum_summary': curriculum_summary,
'detailed_data': df
}
# Cell 4: Qualification Data Processing
def process_qualification_lists(
approved_file: str,
archived_file: str
) -> pd.DataFrame:
"""
Process approved and archived qualification lists
"""
logging.info("Processing qualification lists...")
required_columns = ['QualificationName', 'QualificationNumber']
# Read both files with validation
approved_df, error = safe_read_excel(approved_file)
if error:
logging.error(f"Failed to read approved qualifications: {error}")
return pd.DataFrame(columns=required_columns + ['Source'])
archived_df, error = safe_read_excel(archived_file)
if error:
logging.error(f"Failed to read archived qualifications: {error}")
return pd.DataFrame(columns=required_columns + ['Source'])
# Process approved list
approved_df = approved_df[required_columns].copy()
approved_df['Source'] = 'Approved'
# Process archived list
archived_df = archived_df[required_columns].copy()
archived_df['Source'] = 'Archived'
# Validate data
try:
validate_processed_data(approved_df, required_columns, 'approved_qualifications')
validate_processed_data(archived_df, required_columns, 'archived_qualifications')
except ValueError as e:
logging.error(f"Validation failed: {str(e)}")
return pd.DataFrame(columns=required_columns + ['Source'])
# Clean qualification numbers in both datasets
for df in [approved_df, archived_df]:
df['QualificationNumber'] = df['QualificationNumber'].apply(clean_qualification_number)
df['QualificationName'] = df['QualificationName'].str.strip()
# Check for duplicates
all_quals = pd.concat([approved_df, archived_df], ignore_index=True)
duplicates = all_quals.groupby('QualificationNumber').size()
duplicate_quals = duplicates[duplicates > 1]
if len(duplicate_quals) > 0:
logging.warning(f"Found {len(duplicate_quals)} duplicate qualification numbers")
for qual_num in duplicate_quals.index:
dupes = all_quals[all_quals['QualificationNumber'] == qual_num]
logging.warning(f"Duplicate entries for {qual_num}:")
for _, row in dupes.iterrows():
logging.warning(f" {row['Source']}: {row['QualificationName']}")
# Remove duplicates, keeping 'Approved' version if exists
qual_df = all_quals.sort_values(
['Source', 'QualificationNumber'],
ascending=[True, True] # 'Approved' sorts before 'Archived', so keep='first' prefers it
).drop_duplicates('QualificationNumber', keep='first')
# Log summary statistics
logging.info(f"Processed {len(qual_df)} unique qualifications:")
logging.info(f" - Approved: {len(qual_df[qual_df['Source'] == 'Approved'])}")
logging.info(f" - Archived: {len(qual_df[qual_df['Source'] == 'Archived'])}")
return qual_df
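# Illustrative call (hypothetical file names):
#   qual_df = process_qualification_lists('approved.xlsx', 'archived.xlsx')
# A number present in both lists keeps its 'Approved' row, since the sort
# above places Source='Approved' first before drop_duplicates runs.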
def process_terms_caseloading(
terms_file: str,
qualification_df: pd.DataFrame,
contract_history: pd.DataFrame
) -> Dict[str, pd.DataFrame]:
"""
Process Terms caseloading data and link with qualification information
"""
logging.info("Processing Terms caseloading data...")
# Read Terms caseloading data with error handling
terms_df, error = safe_read_excel(terms_file)
if error:
logging.error(f"Failed to read Terms caseloading data: {error}")
raise ValueError(f"Cannot process Terms caseloading data: {error}")
# Select and rename relevant columns
cols_to_keep = {
'Primary Staff Name': 'Staff_First_Name',
'Primary Staff Surname': 'Staff_Last_Name',
'Start Date': 'Start_Date',
'End Date': 'End_Date',
'Activity Name': 'Activity_Name',
'Funding Reference': 'Qualification_Number',
'Level': 'Level',
'Activity Group': 'Activity_Group',
'Name': 'Learner_First_Name',
'Surname': 'Learner_Last_Name'
}
try:
terms_df = terms_df[list(cols_to_keep)].copy()
terms_df.rename(columns=cols_to_keep, inplace=True)
except KeyError as e:
logging.error(f"Missing required columns: {str(e)}")
raise ValueError(f"Required columns missing from Terms caseloading data: {str(e)}")
# Clean qualification numbers and create staff names
terms_df['Qualification_Number'] = terms_df['Qualification_Number'].apply(clean_qualification_number)
# Clean name columns
name_columns = ['Staff_First_Name', 'Staff_Last_Name',
'Learner_First_Name', 'Learner_Last_Name']
terms_df = standardize_names(terms_df, name_columns)
terms_df['Staff_Standard'] = terms_df.apply(
lambda x: f"{x['Staff_First_Name']} {x['Staff_Last_Name']}", axis=1
)
# Convert dates with validation
date_columns = ['Start_Date', 'End_Date']
for col in date_columns:
terms_df[col] = pd.to_datetime(terms_df[col], errors='coerce')
invalid_dates = terms_df[col].isna().sum()
if invalid_dates > 0:
logging.warning(f"Found {invalid_dates} invalid dates in {col}")
logging.debug("Sample of rows with invalid dates:")
sample_invalid = terms_df[terms_df[col].isna()].head()
for _, row in sample_invalid.iterrows():
logging.debug(f" Staff: {row['Staff_Standard']}, Activity: {row['Activity_Name']}")
# Look up qualification names
def get_qualification_name(row):
if pd.isna(row['Qualification_Number']):
logging.debug(f"No qualification number for activity: {row['Activity_Name']}")
return row['Activity_Name']
qual_match = qualification_df[
qualification_df['QualificationNumber'] == row['Qualification_Number']
]
if len(qual_match) > 0:
source = qual_match.iloc[0]['Source']
qual_name = qual_match.iloc[0]['QualificationName']
logging.debug(f"Found qualification {row['Qualification_Number']} in {source} list: {qual_name}")
return qual_name
logging.debug(f"No match found for qualification {row['Qualification_Number']}, "
f"using activity name: {row['Activity_Name']}")
return row['Activity_Name']
terms_df['Qualification_Name'] = terms_df.apply(get_qualification_name, axis=1)
# Add contract info based on start date with handling of multiple contracts
def get_contract_info(row):
if pd.isna(row['Start_Date']):
return pd.Series({
'Staff_Position': None,
'Staff_Contract': None,
'Staff_Target_Hours': None,
'Multiple_Contracts': False
})
staff_contracts = contract_history[
contract_history['Staff_Name'] == row['Staff_Standard']
]
relevant_contracts = staff_contracts[
(staff_contracts['Start_Date'] <= row['Start_Date']) &
(staff_contracts['End_Date'] >= row['Start_Date'])
]
if len(relevant_contracts) > 0:
positions = relevant_contracts['Position'].unique()
contract_types = relevant_contracts['Contract_Type'].unique()
total_target = relevant_contracts['Target_Hours'].sum()
return pd.Series({
'Staff_Position': ' & '.join(positions),
'Staff_Contract': ' & '.join(contract_types),
'Staff_Target_Hours': total_target,
'Multiple_Contracts': len(relevant_contracts) > 1
})
return pd.Series({
'Staff_Position': None,
'Staff_Contract': None,
'Staff_Target_Hours': None,
'Multiple_Contracts': False
})
# Add contract details
logging.info("Adding contract information...")
contract_details = terms_df.apply(get_contract_info, axis=1)
terms_df = pd.concat([terms_df, contract_details], axis=1)
# Create monthly learner counts
logging.info("Calculating monthly learner counts...")
monthly_data = []
dates = pd.date_range(start='2023-04-01', end='2024-03-31', freq='M')  # month-end stamps; 'ME' on pandas >= 2.2
for date in dates:
month_start = date.replace(day=1)
month_end = date
# Get active learners for this month
month_mask = (
(terms_df['Start_Date'] <= month_end) &
((terms_df['End_Date'].isna()) | (terms_df['End_Date'] >= month_start))
)
month_data = terms_df[month_mask].copy()
if not month_data.empty:
month_data['Year'] = date.year
month_data['Month'] = date.month
monthly_data.append(month_data)
logging.debug(
f"Month {date.strftime('%Y-%m')}: {len(month_data)} active learners"
)
# Combine monthly data with empty DataFrame handling
monthly_df = pd.concat(monthly_data, ignore_index=True) if monthly_data else pd.DataFrame()
if monthly_df.empty:
logging.warning("No monthly data generated!")
return {}
# Create summary dataframes
logging.info("Creating summary dataframes...")
# Learner summary
learner_summary = monthly_df.groupby(
['Staff_Standard', 'Year', 'Month']
).agg({
'Learner_First_Name': 'count',
'Staff_Position': 'first',
'Staff_Contract': 'first',
'Staff_Target_Hours': 'first',
'Multiple_Contracts': 'first'
}).reset_index()
learner_summary.rename(columns={
'Learner_First_Name': 'Total_Learners'
}, inplace=True)
# Qualification summary
qual_summary = monthly_df.groupby(
['Staff_Standard', 'Year', 'Month', 'Qualification_Name',
'Level', 'Staff_Position', 'Staff_Contract', 'Activity_Group']
).agg({
'Learner_First_Name': 'count',
'Staff_Target_Hours': 'first',
'Multiple_Contracts': 'first'
}).reset_index()
qual_summary.rename(columns={
'Learner_First_Name': 'Learners_In_Qualification'
}, inplace=True)
# Level summary
level_summary = monthly_df.groupby(
['Staff_Standard', 'Year', 'Month', 'Level']
).agg({
'Learner_First_Name': 'count',
'Multiple_Contracts': 'first'
}).reset_index()
level_summary.rename(columns={
'Learner_First_Name': 'Learners_In_Level'
}, inplace=True)
# Activity group summary
activity_summary = monthly_df.groupby(
['Staff_Standard', 'Year', 'Month', 'Activity_Group']
).agg({
'Learner_First_Name': 'count',
'Multiple_Contracts': 'first'
}).reset_index()
activity_summary.rename(columns={
'Learner_First_Name': 'Learners_In_Activity'
}, inplace=True)
logging.info("Terms caseloading processing completed!")
return {
'terms_detailed': monthly_df,
'learner_summary': learner_summary,
'qualification_summary': qual_summary,
'level_summary': level_summary,
'activity_summary': activity_summary,
'qualification_reference': qualification_df
}
# Cell 5: PICS Data Processing and Main Execution
def process_pics_data(
caseload_file: str,
hours_lookup_file: str,
contract_history: pd.DataFrame
) -> Dict[str, pd.DataFrame]:
"""
Process PICS caseload data and hours lookup
"""
logging.info("Processing PICS data...")
# Read hours lookup first
logging.info("Reading hours lookup data...")
hours_df, error = safe_read_excel(hours_lookup_file, skiprows=2)
if error:
logging.error(f"Failed to read hours lookup: {error}")
raise ValueError(f"Cannot process PICS data: {error}")
# Process hours lookup
hours_lookup = process_pics_hours_lookup(hours_df)
logging.info(f"Processed {len(hours_lookup)} standard titles with hours")
# Read and validate caseload data
logging.info("Reading PICS caseload data...")
caseload_df, error = safe_read_excel(caseload_file)
if error:
logging.error(f"Failed to read caseload data: {error}")
raise ValueError(f"Cannot process PICS data: {error}")
required_columns = [
'Assessor Full Name', 'Programme', 'Apprenticeship Standard Title',
'Apprenticeship Achieved Date', 'Start Date', 'Learning Expected End',
'Actual End'
]
# Validate and select columns
try:
validate_processed_data(caseload_df, required_columns, 'PICS_caseload')
df = caseload_df[required_columns].copy()
except ValueError as e:
logging.error(f"Validation failed: {str(e)}")
raise
# Rename columns for consistency
column_mapping = {
'Assessor Full Name': 'Assessor_Name',
'Programme': 'Programme_Level',
'Apprenticeship Standard Title': 'Standard_Title',
'Apprenticeship Achieved Date': 'Achieved_Date',
'Start Date': 'Start_Date',
'Learning Expected End': 'Expected_End',
'Actual End': 'Actual_End'
}
df.rename(columns=column_mapping, inplace=True)
# Clean assessor names
df['Assessor_Name'] = df['Assessor_Name'].apply(clean_name)
# Convert dates with validation
date_columns = ['Achieved_Date', 'Start_Date', 'Expected_End', 'Actual_End']
for col in date_columns:
df[col] = pd.to_datetime(df[col], errors='coerce')
invalid_dates = df[col].isna().sum()
if invalid_dates > 0:
logging.warning(f"Found {invalid_dates} invalid dates in {col}")
if invalid_dates < 10: # Log details only for small numbers of issues
invalid_rows = df[df[col].isna()]
for _, row in invalid_rows.iterrows():
logging.warning(f"Invalid {col} for {row['Assessor_Name']}: {row['Standard_Title']}")
# Clean titles and match with hours lookup
df['Standard_Title'] = df['Standard_Title'].str.strip()
# Add hours from lookup with validation
merged_df = df.merge(
hours_lookup[['Standard_Title', 'Monthly_Hours']],
on='Standard_Title',
how='left',
validate='m:1'
)
# Check for unmatched standards
unmatched = merged_df[merged_df['Monthly_Hours'].isna()]['Standard_Title'].unique()
if len(unmatched) > 0:
logging.warning(f"Found {len(unmatched)} standards without matching hours:")
for title in unmatched:
logging.warning(f" {title}")
merged_df['Monthly_Hours'] = merged_df['Monthly_Hours'].fillna(0)
# Create monthly snapshots
monthly_data = []
dates = pd.date_range(start='2023-04-01', end='2024-03-31', freq='M')  # month-end stamps; 'ME' on pandas >= 2.2
logging.info("Generating monthly snapshots...")
for date in dates:
month_start = date.replace(day=1)
month_end = date
logging.info(f"Processing month: {month_start.strftime('%B %Y')}")
# Get active students for this month
month_mask = merged_df.apply(
lambda row: (
row['Start_Date'] <= month_end and
(pd.isna(row['Actual_End']) or row['Actual_End'] >= month_start) and
(pd.isna(row['Expected_End']) or row['Expected_End'] >= month_start)
),
axis=1
)
month_data = merged_df[month_mask].copy()
if not month_data.empty:
month_data['Snapshot_Date'] = month_end
month_data['Year'] = month_end.year
month_data['Month'] = month_end.month
# Add contract info for this month
def get_assessor_contract(row):
staff_contracts = contract_history[
contract_history['Staff_Name'] == row['Assessor_Name']
]
relevant_contracts = staff_contracts[
(staff_contracts['Start_Date'] <= month_end) &
(staff_contracts['End_Date'] >= month_end)
]
if len(relevant_contracts) > 0:
positions = relevant_contracts['Position'].unique()
contract_types = relevant_contracts['Contract_Type'].unique()
total_target = relevant_contracts['Target_Hours'].sum()
return pd.Series({
'Assessor_Position': ' & '.join(positions),
'Assessor_Contract': ' & '.join(contract_types),
'Assessor_Target_Hours': total_target,
'Multiple_Contracts': len(relevant_contracts) > 1
})
return pd.Series({
'Assessor_Position': None,
'Assessor_Contract': None,
'Assessor_Target_Hours': None,
'Multiple_Contracts': False
})
# Add contract details for the month
contract_details = month_data.apply(get_assessor_contract, axis=1)
month_data = pd.concat([month_data, contract_details], axis=1)
monthly_data.append(month_data)
logging.info(f"Active students in {month_start.strftime('%B %Y')}: {len(month_data)}")
# Combine all monthly data
monthly_df = pd.concat(monthly_data, ignore_index=True) if monthly_data else pd.DataFrame()
if monthly_df.empty:
logging.warning("No monthly data generated!")
return {}
# Create summaries
logging.info("Creating summary views...")
# Monthly summary per assessor
monthly_summary = monthly_df.groupby(
['Assessor_Name', 'Year', 'Month', 'Assessor_Position',
'Assessor_Contract', 'Multiple_Contracts']
).agg({
'Standard_Title': 'count',
'Monthly_Hours': 'sum',
'Assessor_Target_Hours': 'first'
}).reset_index()
monthly_summary.rename(columns={
'Standard_Title': 'Active_Students',
'Monthly_Hours': 'Required_Hours'
}, inplace=True)
# Programme level summary
programme_summary = monthly_df.groupby(
['Assessor_Name', 'Programme_Level', 'Year', 'Month',
'Assessor_Position', 'Assessor_Contract', 'Multiple_Contracts']
).agg({
'Standard_Title': 'count',
'Assessor_Target_Hours': 'first'
}).reset_index()
programme_summary.rename(columns={
'Standard_Title': 'Students_In_Programme'
}, inplace=True)
# Standard title summary
standard_summary = monthly_df.groupby(
['Assessor_Name', 'Standard_Title', 'Year', 'Month',
'Assessor_Position', 'Assessor_Contract', 'Multiple_Contracts']
).agg({
'Monthly_Hours': 'sum',
'Snapshot_Date': 'count',
'Assessor_Target_Hours': 'first'
}).reset_index()
standard_summary.rename(columns={
'Snapshot_Date': 'Students_In_Standard',
'Monthly_Hours': 'Required_Hours'
}, inplace=True)
logging.info("PICS data processing completed!")
return {
'detailed_monthly': monthly_df,
'monthly_summary': monthly_summary,
'programme_summary': programme_summary,
'standard_summary': standard_summary,
'hours_reference': hours_lookup
}
def process_pics_hours_lookup(hours_df: pd.DataFrame) -> pd.DataFrame:
"""Process the PICS hours lookup data"""
# Process hours lookup
hours_lookup = hours_df[[
'Standard title', 'Weighted Monthly Hours (1.6)'
]].copy()
# Convert time strings to decimal hours
def parse_time_str(time_str):
if pd.isna(time_str):
return 0.0
try:
if isinstance(time_str, time):
return time_str.hour + (time_str.minute / 60)
elif isinstance(time_str, str) and ':' in time_str:
parts = time_str.split(':')
if len(parts) == 3:
hours, minutes, _ = map(float, parts)
else:
hours, minutes = map(float, parts)
return hours + (minutes / 60)
elif isinstance(time_str, (int, float)):
return float(time_str)
except Exception as e:
logging.warning(f"Error parsing time value '{time_str}': {str(e)}")
return 0.0
return 0.0
hours_lookup['Monthly_Hours'] = hours_lookup['Weighted Monthly Hours (1.6)'].apply(parse_time_str)
# Log any zero hours entries
zero_hours = hours_lookup['Monthly_Hours'].eq(0)
if zero_hours.any():
logging.warning(f"Found {zero_hours.sum()} entries with zero hours in lookup")
logging.warning("Zero-hour entries:")
zero_entries = hours_lookup[zero_hours]
for _, row in zero_entries.iterrows():
logging.warning(f" {row['Standard title']}: {row['Weighted Monthly Hours (1.6)']}")
# Clean and standardize titles
hours_lookup['Standard_Title'] = hours_lookup['Standard title'].str.strip()
return hours_lookup
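# Illustrative examples: a 'Weighted Monthly Hours (1.6)' cell of '10:30:00'
# parses to 10.5, a plain numeric 8 stays 8.0, and blanks fall back to 0.0
# (and are reported by the zero-hours warning above).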
# Main execution block
if __name__ == "__main__":
try:
# Setup logging
setup_logging()
# Get current directory and setup paths
current_dir = os.getcwd()
monthly_reports_folder = os.path.join(current_dir, "monthly_reports")
output_folder = os.path.join(current_dir, "processed_data")
logging.info(f"Processing data from: {current_dir}")
logging.info(f"Monthly reports folder: {monthly_reports_folder}")
logging.info(f"Output folder: {output_folder}")
# Ensure monthly reports folder exists
if not os.path.exists(monthly_reports_folder):
raise FileNotFoundError(
f"Monthly reports folder not found: {monthly_reports_folder}"
)
# Create output folder if needed
if not os.path.exists(output_folder):
os.makedirs(output_folder)
logging.info(f"Created output folder: {output_folder}")
# --- Process Monthly Reports ---
logging.info("\nStep 1: Processing Monthly Reports")
staff_data, staff_summary, contract_history = process_monthly_reports(
monthly_reports_folder
)
# Save staff data
staff_data.to_csv(
os.path.join(output_folder, 'staff_monthly_data.csv'),
index=False
)
staff_summary.to_csv(
os.path.join(output_folder, 'staff_summary.csv'),
index=False
)
contract_history.to_csv(
os.path.join(output_folder, 'contract_history.csv'),
index=False
)
# --- Process Tutor Pay Report ---
logging.info("\nStep 2: Processing Tutor Pay Report")
tutor_report_data = process_tutor_report(
"TutorPayReport_New 23.24 FY 22.11.24.xlsx",
os.path.join(output_folder, 'staff_monthly_data.csv'),
contract_history
)
if tutor_report_data is not None:
for name, df in tutor_report_data.items():
output_file = os.path.join(output_folder, f'{name}.csv')
df.to_csv(output_file, index=False)
logging.info(f"Saved {name} to {output_file}")
else:
logging.error("No tutor report data generated!")
# --- Process Qualification Data ---
logging.info("\nStep 3: Processing Qualification Data")
# Define input files
approved_file = "Approved List for Terms Caseloading FY2324.xlsx"
archived_file = "Archived List for Terms Caseloading FY2324.xlsx"
terms_file = "Terms caseloading 23.24 FY.xlsx"
# Process qualification lists
qualification_data = process_qualification_lists(
approved_file,
archived_file
)
# Process Terms caseloading
terms_data = process_terms_caseloading(
terms_file,
qualification_data,
contract_history
)
# Save qualification data results
for name, df in terms_data.items():
output_file = os.path.join(output_folder, f'{name}.csv')
df.to_csv(output_file, index=False)
logging.info(f"Saved {name} to {output_file}")
# --- Process PICS Data ---
logging.info("\nStep 4: Processing PICS Data")
# Define PICS input files
caseload_file = "PICS caseload for PBI.xlsx"
hours_lookup_file = "PICS Hours for Assessor Look up.xlsx"
# Process PICS data
pics_data = process_pics_data(
caseload_file,
hours_lookup_file,
contract_history
)
# Save PICS data results
for name, df in pics_data.items():
output_file = os.path.join(output_folder, f'pics_{name}.csv')
df.to_csv(output_file, index=False)
logging.info(f"Saved {name} to {output_file}")
logging.info("\nAll processing completed successfully!")
except Exception as e:
logging.error(f"\nError during processing: {str(e)}", exc_info=True)
raise