import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_validate, RandomizedSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.dummy import DummyClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from sklearn.feature_selection import mutual_info_classif, SelectKBest
import joblib
import argparse
import os
import ray
import logging
from typing import List, Optional
import time

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
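
# Pipeline overview: load a CSV whose row index encodes the class label,
# preselect features by mutual information, standardize them, then train and
# cross-validate several classifiers in parallel with Ray, persisting results
# and fitted models under results/.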

def format_for_atom_evaluate(trained_models, X, y):
    # Note: the models passed in were fit on all of X in train_and_evaluate_model,
    # so this held-out split overlaps their training data; treat the metrics
    # below as optimistic rather than a clean out-of-sample estimate.
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    results = {}
    for model_name, model in trained_models.items():
        y_pred = model.predict(X_test)
        y_pred_proba = model.predict_proba(X_test) if hasattr(model, "predict_proba") else None
        
        results[model_name] = {
            "y_pred": y_pred,
            "y_pred_proba": y_pred_proba,
            "metrics": {
                "accuracy": accuracy_score(y_test, y_pred),
                "precision": precision_score(y_test, y_pred, average='weighted'),
                "recall": recall_score(y_test, y_pred, average='weighted'),
                "f1": f1_score(y_test, y_pred, average='weighted'),
                "auc": roc_auc_score(y_test, y_pred_proba, multi_class='ovr', average='weighted') if y_pred_proba is not None else None
            }
        }
    
    return results, y_test
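
# Hedged usage sketch (names illustrative, not defined at module level):
#   results, y_test = format_for_atom_evaluate(trained_models, X_scaled, y)
#   print(results['RF']['metrics']['f1'])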

def load_and_preprocess_data(csv_file: str) -> pd.DataFrame:
    logging.info(f"Loading CSV file: {csv_file}")
    df = pd.read_csv(csv_file)
    logging.info(f"DataFrame shape: {df.shape}")
    # Derive the target from the row index: indices starting with '1' -> class 0,
    # '2' -> class 1, anything else -> class 2.
    df['y'] = df.index.map(lambda x: 0 if str(x).startswith('1') else (1 if str(x).startswith('2') else 2))
    logging.info(f"Target variable distribution:\n{df['y'].value_counts(normalize=True)}")
    return df
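
# Example of the prefix rule above: an index of ['1_a', '2_b', '3_c'] yields
# labels [0, 1, 2].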

def preselect_features(X: pd.DataFrame, y: pd.Series, n_features: int = 1000) -> List[str]:
    X_features = X.select_dtypes(include=[np.number])
    selector = SelectKBest(mutual_info_classif, k=min(n_features, X_features.shape[1]))
    selector.fit(X_features, y)
    selected_features = list(X_features.columns[selector.get_support()])
    logging.info(f"Number of features selected: {len(selected_features)}")
    return selected_features + ['y']
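
# If per-feature relevance is needed, the fitted selector exposes scores, e.g.
# (sketch only; 'selector' and 'X_features' are local to the function above):
#   scores = pd.Series(selector.scores_, index=X_features.columns).sort_values(ascending=False)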


@ray.remote
def train_and_evaluate_model(X, y, model_name: str):
    logging.info(f"Training and evaluating model: {model_name}")
    start_time = time.time()
    
    if model_name == 'LDA':
        model = LinearDiscriminantAnalysis()
        param_dist = {}
    elif model_name == 'LR':
        model = LogisticRegression(max_iter=1000)
        param_dist = {'C': np.logspace(-4, 4, 20)}
    elif model_name == 'GBM':
        model = GradientBoostingClassifier()
        param_dist = {
            'n_estimators': [50, 100, 200],
            'learning_rate': np.logspace(-3, 0, 10),
            'max_depth': [3, 5, 7]
        }
    elif model_name == 'RF':
        model = RandomForestClassifier()
        param_dist = {
            'n_estimators': [50, 100, 200],
            'max_depth': [5, 10, None],
            'min_samples_split': [2, 5, 10]
        }
    elif model_name == 'Dummy':
        model = DummyClassifier(strategy='stratified')
        param_dist = {}
    else:
        raise ValueError(f"Unknown model: {model_name}")
    
    scoring = {
        'accuracy': 'accuracy',
        'precision': 'precision_weighted',
        'recall': 'recall_weighted',
        'f1': 'f1_weighted',
        'auc': 'roc_auc_ovr_weighted'
    }
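    # Each value is a built-in scikit-learn scorer string. With multi-metric
    # scoring, RandomizedSearchCV requires refit to name one key ('f1' below)
    # so it knows which metric selects the best candidate.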
    
    if param_dist:
        # Cap the number of sampled candidates at the total grid size.
        n_iter = min(50, int(np.prod([len(v) for v in param_dist.values()])))
        random_search = RandomizedSearchCV(model, param_distributions=param_dist, n_iter=n_iter, cv=10, scoring=scoring, n_jobs=-1, random_state=42, refit='f1')
        random_search.fit(X, y)
        best_model = random_search.best_estimator_
        cv_results = random_search.cv_results_
        # Report the cross-validated scores of the best candidate rather than
        # averaging over every sampled hyperparameter setting.
        best_idx = random_search.best_index_
        results = {
            'model': model_name,
            'accuracy': cv_results['mean_test_accuracy'][best_idx],
            'precision': cv_results['mean_test_precision'][best_idx],
            'recall': cv_results['mean_test_recall'][best_idx],
            'f1': cv_results['mean_test_f1'][best_idx],
            'auc': cv_results['mean_test_auc'][best_idx]
        }
    else:
        cv_results = cross_validate(model, X, y, cv=10, scoring=scoring, n_jobs=-1)
        best_model = model.fit(X, y)
        results = {
            'model': model_name,
            'accuracy': cv_results['test_accuracy'].mean(),
            'precision': cv_results['test_precision'].mean(),
            'recall': cv_results['test_recall'].mean(),
            'f1': cv_results['test_f1'].mean(),
            'auc': cv_results['test_auc'].mean()
        }
    
    end_time = time.time()
    logging.info(f"{model_name} training and evaluation completed in {end_time - start_time:.2f} seconds")
    
    return results, best_model
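
# Hedged sketch of submitting a single task (assumes ray.init() has been called
# and X_scaled/y exist as in run_classification below):
#   result, fitted = ray.get(train_and_evaluate_model.remote(X_scaled, y, 'LR'))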


def run_classification(csv_file: str, n_cpus: Optional[int] = None):
    n_cpus = os.cpu_count() if n_cpus is None else n_cpus
    logging.info(f"Number of CPUs available: {n_cpus}")
    ray.init(num_cpus=n_cpus)

    atom_results, y_test = None, None  # defined up front so the final return cannot raise NameError
    try:
        df = load_and_preprocess_data(csv_file)
        logging.info("Dataset has been loaded and preprocessed.")
        
        y = df['y']
        X = df.drop('y', axis=1)
        
        selected_features = preselect_features(X, y, n_features=1000)
        X = X[selected_features[:-1]]  # Exclude 'y' from X
        logging.info(f"Shape of feature matrix after selection: {X.shape}")
        
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)
        
        models = ["Dummy", "LDA", "LR", "GBM", "RF"]
        
        # Put the shared training data into Ray's object store once, instead of
        # re-serializing it with every task submission.
        X_ref = ray.put(X_scaled)
        y_ref = ray.put(y)
        tasks = [train_and_evaluate_model.remote(X_ref, y_ref, model) for model in models]
        results = ray.get(tasks)
        
        all_results = []
        trained_models = {}
        for result, model in results:
            all_results.append(result)
            trained_models[result['model']] = model
        
        results_df = pd.DataFrame(all_results)
        logging.info("\nModel Evaluation Results:")
        logging.info(results_df)
        
        base_filename = os.path.splitext(os.path.basename(csv_file))[0]
        results_filename = f"results/{base_filename}_evaluation_results.csv"
        models_filename = f"results/{base_filename}_trained_models.joblib"
        
        os.makedirs(os.path.dirname(results_filename), exist_ok=True)
        results_df.to_csv(results_filename, index=False)
        joblib.dump(trained_models, models_filename)
        
        logging.info(f"Evaluation results saved to {results_filename}")
        logging.info(f"Trained models saved to {models_filename}")

        # Format results for atom.evaluate()
        atom_results, y_test = format_for_atom_evaluate(trained_models, X_scaled, y)
        
        # Save formatted results
        atom_results_filename = f"results/{base_filename}_atom_evaluation_results.joblib"
        joblib.dump((atom_results, y_test), atom_results_filename)
        logging.info(f"Results formatted for atom.evaluate() saved to {atom_results_filename}")

    except Exception as e:
        logging.error(f"An error occurred during processing: {str(e)}")
        logging.error("Full traceback:", exc_info=True)
    finally:
        ray.shutdown()

    return atom_results, y_test  # None if an error occurred; otherwise available for further processing
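
# Hedged sketch for reloading the persisted artifacts later (paths illustrative):
#   trained_models = joblib.load("results/<base>_trained_models.joblib")
#   atom_results, y_test = joblib.load("results/<base>_atom_evaluation_results.joblib")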


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run ML classification with cross-validation and hyperparameter tuning.")
    parser.add_argument("csv_file", type=str, help="Path to the input CSV file.")
    parser.add_argument("--n_cpus", type=int, default=None, help="Number of CPU cores to use")
    args = parser.parse_args()
    atom_results, y_test = run_classification(args.csv_file, n_cpus=args.n_cpus)
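
# Example invocation (script name is illustrative):
#   python run_classification.py data.csv --n_cpus 8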