выфыфв

 avatar
unknown
python
a year ago
12 kB
5
Indexable
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.special import xlogy


class Node:
    def __init__(self):
        self.type = None
        self.height = 1
        self.entropy = None
        self.n_samples = None

        # if splitting
        self.IG = None
        self.split_feature_id = None
        self.split_feature = None
        self.split_value = None
        self.left = None
        self.right = None

        # if terminating
        self.label = None


class DecisionTreeClassifier:
    def __init__(self, max_depth=None, min_samples_split=2):
        # initialize tree paramters
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split

    def fit(self, X, y):
        # check inputs
        if isinstance(X, pd.DataFrame):
            self.X = X.values
            self.features = X.columns
        elif isinstance(X, np.ndarray):
            if len(X.shape) != 2:
                raise ValueError("X should be of shape (n_objects, n_features)")
            self.X = X
            self.features = np.arange(0, X.shape[1])
        else:
            raise TypeError("X should be pandas DataFrame or numpy ndarray")

        if isinstance(y, pd.DataFrame) or isinstance(y, pd.Series):
            self.y = y.values
        elif isinstance(y, np.ndarray):
            if len(X.shape) != 2:
                raise ValueError("y should be of shape (n_objects,)")
            self.y = y
        else:
            raise TypeError("y should be pandas DataFrame or numpy ndarray")

        idx = np.arange(self.X.shape[0])
        feature_ids = np.arange(self.X.shape[1])
        self.node = None

        # start recursion
        self.node = self._fit_rec(idx, feature_ids, self.node)

    def _entropy(self, idx):
        # count occurencies of each class
        count = np.unique(self.y[idx], return_counts=True)[1]
        # calculate probabilities of each class
        p = count / idx.size
        # calculate entropy
        entropy = -(xlogy(p, p) / np.log(2)).sum()
        return entropy

    def _cond_entropy(self, idx, feature_id):
        # note that here we buil binary tree
        # if some feature takes multiple values
        # we try splitting the tree as follows
        #
        #           target_feature == value
        #                  /   \
        #             yes /     \ no
        #                /       \
        #               V         V
        #
        # and not by bultiple features
        #               target_feature ==
        #              /    /    \    \
        #             /     |     |    \
        #             V     V     V     V
        #          val_1  val_2  val_3  val_4

        # get feature values and corresponding counts of target feature
        feature_vals, val_counts = np.unique(
            self.X[idx, feature_id],
            return_counts=True,
        )
        P = val_counts / idx.size
        # calculate probabilities of each feature value of target feature

        # indices, where target feature is equal to each of the feature values
        equal_idx = [np.intersect1d(np.where(self.X[:, feature_id] == val)[0], idx) for val in feature_vals]
        # indices, where target feature is not equal to each of the feature values
        n_equal_idx = [np.intersect1d(np.where(self.X[:, feature_id] != val)[0], idx) for val in feature_vals]

        # conditional entropies of each value of each feature value
        H_cond = [
            self._entropy(eq_idx) * p + self._entropy(neq_idx) * (1 - p)
            for (eq_idx, neq_idx, p) in zip(equal_idx, n_equal_idx, P)
        ]

        # get best feature value
        best_id = np.argmax(H_cond)
        return (
            feature_vals[best_id],
            H_cond[best_id],
            equal_idx[best_id],
            n_equal_idx[best_id],
        )

    def _info_gain(self, idx, feature_id):
        # claculate information gain if split target feature
        H = self._entropy(idx)

        (
            best_split_val,
            best_H_cond,
            best_equal_idx,
            best_n_equal_idx,
        ) = self._cond_entropy(idx, feature_id)

        best_IG = H - best_H_cond
        return best_split_val, best_IG, best_equal_idx, best_n_equal_idx

    def _get_best_feature(self, idx, feature_ids):
        # get information gains of all features
        best_split_vals, best_IGs, best_equal_idx, best_n_equal_idx = map(
            list,
            zip(
                *[self._info_gain(idx, feature_id) for feature_id in feature_ids],
            ),
        )

        best_feature_idx = np.argmax(best_IGs)

        return (
            feature_ids[best_feature_idx],
            best_split_vals[best_feature_idx],
            best_IGs[best_feature_idx],
            best_equal_idx[best_feature_idx],
            best_n_equal_idx[best_feature_idx],
        )

    def _fit_rec(self, idx, feature_ids, node):
        if not node:
            node = Node()
            node.height = 1

        node.n_samples = idx.size
        # terminating cases
        # 1) all labels left are the same
        if np.unique(self.y[idx]).size == 1:
            node.type = "terminal"
            node.label = self.y[idx][0]
            desc = "N: {}\nclass: {}".format(
                node.n_samples,
                node.label,
            )
            node.desc = desc
            return node

        # 2) no more features to split or not enough samples to split
        if feature_ids.size == 0 or node.n_samples < self.min_samples_split or node.height == self.max_depth:

            node.type = "terminal"
            left_labels, label_counts = np.unique(self.y[idx], return_counts=True)
            best_label = left_labels[np.argmax(label_counts)]
            node.label = best_label
            desc = "N: {}\nclass: {}".format(
                node.n_samples,
                node.label,
            )
            node.desc = desc
            return node

        (
            best_feature_id,
            best_split_value,
            best_IG,
            best_equal_idx,
            best_n_equal_idx,
        ) = self._get_best_feature(idx, feature_ids)

        node.type = "internal"
        node.entropy = self._entropy(idx)
        node.IG = best_IG
        node.split_value = best_split_value
        node.split_feature_id = best_feature_id
        node.split_feature = self.features[best_feature_id]
        desc = "{} == {}\nentropy: {:.2f}\nN: {}".format(
            node.split_feature,
            node.split_value,
            node.entropy,
            node.n_samples,
        )
        node.desc = desc

        left = Node()
        left.height = node.height + 1
        # eliminate feature if all values of target feature are the same in child node
        feature_ids_left = feature_ids
        if len(np.unique(self.X[best_equal_idx, best_feature_id])) == 1:
            feature_ids_left = np.delete(
                feature_ids,
                np.where(feature_ids == best_feature_id)[0],
            )
        left = self._fit_rec(best_equal_idx, feature_ids_left, left)
        node.left = left

        right = Node()
        right.height = node.height + 1
        # eliminate feature if all values of target feature are the same in child node
        feature_ids_right = feature_ids
        if len(np.unique(self.X[best_n_equal_idx, best_feature_id])) == 1:
            feature_ids_right = np.delete(
                feature_ids,
                np.where(feature_ids == best_feature_id)[0],
            )
        right = self._fit_rec(best_n_equal_idx, feature_ids_right, right)
        node.right = right
        return node

    def _predict(self, item, node):
        if node.type == "terminal":
            return node.label
        if item[node.split_feature_id] == node.split_value:
            return self._predict(item, node.left)
        else:
            return self._predict(item, node.right)

    def predict(self, X):
        # check input
        if isinstance(X, pd.DataFrame):
            X = X.values
        elif isinstance(X, np.ndarray):
            if len(X.shape) != 2:
                raise ValueError("X should be of shape (n_objects, n_features)")
        else:
            raise TypeError("X should be pandas DataFrame or numpy ndarray")

        ans = np.zeros(X.shape[0], dtype=self.y.dtype)
        for i in range(X.shape[0]):
            ans[i] = self._predict(X[i], self.node)

        return ans

    # def plot_tree(self):
    #     self.G = nx.DiGraph()
    #     _ = self._traverse(self.node, 0)
    #     pos = self._hierarchy_pos(self.G, 0)
    #     labels = nx.get_node_attributes(self.G, "desc")
    #     plt.figure(figsize=(20, 20))
    #     nx.draw_networkx(
    #         self.G,
    #         pos=pos,
    #         labels=labels,
    #         arrows=True,
    #         bbox=dict(facecolor="white"),
    #     )
        
    # def _hierarchy_pos(self, G, root, levels=None, width=1.0, height=1.0):
    #     TOTAL = "total"
    #     CURRENT = "current"

    #     def make_levels(levels, node=root, currentLevel=0, parent=None):
    #         """Compute the number of nodes for each level"""
    #         if currentLevel not in levels:
    #             levels[currentLevel] = {TOTAL: 0, CURRENT: 0}
    #         levels[currentLevel][TOTAL] += 1
    #         neighbors = G.neighbors(node)
    #         for neighbor in neighbors:
    #             if not neighbor == parent:
    #                 levels = make_levels(levels, neighbor, currentLevel + 1, node)
    #         return levels

    #     def make_pos(pos, node=root, currentLevel=0, parent=None, vert_loc=0):
    #         dx = 1 / levels[currentLevel][TOTAL]
    #         left = dx / 2
    #         pos[node] = ((left + dx * levels[currentLevel][CURRENT]) * width, vert_loc)
    #         levels[currentLevel][CURRENT] += 1
    #         neighbors = G.neighbors(node)
    #         for neighbor in neighbors:
    #             if not neighbor == parent:
    #                 pos = make_pos(
    #                     pos,
    #                     neighbor,
    #                     currentLevel + 1,
    #                     node,
    #                     vert_loc - vert_gap,
    #                 )
    #         return pos

    #     if levels is None:
    #         levels = make_levels({})
    #     else:
    #         levels = {level: {TOTAL: levels[level], CURRENT: 0} for level in levels}
    #     vert_gap = height / (max([level for level in levels]) + 1)
    #     return make_pos({})

    # def _traverse(self, node, node_num):

    #     if node.type == "terminal":
    #         self.G.add_node(
    #             node_num,
    #             type=node.type,
    #             n_samples=node.n_samples,
    #             label=node.label,
    #             desc=node.desc,
    #         )
    #         return node_num
    #     else:
    #         self.G.add_node(
    #             node_num,
    #             type=node.type,
    #             entropy=node.entropy,
    #             n_samples=node.n_samples,
    #             info_gain=node.IG,
    #             split_feature=node.split_feature,
    #             split_value=node.split_value,
    #             desc=node.desc,
    #         )

    #         self.G.add_node(node_num + 1)
    #         self.G.add_edge(node_num, node_num + 1)
    #         last_num = self._traverse(node.left, node_num + 1)

    #         self.G.add_node(last_num + 1)
    #         self.G.add_edge(node_num, last_num + 1)
    #         last_num = self._traverse(node.right, last_num + 1)

    #         return last_num
Editor is loading...
Leave a Comment