Introduction excerpted from Li Hang, "Statistical Learning Methods" (统计学习方法).
Consider how to find the separating hyperplane that maximizes the geometric margin, i.e., the maximum-margin separating hyperplane. This yields the following optimization problem for learning a linearly separable support vector machine.
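The problem (7.13)–(7.14) referred to below is a figure in the original and does not survive here; for reference, in the book it is the hard-margin primal problem:

```latex
\min_{w,b}\ \frac{1}{2}\|w\|^2
\qquad\text{s.t.}\quad y_i\,(w\cdot x_i + b) - 1 \ge 0,\quad i = 1,2,\dots,N
```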
To solve the optimization problem (7.13)–(7.14) of the linearly separable support vector machine, we treat it as the primal optimization problem and apply Lagrangian duality (see Appendix C), obtaining the optimal solution of the primal problem by solving the dual problem. This is the dual algorithm for the linearly separable support vector machine.
This approach has two advantages: first, the dual problem is often easier to solve; second, it naturally introduces kernel functions, which allows the method to be extended to nonlinear classification problems.
Theorem C.3: For the primal problem (C.1)–(C.3) and the dual problem (C.12)–(C.13), suppose that f(x) and the c_i(x) are convex functions, that the h_j(x) are affine functions, and that the inequality constraints c_i(x) are strictly feasible. Then x* and α*, β* are solutions of the primal and dual problems, respectively, if and only if x*, α*, β* satisfy the following Karush-Kuhn-Tucker (KKT) conditions:
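The conditions themselves are a figure in the original and are missing here; in the book's notation they read:

```latex
\begin{aligned}
&\nabla_x L(x^*, \alpha^*, \beta^*) = 0\\
&\alpha_i^*\, c_i(x^*) = 0, \quad i = 1,2,\dots,k\\
&c_i(x^*) \le 0, \quad i = 1,2,\dots,k\\
&\alpha_i^* \ge 0, \quad i = 1,2,\dots,k\\
&h_j(x^*) = 0, \quad j = 1,2,\dots,l
\end{aligned}
```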
The SMO algorithm solves the following dual problem, a convex quadratic program:
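The program itself is a figure in the original and is missing here; it is the standard soft-margin SVM dual:

```latex
\min_{\alpha}\ \frac{1}{2}\sum_{i=1}^{N}\sum_{j=1}^{N}
\alpha_i \alpha_j\, y_i y_j\, K(x_i, x_j) - \sum_{i=1}^{N}\alpha_i
\qquad\text{s.t.}\quad \sum_{i=1}^{N}\alpha_i y_i = 0,
\quad 0 \le \alpha_i \le C,\quad i = 1,2,\dots,N
```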
Variable update rules
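The update formulas (equations 7.106–7.108 in the book, referenced below) do not survive the extraction; for reference, the SMO pair update is

```latex
\eta = K_{11} + K_{22} - 2K_{12},\qquad
\alpha_2^{\text{new,unc}} = \alpha_2^{\text{old}} + \frac{y_2\,(E_1 - E_2)}{\eta},
\qquad\text{where } E_i = g(x_i) - y_i,
```

after which the unclipped value is clipped to the feasible interval [L, H] and the first variable is updated to preserve the equality constraint:

```latex
\alpha_2^{\text{new}} =
\begin{cases}
H, & \alpha_2^{\text{new,unc}} > H\\
\alpha_2^{\text{new,unc}}, & L \le \alpha_2^{\text{new,unc}} \le H\\
L, & \alpha_2^{\text{new,unc}} < L
\end{cases}
\qquad
\alpha_1^{\text{new}} = \alpha_1^{\text{old}} + y_1 y_2\,(\alpha_2^{\text{old}} - \alpha_2^{\text{new}}).
```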
SMO calls the process of selecting the first variable the outer loop. The outer loop picks the training sample that violates the KKT conditions most severely and takes its corresponding variable as the first variable.
SMO calls the process of selecting the second variable the inner loop. Suppose the first variable α1 has already been found in the outer loop; the inner loop now looks for the second variable α2. The selection criterion is that the update should change α2 as much as possible.
By equations (7.106) and (7.108), the new value of α2 depends on |E1 − E2|. A simple way to speed up the computation is therefore to choose the α2 that maximizes |E1 − E2|. Since α1 is already fixed, E1 is determined: if E1 is positive, choose the smallest Ei as E2; if E1 is negative, choose the largest Ei as E2. To save computation time, all the Ei values are kept in a cached list.
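This selection rule can be sketched as a small helper (not part of the class below; the fallback for the degenerate case where the chosen index equals i1 is my own addition):

```python
import numpy as np

def select_second_variable(E, i1):
    """Pick the second working-set index i2 so that |E1 - E2| is maximal.

    E  : sequence of cached prediction errors E_i = g(x_i) - y_i
    i1 : index of the first variable chosen by the outer loop
    """
    E = np.asarray(E, dtype=float)
    # If E1 > 0 the gap |E1 - E2| is maximized by the smallest E_i,
    # otherwise by the largest E_i.
    if E[i1] > 0:
        i2 = int(np.argmin(E))
    else:
        i2 = int(np.argmax(E))
    if i2 == i1:
        # Degenerate case: pick the largest absolute gap among the other indices.
        gaps = np.abs(E - E[i1])
        gaps[i1] = -1.0
        i2 = int(np.argmax(gaps))
    return i2
```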
In special cases, if the α2 chosen by the method above does not yield a sufficient decrease in the objective function, the following heuristic is used instead: traverse the support vector points on the margin boundary, trying each corresponding variable as α2 in turn, until the objective function decreases sufficiently. If no suitable α2 is found, traverse the whole training set; if that still fails, discard the first variable α1 and find another α1 through the outer loop.
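The three-tier fallback can be sketched as follows, where `try_pair(i1, i2)` is a hypothetical callback that attempts the joint update and returns True when the objective decreased enough:

```python
def choose_second_heuristic(i1, alpha, C, try_pair):
    """Fallback heuristic for the inner loop (a sketch under the assumption
    that try_pair performs the actual update attempt)."""
    n = len(alpha)
    # Tier 1: support vectors on the margin boundary (0 < alpha_i < C).
    for i2 in range(n):
        if i2 != i1 and 0 < alpha[i2] < C and try_pair(i1, i2):
            return i2
    # Tier 2: the whole training set.
    for i2 in range(n):
        if i2 != i1 and try_pair(i1, i2):
            return i2
    # Tier 3: give up on i1; the outer loop should pick another first variable.
    return None
```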
```python
# encoding: utf-8
import numpy as np
import matplotlib.pyplot as plt


class SVC(object):
    def __init__(self, c=1.0, delta=0.001):  # initialization
        self.N = 0            # number of training samples
        self.delta = delta    # minimum change in alpha that counts as progress
        self.X = None
        self.y = None
        self.w = None
        self.wn = 0           # number of features
        self.K = np.zeros((self.N, self.N))  # kernel matrix (rebuilt in train)
        self.a = np.zeros((self.N, 1))       # Lagrange multipliers alpha
        self.b = 0
        self.C = c            # penalty parameter
        self.stop = 1

    def kernel_function(self, x1, x2):  # kernel function (linear kernel)
        return np.dot(x1, x2)

    def kernel_matrix(self, x):  # precompute the symmetric kernel matrix
        for i in range(len(x)):
            for j in range(i, len(x)):
                self.K[j][i] = self.K[i][j] = self.kernel_function(self.X[i], self.X[j])

    def get_w(self):  # recompute w = sum_i alpha_i * y_i * x_i
        ay = self.a * self.y
        w = np.zeros((1, self.wn))
        for i in range(self.N):
            w += self.X[i] * ay[i]
        return w

    def get_b(self, a1, a2, a1_old, a2_old):  # recompute b after updating a pair
        y1 = self.y[a1]
        y2 = self.y[a2]
        a1_new = self.a[a1]
        a2_new = self.a[a2]
        b1_new = -self.E[a1] - y1 * self.K[a1][a1] * (a1_new - a1_old) \
                 - y2 * self.K[a2][a1] * (a2_new - a2_old) + self.b
        b2_new = -self.E[a2] - y1 * self.K[a1][a2] * (a1_new - a1_old) \
                 - y2 * self.K[a2][a2] * (a2_new - a2_old) + self.b
        if 0 < a1_new < self.C and 0 < a2_new < self.C:
            return b1_new[0]
        else:
            # Neither multiplier is strictly inside (0, C): take the average.
            return (b1_new[0] + b2_new[0]) / 2.0

    def gx(self, x):  # decision function g(x) = w . x + b
        return np.dot(self.w, x) + self.b

    def satisfy_kkt(self, a):  # check whether a sample satisfies the KKT conditions
        index = a[1]
        margin = self.y[index] * self.gx(self.X[index])
        # alpha = 0      ->  y*g(x) >= 1
        # 0 < alpha < C  ->  y*g(x) == 1 (within tolerance; exact float
        #                    equality would flag every interior point)
        # alpha = C      ->  y*g(x) <= 1
        if a[0] == 0 and margin >= 1:
            return 1
        elif 0 < a[0] < self.C and abs(margin - 1) < self.delta:
            return 1
        elif a[0] == self.C and margin <= 1:
            return 1
        return 0

    def clip_func(self, a_new, a1_old, a2_old, y1, y2):  # clip alpha2 to [L, H]
        if y1 == y2:
            L = max(0, a1_old + a2_old - self.C)
            H = min(self.C, a1_old + a2_old)
        else:
            L = max(0, a2_old - a1_old)
            H = min(self.C, self.C + a2_old - a1_old)
        if a_new < L:
            a_new = L
        if a_new > H:
            a_new = H
        return a_new

    def update_a(self, a1, a2):  # joint update of the pair (alpha1, alpha2)
        # eta = K11 + K22 - 2*K12; must be positive for the standard update
        partial_a2 = self.K[a1][a1] + self.K[a2][a2] - 2 * self.K[a1][a2]
        if partial_a2 <= 1e-9:
            print("error:", partial_a2)
            return 0  # skip this pair instead of dividing by (almost) zero
        a2_new_unc = self.a[a2] + self.y[a2] * (self.E[a1] - self.E[a2]) / partial_a2
        a2_new = self.clip_func(a2_new_unc, self.a[a1], self.a[a2], self.y[a1], self.y[a2])
        a1_new = self.a[a1] + self.y[a1] * self.y[a2] * (self.a[a2] - a2_new)
        if abs(a1_new - self.a[a1]) < self.delta:
            return 0  # change too small to count as progress
        self.a[a1] = a1_new
        self.a[a2] = a2_new
        self.is_update = 1
        return 1

    def update(self, first_a):  # try second variables for a given first variable
        for second_a in range(self.N):
            if second_a == first_a:
                continue
            # copy() is required: self.a[i] is a view into self.a, so without it
            # the "old" values would silently change when update_a writes self.a
            a1_old = self.a[first_a].copy()
            a2_old = self.a[second_a].copy()
            if self.update_a(first_a, second_a) == 0:
                return
            self.b = self.get_b(first_a, second_a, a1_old, a2_old)
            self.w = self.get_w()
            self.E = [self.gx(self.X[i]) - self.y[i] for i in range(self.N)]
            self.stop = 0

    def train(self, x, y, max_iternum=100):  # SMO training loop
        self.X = x
        self.N = len(x)
        self.wn = len(x[0])
        self.y = np.array(y).reshape((self.N, 1))
        self.K = np.zeros((self.N, self.N))
        self.kernel_matrix(self.X)
        self.b = 0
        self.a = np.zeros((self.N, 1))
        self.w = self.get_w()
        # cached errors E_i = g(x_i) - y_i
        self.E = [self.gx(self.X[i]) - self.y[i] for i in range(self.N)]
        self.is_update = 0
        for it in range(max_iternum):
            self.stop = 1
            # Outer loop: first scan the non-bound samples (0 < alpha < C);
            # if there are none, scan the whole training set.
            data_on_bound = [[ai, idx] for idx, ai in enumerate(self.a)
                             if 0 < ai < self.C]
            if len(data_on_bound) == 0:
                data_on_bound = [[ai, idx] for idx, ai in enumerate(self.a)]
            for data in data_on_bound:
                if self.satisfy_kkt(data) != 1:
                    self.update(data[1])
            if self.is_update == 0:
                # No progress on the non-bound set: scan all samples.
                for data in [[ai, idx] for idx, ai in enumerate(self.a)]:
                    if self.satisfy_kkt(data) != 1:
                        self.update(data[1])
            if self.stop:
                break
        print(self.w, self.b)

    def classify(self, x_new):  # predict the label of a new sample
        y_new = self.gx(x_new)
        cl = int(np.sign(y_new[0]))
        if cl == 0:
            # The point lies exactly on the hyperplane; assign it arbitrarily.
            print("Warning, class = 0, assigning label 1...")
            cl = 1
        return cl

    def draw_p(self):  # plot the data, the separating line and the margins
        min_x = min(min(self.X[:, 0]), min(self.X[:, 1])) - 0.1
        max_x = max(max(self.X[:, 0]), max(self.X[:, 1])) + 0.1
        k = -self.w[0][0] / self.w[0][1]   # slope of the separating line
        b = -self.b / self.w[0][1]         # intercept
        r = 1 / self.w[0][1]               # offset of the margin lines
        x_line = (min_x, max_x)
        plt.plot(x_line, [k * x + b for x in x_line], "b")
        plt.plot(x_line, [k * x + b + r for x in x_line], "b--")
        plt.plot(x_line, [k * x + b - r for x in x_line], "r--")
        for i in range(self.N):
            plt.plot(self.X[i, 0], self.X[i, 1], "ob" if self.y[i] == 1 else "or")
        plt.show()


if __name__ == "__main__":
    svc = SVC()
    np.random.seed(0)
    X = np.r_[np.random.randn(20, 2) - [2, 2], np.random.randn(20, 2) + [2, 2]]
    Y = [-1] * 20 + [1] * 20
    svc.train(X, Y)
    svc.draw_p()
    print(svc.classify([2, 5]))   # 1
    print(svc.classify([2, 1]))   # 1
    print(svc.classify([2, -1]))  # 1
    print(svc.classify([2, -5]))  # -1
```