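# Paste-site metadata (not part of the program):
#   title: C3
#   author: user_1718919
#   format: plain_text
#   posted: 10 months ago
#   size: 6.2 kB
#   views: 4
#   Indexable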
import pandas as pd
import os
from datetime import datetime
def extract_month_year(filename):
    """Extract the report date from a monthly report filename.

    The expected filename format is 'ACL Monthly Report DD.MM.YY.xlsx'.
    The date is taken from the last whitespace-separated token of the name.

    Args:
        filename: File name (not a full path) of the monthly report.

    Returns:
        pandas.Timestamp parsed from the DD.MM.YY token.

    Raises:
        ValueError: If the last token is not a valid DD.MM.YY date.
        IndexError: If *filename* is empty or whitespace only.
    """
    date_token = filename.split()[-1]
    # Strip the extension only when it is a recognised Excel suffix, compared
    # case-insensitively.  A name with no extension ("... 01.02.24") must not
    # lose its ".24" year component, so unknown suffixes are left untouched.
    stem, extension = os.path.splitext(date_token)
    if extension.lower() in ('.xlsx', '.xlsm', '.xls'):
        date_token = stem
    return pd.to_datetime(date_token, format='%d.%m.%y')
def _split_manager_name(name):
    """Split one 'Line Manager' cell into a ``(first, known, last)`` tuple.

    The value is split on commas.  The text before the comma is treated as
    "first [known]" and the text after it as the last name.

    * Missing value -> ``(pd.NA, pd.NA, pd.NA)``.
    * Anything other than exactly two comma-separated parts -> the original
      value repeated three times.
    * No space before the comma -> the same token is used for both the first
      and the known name.
    """
    if pd.isna(name):
        return pd.NA, pd.NA, pd.NA
    parts = name.split(',')
    if len(parts) != 2:
        return name, name, name  # Return original if format doesn't match
    first_known = parts[0].strip()
    last = parts[1].strip()
    # Split first/known if space exists
    if ' ' in first_known:
        first, known = first_known.split(' ', 1)
    else:
        first = known = first_known
    return first, known, last


def process_monthly_reports(folder_path):
    """Process all monthly reports in the specified folder.

    Every file in *folder_path* whose name starts with 'ACL Monthly Report'
    is read with ``pd.read_excel``, stamped with the date parsed from its
    file name, filtered down to Tutors and Learning Support Assistants, and
    reduced to a fixed set of derived columns.  The monthly frames are then
    combined, sorted, and summarised per staff member.

    Args:
        folder_path: Directory containing the monthly report workbooks.

    Returns:
        tuple: ``(combined_df, staff_history_df)``.

        * ``combined_df`` has one row per kept report row, sorted by
          ``Report_Date`` then ``Staff_Standard``, plus ``Previous_Contract``
          and ``Contract_Changed`` columns.
        * ``staff_history_df`` has one row per distinct ``Staff_Standard``
          value with first/last appearance, latest position and contract,
          and the dates on which the contract type changed.

        Both frames are empty (but carry their column names) when no report
        contributes any rows.

    Raises:
        KeyError: If a workbook lacks one of the expected source columns.
    """
    columns_to_keep = [
        'Assignment Number', 'Staff_Standard', 'Staff_Known_As',
        'Position Name', 'Contract_Type', 'Target_Hours',
        'Manager_Standard', 'Manager_Known_As', 'Report_Date',
    ]
    history_columns = [
        'Staff_Name', 'First_Appearance', 'Last_Appearance',
        'Current_Position', 'Current_Contract',
        'Number_Of_Contract_Changes', 'Contract_Change_Dates',
    ]
    manager_columns = ['Manager_First', 'Manager_Known', 'Manager_Last']

    # List all monthly report files (sorted so runs are deterministic;
    # os.listdir order is arbitrary).
    report_files = sorted(
        f for f in os.listdir(folder_path)
        if f.startswith('ACL Monthly Report')
    )

    all_data = []
    for report_file in report_files:
        df = pd.read_excel(os.path.join(folder_path, report_file))

        # Extract report date from filename
        df['Report_Date'] = extract_month_year(report_file)

        # Filter for only Tutors and Learning Support Assistants
        mask = df['Position Name'].isin(['Learning Support Assistant', 'Tutor'])
        df = df[mask].copy()
        if df.empty:
            # Nothing relevant this month; skipping also avoids assigning
            # derived columns from zero-row intermediates.
            continue

        # Target hours scale with FTE.
        # NOTE(review): 840 looks like the full-time hours target - confirm.
        df['Target_Hours'] = df['Full-Time Equivalent'] * 840

        # Contract type: 3 working hours or fewer is 'Sessional', anything
        # else (including a missing value, since NaN <= 3 is False) 'Salaried'.
        df['Contract_Type'] = df['Working Hours'].apply(
            lambda hours: 'Sessional' if float(hours) <= 3 else 'Salaried'
        )

        # Split manager names into separate columns.  Building the frame
        # explicitly keeps the three columns present whatever the row count.
        manager_parts = pd.DataFrame(
            [_split_manager_name(name) for name in df['Line Manager']],
            index=df.index,
            columns=manager_columns,
        )
        for column in manager_columns:
            df[column] = manager_parts[column]

        # Standard ("First Last") and known-as ("Known, Last") manager names;
        # missing managers stay missing.
        df['Manager_Standard'] = [
            f"{first} {last}" if pd.notna(first) else pd.NA
            for first, last in zip(df['Manager_First'], df['Manager_Last'])
        ]
        df['Manager_Known_As'] = [
            f"{known}, {last}" if pd.notna(known) else pd.NA
            for known, last in zip(df['Manager_Known'], df['Manager_Last'])
        ]

        # Same two formats for the staff member's own name.
        df['Staff_Standard'] = [
            f"{first} {last}"
            for first, last in zip(df['First Name'], df['Last Name'])
        ]
        df['Staff_Known_As'] = [
            f"{known}, {last}"
            for known, last in zip(df['Known As'], df['Last Name'])
        ]

        all_data.append(df[columns_to_keep])

    if not all_data:
        # pd.concat([]) raises ValueError; return well-formed empty results.
        return (
            pd.DataFrame(
                columns=columns_to_keep + ['Previous_Contract', 'Contract_Changed']
            ),
            pd.DataFrame(columns=history_columns),
        )

    # Combine all months, ordered by date and then staff name so that the
    # last row of each staff member's group is their most recent record.
    combined_df = pd.concat(all_data, ignore_index=True)
    combined_df = combined_df.sort_values(['Report_Date', 'Staff_Standard'])

    # Track contract changes.  The first record of each staff member has no
    # previous contract (NaN); it must not be counted as a change.
    combined_df['Previous_Contract'] = (
        combined_df.groupby('Staff_Standard')['Contract_Type'].shift(1)
    )
    combined_df['Contract_Changed'] = (
        combined_df['Previous_Contract'].notna()
        & (combined_df['Contract_Type'] != combined_df['Previous_Contract'])
    )

    # Build one history row per staff member.  sort=False keeps the
    # first-appearance order of names.
    staff_history = []
    for name, staff_data in combined_df.groupby('Staff_Standard', sort=False):
        contract_changes = staff_data[staff_data['Contract_Changed']]
        latest = staff_data.iloc[-1]
        staff_history.append({
            'Staff_Name': name,
            'First_Appearance': staff_data['Report_Date'].min(),
            'Last_Appearance': staff_data['Report_Date'].max(),
            'Current_Position': latest['Position Name'],
            'Current_Contract': latest['Contract_Type'],
            'Number_Of_Contract_Changes': len(contract_changes),
            'Contract_Change_Dates': contract_changes['Report_Date'].tolist(),
        })

    staff_history_df = pd.DataFrame(staff_history, columns=history_columns)
    return combined_df, staff_history_df
def save_processed_data(combined_df, staff_history_df, output_folder):
    """Save processed dataframes to CSV files.

    Writes ``staff_monthly_data.csv`` and ``staff_history.csv`` into
    *output_folder*, creating the folder (and any missing parents) first.
    Existing files with those names are overwritten.  The dataframe index is
    not written.

    Args:
        combined_df: Monthly staff records to write.
        staff_history_df: Per-staff history summary to write.
        output_folder: Destination directory path.
    """
    # exist_ok=True avoids the check-then-create race of testing
    # os.path.exists() before calling makedirs().
    os.makedirs(output_folder, exist_ok=True)

    monthly_path = os.path.join(output_folder, 'staff_monthly_data.csv')
    history_path = os.path.join(output_folder, 'staff_history.csv')
    combined_df.to_csv(monthly_path, index=False)
    staff_history_df.to_csv(history_path, index=False)
# Usage example:
if __name__ == "__main__":
    # The Excel reports are read from the current working directory.
    current_dir = os.getcwd()
    print(f"Current working directory: {current_dir}")

    # List the directory contents so a missing or misnamed report is easy
    # to spot in the console output.
    print("\nFiles in current directory:")
    for file in os.listdir(current_dir):
        print(f"- {file}")

    # Input is the current directory; output goes to a sub-folder of it.
    input_folder = current_dir
    output_folder = os.path.join(current_dir, "processed_data")
    print(f"\nProcessing files from: {input_folder}")
    print(f"Output will be saved to: {output_folder}")

    # Process the data
    combined_data, staff_history = process_monthly_reports(input_folder)

    # Save the processed data
    save_processed_data(combined_data, staff_history, output_folder)

    print("Data processing completed!")
    print(f"Total staff members: {len(staff_history)}")
    print(f"Total monthly records: {len(combined_data)}")
# (paste-site footer: "Leave a Comment")