  • Keras: Auto Encoder

    Using the fashion_mnist dataset as an example, here is a basic implementation of an autoencoder:

    import  os
    import  tensorflow as tf
    import  numpy as np
    from    tensorflow import keras
    from    tensorflow.keras import Sequential, layers
    from    PIL import Image
    from    matplotlib import pyplot as plt
    
    tf.random.set_seed(42)
    np.random.seed(42)
    os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
    assert tf.__version__.startswith('2.')
    
    h_dim = 36                              # size of the bottleneck (latent) layer
    batchsz = 512
    epochs = 1000
    lr = tf.linspace(0.001, 0.001, epochs)  # per-epoch learning-rate schedule (constant here)
    buffersize = batchsz * 5                # shuffle buffer size
    
    (x_train, y_train), (x_val, y_val) = keras.datasets.fashion_mnist.load_data()
    x_train, x_val = x_train.astype(np.float32) / 255., x_val.astype(np.float32) / 255.
    
    train_db = tf.data.Dataset.from_tensor_slices(x_train)
    train_db = train_db.shuffle(buffersize).batch(batchsz)
    val_db = tf.data.Dataset.from_tensor_slices(x_val)
    val_db = val_db.batch(batchsz)
    
    class AutoEncoder(keras.Model):
    
        def __init__(self, input_shape, hidden_list, activation=tf.nn.relu):
            super(AutoEncoder, self).__init__()
            
            # Encoder: hidden layers followed by a linear bottleneck of size hidden_list[-1]
            hidden_list = list(hidden_list)  # copy so the caller's list is not mutated
            center = hidden_list.pop()
            self.encoder = Sequential([layers.Dense(num, activation=activation) for num in hidden_list]
                                    + [layers.Dense(center)])
    
            # Decoder: mirror of the encoder, ending in a linear output layer (logits)
            hidden_list.reverse()
            self.decoder = Sequential([layers.Dense(num, activation=activation) for num in hidden_list]
                                    + [layers.Dense(input_shape)])
    
    
        def call(self, inputs, training=None):
            # [b, 784] => [b, 36]
            h = self.encoder(inputs)
            # [b, 36] => [b, 784]
            x_hat = self.decoder(h)
            return x_hat
    
    def train(train_db, val_db, input_shape=784):
        model = AutoEncoder(input_shape, [392, 196, 98, h_dim])
        model.build(input_shape=(None, input_shape))
        model.summary()
    
        train_list = []
        val_list = []
        for epoch in range(epochs):
            # rebuild the optimizer each epoch to apply that epoch's learning rate
            optimizer = tf.optimizers.Adam(learning_rate=lr[epoch])
            train_losses = 0
            val_losses = 0
            for step, x in enumerate(train_db):
                x = tf.reshape(x, [-1, input_shape])
                with tf.GradientTape() as tape:
                    x_rec_logits = tf.sigmoid(model(x))
                    rec_loss = tf.losses.mean_squared_error(x, x_rec_logits)
                    rec_loss = tf.reduce_mean(rec_loss)
                train_losses += rec_loss
                grads = tape.gradient(rec_loss, model.trainable_variables)
                optimizer.apply_gradients(zip(grads, model.trainable_variables))
    
            for x in val_db:
                x = tf.reshape(x, [-1, input_shape])
                x_rec_logits = tf.sigmoid(model(x))
                rec_loss = tf.losses.mean_squared_error(x, x_rec_logits)
                rec_loss = tf.reduce_mean(rec_loss)
                val_losses += rec_loss
    
            print(epoch,"train_losses :",float(train_losses),"val_losses :",float(val_losses))
    
            train_list.append(train_losses)
            val_list.append(val_losses)
    
        model.save('/tmp/model')
    
        x = [i for i in range(0, epochs)]
        # plot the training and validation loss curves
        plt.figure()
        plt.plot(x, train_list, color='blue', label='training')
        plt.plot(x, val_list, color='red', label='validation')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        plt.show()
        plt.close()
    
    def save_images(imgs, name, shape=(32, 32)):
        # tile shape[0] x shape[1] grayscale 28x28 images into a single grid image
        new_im = Image.new('L', (28*shape[0], 28*shape[1]))
    
        index = 0
        for i in range(0, 28*shape[0], 28):
            for j in range(0, 28*shape[1], 28):
                im = imgs[index]
                im = Image.fromarray(im, mode='L')
                new_im.paste(im, (i, j))
                index += 1
    
        new_im.save(name)
    
    
    def showImage(dataset, input_shape=784):
        model = tf.keras.models.load_model('/tmp/model', compile=False)
    
        for step, x in enumerate(dataset):
            x_hat = tf.sigmoid(model(tf.reshape(x, [-1, input_shape])))
            x_hat = tf.reshape(x_hat, [-1, 28, 28])
            # stack originals and reconstructions so both appear in the saved grid
            x_concat = tf.concat([x, x_hat], axis=0)
            if x_concat.shape[0] < batchsz * 2:  # stop at the final partial batch
                break
            x_concat = x_concat.numpy() * 255.
            x_concat = x_concat.astype(np.uint8)
            shape=(int(tf.sqrt(batchsz*2.)),int(tf.sqrt(batchsz*2.)))
            save_images(x_concat, 'ae_images/rec_epoch_%d.png'%step,shape)
    
    train(train_db,val_db)
    showImage(val_db)
    

    The training curves are shown below:

    [Figure: training and validation loss curves]
    Below is an autoencoder implementation better suited to my own use cases, covering: CSV file loading, data scaling, train/test splitting, conversion to tf.data.Dataset, the autoencoder model (a Keras implementation), model training, model saving and loading, and variable (e.g. scaler) saving and loading:

    import os
    import pickle
    import numpy as np
    import pandas as pd
    from math import ceil
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras import Sequential, layers
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.preprocessing import StandardScaler
    
    from PIL import Image
    from matplotlib import pyplot as plt
    
    tf.random.set_seed(42)
    np.random.seed(42)
    os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
    assert tf.__version__.startswith('2.')
    
    def get_dataset_to_pandas(file_name, dropList=[]):
    	# load a CSV into a DataFrame, dropping the listed columns
    	dataset = pd.read_csv(file_name)
    	for drop_str in dropList:
    		dataset = dataset.drop(drop_str, axis=1)
    	return dataset
    
    def pre_scaler(dataset, type_str="std"):
    	# fit the chosen scaler and return (fitted scaler, transformed data)
    	if type_str == "minmax":
    		scaler = MinMaxScaler()
    	elif type_str == "std":
    		scaler = StandardScaler()
    	else:
    		return None
    	scaler.fit(dataset)
    	return scaler, scaler.transform(dataset)
    
    def to_tf_dataset(dataset, batch_size=None, shuffle_buffer_size=None, repeat_size=None):
    	dataset = tf.data.Dataset.from_tensor_slices(dataset)
    	# shuffle before batching so individual samples, not whole batches, get shuffled
    	if shuffle_buffer_size is not None:
    		dataset = dataset.shuffle(shuffle_buffer_size)
    	if batch_size is not None:
    		dataset = dataset.batch(batch_size)
    	if repeat_size is not None:
    		dataset = dataset.repeat(repeat_size)
    	return dataset
    
    def train_test_split(dataset, test_ratio = 0.3, seed = 42):
    	if seed:
    		np.random.seed(seed)
    	shuffle_index = np.random.permutation(len(dataset))
    	test_size = ceil(len(dataset) * test_ratio)
    	test_index = shuffle_index[:test_size]
    	train_index = shuffle_index[test_size:]
    	dataset_train = dataset[train_index]
    	dataset_test = dataset[test_index]
    	return dataset_train, dataset_test
    
    class AutoEncoder(keras.Model):
    
    	def __init__(self, input_shape, hidden_list, activation=tf.nn.relu):
    		super(AutoEncoder, self).__init__()
    		
    		# Encoder: hidden layers followed by a linear bottleneck of size hidden_list[-1]
    		hidden_list = list(hidden_list)  # copy so the caller's list is not mutated
    		center = hidden_list.pop()
    		self.encoder = Sequential([layers.Dense(num, activation=activation) for num in hidden_list]
    								+ [layers.Dense(center)])
    
    		# Decoder: mirror of the encoder, ending in a linear output layer (logits)
    		hidden_list.reverse()
    		self.decoder = Sequential([layers.Dense(num, activation=activation) for num in hidden_list]
    								+ [layers.Dense(input_shape)])
    
    
    	def call(self, inputs, training=None):
    		# [b, input_shape] => [b, bottleneck]
    		h = self.encoder(inputs)
    		# [b, bottleneck] => [b, input_shape]
    		x_hat = self.decoder(h)
    		return x_hat
    
    def get_auto_encoder(dataset, input_shape, hidden_list, epochs, lr=0.001, activation=tf.nn.relu):
    	(train_db, val_db) = dataset
    
    	model = AutoEncoder(input_shape, hidden_list, activation)
    	model.build(input_shape=(None, input_shape))
    	model.summary()
    
    	train_list = []
    	val_list = []
    	optimizer = tf.optimizers.Adam(learning_rate=lr)
    	for epoch in range(epochs):
    		
    		train_losses = 0
    		val_losses = 0
    		for step, x in enumerate(train_db):
    			x = tf.reshape(x, [-1, input_shape])
    			with tf.GradientTape() as tape:
    				x_rec_logits = tf.sigmoid(model(x))
    				rec_loss = tf.losses.mean_squared_error(x, x_rec_logits)
    				rec_loss = tf.reduce_mean(rec_loss)
    			train_losses += rec_loss
    			grads = tape.gradient(rec_loss, model.trainable_variables)
    			optimizer.apply_gradients(zip(grads, model.trainable_variables))
    
    		for x in val_db:
    			x = tf.reshape(x, [-1, input_shape])
    			x_rec_logits = tf.sigmoid(model(x))
    			rec_loss = tf.losses.mean_squared_error(x, x_rec_logits)
    			rec_loss = tf.reduce_mean(rec_loss)
    			val_losses += rec_loss
    
    		print(epoch,"train_losses :",float(train_losses),"val_losses :",float(val_losses))
    
    		train_list.append(train_losses)
    		val_list.append(val_losses)
    
    	x = [i for i in range(0, epochs)]
    	# plot the training and validation loss curves
    	plt.figure()
    	plt.plot(x, train_list, color='blue', label='training')
    	plt.plot(x, val_list, color='red', label='validation')
    	plt.xlabel('Epoch')
    	plt.ylabel('Loss')
    	plt.legend()
    	plt.show()
    	plt.close()
    	return model
    
    def variable_save(variable, file_name):
    	# pickle any Python object (e.g. a fitted scaler) to disk
    	with open(file_name, 'wb') as data_output:
    		pickle.dump(variable, data_output)
    
    def variable_load(file_name):
    	with open(file_name, 'rb') as data_input:
    		return pickle.load(data_input)
    
    def model_save(model, file_name):
    	model.save(file_name)
    
    def model_load(file_name, is_compile=False):
    	return tf.keras.models.load_model(file_name, compile=is_compile)
    
    if __name__ == '__main__':
    	dataset = get_dataset_to_pandas("walk1.csv", ["Loss","TimeStamp","LT_Foot_TimeStamp","RT_Foot_TimeStamp",'Chest_TimeStamp'])
    	scaler, dataset = pre_scaler(dataset,"minmax")
    	dataset_train, dataset_test = train_test_split(dataset)
    	(dataset_train, dataset_test) = (to_tf_dataset(data,batch_size=20,shuffle_buffer_size=100) for data in [dataset_train, dataset_test])
    	model = get_auto_encoder((dataset_train, dataset_test),input_shape=18,hidden_list=[9,4],epochs=100,lr=0.01)
    	variable_save(scaler,'./auto_model/scaler')
    	model_save(model,'./auto_model')
    
    	# load the model and scaler back
    	# scaler = variable_load('./auto_model/scaler')
    	# model = model_load('./auto_model')
    	# print(model, '\n', scaler)
    

    Once the model is trained, its reconstruction loss on a new sample can be used to judge whether that sample belongs to the positive (normal) dataset: in-distribution samples reconstruct with low error, while anomalies reconstruct poorly. The effect is similar to the decision function of a one-class SVM.
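
    A minimal sketch of that check, assuming the ./auto_model directory and scaler saved by the script above; the helper name reconstruction_scores, the new_samples array, and the threshold choice are illustrative additions, not part of the original code:

    def reconstruction_scores(model, scaler, samples, input_shape=18):
    	# scale new samples with the SAME scaler that was fitted on the training data
    	x = scaler.transform(samples).astype(np.float32)
    	# forward pass mirrors training: sigmoid applied to the decoder logits
    	x_hat = tf.sigmoid(model(tf.reshape(x, [-1, input_shape])))
    	# per-sample mean squared reconstruction error
    	return tf.reduce_mean(tf.square(x - x_hat), axis=1).numpy()

    # scaler = variable_load('./auto_model/scaler')
    # model = model_load('./auto_model')
    # scores = reconstruction_scores(model, scaler, new_samples)
    # threshold = np.percentile(train_scores, 95)  # hand-picked cutoff from known-normal data
    # is_anomaly = scores > threshold

    Unlike OneClassSVM, which learns its decision boundary while fitting, the cutoff here has to be chosen by hand, for example as a high percentile of the scores on known-normal training data.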
