import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_validate, RandomizedSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.dummy import DummyClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from sklearn.feature_selection import mutual_info_classif, SelectKBest
import joblib
import argparse
import os
import ray
import logging
from typing import List
import time

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


def format_for_atom_evaluate(trained_models, X, y):
    """Evaluate each trained model on a hold-out split and package the results
    in the structure expected by atom.evaluate().

    Note: the models were fit on the full dataset in train_and_evaluate_model,
    so this split is not a true hold-out and the reported metrics are optimistic.
    """
    _, X_test, _, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    results = {}
    for model_name, model in trained_models.items():
        y_pred = model.predict(X_test)
        y_pred_proba = model.predict_proba(X_test) if hasattr(model, "predict_proba") else None
        results[model_name] = {
            "y_pred": y_pred,
            "y_pred_proba": y_pred_proba,
            "metrics": {
                "accuracy": accuracy_score(y_test, y_pred),
                "precision": precision_score(y_test, y_pred, average='weighted'),
                "recall": recall_score(y_test, y_pred, average='weighted'),
                "f1": f1_score(y_test, y_pred, average='weighted'),
                "auc": roc_auc_score(y_test, y_pred_proba, multi_class='ovr', average='weighted')
                       if y_pred_proba is not None else None
            }
        }
    return results, y_test


def load_and_preprocess_data(csv_file: str) -> pd.DataFrame:
    """Load the CSV and derive a three-class target from the row index:
    indices starting with '1' -> class 0, with '2' -> class 1, anything else -> class 2."""
    logging.info(f"Loading CSV file: {csv_file}")
    df = pd.read_csv(csv_file)
    logging.info(f"DataFrame shape: {df.shape}")
    df['y'] = df.index.map(lambda x: 0 if str(x).startswith('1') else (1 if str(x).startswith('2') else 2))
    logging.info(f"Target variable distribution:\n{df['y'].value_counts(normalize=True)}")
    return df


def preselect_features(X: pd.DataFrame, y: pd.Series, n_features: int = 1000) -> List[str]:
    """Keep the top numeric features ranked by mutual information with the target."""
    X_features = X.select_dtypes(include=[np.number])
    selector = SelectKBest(mutual_info_classif, k=min(n_features, X_features.shape[1]))
    selector.fit(X_features, y)
    selected_features = list(X_features.columns[selector.get_support()])
    logging.info(f"Number of features selected: {len(selected_features)}")
    return selected_features + ['y']


@ray.remote
def train_and_evaluate_model(X, y, model_name: str):
    """Train one model with 10-fold cross-validation (plus randomized hyperparameter
    search where a parameter grid is defined) and return (metrics dict, fitted model)."""
    logging.info(f"Training and evaluating model: {model_name}")
    start_time = time.time()

    if model_name == 'LDA':
        model = LinearDiscriminantAnalysis()
        param_dist = {}
    elif model_name == 'LR':
        model = LogisticRegression(max_iter=1000)
        param_dist = {'C': np.logspace(-4, 4, 20)}
    elif model_name == 'GBM':
        model = GradientBoostingClassifier()
        param_dist = {
            'n_estimators': [50, 100, 200],
            'learning_rate': np.logspace(-3, 0, 10),
            'max_depth': [3, 5, 7]
        }
    elif model_name == 'RF':
        model = RandomForestClassifier()
        param_dist = {
            'n_estimators': [50, 100, 200],
            'max_depth': [5, 10, None],
            'min_samples_split': [2, 5, 10]
        }
    elif model_name == 'Dummy':
        model = DummyClassifier(strategy='stratified')
        param_dist = {}
    else:
        raise ValueError(f"Unknown model: {model_name}")

    scoring = {
        'accuracy': 'accuracy',
        'precision': 'precision_weighted',
        'recall': 'recall_weighted',
        'f1': 'f1_weighted',
        'auc': 'roc_auc_ovr_weighted'
    }

    if param_dist:
        # Cap the number of sampled parameter settings at 50 (or the grid size, if smaller).
        n_iter = min(50, int(np.prod([len(v) for v in param_dist.values()])))
        random_search = RandomizedSearchCV(model, param_distributions=param_dist, n_iter=n_iter,
                                           cv=10, scoring=scoring, n_jobs=-1, random_state=42,
                                           refit='f1')
        random_search.fit(X, y)
        best_model = random_search.best_estimator_
        cv_results = random_search.cv_results_
        # Note: these means are taken over all sampled parameter settings,
        # not only the best candidate selected by refit='f1'.
        results = {
            'model': model_name,
            'accuracy': cv_results['mean_test_accuracy'].mean(),
            'precision': cv_results['mean_test_precision'].mean(),
            'recall': cv_results['mean_test_recall'].mean(),
            'f1': cv_results['mean_test_f1'].mean(),
            'auc': cv_results['mean_test_auc'].mean()
        }
    else:
        cv_results = cross_validate(model, X, y, cv=10, scoring=scoring, n_jobs=-1)
        best_model = model.fit(X, y)
        results = {
            'model': model_name,
            'accuracy': cv_results['test_accuracy'].mean(),
            'precision': cv_results['test_precision'].mean(),
            'recall': cv_results['test_recall'].mean(),
            'f1': cv_results['test_f1'].mean(),
            'auc': cv_results['test_auc'].mean()
        }

    end_time = time.time()
    logging.info(f"{model_name} training and evaluation completed in {end_time - start_time:.2f} seconds")
    return results, best_model


def run_classification(csv_file: str, n_cpus: int = None):
    n_cpus = os.cpu_count() if n_cpus is None else n_cpus
    logging.info(f"Number of CPUs available: {n_cpus}")
    ray.init(num_cpus=n_cpus)

    # Initialise so the final return is safe even if an exception is raised inside the try block.
    atom_results, y_test = None, None
    try:
        df = load_and_preprocess_data(csv_file)
        logging.info("Dataset has been loaded and preprocessed.")

        y = df['y']
        X = df.drop('y', axis=1)

        selected_features = preselect_features(X, y, n_features=1000)
        X = X[selected_features[:-1]]  # Exclude 'y' from X
        logging.info(f"Shape of feature matrix after selection: {X.shape}")

        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)

        # Train and evaluate all models in parallel as Ray tasks.
        models = ["Dummy", "LDA", "LR", "GBM", "RF"]
        tasks = [train_and_evaluate_model.remote(X_scaled, y, model) for model in models]
        results = ray.get(tasks)

        all_results = []
        trained_models = {}
        for result, model in results:
            all_results.append(result)
            trained_models[result['model']] = model

        results_df = pd.DataFrame(all_results)
        logging.info("\nModel Evaluation Results:")
        logging.info(results_df)

        base_filename = os.path.splitext(os.path.basename(csv_file))[0]
        results_filename = f"results/{base_filename}_evaluation_results.csv"
        models_filename = f"results/{base_filename}_trained_models.joblib"
        os.makedirs(os.path.dirname(results_filename), exist_ok=True)
        results_df.to_csv(results_filename, index=False)
        joblib.dump(trained_models, models_filename)
        logging.info(f"Evaluation results saved to {results_filename}")
        logging.info(f"Trained models saved to {models_filename}")

        # Format results for atom.evaluate()
        atom_results, y_test = format_for_atom_evaluate(trained_models, X_scaled, y)

        # Save formatted results
        atom_results_filename = f"results/{base_filename}_atom_evaluation_results.joblib"
        joblib.dump((atom_results, y_test), atom_results_filename)
        logging.info(f"Results formatted for atom.evaluate() saved to {atom_results_filename}")
    except Exception as e:
        logging.error(f"An error occurred during processing: {str(e)}")
        logging.error("Full traceback:", exc_info=True)
    finally:
        ray.shutdown()

    return atom_results, y_test  # Return these in case they're needed for further processing


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run ML classification with cross-validation and hyperparameter tuning.")
    parser.add_argument("csv_file", type=str, help="Path to the input CSV file.")
    parser.add_argument("--n_cpus", type=int, default=None, help="Number of CPU cores to use")
    args = parser.parse_args()

    atom_results, y_test = run_classification(args.csv_file, n_cpus=args.n_cpus)
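
Usage sketch: assuming the script above is saved as classify.py and pointed at an input file named data.csv (both names are placeholders), it could be run as `python classify.py data.csv --n_cpus 8`. The snippet below shows one way to load the artifacts it writes to the results/ directory for further inspection; the file names follow the pattern the script builds from the input's base name.

# Load the artifacts written by run_classification for a hypothetical input "data.csv".
import joblib
import pandas as pd

results_df = pd.read_csv("results/data_evaluation_results.csv")        # per-model CV metrics
trained_models = joblib.load("results/data_trained_models.joblib")     # dict of fitted estimators
atom_results, y_test = joblib.load("results/data_atom_evaluation_results.joblib")

print(results_df.sort_values("f1", ascending=False))                   # rank models by weighted F1
print(atom_results["RF"]["metrics"])                                   # hold-out metrics for the RF model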