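"""
Multiclass classification benchmark script.

Loads a CSV, derives a three-class target from the index prefix, preselects up
to 1000 numeric features by mutual information, scales them, then trains and
cross-validates Dummy, LDA, LogisticRegression, GradientBoosting and
RandomForest classifiers in parallel with Ray. Cross-validation metrics, the
fitted models, and predictions formatted for atom.evaluate() are saved under
results/.
"""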
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_validate, RandomizedSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.dummy import DummyClassifier
from sklearn.metrics import make_scorer, accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from sklearn.feature_selection import mutual_info_classif, SelectKBest
import joblib
import argparse
import os
import ray
import logging
from typing import List, Dict
import time
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def format_for_atom_evaluate(trained_models, X, y):
    """Re-score the fitted models on a hold-out split and package predictions for atom.evaluate()."""
    # Note: the models were fitted on the full dataset in train_and_evaluate_model,
    # so this hold-out split overlaps the training data and these scores are optimistic.
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    results = {}
    for model_name, model in trained_models.items():
        y_pred = model.predict(X_test)
        y_pred_proba = model.predict_proba(X_test) if hasattr(model, "predict_proba") else None
        results[model_name] = {
            "y_pred": y_pred,
            "y_pred_proba": y_pred_proba,
            "metrics": {
                "accuracy": accuracy_score(y_test, y_pred),
                "precision": precision_score(y_test, y_pred, average='weighted'),
                "recall": recall_score(y_test, y_pred, average='weighted'),
                "f1": f1_score(y_test, y_pred, average='weighted'),
                "auc": roc_auc_score(y_test, y_pred_proba, multi_class='ovr', average='weighted') if y_pred_proba is not None else None
            }
        }
    return results, y_test
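# Hedged sketch (not used by the pipeline itself): the joblib file written by
# run_classification can be reloaded later to inspect the per-model metrics
# without retraining. The path argument is a placeholder; pass the
# "<base>_atom_evaluation_results.joblib" file actually produced.
def load_saved_atom_results(path: str):
    """Reload (results, y_test) as saved by run_classification and log each model's metrics."""
    atom_results, y_test = joblib.load(path)
    for model_name, payload in atom_results.items():
        logging.info(f"{model_name}: {payload['metrics']}")
    return atom_results, y_test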
def load_and_preprocess_data(csv_file: str) -> pd.DataFrame:
    """Load the CSV and derive the target from the index: values starting with '1' -> 0, '2' -> 1, anything else -> 2."""
    logging.info(f"Loading CSV file: {csv_file}")
    df = pd.read_csv(csv_file)
    logging.info(f"DataFrame shape: {df.shape}")
    df['y'] = df.index.map(lambda x: 0 if str(x).startswith('1') else (1 if str(x).startswith('2') else 2))
    logging.info(f"Target variable distribution:\n{df['y'].value_counts(normalize=True)}")
    return df
def preselect_features(X: pd.DataFrame, y: pd.Series, n_features: int = 1000) -> List[str]:
    """Keep the top n_features numeric columns ranked by mutual information with y; 'y' is appended to the returned list."""
    X_features = X.select_dtypes(include=[np.number])
    selector = SelectKBest(mutual_info_classif, k=min(n_features, X_features.shape[1]))
    selector.fit(X_features, y)
    selected_features = list(X_features.columns[selector.get_support()])
    logging.info(f"Number of features selected: {len(selected_features)}")
    return selected_features + ['y']
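# Usage as in run_classification below: the returned list ends with 'y', so the
# caller indexes with selected_features[:-1] to keep only the predictors, e.g.
#   selected_features = preselect_features(df.drop('y', axis=1), df['y'], n_features=1000)
#   X = df[selected_features[:-1]]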
@ray.remote
def train_and_evaluate_model(X, y, model_name: str):
    """Build the requested model, tune it with randomized search where a grid is defined, and return CV metrics plus the fitted estimator."""
    logging.info(f"Training and evaluating model: {model_name}")
    start_time = time.time()
    if model_name == 'LDA':
        model = LinearDiscriminantAnalysis()
        param_dist = {}
    elif model_name == 'LR':
        model = LogisticRegression(max_iter=1000)
        param_dist = {'C': np.logspace(-4, 4, 20)}
    elif model_name == 'GBM':
        model = GradientBoostingClassifier()
        param_dist = {
            'n_estimators': [50, 100, 200],
            'learning_rate': np.logspace(-3, 0, 10),
            'max_depth': [3, 5, 7]
        }
    elif model_name == 'RF':
        model = RandomForestClassifier()
        param_dist = {
            'n_estimators': [50, 100, 200],
            'max_depth': [5, 10, None],
            'min_samples_split': [2, 5, 10]
        }
    elif model_name == 'Dummy':
        model = DummyClassifier(strategy='stratified')
        param_dist = {}
    else:
        raise ValueError(f"Unknown model: {model_name}")
    scoring = {
        'accuracy': 'accuracy',
        'precision': 'precision_weighted',
        'recall': 'recall_weighted',
        'f1': 'f1_weighted',
        'auc': 'roc_auc_ovr_weighted'
    }
    if param_dist:
        n_iter = min(50, np.prod([len(v) for v in param_dist.values()]))
        random_search = RandomizedSearchCV(model, param_distributions=param_dist, n_iter=n_iter, cv=10,
                                           scoring=scoring, n_jobs=-1, random_state=42, refit='f1')
        random_search.fit(X, y)
        best_model = random_search.best_estimator_
        cv_results = random_search.cv_results_
        # Note: these figures average the CV scores over all sampled hyperparameter
        # settings, not just the best one; best_model is still the refit best estimator.
        results = {
            'model': model_name,
            'accuracy': cv_results['mean_test_accuracy'].mean(),
            'precision': cv_results['mean_test_precision'].mean(),
            'recall': cv_results['mean_test_recall'].mean(),
            'f1': cv_results['mean_test_f1'].mean(),
            'auc': cv_results['mean_test_auc'].mean()
        }
    else:
        cv_results = cross_validate(model, X, y, cv=10, scoring=scoring, n_jobs=-1)
        best_model = model.fit(X, y)
        results = {
            'model': model_name,
            'accuracy': cv_results['test_accuracy'].mean(),
            'precision': cv_results['test_precision'].mean(),
            'recall': cv_results['test_recall'].mean(),
            'f1': cv_results['test_f1'].mean(),
            'auc': cv_results['test_auc'].mean()
        }
    end_time = time.time()
    logging.info(f"{model_name} training and evaluation completed in {end_time - start_time:.2f} seconds")
    return results, best_model
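# Hedged example (not part of the pipeline): a single model can be evaluated on
# its own through Ray, e.g. after ray.init():
#   result, fitted = ray.get(train_and_evaluate_model.remote(X_scaled, y, "LR"))
# where X_scaled and y stand for any scaled feature matrix and label vector.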
def run_classification(csv_file: str, n_cpus: int = None):
    """Run the full pipeline: load data, preselect features, scale, train models in parallel with Ray, and save results."""
    n_cpus = os.cpu_count() if n_cpus is None else n_cpus
    logging.info(f"Number of CPUs available: {n_cpus}")
    ray.init(num_cpus=n_cpus)
    atom_results, y_test = None, None  # defined up front so the final return works even if an exception is caught
    try:
        df = load_and_preprocess_data(csv_file)
        logging.info("Dataset has been loaded and preprocessed.")
        y = df['y']
        X = df.drop('y', axis=1)
        selected_features = preselect_features(X, y, n_features=1000)
        X = X[selected_features[:-1]]  # Exclude 'y' from X
        logging.info(f"Shape of feature matrix after selection: {X.shape}")
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)
        models = ["Dummy", "LDA", "LR", "GBM", "RF"]
        tasks = [train_and_evaluate_model.remote(X_scaled, y, model) for model in models]
        results = ray.get(tasks)
        all_results = []
        trained_models = {}
        for result, model in results:
            all_results.append(result)
            trained_models[result['model']] = model
        results_df = pd.DataFrame(all_results)
        logging.info("\nModel Evaluation Results:")
        logging.info(results_df)
        base_filename = os.path.splitext(os.path.basename(csv_file))[0]
        results_filename = f"results/{base_filename}_evaluation_results.csv"
        models_filename = f"results/{base_filename}_trained_models.joblib"
        os.makedirs(os.path.dirname(results_filename), exist_ok=True)
        results_df.to_csv(results_filename, index=False)
        joblib.dump(trained_models, models_filename)
        logging.info(f"Evaluation results saved to {results_filename}")
        logging.info(f"Trained models saved to {models_filename}")
        # Format results for atom.evaluate()
        atom_results, y_test = format_for_atom_evaluate(trained_models, X_scaled, y)
        # Save formatted results
        atom_results_filename = f"results/{base_filename}_atom_evaluation_results.joblib"
        joblib.dump((atom_results, y_test), atom_results_filename)
        logging.info(f"Results formatted for atom.evaluate() saved to {atom_results_filename}")
    except Exception as e:
        logging.error(f"An error occurred during processing: {str(e)}")
        logging.error("Full traceback:", exc_info=True)
    finally:
        ray.shutdown()
    return atom_results, y_test  # Return these in case they're needed for further processing
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run ML classification with cross-validation and hyperparameter tuning.")
    parser.add_argument("csv_file", type=str, help="Path to the input CSV file.")
    parser.add_argument("--n_cpus", type=int, default=None, help="Number of CPU cores to use")
    args = parser.parse_args()
    atom_results, y_test = run_classification(args.csv_file, n_cpus=args.n_cpus)
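# Example invocation (file names are placeholders for whatever this script and
# your data are actually called):
#   python classify.py my_data.csv --n_cpus 8
# Outputs are written to results/my_data_evaluation_results.csv,
# results/my_data_trained_models.joblib and
# results/my_data_atom_evaluation_results.joblib.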