zoukankan      html  css  js  c++  java
  • 决策树

    因为自己平时写计算的代码比较多,很少写树结构的(决策树算法的实现零零整整花了近两周),所以数据结构和代码效率上待优化的地方应该还有很多,仅提供给大家借鉴。代码建议在文本编辑器上浏览。

    类的定义

    自己网上搜到的代码很多都没有定义类,我自己对于这样的代码很是不喜欢,一点点看算法流程是看不下去的。因为是树,所以一定存在节点类和树类。

    • 节点类
      • entropy分支前的交叉熵
      • label当前节点标签
      • divide_info存储分支信息
      • except_error剪枝用
        好吧,下面两个set函数好像没什么必要
    class Node():
    	def __init__(self, entropy=-1, label=None, divide_info=None):
            self.entropy = entropy
            self.label = label
            self.divide_info = divide_info
            self.left_child = None
            self.right_child = None
            self.except_error = None
    
        def set_left(self, node):
            self.left_child = node
    
        def set_right(self, node):
            self.right_child = node
    
    • 树类
      • 实现了树的广度优先遍历和查询节点所在层数两个功能,两个功能实现方法差不多。
    class Tree():
        def __init__(self):
            self.root = Node()
    
        def breadth_traversal(self):
            queue = []
            result = []
            if self.root.entropy == -1:
                return
            else:
                queue.append(self.root)
                while queue:
                    next_queue = []
                    next_result = []
                    for node in queue:
                        next_result.append(node.label)
                        if node.left_child:
                            next_queue.append(node.left_child)
                            next_queue.append(node.right_child)
                    result.append(next_result)
                    queue = next_queue
            for i in range(len(result)):
                print("第", i, "层信息", result[i])
            return
    	
    	
        def query_layer(self, node):
            if self.root.entropy != -1:
                queue = [self.root]
                layer = 0
                while queue:
                    next_queue = []
                    for i in queue:
                        if i == node:
                            return layer
                        if i.left_child:
                            next_queue.append(i.left_child)
                            next_queue.append(i.right_child)
                    layer = layer +1
                    queue = next_queue
                return
            else:
                return
    
    • 决策树类
      决策树类的代码有点长,一步一步贴代码吧。
      • 初始化函数
        初始化嘛,初始化一个树就好了。
      class Decision_tree():
      
          def __init__(self):
              self.tree = Tree()
      
      • fit函数——根据数据构造树
        因为用的方法是递归的方法,所以有必要加一个node参数,告诉它的位置在哪里。其他参数的话看看名字应该就知道什么意思,就不说了。
        里面调用了返回分支信息的find_variable函数和计算熵的entropy函数,这两个函数放在最后说。
      def fit(self, X, y, minimum_entropy=0, cur_node=None, maximum_layer = np.inf):
          """ build decision tree according to data
      
          param:
              X:features, array_like
              y:label, array_like
              minimun_entropy:minimum entropy difference allowed
              cur_node: current node
              maximum_layer: maximumn layer allowed, count from 0
          """
          X = pd.DataFrame(X)
          y = pd.DataFrame(y)
          minimum_entropy = minimum_entropy
          maximum_layer = maximum_layer
          current_node = cur_node
          if cur_node:
              cur_layer = self.tree.query_layer(cur_node)
              if cur_layer >= maximum_layer:
                  return
      
          label = y.iloc[:, 0].value_counts().index[0]
          current_entropy = entropy(y)
      
          divided_info, left_index, right_index, divide_column = find_variable(X, y, minimum_entropy)
      
          if self.tree.root.entropy == -1:
              if divided_info == None:
                  print("无需决策分类")
              else:
                  self.tree.root = Node(entropy=current_entropy, label=label, divide_info=divided_info)
                  X_next = X[X.columns.difference([divide_column])]
                  X_left = X_next[left_index]
                  y_left = y[left_index]
                  X_right = X_next[right_index]
                  y_right = y[right_index]
                  self.fit(X_left, y_left, minimum_entropy,self.tree.root, maximum_layer)
                  self.fit(X_right, y_right, minimum_entropy, self.tree.root, maximum_layer)
          else:
              if divided_info == None:
                  if cur_node.left_child == None:
                      cur_node.left_child = Node(entropy=None, label=label)
                      return
                  else:
                      cur_node.right_child = Node(entropy=None, label=label)
                      return
              else:
                  X_next = X[X.columns.difference([divide_column])]
                  X_left = X_next[left_index]
                  y_left = y[left_index]
                  X_right = X_next[right_index]
                  y_right = y[right_index]
                  if cur_node.left_child == None:
                      cur_node.set_left(Node(current_entropy, label, divided_info))
                      self.fit(X_left, y_left,minimum_entropy, cur_node.left_child, maximum_layer)
                      self.fit(X_right, y_right, minimum_entropy, cur_node.left_child, maximum_layer)
                  else:
                      cur_node.set_right(Node(current_entropy, label, divided_info))
                      self.fit(X_left, y_left, minimum_entropy, cur_node.right_child, maximum_layer)
                      self.fit(X_right, y_right, minimum_entropy, cur_node.right_child, maximum_layer)
      
      • post_pruning——后剪枝函数
        函数调用了add_error函数为每个节点添加悲观的期望错误数信息,添加后再从下往上进行剪枝。这个从下往上剪枝的过程借鉴了广度优先遍历的思想。同样利用到的error_rate函数放到了最后面。
      def post_pruning(self, X_test, y_test):
          self.add_error(X_test, y_test)
          queue = [self.tree.root]
          node_layer = [queue]
          while queue:
              next_queue = []
              for node in queue:
                  if node.divide_info:
                      next_queue.append(node.left_child)
                      next_queue.append(node.right_child)
              node_layer.append(next_queue)
              queue = next_queue
          for i in range(len(node_layer)-2, -1, -1):
              current_layer = node_layer[i]
              for node in current_layer:
                  if node.divide_info:
                      if node.except_error < node.left_child.except_error + node.right_child.except_error:
                          node.left_child = None
                          node.right_child = None
                          node.divide_info = None
      
      • predict——预测函数
        这个函数的实现也是用了递归,要注意的一点就是一定要把函数返回结果传给X["predict"]
      def predict(self, X, y, node=None):
          X = pd.DataFrame(X)
          y = pd.DataFrame(y)
          if node==None:
              cur_node = self.tree.root
              X["predict"] = y.copy()
              if self.tree.root.divide_info:
                  column, value = self.tree.root.divide_info["column"], self.tree.root.divide_info["value"]
                  # 判定变量是分类还是连续
                  if len(X[column].unique()) <= 4:
                      left_index = (X[column] == value).values
                  else:
                      left_index = (X[column] <= value).values
                  right_index = ~left_index
                  X_left, y_left = X[left_index], y[left_index]
                  X_right, y_right = X[right_index], y[right_index]
                  X[left_index]["predict"] = cur_node.left_child.label
                  X[right_index]["predict"] = cur_node.right_child.label
                  X[left_index]["predict"] = self.predict(X_left, y_left, cur_node.left_child)
                  X[right_index]["predict"] = self.predict(X_right, y_right, cur_node.right_child)
              else:
                      return
          else:
              cur_node = node
              if cur_node.divide_info:
                  column, value = cur_node.divide_info["column"], cur_node.divide_info["value"]
                  if len(X[column].unique()) <= 4:
                      left_index = (X[column] == value).values
                  else:
                      left_index = (X[column] <= value).values
                  right_index = ~left_index
                  X_left, y_left = X[left_index], y[left_index]
                  X_right, y_right = X[right_index], y[right_index]
                  X[left_index]["predict"] = cur_node.left_child.label
                  X[right_index]["predict"] = cur_node.right_child.label
                  X[left_index]["predict"] = self.predict(X_left, y_left, cur_node.left_child)
                  X[right_index]["predict"] = self.predict(X_right, y_right, cur_node.right_child)
              else:
                  return
          return X["predict"]
      
      • add_rate——剪枝辅助函数
        这个算法也是递归实现的(好吧,我喜欢递归)。或许你注意到了判断变量是否连续的判断条件(如果你没有注意到,那你现在需要注意到了),它可能不是精确的,如果你有更好判断变量类型的方法欢迎你告诉我。
      def add_error(self, X_test, y_test, node=None):
          X = pd.DataFrame(X_test)
          y = pd.DataFrame(y_test)
          N = len(X)
          if node == None:
              cur_node = self.tree.root
              error = error_rate(y, cur_node.label)
              cur_node.except_error = N * (error + 1.15 * np.sqrt(error * (1 - error) / N))
              if self.tree.root.divide_info:
                  column, value = self.tree.root.divide_info["column"], self.tree.root.divide_info["value"]
                  # 判定变量是分类还是连续
                  if len(X[column].unique()) <= 4:
                      left_index = (X[column] == value).values
                  else:
                      left_index = (X[column] <= value).values
                  right_index = ~left_index
                  X_left, y_left = X[left_index], y[left_index]
                  X_right, y_right = X[right_index], y[right_index]
                  self.add_error(X_left, y_left, cur_node.left_child)
                  self.add_error(X_right, y_right, cur_node.right_child)
              else:
                  return
          else:
              cur_node = node
              error = error_rate(y, cur_node.label)
              cur_node.except_error = error + 1.15 * np.sqrt(error * (1 - error) / N)
              if cur_node.divide_info:
                  column, value = cur_node.divide_info["column"], cur_node.divide_info["value"]
                  if len(X[column].unique()) <= 4:
                      left_index = (X[column] == value).values
                  else:
                      left_index = (X[column] <= value).values
                  right_index = ~left_index
                  X_left, y_left = X[left_index], y[left_index]
                  X_right, y_right = X[right_index], y[right_index]
                  self.add_error(X_left, y_left, cur_node.left_child)
                  self.add_error(X_right, y_right, cur_node.right_child)
              else:
                  return
      

    辅助函数

    一共也就三个辅助函数:计算熵、查找分支信息和计算错误率函数。

    • entropy——计算熵
    def entropy(y):
        category = y.apply(pd.value_counts) / len(y)
        return (-category * np.log2(category)).sum()[0]
    
    • find_variable——查找分支信息
      跟前面add_rate函数一样,你需要注意变量类型判断条件。
    def find_variable(X, y, minimum_entropy=0):
        """ find the variable that split the data set getting minmum entropy
    
        param:
            X:array_like
            y:array_like
            minimum_entropy:minimum difference allowed
    
        return:
            variable_name: str, split information
            X_left_index: array_like, left dateset index
            X_right_index:array_like, right dataset index
            column:str or int depending on X's columns, split column
        """
        X = pd.DataFrame(X)
        y = pd.DataFrame(y)
        minimum_entropy = minimum_entropy
    
        assert len(y.iloc[:, 0].unique()) <= 2
        label = y.iloc[:, 0].value_counts().index[0]
        current_entropy = entropy(y)
    
        if (current_entropy == 0) | (X.shape[1] == 0):
            return (None, None, None, None)
    
        min_divide_entropy = np.inf  # 标记最小交叉熵
        divide_variable = None  # 标记分组信息
        divide_column = None  # 标记分组column名称
        X_left_index = None  # 左子树序列
        X_right_index = None  # 右子树序列
    
        for column in X.columns:
            category_index = X.loc[:, column].unique()  # 当前属性的种类索引
            category_number = len(category_index)  # 种类数量
            # 如果变量是0-1变量
            if category_number == 2:
                temp_left_index = (X[column] == category_index[0]).values
                temp_right_index = ~temp_left_index
                temp_y1 = y[temp_left_index]
                temp_y2 = y[temp_right_index]
                temp_entropy = len(temp_y1) / len(X) * entropy(temp_y1) + len(temp_y2) / len(X) * entropy(
                    temp_y2)  # 计算当前变脸的熵
                if temp_entropy < min_divide_entropy:
                    X_left_index = temp_left_index
                    X_right_index = temp_right_index
                    min_divide_entropy = temp_entropy
                    divide_variable = {"column":column, "value":category_index[0]}
                    divide_column = column
            # 当是无序的分类变量时
            elif (category_number >= 3) & (category_number <= 4):
                best_divide_level = category_index[0]
                best_divide_value_entropy = np.inf
                for level in category_index:
                    set1_index = (X[column] == level).values
                    set2_index = ~set1_index
                    temp_y1 = y[set1_index]
                    temp_y2 = y[set2_index]
                    temp_entropy = len(temp_y1) / len(X) * entropy(temp_y1) + len(temp_y2) / len(X) * entropy(temp_y2)
                    if temp_entropy < best_divide_value_entropy:
                        best_divide_level = level
                        best_divide_value_entropy = temp_entropy
                if best_divide_value_entropy < min_divide_entropy:
                    X_left_index = (X[column] == best_divide_level).values
                    X_right_index = ~X_left_index
                    min_divide_entropy = best_divide_value_entropy
                    divide_variable = {"column":column, "value":best_divide_level}
                    divide_column = column
            # 连续变量
            else:
                all_possible_value = np.sort(X[column].unique())
                best_divide_value = np.inf
                best_divide_value_entropy = np.inf
                for i in all_possible_value:
                    set1_index = (X[column] <= i).values
                    set2_index = ~set1_index
                    temp_y1 = y[set1_index]
                    temp_y2 = y[set2_index]
                    temp_entropy = len(temp_y1) / len(X) * entropy(temp_y1) + len(temp_y2) / len(X) * entropy(temp_y2)
                    if temp_entropy < best_divide_value_entropy:
                        best_divide_value_entropy = temp_entropy
                        best_divide_value = i
                if best_divide_value_entropy < min_divide_entropy:
                    X_left_index = (X[column] <= best_divide_value).values
                    X_right_index = ~X_right_index
                    min_divide_entropy = best_divide_value_entropy
                    divide_variable = {"column":column, "value":best_divide_value}
                    divide_column = column
        if (current_entropy - min_divide_entropy) < minimum_entropy:
            return (None, None, None, None)
        return (divide_variable, X_left_index, X_right_index, divide_column)
    
    • error_rate——计算错误率
    def error_rate(y_true, label):
        y = pd.DataFrame(y_true)
        return len(y[y != label])/len(y)
    

    小结

    到这里,算法基本编完了,可以小测一下:

    if __name__ == "__main__":
        X = pd.DataFrame([["Rainy", "Saturday"],
                          ["Sunny", "Saturday"],
                          ["Windy", "Tuesday"],
                          ["Sunny", "Saturday"],
                          ["Sunny", "Monday"],
                          ["Windy", "Saturday"]], columns=["Weather", "Dow"])
        y = pd.DataFrame([["No"],
                          ["Yes"],
                          ["No"],
                          ["Yes"],
                          ["No"],
                          ["No"]], columns=["Play"])
        decision_tree = Decision_tree()
        decision_tree.fit(X, y, 0.4, maximum_layer=2)
        decision_tree.tree.breadth_traversal()
        decision_tree.post_pruning(X, y)
        decision_tree.tree.breadth_traversal()
        result = decision_tree.predict(X, y)
        print(result)
    

    没想到吧,猛男的测试就是这么的温柔。因为实在是太懒了,所以请你们加大测试力度,如果有错误记得提醒我。
    对于日后想从事数据分析甚至算法岗位的我来说,数据结构有必要补一补了,还有放在文件夹里的C++。

  • 相关阅读:
    Tomcat的startup.bat启动后显示乱码
    SKF密码设备研究
    《网络攻防》第十一周作业
    《网络攻防》第十周作业
    《网络攻防》第九周作业
    Maven环境搭建以及建立Maven项目
    JavaSE (unbound)的问题解决
    对任性孩子,只要做到“四个不要”就可以了
    layui中对table中的数据进行判断(0、1)转换为提示信息
    Asp.Net Core Mvc上Json序列化首字母大小写的问题
  • 原文地址:https://www.cnblogs.com/DemonHunter/p/10820654.html
Copyright © 2011-2022 走看看