zoukankan      html  css  js  c++  java
  • 机器学习 类条件随机标签噪声情况下的二分类问题研究 复现NIPS论文learning with noisy label(logistic & C-SVM)

    论文百度网盘链接:https://pan.baidu.com/s/1XWj1YInZCuKldRpYnZTttg

    密码:u4lb

    UCI数据可以在论文中给的一个URL中找到,如找不到也可联系QQ1551904915获取。

    对类随机标签噪声下的二分类问题可以通过构造修正损失函数以及设置正负偏置两种方案解决。

    流程:构造二维线性可分数据->UCI高维数据测试->构造二分线性不可分数据->UCI数据测试

    import numpy as np
    import matplotlib.pyplot as plt
    import scipy.io as scio
    from sklearn.preprocessing import StandardScaler
    import math
    from libsvm.svmutil import *
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import train_test_split
    
    
    class LogisticRegression:
        def __init__(self, n_iter=200, eta=1e-3, tol=None,rho_positive = 0.2,rho_negative = 0.2):
            # 训练迭代次数
            self.n_iter = n_iter
            # 学习率
            self.eta = eta
            # 误差变化阈值
            self.tol = tol
            # 模型参数w(训练时初始化)
            self.w = None
            # 批量大小
            self.batch_size = 0.1
            self.rho_positive = rho_positive
            self.rho_negative = rho_negative
    
        def _z(self, X, w):
            return np.dot(X, w)
    
        def _sigmoid(self, z):
            return 1. / (1. + np.exp(-z))
    
        def _predict_proba(self, X, w):
            z = self._z(X, w)
            return self._sigmoid(z)
    
        # 标签噪声不存在时的梯度
        def _loss(self, y, y_proba):
            ret = 0
            for yi, ti in zip(y, y_proba):
                ret += np.log(1 + np.exp(- yi * ti))
    
            return ret + 1/2*np.dot(self.w,self.w)
    
        # 标签噪声不存在时的梯度
        def _gradient(self, X, y):
            ret = np.zeros_like(self.w)
            for xi,yi in zip(X,y):
                ti = np.dot(self.w.T,xi)
                ret += -yi*np.exp(-ti*yi)/(1+np.exp(-ti*yi)) * xi
            return ret + self.w
    
        # 存在标签噪声情况下的SGD
        def _gradient_descent_under_noise(self, w, X, y):
    
            # 若用户指定tol, 则启用早期停止法.
            if self.tol is not None:
                loss_old = np.inf
    
            self.loss_list = []
            self.w = w
    
            # 使用梯度下降至多迭代n_iter次, 更新w.
            for step_i in range(self.n_iter):
                # 预测所有点为1的概率
                y_proba = self._predict_proba(X, self.w)
                # 计算损失
                loss = self._loss_under_noise(y, y_proba)
                self.loss_list.append(loss)
                # print('%4i Loss: %s' % (step_i, loss))
    
                # 早期停止法
                if self.tol is not None:
                    # 如果损失下降不足阈值, 则终止迭代.
                    if loss_old - loss < self.tol:
                        break
                    loss_old = loss
    
                index = np.arange(X.shape[0])
                np.random.shuffle(index)
                selected = index[:math.ceil(self.batch_size * X.shape[0])]
                # 计算梯度
                grad = self._gradient_under_noise(X[selected], y[selected])
                # 更新参数w
                self.w -= self.eta * grad
    
        def _gradient_descent(self, w, X, y):
    
            # 若用户指定tol, 则启用早期停止法.
            if self.tol is not None:
                loss_old = np.inf
    
            self.loss_list = []
            self.w = w
    
            # 使用梯度下降至多迭代n_iter次, 更新w.
            for step_i in range(self.n_iter):
                # 预测所有点为1的概率
                y_proba = self._predict_proba(X, self.w)
                # 计算损失
                loss = self._loss(y, y_proba)
                self.loss_list.append(loss)
                print('%4i Loss: %s' % (step_i, loss))
                if self.tol is not None:
                    # 如果损失下降不足阈值, 则终止迭代.
                    if loss_old - loss < self.tol:
                        break
                    loss_old = loss
    
                index = np.arange(X.shape[0])
                np.random.shuffle(index)
                selected = index[:math.ceil(self.batch_size * X.shape[0])]
                # 计算梯度
                grad = self._gradient(X[selected], y[selected])
                # 更新参数w
                self.w -= self.eta * grad
        # 数据预处理
        def _preprocess_data_X(self, X):
            m, n = X.shape
            X_ = np.empty((m, n + 1))
            X_[:, 0] = 1
            X_[:, 1:] = X
    
            return X_
    
        def train(self, X_train, y_train):
            '''训练'''
            # 预处理X_train(添加x0=1)
            X_train = self._preprocess_data_X(X_train)
            _, n = X_train.shape
            self.w = np.random.random(n) * 0.05
            self._gradient_descent(self.w, X_train, y_train)
    
    
        def train_from_noise_data(self,X_train,y_train):
            X_train = self._preprocess_data_X(X_train)
            _,n = X_train.shape
            self.w = np.random.random(n) * 0.05
            self._gradient_descent_under_noise(self.w,X_train,y_train)
    
        #     计算在标签噪声下的损失
        def _loss_under_noise(self,y,y_proba):
            ret = 0
            for yi, ti in zip(y, y_proba):
                ret += (1-np.where(yi == 1,self.rho_negative,self.rho_positive)) * np.log(1 + np.exp(- yi * ti)) + 
                       np.where(yi == 1,self.rho_positive,self.rho_negative) * np.log(1 + np.exp(yi * ti))
    
            return (ret + 1 / 2 * np.dot(self.w, self.w)) / (1 - self.rho_negative - self.rho_positive)
            # return (ret) / (1 - rho_negative - rho_positive)
        # 在标签噪声存在情况下的梯度
        def _gradient_under_noise(self,X,y):
            ret = np.zeros_like(self.w)
            for xi, yi in zip(X, y):
                ti = np.dot(self.w.T, xi)
                ret += (1-np.where(yi == 1,self.rho_negative,self.rho_positive)) * (-yi * np.exp(-ti * yi) / (1 + np.exp(-ti * yi)) * xi)
                    + np.where(yi == 1,self.rho_positive,self.rho_negative) * (-yi * np.exp(ti * yi) / (1 + np.exp(ti * yi)) * xi)
            return (ret + self.w) / (1 - self.rho_negative - self.rho_positive)
            # return (ret) / (1 - rho_negative - rho_positive)
    
        def predict(self, X):
    
            X = self._preprocess_data_X(X)
            y_pred = self._predict_proba(X, self.w)
            return np.where(y_pred >= 0.5, 1, -1)
    
    # 生成三角形区域的随机点集
    def random_data(x1,y1,x2,y2,x3,y3):
        x1, y1 = x1, y1
        x3, y3 = x3, y3
        x2, y2 = x2, y2
        sample_size = 1000
        theta = np.arange(0, 1, 0.001)
        rnd1 = np.random.random(size=sample_size)
        rnd2 = np.random.random(size=sample_size)
        rnd2 = np.sqrt(rnd2)
        x = rnd2 * (rnd1 * x1 + (1 - rnd1) * x2) + (1 - rnd2) * x3
        y = rnd2 * (rnd1 * y1 + (1 - rnd1) * y2) + (1 - rnd2) * y3
        return x,y
    
    def method_1_on_synthetic_data():
        x_positive, y_positive = random_data(-100, -100, -100, 100, 100, 100)
        x_negative, y_negative = random_data(-40, -100, 100, 20, 100, -100)
        x_data = np.concatenate([x_positive, x_negative])
        y_data = np.concatenate([y_positive, y_negative])
        label = np.concatenate([np.ones_like(x_positive), np.ones_like(x_negative) * (-1)])
    
        # view data from true distribution
        plt.scatter(x_data[label == 1], y_data[label == 1], c='', edgecolors='b', facecolor='none', marker='o', s=25,
                    linewidths=1)
        plt.scatter(x_data[label == -1], y_data[label == -1], c='r', marker='+')
        plt.axis([-100, 100, -100, 100])
        plt.title('initial data')
        plt.show()
    
        print('=' * 10 + '在正确数据下的logistic预测' + '=' * 10)
        # view regression result from data without noise
        reg = LogisticRegression(n_iter=100, eta=0.0001, tol=1e-5)
    
    
        # 划分训练集以及测试集
        X_train, X_test, y_train, y_test = train_test_split(np.column_stack([x_data, y_data]), label, test_size=0.5)
        reg.train(X_train, y_train)
    
        plt.plot(reg.loss_list)
        plt.title('train loss with SGD')
        plt.show()
    
        y_pred = reg.predict(X_test)
        accuracy_rate = accuracy_score(y_test, y_pred)
        print('测试集上的训练准确度:', accuracy_rate)
        theta = reg.w
        print('theta参数为:', theta)
        print('分割函数:0={0}+({1})*x1+({2})*x2'.format(np.round(theta[0], 3), np.round(theta[1], 3), np.round(theta[2], 3)))
    
        rho_positive_list = [0.2,0.3,0.4]
        rho_negative_list = [0.2,0.1,0.4]
        print('=' * 10 + '在类标签噪声条件下的logistic预测' + '=' * 10)
        for index in range(3):
            mask_1 = np.random.uniform(0, 1, size=1000) < rho_positive_list[index]
            mask_2 = np.random.uniform(0, 1, size=1000) < rho_negative_list[index]
    
            label_dirty = np.concatenate([np.where(mask_1, -1, 1), np.where(mask_2, 1, -1)])
    
            # view regression result from data with noise
            plt.scatter(x_data[label_dirty == 1], y_data[label_dirty == 1], c='', edgecolors='b', facecolor='none', marker='o',
                        s=25,
                        linewidths=1)
            plt.scatter(x_data[label_dirty == -1], y_data[label_dirty == -1], c='r', marker='+')
            plt.axis([-100, 100, -100, 100])
            plt.title('dirty data with rho_positive = {0} and rho_negative = {1}'.format(rho_positive_list[index],rho_negative_list[index]))
            plt.show()
    
    
            reg = LogisticRegression(n_iter=1000, eta=0.00001, tol=1e-5, rho_positive=rho_positive_list[index], rho_negative=rho_negative_list[index])
            # 划分训练集以及测试集
            X_train = np.column_stack([x_data, y_data])
            reg.train_from_noise_data(X_train, label_dirty)
    
            y_pred = reg.predict(X_train)
    
            plt.scatter(x_data[y_pred == 1], y_data[y_pred == 1], c='', edgecolors='b', facecolor='none', marker='o', s=25,linewidths=1)
            plt.scatter(x_data[y_pred == -1], y_data[y_pred == -1], c='r', marker='+')
            plt.axis([-100, 100, -100, 100])
            plt.title('dirty data with rho_positive = {0} and rho_negative = {1}'.format(rho_positive_list[index], rho_negative_list[index]))
            plt.show()
    
            y_pred_under_noise = reg.predict(X_train)
            accuracy_rate = accuracy_score(label, y_pred_under_noise)
            print('rho_positive:{0} rho_negative:{1} accuracy:{2}'.format(rho_positive_list[index],rho_negative_list[index],accuracy_rate))
        print('='*20)
    
    # 模拟模块
    def train_mode(data,rho_negative,rho_positive,index):
        Xdata = data['x'][0][0]
        label = data['t'][0][0]
        sampleCapacity,_ = Xdata.shape
        dirty_label = np.zeros_like(label)
        # 构造噪声数据
        for i in range(sampleCapacity):
            if label[i] == 1:
                dirty_label[i] = np.where(np.random.uniform(0,1) < rho_positive,-1,1)
            else:
                dirty_label[i] = np.where(np.random.uniform(0,1) < rho_positive,1,-1)
        eta_list = [0.008,0.002,0.009]
        model = LogisticRegression(n_iter=1000, eta=eta_list[index], tol=1e-5, rho_positive=rho_positive, rho_negative=rho_negative)
        index = np.arange(sampleCapacity)
        np.random.shuffle(index)
        accuracy_rate = 0
        # 3折交叉验证
        for i in range(1, 4):
            # 构造索引集,将其中五分之一大小的一段作为测试集
            selected = np.array([False] * sampleCapacity)
            selected[int((i - 1) * sampleCapacity / 3):int(i * sampleCapacity / 3)] = True
            X_test = Xdata[index[selected]]
            X_train = Xdata[index[~selected]]
            # 对属性数据进行标准化,消除量纲以及数量级造成的差距
            ss = StandardScaler()
            ss.fit(X_train)
            X_train_std = ss.transform(X_train)
            X_test_std = ss.transform(X_test)
            y_train = dirty_label[index[~selected]]
            true_test_label = label[index[selected]]
            model.train_from_noise_data(X_train_std,y_train)
            pred = model.predict(X_test_std)
            accuracy_rate += accuracy_score(true_test_label,pred)
    
        print("rho_positive={0}  rho_negative={1} accuracy:{2}".format(rho_positive,rho_negative,round(accuracy_rate / 3,4)))
    
    def train_mode_2(data,rho_negative,rho_positive,index):
        Xdata = data['x'][0][0]
        label = data['t'][0][0]
        label[label == -1] = 0
        sampleCapacity, _ = Xdata.shape
        dirty_label = np.zeros_like(label)
        # 构造噪声数据
        for i in range(sampleCapacity):
            if label[i] == 1:
                dirty_label[i] = np.where(np.random.uniform(0, 1) < rho_positive, 0, 1)
            else:
                dirty_label[i] = np.where(np.random.uniform(0, 1) < rho_positive, 1, 0)
        index = np.arange(sampleCapacity)
        np.random.shuffle(index)
        # 3折交叉验证
        for i in range(1, 4):
            # 构造索引集,将其中五分之一大小的一段作为测试集
            selected = np.array([False] * sampleCapacity)
            selected[int((i - 1) * sampleCapacity / 3):int(i * sampleCapacity / 3)] = True
            X_test = Xdata[index[selected]]
            X_train = Xdata[index[~selected]]
            # 对属性数据进行标准化,消除量纲以及数量级造成的差距
            ss = StandardScaler()
            ss.fit(X_train)
            X_train_std = ss.transform(X_train)
            X_test_std = ss.transform(X_test)
            y_train = dirty_label[index[~selected]]
            true_test_label = label[index[selected]]
            alpha = (1 - rho_positive + rho_negative) / 2
            model = svm_train(y_train.reshape(y_train.shape[0],),X_train_std,'-s 0 -t 2 -c 0.01 -w0 {0} -w1 {1}'.format(alpha,1 - alpha))
            svm_predict(true_test_label,X_test_std,model)
    
    
    # logistic test on UCI benchmark
    def method_1_on_UCI_benchmark():
        rho_positive_list = [0.2,0.3,0.4]
        rho_negative_list = [0.2,0.1,0.4]
        # 加载数据集
        benchmarks = scio.loadmat("benchmarks.mat")
        breastCancer = benchmarks['breast_cancer']
        diabetis = benchmarks['diabetis']
        thyroid = benchmarks['thyroid']
        german = benchmarks['german']
        heart = benchmarks['heart']
        image = benchmarks['image']
    
        print("test on breastCancer:")
        for i in range(3):
            train_mode(data=breastCancer,rho_negative=rho_negative_list[i],rho_positive=rho_positive_list[i],index = i)
        print("test on diabetis:")
        for i in range(3):
            train_mode(data=diabetis, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i)
        print("test on thyroid:")
        for i in range(3):
            train_mode(data=thyroid, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i)
        print("test on german:")
        for i in range(3):
            train_mode(data=german, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i)
        print("test on heart:")
        for i in range(3):
            train_mode(data=heart, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i)
        print("test on image:")
        for i in range(3):
            train_mode(data=image, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i)
    # C-SVM test on banana data
    def method_2_on_banana_data():
        benchmarks = scio.loadmat("benchmarks.mat")
        banana = benchmarks['banana']
        y_data = banana['t'][0][0].reshape(banana['t'][0][0].shape[0], )
        y_data[y_data == -1] = 0
        x_data = banana['x'][0][0]
        plt.scatter(x_data[y_data == 1, 0], x_data[y_data == 1, 1], c='', edgecolors='b', facecolor='none',
                    marker='o', s=25,
                    linewidths=1)
        plt.scatter(x_data[y_data == 0, 0], x_data[y_data == 0, 1], c='r', marker='+')
        plt.title('initial image')
        plt.plot()
        plt.show()
    
        rho_positive_list = [0.2, 0.3, 0.4]
        rho_negative_list = [0.2, 0.1, 0.4]
        print('=' * 10 + '在类标签噪声条件下的C-SVM对香蕉数据的预测' + '=' * 10)
        for index in range(3):
            label_dirty = np.zeros_like(y_data)
            for i in range(label_dirty.shape[0]):
                if y_data[i] == 1:
                    label_dirty[i] = np.where(np.random.uniform(0,1) < rho_positive_list[index],0,1)
                else:
                    label_dirty[i] = np.where(np.random.uniform(0, 1) < rho_negative_list[index], 1, 0)
    
            # view regression result from data with noise
            plt.scatter(x_data[label_dirty == 1,0],
                        x_data[label_dirty == 1,1],
                        c='',
                        edgecolors='b',
                        facecolor='none',
                        marker='o',
                        s=25,
                        linewidths=1)
            plt.scatter(x_data[label_dirty == 0,0], x_data[label_dirty == 0,1], c='r', marker='+')
            plt.title('dirty data with rho_positive = {0} and rho_negative = {1}'.format(rho_positive_list[index],
                                                                                         rho_negative_list[index]))
            plt.show()
    
            # 划分训练集以及测试集
            X_train = np.column_stack([x_data, y_data])
            alpha = (1 - rho_positive_list[index] + rho_negative_list[index]) / 2
            model = svm_train(label_dirty,X_train,'-s 0 -t 2 -c 0.05 -w0 {0} -w1 {1}'.format(alpha,1 - alpha))
            # 在真实数据上进心预测
            y_pred = svm_predict(y_data,X_train,model)[0]
            y_pred = np.array(y_pred).astype('int64')
            # 绘制出预测之后的图像
            plt.scatter(x_data[y_pred == 1,0], x_data[y_pred == 1,1], c='', edgecolors='b', facecolor='none', marker='o', s=25,
                        linewidths=1)
            plt.scatter(x_data[y_pred == 0,0], x_data[y_pred == 0,1], c='r', marker='+')
            plt.title('dirty data with rho_positive = {0} and rho_negative = {1}'.format(rho_positive_list[index],
                                                                                         rho_negative_list[index]))
            plt.show()
    
    
        print('=' * 20)
    
    def method_2_on_UCI_data():
        rho_positive_list = [0.2, 0.3, 0.4]
        rho_negative_list = [0.2, 0.1, 0.4]
        benchmarks = scio.loadmat("benchmarks.mat")
        breastCancer = benchmarks['breast_cancer']
        diabetis = benchmarks['diabetis']
        thyroid = benchmarks['thyroid']
        german = benchmarks['german']
        heart = benchmarks['heart']
        image = benchmarks['image']
    
        print('='*20+"test on breastCancer:"+'='*20)
        for i in range(3):
            train_mode_2(data=breastCancer, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i)
        print('='*20+"test on diabetis:"+'='*20)
        for i in range(3):
            train_mode_2(data=diabetis, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i)
        print('='*20+"test on thyroid:"+'='*20)
        for i in range(3):
            train_mode_2(data=thyroid, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i)
        print('='*20+"test on german:"+'='*20)
        for i in range(3):
            train_mode_2(data=german, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i)
        print('='*20+"test on heart:"+'='*20)
        for i in range(3):
            train_mode_2(data=heart, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i)
        print('='*20+"test on image:"+'='*20)
        for i in range(3):
            train_mode_2(data=image, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i)
    
    if __name__ == '__main__':
        # 方法一在问题一的人造数据上进行测试
        method_1_on_synthetic_data()
        # 方法一在UCI数据上进行测试
        method_1_on_UCI_benchmark()
        # 方法二在香蕉数据上的可视化展示
        method_2_on_banana_data()
        # 方法二在UCI数据上进行测试
        method_2_on_UCI_data()

    二分线性可分:

    原数据:

     噪声概率不同情况下的结果:

     

    测试结果:

    ==========在正确数据下的logistic预测==========
       0 Loss: 655.0571765768963
       1 Loss: 523.6231639912271
       2 Loss: 508.3195448331075
       3 Loss: 506.8197572187573
       4 Loss: 506.8141896877468
       5 Loss: 506.68658996045286
       6 Loss: 507.0420107165859
    测试集上的训练准确度: 0.993
    theta参数为: [ 0.00242018 -0.16241579  0.15456798]
    分割函数:0=0.002+(-0.162)*x1+(0.155)*x2
    ==========在类标签噪声条件下的logistic预测==========
    rho_positive:0.2 rho_negative:0.2 accuracy:0.9955
    rho_positive:0.3 rho_negative:0.1 accuracy:0.97
    rho_positive:0.4 rho_negative:0.4 accuracy:1.0
    ====================
    test on breastCancer:
    rho_positive=0.2  rho_negative=0.2 accuracy:0.6314
    rho_positive=0.3  rho_negative=0.1 accuracy:0.5319
    rho_positive=0.4  rho_negative=0.4 accuracy:0.6239
    test on diabetis:
    rho_positive=0.2  rho_negative=0.2 accuracy:0.7474
    rho_positive=0.3  rho_negative=0.1 accuracy:0.6654
    rho_positive=0.4  rho_negative=0.4 accuracy:0.6107
    test on thyroid:
    rho_positive=0.2  rho_negative=0.2 accuracy:0.8184
    rho_positive=0.3  rho_negative=0.1 accuracy:0.4797
    rho_positive=0.4  rho_negative=0.4 accuracy:0.5445
    test on german:
    rho_positive=0.2  rho_negative=0.2 accuracy:0.696
    rho_positive=0.3  rho_negative=0.1 accuracy:0.5609
    rho_positive=0.4  rho_negative=0.4 accuracy:0.507
    test on heart:
    rho_positive=0.2  rho_negative=0.2 accuracy:0.8
    rho_positive=0.3  rho_negative=0.1 accuracy:0.7667
    rho_positive=0.4  rho_negative=0.4 accuracy:0.6704
    test on image:
    rho_positive=0.2  rho_negative=0.2 accuracy:0.7171
    rho_positive=0.3  rho_negative=0.1 accuracy:0.6961
    rho_positive=0.4  rho_negative=0.4 accuracy:0.6424

    二维线性不可分:

    原图:

    不同噪声情况下的分类结果:

     

     

    高维的C-SVM结果比较长,不展示了。

  • 相关阅读:
    竞赛备考建议
    谷山丰的一生
    从首个IMO季军谈起 作者 : 付云皓
    孔庆东:单刀赴高考
    LaTeX 技巧 802:国内期刊 CCT 模板编译经验
    数学书籍推荐
    翻译
    printf中用法详解
    黎活明给程序员的忠告【转】
    log4j2配置文件log4j2.xml详解(转载)
  • 原文地址:https://www.cnblogs.com/randy-lo/p/14050456.html
Copyright © 2011-2022 走看看