  • Keras: Autoencoder (Auto Encoder)

    Taking the fashion_mnist dataset as an example, the basic implementation of an autoencoder is as follows:

    import  os
    import  tensorflow as tf
    import  numpy as np
    from    tensorflow import keras
    from    tensorflow.keras import Sequential, layers
    from    PIL import Image
    from    matplotlib import pyplot as plt
    
    tf.random.set_seed(42)
    np.random.seed(42)
    os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
    assert tf.__version__.startswith('2.')
    
    h_dim = 36      # size of the bottleneck (latent) layer
    batchsz = 512
    epochs = 1000
    # per-epoch learning-rate schedule (constant 0.001 here; change the endpoints to decay)
    lr = tf.linspace(0.001, 0.001, epochs)
    buffersize = batchsz * 5
    
    (x_train, y_train), (x_val, y_val) = keras.datasets.fashion_mnist.load_data()
    x_train, x_val = x_train.astype(np.float32) / 255., x_val.astype(np.float32) / 255.
    
    train_db = tf.data.Dataset.from_tensor_slices(x_train)
    train_db = train_db.shuffle(buffersize).batch(batchsz)
    val_db = tf.data.Dataset.from_tensor_slices(x_val)
    val_db = val_db.batch(batchsz)
    
    class AutoEncoder(keras.Model):
    
        def __init__(self, input_shape, hidden_list, activation=tf.nn.relu):
            super(AutoEncoder, self).__init__()

            # Encoder: hidden layers followed by a linear bottleneck layer
            center = hidden_list.pop()
            self.encoder = Sequential([layers.Dense(num, activation=activation) for num in hidden_list]
                                    + [layers.Dense(center)])

            # Decoder: mirror of the hidden layers, projecting back to the input dimension
            hidden_list.reverse()
            self.decoder = Sequential([layers.Dense(num, activation=activation) for num in hidden_list]
                                    + [layers.Dense(input_shape)])
    
    
        def call(self, inputs, training=None):
            # [b, input_dim] => [b, bottleneck]
            h = self.encoder(inputs)
            # [b, bottleneck] => [b, input_dim]
            x_hat = self.decoder(h)
            return x_hat
    
    def train(train_db,val_db,input_shape=784):
        model = AutoEncoder(input_shape, [392, 196, 98, h_dim])   # last entry is the bottleneck size
        model.build(input_shape=(None, input_shape))
        model.summary()
    
        train_list = []
        val_list = []
        for epoch in range(epochs):
            # a fresh optimizer each epoch picks up that epoch's learning rate (Adam state is reset)
            optimizer = tf.optimizers.Adam(learning_rate=lr[epoch])
            train_losses = 0
            val_losses = 0
            for step, x in enumerate(train_db):
                x = tf.reshape(x, [-1, input_shape])
                with tf.GradientTape() as tape:
                    x_rec_logits = tf.sigmoid(model(x))
                    rec_loss = tf.losses.mean_squared_error(x, x_rec_logits)
                    rec_loss = tf.reduce_mean(rec_loss)
                train_losses += rec_loss
                grads = tape.gradient(rec_loss, model.trainable_variables)
                optimizer.apply_gradients(zip(grads, model.trainable_variables))
    
            for x in val_db:
                x = tf.reshape(x, [-1, input_shape])
                x_rec_logits = tf.sigmoid(model(x))
                rec_loss = tf.losses.mean_squared_error(x, x_rec_logits)
                rec_loss = tf.reduce_mean(rec_loss)
                val_losses += rec_loss
    
            print(epoch,"train_losses :",float(train_losses),"val_losses :",float(val_losses))
    
            train_list.append(train_losses)
            val_list.append(val_losses)
    
        model.save('/tmp/model')
    
        x = [i for i in range(0, epochs)]
        # 绘制曲线
        plt.figure()
        plt.plot(x, train_list, color='blue', label='training')
        plt.plot(x, val_list, color='red', label='validation')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        plt.show()
        plt.close()
    
    def save_images(imgs, name, shape=(32,32)):
        new_im = Image.new('L', (28*shape[0], 28*shape[1]))
    
        index = 0
        for i in range(0, 28*shape[0], 28):
            for j in range(0, 28*shape[1], 28):
                im = imgs[index]
                im = Image.fromarray(im, mode='L')
                new_im.paste(im, (i, j))
                index += 1
    
        new_im.save(name)
    
    
    def showImage(dataset, input_shape=784):
        model = tf.keras.models.load_model('/tmp/model', compile=False)
        os.makedirs('ae_images', exist_ok=True)   # make sure the output directory exists

        for step, x in enumerate(dataset):
            x_hat = tf.sigmoid(model(tf.reshape(x, [-1, input_shape])))
            x_hat = tf.reshape(x_hat, [-1, 28, 28])
            x_concat = tf.concat([x, x_hat], axis=0)
            if(x_concat.shape[0] < batchsz * 2):
                break
            x_concat = x_concat.numpy() * 255.
            x_concat = x_concat.astype(np.uint8)
            shape=(int(tf.sqrt(batchsz*2.)),int(tf.sqrt(batchsz*2.)))
            save_images(x_concat, 'ae_images/rec_epoch_%d.png'%step,shape)
    
    train(train_db,val_db)
    showImage(val_db)
    

    The training curves are shown below:

    [Figure: training and validation loss curves]
    Below is an autoencoder implementation better suited to my own workflow. It covers: reading a CSV file, data scaling, train/test splitting, conversion to a tf.data.Dataset, the autoencoder model (implemented with Keras), model training, model saving and loading, and saving and loading auxiliary variables (such as the scaler):

    import  os
    import pickle
    import numpy as np
    import pandas as pd
    from math import ceil
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras import Sequential, layers
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.preprocessing import StandardScaler
    
    from PIL import Image
    from matplotlib import pyplot as plt
    
    tf.random.set_seed(42)
    np.random.seed(42)
    os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
    assert tf.__version__.startswith('2.')
    
    def get_dataset_to_pandas(file_name, dropList=[]):
    	dataset = pd.read_csv(file_name)
    	for drop_str in dropList:
    		dataset = dataset.drop(drop_str,axis=1)
    	return dataset
    
    def pre_scaler(dataset, type_str = "std"):
    	if type_str == "minmax":
    		scaler = MinMaxScaler()
    	elif type_str == "std":
    		scaler = StandardScaler()
    	else : 
    		return None
    	scaler.fit(dataset)
    	return scaler,scaler.transform(dataset)
    
    def to_tf_dataset(dataset, batch_size = None, shuffle_buffer_size = None, repeat_size = None):
    	dataset = tf.data.Dataset.from_tensor_slices(dataset)
    	# shuffle before batching so individual samples (not whole batches) are shuffled
    	if shuffle_buffer_size is not None:
    		dataset = dataset.shuffle(shuffle_buffer_size)
    	if batch_size is not None:
    		dataset = dataset.batch(batch_size)
    	if repeat_size is not None:
    		dataset = dataset.repeat(repeat_size)
    	return dataset
    
    def train_test_split(dataset, test_ratio = 0.3, seed = 42):
    	if seed:
    		np.random.seed(seed)
    	shuffle_index = np.random.permutation(len(dataset))
    	test_size = ceil(len(dataset) * test_ratio)
    	test_index = shuffle_index[:test_size]
    	train_index = shuffle_index[test_size:]
    	dataset_train = dataset[train_index]
    	dataset_test = dataset[test_index]
    	return dataset_train, dataset_test
    
    class AutoEncoder(keras.Model):
    
    	def __init__(self, input_shape, hidden_list, activation=tf.nn.relu):
    		super(AutoEncoder, self).__init__()

    		# Encoder: hidden layers followed by a linear bottleneck layer
    		center = hidden_list.pop()
    		self.encoder = Sequential([layers.Dense(num, activation=activation) for num in hidden_list]
    								+ [layers.Dense(center)])

    		# Decoder: mirror of the hidden layers, projecting back to the input dimension
    		hidden_list.reverse()
    		self.decoder = Sequential([layers.Dense(num, activation=activation) for num in hidden_list]
    								+ [layers.Dense(input_shape)])
    
    
    	def call(self, inputs, training=None):
    		# [b, input_dim] => [b, bottleneck]
    		h = self.encoder(inputs)
    		# [b, bottleneck] => [b, input_dim]
    		x_hat = self.decoder(h)
    		return x_hat
    
    def get_auto_encoder(dataset,input_shape,hidden_list,epochs,lr=0.001,activation=tf.nn.relu):
    	(train_db,val_db) = dataset
    
    	model = AutoEncoder(input_shape,hidden_list,activation)
    	model.build(input_shape=(None, input_shape))
    	model.summary()
    
    	train_list = []
    	val_list = []
    	optimizer = tf.optimizers.Adam(learning_rate=lr)
    	for epoch in range(epochs):
    		
    		train_losses = 0
    		val_losses = 0
    		for step, x in enumerate(train_db):
    			x = tf.reshape(x, [-1, input_shape])
    			with tf.GradientTape() as tape:
    				x_rec_logits = tf.sigmoid(model(x))
    				rec_loss = tf.losses.mean_squared_error(x, x_rec_logits)
    				rec_loss = tf.reduce_mean(rec_loss)
    			train_losses += rec_loss
    			grads = tape.gradient(rec_loss, model.trainable_variables)
    			optimizer.apply_gradients(zip(grads, model.trainable_variables))
    
    		for x in val_db:
    			x = tf.reshape(x, [-1, input_shape])
    			x_rec_logits = tf.sigmoid(model(x))
    			rec_loss = tf.losses.mean_squared_error(x, x_rec_logits)
    			rec_loss = tf.reduce_mean(rec_loss)
    			val_losses += rec_loss
    
    		print(epoch,"train_losses :",float(train_losses),"val_losses :",float(val_losses))
    
    		train_list.append(train_losses)
    		val_list.append(val_losses)
    
    	x = [i for i in range(0, epochs)]
    	# 绘制曲线
    	plt.figure()
    	plt.plot(x, train_list, color='blue', label='training')
    	plt.plot(x, val_list, color='red', label='validation')
    	plt.xlabel('Epoch')
    	plt.ylabel('Loss')
    	plt.legend()
    	plt.show()
    	plt.close()
    	return model
    
    def variable_save(variable, file_name):	
    	data_output = open(file_name, 'wb')
    	pickle.dump(variable,data_output)
    	data_output.close()
    
    def variable_load(file_name):	
    	data_input = open(file_name, 'rb')
    	variable = pickle.load(data_input)
    	data_input.close()
    	return variable
    
    def model_save(model, file_name):
    	model.save(file_name)
    
    def model_load(file_name, is_compile=False):
    	return tf.keras.models.load_model(file_name, compile=is_compile)
    
    if __name__ == '__main__':
    	dataset = get_dataset_to_pandas("walk1.csv", ["Loss","TimeStamp","LT_Foot_TimeStamp","RT_Foot_TimeStamp",'Chest_TimeStamp'])
    	scaler, dataset = pre_scaler(dataset,"minmax")
    	dataset_train, dataset_test = train_test_split(dataset)
    	(dataset_train, dataset_test) = (to_tf_dataset(data,batch_size=20,shuffle_buffer_size=100) for data in [dataset_train, dataset_test])
    	model = get_auto_encoder((dataset_train, dataset_test),input_shape=18,hidden_list=[9,4],epochs=100,lr=0.01)
    	variable_save(scaler,'./auto_model/scaler')
    	model_save(model,'./auto_model')
    
    	# load the model and scaler back later like this:
    	# scaler = variable_load('./auto_model/scaler')
    	# model = model_load('./auto_model')
    	# print(model, '\n', scaler)
    

    Once the model is trained, the reconstruction loss it assigns to a new sample can be used to decide whether that sample belongs to the positive (normal) dataset; this plays much the same role as the decision function of a one-class SVM.
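
    As a minimal sketch of that idea (assuming the model and scaler saved above; `normal_df`, `new_df`, and the 99th-percentile threshold are hypothetical placeholders, not part of the original post), the per-sample reconstruction error can be compared against a threshold fitted on the normal training data:

    import numpy as np
    import tensorflow as tf

    def reconstruction_errors(model, data, input_shape=18):
    	# per-sample MSE between the (scaled) input and its reconstruction
    	x = tf.reshape(tf.convert_to_tensor(data, dtype=tf.float32), [-1, input_shape])
    	x_hat = tf.sigmoid(model(x))
    	return tf.reduce_mean(tf.square(x - x_hat), axis=1).numpy()

    # model  = model_load('./auto_model')
    # scaler = variable_load('./auto_model/scaler')
    # train_err = reconstruction_errors(model, scaler.transform(normal_df), input_shape=18)
    # threshold = np.percentile(train_err, 99)      # assumption: 99th-percentile cut-off
    # new_err   = reconstruction_errors(model, scaler.transform(new_df), input_shape=18)
    # is_anomaly = new_err > threshold              # True -> does not look like a positive sample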

  • Original post: https://www.cnblogs.com/FlameBlog/p/14714972.html