Untitled
unknown
plain_text
a year ago
3.7 kB
8
Indexable
import pandas as pd
import numpy as np
from atom import ATOMClassifier
import joblib
import argparse
import os
import ray
def get_gpu_count():
    """Return the number of CUDA-capable GPUs visible to PyTorch.

    Returns
    -------
    int
        ``torch.cuda.device_count()`` when CUDA is available, otherwise 0.
        Also returns 0 when torch itself is not installed.
    """
    # Fix: the original referenced `torch` without importing it anywhere in
    # the file, so the first call raised NameError. Import locally and treat
    # a missing torch the same as "no GPUs".
    try:
        import torch
    except ImportError:
        return 0
    return torch.cuda.device_count() if torch.cuda.is_available() else 0
@ray.remote
def load_and_preprocess_data(csv_file):
    """Load the CSV and derive an integer class label column ``y``.

    The first CSV column becomes the row index. Each row is labelled from
    the leading character of its index name: '1' -> 0, '2' -> 1, else 2.
    """
    print(f"Loading CSV file: {csv_file}")
    frame = pd.read_csv(csv_file, index_col=0)
    labels = []
    for row_name in frame.index:
        if row_name.startswith('1'):
            labels.append(0)
        elif row_name.startswith('2'):
            labels.append(1)
        else:
            labels.append(2)
    frame['y'] = labels
    frame['y'] = frame['y'].astype(int)
    return frame
@ray.remote
def train_model(X, model, solver, gpu_id=None):
    """Train one ATOM model/solver combination as an independent Ray task.

    Parameters
    ----------
    X : pandas.DataFrame
        Dataset including the integer target column ``y``.
    model : str
        ATOM model acronym (e.g. 'XGB', 'RF', 'LDA').
    solver : str
        Solver acronym fed to sequential-forward-selection feature selection.
    gpu_id : int or None
        GPU index assigned to this task; None means CPU-only.

    Returns
    -------
    The fitted ATOMClassifier instance (merged by the caller).
    """
    model_name = f"{model}_{solver}"
    print(f"Training model: {model_name}" + (f" on GPU {gpu_id}" if gpu_id is not None else " on CPU"))
    atom = ATOMClassifier(
        X,
        y="y",
        test_size=0.2,
        verbose=2,
        n_jobs=-1,  # Use all available cores for internal parallelization
        device='gpu' if gpu_id is not None else 'cpu',
        engine='cuml' if gpu_id is not None else 'sklearn',
        random_state=1,
        index=True,
        backend="threading"  # Use threading for internal parallelization
    )
    # Reduce to 10 features via sequential forward selection before training.
    atom.feature_selection(strategy="sfs", solver=solver, n_features=10, random_state=0)
    if model == 'XGB' and gpu_id is not None:
        # NOTE(review): `atom.models` is read here BEFORE `atom.run(...)` has
        # created any models — with atom-ml this likely fails (no such model
        # yet). The usual pattern is passing these as `est_params` to
        # `atom.run`. Confirm against the ATOMClassifier API.
        atom.models[model_name].estimator.set_params(
            tree_method='gpu_hist',  # legacy (pre-2.0) XGBoost GPU settings
            gpu_id=gpu_id,
            predictor='gpu_predictor'
        )
    # NOTE(review): ATOM expects model acronyms (e.g. 'XGB'); the custom
    # "<model>_<solver>" name only works if ATOM accepts suffixed names —
    # verify, otherwise `models=[model_name]` raises on unknown model.
    atom.run(models=[model_name], n_trials=50, metric="AUC", n_bootstrap=10, parallel=True)
    return atom
def run_atom_classification(csv_file, n_cpus=None, use_gpus=False):
    """Fan out ATOM training over Ray workers and persist the merged result.

    Parameters
    ----------
    csv_file : str
        Path to the input CSV (first column becomes the row index).
    n_cpus : int or None
        CPU budget for ``ray.init``; None lets Ray auto-detect.
    use_gpus : bool
        When True, round-robin GPU ids onto GPU-capable models (XGB/LGB).

    Side effects: creates ``results/`` and writes the merged ATOM instance to
    ``results/<csv-basename>_atom.pkl``.
    """
    ray.init(num_cpus=n_cpus)
    try:
        n_gpus = get_gpu_count() if use_gpus else 0
        n_cpus = os.cpu_count() if n_cpus is None else n_cpus
        print(f"Number of CPUs available: {n_cpus}")
        print(f"Number of GPUs available: {n_gpus}")
        X = ray.get(load_and_preprocess_data.remote(csv_file))
        print("Dataset has loaded")
        solvers = ["LGB", "LR", "RF", "LDA", "XGB"]
        models = ["RF", "XGB", "LDA", "GBM", "LR", "SVM", "CatB"]
        base_filename = os.path.splitext(os.path.basename(csv_file))[0]
        os.makedirs("results", exist_ok=True)
        print("Executing trainings")
        # Round-robin pool of GPU ids. The pool holds n_gpus * 35 entries but
        # only XGB/LGB tasks (10 total) consume one, so next() cannot exhaust it.
        gpu_ids = iter(np.tile(range(n_gpus), len(solvers) * len(models))) if n_gpus > 0 else None
        # Every (model, solver) pairing is trained as an independent Ray task.
        tasks = [(model, solver) for solver in solvers for model in models]
        # Only GPU-capable models receive a GPU id; everything else runs on CPU.
        futures = [train_model.remote(X, model, solver, next(gpu_ids) if gpu_ids and model in ['XGB', 'LGB'] else None)
                   for model, solver in tasks]
        results = ray.get(futures)
        # Fold all fitted ATOM instances into the first one.
        # NOTE(review): ATOM's merge() has constraints on dataset equality —
        # confirm these per-task instances are mergeable.
        main_atom = results[0]
        for atom in results[1:]:
            main_atom.merge(atom)
        print("All processes completed")
        joblib_filename = f"results/{base_filename}_atom.pkl"
        joblib.dump(main_atom, joblib_filename)
        print(f"Merged results saved to {joblib_filename}")
    finally:
        # Fix: shutdown previously ran only on the success path, so any failing
        # task left the Ray runtime and its worker processes alive.
        ray.shutdown()
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run optimized ATOM classification using CPUs and GPUs efficiently.")
    parser.add_argument("csv_file", type=str, help="Path to the input CSV file.")
    parser.add_argument("--use_gpus", action="store_true", help="Use GPU acceleration if available")
    parser.add_argument("--n_cpus", type=int, default=None, help="Number of CPU cores to use")
    args = parser.parse_args()
    try:
        run_atom_classification(args.csv_file, n_cpus=args.n_cpus, use_gpus=args.use_gpus)
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        print("Full traceback:")
        import traceback
        traceback.print_exc()
        # Fix: the original swallowed the failure and exited with status 0,
        # making errors invisible to shells/CI. Propagate a non-zero exit code.
        raise SystemExit(1)
Leave a Comment