# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import random
import pickle


class LR:
    """Linear regression trained with mini-batch gradient descent.

    The input data is one row per sample with the target in the LAST column.
    A bias column of ones is prepended during preprocessing, so the learned
    theta has shape (n_features + 1, 1).
    """

    def __init__(self, data, learning_rate=0.001, iter_max=100, batch_size=2):
        """
        :param data: samples (array-like), one row per sample, last column is y
        :param learning_rate: gradient-descent step size
        :param iter_max: maximum total number of batch updates
        :param batch_size: number of samples per mini-batch
        """
        self.data = data
        self.learning_rate = learning_rate
        self.iter_max = iter_max
        self.batch_size = batch_size
        self.process_data()

    def standard_scaler(self, data):
        """Standardize the feature columns to zero mean / unit variance;
        the target column (last) is passed through unchanged."""
        features = data[:, :-1]
        mean = np.mean(features, axis=0)
        std = np.std(features, axis=0)
        features = (features - mean) / std
        return np.hstack((features, data[:, -1:]))

    def process_data(self):
        """Convert raw data to an ndarray and prepend a bias column of ones."""
        data = np.array(self.data)
        # data = self.standard_scaler(data)  # optional standardization
        one = np.ones((data.shape[0], 1))
        self.data = np.hstack((one, data))
        self.m = self.data.shape[0]      # total number of samples
        self.n = self.data.shape[1] - 1  # number of features (incl. bias)

    def model(self, data):
        """Linear prediction X @ theta (last column of `data` is the target)."""
        return np.dot(data[:, :-1], self.theta)

    def mse(self):
        """Mean squared error of the current theta over the full data set."""
        predict = np.dot(self.data[:, :-1], self.theta)
        return np.sum((predict - self.data[:, -1:]) ** 2) / len(predict)

    def cal_grad(self, batch_data, predict, y):
        """Gradient of the MSE loss for one mini-batch.

        Computed as a single matrix product instead of a per-feature loop:
        X^T (X theta - y) / batch_size, shape (n, 1).
        """
        return np.dot(batch_data[:, :-1].T, predict - y) / len(y)

    @staticmethod
    def draw(list_data):
        """Plot the recorded loss curve."""
        plt.plot(range(len(list_data)), list_data)
        plt.show()

    def train(self):
        """Run mini-batch gradient descent for at most `iter_max` updates,
        persist the best theta found to 'model.pt', and plot the loss curve."""
        loss_list = []
        n = 1      # total number of batch updates performed
        epoch = 1  # number of passes over the (shuffled) data
        # 1. initialize theta
        self.theta = np.ones((self.n, 1))
        # 2. initial loss
        loss = self.mse()
        best_loss = loss
        # BUGFIX: best_theta must exist even if the loss never improves,
        # otherwise pickle.dump below raises NameError.
        best_theta = self.theta.copy()
        loss_list.append(loss)
        b = len(self.data) // self.batch_size  # batches per epoch (floor)
        done = False
        while not done:
            # shuffle the samples before each epoch
            self.data = np.array(random.sample(self.data.tolist(), len(self.data)))
            for i in range(b):
                batch_data = self.data[i * self.batch_size:(i + 1) * self.batch_size]
                # 3. gradient of the mini-batch
                predict = self.model(batch_data)
                grad = self.cal_grad(batch_data, predict, batch_data[:, -1:])
                # 4. update theta
                self.theta = self.theta - self.learning_rate * grad
                # 5. loss over the whole data set
                loss = self.mse()
                loss_list.append(loss)
                if loss < best_loss:
                    # BUGFIX: also update best_loss; the original only saved
                    # best_theta, so "best" meant "better than the initial
                    # loss" rather than the best value seen so far.
                    best_loss = loss
                    best_theta = self.theta
                if n % 100 == 0:
                    print('轮次:{},迭代次数:{},损失:{}'.format(epoch, n, loss))
                n += 1
                # BUGFIX: stop condition — a single `break` cannot exit both
                # loops; the flag guarantees termination at exactly iter_max.
                if n > self.iter_max:
                    done = True
                    break
            epoch += 1
        # persist the best model (write to disk; could also be a database)
        with open('model.pt', 'wb') as f:
            pickle.dump(best_theta, f)
        self.draw(loss_list)


if __name__ == "__main__":
    data = pd.read_excel('C:/Users/jiedada/Desktop/python/回归/lr.xlsx')
    lr = LR(data)
    lr.train()