  • The modified neural network

    What was changed:

    1. Function names and data formats now follow sklearn's conventions (see the usage sketch below).

    2. Training now stops early once the network's performance stops changing (otherwise the run can occasionally fail with a NaN error).
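
    As a quick illustration of the sklearn-style interface, here is a minimal usage sketch (hypothetical; it assumes the listing below is saved as neuralnetwork.py, with X in sklearn's (n_samples, n_features) layout and y a flat array of +1/-1 labels):

    # usage sketch (assumes the code below is saved as neuralnetwork.py)
    from neuralnetwork import NetStruct, NeuralNetwork
    import numpy as np

    X = np.random.randn(50, 2)                  # 50 samples, 2 features
    y = np.where(X[:, 0] + X[:, 1] > 0, 1, -1)  # toy +1/-1 target

    ns = NetStruct(2, [10], 1, ['sigm'])        # 2 inputs, one hidden layer of 10, 1 output
    nn = NeuralNetwork(ns)
    nn.fit(X, y)                                # Levenberg-Marquardt training
    print(nn.predict(X))                        # class labels in {+1, -1}
    print(nn.score(X, y))                       # classification accuracy

    The full modified source follows: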

    # neuralnetwork.py
    # modified by Robin 2016/04/20
    
    import numpy as np
    from math import exp, pow
    from mpl_toolkits.mplot3d import Axes3D
    import matplotlib.pyplot as plt
    import sys
    import copy
    from scipy.linalg import norm, pinv
    
    class Layer: 
        '''A single network layer: weights, bias, neuron count, and activation.'''
        def __init__(self, w, b, neure_number, transfer_function, layer_index):
            self.transfer_function = transfer_function
            self.neure_number = neure_number
            self.layer_index = layer_index
            self.w = w
            self.b = b
    
    class NetStruct: 
        '''Network structure'''
        def __init__(self, ni, nh, no, active_fun_list):
            # ni: number of input-layer nodes (int)
            # nh: number of hidden-layer nodes (int or list)
            # no: number of output-layer nodes (int)
            # active_fun_list: activation function type of each hidden layer (list)
            # 1) collect the neuron count of every layer
            self.neurals = [] # neuron counts per layer
            self.neurals.append(ni)
            if isinstance(nh, list):
                self.neurals.extend(nh)
            else:
                self.neurals.append(nh)
            self.neurals.append(no)
            # 2) if the list only covers the hidden layers, default the output layer to 'line'
            if len(self.neurals)-2 == len(active_fun_list):
                active_fun_list.append('line')
            self.active_fun_list = active_fun_list
            # 3) build the layers; the input layer carries no weights
            self.layers = [] # all layers
            for i in range(0, len(self.neurals)):
                if i == 0:
                    self.layers.append(Layer([], [], self.neurals[i], 'none', i)) 
                    continue
                f = self.neurals[i - 1]
                s = self.neurals[i] 
                self.layers.append(Layer(np.random.randn(s, f), np.random.randn(s, 1), self.neurals[i], self.active_fun_list[i-1], i))
            
    class NeuralNetwork:
        '''Neural network'''
        def __init__(self, net_struct, mu=1e-3, beta=10, tol=0.1):
            '''Initialization'''
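            # mu:   initial damping factor for the Levenberg-Marquardt step
            # beta: factor by which mu is decreased after an accepted step and
            #       increased after a rejected one
            # tol:  training stops once the performance measure drops below this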
            self.net_struct = net_struct
            self.mu = mu
            self.beta = beta
            self.tol = tol
        
        def fit(self, x, y, n_iter=100, method = 'lm'):
            '''Train (sklearn-style fit)'''
            self.net_struct.x = x.T
            self.net_struct.y = y.reshape(1, -1)
            self.n_iter = n_iter
            if(method == 'lm'):
                self.lm()
        
        def predict(self, x, predict_type='clf'):
            '''Predict'''
            self.net_struct.x = x.T
            self.forward()
            layer_num = len(self.net_struct.layers)
            pred = self.net_struct.layers[layer_num - 1].output_val
            pred = pred.reshape(-1,)
            if predict_type == 'clf':    # classification
                return np.where(pred > pred.mean(), 1, -1)
            elif predict_type == 'reg':  # regression
                return pred
                
                
        def score(self, X, y):
            '''Accuracy; only meaningful for classification, not regression'''
            y0 = self.predict(X, predict_type='clf')
            return np.sum(y0 == y) / X.shape[0]
        
        def actFun(self, z, active_type='sigm'):
            '''Activation function'''
            # active_type: one of 'sigm', 'tanh', 'radb', 'line'
            if active_type == 'sigm':
                f = 1.0 / (1.0 + np.exp(-z))
            elif active_type == 'tanh':
                # fixed: the numerator must be a difference, not a sum
                f = (np.exp(z) - np.exp(-z)) / (np.exp(z) + np.exp(-z))
            elif active_type == 'radb':
                f = np.exp(-z * z)
            elif active_type == 'line':
                f = z
            return f
        
        def actFunGrad(self, z, active_type='sigm'):
            '''Derivative of the activation function'''
            # active_type: one of 'sigm', 'tanh', 'radb', 'line'
            y = self.actFun(z, active_type)
            if active_type == 'sigm':
                grad = y * (1.0 - y)
            elif active_type == 'tanh':
                grad = 1.0 - y * y
            elif active_type == 'radb':
                grad = -2.0 * z * y
            elif active_type == 'line':
                m = z.shape[0]
                n = z.shape[1]
                grad = np.ones((m, n))
            return grad
        
        def forward(self): 
            '''Forward pass'''
            layer_num = len(self.net_struct.layers)
            for i in range(0, layer_num):
                if i == 0:
                    curr_layer = self.net_struct.layers[i]
                    curr_layer.input_val = self.net_struct.x
                    curr_layer.output_val = self.net_struct.x
                    continue
                before_layer = self.net_struct.layers[i - 1]
                curr_layer = self.net_struct.layers[i]
                curr_layer.input_val = curr_layer.w.dot(before_layer.output_val) + curr_layer.b
                curr_layer.output_val = self.actFun(curr_layer.input_val, 
                                                    self.net_struct.active_fun_list[i - 1])
        
        def backward(self):
            '''Backward pass (error backpropagation)'''
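            # The output-layer delta is -f'(z_L); each hidden delta then follows
            # delta_l = (W_{l+1}^T . delta_{l+1}) * f'(z_l), walking the layers in reverse.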
            layer_num = len(self.net_struct.layers)
            last_layer = self.net_struct.layers[layer_num - 1]
            last_layer.error = -self.actFunGrad(last_layer.input_val,
                                                self.net_struct.active_fun_list[layer_num - 2])
            layer_index = list(range(1, layer_num - 1)) 
            layer_index.reverse()
            for i in layer_index:
                curr_layer = self.net_struct.layers[i]
                curr_layer.error = (last_layer.w.transpose().dot(last_layer.error)) * self.actFunGrad(curr_layer.input_val,self.net_struct.active_fun_list[i - 1])
                last_layer = curr_layer
        
        def parDeriv(self):
            '''Per-sample partial derivatives of the error (standard gradients)'''
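            # For each layer this builds one row per training sample holding
            # d(error)/dW, i.e. the backpropagated delta times the previous
            # layer's output, flattened in C order; jacobian() stacks these rows.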
            layer_num = len(self.net_struct.layers)
            for i in range(1, layer_num):
                before_layer = self.net_struct.layers[i - 1]
                before_input_val = before_layer.output_val.transpose()
                curr_layer = self.net_struct.layers[i]
                curr_error = curr_layer.error
                curr_error = curr_error.reshape(curr_error.shape[0] * curr_error.shape[1], 1, order='F')
                row = curr_error.shape[0]
                col = before_input_val.shape[1]
                a = np.zeros((row, col))
                num = before_input_val.shape[0]
                neure_number = curr_layer.neure_number
                for k in range(0, num):  # k indexes samples; avoids shadowing the layer index i
                    a[neure_number*k : neure_number*k + neure_number, :] = np.repeat([before_input_val[k, :]], neure_number, axis=0)
                tmp_w_par_deriv = curr_error * a
                curr_layer.w_par_deriv = np.zeros((num, before_layer.neure_number * curr_layer.neure_number))
                for k in range(0, num):
                    tmp = tmp_w_par_deriv[neure_number*k : neure_number*k + neure_number, :]
                    tmp = tmp.reshape(tmp.shape[0] * tmp.shape[1], order='C')
                    curr_layer.w_par_deriv[k, :] = tmp
                curr_layer.b_par_deriv = curr_layer.error.transpose()  # moved out of the loop: it does not depend on k
        
        def jacobian(self):
            '''Assemble the Jacobian matrix'''
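            # One row per training sample; the columns hold the derivatives with
            # respect to every parameter, laid out layer by layer as
            # [W_1, b_1, W_2, b_2, ...].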
            layers = self.net_struct.neurals
            row = self.net_struct.x.shape[1]
            col = 0
            for i in range(0, len(layers) - 1):
                col = col + layers[i] * layers[i + 1] + layers[i + 1]
            j = np.zeros((row, col))
            layer_num = len(self.net_struct.layers)
            index = 0
            for i in range(1, layer_num):
                curr_layer = self.net_struct.layers[i]
                w_col = curr_layer.w_par_deriv.shape[1]
                b_col = curr_layer.b_par_deriv.shape[1]
                j[:, index : index + w_col] = curr_layer.w_par_deriv
                index = index + w_col
                j[:, index : index + b_col] = curr_layer.b_par_deriv
                index = index + b_col
            return j
        
        def gradCheck(self):
            '''Gradient check (finite differences, for debugging)'''
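            # Approximates each Jacobian entry numerically as
            #     (E(p + eps*e_j) - E(p)) / eps,  eps = 1e-5,
            # for comparison against jacobian(). Hard-coded for a network with
            # 2 inputs, one hidden layer of n neurons and a single output.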
            W1 = self.net_struct.layers[1].w
            b1 = self.net_struct.layers[1].b
            n = self.net_struct.layers[1].neure_number
            W2 = self.net_struct.layers[2].w
            b2 = self.net_struct.layers[2].b
            x = self.net_struct.x
            p = []
            p.extend(W1.reshape(1,W1.shape[0]*W1.shape[1],order = 'C')[0])
            p.extend(b1.reshape(1,b1.shape[0]*b1.shape[1],order = 'C')[0])
            p.extend(W2.reshape(1,W2.shape[0]*W2.shape[1],order = 'C')[0])
            p.extend(b2.reshape(1,b2.shape[0]*b2.shape[1],order = 'C')[0])
            old_p = list(p)  # take a copy; plain assignment would only alias the list
            jac = []
            for i in range(0, x.shape[1]):
                xi = np.array([x[:,i]])
                xi = xi.transpose()
                ji = []
                for j in range(0, len(p)):
                    W1 = np.array(p[0:2*n]).reshape(n,2,order='C')
                    b1 = np.array(p[2*n:2*n+n]).reshape(n,1,order='C')
                    W2 = np.array(p[3*n:4*n]).reshape(1,n,order='C')
                    b2 = np.array(p[4*n:4*n+1]).reshape(1,1,order='C')
    
                    z2 = W1.dot(xi) + b1
                    a2 = self.actFun(z2)
                    z3 = W2.dot(a2) + b2
                    h1 = self.actFun(z3)
                    p[j] = p[j] + 0.00001
                    W1 = np.array(p[0:2*n]).reshape(n,2,order='C')
                    b1 = np.array(p[2*n:2*n+n]).reshape(n,1,order='C')
                    W2 = np.array(p[3*n:4*n]).reshape(1,n,order='C')
                    b2 = np.array(p[4*n:4*n+1]).reshape(1,1,order='C')
    
                    z2 = W1.dot(xi) + b1
                    a2 = self.actFun(z2)
                    z3 = W2.dot(a2) + b2
                    h = self.actFun(z3)
                    g = (h[0][0] - h1[0][0]) / 0.00001
                    ji.append(g)
                    p[j] = p[j] - 0.00001  # undo the perturbation before the next parameter
                jac.append(ji)
                p = list(old_p)  # restore a copy, so old_p stays intact
            return jac
        
        def jjje(self):
            '''Compute jj = J^T J and je = -J^T e'''
            layer_num = len(self.net_struct.layers)
            e = self.net_struct.y - self.net_struct.layers[layer_num - 1].output_val
            e = e.transpose()
            j = self.jacobian()
            #check gradient
            #j1 = -np.array(self.gradCheck())
            #jk = j.reshape(1,j.shape[0]*j.shape[1])
            #jk1 = j1.reshape(1,j1.shape[0]*j1.shape[1])
            #plt.plot(jk[0])
            #plt.plot(jk1[0],'.')
            #plt.show()
            jj = j.transpose().dot(j)
            je = -j.transpose().dot(e)
            return [jj, je]
        
        def lm(self):
            '''Levenberg-Marquardt training'''
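            # Each candidate step solves the damped normal equations
            #     (J^T J + mu*I) * delta = -J^T e
            # (jj and je below). Small mu approaches Gauss-Newton, large mu
            # approaches gradient descent; mu is divided by beta after an
            # improvement and multiplied by beta otherwise.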
            mu = self.mu
            beta = self.beta
            n_iter = self.n_iter
            tol = self.tol
            y = self.net_struct.y
            layer_num = len(self.net_struct.layers)
            self.forward()
            pred = self.net_struct.layers[layer_num - 1].output_val
            pref = self.performance(y, pred)
            for i in range(0, n_iter):
                print('iter:', i, 'error:', pref)
                # 1) stop once the error is below tolerance
                if pref < tol:
                    break
                # 2) compute the Jacobian pieces
                self.backward()
                self.parDeriv()
                [jj, je] = self.jjje()
                while True:
                    # 3) solve for the Levenberg-Marquardt step
                    A = jj + mu * np.diag(np.ones(jj.shape[0]))
                    delta_w_b = pinv(A).dot(je)
                    # 4) try the step; keep it only if performance improves
                    old_net_struct = copy.deepcopy(self.net_struct)
                    self.updateNetStruct(delta_w_b)
                    self.forward()
                    pred1 = self.net_struct.layers[layer_num - 1].output_val
                    pref1 = self.performance(y, pred1)
                    if pref1 < pref:
                        mu = mu / beta
                        pref = pref1
                        break
                    if pref - pref1 == 0:  # modification: stop once performance no longer changes (avoids occasional NaN errors)
                        return
                    mu = mu * beta
                    self.net_struct = copy.deepcopy(old_net_struct)
        
        def updateNetStruct(self, delta_w_b):
            '''Update the network weights and biases'''
            layer_num = len(self.net_struct.layers)
            index = 0
            for i in range(1, layer_num):
                before_layer = self.net_struct.layers[i - 1]
                curr_layer = self.net_struct.layers[i]
                w_num = before_layer.neure_number * curr_layer.neure_number
                b_num = curr_layer.neure_number
                w = delta_w_b[index : index + w_num]
                w = w.reshape(curr_layer.neure_number, before_layer.neure_number, order='C')
                index = index + w_num
                b = delta_w_b[index : index + b_num]
                index = index + b_num
                curr_layer.w += w
                curr_layer.b += b
        
        def performance(self, y, pred):
            '''Performance function: norm of the residual'''
            error = y - pred
            return norm(error) / len(y)  # y has shape (1, n_samples), so len(y) is 1
    
            
            
    # The functions below are test examples
    def plotSamples(n = 40):
        x = np.array([np.linspace(0, 3, n)])
        x = x.repeat(n, axis = 0)
        y = x.transpose()
        z = np.zeros((n, n))
        for i in range(0, x.shape[0]):
            for j in range(0, x.shape[1]):
                z[i][j] = sampleFun(x[i][j], y[i][j])
        fig = plt.figure()
        ax = fig.add_subplot(111, projection='3d')  # fig.gca(projection=...) was removed in newer Matplotlib
        surf = ax.plot_surface(x, y, z, cmap='autumn', cstride=2, rstride=2)
        ax.set_xlabel("X-Label")
        ax.set_ylabel("Y-Label")
        ax.set_zlabel("Z-Label")
        plt.show()
    
    def sinSamples(n):
        x = np.array([np.linspace(-0.5, 0.5, n)])
        #x = x.repeat(n, axis = 0)
        y = x + 0.2
        z = np.zeros((n, 1))
        for i in range(0, x.shape[1]):
            z[i] = np.sin(x[0][i] * y[0][i])
        X = np.zeros((n, 2))
        n = 0
        for xi, yi in zip(x.transpose(), y.transpose()):
            X[n][0] = xi
            X[n][1] = yi
            n = n + 1
        # print(x.shape, y.shape)
        # print(X.shape, z.shape)
        #return X, z.transpose()
        return X, z.reshape(-1,)
    
    def peaksSamples(n):
        x = np.array([np.linspace(-3, 3, n)])
        x = x.repeat(n, axis = 0)
        y = x.transpose()
        z = np.zeros((n, n))
        for i in range(0, x.shape[0]):
            for j in range(0, x.shape[1]):
                z[i][j] = sampleFun(x[i][j], y[i][j])
        X = np.zeros((n*n, 2))
        x_list = x.reshape(n*n,1 )
        y_list = y.reshape(n*n,1)
        z_list = z.reshape(n*n,1)
        n = 0
        for xi, yi in zip(x_list, y_list):
            X[n][0] = xi
            X[n][1] = yi
            n = n + 1
        # print(x.shape, y.shape)
        # print(X.shape, z.shape, z_list.shape, z_list.transpose().shape)
        #return X,z_list.transpose()
        return X, z.reshape(-1,)
    
        
    def sampleFun(x, y):
        # MATLAB's classic 'peaks' surface, used to generate regression samples
        z = (3 * pow(1 - x, 2) * exp(-pow(x, 2) - pow(y + 1, 2))
             - 10 * (x/5 - pow(x, 3) - pow(y, 5)) * exp(-pow(x, 2) - pow(y, 2))
             - 1/3 * exp(-pow(x + 1, 2) - pow(y, 2)))
        return z
    
    
    
    
    
    def test():
        active_fun_list = ['sigm', 'sigm', 'sigm'] # [Required] activation type of each hidden layer; 'tanh', 'radb' and 'line' are also valid. The output layer defaults to 'line' when not listed explicitly.
        ns = NetStruct(2, [10, 10, 10], 1, active_fun_list) # network structure: three hidden layers of 10 neurons each
        nn = NeuralNetwork(ns) # instantiate the network

        #[X, z] = peaksSamples(20) # training data from the peaks surface
        [X, z] = sinSamples(20)  # alternative training data
        nn.fit(X, z) # train

        #[X0, z0] = peaksSamples(40) # test data from the peaks surface
        [X0, z0] = sinSamples(40)  # alternative test data
        z1 = nn.predict(X0, predict_type='reg') # predict (regression output, not class labels)
    
        fig  = plt.figure()
        ax = fig.add_subplot(111)
        ax.plot(z0)       # real data
        ax.plot(z1, 'r.') # predicted data
        plt.legend(('real data', 'predict data'))
        plt.show()
        
        
    def test_lotto():
        import lotto
        import kline
        lt = lotto.Lotto()
        lt.make_real_history()
        series = lt.mod(7, m=2)
        
        kl = kline.KLine(series)
        X, y, x_last = kl.data_for_predict()
        
        active_fun_list = ['sigm', 'sigm', 'sigm'] # [Required] hidden-layer activation types; 'tanh', 'radb' and 'line' are also valid; the output layer defaults to 'line' when not listed explicitly
        ns = NetStruct(4, [10, 10], 1, active_fun_list) # network structure: two hidden layers of 10 neurons each
        nn = NeuralNetwork(ns) # instantiate the network

        nn.fit(X, y)        # train
        y0 = nn.predict(X)  # predict
        
        print(y==y0)
        print(nn.score(X, y)) # 0.63...
        
    
    def test3():
        X = np.array([[1,1,0],
                      [1,0,0],
                      [0,1,1],
                      [0,0,1]])
        y = np.array([1,-1,-1,1])
        
        active_fun_list = ['sigm', 'sigm'] # [Required] hidden-layer activation types; 'tanh', 'radb' and 'line' are also valid; the output layer defaults to 'line' when not listed explicitly
        ns = NetStruct(3, [10], 1, active_fun_list) # network structure: one hidden layer of 10 neurons
        nn = NeuralNetwork(ns) # instantiate the network

        nn.fit(X, y)        # train
        y0 = nn.predict(X)  # predict
        print(y==y0)
        print(nn.score(X, y))
    
    # tests
    if __name__ == '__main__':
        # test()
        #test_lotto()
        test3()
        plotSamples()
    
    
    
    
        
  • Original post: https://www.cnblogs.com/hhh5460/p/5415468.html