zoukankan      html  css  js  c++  java
  • 使用GAN 进行异常检测——anoGAN,TODO,待用于安全分析实验

    先说实验成功的代码:

    git clone https://github.com/tkwoo/anogan-keras.git

    mkdir weights

    python main.py --mode train

    即可看到效果了!

    核心代码:main.py

    from __future__ import print_function
    
    import matplotlib
    matplotlib.use('Qt5Agg')
    
    import os
    import cv2
    import numpy as np
    import matplotlib.pyplot as plt
    from keras.datasets import mnist
    import argparse
    import anogan
    
    # Ask TensorFlow's native logging to hide INFO and WARNING messages.
    # NOTE(review): this assignment runs after `keras` has already been imported
    # above; confirm the setting still takes effect at this point.
    os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
    
    # Command-line options:
    #   --img_idx   : position of the test sample inside the chosen label group
    #   --label_idx : digit label of the test sample to score
    #   --mode      : 'train' trains the GAN first; any other value skips training
    parser = argparse.ArgumentParser()
    parser.add_argument('--img_idx', type=int, default=14)
    parser.add_argument('--label_idx', type=int, default=7)
    parser.add_argument('--mode', type=str, default='test', help='train, test')
    args = parser.parse_args()
    
    ### 0. prepare data
    (X_train, y_train), (X_test, y_test) = mnist.load_data()
    # Convert to float32 and map the 0..255 pixel range onto [-1, 1].
    X_train = (X_train.astype(np.float32) - 127.5) / 127.5
    X_test = (X_test.astype(np.float32) - 127.5) / 127.5
    
    # Append a trailing channel axis: (N, H, W) -> (N, H, W, 1).
    X_train = X_train[:,:,:,None]
    X_test = X_test[:,:,:,None]
    
    # Keep an unfiltered copy of the test set (all labels) for picking
    # anomalous samples later; X_test itself is filtered just below.
    X_test_original = X_test.copy()
    
    # Only samples labelled 1 are treated as the "normal" class.
    X_train = X_train[y_train==1]
    X_test = X_test[y_test==1]
    print ('train shape:', X_train.shape)
    
    ### 1. train generator & discriminator
    if args.mode == 'train':
        Model_d, Model_g = anogan.train(64, X_train)
    
    ### 2. test generator
    # Sample 25 images from the generator (weights are loaded inside
    # anogan.generate), tile them into one grid, and map [-1, 1] back to 0..255.
    generated_img = anogan.generate(25)
    img = anogan.combine_images(generated_img)
    img = (img*127.5)+127.5
    img = img.astype(np.uint8)
    # Enlarge 4x with nearest-neighbour interpolation for viewing.
    img = cv2.resize(img, None, fx=4, fy=4, interpolation=cv2.INTER_NEAREST)
    
    ### opencv view
    # cv2.namedWindow('generated', 0)
    # cv2.resizeWindow('generated', 256, 256)
    # cv2.imshow('generated', img)
    # cv2.imwrite('result_latent_10/generator.png', img)
    # cv2.waitKey()
    
    ### plt view
    # plt.figure(num=0, figsize=(4, 4))
    # plt.title('trained generator')
    # plt.imshow(img, cmap=plt.cm.gray)
    # plt.show()
    
    # exit()
    
    ### 3. other class anomaly detection
    
    def anomaly_detection(test_img, g=None, d=None):
        """Score one 28x28 image and build visualisations of the result.

        Args:
            test_img: image array holding 28*28 values; it is reshaped
                internally, values are expected in the [-1, 1] scale used
                by the rest of this script.
            g: optional generator forwarded to ``anogan.anomaly_detector``.
            d: optional discriminator forwarded to ``anogan.anomaly_detector``
                and ``anogan.compute_anomaly_score``.

        Returns:
            Tuple ``(ano_score, original_x, similar_x, show)``: the anomaly
            score, the query image as uint8, the closest generated image as
            uint8, and a BGR overlay of the residual heat map on the query.
        """
        detector = anogan.anomaly_detector(g=g, d=d)
        query_batch = test_img.reshape(1, 28, 28, 1)
        ano_score, similar_img = anogan.compute_anomaly_score(detector, query_batch, iterations=500, d=d)

        query_plane = test_img.reshape(28,28,1)
        match_plane = similar_img.reshape(28,28,1)

        # The difference of two [-1, 1] images lies in [-2, 2]; shift and scale
        # it into [0, 1] before stretching to the 0..255 range.
        residual_unit = (query_plane - match_plane + 2)/4
        residual_u8 = (255*residual_unit).astype(np.uint8)

        original_x = (query_plane*127.5+127.5).astype(np.uint8)
        similar_x = (match_plane*127.5+127.5).astype(np.uint8)

        # Blend a JET heat map of the residual (70%) over the query (30%).
        heat_map = cv2.applyColorMap(residual_u8, cv2.COLORMAP_JET)
        query_bgr = cv2.cvtColor(original_x, cv2.COLOR_GRAY2BGR)
        show = cv2.addWeighted(query_bgr, 0.3, heat_map, 0.7, 0.)

        return ano_score, original_x, similar_x, show
    
    
    ### compute anomaly score - sample from test set
    # test_img = X_test_original[y_test==1][30]
    
    ### compute anomaly score - sample from strange image
    # test_img = X_test_original[y_test==0][30]
    
    ### compute anomaly score - sample from strange image
    # Pick the test sample: the img_idx-th image among those labelled
    # label_idx in the unfiltered test set.
    img_idx = args.img_idx
    label_idx = args.label_idx
    test_img = X_test_original[y_test==label_idx][img_idx]
    # test_img = np.random.uniform(-1,1, (28,28,1))
    
    # Time the full detection call; tick count / tick frequency gives
    # seconds, multiplied by 1000 to report milliseconds.
    start = cv2.getTickCount()
    score, qurey, pred, diff = anomaly_detection(test_img)
    time = (cv2.getTickCount() - start) / cv2.getTickFrequency() * 1000
    print ('%d label, %d : done'%(label_idx, img_idx), '%.2f'%score, '%.2fms'%time)
    # cv2.imwrite('./qurey.png', qurey)
    # cv2.imwrite('./pred.png', pred)
    # cv2.imwrite('./diff.png', diff)
    
    ## matplot view
    # Figure 1: the query image.
    plt.figure(1, figsize=(3, 3))
    plt.title('query image')
    plt.imshow(qurey.reshape(28,28), cmap=plt.cm.gray)
    
    # Figure 2: the closest image the generator could produce.
    print("anomaly score : ", score)
    plt.figure(2, figsize=(3, 3))
    plt.title('generated similar image')
    plt.imshow(pred.reshape(28,28), cmap=plt.cm.gray)
    
    # Figure 3: residual overlay; converted from OpenCV's BGR to RGB for matplotlib.
    plt.figure(3, figsize=(3, 3))
    plt.title('anomaly detection')
    plt.imshow(cv2.cvtColor(diff,cv2.COLOR_BGR2RGB))
    plt.show()
    
    
    ### 4. tsne feature view
    
    ### t-SNE embedding 
    ### generating anomaly image for test (radom noise image)
    
    from sklearn.manifold import TSNE

    # Uniform noise images serve as an obviously anomalous reference group.
    random_image = np.random.uniform(0, 1, (100, 28, 28, 1))
    print("random noise image")
    plt.figure(4, figsize=(2, 2))
    plt.title('random noise image')
    plt.imshow(random_image[0].reshape(28,28), cmap=plt.cm.gray)

    # Intermediate output of the discriminator for three groups:
    # noise images, test digits other than "1", and test digits "1".
    model = anogan.feature_extractor()
    feature_map_of_random = model.predict(random_image, verbose=1)
    feature_map_of_minist = model.predict(X_test_original[y_test != 1][:300], verbose=1)
    feature_map_of_minist_1 = model.predict(X_test[:100], verbose=1)

    # Group boundaries inside the concatenated array, taken from the actual
    # group sizes so the scatter slices cannot drift from the sample counts.
    n_random = len(feature_map_of_random)
    n_anomaly = n_random + len(feature_map_of_minist)
    n_normal = len(feature_map_of_minist_1)

    # t-SNE for visualization: one flattened feature vector per image.
    output = np.concatenate((feature_map_of_random, feature_map_of_minist, feature_map_of_minist_1))
    output = output.reshape(output.shape[0], -1)
    # One flag per row of `output`: 1 = anomaly (noise and non-"1" digits),
    # 0 = normal ("1" digits).
    anomaly_flag = np.array([1]*n_anomaly + [0]*n_normal)

    X_embedded = TSNE(n_components=2).fit_transform(output)
    plt.figure(5)
    plt.title("t-SNE embedding on the feature representation")
    plt.scatter(X_embedded[:n_random,0], X_embedded[:n_random,1], label='random noise(anomaly)')
    plt.scatter(X_embedded[n_random:n_anomaly,0], X_embedded[n_random:n_anomaly,1], label='mnist(anomaly)')
    plt.scatter(X_embedded[n_anomaly:,0], X_embedded[n_anomaly:,1], label='mnist(normal)')
    plt.legend()
    plt.show()
    

     anogan.py

    from __future__ import print_function
    from keras.models import Sequential, Model
    from keras.layers import Input, Reshape, Dense, Dropout, MaxPooling2D, Conv2D, Flatten
    from keras.layers import Conv2DTranspose, LeakyReLU
    from keras.layers.core import Activation
    from keras.layers.normalization import BatchNormalization
    from keras.optimizers import Adam, RMSprop
    from keras import backend as K
    from keras import initializers
    import tensorflow as tf
    import numpy as np
    from tqdm import tqdm
    import cv2
    import math
    
    from keras.utils. generic_utils import Progbar
    
    ### combine images for visualization
    def combine_images(generated_images):
        """Tile a batch of images into a single grid image.

        The grid has ``int(sqrt(N))`` columns and as many rows as needed to
        hold all N images; tiles are placed row by row and unused cells stay
        zero.

        Args:
            generated_images: array of shape (N, height, width, channels).

        Returns:
            Array of shape (rows*height, cols*width, channels) with the same
            dtype as the input.
        """
        count = generated_images.shape[0]
        cols = int(math.sqrt(count))
        rows = int(math.ceil(float(count)/cols))

        dims = generated_images.shape[1:4]
        canvas = np.zeros((rows*dims[0], cols*dims[1], dims[2]),
                          dtype=generated_images.dtype)
        tile_h, tile_w = dims[0], dims[1]

        for position, tile in enumerate(generated_images):
            # Row-major placement: position -> (row, column) in the grid.
            r, c = divmod(position, cols)
            top, left = r*tile_h, c*tile_w
            canvas[top:top+tile_h, left:left+tile_w, :] = tile[:, :, :]
        return canvas
    
    ### generator model define
    def generator_model():
        """Build the generator network (uncompiled).

        Takes a 10-dimensional latent vector, projects it to a 7x7x128 map,
        upsamples twice with stride-2 transposed convolutions and ends in a
        single-channel ``tanh`` output.

        Returns:
            A Keras ``Model`` mapping the latent input to the generated image.
        """
        latent = Input((10,))

        # Dense projection, then fold into a 7x7 map with 128 channels.
        x = Dense(input_dim=10, units=128*7*7)(latent)
        x = BatchNormalization()(x)
        x = LeakyReLU(0.2)(x)
        x = Reshape((7, 7, 128), input_shape=(128*7*7,))(x)

        # First stride-2 upsampling stage.
        x = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(x)
        x = Conv2D(64, (3, 3), padding='same')(x)
        x = BatchNormalization()(x)
        x = Activation('relu')(x)

        # Second stride-2 upsampling stage, reduced to one channel.
        x = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(x)
        x = Conv2D(1, (5, 5), padding='same')(x)
        image = Activation('tanh')(x)

        return Model(inputs=[latent], outputs=[image])
    
    ### discriminator model define
    def discriminator_model():
        """Build the discriminator network (uncompiled).

        Two conv / LeakyReLU / max-pool stages on a 28x28x1 input, followed
        by a single sigmoid unit.

        Returns:
            A Keras ``Model`` mapping an image to a value in (0, 1).
        """
        image = Input((28, 28, 1))

        # Stage 1: 64 filters.
        x = Conv2D(64, (5, 5), padding='same')(image)
        x = LeakyReLU(0.2)(x)
        x = MaxPooling2D(pool_size=(2, 2))(x)

        # Stage 2: 128 filters.
        x = Conv2D(128, (5, 5), padding='same')(x)
        x = LeakyReLU(0.2)(x)
        x = MaxPooling2D(pool_size=(2, 2))(x)

        # Classification head.
        x = Flatten()(x)
        x = Dense(1)(x)
        verdict = Activation('sigmoid')(x)

        return Model(inputs=[image], outputs=[verdict])
    
    ### d_on_g model for training generator
    def generator_containing_discriminator(g, d):
        """Stack the discriminator on top of the generator.

        Sets ``d.trainable = False`` as a side effect. The returned model is
        not compiled here; the caller is expected to compile it.

        Args:
            g: generator model taking a 10-dimensional latent vector.
            d: discriminator model applied to the generator's output.

        Returns:
            A Keras ``Model`` from latent vector to discriminator output.
        """
        d.trainable = False
        latent = Input(shape=(10,))
        verdict = d(g(latent))
        return Model(inputs=latent, outputs=verdict)
    
    def load_model():
        """Rebuild both networks, compile them and load their saved weights.

        Weights are read from ``./weights/discriminator.h5`` and
        ``./weights/generator.h5``.

        Returns:
            Tuple ``(g, d)``: the generator and the discriminator.
        """
        disc = discriminator_model()
        gen = generator_model()

        disc_optim = RMSprop()
        gen_optim = RMSprop(lr=0.0002)
        gen.compile(loss='binary_crossentropy', optimizer=gen_optim)
        disc.compile(loss='binary_crossentropy', optimizer=disc_optim)

        weight_files = (
            (disc, './weights/discriminator.h5'),
            (gen, './weights/generator.h5'),
        )
        for net, path in weight_files:
            net.load_weights(path)
        return gen, disc
    
    ### train generator and discriminator
    def train(BATCH_SIZE, X_train):
        """Train the generator and discriminator for 10 epochs.

        Every iteration trains the discriminator on one batch of real images
        plus one batch of generated images, then trains the generator through
        the stacked ``d_on_g`` model. Every 20th iteration a preview grid is
        written to ``./result/``; after every epoch both weight files are
        saved under ``weights/``.

        Args:
            BATCH_SIZE: number of real (and of generated) images per step.
            X_train: array of training images, batched along axis 0. Trailing
                samples that do not fill a whole batch are not used.

        Returns:
            Tuple ``(d, g)``: the trained discriminator and generator.
        """
        # Function-scope import so this block does not depend on the file's
        # import header.
        import os

        # The preview images and weight files below are written into these
        # folders; create them up front so nothing is lost when they are
        # missing (cv2.imwrite is not expected to create directories itself).
        for out_dir in ('./result', 'weights'):
            if not os.path.isdir(out_dir):
                os.makedirs(out_dir)

        ### model define
        d = discriminator_model()
        g = generator_model()
        d_on_g = generator_containing_discriminator(g, d)
        d_optim = RMSprop(lr=0.0004)
        g_optim = RMSprop(lr=0.0002)
        g.compile(loss='mse', optimizer=g_optim)
        # d was marked non-trainable inside generator_containing_discriminator,
        # so the stacked model is compiled with a frozen discriminator.
        d_on_g.compile(loss='mse', optimizer=g_optim)
        d.trainable = True
        d.compile(loss='mse', optimizer=d_optim)

        for epoch in range(10):
            print ("Epoch is", epoch)
            n_iter = int(X_train.shape[0]/BATCH_SIZE)
            progress_bar = Progbar(target=n_iter)

            for index in range(n_iter):
                # create random noise -> U(0,1) 10 latent vectors
                noise = np.random.uniform(0, 1, size=(BATCH_SIZE, 10))

                # load real data & generate fake data
                image_batch = X_train[index*BATCH_SIZE:(index+1)*BATCH_SIZE]
                generated_images = g.predict(noise, verbose=0)

                # visualize training results
                if index % 20 == 0:
                    image = combine_images(generated_images)
                    image = image*127.5+127.5
                    cv2.imwrite('./result/'+str(epoch)+"_"+str(index)+".png", image)

                # attach label for training discriminator: 1 = real, 0 = generated
                X = np.concatenate((image_batch, generated_images))
                y = np.array([1] * BATCH_SIZE + [0] * BATCH_SIZE)

                # training discriminator
                d_loss = d.train_on_batch(X, y)

                # training generator: targets are all 1 so the generator is
                # pushed towards images the discriminator accepts as real
                d.trainable = False
                g_loss = d_on_g.train_on_batch(noise, np.array([1] * BATCH_SIZE))
                d.trainable = True

                # index is 0-based; report completed steps so the bar reaches
                # its target on the last iteration.
                progress_bar.update(index + 1, values=[('g',g_loss), ('d',d_loss)])
            print ('')

            # save weights for each epoch
            g.save_weights('weights/generator.h5', True)
            d.save_weights('weights/discriminator.h5', True)
        return d, g
    
    ### generate images
    def generate(BATCH_SIZE):
        """Sample images from the saved generator.

        Rebuilds the generator, loads ``weights/generator.h5`` and feeds it
        ``BATCH_SIZE`` latent vectors drawn uniformly from [0, 1).

        Args:
            BATCH_SIZE: number of images to generate.

        Returns:
            The generator's predictions for the sampled latent vectors.
        """
        gen = generator_model()
        gen.load_weights('weights/generator.h5')
        latent = np.random.uniform(0, 1, (BATCH_SIZE, 10))
        return gen.predict(latent)
    
    ### anomaly loss function 
    def sum_of_residual(y_true, y_pred):
        """Loss: sum of absolute differences between target and prediction."""
        residual = y_true - y_pred
        return K.sum(K.abs(residual))
    
    ### discriminator intermediate layer feautre extraction
    def feature_extractor(d=None):
        """Return a model exposing an intermediate discriminator activation.

        Args:
            d: discriminator to tap. When ``None``, a fresh discriminator is
                built and ``weights/discriminator.h5`` is loaded into it.

        Returns:
            A compiled Keras ``Model`` from the discriminator's input to the
            output of its seventh-from-last layer.
        """
        if d is None:
            d = discriminator_model()
            d.load_weights('weights/discriminator.h5')

        # NOTE(review): for the discriminator defined in this file, layers[-7]
        # is presumably the first max-pooling layer — confirm before changing
        # the architecture, since the index is positional.
        tap_in = d.layers[0].input
        tap_out = d.layers[-7].output
        tapped = Model(inputs=tap_in, outputs=tap_out)
        tapped.compile(loss='binary_crossentropy', optimizer='rmsprop')
        return tapped
    
    ### anomaly detection model define
    def anomaly_detector(g=None, d=None):
        """Build the model used to search the latent space for a query image.

        A trainable Dense(10) + sigmoid layer is placed in front of a frozen
        generator; the model outputs both the generated image and the
        discriminator feature map of that image, and is compiled with
        ``sum_of_residual`` on both outputs (weights 0.90 / 0.10).

        Args:
            g: generator to use. When ``None``, one is built and
                ``weights/generator.h5`` is loaded into it.
            d: discriminator forwarded to ``feature_extractor`` (which builds
                and loads one itself when this is ``None``).

        Returns:
            The compiled Keras ``Model`` with outputs ``[G_out, D_out]``.
        """
        if g is None:
            g = generator_model()
            g.load_weights('weights/generator.h5')
        # Discriminator feature tap; frozen so fitting does not change it.
        intermidiate_model = feature_extractor(d)
        intermidiate_model.trainable = False
        # Re-wrap the generator from the input of its second layer to its last
        # output, then freeze it as well.
        g = Model(inputs=g.layers[1].input, outputs=g.layers[-1].output)
        g.trainable = False
        # An Input layer cannot be trained, so add a new trainable layer of the
        # same size; the sigmoid keeps its output inside (0, 1).
        aInput = Input(shape=(10,))
        gInput = Dense((10), trainable=True)(aInput)
        gInput = Activation('sigmoid')(gInput)
        
        # G & D feature
        G_out = g(gInput)
        D_out= intermidiate_model(G_out)    
        model = Model(inputs=aInput, outputs=[G_out, D_out])
        # Combined loss: 0.90 * image residual + 0.10 * feature residual.
        model.compile(loss=sum_of_residual, loss_weights= [0.90, 0.10], optimizer='rmsprop')
        
        # Fix the learning phase to 0 (test) so batch normalisation runs in
        # inference mode.
        # NOTE(review): this is a global backend setting and is applied after
        # the model has been built — confirm this ordering is intended.
        K.set_learning_phase(0)
        
        return model
    
    ### anomaly detection
    def compute_anomaly_score(model, x, iterations=500, d=None):
        """Fit ``model`` to reproduce ``x`` and report the final loss.

        Args:
            model: detector model with two outputs (generated image and
                discriminator features), e.g. from ``anomaly_detector``.
            x: query image batch, passed to the feature extractor and used as
                the first fitting target.
            iterations: number of epochs to fit for.
            d: optional discriminator forwarded to ``feature_extractor``.

        Returns:
            Tuple ``(loss, similar_data)``: the last recorded training loss and
            the model's first output for the latent vector after fitting.
        """
        # Random starting point in latent space.
        z = np.random.uniform(0, 1, size=(1, 10))

        # Discriminator features of the query act as the second target.
        d_x = feature_extractor(d).predict(x)

        # learning for changing latent
        history = model.fit(z, [x, d_x], batch_size=1, epochs=iterations, verbose=0)
        similar_data, _features = model.predict(z)

        final_loss = history.history['loss'][-1]
        return final_loss, similar_data
    

     效果图:

     Detecting a strange image the model has never seen. Reference: https://github.com/yjucho1/anoGAN

    ## compute anomaly score - sample from strange image
    
    # Load the query image from disk and keep only its first channel.
    # NOTE(review): confirm the loaded image has the same value range as the
    # data the model was trained on.
    test_img = plt.imread('assets/test_img.png')
    test_img = test_img[:,:,0]
    
    # Build the detector and search for the closest generated image.
    model = anogan.anomaly_detector()
    ano_score, similar_img = anogan.compute_anomaly_score(model, test_img.reshape(1, 28, 28, 1))
    
    # First figure: the query image on its own.
    plt.figure(figsize=(2, 2))
    plt.imshow(test_img.reshape(28,28), cmap=plt.cm.gray)
    plt.show()
    print("anomaly score : " + str(ano_score))
    # Second figure: the query image with the residual (query minus generated
    # image) drawn on top as a half-transparent 'jet' heat map.
    plt.figure(figsize=(2, 2))
    plt.imshow(test_img.reshape(28,28), cmap=plt.cm.gray)
    residual  = test_img.reshape(28,28) - similar_img.reshape(28, 28)
    plt.imshow(residual, cmap='jet', alpha=.5)
    plt.show()
    

    png

    anomaly score : 446.46844482421875
    

    png

    https://github.com/yjucho1/anoGAN

    from keras.models import Sequential, Model
    from keras.layers import Input, Reshape, Dense, Dropout, UpSampling2D, Conv2D, Flatten
    from keras.layers.advanced_activations import LeakyReLU
    from keras.optimizers import Adam
    from keras import backend as K
    from keras import initializers
    import tensorflow as tf
    import numpy as np
    from tqdm import tqdm
    
    def generator_model():
        """Build and compile the generator.

        A 100-dimensional latent vector is projected to a 7x7x128 map and
        upsampled twice (2x each) to a single-channel ``tanh`` image.

        Returns:
            The compiled Keras ``Sequential`` generator.
        """
        stack = [
            # Latent projection with a narrow normal initialiser.
            Dense(128*7*7, input_dim=100, kernel_initializer=initializers.RandomNormal(stddev=0.02)),
            LeakyReLU(0.2),
            Reshape((7, 7, 128)),
            # First 2x upsampling stage.
            UpSampling2D(size=(2, 2)),
            Conv2D(64, kernel_size=(5, 5), padding='same'),
            LeakyReLU(0.2),
            # Second 2x upsampling stage, reduced to one channel.
            UpSampling2D(size=(2, 2)),
            Conv2D(1, kernel_size=(5, 5), padding='same', activation='tanh'),
        ]
        net = Sequential(stack)
        net.compile(loss='binary_crossentropy', optimizer='adam')
        return net
    
    
    def discriminator_model():
        """Build and compile the discriminator.

        Two stride-2 convolution stages (each followed by LeakyReLU and
        dropout) on a 28x28x1 input, then a single sigmoid unit.

        Returns:
            The compiled Keras ``Sequential`` discriminator.
        """
        stack = [
            Conv2D(64, kernel_size=(5, 5), strides=(2, 2), padding='same', input_shape=(28,28, 1), kernel_initializer=initializers.RandomNormal(stddev=0.02)),
            LeakyReLU(0.2),
            Dropout(0.3),
            Conv2D(128, kernel_size=(5, 5), strides=(2, 2), padding='same'),
            LeakyReLU(0.2),
            Dropout(0.3),
            Flatten(),
            Dense(1, activation='sigmoid'),
        ]
        net = Sequential(stack)
        net.compile(loss='binary_crossentropy', optimizer='adam')
        return net
    
    
    def generator_containing_discriminator(g, d):
        """Stack the discriminator on the generator and compile the result.

        Sets ``d.trainable = False`` as a side effect.

        Args:
            g: generator model taking a 100-dimensional latent vector.
            d: discriminator model applied to the generator's output.

        Returns:
            The compiled Keras ``Model`` from latent vector to discriminator
            output.
        """
        d.trainable = False
        latent = Input(shape=(100,))
        verdict = d(g(latent))
        stacked = Model(inputs=latent, outputs=verdict)
        stacked.compile(loss='binary_crossentropy', optimizer='adam')
        return stacked
    
    def train(BATCH_SIZE, X_train):
        """Train the generator and discriminator for 200 epochs.

        Each step trains the discriminator on one real batch plus one
        generated batch, then trains the generator (through the stacked
        model) on a fresh noise batch. Both weight files are saved under
        ``assets/`` after every epoch.

        Args:
            BATCH_SIZE: number of real (and of generated) images per step.
            X_train: array of training images, batched along axis 0.

        Returns:
            Tuple ``(d, g)``: the trained discriminator and generator.
        """
        d = discriminator_model()
        print("#### discriminator ######")
        d.summary()
        g = generator_model()
        print("#### generator ######")
        g.summary()
        d_on_g = generator_containing_discriminator(g, d)
        d.trainable = True

        steps_per_epoch = int(X_train.shape[0]/BATCH_SIZE)
        # Labels: 1 = real, 0 = generated.
        disc_targets = np.array([1] * BATCH_SIZE + [0] * BATCH_SIZE)
        gen_targets = np.array([1] * BATCH_SIZE)

        for epoch in tqdm(range(200)):
            for step in range(steps_per_epoch):
                # Discriminator step on real + generated images.
                latent = np.random.uniform(0, 1, size=(BATCH_SIZE, 100))
                first = step*BATCH_SIZE
                real_batch = X_train[first:first + BATCH_SIZE]
                fake_batch = g.predict(latent, verbose=0)
                d.train_on_batch(np.concatenate((real_batch, fake_batch)), disc_targets)

                # Generator step on new noise, with all-ones targets.
                latent = np.random.uniform(0, 1, (BATCH_SIZE, 100))
                d.trainable = False
                d_on_g.train_on_batch(latent, gen_targets)
                d.trainable = True
            g.save_weights('assets/generator', True)
            d.save_weights('assets/discriminator', True)
        return d, g
    
    
    def generate(BATCH_SIZE):
        """Sample images from the saved generator.

        Rebuilds the generator, loads ``assets/generator`` and feeds it
        ``BATCH_SIZE`` latent vectors drawn uniformly from [0, 1).

        Args:
            BATCH_SIZE: number of images to generate.

        Returns:
            The generator's predictions for the sampled latent vectors.
        """
        gen = generator_model()
        gen.load_weights('assets/generator')
        latent = np.random.uniform(0, 1, (BATCH_SIZE, 100))
        return gen.predict(latent)
    
    def sum_of_residual(y_true, y_pred):
        """Loss: sum of absolute differences between target and prediction."""
        residual = y_true - y_pred
        return tf.reduce_sum(abs(residual))
    
    def feature_extractor():
        """Return a model exposing an intermediate discriminator activation.

        Builds a discriminator, loads ``assets/discriminator`` into it and
        wraps it from its first layer's input to the output of its
        fifth-from-last layer.

        Returns:
            The compiled Keras ``Model``.
        """
        disc = discriminator_model()
        disc.load_weights('assets/discriminator')

        # NOTE(review): layers[-5] is positional; for the discriminator defined
        # alongside this function it is presumably the second Conv2D — confirm
        # before changing the architecture.
        tap_in = disc.layers[0].input
        tap_out = disc.layers[-5].output
        tapped = Model(inputs=tap_in, outputs=tap_out)
        tapped.compile(loss='binary_crossentropy', optimizer='adam')
        return tapped
    
    def anomaly_detector():
        """Build the model used to search the latent space for a query image.

        A Dense(100) layer is placed in front of the frozen, pre-trained
        generator; the model outputs the generated image and the
        discriminator features of that image, and is compiled with
        ``sum_of_residual`` on both outputs (weights 0.9 / 0.1).

        Returns:
            The compiled Keras ``Model``.
        """
        gen = generator_model()
        gen.load_weights('assets/generator')
        gen.trainable = False
        feat = feature_extractor()
        feat.trainable = False

        # The Dense layer in front of the generator is the part left unfrozen.
        latent_in = Input(shape=(100,))
        mapped_latent = Dense((100))(latent_in)
        generated = gen(mapped_latent)
        features = feat(generated)

        detector = Model(inputs=latent_in, outputs=[generated, features])
        detector.compile(loss=sum_of_residual, loss_weights= [0.9, 0.1], optimizer='adam')
        return detector
    
    def compute_anomaly_score(model, x):
        """Fit ``model`` to reproduce ``x`` for 500 epochs and report the loss.

        Args:
            model: detector model with two outputs (generated image and
                discriminator features), e.g. from ``anomaly_detector``.
            x: query image batch.

        Returns:
            Tuple ``(loss, similar_data)``: the last recorded training loss and
            the model's first output for the latent vector after fitting.
        """
        # Random starting point in latent space.
        z = np.random.uniform(0, 1, size=(1, 100))

        # Discriminator features of the query act as the second target.
        d_x = feature_extractor().predict(x)

        history = model.fit(z, [x, d_x], epochs=500, verbose=0)
        similar_data, _features = model.predict(z)
        final_loss = history.history['loss'][-1]
        return final_loss, similar_data
    

     

    GAN异常检测的一些实验

    要做基于GANomaly的异常检测实验,需要准备大量的OK样本和少量的NG样本。找不到合适的数据集怎么办?很简单,随便找个开源的分类数据集,将其中一个类别的样本当作异常类别,其他所有类别的样本当作正常样本即可,文章中的实验就是这么干的。具体试验结果如下:

    反正在效果上,GANomaly是超过了之前两种代表性的方法。此外,作者还做了性能对比的实验。事实上前面已经介绍了GANomaly的推断方法,就是一个简单的前向传播和一个对比阈值的过程,因此速度非常快。具体结果如下:

    可以看出,计算性能上,GANomaly表现也是非常不错的。

  • 相关阅读:
    oracle客户端plsql设置字符集
    命令导入导出oracle库
    java初级开发一系列的工具安装配置
    docker学习笔记-5.数据共享
    docker学习笔记-4.harbor
    flask学习笔记2
    socket编程学习
    docker学习笔记-3.docker镜像制作
    docker学习笔记-2.docker镜像制作
    docker学习笔记-1.docker基础
  • 原文地址:https://www.cnblogs.com/bonelee/p/9882955.html
Copyright © 2011-2022 走看看