zoukankan      html  css  js  c++  java
  • 《机器学习Python实现_03_二分类转多分类的一般实现》

    import numpy as np
    import os
    os.chdir('../')
    import matplotlib.pyplot as plt
    %matplotlib inline
    

    简介

    上一讲我们实现了一个简单的二元分类器:LogisticRegression,但通常情况下,我们面对的更多是多分类问题。二分类转多分类的通常做法也很朴素,一般分为两种:one-vs-rest 以及 one-vs-one。顾名思义,one-vs-rest 将多个类别中的某一类作为正类,其余所有类别作为负类,对于有 $n\_class$ 个类别的分类问题,需要构建 $n\_class$ 个分类器;而 one-vs-one 是指在类别之间两两进行分类,这样将会构造 $n\_class \times (n\_class-1)/2$ 个分类器。由于实现思路很简单,这里直接贴出代码:多分类的实现封装在 MultiClassWrapper 类中,并放到 ml_models.wrapper_models 模块下。

    from ml_models.linear_model import *
    from ml_models.wrapper_models import *
    
    # Prepare the handwritten-digits data: load it and hold out 30% as a test set
    # (random_state=0 keeps the split reproducible)
    from sklearn.metrics import f1_score
    from sklearn import model_selection
    from sklearn import datasets
    digits = datasets.load_digits()
    data = digits['data']
    target = digits['target']
    X_train, X_test, y_train, y_test = model_selection.train_test_split(data, target, test_size=0.3,
                                                                        random_state=0)
    
    # Build the base (binary) model; the wrappers below are constructed from it
    lr = LogisticRegression()
    
    # Train with the one-vs-rest strategy and evaluate with macro-averaged F1
    ovr = MultiClassWrapper(lr, mode='ovr')
    ovr.fit(X_train, y_train)
    
    y = ovr.predict(X_test)
    print('ovr:', f1_score(y_test, y, average='macro'))
    
    ovr: 0.9492701335705958
    
    # Train with the one-vs-one strategy and evaluate with macro-averaged F1
    ovo = MultiClassWrapper(lr, mode='ovo')
    ovo.fit(X_train, y_train)
    
    y = ovo.predict(X_test)
    print('ovo:', f1_score(y_test, y, average='macro'))
    
    ovo: 0.959902103714483
    

    MultiClassWrapper类实现细节

    import threading
    import copy
    import numpy as np
    
    """
    继承Thread,获取函数的返回值
    """
    
    
    class MyThread(threading.Thread):
        """Thread that runs ``target(*args, **kwargs)`` and keeps its return value.

        The call happens in :meth:`run`, i.e. on the worker thread after
        ``start()``; constructing the object does not invoke ``target``.
        An exception raised by ``target`` is stored in ``self.exception`` and
        re-raised in the caller's thread by :meth:`join`.
        """

        def __init__(self, target, args, kwargs, name=''):
            """
            :param target: callable to execute on the worker thread
            :param args: positional arguments for ``target`` (``None`` means none)
            :param kwargs: keyword arguments for ``target`` (``None`` means none)
            :param name: optional thread name; the default Thread name is kept when empty
            """
            threading.Thread.__init__(self)
            if name:
                self.name = name
            self.target = target
            self.args = args if args is not None else ()
            self.kwargs = kwargs if kwargs is not None else {}
            # Populated by run(); both stay None until the worker has finished.
            self.result = None
            self.exception = None

        def run(self):
            """Execute the target on the worker thread and record its outcome."""
            try:
                self.result = self.target(*self.args, **self.kwargs)
            except Exception as exc:
                # Keep the error so join() can surface it to the caller instead
                # of it being lost inside the worker thread.
                self.exception = exc

        def join(self, timeout=None):
            """Wait for the thread, then re-raise any exception recorded by ``run``.

            :param timeout: forwarded to ``threading.Thread.join``
            :raises Exception: whatever ``target`` raised, if it failed
            """
            threading.Thread.join(self, timeout)
            if self.exception is not None:
                raise self.exception

        def get_result(self):
            """Return the target's return value, or ``None`` if it is not available."""
            return self.result
    
    
    class MultiClassWrapper(object):
        """Turn a binary classifier into a multi-class one (one-vs-rest or one-vs-one).

        Labels are treated as the integers ``0 .. max(y)``. One deep copy of the
        base classifier is trained per class ('ovr') or per unordered pair of
        classes ('ovo'); the copies are trained and queried on a thread pool.

        NOTE(review): the probability aggregation assumes each base classifier's
        ``predict_proba`` returns the positive-class probability as a single
        ``(n_samples, 1)`` column -- confirm against the base classifier in use.
        """

        def __init__(self, base_classifier, mode='ovr'):
            """
            :param base_classifier: an instantiated classifier exposing ``fit`` and
                ``predict_proba``; it is deep-copied and never trained itself
            :param mode: 'ovr' for one-vs-rest, 'ovo' for one-vs-one
            """
            self.base_classifier = base_classifier
            self.mode = mode

        @staticmethod
        def fit_base_classifier(base_classifier, x, y, **kwargs):
            """Fit one base classifier; ``kwargs`` are forwarded to its ``fit``."""
            base_classifier.fit(x, y, **kwargs)

        @staticmethod
        def predict_proba_base_classifier(base_classifier, x, **kwargs):
            """Return one base classifier's ``predict_proba(x, **kwargs)``."""
            return base_classifier.predict_proba(x, **kwargs)

        @staticmethod
        def _run_parallel(func, jobs, kwargs):
            """Call ``func(*job, **kwargs)`` for every job on a thread pool.

            Results are returned in the order of ``jobs``; an exception raised by
            any call is re-raised here.
            """
            from concurrent.futures import ThreadPoolExecutor

            if not jobs:
                return []
            with ThreadPoolExecutor() as pool:
                futures = [pool.submit(func, *job, **kwargs) for job in jobs]
                return [future.result() for future in futures]

        def _check_mode(self):
            """Raise ``ValueError`` unless ``self.mode`` is 'ovr' or 'ovo'."""
            if self.mode not in ('ovr', 'ovo'):
                raise ValueError("mode must be 'ovr' or 'ovo', got %r" % (self.mode,))

        def _class_pairs(self):
            """Return every ``(first_cls, second_cls)`` with ``first_cls < second_cls``."""
            labels = range(self.n_class + 1)
            return [(first_cls, second_cls)
                    for first_cls in labels
                    for second_cls in labels if first_cls < second_cls]

        def fit(self, x, y, **kwargs):
            """Train one copy of the base classifier per class or per class pair.

            :param x: training samples; row-indexed with ``x[index, :]`` in 'ovo' mode
            :param y: integer class labels, expected to lie in ``0 .. max(y)``
            :param kwargs: forwarded to every base classifier's ``fit``
            :raises ValueError: if ``self.mode`` is neither 'ovr' nor 'ovo'
            """
            self._check_mode()
            y = np.asarray(y)
            # Despite its name this is the largest label, i.e. (number of classes - 1).
            self.n_class = int(np.max(y))
            if self.mode == 'ovr':
                self.classifiers = [copy.deepcopy(self.base_classifier)
                                    for _ in range(self.n_class + 1)]
                # Class ``cls`` is the positive class (1); everything else is 0.
                jobs = [(classifier, x, (y == cls).astype('int'))
                        for cls, classifier in enumerate(self.classifiers)]
            else:
                self.classifiers = {pair: copy.deepcopy(self.base_classifier)
                                    for pair in self._class_pairs()}
                jobs = []
                for (first_cls, second_cls), classifier in self.classifiers.items():
                    # Keep only the samples of the two classes (first class first),
                    # with first_cls as the positive class.
                    index = np.concatenate([np.where(y == first_cls)[0],
                                            np.where(y == second_cls)[0]])
                    new_y = y[index]
                    jobs.append((classifier, x[index, :], (new_y == first_cls).astype('int')))
            self._run_parallel(self.fit_base_classifier, jobs, kwargs)

        def predict_proba(self, x, **kwargs):
            """Return per-class scores normalised so that every row sums to 1.

            :param x: samples to score
            :param kwargs: forwarded to every base classifier's ``predict_proba``
            :return: array with one column per class ``0 .. n_class``
            :raises ValueError: if ``self.mode`` is neither 'ovr' nor 'ovo'
            """
            self._check_mode()
            if self.mode == 'ovr':
                jobs = [(classifier, x) for classifier in self.classifiers]
                probas = self._run_parallel(self.predict_proba_base_classifier, jobs, kwargs)
                total_probas = np.concatenate(probas, axis=1)
            else:
                pairs = self._class_pairs()
                jobs = [(self.classifiers[pair], x) for pair in pairs]
                results = self._run_parallel(self.predict_proba_base_classifier, jobs, kwargs)
                probas = {}
                for (first_cls, second_cls), proba in zip(pairs, results):
                    probas[(first_cls, second_cls)] = proba
                    # The reversed pairing gets the complementary probability.
                    probas[(second_cls, first_cls)] = 1.0 - proba
                # A class's score is the sum of its probabilities against every other class.
                columns = []
                for first_cls in range(self.n_class + 1):
                    versus = [probas[(first_cls, second_cls)]
                              for second_cls in range(self.n_class + 1)
                              if second_cls != first_cls]
                    columns.append(np.concatenate(versus, axis=1).sum(axis=1, keepdims=True))
                total_probas = np.concatenate(columns, axis=1)
            # Normalise each row.
            return total_probas / total_probas.sum(axis=1, keepdims=True)

        def predict(self, x):
            """Return, for each sample, the class index with the highest score."""
            return np.argmax(self.predict_proba(x), axis=1)
    
  • 相关阅读:
    Java基础五
    Java基础测试
    Java练习题
    Java基础四
    Java基础三
    Java基础二
    Java基础一
    大数据讲解
    python笔记之函数 二
    iOS UICollectionView的使用(用storyboard和xib创建)
  • 原文地址:https://www.cnblogs.com/zhulei227/p/12913616.html
Copyright © 2011-2022 走看看