C-1

 avatar
user_1718919
plain_text
7 months ago
9.1 kB
4
Indexable
# --- Import required libraries ---
import logging
import os
import warnings
from datetime import datetime
from datetime import time as dt_time
from typing import Tuple, Dict, List, Optional, Union

import numpy as np
import pandas as pd

# --- Setup logging ---
def setup_logging(log_file: str = 'staff_utilization.log') -> None:
    """
    Initialise root logging and silence pandas chained-assignment warnings.

    Records at INFO level and above are sent to both a file handler
    writing to ``log_file`` and a stream handler, using a
    "timestamp - level - message" layout.
    """
    file_sink = logging.FileHandler(log_file)
    stream_sink = logging.StreamHandler()
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[file_sink, stream_sink],
    )

    # Switch off the pandas chained-assignment check (its default is 'warn')
    # and drop any SettingWithCopyWarning that is still emitted.
    pd.options.mode.chained_assignment = None
    warnings.filterwarnings('ignore', category=pd.errors.SettingWithCopyWarning)

# --- Enhanced utility functions ---
def safe_read_excel(file_path: str, **kwargs) -> Tuple[Optional[pd.DataFrame], Optional[str]]:
    """
    Load an Excel file without letting read failures propagate.

    Extra keyword arguments are forwarded unchanged to ``pd.read_excel``.

    Returns:
        ``(dataframe, None)`` when the read succeeds, otherwise
        ``(None, error_message)`` after the failure has been logged.
    """
    try:
        frame = pd.read_excel(file_path, **kwargs)
    except Exception as exc:
        failure = f"Error reading {file_path}: {str(exc)}"
        logging.error(failure)
        return None, failure
    else:
        return frame, None

def clean_name(name: Union[str, float, None]) -> Optional[str]:
    """
    Normalise a name by trimming it and collapsing whitespace runs.

    Missing values (None / NaN) give None; any other value is converted
    with ``str`` before cleaning.
    """
    if pd.isna(name):
        return None
    # split() without a separator drops leading/trailing whitespace and
    # treats every run of whitespace as a single delimiter.
    tokens = str(name).split()
    return ' '.join(tokens)

def standardize_names(df: pd.DataFrame, name_columns: List[str]) -> pd.DataFrame:
    """
    Clean the name values of the listed columns in place.

    Columns named in ``name_columns`` that are not present in ``df`` are
    skipped. The same ``df`` object is modified and returned.
    """
    for column in name_columns:
        if column not in df.columns:
            continue
        # Per-value cleanup first, then a vectorised whitespace pass.
        df[column] = df[column].apply(clean_name)
        df[column] = df[column].str.replace(r'\s+', ' ', regex=True).str.strip()
    return df

def extract_month_year(filename: str) -> pd.Timestamp:
    """
    Parse the report date embedded in a filename.

    The last whitespace-separated token of ``filename`` is expected to be
    'DD.MM.YY.xlsx', as in 'ACL Monthly Report DD.MM.YY.xlsx'.

    Raises:
        ValueError: if that token is missing or is not a DD.MM.YY date.
    """
    try:
        last_token = filename.split()[-1]
        date_part = last_token.replace('.xlsx', '')
        return pd.to_datetime(date_part, format='%d.%m.%y')
    except Exception as exc:
        logging.error(f"Error extracting date from filename {filename}: {str(exc)}")
        raise ValueError(f"Invalid filename format: {filename}")

def clean_qualification_number(qual_num: Union[str, float, None]) -> Optional[str]:
    """
    Normalise a qualification number.

    The value is converted to a string and trimmed; one trailing 'P' or
    'N' is then removed if present. Missing values (None / NaN) give None.
    """
    if pd.isna(qual_num):
        return None
    trimmed = str(qual_num).strip()
    has_suffix = trimmed.endswith(('P', 'N'))
    return trimmed[:-1] if has_suffix else trimmed

def convert_time_to_hours(time_value: Union[str, float, None, pd.Timestamp, dt_time]) -> float:
    """
    Convert various time formats to hours.

    Handled inputs:
      * missing values (None / NaN / NaT) -> 0.0
      * ``datetime.time`` objects -> hour + minute / 60 (seconds ignored)
      * strings containing ':' ('HH:MM' or 'HH:MM:SS') ->
        hours + minutes / 60 (seconds ignored)
      * anything else is passed to ``float`` and treated as a count of
        milliseconds; its absolute value is converted to hours

    Values that cannot be converted are logged at WARNING level and
    yield 0.0.
    """
    if pd.isna(time_value):
        return 0.0

    try:
        # NOTE: this file imports the ``datetime`` *class*, so
        # ``datetime.time`` would be the ``datetime.datetime.time`` method,
        # which ``isinstance`` rejects with TypeError. The ``time`` class
        # itself (imported as ``dt_time``) must be used here.
        if isinstance(time_value, dt_time):
            return time_value.hour + (time_value.minute / 60)
        if isinstance(time_value, str) and ':' in time_value:
            # Only the first two fields (hours and minutes) are used.
            hours, minutes = map(float, time_value.split(':')[:2])
            return hours + (minutes / 60)
        # Numeric (or numeric-string) input: milliseconds -> hours.
        return abs(float(time_value)) / (1000 * 60 * 60)
    except (TypeError, ValueError, OverflowError) as e:
        # Only conversion failures are swallowed; a broader handler would
        # hide programming errors behind a silent 0.0.
        logging.warning(f"Error converting time value {time_value}: {str(e)}")
        return 0.0

def validate_processed_data(
    df: pd.DataFrame,
    required_columns: List[str],
    name: Optional[str] = None
) -> None:
    """
    Check a processed DataFrame's structure and log null-value statistics.

    Args:
        df: frame to inspect.
        required_columns: column names that must be present in ``df``.
        name: label used in messages; 'dataframe' when not given.

    Raises:
        ValueError: if a required column is absent, or if ``df`` is empty.
    """
    label = name or 'dataframe'

    # Structural checks: these abort validation.
    absent = [column for column in required_columns if column not in df.columns]
    if absent:
        raise ValueError(f"Missing required columns in {label}: {absent}")
    if df.empty:
        raise ValueError(f"No data found in {label}")

    # One null mask serves every quality check below.
    null_mask = df.isna()

    fully_null = df.columns[null_mask.all()].tolist()
    if fully_null:
        logging.warning(f"Found columns with all null values in {label}: {fully_null}")

    # "Mostly null" means more than 90% of the rows are null.
    mostly_null = df.columns[null_mask.mean() > 0.9].tolist()
    if mostly_null:
        logging.warning(f"Found columns with >90% null values in {label}: {mostly_null}")

    # Summary metrics; per-column lines only for required columns with nulls.
    row_total = len(df)
    logging.info(f"Data validation for {label}:")
    logging.info(f"  - Total rows: {row_total}")
    logging.info(f"  - Columns: {len(df.columns)}")
    for column in required_columns:
        missing = null_mask[column].sum()
        if missing > 0:
            logging.info(f"  - {column}: {missing} null values ({(missing/row_total)*100:.1f}%)")

def calculate_monthly_utilization(
    group_df: pd.DataFrame,
    target_hours: float,
    exclude_categories: Optional[List[str]] = None
) -> Optional[float]:
    """
    Compute utilization for one month as a percentage of the monthly target.

    The monthly target is ``target_hours / 12``. Rows whose
    'Activity_Category' is listed in ``exclude_categories`` are left out
    of the 'Actual_Hours' total.

    Returns:
        The utilization percentage, or None when ``target_hours`` is
        missing, zero, or not positive.
    """
    if pd.isna(target_hours) or target_hours == 0:
        return None

    counted = group_df
    if exclude_categories:
        keep_mask = ~group_df['Activity_Category'].isin(exclude_categories)
        counted = group_df[keep_mask]

    hours_worked = counted['Actual_Hours'].sum()
    per_month_target = target_hours / 12

    # A negative target gives no meaningful percentage.
    if not per_month_target > 0:
        return None

    percentage = (hours_worked / per_month_target) * 100
    logging.debug(f"Utilization calculation:")
    logging.debug(f"  Total hours: {hours_worked}")
    logging.debug(f"  Monthly target ({target_hours}/12): {per_month_target}")
    logging.debug(f"  Utilization: {percentage}%")
    return percentage

def identify_overlapping_contracts(contract_periods: List[Dict]) -> List[Dict]:
    """
    Annotate each contract period with the earlier periods it overlaps.

    Periods are processed in 'Start_Date' order. An earlier period counts
    as overlapping the current one when it has not ended before the
    current start and differs in 'Position' or 'Contract_Type'.

    Each returned dict is a copy of the input period with four added
    keys: 'Overlapping_Positions', 'Overlapping_Contracts' (the
    overlapping earlier values followed by the period's own),
    'Has_Overlap' and 'Overlap_Count' (overlaps plus one).
    """
    annotated = []
    still_open = []

    for current in sorted(contract_periods, key=lambda entry: entry['Start_Date']):
        # Forget earlier periods that ended before this one begins.
        still_open = [
            earlier for earlier in still_open
            if earlier['End_Date'] >= current['Start_Date']
        ]

        # Concurrent periods with the same position and contract type are
        # not treated as overlaps.
        clashing = [
            earlier for earlier in still_open
            if (earlier['Position'] != current['Position'] or
                earlier['Contract_Type'] != current['Contract_Type'])
        ]

        # With no clashes the lists hold just the current values, the
        # flag is False and the count is 1.
        entry = current.copy()
        entry.update({
            'Overlapping_Positions': [e['Position'] for e in clashing] + [current['Position']],
            'Overlapping_Contracts': [e['Contract_Type'] for e in clashing] + [current['Contract_Type']],
            'Has_Overlap': bool(clashing),
            'Overlap_Count': len(clashing) + 1,
        })

        annotated.append(entry)
        still_open.append(current)

    return annotated

def consolidate_contract_changes(contract_df: pd.DataFrame) -> pd.DataFrame:
    """
    Build one consolidated row per contract period, per staff member.

    Each staff member's contracts are sorted by 'Start_Date' and passed
    through ``identify_overlapping_contracts``. Overlapping positions and
    contract types are joined with ' & '.

    'Target_Hours' is the period's own value when it has no overlap.
    Otherwise it is the sum over every contract of that staff member
    whose date range intersects the period's range (missing or falsy
    values count as 0).
    """
    rows = []

    for staff_name in contract_df['Staff_Name'].unique():
        belongs_to_staff = contract_df['Staff_Name'] == staff_name
        staff_records = (
            contract_df[belongs_to_staff]
            .sort_values('Start_Date')
            .to_dict('records')
        )

        for contract in identify_overlapping_contracts(staff_records):
            row = {
                'Staff_Name': staff_name,
                'Start_Date': contract['Start_Date'],
                'End_Date': contract['End_Date'],
                'Position': ' & '.join(contract['Overlapping_Positions']),
                'Contract_Type': ' & '.join(contract['Overlapping_Contracts']),
                'Has_Multiple_Contracts': contract['Has_Overlap'],
                'Contract_Count': contract['Overlap_Count'],
                'Change_Type': contract['Change_Type']
            }

            if not contract['Has_Overlap']:
                row['Target_Hours'] = contract.get('Target_Hours', 0)
            else:
                # Add up the targets of every contract concurrent with
                # this period (standard interval-intersection test).
                row['Target_Hours'] = sum(
                    float(other.get('Target_Hours', 0) or 0)
                    for other in staff_records
                    if (other['Start_Date'] <= contract['End_Date'] and
                        other['End_Date'] >= contract['Start_Date'])
                )

            rows.append(row)

    return pd.DataFrame(rows)
Editor is loading...
Leave a Comment