import contextlib
import enum
import json
import os
import pathlib
import typing as tp
import uuid
import numpy as np
import pandas as pd
import sklearn
from collections import Counter
from sklearn.metrics._scorer import _check_multimetric_scoring
from sklearn.model_selection import cross_val_predict, cross_validate
from sklearn.model_selection._validation import _score
class NodeType(enum.Enum):
REGULAR = 1
TERMINAL = 2
def gini(y: np.ndarray) -> float:
"""
Computes Gini index for given set of labels
:param y: labels
:return: Gini impurity
"""
    _, counts = np.unique(y, return_counts=True)
    pi = counts / counts.sum()
    return 1 - np.sum(pi ** 2)
def weighted_impurity(y_left: np.ndarray, y_right: np.ndarray) -> \
tp.Tuple[float, float, float]:
"""
Computes weighted impurity by averaging children impurities
:param y_left: left partition
:param y_right: right partition
:return: averaged impurity, left child impurity, right child impurity
"""
left_imp = gini(y_left)
right_imp = gini(y_right)
left_coef = len(y_left) / (len(y_left) + len(y_right))
right_coef = len(y_right) / (len(y_left) + len(y_right))
weighted_impurity = left_coef * left_imp + right_coef * right_imp
return weighted_impurity, left_imp, right_imp
def create_split(feature_values: np.ndarray, threshold: float) -> tp.Tuple[np.ndarray, np.ndarray]:
"""
    splits indices of a given 1-d feature array into two groups by comparison with the threshold
:param feature_values: feature values extracted from data
:param threshold: value to compare with
:return: two sets of indices
"""
left_idx = np.where(feature_values <= threshold)
right_idx = np.where(feature_values > threshold)
return left_idx, right_idx
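# Illustrative sketch (not part of the assignment API): how the impurity helpers above combine.
# The toy feature values, labels and threshold below are arbitrary examples.
def _demo_impurity_split():
    feature_values = np.array([1.0, 2.0, 3.0, 10.0])
    labels = np.array([0, 0, 1, 1])
    left_idx, right_idx = create_split(feature_values, threshold=2.0)  # first two objects go left
    avg_imp, left_imp, right_imp = weighted_impurity(labels[left_idx], labels[right_idx])
    print(f'gini(all) = {gini(labels):.3f}, weighted impurity after split = {avg_imp:.3f}')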
class MyDecisionTreeNode:
"""
Auxiliary class serving as representation of a decision tree node
"""
def __init__(
self,
meta: 'MyDecisionTreeClassifier',
depth,
node_type: NodeType = NodeType.REGULAR,
predicted_class: tp.Optional[tp.Union[int, str]] = None,
left_subtree: tp.Optional['MyDecisionTreeNode'] = None,
right_subtree: tp.Optional['MyDecisionTreeNode'] = None,
        feature_id: tp.Optional[int] = None,
        threshold: tp.Optional[float] = None,
impurity: float = np.inf
):
"""
:param meta: object, holding meta information about tree
        :param depth: depth of this node in the tree (deduced at creation from the ancestor's depth)
        :param node_type: 'regular' or 'terminal' depending on whether this node is a leaf node
        :param predicted_class: class label assigned to a terminal node
        :param left_subtree: left child node (if already built)
        :param right_subtree: right child node (if already built)
        :param feature_id: index of feature to split by
        :param threshold: threshold value for the split condition
        :param impurity: impurity of the data routed to this node
"""
self._node_type = node_type
self._meta = meta
self._depth = depth
self._predicted_class = predicted_class
self._class_proba = None
self._left_subtree = left_subtree
self._right_subtree = right_subtree
self._feature_id = feature_id
self._threshold = threshold
self._impurity = impurity
def _best_split(self, X: np.ndarray, y: np.ndarray):
"""
finds best split
:param X: Data, passed to node
:param y: labels
:return: best feature, best threshold, left child impurity, right child impurity
"""
lowest_impurity = np.inf
best_feature_id = None
best_threshold = None
lowest_left_child_impurity, lowest_right_child_impurity = None, None
features = self._meta.rng.permutation(X.shape[1])
for feature in features:
current_feature_values = X[:, feature]
thresholds = np.unique(current_feature_values)
for threshold in thresholds:
# find indices for split with current threshold
left_idxs, right_idxs = create_split(current_feature_values, threshold)
current_weighted_impurity, current_left_impurity, current_right_impurity = weighted_impurity(y[left_idxs], y[right_idxs])
if current_weighted_impurity < lowest_impurity:
lowest_impurity = current_weighted_impurity
best_feature_id = feature
best_threshold = threshold
lowest_left_child_impurity = current_left_impurity
lowest_right_child_impurity = current_right_impurity
return best_feature_id, best_threshold, lowest_left_child_impurity, lowest_right_child_impurity
def fit(self, X: np.ndarray, y: np.ndarray):
"""
recursively fits a node, providing it with predicted class or split condition
:param X: Data
:param y: labels
:return: fitted node
"""
        if (
            (np.unique(y).shape[0] == 1) or
            (self._meta.max_depth == self._depth) or
            (self._meta.min_samples_split > len(y))
        ):
            self._node_type = NodeType.TERMINAL
            counts = Counter(y)
            self._predicted_class = counts.most_common(1)[0][0]  # choose most common class
            # class frequencies over all classes known to the tree
            # (assumes integer class labels 0 .. n_classes - 1, as in the rest of this code)
            self._class_proba = np.bincount(y, minlength=self._meta._n_classes) / len(y)
            return self
self._feature_id, self._threshold, left_imp, right_imp = self._best_split(X, y)
left_idx, right_idx = create_split(X[:, self._feature_id], self._threshold)
self._left_subtree = MyDecisionTreeNode(
meta=self._meta,
depth=self._depth + 1, # adjust depth
impurity=left_imp
).fit(
X[left_idx], y[left_idx]
)
self._right_subtree = MyDecisionTreeNode(
meta=self._meta,
depth = self._depth + 1, # adjust depth
impurity=right_imp
).fit(
X[right_idx], y[right_idx]
)
return self
def predict(self, x: np.ndarray):
"""
Predicts class for a single object
:param x: object of shape (n_features, )
:return: class assigned to object
"""
if self._node_type is NodeType.TERMINAL:
return self._predicted_class
if x[self._feature_id] <= self._threshold:
return self._left_subtree.predict(x)
else:
return self._right_subtree.predict(x)
def predict_proba(self, x: np.ndarray):
"""
Predicts probability for a single object
:param x: object of shape (n_features, )
:return: vector of probabilities assigned to object
"""
if self._node_type is NodeType.TERMINAL:
return self._class_proba
if x[self._feature_id] <= self._threshold:
return self._left_subtree.predict_proba(x)
else:
return self._right_subtree.predict_proba(x)
class MyDecisionTreeClassifier:
"""
Class analogous to sklearn implementation of decision tree classifier with Gini impurity criterion,
named in a manner avoiding collisions
"""
def __init__(
self,
max_depth: tp.Optional[int] = None,
min_samples_split: tp.Optional[int] = 2,
seed: int = 0
):
"""
:param max_depth: maximal depth of tree, prevents overfitting
        :param min_samples_split: minimal number of samples required for a node to be split
:param seed: seed for RNG, enables reproducibility
"""
self.root = MyDecisionTreeNode(self, 1)
self._is_trained = False
self.max_depth = max_depth or np.inf
self.min_samples_split = min_samples_split or 2
self.rng = np.random.default_rng(seed)
self._n_classes = 0
def fit(self, X: np.ndarray, y: np.ndarray):
"""
starts recursive process of node criterion fitting from the root
:param X: Data
:param y: labels
:return: fitted self
"""
self._n_classes = np.unique(y).shape[0]
self.root.fit(X, y)
self._is_trained = True
return self
    def predict(self, X: np.ndarray) -> np.ndarray:
        """
        Predicts class for a sequence of objects
        :param X: Data
        :return: classes assigned to each object
        """
        if not self._is_trained:
            raise RuntimeError('predict call on untrained model')
        # predict class for each object
        return np.array([self.root.predict(obj) for obj in X])
    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """
        Predicts class probabilities for a sequence of objects
        :param X: Data
        :return: probabilities of all classes for each object
        """
        if not self._is_trained:
            raise RuntimeError('predict_proba call on untrained model')
        # predict probability vector for each object
        return np.array([self.root.predict_proba(obj) for obj in X])
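# Illustrative usage sketch (assumes integer class labels 0..n_classes-1; iris is just an example dataset).
def _demo_tree_usage():
    from sklearn.datasets import load_iris
    X, y = load_iris(return_X_y=True)
    tree = MyDecisionTreeClassifier(max_depth=3, seed=0)
    tree.fit(X, y)
    print('train accuracy:', np.mean(tree.predict(X) == y))
    print('class probabilities of the first object:', tree.predict_proba(X[:1])[0])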
class MyRandomForestClassifier:
"""
    Data-diverse ensemble of tree classifiers
"""
big_number = 1 << 32
def __init__(
self,
n_estimators: int,
max_depth: tp.Optional[int] = None,
min_samples_split: tp.Optional[int] = 2,
seed: int = 0
):
"""
:param n_estimators: number of trees in forest
:param max_depth: maximal depth of tree, prevents overfitting
        :param min_samples_split: minimal number of samples required for a node to be split
:param seed: seed for RNG, enables reproducibility
"""
self._n_classes = 0
self._is_trained = False
self.rng = np.random.default_rng(seed)
self.estimators = [
MyDecisionTreeClassifier(max_depth, min_samples_split, seed=seed) for
seed in self.rng.choice(max(MyRandomForestClassifier.big_number, n_estimators), size=(n_estimators,),
replace=False)]
    def _bootstrap_sample(self, X: np.ndarray, y: np.ndarray):
        """
        returns bootstrapped sample from X of equal size
        :param X: objects collection to sample from
        :param y: corresponding labels
        :return: bootstrapped objects and labels
        """
        # sample row indices with replacement
        indices = self.rng.choice(X.shape[0], size=X.shape[0], replace=True)
        return X[indices], y[indices]
def fit(self, X: np.ndarray, y: np.ndarray):
"""
fits each estimator of the ensemble on the bootstrapped data sample
:param X: Data
:param y: labels
:return: fitted self
"""
self._n_classes = np.unique(y).shape[0]
        for estimator in self.estimators:
            # fit each estimator on its own bootstrap replicate of the data
            X_bootstrapped, y_bootstrapped = self._bootstrap_sample(X, y)
            estimator.fit(X_bootstrapped, y_bootstrapped)
self._is_trained = True
return self
    def predict_proba(self, X: np.ndarray):
        """
        predict probability of each class by averaging over all base estimators
        :param X: Data
        :return: array of probabilities
        """
        # average the probabilities predicted by every estimator in the ensemble
        probas = [estimator.predict_proba(X) for estimator in self.estimators]
        return np.mean(probas, axis=0)
    def predict(self, X):
        """
        predict class for each object
        :param X: Data
        :return: array of class labels
        """
        # collect per-estimator predictions as columns, then take a majority vote per row
        predicts = [estimator.predict(X).reshape(-1, 1) for estimator in self.estimators]
        stacked_predicts = np.hstack(predicts)
        most_common = []
        for row in stacked_predicts:
            c = Counter(row)
            most_common.append(c.most_common(1)[0][0])
        return np.array(most_common)
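# Illustrative usage sketch (same assumptions as the single-tree demo above).
def _demo_forest_usage():
    from sklearn.datasets import load_iris
    X, y = load_iris(return_X_y=True)
    forest = MyRandomForestClassifier(n_estimators=10, max_depth=4, seed=0)
    forest.fit(X, y)
    print('train accuracy:', np.mean(forest.predict(X) == y))
    print('averaged class probabilities of the first object:', forest.predict_proba(X[:1])[0])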
class Logger:
"""Logger performs data management and stores scores and other relevant information"""
def __init__(self, logs_path: tp.Union[str, os.PathLike]):
self.path = pathlib.Path(logs_path)
records = []
for root, dirs, files in os.walk(self.path):
for file in files:
if file.lower().endswith('.json'):
uuid = os.path.splitext(file)[0]
with open(os.path.join(root, file), 'r') as f:
try:
logged_data = json.load(f)
records.append(
{
'id': uuid,
**logged_data
}
)
except json.JSONDecodeError:
pass
if records:
self.leaderboard = pd.DataFrame.from_records(records, index='id')
else:
self.leaderboard = pd.DataFrame(index=pd.Index([], name='id'))
self._current_run = None
class Run:
"""Run incapsulates information for a particular entry of logged material. Each run is solitary experiment"""
def __init__(self, name, storage, path):
self.name = name
self._storage = storage
self._path = path
            # the leaderboard row for this run is created lazily on the first call to log()
def log(self, key, value):
self._storage.loc[self.name, key] = value
def log_values(self, log_values: tp.Dict[str, tp.Any]):
for key, value in log_values.items():
self.log(key, value)
def save_logs(self):
with open(self._path / f'{self.name}.json', 'w+') as f:
json.dump(self._storage.loc[self.name].to_dict(), f)
def log_artifact(self, fname: str, writer: tp.Callable):
with open(self._path / fname, 'wb+') as f:
writer(f)
@contextlib.contextmanager
def run(self, name: tp.Optional[str] = None):
if name is None:
name = str(uuid.uuid4())
elif name in self.leaderboard.index:
raise NameError("Run with given name already exists, name should be unique")
else:
name = name.replace(' ', '_')
self._current_run = Logger.Run(name, self.leaderboard, self.path / name)
os.makedirs(self.path / name, exist_ok=True)
try:
yield self._current_run
finally:
self._current_run.save_logs()
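# Illustrative usage sketch (the 'demo_logs' directory, run name and logged values are arbitrary examples).
def _demo_logger_usage():
    logger = Logger('demo_logs')
    with logger.run('baseline_experiment') as run:
        run.log('accuracy_mean', 0.9)
        run.log_values({'accuracy_std': 0.01})
    print(logger.leaderboard)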
def load_predictions_dataframe(filename, column_prefix, index):
with open(filename, 'rb') as file:
data = np.load(file)
dataframe = pd.DataFrame(data, columns=[f'{column_prefix}_{i}' for i in range(data.shape[1])],
index=index)
return dataframe
class ExperimentHandler:
"""This class perfoms experiments with given model, measures metrics and logs everything for thorough comparison"""
stacking_prediction_filename = 'cv_stacking_prediction.npy'
test_stacking_prediction_filename = 'test_stacking_prediction.npy'
def __init__(
self,
X_train: pd.DataFrame,
y_train: pd.Series,
X_test: pd.DataFrame,
y_test: pd.Series,
cv_iterable: tp.Iterable,
logger: Logger,
metrics: tp.Dict[str, tp.Union[tp.Callable, str]],
n_jobs=-1
):
self.X_train = X_train
self.y_train = y_train
self.X_test = X_test
self.y_test = y_test
self._cv_iterable = cv_iterable
self.logger = logger
self._metrics = metrics
self._n_jobs = n_jobs
def score_test(self, estimator, metrics, run, test_data=None):
"""
Computes scores for test data and logs them to given run
:param estimator: fitted estimator
:param metrics: metrics to compute
:param run: run to log into
        :param test_data: optional argument for passing an augmented test dataset
:return: None
"""
if test_data is None:
test_data = self.X_test
test_scores = _score(estimator, test_data, self.y_test, metrics)
run.log_values({key + '_test': value for key, value in test_scores.items()})
def score_cv(self, estimator, metrics, run):
"""
computes scores on cross-validation
:param estimator: estimator to fit
:param metrics: metrics to compute
:param run: run to log to
:return: None
"""
        cross_val_results = cross_validate(
            estimator, self.X_train, self.y_train,
            scoring=metrics, cv=self._cv_iterable, n_jobs=self._n_jobs
        )
for key, value in cross_val_results.items():
if key.startswith('test_'):
metric_name = key.split('_', maxsplit=1)[1]
mean_score = np.mean(value)
std_score = np.std(value)
run.log_values(
{
metric_name + '_mean': mean_score,
metric_name + '_std': std_score
}
)
def generate_stacking_predictions(self, estimator, run):
"""
generates predictions over cross-validation folds, then saves them as artifacts
        returns the fitted estimator for convenience and less training overhead
:param estimator: estimator to use
:param run: run to log to
:return: estimator fitted on train, stacking cross-val predictions, stacking test predictions
"""
        if hasattr(estimator, "predict_proba"):  # choose the most informative prediction method for stacking
            method = "predict_proba"
        elif hasattr(estimator, "decision_function"):
            method = "decision_function"
        else:
            method = "predict"
        # out-of-fold predictions over cross-validation folds, aligned with the order of X_train
        cross_val_stacking_prediction = cross_val_predict(
            estimator, self.X_train, self.y_train,
            cv=self._cv_iterable, method=method, n_jobs=self._n_jobs
        )
run.log_artifact(ExperimentHandler.stacking_prediction_filename,
lambda file: np.save(file, cross_val_stacking_prediction))
estimator.fit(self.X_train, self.y_train)
test_stacking_prediction = getattr(estimator, method)(self.X_test)
run.log_artifact(ExperimentHandler.test_stacking_prediction_filename,
lambda file: np.save(file, test_stacking_prediction))
return estimator, cross_val_stacking_prediction, test_stacking_prediction
def get_metrics(self, estimator):
"""
get callable metrics with estimator validation
(e.g. estimator has predict_proba necessary for likelihood computation, etc)
"""
return _check_multimetric_scoring(estimator, self._metrics)
def run(self, estimator: sklearn.base.BaseEstimator, name=None):
"""
perform run for given estimator
:param estimator: estimator to use
        :param name: name of run for convenience and consistent logging
:return: leaderboard with conducted run
"""
metrics = self.get_metrics(estimator)
with self.logger.run(name=name) as run:
# compute predictions over cross-validation
self.score_cv(estimator, metrics, run)
fitted_on_train, _, _ = self.generate_stacking_predictions(estimator, run)
self.score_test(fitted_on_train, metrics, run, test_data=self.X_test)
return self.logger.leaderboard.loc[[run.name]]
def get_stacking_predictions(self, run_names):
"""
:param run_names: run names for which to extract stacking predictions for averaging and stacking
:return: dataframe with predictions indexed by run names
"""
train_dataframes = []
test_dataframes = []
for run_name in run_names:
train_filename = self.logger.path / run_name / ExperimentHandler.stacking_prediction_filename
train_dataframes.append(load_predictions_dataframe(filename=train_filename, column_prefix=run_name,
index=self.X_train.index))
test_filename = self.logger.path / run_name / ExperimentHandler.test_stacking_prediction_filename
test_dataframes.append(load_predictions_dataframe(filename=test_filename, column_prefix=run_name,
index=self.X_test.index))
return pd.concat(train_dataframes, axis=1), pd.concat(test_dataframes, axis=1)
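# Illustrative end-to-end sketch (assumptions: iris data, LogisticRegression as the model,
# StratifiedKFold cross-validation, accuracy as the only metric, and a local 'demo_logs' directory).
def _demo_experiment_handler():
    from sklearn.datasets import load_iris
    from sklearn.linear_model import LogisticRegression
    from sklearn.model_selection import StratifiedKFold, train_test_split

    X, y = load_iris(return_X_y=True, as_frame=True)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=0, stratify=y)
    handler = ExperimentHandler(
        X_train=X_train, y_train=y_train,
        X_test=X_test, y_test=y_test,
        cv_iterable=StratifiedKFold(n_splits=5, shuffle=True, random_state=0),
        logger=Logger('demo_logs'),
        metrics={'accuracy': 'accuracy'},
    )
    print(handler.run(LogisticRegression(max_iter=1000), name='logreg_baseline'))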