zoukankan      html  css  js  c++  java
  • 使用SAE(VAE)检测信用卡欺诈——感觉误报率还是比较高啊 70%+误报 蛋疼

    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler
    from unzip_utils import unzip
    import numpy as np
    import tflearn
    from matplotlib import pyplot as plt
    import seaborn as sns
    from sklearn.metrics import confusion_matrix
    import pandas as pd
    import zipfile
    from sklearn.metrics import average_precision_score, recall_score, precision_score, f1_score
    
    
    def unzip(path_to_zip_file, directory_to_extract_to):
        """Extract every member of a zip archive into a directory.

        The archive is opened through a context manager so the file handle is
        released even when extraction raises part-way through.

        :param path_to_zip_file: path of the ``.zip`` archive to read.
        :param directory_to_extract_to: directory that receives the members.
        """
        with zipfile.ZipFile(path_to_zip_file, 'r') as zip_ref:
            zip_ref.extractall(directory_to_extract_to)
    
    
    def report_evaluation_metrics(y_true, y_pred):
        """Print average precision, precision, recall and F1 for the positive class.

        :param y_true: ground-truth labels (0 or 1).
        :param y_pred: predicted labels (0 or 1).
        """
        positive_class = dict(labels=[0, 1], pos_label=1)
        # All four metrics are computed before anything is printed.
        report = [
            ('Average precision-recall score: {0:0.2f}', average_precision_score(y_true, y_pred)),
            ('Precision: {0:0.2f}', precision_score(y_true, y_pred, **positive_class)),
            ('Recall: {0:0.2f}', recall_score(y_true, y_pred, **positive_class)),
            ('F1: {0:0.2f}', f1_score(y_true, y_pred, **positive_class)),
        ]

        for template, value in report:
            print(template.format(value))
    
    # Tick labels for both axes of the confusion-matrix heatmap
    # (class 0 is shown as "Normal", class 1 as "Fraud").
    LABELS = ["Normal", "Fraud"]
    
    
    def plot_confusion_matrix(y_true, y_pred):
        """Display the confusion matrix of ``y_pred`` against ``y_true`` as a heatmap.

        :param y_true: ground-truth labels.
        :param y_pred: predicted labels.
        """
        matrix = confusion_matrix(y_true, y_pred)

        plt.figure(figsize=(12, 12))
        heatmap_options = dict(xticklabels=LABELS, yticklabels=LABELS, annot=True, fmt="d")
        sns.heatmap(matrix, **heatmap_options)
        plt.title("Confusion matrix")
        plt.ylabel('True class')
        plt.xlabel('Predicted class')
        plt.show()
    
    
    def plot_training_history(history):
        """Plot training and validation loss per epoch; do nothing for ``None``.

        :param history: mapping with 'loss' and 'val_loss' sequences, or None.
        """
        if history is not None:
            for curve in ('loss', 'val_loss'):
                plt.plot(history[curve])
            plt.title('model loss')
            plt.ylabel('loss')
            plt.xlabel('epoch')
            plt.legend(['train', 'test'], loc='upper right')
            plt.show()
    
    
    def visualize_anomaly(y_true, reconstruction_error, threshold):
        """Scatter the reconstruction error per sample, grouped by true class.

        Prints summary statistics of the error table, then draws one marker
        series per class together with a horizontal line at ``threshold``.

        :param y_true: ground-truth labels, one per sample.
        :param reconstruction_error: reconstruction error, one per sample.
        :param threshold: error value at which the threshold line is drawn.
        """
        frame = pd.DataFrame({'reconstruction_error': reconstruction_error,
                              'true_class': y_true})
        print(frame.describe())

        by_class = frame.groupby('true_class')
        _, axes = plt.subplots()

        for class_value, members in by_class:
            legend_name = "Fraud" if class_value == 1 else "Normal"
            axes.plot(members.index, members.reconstruction_error,
                      marker='o', ms=3.5, linestyle='', label=legend_name)

        # Span the threshold line across the x-range the points already occupy.
        left, right = axes.get_xlim()
        axes.hlines(threshold, left, right, colors="r", zorder=100, label='Threshold')
        axes.legend()
        plt.title("Reconstruction error for different classes")
        plt.ylabel("Reconstruction error")
        plt.xlabel("Data point index")
        plt.show()
    
    
    def visualize_reconstruction_error(reconstruction_error, threshold):
        """Scatter every reconstruction error by index and overlay the threshold.

        :param reconstruction_error: sequence of reconstruction errors.
        :param threshold: error value at which the horizontal line is drawn.
        """
        point_style = dict(marker='o', ms=3.5, linestyle='', label='Point')
        plt.plot(reconstruction_error, **point_style)

        last_index = len(reconstruction_error) - 1
        plt.hlines(threshold, xmin=0, xmax=last_index, colors="r", zorder=100, label='Threshold')
        plt.legend()
        plt.title("Reconstruction error")
        plt.ylabel("Reconstruction error")
        plt.xlabel("Data point index")
        plt.show()
    
    
    
    def preprocess_data(csv_data):
        """Split the raw credit-card frame into a feature matrix and a label vector.

        The 'Class' and 'Time' columns are removed from the features and the
        'Amount' column is standardised with ``StandardScaler``. ``csv_data``
        itself is not modified because ``drop`` returns a new frame.

        :param csv_data: DataFrame containing at least the 'Class', 'Time' and
            'Amount' columns.
        :return: ``(features, labels)`` as NumPy arrays; ``labels`` holds the
            'Class' column.
        """
        credit_card_data = csv_data.drop(labels=['Class', 'Time'], axis=1)
        credit_card_data['Amount'] = StandardScaler().fit_transform(
            credit_card_data['Amount'].values.reshape(-1, 1))
        # DataFrame.as_matrix() was removed in pandas 1.0; ``.values`` returns
        # the same ndarray and exists in both old and new pandas releases.
        credit_card_np_data = credit_card_data.values
        y_true = csv_data['Class'].values
        return credit_card_np_data, y_true
    
    
    def main():
        """Train a small auto-encoder on the credit-card data and score it as a fraud detector.

        Pipeline: unzip and load ``./data/creditcard.csv``, preprocess it, fit a
        dim -> 8 -> 4 -> 8 -> dim auto-encoder with tflearn, flag the rows with
        the largest reconstruction error via ``get_anomaly``, then print and plot
        the evaluation against the true 'Class' labels.
        """
        seed = 42
        np.random.seed(seed)

        data_dir_path = './data'

        unzip(data_dir_path + '/creditcardfraud.zip', data_dir_path)
        csv_data = pd.read_csv(data_dir_path + '/creditcard.csv')
        # Fraction of rows labelled 0; get_anomaly uses it as the quantile of the
        # reconstruction-error distribution at which the threshold is placed.
        estimated_negative_sample_ratio = 1 - csv_data['Class'].sum() / csv_data['Class'].count()
        print(estimated_negative_sample_ratio)
        X, Y = preprocess_data(csv_data)
        print("sample data: X:{} Y:{}".format(X[:3], Y[:3]))
        print(X.shape)

        # Only the feature part of the split is used, as the validation set below.
        _, testX, _, _ = train_test_split(X, Y, test_size=0.2, random_state=seed)

        blackY_indices = np.where(Y)[0]
        print(blackY_indices[:3], "sample fraud credit data")
        assert Y[blackY_indices[0]]
        assert Y[blackY_indices[-1]]

        # Params
        original_dim = len(X[0])  # number of feature columns per sample
        print("dim: {}".format(original_dim))

        # Building the encoder
        encoder = tflearn.input_data(shape=[None, original_dim])
        encoder = tflearn.fully_connected(encoder, 8)
        encoder = tflearn.fully_connected(encoder, 4)

        # Building the decoder
        # NOTE(review): the sigmoid output is confined to (0, 1) while 'Amount' is
        # standardised (so it takes negative values); a linear output layer may
        # reconstruct better - confirm before changing the model.
        decoder = tflearn.fully_connected(encoder, 8)
        decoder = tflearn.fully_connected(decoder, original_dim, activation='sigmoid')

        # Regression, with mean square error
        net = tflearn.regression(decoder, optimizer='adam', learning_rate=0.001,
                                 loss='mean_square', metric=None)

        # Training the auto encoder
        # NOTE(review): the model is fitted on all of X and testX is a subset of
        # X, so the validation loss is not a held-out estimate.
        training_model = tflearn.DNN(net, tensorboard_verbose=0)
        training_model.fit(X, X, n_epoch=100, validation_set=(testX, testX),
                           run_id="auto_encoder", batch_size=256)

        print("training sample predict:")
        print(training_model.predict(X[:3]))

        # Score every sample and compare the flags with the true fraud rows.
        Ypred = []
        reconstruction_error = []
        anomaly_information, adjusted_threshold = get_anomaly(training_model, X, estimated_negative_sample_ratio)
        tp = fp = tn = fn = 0
        blackY_indices = set(blackY_indices)
        for idx, (is_anomaly, dist) in enumerate(anomaly_information):
            is_fraud = idx in blackY_indices
            if is_anomaly:
                if is_fraud:
                    tp += 1
                else:
                    fp += 1
            else:
                if is_fraud:
                    fn += 1
                else:
                    tn += 1
            Ypred.append(1 if is_anomaly else 0)
            reconstruction_error.append(dist)

        # The detected count is the number of flagged samples (tp + fp);
        # tp + fn would merely repeat the number of fraud rows.
        print("blackY_indices len:{} detectd cnt:{}, true attack cnt:{}".format(len(blackY_indices), tp + fp, tp))
        total = tp + tn + fp + fn
        # Guard the ratios so an empty denominator reports 0.0 instead of raising.
        precision = float(tp) / (tp + fp) if tp + fp else 0.0
        hit_rate = float(tp) / (tp + fn) if tp + fn else 0.0
        accuracy = float(tp + tn) / total if total else 0.0
        print('precision = {}, hit_rate = {}, accuracy = {}'.format(precision, hit_rate, accuracy))

        report_evaluation_metrics(Y, Ypred)
        visualize_anomaly(Y, reconstruction_error, adjusted_threshold)
        plot_confusion_matrix(Y, Ypred)
    
    
    def get_anomaly(model, data, estimated_negative_sample_ratio):
        """Flag samples whose reconstruction error reaches a quantile threshold.

        The error of a sample is the Euclidean distance between the sample and
        ``model.predict`` of it, e.g. rows ``[1, 1, 1]`` and ``[2, 2, 2]``
        reconstructed as zeros score 1.732 and 3.464. The threshold is the
        sorted error at position ``int(ratio * n)``.

        :param model: object exposing ``predict(data)`` that returns the
            reconstruction of ``data``.
        :param data: 2-D array of samples (one row per sample).
        :param estimated_negative_sample_ratio: expected fraction of normal
            samples; errors at or above this quantile are flagged.
        :return: ``(pairs, threshold)`` where ``pairs`` is a one-shot ``zip``
            iterator of ``(is_anomaly, error)`` in the input row order.
        """
        target_data = model.predict(data)
        scores = np.linalg.norm(data - target_data, axis=-1)
        # Sort a copy so ``scores`` keeps the original row order.
        ordered_scores = np.sort(scores)
        cut_point = int(estimated_negative_sample_ratio * len(ordered_scores))
        # A ratio of 1.0 (no positive samples) would index one past the end;
        # clamp to the largest error so only the maximum is flagged.
        cut_point = min(cut_point, len(ordered_scores) - 1)
        threshold = ordered_scores[cut_point]
        print('estimated threshold is ' + str(threshold))
        return zip(scores >= threshold, scores), threshold
    
    
    # Run the experiment only when this file is executed as a script.
    if __name__ == '__main__':
        main()
    

     效果图:

    使用VAE的:

    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler
    from unzip_utils import unzip
    import numpy as np
    import tensorflow as tf
    import tflearn
    from matplotlib import pyplot as plt
    import seaborn as sns
    from sklearn.metrics import confusion_matrix
    import pandas as pd
    import zipfile
    from sklearn.metrics import average_precision_score, recall_score, precision_score, f1_score
    
    
    def unzip(path_to_zip_file, directory_to_extract_to):
        """Extract every member of a zip archive into a directory.

        The archive is opened through a context manager so the file handle is
        released even when extraction raises part-way through.

        :param path_to_zip_file: path of the ``.zip`` archive to read.
        :param directory_to_extract_to: directory that receives the members.
        """
        with zipfile.ZipFile(path_to_zip_file, 'r') as zip_ref:
            zip_ref.extractall(directory_to_extract_to)
    
    
    def report_evaluation_metrics(y_true, y_pred):
        """Print average precision, precision, recall and F1 for the positive class.

        :param y_true: ground-truth labels (0 or 1).
        :param y_pred: predicted labels (0 or 1).
        """
        positive_class = dict(labels=[0, 1], pos_label=1)
        # All four metrics are computed before anything is printed.
        report = [
            ('Average precision-recall score: {0:0.2f}', average_precision_score(y_true, y_pred)),
            ('Precision: {0:0.2f}', precision_score(y_true, y_pred, **positive_class)),
            ('Recall: {0:0.2f}', recall_score(y_true, y_pred, **positive_class)),
            ('F1: {0:0.2f}', f1_score(y_true, y_pred, **positive_class)),
        ]

        for template, value in report:
            print(template.format(value))
    
    # Tick labels for both axes of the confusion-matrix heatmap
    # (class 0 is shown as "Normal", class 1 as "Fraud").
    LABELS = ["Normal", "Fraud"]
    
    
    def plot_confusion_matrix(y_true, y_pred):
        """Display the confusion matrix of ``y_pred`` against ``y_true`` as a heatmap.

        :param y_true: ground-truth labels.
        :param y_pred: predicted labels.
        """
        matrix = confusion_matrix(y_true, y_pred)

        plt.figure(figsize=(12, 12))
        heatmap_options = dict(xticklabels=LABELS, yticklabels=LABELS, annot=True, fmt="d")
        sns.heatmap(matrix, **heatmap_options)
        plt.title("Confusion matrix")
        plt.ylabel('True class')
        plt.xlabel('Predicted class')
        plt.show()
    
    
    def plot_training_history(history):
        """Plot training and validation loss per epoch; do nothing for ``None``.

        :param history: mapping with 'loss' and 'val_loss' sequences, or None.
        """
        if history is not None:
            for curve in ('loss', 'val_loss'):
                plt.plot(history[curve])
            plt.title('model loss')
            plt.ylabel('loss')
            plt.xlabel('epoch')
            plt.legend(['train', 'test'], loc='upper right')
            plt.show()
    
    
    def visualize_anomaly(y_true, reconstruction_error, threshold):
        """Scatter the reconstruction error per sample, grouped by true class.

        Prints summary statistics of the error table, then draws one marker
        series per class together with a horizontal line at ``threshold``.

        :param y_true: ground-truth labels, one per sample.
        :param reconstruction_error: reconstruction error, one per sample.
        :param threshold: error value at which the threshold line is drawn.
        """
        frame = pd.DataFrame({'reconstruction_error': reconstruction_error,
                              'true_class': y_true})
        print(frame.describe())

        by_class = frame.groupby('true_class')
        _, axes = plt.subplots()

        for class_value, members in by_class:
            legend_name = "Fraud" if class_value == 1 else "Normal"
            axes.plot(members.index, members.reconstruction_error,
                      marker='o', ms=3.5, linestyle='', label=legend_name)

        # Span the threshold line across the x-range the points already occupy.
        left, right = axes.get_xlim()
        axes.hlines(threshold, left, right, colors="r", zorder=100, label='Threshold')
        axes.legend()
        plt.title("Reconstruction error for different classes")
        plt.ylabel("Reconstruction error")
        plt.xlabel("Data point index")
        plt.show()
    
    
    def visualize_reconstruction_error(reconstruction_error, threshold):
        """Scatter every reconstruction error by index and overlay the threshold.

        :param reconstruction_error: sequence of reconstruction errors.
        :param threshold: error value at which the horizontal line is drawn.
        """
        point_style = dict(marker='o', ms=3.5, linestyle='', label='Point')
        plt.plot(reconstruction_error, **point_style)

        last_index = len(reconstruction_error) - 1
        plt.hlines(threshold, xmin=0, xmax=last_index, colors="r", zorder=100, label='Threshold')
        plt.legend()
        plt.title("Reconstruction error")
        plt.ylabel("Reconstruction error")
        plt.xlabel("Data point index")
        plt.show()
    
    
    
    def preprocess_data(csv_data):
        """Split the raw credit-card frame into a feature matrix and a label vector.

        The 'Class' and 'Time' columns are removed from the features and the
        'Amount' column is standardised with ``StandardScaler``. ``csv_data``
        itself is not modified because ``drop`` returns a new frame.

        :param csv_data: DataFrame containing at least the 'Class', 'Time' and
            'Amount' columns.
        :return: ``(features, labels)`` as NumPy arrays; ``labels`` holds the
            'Class' column.
        """
        credit_card_data = csv_data.drop(labels=['Class', 'Time'], axis=1)
        credit_card_data['Amount'] = StandardScaler().fit_transform(
            credit_card_data['Amount'].values.reshape(-1, 1))
        # DataFrame.as_matrix() was removed in pandas 1.0; ``.values`` returns
        # the same ndarray and exists in both old and new pandas releases.
        credit_card_np_data = credit_card_data.values
        y_true = csv_data['Class'].values
        return credit_card_np_data, y_true
    
    
    # encoder
    def encode(input_x, encoder_hidden_dim, latent_dim):
        """Build the VAE encoder: one ReLU hidden layer feeding two linear heads.

        Equivalent Keras formulation, for reference::

            x = Dense(intermediate_dim, activation='relu')(inputs)
            z_mean = Dense(latent_dim, name='z_mean')(x)
            z_log_var = Dense(latent_dim, name='z_log_var')(x)

        :param input_x: input layer/tensor of the network.
        :param encoder_hidden_dim: number of units in the hidden layer.
        :param latent_dim: number of units in each output head.
        :return: ``(mu, logvar)`` - the two head tensors, in that order.
        """
        hidden = tflearn.fully_connected(input_x, encoder_hidden_dim, activation='relu')

        def latent_head():
            # Each call creates a separate linear layer on top of ``hidden``.
            return tflearn.fully_connected(hidden, latent_dim, activation='linear')

        mean_head = latent_head()
        log_variance_head = latent_head()
        return mean_head, log_variance_head
    
    
    # decoder
    def decode(z, decoder_hidden_dim, input_dim):
        """Build the VAE decoder: a ReLU hidden layer followed by a linear output.

        The Keras reference this was ported from uses a sigmoid output layer;
        this version uses a linear one.

        :param z: latent tensor to decode.
        :param decoder_hidden_dim: number of units in the hidden layer.
        :param input_dim: number of units in the output layer.
        :return: the reconstruction tensor.
        """
        hidden = tflearn.fully_connected(z, decoder_hidden_dim, activation='relu')
        return tflearn.fully_connected(hidden, input_dim, activation='linear')
    
    
    # sampler
    def sample(mu, logvar):
        """Draw a latent sample with the reparameterization trick.

        Instead of sampling from Q(z|X) directly, standard-normal noise is
        drawn and shifted/scaled: ``z = mu + exp(logvar / 2) * eps``.
        Equivalent Keras formulation, for reference::

            epsilon = K.random_normal(shape=(batch, dim))
            return z_mean + K.exp(0.5 * z_log_var) * epsilon

        :param mu: mean tensor produced by the encoder.
        :param logvar: log-variance tensor produced by the encoder.
        :return: the sampled latent tensor, shaped like ``logvar``.
        """
        noise = tf.random_normal(tf.shape(logvar), dtype=tf.float32, name='epsilon')
        std_dev = tf.exp(logvar / 2)
        return mu + std_dev * noise
    
    
    # loss function(regularization)
    def calculate_regularization_loss(mu, logvar):
        """Return the KL-divergence regularization term of the VAE loss.

        Computes ``-0.5 * sum(1 + logvar - mu^2 - exp(logvar))`` with the sum
        taken over axis 1.

        :param mu: mean tensor produced by the encoder.
        :param logvar: log-variance tensor produced by the encoder.
        :return: tensor holding the KL term reduced over axis 1.
        """
        per_dimension = 1 + logvar - tf.square(mu) - tf.exp(logvar)
        return -0.5 * tf.reduce_sum(per_dimension, reduction_indices=1)
    
    
    # loss function(reconstruction)
    def calculate_reconstruction_loss(x_hat, input_x):
        """Return tflearn's mean-square objective between reconstruction and input.

        :param x_hat: reconstruction tensor produced by the decoder.
        :param input_x: original input tensor.
        :return: the value of ``tflearn.objectives.mean_square(x_hat, input_x)``.
        """
        # NOTE(review): presumably a scalar, unlike the per-sample KL term it is
        # later added to - confirm the intended weighting of the two losses.
        return tflearn.objectives.mean_square(x_hat, input_x)
    
    
    def main():
        """Train a variational auto-encoder on the credit-card data and score it as a fraud detector.

        Pipeline: unzip and load ``./data/creditcard.csv``, preprocess it, build
        a VAE (hidden size 8, latent size 4) from ``encode``/``sample``/``decode``,
        train it on a loss combining the KL and reconstruction terms, flag the
        rows with the largest reconstruction error via ``get_anomaly``, then
        print and plot the evaluation against the true 'Class' labels.
        """
        seed = 42
        np.random.seed(seed)

        data_dir_path = './data'

        unzip(data_dir_path + '/creditcardfraud.zip', data_dir_path)
        csv_data = pd.read_csv(data_dir_path + '/creditcard.csv')
        # Fraction of rows labelled 0; get_anomaly uses it as the quantile of the
        # reconstruction-error distribution at which the threshold is placed.
        estimated_negative_sample_ratio = 1 - csv_data['Class'].sum() / csv_data['Class'].count()
        print(estimated_negative_sample_ratio)
        X, Y = preprocess_data(csv_data)
        print("sample data: X:{} Y:{}".format(X[:3], Y[:3]))
        print(X.shape)

        # Only the feature part of the split is used, as the validation set below.
        _, testX, _, _ = train_test_split(X, Y, test_size=0.2, random_state=seed)

        blackY_indices = np.where(Y)[0]
        print(blackY_indices[:3], "sample fraud credit data")
        assert Y[blackY_indices[0]]
        assert Y[blackY_indices[-1]]

        # Params
        original_dim = len(X[0])  # number of feature columns per sample
        print("dim: {}".format(original_dim))

        hidden_dim = 8
        latent_dim = 4
        input_x = tflearn.input_data(shape=(None, original_dim), name='input_x')
        mu, logvar = encode(input_x, hidden_dim, latent_dim)
        z = sample(mu, logvar)
        x_hat = decode(z, hidden_dim, original_dim)

        # Total loss: mean over the sum of the KL term and the reconstruction term.
        regularization_loss = calculate_regularization_loss(mu, logvar)
        reconstruction_loss = calculate_reconstruction_loss(x_hat, input_x)
        target = tf.reduce_mean(tf.add(regularization_loss, reconstruction_loss))

        net = tflearn.regression(x_hat, optimizer='rmsprop', learning_rate=0.001,
                                 loss=target, metric=None, name='target_out')

        # Train the VAE
        # NOTE(review): the model is fitted on all of X and testX is a subset of
        # X, so the validation loss is not a held-out estimate.
        training_model = tflearn.DNN(net, tensorboard_verbose=0)
        training_model.fit({'input_x': X}, {'target_out': X}, n_epoch=30,
                           validation_set=(testX, testX), batch_size=256, run_id="vae")

        print("training sample predict:")
        print(training_model.predict(X[:3]))

        # Score every sample and compare the flags with the true fraud rows.
        Ypred = []
        reconstruction_error = []
        anomaly_information, adjusted_threshold = get_anomaly(training_model, X, estimated_negative_sample_ratio)
        tp = fp = tn = fn = 0
        blackY_indices = set(blackY_indices)
        for idx, (is_anomaly, dist) in enumerate(anomaly_information):
            is_fraud = idx in blackY_indices
            if is_anomaly:
                if is_fraud:
                    tp += 1
                else:
                    fp += 1
            else:
                if is_fraud:
                    fn += 1
                else:
                    tn += 1
            Ypred.append(1 if is_anomaly else 0)
            reconstruction_error.append(dist)

        # The detected count is the number of flagged samples (tp + fp);
        # tp + fn would merely repeat the number of fraud rows.
        print("blackY_indices len:{} detectd cnt:{}, true attack cnt:{}".format(len(blackY_indices), tp + fp, tp))
        total = tp + tn + fp + fn
        # Guard the ratios so an empty denominator reports 0.0 instead of raising.
        precision = float(tp) / (tp + fp) if tp + fp else 0.0
        hit_rate = float(tp) / (tp + fn) if tp + fn else 0.0
        accuracy = float(tp + tn) / total if total else 0.0
        print('precision = {}, hit_rate = {}, accuracy = {}'.format(precision, hit_rate, accuracy))

        report_evaluation_metrics(Y, Ypred)
        visualize_anomaly(Y, reconstruction_error, adjusted_threshold)
        plot_confusion_matrix(Y, Ypred)
    
    
    def get_anomaly(model, data, estimated_negative_sample_ratio):
        """Flag samples whose reconstruction error reaches a quantile threshold.

        The error of a sample is the Euclidean distance between the sample and
        ``model.predict`` of it, e.g. rows ``[1, 1, 1]`` and ``[2, 2, 2]``
        reconstructed as zeros score 1.732 and 3.464. The threshold is the
        sorted error at position ``int(ratio * n)``.

        :param model: object exposing ``predict(data)`` that returns the
            reconstruction of ``data``.
        :param data: 2-D array of samples (one row per sample).
        :param estimated_negative_sample_ratio: expected fraction of normal
            samples; errors at or above this quantile are flagged.
        :return: ``(pairs, threshold)`` where ``pairs`` is a one-shot ``zip``
            iterator of ``(is_anomaly, error)`` in the input row order.
        """
        target_data = model.predict(data)
        scores = np.linalg.norm(data - target_data, axis=-1)
        # Sort a copy so ``scores`` keeps the original row order.
        ordered_scores = np.sort(scores)
        cut_point = int(estimated_negative_sample_ratio * len(ordered_scores))
        # A ratio of 1.0 (no positive samples) would index one past the end;
        # clamp to the largest error so only the maximum is flagged.
        cut_point = min(cut_point, len(ordered_scores) - 1)
        threshold = ordered_scores[cut_point]
        print('estimated threshold is ' + str(threshold))
        return zip(scores >= threshold, scores), threshold
    
    
    # Run the experiment only when this file is executed as a script.
    if __name__ == '__main__':
        main()
    

     

  • 相关阅读:
    7.6 C程序的存储空间布局
    7.10 setjmp和longjmp函数
    7.2 main函数
    7.8 存储器分配
    7.5 环境表
    7.1 进程环境 引言
    7.4 命令行参数
    7.3 进程终止
    电影名扬四海主题歌节奏强动感十足的经典歌曲!
    随便写点人生感悟
  • 原文地址:https://www.cnblogs.com/bonelee/p/9855161.html
Copyright © 2011-2022 走看看