Untitled
unknown
plain_text
14 days ago
3.2 kB
2
Indexable
Never
import argparse
import multiprocessing
import os
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial
from multiprocessing import Manager

import joblib
import pandas as pd
from atom import ATOMClassifier


def load_and_preprocess_data(csv_file):
    """Load the CSV and derive the target column ``y`` from the row names.

    The first CSV column is used as the index. A row whose index starts with
    ``"1"`` is labelled 0, one starting with ``"2"`` is labelled 1, and every
    other row is labelled 2.

    Args:
        csv_file: Path to the input CSV file.

    Returns:
        The loaded DataFrame with an added integer column ``y``.
    """
    print(f"Loading CSV file: {csv_file}")
    X = pd.read_csv(csv_file, index_col=0)

    def label_for(row_name):
        # pandas parses an all-numeric first column as integers; go through
        # str() so the prefix test works for those indexes as well.
        name = str(row_name)
        if name.startswith("1"):
            return 0
        if name.startswith("2"):
            return 1
        return 2

    X["y"] = [label_for(row_name) for row_name in X.index]
    return X


def parallel_feature_selection(atom, solver, n_features=10):
    """Run ``atom.feature_selection`` with the "sfs" strategy.

    Despite the name, this function does not start any parallel work itself;
    it makes a single ``feature_selection`` call on the given instance.

    Args:
        atom: ATOM instance to run feature selection on.
        solver: Solver passed through to ``feature_selection``.
        n_features: Number of features passed through to
            ``feature_selection``.

    Returns:
        The same ``atom`` object that was passed in.
    """
    print(f"Performing feature selection with solver: {solver}")
    atom.feature_selection(
        strategy="sfs",
        solver=solver,
        n_features=n_features,
        random_state=0,
    )
    return atom


def train_model(atom, model, solver):
    """Train one model, named ``"<model>_<solver>"``, on ``atom``.

    Args:
        atom: ATOM instance to train on.
        model: Model acronym, e.g. ``"RF"``.
        solver: Feature-selection solver; used as the suffix of the model
            name.

    Returns:
        The same ``atom`` object that was passed in.
    """
    model_name = f"{model}_{solver}"
    print(f"Training model: {model_name}")
    atom.run(models=[model_name], n_trials=50, metric="AUC", n_bootstrap=5)
    return atom


def process_solver(solver, X, models, results_dict):
    """Run feature selection and model training for a single solver.

    Creates an ``ATOMClassifier`` on ``X``, switches to a branch named
    ``sfs_<solver>``, runs feature selection, trains every model in ``models``
    and stores the resulting instance in ``results_dict[solver]``.

    Args:
        solver: Feature-selection solver to use.
        X: DataFrame containing the features and the target column ``y``.
        models: Model acronyms to train.
        results_dict: Mapping (shared between processes by the caller) that
            receives the trained instance under the key ``solver``.
    """
    print(f"Starting process for solver: {solver}")
    atom = ATOMClassifier(
        X,
        y="y",
        test_size=0.2,
        verbose=2,
        random_state=1,
        index=True,
    )
    atom.branch = "main"
    atom.branch = f"sfs_{solver}"
    atom = parallel_feature_selection(atom, solver)

    # Train on this one instance, one model after another. Sending `atom` to
    # child processes gives every child its own pickled copy, so each copy
    # that comes back holds only the model trained in that child; rebinding
    # `atom` to each result kept just the last one and dropped the others.
    # Parallelism across solvers is still provided by the caller's pool.
    # NOTE(review): relies on repeated `atom.run` calls with the same metric
    # adding models rather than replacing earlier ones - confirm against the
    # installed ATOM version.
    for model in models:
        try:
            atom = train_model(atom, model, solver)
        except Exception as exc:
            # Best effort: report the failure and continue with the rest.
            print(f"{model} generated an exception: {exc}")

    # Store results in the shared dictionary
    results_dict[solver] = atom
    print(f"Results for solver {solver} stored")


def run_atom_classification(csv_file):
    """Run the full pipeline for every solver and save the results.

    Each solver is processed in its own worker process. The collected results
    are dumped as one dict, keyed by solver, to
    ``results/<csv basename>_atom_sfm_all.pkl``.

    Args:
        csv_file: Path to the input CSV file.
    """
    X = load_and_preprocess_data(csv_file)
    solvers = ["LGB", "LR", "RF", "LDA", "XGB"]
    models = ["RF", "XGB", "LDA", "GBM", "LR", "SVM"]
    base_filename = os.path.splitext(os.path.basename(csv_file))[0]
    os.makedirs("results", exist_ok=True)

    # Create a manager to share the results dictionary across processes
    with Manager() as manager:
        results_dict = manager.dict()
        process_solver_partial = partial(
            process_solver,
            X=X,
            models=models,
            results_dict=results_dict,
        )

        max_workers = min(len(solvers), multiprocessing.cpu_count())
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            future_to_solver = {
                executor.submit(process_solver_partial, solver): solver
                for solver in solvers
            }
            # Collect every future: an exception raised in a worker only
            # surfaces when its result is requested, so an unconsumed
            # `executor.map` would hide failed solvers.
            for future in as_completed(future_to_solver):
                solver = future_to_solver[future]
                try:
                    future.result()
                except Exception as exc:
                    print(f"Solver {solver} generated an exception: {exc}")

        print("All processes completed")

        # Save all results to a single file
        joblib_filename = f"results/{base_filename}_atom_sfm_all.pkl"
        joblib.dump(dict(results_dict), joblib_filename)
        print(f"All results saved to {joblib_filename}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Run ATOM classification with parallel feature selection and model training."
    )
    parser.add_argument("csv_file", type=str, help="Path to the input CSV file.")
    args = parser.parse_args()
    run_atom_classification(args.csv_file)
Leave a Comment