C-1
user_1718919
plain_text
7 months ago
9.1 kB
4
Indexable
# --- Import required libraries ---
import pandas as pd
import numpy as np
import os
# FIX: import `time` directly — `datetime.time` (on the datetime *class*) is an
# instance method, not the time class, so isinstance checks against it raise TypeError.
from datetime import datetime, time
import logging
from typing import Tuple, Dict, List, Optional, Union
import warnings


# --- Setup logging ---
def setup_logging(log_file: str = 'staff_utilization.log') -> None:
    """Configure logging for the application (file + console handlers)."""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ]
    )


# Suppress pandas warning about copy operations
pd.options.mode.chained_assignment = None  # default='warn'
warnings.filterwarnings('ignore', category=pd.errors.SettingWithCopyWarning)


# --- Enhanced utility functions ---
def safe_read_excel(file_path: str, **kwargs) -> Tuple[Optional[pd.DataFrame], Optional[str]]:
    """
    Safely read an Excel file with error handling.

    Args:
        file_path: Path to the .xlsx file.
        **kwargs: Passed through to ``pd.read_excel``.

    Returns:
        (df, None) on success, or (None, error_message) on failure.
    """
    try:
        df = pd.read_excel(file_path, **kwargs)
        return df, None
    except Exception as e:
        error_msg = f"Error reading {file_path}: {str(e)}"
        logging.error(error_msg)
        return None, error_msg


def clean_name(name: Union[str, float, None]) -> Optional[str]:
    """
    Clean and standardize a single name: strip and collapse internal whitespace.

    Returns None for missing (NaN/None) input.
    """
    if pd.isna(name):
        return None
    name_str = str(name).strip()
    # split()/join collapses any run of whitespace to a single space
    return ' '.join(name_str.split())


def standardize_names(df: pd.DataFrame, name_columns: List[str]) -> pd.DataFrame:
    """
    Standardize names across the specified columns in place.

    Columns not present in the frame are skipped silently.
    """
    for col in name_columns:
        if col in df.columns:
            df[col] = df[col].apply(clean_name)
            # NOTE: redundant with clean_name, but kept because the .str accessor
            # also normalizes None -> NaN, which downstream code may rely on.
            df[col] = df[col].str.replace(r'\s+', ' ', regex=True)
            df[col] = df[col].str.strip()
    return df


def extract_month_year(filename: str) -> pd.Timestamp:
    """
    Extract the report date from a filename of the form
    'ACL Monthly Report DD.MM.YY.xlsx'.

    Raises:
        ValueError: if the filename does not end with a parseable DD.MM.YY date.
    """
    try:
        date_str = filename.split()[-1].replace('.xlsx', '')
        return pd.to_datetime(date_str, format='%d.%m.%y')
    except Exception as e:
        # FIX: interpolate the actual filename (original logged a literal
        # "(unknown)" placeholder instead of the offending file name)
        logging.error(f"Error extracting date from filename {filename}: {str(e)}")
        raise ValueError(f"Invalid filename format: {filename}")


def clean_qualification_number(qual_num: Union[str, float, None]) -> Optional[str]:
    """
    Clean a qualification number by removing a trailing 'P' or 'N' suffix.

    Returns None for missing (NaN/None) input.
    """
    if pd.isna(qual_num):
        return None
    qual_str = str(qual_num).strip()
    if qual_str.endswith(('P', 'N')):
        return qual_str[:-1]
    return qual_str


def convert_time_to_hours(time_value: Union[str, float, None, pd.Timestamp, time]) -> float:
    """
    Convert various time representations to fractional hours.

    Supported inputs:
        - ``datetime.time`` objects (hours + minutes)
        - 'HH:MM[:SS]' strings (seconds ignored)
        - numeric values, assumed to be milliseconds -- TODO confirm with data source

    Returns 0.0 for missing or unconvertible input (a warning is logged).
    """
    if pd.isna(time_value):
        return 0.0
    try:
        # FIX: original checked isinstance(x, datetime.time) where `datetime`
        # was the datetime class, so `datetime.time` was a *method* and
        # isinstance raised TypeError — silently swallowed by the except
        # below, making every time object convert to 0.0.
        if isinstance(time_value, time):
            return time_value.hour + (time_value.minute / 60)
        elif isinstance(time_value, str) and ':' in time_value:
            hours, minutes = map(float, time_value.split(':')[:2])  # Only use hours and minutes
            return hours + (minutes / 60)
        else:
            # milliseconds -> hours
            return abs(float(time_value)) / (1000 * 60 * 60)
    except Exception as e:
        logging.warning(f"Error converting time value {time_value}: {str(e)}")
        return 0.0


def validate_processed_data(
    df: pd.DataFrame,
    required_columns: List[str],
    name: Optional[str] = None
) -> None:
    """
    Validate a processed DataFrame for required columns and basic data quality.

    Raises:
        ValueError: if required columns are missing or the frame is empty.

    Quality issues (all-null or >90%-null columns) are logged as warnings,
    not raised, so processing can continue.
    """
    df_name = name if name else 'dataframe'

    # Check for missing columns
    missing_cols = [col for col in required_columns if col not in df.columns]
    if missing_cols:
        raise ValueError(f"Missing required columns in {df_name}: {missing_cols}")

    # Check for empty dataframe
    if df.empty:
        raise ValueError(f"No data found in {df_name}")

    # Check for all-null columns
    null_cols = df.columns[df.isna().all()].tolist()
    if null_cols:
        logging.warning(f"Found columns with all null values in {df_name}: {null_cols}")

    # Check for mostly null columns (>90% null)
    high_null_cols = df.columns[df.isna().mean() > 0.9].tolist()
    if high_null_cols:
        logging.warning(f"Found columns with >90% null values in {df_name}: {high_null_cols}")

    # Log data quality metrics
    logging.info(f"Data validation for {df_name}:")
    logging.info(f"  - Total rows: {len(df)}")
    logging.info(f"  - Columns: {len(df.columns)}")
    for col in required_columns:
        null_count = df[col].isna().sum()
        if null_count > 0:
            logging.info(f"  - {col}: {null_count} null values ({(null_count/len(df))*100:.1f}%)")


def calculate_monthly_utilization(
    group_df: pd.DataFrame,
    target_hours: float,
    exclude_categories: Optional[List[str]] = None
) -> Optional[float]:
    """
    Calculate monthly utilization as a percentage of 1/12 of the annual target.

    Args:
        group_df: Rows for one staff member/month; must contain
            'Activity_Category' and 'Actual_Hours' columns.
        target_hours: Annual target hours; None/NaN/0 yields None.
        exclude_categories: Activity categories to exclude from actual hours.

    Returns:
        Utilization percentage, or None when no valid target is available.
    """
    if pd.isna(target_hours) or target_hours == 0:
        return None

    if exclude_categories:
        included_df = group_df[~group_df['Activity_Category'].isin(exclude_categories)]
    else:
        included_df = group_df

    total_hours = included_df['Actual_Hours'].sum()
    monthly_target = target_hours / 12

    if monthly_target > 0:
        utilization = (total_hours / monthly_target) * 100
        logging.debug(f"Utilization calculation:")
        logging.debug(f"  Total hours: {total_hours}")
        logging.debug(f"  Monthly target ({target_hours}/12): {monthly_target}")
        logging.debug(f"  Utilization: {utilization}%")
        return utilization
    return None


def identify_overlapping_contracts(contract_periods: List[Dict]) -> List[Dict]:
    """
    Identify overlapping contract periods within one staff member's history.

    Each input dict must carry 'Start_Date', 'End_Date', 'Position' and
    'Contract_Type'. Periods overlapping in time with a *different*
    position or contract type are flagged. Returns one enriched copy per
    input period with 'Overlapping_Positions', 'Overlapping_Contracts',
    'Has_Overlap' and 'Overlap_Count' keys added.
    """
    # Sweep periods in chronological order, maintaining the set still "active"
    sorted_periods = sorted(contract_periods, key=lambda x: x['Start_Date'])
    active_periods = []
    result_periods = []

    for period in sorted_periods:
        # Drop periods that ended before this one starts
        active_periods = [p for p in active_periods if p['End_Date'] >= period['Start_Date']]

        # An overlap only counts when position or contract type differs
        overlaps = []
        for active in active_periods:
            if (active['Position'] != period['Position'] or
                    active['Contract_Type'] != period['Contract_Type']):
                overlaps.append(active)

        new_period = period.copy()
        if overlaps:
            overlap_positions = [p['Position'] for p in overlaps] + [period['Position']]
            overlap_contracts = [p['Contract_Type'] for p in overlaps] + [period['Contract_Type']]
            new_period.update({
                'Overlapping_Positions': overlap_positions,
                'Overlapping_Contracts': overlap_contracts,
                'Has_Overlap': True,
                'Overlap_Count': len(overlaps) + 1
            })
        else:
            new_period.update({
                'Overlapping_Positions': [period['Position']],
                'Overlapping_Contracts': [period['Contract_Type']],
                'Has_Overlap': False,
                'Overlap_Count': 1
            })

        result_periods.append(new_period)
        active_periods.append(period)

    return result_periods


def consolidate_contract_changes(contract_df: pd.DataFrame) -> pd.DataFrame:
    """
    Consolidate per-staff contract changes, merging overlapping periods.

    Expects columns 'Staff_Name', 'Start_Date', 'End_Date', 'Position',
    'Contract_Type', 'Change_Type' and (optionally) 'Target_Hours'.
    Overlapping periods have their positions/contract types joined with
    ' & ' and their target hours summed across the overlapping window.
    """
    consolidated = []

    for staff_name in contract_df['Staff_Name'].unique():
        staff_contracts = contract_df[
            contract_df['Staff_Name'] == staff_name
        ].sort_values('Start_Date')

        contract_list = staff_contracts.to_dict('records')
        processed_contracts = identify_overlapping_contracts(contract_list)

        for contract in processed_contracts:
            row = {
                'Staff_Name': staff_name,
                'Start_Date': contract['Start_Date'],
                'End_Date': contract['End_Date'],
                'Position': ' & '.join(contract['Overlapping_Positions']),
                'Contract_Type': ' & '.join(contract['Overlapping_Contracts']),
                'Has_Multiple_Contracts': contract['Has_Overlap'],
                'Contract_Count': contract['Overlap_Count'],
                'Change_Type': contract['Change_Type']
            }

            # Calculate combined target hours across all contracts whose
            # date range intersects this consolidated period
            if contract['Has_Overlap']:
                total_target = sum(
                    float(c.get('Target_Hours', 0) or 0)
                    for c in contract_list
                    if (c['Start_Date'] <= contract['End_Date'] and
                        c['End_Date'] >= contract['Start_Date'])
                )
                row['Target_Hours'] = total_target
            else:
                row['Target_Hours'] = contract.get('Target_Hours', 0)

            consolidated.append(row)

    return pd.DataFrame(consolidated)
Editor is loading...
Leave a Comment