import pandas as pd
import numpy as np
from atom import ATOMClassifier
import joblib
import argparse
import os
import ray
import torch

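# Detect how many CUDA devices PyTorch can see (0 when CUDA is unavailable).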
def get_gpu_count():
    return torch.cuda.device_count() if torch.cuda.is_available() else 0

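# Ray task: load the CSV and derive the target column from the row-index
# prefix ('1' -> class 0, '2' -> class 1, anything else -> class 2).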
@ray.remote
def load_and_preprocess_data(csv_file):
    print(f"Loading CSV file: {csv_file}")
    X = pd.read_csv(csv_file, index_col=0)
    X['y'] = [0 if row_name.startswith('1') else (1 if row_name.startswith('2') else 2) for row_name in X.index]
    X['y'] = X['y'].astype(int)
    return X

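# Ray task: build an ATOM pipeline for one (model, solver) pair, run feature
# selection, then tune and evaluate the model. GPU resources are requested
# per call via .options() in run_atom_classification.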
@ray.remote
def train_model(X, model, solver, gpu_id=None):
    model_name = f"{model}_{solver}"
    print(f"Training model: {model_name}" + (f" on GPU {gpu_id}" if gpu_id is not None else " on CPU"))
    
    atom = ATOMClassifier(
        X, 
        y="y", 
        test_size=0.2, 
        verbose=2,
        n_jobs=-1,  # use all cores inside this task (concurrent Ray tasks may oversubscribe CPUs)
        device='gpu' if gpu_id is not None else 'cpu',
        engine='cuml' if gpu_id is not None else 'sklearn',
        random_state=1, 
        index=True
    )
    
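    # Sequential forward selection with the chosen solver; skip this
    # (model, solver) combination if feature selection fails.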
    try:
        atom.feature_selection(strategy="sfs", solver=solver, n_features=10, random_state=0)
    except Exception as e:
        print(f"Feature selection failed for {model_name}: {e}")
        return None

    # Models are only created by atom.run(), so GPU-specific XGBoost parameters
    # are passed through run()'s est_params instead of mutating an estimator
    # that does not exist yet.
    est_params = None
    if model == 'XGB' and gpu_id is not None:
        est_params = {'tree_method': 'gpu_hist', 'gpu_id': gpu_id, 'predictor': 'gpu_predictor'}

    atom.run(models=[model_name], n_trials=50, metric="AUC", est_params=est_params, n_bootstrap=10, parallel=True)
    
    return atom

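# Orchestrate the full workflow: start Ray, load the data once, fan out one
# training task per (model, solver) pair, then merge and persist the results.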
def run_atom_classification(csv_file, n_cpus=None, use_gpus=False):
    n_gpus = get_gpu_count() if use_gpus else 0
    n_cpus = os.cpu_count() if n_cpus is None else n_cpus
    ray.init(num_cpus=n_cpus, num_gpus=n_gpus)
    
    print(f"Number of CPUs available: {n_cpus}")
    print(f"Number of GPUs available: {n_gpus}")
    
    X = ray.get(load_and_preprocess_data.remote(csv_file))
    print("Dataset has loaded")

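    # Feature-selection solvers and classifier models to combine; one training
    # task is launched per (model, solver) pair.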
    solvers = ["LGB", "LR", "RF", "LDA", "XGB"]
    models = ["RF", "XGB", "LDA", "GBM", "LR", "SVM", "CatB"]

    base_filename = os.path.splitext(os.path.basename(csv_file))[0]
    os.makedirs("results", exist_ok=True)

    print("Executing trainings")
    
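    # Cycle GPU ids round-robin so GPU-capable models (XGB, LGB) are spread
    # across the available devices; everything else runs on CPU.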
    gpu_ids = iter(np.tile(range(n_gpus), len(solvers) * len(models))) if n_gpus > 0 else None

    tasks = []
    for solver in solvers:
        for model in models:
            gpu_id = next(gpu_ids) if gpu_ids is not None and model in ['XGB', 'LGB'] else None
            # Request a GPU from Ray only when this task will actually use one.
            tasks.append(
                train_model.options(num_gpus=1 if gpu_id is not None else 0).remote(X, model, solver, gpu_id)
            )
    
    results = ray.get(tasks)
    results = [r for r in results if r is not None]
    
    if not results:
        print("No models were successfully trained.")
        return
    
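    # Merge all successfully trained ATOM instances into a single object so
    # every model can be compared and saved together.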
    main_atom = results[0]
    for atom in results[1:]:
        main_atom.merge(atom)
    
    print("All processes completed")
    
    joblib_filename = f"results/{base_filename}_atom.pkl"
    joblib.dump(main_atom, joblib_filename)
    print(f"Merged results saved to {joblib_filename}")
    
    ray.shutdown()

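# Example invocation (file names are illustrative):
#   python atom_ray_training.py expression_matrix.csv --use_gpus --n_cpus 16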
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run optimized ATOM classification using CPUs and GPUs efficiently.")
    parser.add_argument("csv_file", type=str, help="Path to the input CSV file.")
    parser.add_argument("--use_gpus", action="store_true", help="Use GPU acceleration if available")
    parser.add_argument("--n_cpus", type=int, default=None, help="Number of CPU cores to use")
    args = parser.parse_args()

    try:
        run_atom_classification(args.csv_file, n_cpus=args.n_cpus, use_gpus=args.use_gpus)
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        print("Full traceback:")
        import traceback
        traceback.print_exc()