C-2

 avatar
unknown
plain_text
4 months ago
10 kB
3
Indexable
def _collect_contract_periods(name, staff_data: pd.DataFrame) -> list:
    """Build the raw contract period records for a single staff member.

    Args:
        name: Staff name written to every record as 'Staff_Name'.
        staff_data: That staff member's rows, sorted by 'Report_Date'
            ascending. Must contain 'Report_Date', 'Position Name',
            'Contract_Type' and 'Target_Hours'.

    Returns:
        A list of dicts with the keys 'Staff_Name', 'Start_Date', 'End_Date',
        'Position', 'Contract_Type', 'Target_Hours' and 'Change_Type'
        ('Initial Contract', 'New Contract' or 'Contract End').
    """
    report_dates = staff_data['Report_Date']
    first_report = report_dates.min()
    last_report = report_dates.max()

    # A contract is identified by (position, contract type, target hours).
    # Missing target hours are normalised to None: NaN never compares equal to
    # itself, so using it raw would make every such row look like a brand new
    # contract that also ends immediately.
    rows = []
    for position, contract_type, target_hours, report_date in zip(
        staff_data['Position Name'],
        staff_data['Contract_Type'],
        staff_data['Target_Hours'],
        report_dates,
    ):
        raw = (position, contract_type, target_hours)
        key = tuple(None if pd.isna(value) else value for value in raw)
        rows.append((key, raw, report_date))

    # Rows are in ascending date order, so the last write per key wins and
    # holds the latest report in which that contract appears.
    last_seen = {}
    for key, _, report_date in rows:
        last_seen[key] = report_date

    periods = []
    active = {}  # contract key -> raw values as first seen in the data
    for key, raw, report_date in rows:
        if key not in active:
            position, contract_type, target_hours = raw
            periods.append({
                'Staff_Name': name,
                'Start_Date': report_date,
                'End_Date': last_report,
                'Position': position,
                'Contract_Type': contract_type,
                'Target_Hours': target_hours,
                # Compare report dates, not index labels: labels are not
                # guaranteed to follow chronological order after sorting.
                'Change_Type': (
                    'New Contract' if report_date > first_report
                    else 'Initial Contract'
                ),
            })
            active[key] = raw

        # A contract has ended once the current report is later than the last
        # report that lists it. Comparing dates (rather than "rows from here
        # on") keeps contracts that share a report date with this row active.
        ended_keys = [k for k in active if last_seen[k] < report_date]
        for ended_key in ended_keys:
            position, contract_type, target_hours = active.pop(ended_key)
            periods.append({
                'Staff_Name': name,
                'Start_Date': report_date,
                'End_Date': report_date,
                'Position': position,
                'Contract_Type': contract_type,
                'Target_Hours': target_hours,
                'Change_Type': 'Contract End',
            })

    return periods


def track_contract_changes(df: pd.DataFrame) -> pd.DataFrame:
    """
    Track staff contract changes with full history, including overlapping contracts.

    For every distinct 'Staff_Standard' value the rows are ordered by
    'Report_Date' and turned into contract period records (start of a
    contract, end of a contract). Each staff member's records are passed
    through identify_overlapping_contracts, and the combined result through
    consolidate_contract_changes.

    Args:
        df: Report rows with the columns 'Staff_Standard', 'Report_Date',
            'Position Name', 'Contract_Type' and 'Target_Hours'.

    Returns:
        The DataFrame produced by consolidate_contract_changes. This function
        reads its 'Staff_Name', 'Has_Multiple_Contracts', 'Position',
        'Contract_Type' and 'Target_Hours' columns for the log summary.
    """
    logging.info("Starting contract change tracking...")
    changes = []

    for name in df['Staff_Standard'].unique():
        logging.debug(f"Processing contracts for: {name}")
        # Stable sort so rows sharing a report date keep their input order.
        staff_data = df[df['Staff_Standard'] == name].sort_values(
            'Report_Date', kind='mergesort'
        )

        contract_periods = _collect_contract_periods(name, staff_data)

        # Process contract periods to identify overlaps
        if contract_periods:
            changes.extend(identify_overlapping_contracts(contract_periods))

    changes_df = consolidate_contract_changes(pd.DataFrame(changes))

    # Log summary statistics
    multi_contract_records = changes_df[changes_df['Has_Multiple_Contracts']]
    multi_contract_names = multi_contract_records['Staff_Name'].unique()
    total_staff = len(changes_df['Staff_Name'].unique())
    multi_contract_staff = len(multi_contract_names)

    logging.info(f"Processed contract changes for {total_staff} staff members")
    logging.info(f"Found {multi_contract_staff} staff with multiple contracts")

    if multi_contract_staff > 0:
        logging.info("Staff with multiple contracts:")
        for staff in multi_contract_names:
            staff_records = multi_contract_records[
                multi_contract_records['Staff_Name'] == staff
            ]
            logging.info(f"  - {staff}:")
            for _, record in staff_records.iterrows():
                logging.info(f"    * {record['Position']} ({record['Contract_Type']}) - "
                           f"Target Hours: {record['Target_Hours']}")

    return changes_df

def _calculate_target_hours(row):
    """Return the target hours for one report row, or None when not applicable.

    Only rows whose 'Position Name' is 'Tutor' and whose 'Working Hours'
    exceed 3 get a value: 'Full-Time Equivalent' * 840. Values that cannot be
    converted to float are logged as a warning and yield None.
    """
    try:
        working_hours = float(row['Working Hours'])
        fte = float(row['Full-Time Equivalent'])
        if row['Position Name'] == 'Tutor' and working_hours > 3:
            return fte * 840
    except (ValueError, TypeError) as e:
        logging.warning(f"Error calculating target hours: {str(e)}")
    return None


def _classify_contract_type(working_hours):
    """Return 'Sessional' for working hours <= 3, otherwise 'Salaried'.

    A value that cannot be converted to float is treated like a missing (NaN)
    value, which falls into the 'Salaried' branch, instead of aborting the
    whole run. No warning is logged here because _calculate_target_hours is
    applied to the same rows and already reports the unparseable value.
    """
    try:
        hours = float(working_hours)
    except (ValueError, TypeError):
        hours = float('nan')
    return 'Sessional' if hours <= 3 else 'Salaried'


def _split_manager_name(name):
    """Split a 'Line Manager Name' cell into (first, known, last).

    The cell is split on ','; the text before the comma is taken as the first
    name, optionally followed by a space and a known-as name, and the text
    after the comma as the last name. Missing values give three pd.NA; a value
    without exactly one comma is logged and returned unchanged in all three
    positions.
    """
    if pd.isna(name):
        return pd.NA, pd.NA, pd.NA

    parts = name.split(',')
    if len(parts) != 2:
        logging.warning(f"Unexpected manager name format: {name}")
        return name, name, name

    first_known = parts[0].strip()
    last = parts[1].strip()

    if ' ' in first_known:
        first, known = first_known.split(' ', 1)
    else:
        first = known = first_known

    return first, known, last


def process_monthly_reports(folder_path: str) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Process all monthly reports and track staff/contract changes.

    Every file in folder_path whose name starts with 'ACL Monthly Report' is
    read, validated, filtered to the 'Tutor' and 'Learning Support Assistant'
    positions and enriched with target hours, contract type and standardised
    staff/manager names. Files that fail to load or validate, or that contain
    no matching rows, are logged and skipped.

    Args:
        folder_path: Directory containing the monthly report files.

    Returns:
        A tuple (combined_df, staff_summary_df, contract_history):
        the combined per-report rows sorted by 'Report_Date' and
        'Staff_Standard', one summary row per staff member, and the result of
        track_contract_changes on the combined rows.

    Raises:
        ValueError: If no file yielded any data.
    """
    logging.info(f"Processing Monthly Reports from: {folder_path}")
    all_data = []

    # File names are sorted lexicographically, which is not necessarily
    # chronological; the real ordering is applied below by sorting the
    # combined data on Report_Date.
    files = sorted(
        f for f in os.listdir(folder_path) if f.startswith('ACL Monthly Report')
    )

    required_columns = [
        'Assignment Number', 'First Name', 'Known As', 'Last Name',
        'Position Name', 'Working Hours', 'Full-Time Equivalent',
        'Line Manager Name'
    ]
    manager_columns = ['Manager_First', 'Manager_Known', 'Manager_Last']
    name_columns = [
        'Staff_Standard', 'Staff_Known_As',
        'Manager_Standard', 'Manager_Known_As'
    ]
    columns_to_keep = [
        'Assignment Number', 'Staff_Standard', 'Staff_Known_As',
        'Position Name', 'Contract_Type', 'Target_Hours',
        'Manager_Standard', 'Manager_Known_As', 'Report_Date'
    ]

    for file in files:
        logging.info(f"Processing file: {file}")

        # Read file with error handling
        df, error = safe_read_excel(os.path.join(folder_path, file), skiprows=2)
        if error:
            logging.error(f"Skipping file {file} due to error: {error}")
            continue

        # Validate required columns
        try:
            validate_processed_data(df, required_columns, name=file)
        except ValueError as e:
            logging.error(f"Validation failed for {file}: {str(e)}")
            continue

        report_date = extract_month_year(file)

        # Filter for Tutors and Learning Support Assistant
        mask = df['Position Name'].isin(['Learning Support Assistant', 'Tutor'])
        df = df[mask].copy()

        # Row-wise apply on an empty frame does not return a Series, which
        # breaks the column assignments below; there is nothing to add anyway.
        if df.empty:
            logging.warning(f"No Tutor or Learning Support Assistant records in {file}; skipping")
            continue

        df['Report_Date'] = report_date

        # Calculate Target Hours - only for salaried tutors
        df['Target_Hours'] = df.apply(_calculate_target_hours, axis=1)

        # Determine Contract Type
        df['Contract_Type'] = df['Working Hours'].apply(_classify_contract_type)

        # Create manager name formats
        df[manager_columns] = pd.DataFrame(
            df['Line Manager Name'].apply(_split_manager_name).tolist(),
            index=df.index,
            columns=manager_columns,
        )

        df['Manager_Standard'] = df.apply(
            lambda x: f"{x['Manager_First']} {x['Manager_Last']}"
            if pd.notna(x['Manager_First']) else pd.NA, axis=1)
        df['Manager_Known_As'] = df.apply(
            lambda x: f"{x['Manager_Known']}, {x['Manager_Last']}"
            if pd.notna(x['Manager_Known']) else pd.NA, axis=1)

        # Create staff name formats
        df['Staff_Standard'] = df.apply(
            lambda x: f"{x['First Name']} {x['Last Name']}", axis=1)
        df['Staff_Known_As'] = df.apply(
            lambda x: f"{x['Known As']}, {x['Last Name']}", axis=1)

        # Clean all name columns
        df = standardize_names(df, name_columns)

        all_data.append(df[columns_to_keep])

        logging.info(f"Processed {len(df)} records from {file}")

    if not all_data:
        raise ValueError("No data was successfully processed from monthly reports")

    # Combine all monthly data
    combined_df = pd.concat(all_data, ignore_index=True)
    combined_df = combined_df.sort_values(['Report_Date', 'Staff_Standard'])

    # Track contract history with enhanced tracking
    contract_history = track_contract_changes(combined_df)

    # Create staff summary with contract history
    staff_summary = []
    for name in combined_df['Staff_Standard'].unique():
        staff_data = combined_df[combined_df['Staff_Standard'] == name]
        contract_data = contract_history[contract_history['Staff_Name'] == name]
        last_appearance = staff_data['Report_Date'].max()

        latest_contracts = contract_data[contract_data['End_Date'] == last_appearance]

        if latest_contracts.empty:
            # No contract record ends on the last report date; fall back to
            # the staff member's most recent report row instead of raising
            # IndexError on iloc[0].
            logging.warning(f"No current contract record found for {name}; using latest report row")
            latest_row = staff_data.iloc[-1]
            current_position = latest_row['Position Name']
            current_contract = latest_row['Contract_Type']
            current_target = latest_row['Target_Hours']
            has_multiple = False
        else:
            current = latest_contracts.iloc[0]
            current_position = current['Position']
            current_contract = current['Contract_Type']
            current_target = current['Target_Hours']
            has_multiple = any(latest_contracts['Has_Multiple_Contracts'])

        staff_summary.append({
            'Staff_Name': name,
            'First_Appearance': staff_data['Report_Date'].min(),
            'Last_Appearance': last_appearance,
            'Current_Position': current_position,
            'Current_Contract': current_contract,
            'Current_Target': current_target,
            'Has_Multiple_Contracts': has_multiple,
            'Number_Of_Changes': len(contract_data),
            'Contract_History': contract_data.to_dict('records')
        })

    staff_summary_df = pd.DataFrame(staff_summary)

    # Final validation
    validate_processed_data(
        combined_df,
        ['Staff_Standard', 'Position Name', 'Contract_Type', 'Report_Date'],
        'final_staff_data'
    )

    logging.info("Monthly reports processing completed!")
    logging.info(f"Processed data for {len(staff_summary_df)} staff members")
    logging.info(f"Generated {len(contract_history)} contract records")

    return combined_df, staff_summary_df, contract_history
Editor is loading...
Leave a Comment