import contextlib
import enum
import json
import os
import pathlib
import typing as tp
import uuid
from collections import Counter

import numpy as np
import pandas as pd
import sklearn
from sklearn.metrics._scorer import _check_multimetric_scoring
from sklearn.model_selection import cross_validate
from sklearn.model_selection._validation import _score


class NodeType(enum.Enum):
    REGULAR = 1
    TERMINAL = 2


def gini(y: np.ndarray) -> float:
    """
    Computes Gini impurity for a given set of labels
    :param y: labels
    :return: Gini impurity
    """
    counts = np.array(list(Counter(y).values()))
    pi = counts / counts.sum()
    return 1 - np.sum(pi ** 2)


def weighted_impurity(y_left: np.ndarray, y_right: np.ndarray) -> tp.Tuple[float, float, float]:
    """
    Computes weighted impurity by averaging children impurities
    :param y_left: left partition
    :param y_right: right partition
    :return: averaged impurity, left child impurity, right child impurity
    """
    left_imp = gini(y_left)
    right_imp = gini(y_right)
    total = len(y_left) + len(y_right)
    weighted_imp = len(y_left) / total * left_imp + len(y_right) / total * right_imp
    return weighted_imp, left_imp, right_imp


def create_split(feature_values: np.ndarray, threshold: float) -> tp.Tuple[np.ndarray, np.ndarray]:
    """
    Splits a given 1-d array according to the relation to the threshold into two index sets
    :param feature_values: feature values extracted from data
    :param threshold: value to compare with
    :return: two sets of indices
    """
    left_idx = np.where(feature_values <= threshold)
    right_idx = np.where(feature_values > threshold)
    return left_idx, right_idx
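
# A minimal sanity-check sketch (not part of the original assignment code): it illustrates
# what the helpers above return on a tiny hand-made label array. The numbers in the comments
# are easy to verify by hand; the function name is a hypothetical addition for illustration.
def _impurity_helpers_demo():
    y = np.array([0, 0, 0, 1])                       # three zeros, one one
    parent_gini = gini(y)                            # 1 - (0.75**2 + 0.25**2) = 0.375
    left_idx, right_idx = create_split(np.array([1., 2., 3., 10.]), threshold=3.0)
    # the left partition gets the three zeros, the right partition gets the single one
    avg, left_imp, right_imp = weighted_impurity(y[left_idx], y[right_idx])
    # both children are pure, so the weighted impurity drops to 0
    return parent_gini, avg, left_imp, right_imp
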
class MyDecisionTreeNode:
    """
    Auxiliary class serving as representation of a decision tree node
    """

    def __init__(
            self,
            meta: 'MyDecisionTreeClassifier',
            depth,
            node_type: NodeType = NodeType.REGULAR,
            predicted_class: tp.Optional[tp.Union[int, str]] = None,
            left_subtree: tp.Optional['MyDecisionTreeNode'] = None,
            right_subtree: tp.Optional['MyDecisionTreeNode'] = None,
            feature_id: int = None,
            threshold: float = None,
            impurity: float = np.inf
    ):
        """
        :param meta: object holding meta information about the tree
        :param depth: depth of this node in the tree (deduced on creation from the depth of the ancestor)
        :param node_type: 'regular' or 'terminal' depending on whether this node is a leaf node
        :param predicted_class: class label assigned to a terminal node
        :param left_subtree: subtree for objects with feature value below or equal to the threshold
        :param right_subtree: subtree for objects with feature value above the threshold
        :param feature_id: index of the feature to split by
        :param threshold: threshold value to split by
        :param impurity: Gini impurity of the node
        """
        self._node_type = node_type
        self._meta = meta
        self._depth = depth
        self._predicted_class = predicted_class
        self._class_proba = None
        self._left_subtree = left_subtree
        self._right_subtree = right_subtree
        self._feature_id = feature_id
        self._threshold = threshold
        self._impurity = impurity

    def _best_split(self, X: np.ndarray, y: np.ndarray):
        """
        Finds the best split
        :param X: Data, passed to node
        :param y: labels
        :return: best feature, best threshold, left child impurity, right child impurity
        """
        lowest_impurity = np.inf
        best_feature_id = None
        best_threshold = None
        lowest_left_child_impurity, lowest_right_child_impurity = None, None
        features = self._meta.rng.permutation(X.shape[1])
        for feature in features:
            current_feature_values = X[:, feature]
            thresholds = np.unique(current_feature_values)
            for threshold in thresholds:
                # find indices for split with current threshold
                left_idxs, right_idxs = create_split(current_feature_values, threshold)
                # skip degenerate splits that leave one of the children empty
                if len(left_idxs[0]) == 0 or len(right_idxs[0]) == 0:
                    continue
                current_weighted_impurity, current_left_impurity, current_right_impurity = \
                    weighted_impurity(y[left_idxs], y[right_idxs])
                if current_weighted_impurity < lowest_impurity:
                    lowest_impurity = current_weighted_impurity
                    best_feature_id = feature
                    best_threshold = threshold
                    lowest_left_child_impurity = current_left_impurity
                    lowest_right_child_impurity = current_right_impurity
        return best_feature_id, best_threshold, lowest_left_child_impurity, lowest_right_child_impurity

    def fit(self, X: np.ndarray, y: np.ndarray):
        """
        Recursively fits a node, providing it with a predicted class or a split condition
        :param X: Data
        :param y: labels
        :return: fitted node
        """
        is_leaf = (
            np.unique(y).shape[0] == 1
            or self._depth >= self._meta.max_depth
            or self._meta.min_samples_split > len(y)
        )
        if not is_leaf:
            self._feature_id, self._threshold, left_imp, right_imp = self._best_split(X, y)
            # fall back to a leaf if no feature admits a non-degenerate split
            is_leaf = self._feature_id is None
        if is_leaf:
            self._node_type = NodeType.TERMINAL
            counts = Counter(y)
            # choose the most common class as the node prediction
            self._predicted_class = counts.most_common(1)[0][0]
            # class probabilities as sample shares in the leaf; assumes integer labels
            # 0..n_classes-1, as in the original probability handling; classes absent
            # from the leaf get probability zero
            proba = np.zeros(self._meta._n_classes)
            for label, count in counts.items():
                proba[int(label)] = count / len(y)
            self._class_proba = proba
            return self

        left_idx, right_idx = create_split(X[:, self._feature_id], self._threshold)
        self._left_subtree = MyDecisionTreeNode(
            meta=self._meta,
            depth=self._depth + 1,  # children are one level deeper
            impurity=left_imp
        ).fit(
            X[left_idx], y[left_idx]
        )
        self._right_subtree = MyDecisionTreeNode(
            meta=self._meta,
            depth=self._depth + 1,  # children are one level deeper
            impurity=right_imp
        ).fit(
            X[right_idx], y[right_idx]
        )
        return self

    def predict(self, x: np.ndarray):
        """
        Predicts class for a single object
        :param x: object of shape (n_features, )
        :return: class assigned to object
        """
        if self._node_type is NodeType.TERMINAL:
            return self._predicted_class
        if x[self._feature_id] <= self._threshold:
            return self._left_subtree.predict(x)
        return self._right_subtree.predict(x)

    def predict_proba(self, x: np.ndarray):
        """
        Predicts class probabilities for a single object
        :param x: object of shape (n_features, )
        :return: vector of probabilities assigned to object
        """
        if self._node_type is NodeType.TERMINAL:
            return self._class_proba
        if x[self._feature_id] <= self._threshold:
            return self._left_subtree.predict_proba(x)
        return self._right_subtree.predict_proba(x)


class MyDecisionTreeClassifier:
    """
    Class analogous to the sklearn implementation of a decision tree classifier
    with Gini impurity criterion, named in a manner avoiding collisions
    """

    def __init__(
            self,
            max_depth: tp.Optional[int] = None,
            min_samples_split: tp.Optional[int] = 2,
            seed: int = 0
    ):
        """
        :param max_depth: maximal depth of the tree, prevents overfitting
        :param min_samples_split: minimal number of samples for a node to be a splitter node
        :param seed: seed for RNG, enables reproducibility
        """
        self.root = MyDecisionTreeNode(self, 1)
        self._is_trained = False
        self.max_depth = max_depth or np.inf
        self.min_samples_split = min_samples_split or 2
        self.rng = np.random.default_rng(seed)
        self._n_classes = 0

    def fit(self, X: np.ndarray, y: np.ndarray):
        """
        Starts the recursive process of node criterion fitting from the root
        :param X: Data
        :param y: labels
        :return: fitted self
        """
        self._n_classes = np.unique(y).shape[0]
        self.root.fit(X, y)
        self._is_trained = True
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """
        Predicts class for a sequence of objects
        :param X: Data
        :return: classes assigned to each object
        """
        if not self._is_trained:
            raise RuntimeError('predict call on untrained model')
        # predict class for each object
        return np.array([self.root.predict(obj) for obj in X])

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """
        Predicts class probabilities for a sequence of objects
        :param X: Data
        :return: probabilities of all classes for each object
        """
        if not self._is_trained:
            raise RuntimeError('predict_proba call on untrained model')
        return np.array([self.root.predict_proba(obj) for obj in X])
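
# A minimal usage sketch (an assumption for illustration, not part of the original code):
# fitting MyDecisionTreeClassifier on a tiny synthetic dataset and inspecting its outputs.
# The dataset and function name are hypothetical; labels are assumed to be integers 0..K-1,
# since the leaf probability vectors above rely on that.
def _decision_tree_demo():
    rng = np.random.default_rng(0)
    X = rng.normal(size=(100, 3))
    y = (X[:, 0] + X[:, 1] > 0).astype(int)          # simple separable labelling rule
    tree = MyDecisionTreeClassifier(max_depth=3, min_samples_split=2, seed=0).fit(X, y)
    labels = tree.predict(X)                          # shape (100,)
    proba = tree.predict_proba(X)                     # shape (100, 2), rows sum to 1
    return labels, proba
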
class MyRandomForestClassifier:
    """
    Data-diverse ensemble of tree classifiers
    """
    big_number = 1 << 32

    def __init__(
            self,
            n_estimators: int,
            max_depth: tp.Optional[int] = None,
            min_samples_split: tp.Optional[int] = 2,
            seed: int = 0
    ):
        """
        :param n_estimators: number of trees in the forest
        :param max_depth: maximal depth of each tree, prevents overfitting
        :param min_samples_split: minimal number of samples for a node to be a splitter node
        :param seed: seed for RNG, enables reproducibility
        """
        self._n_classes = 0
        self._is_trained = False
        self.rng = np.random.default_rng(seed)
        self.estimators = [
            MyDecisionTreeClassifier(max_depth, min_samples_split, seed=estimator_seed)
            for estimator_seed in self.rng.choice(
                max(MyRandomForestClassifier.big_number, n_estimators),
                size=(n_estimators,),
                replace=False
            )
        ]

    def _bootstrap_sample(self, X: np.ndarray, y: np.ndarray):
        """
        Returns a bootstrapped sample from X of equal size
        :param X: objects collection to sample from
        :param y: corresponding labels
        :return: bootstrapped objects and labels
        """
        # sample row indices with replacement
        indices = self.rng.choice(X.shape[0], size=(X.shape[0],), replace=True)
        return X[indices], y[indices]

    def fit(self, X: np.ndarray, y: np.ndarray):
        """
        Fits each estimator of the ensemble on a bootstrapped data sample
        :param X: Data
        :param y: labels
        :return: fitted self
        """
        self._n_classes = np.unique(y).shape[0]
        for estimator in self.estimators:
            X_bootstrapped, y_bootstrapped = self._bootstrap_sample(X, y)
            estimator.fit(X_bootstrapped, y_bootstrapped)
        self._is_trained = True
        return self

    def predict_proba(self, X: np.ndarray):
        """
        Predicts probability of each class by averaging over all base estimators
        :param X: Data
        :return: array of probabilities
        """
        # average the probability estimates of all estimators
        probas = np.stack([estimator.predict_proba(X) for estimator in self.estimators])
        return probas.mean(axis=0)

    def predict(self, X):
        """
        Predicts class for each object by majority vote of the base estimators
        :param X: Data
        :return: array of class labels
        """
        # stack per-estimator predictions column-wise: row i holds all votes for object i
        stacked_predicts = np.stack(
            [estimator.predict(X) for estimator in self.estimators], axis=1
        )
        most_common = [Counter(row).most_common(1)[0][0] for row in stacked_predicts]
        return np.array(most_common)
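
# A minimal usage sketch (an assumption, mirroring the tree demo above): the forest is fitted
# on toy data of the same kind; predict_proba averages the per-tree probability estimates and
# predict takes a majority vote. The dataset and function name are hypothetical.
def _random_forest_demo():
    rng = np.random.default_rng(1)
    X = rng.normal(size=(200, 4))
    y = (X[:, 0] - X[:, 2] > 0).astype(int)
    forest = MyRandomForestClassifier(n_estimators=10, max_depth=4, seed=0).fit(X, y)
    averaged_proba = forest.predict_proba(X)          # mean over the 10 trees, shape (200, 2)
    votes = forest.predict(X)                         # majority vote, shape (200,)
    return averaged_proba, votes
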
class Logger:
    """Logger performs data management and stores scores and other relevant information"""

    def __init__(self, logs_path: tp.Union[str, os.PathLike]):
        self.path = pathlib.Path(logs_path)
        records = []
        for root, dirs, files in os.walk(self.path):
            for file in files:
                if file.lower().endswith('.json'):
                    run_id = os.path.splitext(file)[0]
                    with open(os.path.join(root, file), 'r') as f:
                        try:
                            logged_data = json.load(f)
                            records.append(
                                {
                                    'id': run_id,
                                    **logged_data
                                }
                            )
                        except json.JSONDecodeError:
                            pass
        if records:
            self.leaderboard = pd.DataFrame.from_records(records, index='id')
        else:
            self.leaderboard = pd.DataFrame(index=pd.Index([], name='id'))

        self._current_run = None

    class Run:
        """Run encapsulates information for a particular entry of logged material.
        Each run is a solitary experiment"""

        def __init__(self, name, storage, path):
            self.name = name
            self._storage = storage
            self._path = path
            # the leaderboard row for this run is created lazily by `log` via .loc enlargement;
            # DataFrame.append is avoided since it was removed in pandas 2.x

        def log(self, key, value):
            self._storage.loc[self.name, key] = value

        def log_values(self, log_values: tp.Dict[str, tp.Any]):
            for key, value in log_values.items():
                self.log(key, value)

        def save_logs(self):
            with open(self._path / f'{self.name}.json', 'w+') as f:
                json.dump(self._storage.loc[self.name].to_dict(), f)

        def log_artifact(self, fname: str, writer: tp.Callable):
            with open(self._path / fname, 'wb+') as f:
                writer(f)

    @contextlib.contextmanager
    def run(self, name: tp.Optional[str] = None):
        if name is None:
            name = str(uuid.uuid4())
        elif name in self.leaderboard.index:
            raise NameError("Run with given name already exists, name should be unique")
        else:
            name = name.replace(' ', '_')
        self._current_run = Logger.Run(name, self.leaderboard, self.path / name)
        os.makedirs(self.path / name, exist_ok=True)
        try:
            yield self._current_run
        finally:
            self._current_run.save_logs()


def load_predictions_dataframe(filename, column_prefix, index):
    with open(filename, 'rb') as file:
        data = np.load(file)
        dataframe = pd.DataFrame(
            data,
            columns=[f'{column_prefix}_{i}' for i in range(data.shape[1])],
            index=index
        )
        return dataframe
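
# A minimal usage sketch (an assumption, not from the original code) of how Logger and its
# Run context manager fit together: each run gets a directory under the logs path and a JSON
# file with everything logged through it. The path and metric values are hypothetical.
def _logger_demo():
    logger = Logger('demo_logs')                      # hypothetical logs directory
    with logger.run(name='baseline run') as run:      # stored under the name 'baseline_run'
        run.log('accuracy_mean', 0.93)                # hypothetical score
        run.log_values({'accuracy_std': 0.02})        # hypothetical score
    # the leaderboard now contains one row indexed by the run name
    return logger.leaderboard
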
class ExperimentHandler:
    """This class performs experiments with a given model, measures metrics and logs everything
    for thorough comparison"""
    stacking_prediction_filename = 'cv_stacking_prediction.npy'
    test_stacking_prediction_filename = 'test_stacking_prediction.npy'

    def __init__(
            self,
            X_train: pd.DataFrame,
            y_train: pd.Series,
            X_test: pd.DataFrame,
            y_test: pd.Series,
            cv_iterable: tp.Iterable,
            logger: Logger,
            metrics: tp.Dict[str, tp.Union[tp.Callable, str]],
            n_jobs=-1
    ):
        self.X_train = X_train
        self.y_train = y_train
        self.X_test = X_test
        self.y_test = y_test
        self._cv_iterable = cv_iterable
        self.logger = logger
        self._metrics = metrics
        self._n_jobs = n_jobs

    def score_test(self, estimator, metrics, run, test_data=None):
        """
        Computes scores for test data and logs them to the given run
        :param estimator: fitted estimator
        :param metrics: metrics to compute
        :param run: run to log into
        :param test_data: optional argument if one wants to pass an augmented test dataset
        :return: None
        """
        if test_data is None:
            test_data = self.X_test
        test_scores = _score(estimator, test_data, self.y_test, metrics)
        run.log_values({key + '_test': value for key, value in test_scores.items()})

    def score_cv(self, estimator, metrics, run):
        """
        Computes scores on cross-validation
        :param estimator: estimator to fit
        :param metrics: metrics to compute
        :param run: run to log to
        :return: None
        """
        cross_val_results = cross_validate(
            estimator=estimator,
            X=self.X_train,
            y=self.y_train,
            scoring=metrics,
            cv=self._cv_iterable,
            n_jobs=self._n_jobs
        )
        for key, value in cross_val_results.items():
            if key.startswith('test_'):
                metric_name = key.split('_', maxsplit=1)[1]
                run.log_values(
                    {
                        metric_name + '_mean': np.mean(value),
                        metric_name + '_std': np.std(value)
                    }
                )

    def generate_stacking_predictions(self, estimator, run):
        """
        Generates predictions over cross-validation folds, then saves them as artifacts;
        returns the fitted estimator for convenience and less training overhead
        :param estimator: estimator to use
        :param run: run to log to
        :return: estimator fitted on train, stacking cross-val predictions, stacking test predictions
        """
        if hasattr(estimator, "predict_proba"):
            # choose the most informative method available for stacking predictions
            method = "predict_proba"
        elif hasattr(estimator, "decision_function"):
            method = "decision_function"
        else:
            method = "predict"
        # generate cross-val predictions for stacking using the most informative method
        stacking_chunks = []
        for train_idx, test_idx in self._cv_iterable.split(self.X_train, self.y_train):
            estimator.fit(self.X_train.iloc[train_idx], self.y_train.iloc[train_idx])
            fold_prediction = getattr(estimator, method)(self.X_train.iloc[test_idx])
            stacking_chunks.extend(zip(test_idx, fold_prediction))
        # restore the original row order of X_train
        cross_val_stacking_prediction = np.array(
            [prediction for idx, prediction in sorted(stacking_chunks, key=lambda pair: pair[0])]
        )
        run.log_artifact(ExperimentHandler.stacking_prediction_filename,
                         lambda file: np.save(file, cross_val_stacking_prediction))
        estimator.fit(self.X_train, self.y_train)
        test_stacking_prediction = getattr(estimator, method)(self.X_test)
        run.log_artifact(ExperimentHandler.test_stacking_prediction_filename,
                         lambda file: np.save(file, test_stacking_prediction))
        return estimator, cross_val_stacking_prediction, test_stacking_prediction

    def get_metrics(self, estimator):
        """
        Gets callable metrics with estimator validation
        (e.g. the estimator has predict_proba necessary for likelihood computation, etc.)
        """
        return _check_multimetric_scoring(estimator, self._metrics)

    def run(self, estimator: sklearn.base.BaseEstimator, name=None):
        """
        Performs a run for the given estimator
        :param estimator: estimator to use
        :param name: name of the run for convenience and consistent logging
        :return: leaderboard with the conducted run
        """
        metrics = self.get_metrics(estimator)
        with self.logger.run(name=name) as run:
            # compute scores over cross-validation
            self.score_cv(estimator, metrics, run)
            fitted_on_train, _, _ = self.generate_stacking_predictions(estimator, run)
            self.score_test(fitted_on_train, metrics, run, test_data=self.X_test)
            return self.logger.leaderboard.loc[[run.name]]

    def get_stacking_predictions(self, run_names):
        """
        :param run_names: run names for which to extract stacking predictions for averaging and stacking
        :return: dataframe with predictions indexed by run names
        """
        train_dataframes = []
        test_dataframes = []
        for run_name in run_names:
            train_filename = self.logger.path / run_name / ExperimentHandler.stacking_prediction_filename
            train_dataframes.append(load_predictions_dataframe(filename=train_filename,
                                                               column_prefix=run_name,
                                                               index=self.X_train.index))
            test_filename = self.logger.path / run_name / ExperimentHandler.test_stacking_prediction_filename
            test_dataframes.append(load_predictions_dataframe(filename=test_filename,
                                                              column_prefix=run_name,
                                                              index=self.X_test.index))
        return pd.concat(train_dataframes, axis=1), pd.concat(test_dataframes, axis=1)
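
# A minimal end-to-end sketch (an assumption, not part of the original code): wiring Logger and
# ExperimentHandler together with an sklearn estimator. The train/test frames, the
# 'experiment_logs' path and the run name are hypothetical placeholders; the metric names use
# standard sklearn scorer strings.
def _experiment_handler_demo(X_train: pd.DataFrame, y_train: pd.Series,
                             X_test: pd.DataFrame, y_test: pd.Series):
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import StratifiedKFold

    logger = Logger('experiment_logs')                # hypothetical logs directory
    handler = ExperimentHandler(
        X_train, y_train, X_test, y_test,
        cv_iterable=StratifiedKFold(n_splits=5, shuffle=True, random_state=0),
        logger=logger,
        metrics={'accuracy': 'accuracy', 'roc_auc': 'roc_auc'},
    )
    # cross-validates, refits on the full train set, scores on test and logs everything
    leaderboard_row = handler.run(RandomForestClassifier(random_state=0), name='sklearn rf baseline')
    return leaderboard_row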