import numpy as np
import os
# Step the working directory up one level before the project imports further down.
# NOTE(review): presumably this lets the `ml_models` package resolve from the
# repository root when the notebook runs from a sub-directory -- confirm against the repo layout.
os.chdir('../')
import matplotlib.pyplot as plt
%matplotlib inline
简介
上一讲我们实现了一个简单的二元分类器:LogisticRegression。但通常情况下,我们面对的更多是多分类问题,而把二分类器扩展到多分类的通常做法也很朴素,一般分为两种:one-vs-rest 以及 one-vs-one。顾名思义,one-vs-rest 将多个类别中的某一类作为正类,其余所有类别作为负类,对于 $n\_class$ 个类别的分类问题,需要构建 $n\_class$ 个分类器;而 one-vs-one 是指在类别之间进行两两分类,这样将会构造 $n\_class \times (n\_class - 1) / 2$ 个分类器。由于实现思路很简单,这里就直接贴出代码:多分类的实现封装在 `MultiClassWrapper` 类中,并放到 `ml_models.wrapper_models` 包下。
from ml_models.linear_model import *
from ml_models.wrapper_models import *
# Prepare the handwritten-digits data set.
from sklearn import datasets, model_selection
from sklearn.metrics import f1_score

digits = datasets.load_digits()
data, target = digits['data'], digits['target']
# Hold out 30% of the samples for evaluation; random_state is fixed at 0.
X_train, X_test, y_train, y_test = model_selection.train_test_split(
    data,
    target,
    test_size=0.3,
    random_state=0,
)
# Build the base (binary) model that the multi-class wrappers are given.
lr = LogisticRegression()
# Train with the one-vs-rest strategy and evaluate on the held-out split.
ovr = MultiClassWrapper(lr, mode='ovr')
ovr.fit(X_train, y_train)
y = ovr.predict(X_test)
# Report the macro-averaged F1 score of the predictions.
print('ovr:', f1_score(y_test, y, average='macro'))
ovr: 0.9492701335705958
# Train with the one-vs-one strategy and evaluate on the held-out split.
# The same `lr` instance is handed over again.
# NOTE(review): the MultiClassWrapper listing further down trains deep copies of
# the base classifier, so reuse looks safe -- confirm the packaged class matches.
ovo = MultiClassWrapper(lr, mode='ovo')
ovo.fit(X_train, y_train)
y = ovo.predict(X_test)
# Report the macro-averaged F1 score of the predictions.
print('ovo:', f1_score(y_test, y, average='macro'))
ovo: 0.959902103714483
`MultiClassWrapper` 类实现细节
import threading
import copy
import numpy as np
"""
Subclass Thread so that the return value of the target function can be retrieved.
"""
class MyThread(threading.Thread):
    """Thread that remembers the return value of its target function.

    The target is executed in ``run()``, i.e. on the worker thread after
    ``start()`` is called. (Calling it from ``__init__`` would execute the
    work synchronously in the caller, so nothing would run in parallel.)

    Because targets really do run concurrently, they must not rely on
    unsynchronised shared mutable state.

    :param target: callable to execute on the worker thread
    :param args: positional arguments for ``target``
    :param kwargs: keyword arguments for ``target``
    :param name: thread name, assigned as-is to ``Thread.name``
    """

    def __init__(self, target, args, kwargs, name=''):
        threading.Thread.__init__(self)
        self.name = name
        self.target = target
        self.args = args
        self.kwargs = kwargs
        # Filled in by run(); stays None until the target has finished.
        self.result = None
        # Exception raised by the target, if any; re-raised from join().
        self.error = None

    def run(self):
        """Call the target on the worker thread and store its outcome."""
        try:
            self.result = self.target(*self.args, **self.kwargs)
        except Exception as exc:
            # Keep the failure so the joining thread sees it instead of it
            # being reported only through the threading excepthook.
            self.error = exc

    def join(self, timeout=None):
        """Wait for the thread, then re-raise any exception from the target.

        :param timeout: optional timeout in seconds, as for ``Thread.join``
        :raises Exception: whatever the target raised on the worker thread
        """
        threading.Thread.join(self, timeout)
        if self.error is not None:
            raise self.error

    def get_result(self):
        """Return the target's return value.

        :return: the stored result, or None if the target has not completed
            (not started, still running, or failed)
        """
        return self.result
class MultiClassWrapper(object):
    """Turn a binary probabilistic classifier into a multi-class classifier.

    Two decomposition strategies are supported:

    * ``'ovr'`` (one-vs-rest): one classifier per class, trained with that
      class as the positive label and every other sample as negative.
    * ``'ovo'`` (one-vs-one): one classifier per unordered pair of classes,
      trained only on the samples of those two classes, with the smaller
      label as the positive class.

    Labels are assumed to be the integers ``0 .. max(y)``; ``predict``
    returns a column index, which equals the label only under that
    assumption.

    NOTE(review): the aggregation concatenates each classifier's
    ``predict_proba`` output along axis 1 and treats the result as one column
    per class, so it presumably expects the base classifier to return a
    single positive-class column of shape ``(n_samples, 1)`` -- confirm
    against the base classifier in use.
    """

    _MODES = ('ovr', 'ovo')

    def __init__(self, base_classifier, mode='ovr'):
        """
        :param base_classifier: an already-instantiated binary classifier;
            it is deep-copied for every sub-problem during ``fit``
        :param mode: 'ovr' for one-vs-rest, 'ovo' for one-vs-one
        """
        self.base_classifier = base_classifier
        self.mode = mode

    @staticmethod
    def fit_base_classifier(base_classifier, x, y, **kwargs):
        """Fit one base classifier; extra keyword arguments are forwarded."""
        base_classifier.fit(x, y, **kwargs)

    @staticmethod
    def predict_proba_base_classifier(base_classifier, x, **kwargs):
        """Return ``base_classifier.predict_proba(x, **kwargs)``.

        Keyword arguments are accepted and forwarded so that
        ``MultiClassWrapper.predict_proba(x, **kwargs)`` does not fail with a
        TypeError when any are supplied.
        """
        return base_classifier.predict_proba(x, **kwargs)

    def _check_mode(self):
        """Raise ValueError when ``self.mode`` is not a supported strategy."""
        if self.mode not in self._MODES:
            raise ValueError(
                "mode must be one of %r, got %r" % (self._MODES, self.mode))

    def _class_pairs(self):
        """Return every (first_cls, second_cls) pair with first_cls < second_cls."""
        last_cls = self.n_class
        return [(first_cls, second_cls)
                for first_cls in range(0, last_cls)
                for second_cls in range(first_cls + 1, last_cls + 1)]

    @staticmethod
    def _run_tasks(target, jobs, kwargs):
        """Run ``target(*job, **kwargs)`` for each job via MyThread.

        Every task is started as soon as it is created, all of them are
        joined, and the results are returned in job order.
        """
        tasks = []
        for job in jobs:
            task = MyThread(target=target, args=job, kwargs=kwargs)
            task.start()
            tasks.append(task)
        for task in tasks:
            task.join()
        return [task.get_result() for task in tasks]

    def fit(self, x, y, **kwargs):
        """Train one deep copy of the base classifier per sub-problem.

        :param x: feature matrix; in 'ovo' mode it is indexed as
            ``x[rows, :]``, so it must support NumPy-style 2-D indexing
        :param y: integer class labels, assumed to be ``0 .. max(y)``
        :param kwargs: forwarded to every base classifier's ``fit``
        :raises ValueError: if ``self.mode`` is neither 'ovr' nor 'ovo'
        """
        self._check_mode()
        y = np.asarray(y)
        # NOTE: despite its name this holds the largest label (class count
        # minus one); every loop below therefore ranges up to n_class + 1.
        self.n_class = np.max(y)

        if self.mode == 'ovr':
            self.classifiers = [copy.deepcopy(self.base_classifier)
                                for _ in range(0, self.n_class + 1)]
            # Each classifier sees all samples, relabelled "is this class".
            jobs = [(classifier, x, (y == cls).astype('int'))
                    for cls, classifier in enumerate(self.classifiers)]
        else:
            pairs = self._class_pairs()
            self.classifiers = {pair: copy.deepcopy(self.base_classifier)
                                for pair in pairs}
            jobs = []
            for first_cls, second_cls in pairs:
                # Keep only the samples of the two classes in this pair.
                index = (np.where(y == first_cls)[0].tolist()
                         + np.where(y == second_cls)[0].tolist())
                new_x = x[index, :]
                new_y = y[index]
                # The smaller label of the pair is the positive class.
                jobs.append((self.classifiers[(first_cls, second_cls)], new_x,
                             (new_y == first_cls).astype('int')))

        self._run_tasks(self.fit_base_classifier, jobs, kwargs)

    def predict_proba(self, x, **kwargs):
        """Return row-normalised per-class scores.

        :param x: feature matrix passed to every base classifier
        :param kwargs: forwarded to every base classifier's ``predict_proba``
        :return: array with one column per class; each row sums to 1
        :raises ValueError: if ``self.mode`` is neither 'ovr' nor 'ovo'
        """
        self._check_mode()

        if self.mode == 'ovr':
            jobs = [(classifier, x) for classifier in self.classifiers]
            probas = self._run_tasks(self.predict_proba_base_classifier, jobs, kwargs)
            total_probas = np.concatenate(probas, axis=1)
        else:
            pairs = self._class_pairs()
            jobs = [(self.classifiers[pair], x) for pair in pairs]
            results = self._run_tasks(self.predict_proba_base_classifier, jobs, kwargs)

            probas = {}
            for (first_cls, second_cls), proba in zip(pairs, results):
                probas[(first_cls, second_cls)] = proba
                # The reversed pair gets the complementary probability.
                probas[(second_cls, first_cls)] = 1.0 - proba

            # Score of a class = sum of its probabilities against every other class.
            class_scores = []
            for first_cls in range(0, self.n_class + 1):
                against_others = [probas[(first_cls, second_cls)]
                                  for second_cls in range(0, self.n_class + 1)
                                  if first_cls != second_cls]
                class_scores.append(
                    np.concatenate(against_others, axis=1).sum(axis=1, keepdims=True))
            total_probas = np.concatenate(class_scores, axis=1)

        # Normalise so every row sums to 1.
        return total_probas / total_probas.sum(axis=1, keepdims=True)

    def predict(self, x):
        """Return, for each row of ``x``, the index of the highest-scoring class."""
        return np.argmax(self.predict_proba(x), axis=1)