# --- Import required libraries ---
import pandas as pd
import numpy as np
import os
from datetime import datetime, time
import logging
from typing import Tuple, Dict, List, Optional, Union
import warnings
# --- Setup logging ---
def setup_logging(log_file: str = 'staff_utilization.log') -> None:
    """Configure logging for the application"""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ]
    )
# Suppress pandas warnings about chained assignment on DataFrame copies
pd.options.mode.chained_assignment = None  # default='warn'
warnings.filterwarnings('ignore', category=pd.errors.SettingWithCopyWarning)  # pd.errors export needs pandas >= 1.5
# --- Enhanced utility functions ---
def safe_read_excel(file_path: str, **kwargs) -> Tuple[Optional[pd.DataFrame], Optional[str]]:
    """
    Safely read an Excel file. Returns (DataFrame, None) on success,
    or (None, error_message) on failure.
    """
    try:
        df = pd.read_excel(file_path, **kwargs)
        return df, None
    except Exception as e:
        error_msg = f"Error reading {file_path}: {str(e)}"
        logging.error(error_msg)
        return None, error_msg
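# Example usage (illustrative; 'report.xlsx' is a hypothetical path):
#   df, err = safe_read_excel('report.xlsx', sheet_name=0)
#   if err is not None:
#       ...handle or skip the file...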
def clean_name(name: Union[str, float, None]) -> Optional[str]:
    """
    Clean and standardize names
    """
    if pd.isna(name):
        return None
    name_str = str(name).strip()
    return ' '.join(name_str.split())
def standardize_names(df: pd.DataFrame, name_columns: List[str]) -> pd.DataFrame:
    """
    Standardize names across specified columns
    """
    for col in name_columns:
        if col in df.columns:
            df[col] = df[col].apply(clean_name)
            df[col] = df[col].str.replace(r'\s+', ' ', regex=True)
            df[col] = df[col].str.strip()
    return df
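# Example (hypothetical column names):
#   df = standardize_names(df, ['Staff_Name', 'Manager_Name'])
#   turns '  jane   DOE ' into 'jane DOE' (whitespace only; case is untouched)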
def extract_month_year(filename: str) -> pd.Timestamp:
    """
    Extract month and year from filename format 'ACL Monthly Report DD.MM.YY.xlsx'
    """
    try:
        date_str = filename.split()[-1].replace('.xlsx', '')
        return pd.to_datetime(date_str, format='%d.%m.%y')
    except Exception as e:
        logging.error(f"Error extracting date from filename {filename}: {str(e)}")
        raise ValueError(f"Invalid filename format: {filename}") from e
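# Example, following the filename format from the docstring:
#   extract_month_year('ACL Monthly Report 31.01.24.xlsx')
#   -> Timestamp('2024-01-31 00:00:00')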
def clean_qualification_number(qual_num: Union[str, float, None]) -> Optional[str]:
    """
    Clean qualification number by removing P or N suffix
    """
    if pd.isna(qual_num):
        return None
    qual_str = str(qual_num).strip()
    if qual_str.endswith(('P', 'N')):
        return qual_str[:-1]
    return qual_str
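# Example:
#   clean_qualification_number('12345P') -> '12345'
#   clean_qualification_number('12345')  -> '12345'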
def convert_time_to_hours(time_value: Union[str, float, None, pd.Timestamp, time]) -> float:
    """
    Convert various time formats to hours
    """
    if pd.isna(time_value):
        return 0.0
    try:
        if isinstance(time_value, time):
            return time_value.hour + (time_value.minute / 60)
        elif isinstance(time_value, str) and ':' in time_value:
            hours, minutes = map(float, time_value.split(':')[:2])  # only use hours and minutes
            return hours + (minutes / 60)
        else:
            # Bare numbers are treated as durations in milliseconds
            return abs(float(time_value)) / (1000 * 60 * 60)
    except Exception as e:
        logging.warning(f"Error converting time value {time_value}: {str(e)}")
        return 0.0
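# Example conversions (illustrative; numeric input assumed to be milliseconds):
#   convert_time_to_hours(time(7, 30))  -> 7.5
#   convert_time_to_hours('7:30')       -> 7.5
#   convert_time_to_hours(27_000_000)   -> 7.5   (27,000,000 ms)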
def validate_processed_data(
    df: pd.DataFrame,
    required_columns: List[str],
    name: Optional[str] = None
) -> None:
    """
    Validate processed DataFrames for required columns and data quality
    """
    df_name = name if name else 'dataframe'
    # Check for missing columns
    missing_cols = [col for col in required_columns if col not in df.columns]
    if missing_cols:
        raise ValueError(f"Missing required columns in {df_name}: {missing_cols}")
    # Check for empty dataframe
    if df.empty:
        raise ValueError(f"No data found in {df_name}")
    # Check for all-null columns
    null_cols = df.columns[df.isna().all()].tolist()
    if null_cols:
        logging.warning(f"Found columns with all null values in {df_name}: {null_cols}")
    # Check for mostly null columns (>90% null)
    high_null_cols = df.columns[df.isna().mean() > 0.9].tolist()
    if high_null_cols:
        logging.warning(f"Found columns with >90% null values in {df_name}: {high_null_cols}")
    # Log data quality metrics
    logging.info(f"Data validation for {df_name}:")
    logging.info(f"  - Total rows: {len(df)}")
    logging.info(f"  - Columns: {len(df.columns)}")
    for col in required_columns:
        null_count = df[col].isna().sum()
        if null_count > 0:
            logging.info(f"  - {col}: {null_count} null values ({(null_count / len(df)) * 100:.1f}%)")
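# Example usage (hypothetical column names):
#   validate_processed_data(report_df, ['Staff_Name', 'Actual_Hours'], name='monthly report')
#   raises ValueError on missing columns or an empty frame; otherwise logs quality metrics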
def calculate_monthly_utilization(
    group_df: pd.DataFrame,
    target_hours: float,
    exclude_categories: Optional[List[str]] = None
) -> Optional[float]:
    """
    Calculate monthly utilization percentage.
    `target_hours` is an annual figure; the monthly target is target_hours / 12.
    """
    if pd.isna(target_hours) or target_hours == 0:
        return None
    if exclude_categories:
        included_df = group_df[~group_df['Activity_Category'].isin(exclude_categories)]
    else:
        included_df = group_df
    total_hours = included_df['Actual_Hours'].sum()
    monthly_target = target_hours / 12
    if monthly_target > 0:
        utilization = (total_hours / monthly_target) * 100
        logging.debug("Utilization calculation:")
        logging.debug(f"  Total hours: {total_hours}")
        logging.debug(f"  Monthly target ({target_hours}/12): {monthly_target}")
        logging.debug(f"  Utilization: {utilization}%")
        return utilization
    return None
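# Worked example (hypothetical figures):
#   annual target 1,680 h -> monthly target 140 h
#   120 actual hours logged -> (120 / 140) * 100 ~= 85.7% utilization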
def identify_overlapping_contracts(contract_periods: List[Dict]) -> List[Dict]:
    """
    Identify and annotate overlapping contract periods
    """
    sorted_periods = sorted(contract_periods, key=lambda x: x['Start_Date'])
    active_periods = []
    result_periods = []
    for period in sorted_periods:
        # Drop periods that ended before this one starts
        active_periods = [p for p in active_periods if p['End_Date'] >= period['Start_Date']]
        # Only count an overlap when the position or contract type differs
        overlaps = []
        for active in active_periods:
            if (active['Position'] != period['Position'] or
                    active['Contract_Type'] != period['Contract_Type']):
                overlaps.append(active)
        new_period = period.copy()
        if overlaps:
            overlap_positions = [p['Position'] for p in overlaps] + [period['Position']]
            overlap_contracts = [p['Contract_Type'] for p in overlaps] + [period['Contract_Type']]
            new_period.update({
                'Overlapping_Positions': overlap_positions,
                'Overlapping_Contracts': overlap_contracts,
                'Has_Overlap': True,
                'Overlap_Count': len(overlaps) + 1
            })
        else:
            new_period.update({
                'Overlapping_Positions': [period['Position']],
                'Overlapping_Contracts': [period['Contract_Type']],
                'Has_Overlap': False,
                'Overlap_Count': 1
            })
        result_periods.append(new_period)
        active_periods.append(period)
    return result_periods
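# Example (hypothetical records; dates as pd.Timestamp):
#   periods = [
#       {'Start_Date': pd.Timestamp('2024-01-01'), 'End_Date': pd.Timestamp('2024-06-30'),
#        'Position': 'Nurse', 'Contract_Type': 'Permanent'},
#       {'Start_Date': pd.Timestamp('2024-04-01'), 'End_Date': pd.Timestamp('2024-12-31'),
#        'Position': 'Team Lead', 'Contract_Type': 'Fixed-Term'},
#   ]
#   identify_overlapping_contracts(periods)[1]['Has_Overlap'] -> True (Overlap_Count 2)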
def consolidate_contract_changes(contract_df: pd.DataFrame) -> pd.DataFrame:
    """
    Consolidate contract changes to handle overlapping periods
    """
    consolidated = []
    for staff_name in contract_df['Staff_Name'].unique():
        staff_contracts = contract_df[
            contract_df['Staff_Name'] == staff_name
        ].sort_values('Start_Date')
        contract_list = staff_contracts.to_dict('records')
        processed_contracts = identify_overlapping_contracts(contract_list)
        for contract in processed_contracts:
            row = {
                'Staff_Name': staff_name,
                'Start_Date': contract['Start_Date'],
                'End_Date': contract['End_Date'],
                'Position': ' & '.join(contract['Overlapping_Positions']),
                'Contract_Type': ' & '.join(contract['Overlapping_Contracts']),
                'Has_Multiple_Contracts': contract['Has_Overlap'],
                'Contract_Count': contract['Overlap_Count'],
                'Change_Type': contract.get('Change_Type')
            }
            # Combine target hours across every contract that overlaps this period
            if contract['Has_Overlap']:
                total_target = sum(
                    float(c.get('Target_Hours', 0) or 0)
                    for c in contract_list
                    if (c['Start_Date'] <= contract['End_Date'] and
                        c['End_Date'] >= contract['Start_Date'])
                )
                row['Target_Hours'] = total_target
            else:
                row['Target_Hours'] = contract.get('Target_Hours', 0)
            consolidated.append(row)
    return pd.DataFrame(consolidated)
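# Minimal end-to-end sketch (hypothetical staff data; assumes the column names
# used by the functions above). Running the module directly exercises the
# contract-consolidation path without touching any Excel files.
if __name__ == '__main__':
    setup_logging()
    contracts = pd.DataFrame([
        {'Staff_Name': 'Jane Doe', 'Start_Date': pd.Timestamp('2024-01-01'),
         'End_Date': pd.Timestamp('2024-06-30'), 'Position': 'Nurse',
         'Contract_Type': 'Permanent', 'Target_Hours': 1680, 'Change_Type': 'New'},
        {'Staff_Name': 'Jane Doe', 'Start_Date': pd.Timestamp('2024-04-01'),
         'End_Date': pd.Timestamp('2024-12-31'), 'Position': 'Team Lead',
         'Contract_Type': 'Fixed-Term', 'Target_Hours': 840, 'Change_Type': 'Promotion'},
    ])
    consolidated = consolidate_contract_changes(contracts)
    logging.info("Consolidated contracts:\n%s", consolidated)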