  • 《机器学习Python实现_09_02_决策树_CART》


    CART树即分类回归树(classification and regression tree),顾名思义,它既能用作分类任务又能用作回归任务。它的应用比较广泛,通常会用作集成学习的基学习器。总的来说,它与ID3/C4.5有如下不同:






    \[Gini(p)=\sum_{k=1}^K p_k(1-p_k)=1-\sum_{k=1}^K p_k^2\]


    \[Gini(D)=1-\sum_{k=1}^K\left(\frac{\mid C_k \mid}{\mid D \mid}\right)^2\]


    \[D_1=\{(x,y)\in D\mid A(x)=a\},\quad D_2=D-D_1\]


    \[Gini(D,A,a)=\frac{\mid D_1 \mid}{\mid D \mid}Gini(D_1)+\frac{\mid D_2 \mid}{\mid D \mid}Gini(D_2)\]

    这里\(D_1=\{(x,y)\in D\mid A(x)=a\},D_2=D-D_1\)



    import numpy as np
    def gini(x, sample_weight=None):
        """
        Compute the sample-weighted Gini index Gini(D) of a label sequence.

        For every distinct label k the class proportion is taken as
        p_k = (sum of the weights of the samples labelled k) / len(x),
        and the result is 1 - sum_k p_k ** 2.  Note that the denominator is the
        number of samples, not the total weight, so the weights are expected to
        average 1.0 for the value to stay inside [0, 1].

        :param x: 1-D sequence of hashable class labels
        :param sample_weight: optional per-sample weights aligned with x;
            every sample weighs 1.0 when omitted
        :return: the Gini index (1.0 for an empty input)
        """
        x_num = len(x)
        # Fall back to uniform weights when none are supplied.
        if sample_weight is None:
            sample_weight = np.ones(x_num)
        # Accumulate the total weight of every label; count * mean(weight)
        # is exactly this sum, so no per-label list is needed.
        weight_sums = {}
        for x_value, weight in zip(x, sample_weight):
            weight_sums[x_value] = weight_sums.get(x_value, 0.0) + weight
        gini_value = 1.0
        for total_weight in weight_sums.values():
            p_i = total_weight / x_num
            gini_value -= p_i * p_i
        return gini_value
    def cond_gini(x, y, sample_weight=None):
        """
        Conditional Gini index of y given x.

        The samples are grouped by the distinct values of x; the result is the
        sum over groups of (group size / total size) * gini(y of the group,
        weights of the group).

        :param x: 1-D sequence of feature values used for grouping
        :param y: 1-D sequence of labels aligned with x
        :param sample_weight: optional per-sample weights (indexable with a
            NumPy index tuple); uniform 1.0 weights are used when omitted
        :return: the conditional Gini index
        """
        features = np.asarray(x)
        labels = np.asarray(y)
        total = len(features)
        # Uniform weights when the caller gave none.
        weights = np.asarray([1.0] * total) if sample_weight is None else sample_weight
        accumulated = .0
        for level in set(features):
            picked = np.where(features == level)
            group_x = features[picked]
            group_y = labels[picked]
            group_w = weights[picked]
            share = 1.0 * len(group_x) / total
            accumulated += share * gini(group_y, group_w)
        return accumulated
    def gini_gain(x, y, sample_weight=None):
        """
        Gini gain obtained by splitting y on x: gini(y) - cond_gini(x, y).

        :param x: 1-D sequence of feature values
        :param y: 1-D sequence of labels aligned with x
        :param sample_weight: optional per-sample weights; uniform 1.0 weights
            are used when omitted
        :return: the decrease of the Gini index
        """
        n_items = len(x)
        weights = sample_weight
        if weights is None:
            weights = np.asarray([1.0] * n_items)
        before_split = gini(y, weights)
        after_split = cond_gini(x, y, weights)
        return before_split - after_split
    import os
    from ml_models import utils
    from ml_models.wrapper_models import DataBinWrapper
    class CARTClassifier(object):
        """
        Binary CART classification tree.

        Every split tests ``feature == value`` on a binned feature: samples for
        which the test holds go to the left child, the rest to the right child.
        """

        class Node(object):
            """Tree node holding the split rule, the label statistics and the children."""

            def __init__(self, feature_index: int = None, feature_value=None, target_distribute: dict = None,
                         weight_distribute: dict = None,
                         left_child_node=None, right_child_node=None, num_sample: int = None):
                """
                :param feature_index: index of the feature used by the split
                :param feature_value: feature value the split compares against
                :param target_distribute: label -> fraction of the node's samples
                :param weight_distribute: label -> mean sample weight of that label
                :param left_child_node: left child node
                :param right_child_node: right child node
                :param num_sample: number of samples that reached the node
                """
                self.feature_index = feature_index
                self.feature_value = feature_value
                self.target_distribute = target_distribute
                self.weight_distribute = weight_distribute
                self.left_child_node = left_child_node
                self.right_child_node = right_child_node
                self.num_sample = num_sample

        def __init__(self, criterion='gini', max_depth=None, min_samples_split=2, min_samples_leaf=1,
                     min_impurity_decrease=0, max_bins=10):
            """
            :param criterion: split criterion; 'gini' uses utils.gini_gain, any
                other value uses utils.info_gain_rate
            :param max_depth: maximum depth of the tree (None means unlimited)
            :param min_samples_split: minimum number of samples a node needs to be split
            :param min_samples_leaf: minimum number of samples a child must receive to be created
            :param min_impurity_decrease: a node is split only when the best
                criterion value is strictly greater than this threshold
            :param max_bins: forwarded to DataBinWrapper
            """
            self.criterion = criterion
            if criterion == 'gini':
                self.criterion_func = utils.gini_gain
            else:
                self.criterion_func = utils.info_gain_rate
            self.max_depth = max_depth
            self.min_samples_split = min_samples_split
            self.min_samples_leaf = min_samples_leaf
            self.min_impurity_decrease = min_impurity_decrease
            self.root_node: self.Node = None
            self.dbw = DataBinWrapper(max_bins=max_bins)

        def _build_tree(self, current_depth, current_node: Node, x, y, sample_weight):
            """
            Recursively grow the tree below ``current_node``.

            :param current_depth: depth of current_node (the root is built with depth 1)
            :param current_node: node to fill in and possibly split
            :param x: 2-D array of binned features, one row per sample
            :param y: 1-D array of labels
            :param sample_weight: 1-D array of per-sample weights
            """
            rows, cols = x.shape
            # Label distribution of the node and the mean weight of every label.
            target_distribute = {}
            weight_distribute = {}
            for index, tmp_value in enumerate(y):
                if tmp_value not in target_distribute:
                    target_distribute[tmp_value] = 0.0
                    weight_distribute[tmp_value] = []
                target_distribute[tmp_value] += 1.0
                weight_distribute[tmp_value].append(sample_weight[index])
            for key, value in target_distribute.items():
                target_distribute[key] = value / rows
                weight_distribute[key] = np.mean(weight_distribute[key])
            current_node.target_distribute = target_distribute
            current_node.weight_distribute = weight_distribute
            current_node.num_sample = rows
            # Stop conditions: pure node, too few samples, depth limit exceeded.
            if len(target_distribute) <= 1:
                return
            if rows < self.min_samples_split:
                return
            if self.max_depth is not None and current_depth > self.max_depth:
                return
            # Search the (feature, value) pair with the largest criterion value.
            best_index = None
            best_index_value = None
            best_criterion_value = 0
            for index in range(0, cols):
                for index_value in set(x[:, index]):
                    criterion_value = self.criterion_func((x[:, index] == index_value).astype(int), y, sample_weight)
                    if criterion_value > best_criterion_value:
                        best_criterion_value = criterion_value
                        best_index = index
                        best_index_value = index_value
            # Stop when no split improves the criterion enough.
            if best_index is None:
                return
            if best_criterion_value <= self.min_impurity_decrease:
                return
            # Record the split on the node.
            current_node.feature_index = best_index
            current_node.feature_value = best_index_value
            selected_x = x[:, best_index]
            # Left child: samples whose feature equals the split value.
            left_selected_index = np.where(selected_x == best_index_value)
            # A child is only created when it would hold enough samples.
            if len(left_selected_index[0]) >= self.min_samples_leaf:
                left_child_node = self.Node()
                current_node.left_child_node = left_child_node
                self._build_tree(current_depth + 1, left_child_node, x[left_selected_index], y[left_selected_index],
                                 sample_weight[left_selected_index])
            # Right child: all remaining samples.
            right_selected_index = np.where(selected_x != best_index_value)
            if len(right_selected_index[0]) >= self.min_samples_leaf:
                right_child_node = self.Node()
                current_node.right_child_node = right_child_node
                self._build_tree(current_depth + 1, right_child_node, x[right_selected_index], y[right_selected_index],
                                 sample_weight[right_selected_index])

        def fit(self, x, y, sample_weight=None):
            """
            Fit the tree.

            :param x: 2-D array of raw features
            :param y: 1-D sequence of labels
            :param sample_weight: optional per-sample weights; uniform when omitted
            :raises Exception: when sample_weight does not have one entry per sample
            """
            n_sample = x.shape[0]
            if sample_weight is None:
                sample_weight = np.asarray([1.0] * n_sample)
            # Arrays are required below because samples are selected with index tuples.
            sample_weight = np.asarray(sample_weight)
            y = np.asarray(y)
            if len(sample_weight) != n_sample:
                raise Exception('sample_weight size error:', len(sample_weight))
            # Start from an empty root node.
            self.root_node = self.Node()
            # Bin the features.
            # NOTE(review): the original listing has a "bin x" comment with no statement after it;
            # the fit call is restored on the assumption that DataBinWrapper exposes fit() - confirm.
            self.dbw.fit(x)
            # Grow the tree recursively on the binned features.
            self._build_tree(1, self.root_node, self.dbw.transform(x), y, sample_weight)

        def _search_node(self, current_node: Node, x, class_num):
            """
            Walk down the tree for one binned sample and return its class distribution.

            :param current_node: node to start from
            :param x: 1-D binned feature vector of a single sample
            :param class_num: number of classes; labels are looked up as 0 .. class_num - 1
            :return: list of class_num normalised scores
            """
            if current_node.left_child_node is not None and x[current_node.feature_index] == current_node.feature_value:
                return self._search_node(current_node.left_child_node, x, class_num)
            elif current_node.right_child_node is not None and x[current_node.feature_index] != current_node.feature_value:
                return self._search_node(current_node.right_child_node, x, class_num)
            else:
                # Leaf, or the matching child was never created: answer from this node.
                result = []
                total_value = 0.0
                for index in range(0, class_num):
                    value = current_node.target_distribute.get(index, 0) * current_node.weight_distribute.get(index, 1.0)
                    result.append(value)
                    total_value += value
                # Normalise so that the scores sum to one.
                for index in range(0, class_num):
                    result[index] = result[index] / total_value
                return result

        def predict_proba(self, x):
            """
            Predict the class distribution of every row of x.

            :param x: 2-D array of raw features
            :return: array with one row of class scores per sample
            """
            x = self.dbw.transform(x)
            rows = x.shape[0]
            results = []
            class_num = len(self.root_node.target_distribute)
            for row in range(0, rows):
                results.append(self._search_node(self.root_node, x[row], class_num))
            return np.asarray(results)

        def predict(self, x):
            """
            Predict the most likely class index of every row of x.

            :param x: 2-D array of raw features
            :return: 1-D array of class indices
            """
            return np.argmax(self.predict_proba(x), axis=1)

        def _prune_node(self, current_node: Node, alpha):
            """
            Bottom-up pruning of the subtree rooted at ``current_node``.

            :param current_node: root of the subtree
            :param alpha: penalty per leaf
            """
            # Prune inside the children first.
            if current_node.left_child_node is not None:
                self._prune_node(current_node.left_child_node, alpha)
            if current_node.right_child_node is not None:
                self._prune_node(current_node.right_child_node, alpha)
            # A node can have a single child when min_samples_leaf suppressed the
            # other side, so only the children that exist are inspected.
            children = [child_node for child_node in (current_node.left_child_node, current_node.right_child_node)
                        if child_node is not None]
            if not children:
                return
            # Never prune across levels: every existing child must be a leaf.
            for child_node in children:
                if child_node.left_child_node is not None or child_node.right_child_node is not None:
                    return
            # Loss before pruning: leaf penalty plus the weighted entropy of the children.
            pre_prune_value = alpha * 2
            for child_node in children:
                for key, value in child_node.target_distribute.items():
                    pre_prune_value += -1 * child_node.num_sample * value * np.log(
                        value) * child_node.weight_distribute.get(key, 1.0)
            # Loss after pruning: this node becomes the only leaf.
            after_prune_value = alpha
            for key, value in current_node.target_distribute.items():
                after_prune_value += -1 * current_node.num_sample * value * np.log(
                    value) * current_node.weight_distribute.get(key, 1.0)
            if after_prune_value <= pre_prune_value:
                # Collapse the node into a leaf.
                current_node.left_child_node = None
                current_node.right_child_node = None
                current_node.feature_index = None
                current_node.feature_value = None

        def prune(self, alpha=0.01):
            """
            Prune the fitted tree using the loss C(T) + alpha * |T|.

            :param alpha: penalty per leaf
            """
            self._prune_node(self.root_node, alpha)
    from sklearn.datasets import make_classification
    # Generate a small synthetic two-class data set with two features, one of them informative.
    data, target = make_classification(n_samples=100, n_features=2, n_classes=2, n_informative=1, n_redundant=0,
                                       n_repeated=0, n_clusters_per_class=1, class_sep=.5,random_state=21)
    # Fit a CART classifier with its default settings and plot its decision function.
    tree = CARTClassifier()
    tree.fit(data, target)
    utils.plot_decision_function(data, target, tree)



    # Plot the decision function of the same tree again.
    # NOTE(review): presumably this second plot is meant to follow a `tree.prune(...)` call
    # that is not present in this listing - confirm against the original notebook.
    utils.plot_decision_function(data, target, tree)



    回归树的特征选择使用的是平方误差,即选择一个特征\(j\)和一个取值\(s\),将训练集按\(X^j\leq s\)和\(X^j>s\)分为两部分,寻找使这两部分的误差平方之和下降最多的\((j,s)\),这个过程可以描述如下:

    \[\min_{j,s}\left[\min_{c_1}\sum_{x_i\in R_1(j,s)}(y_i-c_1)^2+\min_{c_2}\sum_{x_i\in R_2(j,s)}(y_i-c_2)^2\right]\]

    这里\(R_1(j,s)=\{x\mid x^j\leq s\},R_2(j,s)=\{x\mid x^j> s\},c_1=ave(y_i\mid x_i\in R_1(j,s)),c_2=ave(y_i\mid x_i\in R_2(j,s))\)


    def square_error(x, sample_weight=None):
        """
        Weighted sum of squared deviations of x from its plain (unweighted) mean.

        :param x: 1-D sequence of numeric values
        :param sample_weight: optional per-sample weights with the same length as x;
            every sample weighs 1.0 when omitted
        :return: sum_i sample_weight[i] * (x[i] - mean(x)) ** 2, or 0.0 for an empty input
        """
        x = np.asarray(x)
        x_num = len(x)
        # Nothing to measure; also avoids taking the mean of an empty array.
        if x_num == 0:
            return 0.0
        if sample_weight is None:
            sample_weight = np.ones(x_num)
        else:
            sample_weight = np.asarray(sample_weight)
        deviation = x - np.mean(x)
        return np.sum(deviation * deviation * sample_weight)
    def cond_square_error(x, y, sample_weight=None):
        """
        Squared error of y after grouping the samples by the distinct values of x.

        :param x: 1-D sequence of feature values used for grouping
        :param y: 1-D sequence of targets aligned with x
        :param sample_weight: optional per-sample weights aligned with x;
            uniform 1.0 weights are used when omitted
        :return: sum over the groups of square_error(y of the group, weights of the group)
        """
        x = np.asarray(x)
        y = np.asarray(y)
        x_num = len(x)
        # Uniform weights by default; a plain list is converted so it can be
        # indexed with the index tuples produced by np.where.
        if sample_weight is None:
            sample_weight = np.asarray([1.0] * x_num)
        else:
            sample_weight = np.asarray(sample_weight)
        error = .0
        for x_value in set(x):
            x_index = np.where(x == x_value)
            new_y = y[x_index]
            new_sample_weight = sample_weight[x_index]
            error += square_error(new_y, new_sample_weight)
        return error
    def square_error_gain(x, y, sample_weight=None):
        """
        Decrease of the squared error of y obtained by grouping the samples on x.

        :param x: 1-D sequence of feature values used for grouping
        :param y: 1-D sequence of targets aligned with x
        :param sample_weight: optional per-sample weights aligned with x;
            uniform 1.0 weights are used when omitted
        :return: square_error(y) - cond_square_error(x, y)
        """
        x_num = len(x)
        if sample_weight is None:
            sample_weight = np.asarray([1.0] * x_num)
        return square_error(y, sample_weight) - cond_square_error(x, y, sample_weight)
    class CARTRegressor(object):
        """
        Binary CART regression tree.

        Every split tests ``feature <= value`` on a binned feature: samples for
        which the test holds go to the left child, the rest to the right child.
        """

        class Node(object):
            """Tree node holding the split rule, the prediction and the children."""

            def __init__(self, feature_index: int = None, feature_value=None, y_hat=None, square_error=None,
                         left_child_node=None, right_child_node=None, num_sample: int = None):
                """
                :param feature_index: index of the feature used by the split
                :param feature_value: threshold the split compares against
                :param y_hat: prediction stored on the node
                :param square_error: squared error of the node
                :param left_child_node: left child node
                :param right_child_node: right child node
                :param num_sample: number of samples that reached the node
                """
                self.feature_index = feature_index
                self.feature_value = feature_value
                self.y_hat = y_hat
                self.square_error = square_error
                self.left_child_node = left_child_node
                self.right_child_node = right_child_node
                self.num_sample = num_sample

        def __init__(self, criterion='mse', max_depth=None, min_samples_split=2, min_samples_leaf=1, min_std=1e-3,
                     min_impurity_decrease=0, max_bins=10):
            """
            :param criterion: split criterion; only 'mse' (utils.square_error_gain) is
                wired up, any other value leaves criterion_func unset
            :param max_depth: maximum depth of the tree (None means unlimited)
            :param min_samples_split: minimum number of samples a node needs to be split
            :param min_samples_leaf: minimum number of samples a child must receive to be created
            :param min_std: a node whose target standard deviation is at most this value is not split
            :param min_impurity_decrease: a node is split only when the best
                criterion value is strictly greater than this threshold
            :param max_bins: forwarded to DataBinWrapper
            """
            self.criterion = criterion
            if criterion == 'mse':
                self.criterion_func = utils.square_error_gain
            self.max_depth = max_depth
            self.min_samples_split = min_samples_split
            self.min_samples_leaf = min_samples_leaf
            self.min_std = min_std
            self.min_impurity_decrease = min_impurity_decrease
            self.root_node: self.Node = None
            self.dbw = DataBinWrapper(max_bins=max_bins)

        def _build_tree(self, current_depth, current_node: Node, x, y, sample_weight):
            """
            Recursively grow the tree below ``current_node``.

            :param current_depth: depth of current_node (the root is built with depth 1)
            :param current_node: node to fill in and possibly split
            :param x: 2-D array of binned features, one row per sample
            :param y: 1-D array of targets
            :param sample_weight: 1-D array of per-sample weights
            """
            rows, cols = x.shape
            # The prediction of the node is the weighted mean of its targets.
            current_node.y_hat = np.dot(sample_weight / np.sum(sample_weight), y)
            current_node.num_sample = rows
            # Stop conditions: nearly constant targets, too few samples, depth limit exceeded.
            current_node.square_error = np.dot(y - np.mean(y), y - np.mean(y))
            if np.sqrt(current_node.square_error / rows) <= self.min_std:
                return
            if rows < self.min_samples_split:
                return
            if self.max_depth is not None and current_depth > self.max_depth:
                return
            # Search the (feature, threshold) pair with the largest criterion value.
            best_index = None
            best_index_value = None
            best_criterion_value = 0
            for index in range(0, cols):
                for index_value in sorted(set(x[:, index])):
                    criterion_value = self.criterion_func((x[:, index] <= index_value).astype(int), y, sample_weight)
                    if criterion_value > best_criterion_value:
                        best_criterion_value = criterion_value
                        best_index = index
                        best_index_value = index_value
            # Stop when no split improves the criterion enough.
            if best_index is None:
                return
            if best_criterion_value <= self.min_impurity_decrease:
                return
            # Record the split on the node.
            current_node.feature_index = best_index
            current_node.feature_value = best_index_value
            selected_x = x[:, best_index]
            # Left child: samples at or below the threshold.
            left_selected_index = np.where(selected_x <= best_index_value)
            # A child is only created when it would hold enough samples.
            if len(left_selected_index[0]) >= self.min_samples_leaf:
                left_child_node = self.Node()
                current_node.left_child_node = left_child_node
                self._build_tree(current_depth + 1, left_child_node, x[left_selected_index], y[left_selected_index],
                                 sample_weight[left_selected_index])
            # Right child: samples above the threshold.
            right_selected_index = np.where(selected_x > best_index_value)
            if len(right_selected_index[0]) >= self.min_samples_leaf:
                right_child_node = self.Node()
                current_node.right_child_node = right_child_node
                self._build_tree(current_depth + 1, right_child_node, x[right_selected_index], y[right_selected_index],
                                 sample_weight[right_selected_index])

        def fit(self, x, y, sample_weight=None):
            """
            Fit the tree.

            :param x: 2-D array of raw features
            :param y: 1-D sequence of targets
            :param sample_weight: optional per-sample weights; uniform when omitted
            :raises Exception: when sample_weight does not have one entry per sample
            """
            n_sample = x.shape[0]
            if sample_weight is None:
                sample_weight = np.asarray([1.0] * n_sample)
            # Arrays are required below because samples are selected with index tuples.
            sample_weight = np.asarray(sample_weight)
            y = np.asarray(y)
            if len(sample_weight) != n_sample:
                raise Exception('sample_weight size error:', len(sample_weight))
            # Start from an empty root node.
            self.root_node = self.Node()
            # Bin the features.
            # NOTE(review): the original listing has a "bin x" comment with no statement after it;
            # the fit call is restored on the assumption that DataBinWrapper exposes fit() - confirm.
            self.dbw.fit(x)
            # Grow the tree recursively on the binned features.
            self._build_tree(1, self.root_node, self.dbw.transform(x), y, sample_weight)

        def _search_node(self, current_node: Node, x):
            """
            Walk down the tree for one binned sample and return its prediction.

            :param current_node: node to start from
            :param x: 1-D binned feature vector of a single sample
            :return: y_hat of the node where the walk stops
            """
            if current_node.left_child_node is not None and x[current_node.feature_index] <= current_node.feature_value:
                return self._search_node(current_node.left_child_node, x)
            elif current_node.right_child_node is not None and x[current_node.feature_index] > current_node.feature_value:
                return self._search_node(current_node.right_child_node, x)
            else:
                # Leaf, or the matching child was never created: answer from this node.
                return current_node.y_hat

        def predict(self, x):
            """
            Predict the target of every row of x.

            :param x: 2-D array of raw features
            :return: 1-D array of predictions
            """
            x = self.dbw.transform(x)
            rows = x.shape[0]
            results = []
            for row in range(0, rows):
                results.append(self._search_node(self.root_node, x[row]))
            return np.asarray(results)

        def _prune_node(self, current_node: Node, alpha):
            """
            Bottom-up pruning of the subtree rooted at ``current_node``.

            :param current_node: root of the subtree
            :param alpha: penalty per leaf
            """
            # Prune inside the children first.
            if current_node.left_child_node is not None:
                self._prune_node(current_node.left_child_node, alpha)
            if current_node.right_child_node is not None:
                self._prune_node(current_node.right_child_node, alpha)
            # A node can have a single child when min_samples_leaf suppressed the
            # other side, so only the children that exist are inspected.
            children = [child_node for child_node in (current_node.left_child_node, current_node.right_child_node)
                        if child_node is not None]
            if not children:
                return
            # Never prune across levels: every existing child must be a leaf.
            for child_node in children:
                if child_node.left_child_node is not None or child_node.right_child_node is not None:
                    return
            # Loss before pruning: leaf penalty plus the squared error of the children.
            # A child whose square_error was reset to None by an earlier prune counts as 0.0.
            pre_prune_value = alpha * 2
            for child_node in children:
                if child_node.square_error is not None:
                    pre_prune_value = pre_prune_value + child_node.square_error
            # Loss after pruning: this node becomes the only leaf.
            after_prune_value = alpha + current_node.square_error
            if after_prune_value <= pre_prune_value:
                # Collapse the node into a leaf.
                current_node.left_child_node = None
                current_node.right_child_node = None
                current_node.feature_index = None
                current_node.feature_value = None
                current_node.square_error = None

        def prune(self, alpha=0.01):
            """
            Prune the fitted tree using the loss C(T) + alpha * |T|.

            :param alpha: penalty per leaf
            """
            self._prune_node(self.root_node, alpha)
    # Build a noisy sine curve: 100 evenly spaced points on [1, 10].
    data = np.linspace(1, 10, num=100)
    target = np.sin(data) + np.random.random(size=100)  # add noise
    # Reshape to a single-feature column matrix.
    data = data.reshape((-1, 1))
    # Fit a regression tree with 50 bins and plot its predictions over the samples.
    tree = CARTRegressor(max_bins=50)
    tree.fit(data, target)
    import matplotlib.pyplot as plt
    plt.scatter(data, target)
    plt.plot(data, tree.predict(data), color='r')
    [<matplotlib.lines.Line2D at 0x221783ed9b0>]


    # Plot the samples and the predictions of the same tree again.
    # NOTE(review): presumably this second plot is meant to follow a `tree.prune(...)` call
    # that is not present in this listing - confirm against the original notebook.
    plt.scatter(data, target)
    plt.plot(data, tree.predict(data), color='r')
    [<matplotlib.lines.Line2D at 0x221783fcb70>]


