Untitled
unknown
plain_text
6 days ago
3.9 kB
2
Indexable
Never
import argparse
import os
import traceback

import joblib
import numpy as np
import pandas as pd
import ray
import torch
from atom import ATOMClassifier


def get_gpu_count():
    """Return the number of CUDA devices torch can see (0 when CUDA is unavailable)."""
    return torch.cuda.device_count() if torch.cuda.is_available() else 0


@ray.remote
def load_and_preprocess_data(csv_file):
    """Load *csv_file* and derive an integer class label ``y`` from the row index.

    Index values starting with '1' map to class 0, '2' to class 1, anything
    else to class 2.  The index is coerced to ``str`` so a numeric index does
    not raise ``AttributeError`` on ``startswith``.

    Returns the full DataFrame with the added ``y`` column.
    """
    print(f"Loading CSV file: {csv_file}")
    X = pd.read_csv(csv_file, index_col=0)
    X['y'] = [
        0 if str(row_name).startswith('1')
        else (1 if str(row_name).startswith('2') else 2)
        for row_name in X.index
    ]
    X['y'] = X['y'].astype(int)
    return X


# NOTE: no ``num_gpus=1`` on the decorator — the original reserved a full GPU
# for EVERY task, which deadlocks CPU-only tasks when ray.init(num_gpus=0).
# GPU tasks request a GPU at submit time via train_model.options(num_gpus=1).
@ray.remote
def train_model(X, model, solver, gpu_id=None):
    """Run feature selection and train one ATOM model.

    Parameters
    ----------
    X : DataFrame with a ``y`` target column (from load_and_preprocess_data).
    model : ATOM model acronym (e.g. 'XGB', 'RF').
    solver : estimator acronym used as the SFS feature-selection solver.
    gpu_id : GPU index for this task, or None to train on CPU.

    Returns the fitted ATOMClassifier, or None if feature selection failed.
    """
    model_name = f"{model}_{solver}"
    where = f" on GPU {gpu_id}" if gpu_id is not None else " on CPU"
    print(f"Training model: {model_name}{where}")

    atom = ATOMClassifier(
        X,
        y="y",
        test_size=0.2,
        verbose=2,
        n_jobs=-1,  # internal parallelization within the ray task
        device='gpu' if gpu_id is not None else 'cpu',
        engine='cuml' if gpu_id is not None else 'sklearn',
        random_state=1,
        index=True,
    )

    try:
        atom.feature_selection(
            strategy="sfs", solver=solver, n_features=10, random_state=0
        )
    except Exception as e:
        # Best-effort: skip this model/solver combination instead of killing
        # the whole ray job; the caller filters out None results.
        print(f"Feature selection failed for {model_name}: {e}")
        return None

    # GPU parameters must reach the estimator BEFORE training.  The original
    # mutated atom.models[...] ahead of atom.run(), when no model exists yet;
    # est_params is ATOM's supported way to forward estimator kwargs.
    est_params = None
    if model == 'XGB' and gpu_id is not None:
        est_params = {
            'tree_method': 'gpu_hist',
            'gpu_id': gpu_id,
            'predictor': 'gpu_predictor',
        }

    atom.run(
        models=[model_name],
        n_trials=50,
        metric="AUC",
        n_bootstrap=10,
        parallel=True,
        est_params=est_params,
    )
    return atom


def run_atom_classification(csv_file, n_cpus=None, use_gpus=False):
    """Train every model/solver combination in parallel with ray and persist results.

    Parameters
    ----------
    csv_file : path to the input CSV (first column becomes the index).
    n_cpus : CPU cores to give ray; None means "let ray decide" (reported as
        os.cpu_count()).
    use_gpus : when True, GPU-capable models (XGB, LGB) are scheduled on GPUs.

    Side effects: writes ``results/<basename>_atom.pkl`` with the merged ATOM
    object.  ray is always shut down, even on failure.
    """
    ray.init(num_cpus=n_cpus, num_gpus=get_gpu_count() if use_gpus else 0)
    try:
        n_gpus = get_gpu_count() if use_gpus else 0
        n_cpus = os.cpu_count() if n_cpus is None else n_cpus
        print(f"Number of CPUs available: {n_cpus}")
        print(f"Number of GPUs available: {n_gpus}")

        X = ray.get(load_and_preprocess_data.remote(csv_file))
        print("Dataset has loaded")

        solvers = ["LGB", "LR", "RF", "LDA", "XGB"]
        models = ["RF", "XGB", "LDA", "GBM", "LR", "SVM", "CatB"]

        base_filename = os.path.splitext(os.path.basename(csv_file))[0]
        os.makedirs("results", exist_ok=True)

        print("Executing trainings")
        # Round-robin GPU assignment for the GPU-capable boosters only.
        gpu_ids = (
            iter(np.tile(range(n_gpus), len(solvers) * len(models)))
            if n_gpus > 0
            else None
        )
        tasks = []
        for solver in solvers:
            for model in models:
                if gpu_ids is not None and model in ('XGB', 'LGB'):
                    # Reserve one GPU for this task only; CPU tasks stay free.
                    gid = int(next(gpu_ids))
                    tasks.append(
                        train_model.options(num_gpus=1).remote(X, model, solver, gid)
                    )
                else:
                    tasks.append(train_model.remote(X, model, solver, None))

        results = [r for r in ray.get(tasks) if r is not None]
        if not results:
            print("No models were successfully trained.")
            return

        main_atom = results[0]
        for atom in results[1:]:
            main_atom.merge(atom)
        print("All processes completed")

        joblib_filename = f"results/{base_filename}_atom.pkl"
        joblib.dump(main_atom, joblib_filename)
        print(f"Merged results saved to {joblib_filename}")
    finally:
        # Always release the cluster, even when training raises.
        ray.shutdown()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Run optimized ATOM classification using CPUs and GPUs efficiently."
    )
    parser.add_argument("csv_file", type=str, help="Path to the input CSV file.")
    parser.add_argument(
        "--use_gpus", action="store_true", help="Use GPU acceleration if available"
    )
    parser.add_argument(
        "--n_cpus", type=int, default=None, help="Number of CPU cores to use"
    )
    args = parser.parse_args()

    try:
        run_atom_classification(
            args.csv_file, n_cpus=args.n_cpus, use_gpus=args.use_gpus
        )
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        print("Full traceback:")
        traceback.print_exc()
Leave a Comment