First, the code from the experiment that ran successfully:

git clone https://github.com/tkwoo/anogan-keras.git

mkdir weights

python main.py --mode train

and you will see the results.
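After training, single-image inference can be run in test mode. Based on the argparse flags defined in main.py below (the index values shown are just the defaults), something like the following should work. Note that train() also writes preview images to a ./result/ directory, so you may need to create it (mkdir result) as well.

python main.py --mode test --img_idx 14 --label_idx 7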

Core code: main.py

from __future__ import print_function
import matplotlib
matplotlib.use('Qt5Agg')
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
from keras.datasets import mnist
import argparse
import anogan

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

parser = argparse.ArgumentParser()
parser.add_argument('--img_idx', type=int, default=14)
parser.add_argument('--label_idx', type=int, default=7)
parser.add_argument('--mode', type=str, default='test', help='train, test')
args = parser.parse_args()

### 0. prepare data
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = (X_train.astype(np.float32) - 127.5) / 127.5
X_test = (X_test.astype(np.float32) - 127.5) / 127.5

X_train = X_train[:,:,:,None]
X_test = X_test[:,:,:,None]
X_test_original = X_test.copy()

X_train = X_train[y_train==1]
X_test = X_test[y_test==1]
print ('train shape:', X_train.shape)

### 1. train generator & discriminator
if args.mode == 'train':
    Model_d, Model_g = anogan.train(64, X_train)

### 2. test generator
generated_img = anogan.generate(25)
img = anogan.combine_images(generated_img)
img = (img*127.5)+127.5
img = img.astype(np.uint8)
img = cv2.resize(img, None, fx=4, fy=4, interpolation=cv2.INTER_NEAREST)

### opencv view
# cv2.namedWindow('generated', 0)
# cv2.resizeWindow('generated', 256, 256)
# cv2.imshow('generated', img)
# cv2.imwrite('result_latent_10/generator.png', img)
# cv2.waitKey()

### plt view
# plt.figure(num=0, figsize=(4, 4))
# plt.title('trained generator')
# plt.imshow(img, cmap=plt.cm.gray)
# plt.show()

# exit()

### 3. other class anomaly detection
def anomaly_detection(test_img, g=None, d=None):
    model = anogan.anomaly_detector(g=g, d=d)
    ano_score, similar_img = anogan.compute_anomaly_score(model, test_img.reshape(1, 28, 28, 1), iterations=500, d=d)

    # anomaly area, 255 normalization
    np_residual = test_img.reshape(28,28,1) - similar_img.reshape(28,28,1)
    np_residual = (np_residual + 2)/4
    np_residual = (255*np_residual).astype(np.uint8)

    original_x = (test_img.reshape(28,28,1)*127.5+127.5).astype(np.uint8)
    similar_x = (similar_img.reshape(28,28,1)*127.5+127.5).astype(np.uint8)

    original_x_color = cv2.cvtColor(original_x, cv2.COLOR_GRAY2BGR)
    residual_color = cv2.applyColorMap(np_residual, cv2.COLORMAP_JET)
    show = cv2.addWeighted(original_x_color, 0.3, residual_color, 0.7, 0.)

    return ano_score, original_x, similar_x, show

### compute anomaly score - sample from test set
# test_img = X_test_original[y_test==1][30]

### compute anomaly score - sample from strange image
# test_img = X_test_original[y_test==0][30]

### compute anomaly score - sample from strange image
img_idx = args.img_idx
label_idx = args.label_idx
test_img = X_test_original[y_test==label_idx][img_idx]
# test_img = np.random.uniform(-1,1, (28,28,1))

start = cv2.getTickCount()
score, qurey, pred, diff = anomaly_detection(test_img)
time = (cv2.getTickCount() - start) / cv2.getTickFrequency() * 1000
print ('%d label, %d : done'%(label_idx, img_idx), '%.2f'%score, '%.2fms'%time)
# cv2.imwrite('./qurey.png', qurey)
# cv2.imwrite('./pred.png', pred)
# cv2.imwrite('./diff.png', diff)

## matplot view
plt.figure(1, figsize=(3, 3))
plt.title('query image')
plt.imshow(qurey.reshape(28,28), cmap=plt.cm.gray)

print("anomaly score : ", score)

plt.figure(2, figsize=(3, 3))
plt.title('generated similar image')
plt.imshow(pred.reshape(28,28), cmap=plt.cm.gray)

plt.figure(3, figsize=(3, 3))
plt.title('anomaly detection')
plt.imshow(cv2.cvtColor(diff,cv2.COLOR_BGR2RGB))
plt.show()

### 4. tsne feature view
### t-SNE embedding
### generating anomaly image for test (radom noise image)
from sklearn.manifold import TSNE

random_image = np.random.uniform(0, 1, (100, 28, 28, 1))
print("random noise image")

plt.figure(4, figsize=(2, 2))
plt.title('random noise image')
plt.imshow(random_image[0].reshape(28,28), cmap=plt.cm.gray)

# intermidieate output of discriminator
model = anogan.feature_extractor()
feature_map_of_random = model.predict(random_image, verbose=1)
feature_map_of_minist = model.predict(X_test_original[y_test != 1][:300], verbose=1)
feature_map_of_minist_1 = model.predict(X_test[:100], verbose=1)

# t-SNE for visulization
output = np.concatenate((feature_map_of_random, feature_map_of_minist, feature_map_of_minist_1))
output = output.reshape(output.shape[0], -1)
anomaly_flag = np.array([1]*100+ [0]*300)

X_embedded = TSNE(n_components=2).fit_transform(output)
plt.figure(5)
plt.title("t-SNE embedding on the feature representation")
plt.scatter(X_embedded[:100,0], X_embedded[:100,1], label='random noise(anomaly)')
plt.scatter(X_embedded[100:400,0], X_embedded[100:400,1], label='mnist(anomaly)')
plt.scatter(X_embedded[400:,0], X_embedded[400:,1], label='mnist(normal)')
plt.legend()
plt.show()

anogan.py

from __future__ import print_function
from keras.models import Sequential, Model
from keras.layers import Input, Reshape, Dense, Dropout, MaxPooling2D, Conv2D, Flatten
from keras.layers import Conv2DTranspose, LeakyReLU
from keras.layers.core import Activation
from keras.layers.normalization import BatchNormalization
from keras.optimizers import Adam, RMSprop
from keras import backend as K
from keras import initializers
import tensorflow as tf
import numpy as np
from tqdm import tqdm
import cv2
import math
from keras.utils.generic_utils import Progbar

### combine images for visualization
def combine_images(generated_images):
    num = generated_images.shape[0]
    width = int(math.sqrt(num))
    height = int(math.ceil(float(num)/width))
    shape = generated_images.shape[1:4]
    image = np.zeros((height*shape[0], width*shape[1], shape[2]), dtype=generated_images.dtype)
    for index, img in enumerate(generated_images):
        i = int(index/width)
        j = index % width
        image[i*shape[0]:(i+1)*shape[0], j*shape[1]:(j+1)*shape[1], :] = img[:, :, :]
    return image

### generator model define
def generator_model():
    inputs = Input((10,))
    fc1 = Dense(input_dim=10, units=128*7*7)(inputs)
    fc1 = BatchNormalization()(fc1)
    fc1 = LeakyReLU(0.2)(fc1)
    fc2 = Reshape((7, 7, 128), input_shape=(128*7*7,))(fc1)
    up1 = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(fc2)
    conv1 = Conv2D(64, (3, 3), padding='same')(up1)
    conv1 = BatchNormalization()(conv1)
    conv1 = Activation('relu')(conv1)
    up2 = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(conv1)
    conv2 = Conv2D(1, (5, 5), padding='same')(up2)
    outputs = Activation('tanh')(conv2)
    model = Model(inputs=[inputs], outputs=[outputs])
    return model

### discriminator model define
def discriminator_model():
    inputs = Input((28, 28, 1))
    conv1 = Conv2D(64, (5, 5), padding='same')(inputs)
    conv1 = LeakyReLU(0.2)(conv1)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)
    conv2 = Conv2D(128, (5, 5), padding='same')(pool1)
    conv2 = LeakyReLU(0.2)(conv2)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)
    fc1 = Flatten()(pool2)
    fc1 = Dense(1)(fc1)
    outputs = Activation('sigmoid')(fc1)
    model = Model(inputs=[inputs], outputs=[outputs])
    return model

### d_on_g model for training generator
def generator_containing_discriminator(g, d):
    d.trainable = False
    ganInput = Input(shape=(10,))
    x = g(ganInput)
    ganOutput = d(x)
    gan = Model(inputs=ganInput, outputs=ganOutput)
    # gan.compile(loss='binary_crossentropy', optimizer='adam')
    return gan

def load_model():
    d = discriminator_model()
    g = generator_model()
    d_optim = RMSprop()
    g_optim = RMSprop(lr=0.0002)
    g.compile(loss='binary_crossentropy', optimizer=g_optim)
    d.compile(loss='binary_crossentropy', optimizer=d_optim)
    d.load_weights('./weights/discriminator.h5')
    g.load_weights('./weights/generator.h5')
    return g, d

### train generator and discriminator
def train(BATCH_SIZE, X_train):
    ### model define
    d = discriminator_model()
    g = generator_model()
    d_on_g = generator_containing_discriminator(g, d)
    d_optim = RMSprop(lr=0.0004)
    g_optim = RMSprop(lr=0.0002)
    g.compile(loss='mse', optimizer=g_optim)
    d_on_g.compile(loss='mse', optimizer=g_optim)
    d.trainable = True
    d.compile(loss='mse', optimizer=d_optim)

    for epoch in range(10):
        print ("Epoch is", epoch)
        n_iter = int(X_train.shape[0]/BATCH_SIZE)
        progress_bar = Progbar(target=n_iter)

        for index in range(n_iter):
            # create random noise -> U(0,1) 10 latent vectors
            noise = np.random.uniform(0, 1, size=(BATCH_SIZE, 10))

            # load real data & generate fake data
            image_batch = X_train[index*BATCH_SIZE:(index+1)*BATCH_SIZE]
            generated_images = g.predict(noise, verbose=0)

            # visualize training results
            if index % 20 == 0:
                image = combine_images(generated_images)
                image = image*127.5+127.5
                cv2.imwrite('./result/'+str(epoch)+"_"+str(index)+".png", image)

            # attach label for training discriminator
            X = np.concatenate((image_batch, generated_images))
            y = np.array([1] * BATCH_SIZE + [0] * BATCH_SIZE)

            # training discriminator
            d_loss = d.train_on_batch(X, y)

            # training generator
            d.trainable = False
            g_loss = d_on_g.train_on_batch(noise, np.array([1] * BATCH_SIZE))
            d.trainable = True

            progress_bar.update(index, values=[('g', g_loss), ('d', d_loss)])
        print ('')

        # save weights for each epoch
        g.save_weights('weights/generator.h5', True)
        d.save_weights('weights/discriminator.h5', True)
    return d, g

### generate images
def generate(BATCH_SIZE):
    g = generator_model()
    g.load_weights('weights/generator.h5')
    noise = np.random.uniform(0, 1, (BATCH_SIZE, 10))
    generated_images = g.predict(noise)
    return generated_images

### anomaly loss function
def sum_of_residual(y_true, y_pred):
    return K.sum(K.abs(y_true - y_pred))

### discriminator intermediate layer feautre extraction
def feature_extractor(d=None):
    if d is None:
        d = discriminator_model()
        d.load_weights('weights/discriminator.h5')
    intermidiate_model = Model(inputs=d.layers[0].input, outputs=d.layers[-7].output)
    intermidiate_model.compile(loss='binary_crossentropy', optimizer='rmsprop')
    return intermidiate_model

### anomaly detection model define
def anomaly_detector(g=None, d=None):
    if g is None:
        g = generator_model()
        g.load_weights('weights/generator.h5')
    intermidiate_model = feature_extractor(d)
    intermidiate_model.trainable = False
    g = Model(inputs=g.layers[1].input, outputs=g.layers[-1].output)
    g.trainable = False
    # Input layer cann't be trained. Add new layer as same size & same distribution
    aInput = Input(shape=(10,))
    gInput = Dense((10), trainable=True)(aInput)
    gInput = Activation('sigmoid')(gInput)

    # G & D feature
    G_out = g(gInput)
    D_out = intermidiate_model(G_out)
    model = Model(inputs=aInput, outputs=[G_out, D_out])
    model.compile(loss=sum_of_residual, loss_weights=[0.90, 0.10], optimizer='rmsprop')

    # batchnorm learning phase fixed (test) : make non trainable
    K.set_learning_phase(0)
    return model

### anomaly detection
def compute_anomaly_score(model, x, iterations=500, d=None):
    z = np.random.uniform(0, 1, size=(1, 10))

    intermidiate_model = feature_extractor(d)
    d_x = intermidiate_model.predict(x)

    # learning for changing latent
    loss = model.fit(z, [x, d_x], batch_size=1, epochs=iterations, verbose=0)
    similar_data, _ = model.predict(z)

    loss = loss.history['loss'][-1]
    return loss, similar_data

Result images:

Detecting a strange image never seen during training. Reference: https://github.com/yjucho1/anoGAN

## compute anomaly score - sample from strange image
test_img = plt.imread('assets/test_img.png')
test_img = test_img[:,:,0]

model = anogan.anomaly_detector()
ano_score, similar_img = anogan.compute_anomaly_score(model, test_img.reshape(1, 28, 28, 1))

plt.figure(figsize=(2, 2))
plt.imshow(test_img.reshape(28,28), cmap=plt.cm.gray)
plt.show()
print("anomaly score : " + str(ano_score))
plt.figure(figsize=(2, 2))
plt.imshow(test_img.reshape(28,28), cmap=plt.cm.gray)
residual  = test_img.reshape(28,28) - similar_img.reshape(28, 28)
plt.imshow(residual, cmap='jet', alpha=.5)
plt.show()

[figure: query image]

anomaly score : 446.46844482421875

[figure: query image with residual overlay (jet colormap)]

The corresponding anogan.py from https://github.com/yjucho1/anoGAN:

from keras.models import Sequential, Model
from keras.layers import Input, Reshape, Dense, Dropout, UpSampling2D, Conv2D, Flatten
from keras.layers.advanced_activations import LeakyReLU
from keras.optimizers import Adam
from keras import backend as K
from keras import initializers
import tensorflow as tf
import numpy as np
from tqdm import tqdm

def generator_model():
    generator = Sequential()
    generator.add(Dense(128*7*7, input_dim=100, kernel_initializer=initializers.RandomNormal(stddev=0.02)))
    generator.add(LeakyReLU(0.2))
    generator.add(Reshape((7, 7, 128)))
    generator.add(UpSampling2D(size=(2, 2)))
    generator.add(Conv2D(64, kernel_size=(5, 5), padding='same'))
    generator.add(LeakyReLU(0.2))
    generator.add(UpSampling2D(size=(2, 2)))
    generator.add(Conv2D(1, kernel_size=(5, 5), padding='same', activation='tanh'))
    generator.compile(loss='binary_crossentropy', optimizer='adam')
    return generator

def discriminator_model():
    discriminator = Sequential()
    discriminator.add(Conv2D(64, kernel_size=(5, 5), strides=(2, 2), padding='same', input_shape=(28, 28, 1), kernel_initializer=initializers.RandomNormal(stddev=0.02)))
    discriminator.add(LeakyReLU(0.2))
    discriminator.add(Dropout(0.3))
    discriminator.add(Conv2D(128, kernel_size=(5, 5), strides=(2, 2), padding='same'))
    discriminator.add(LeakyReLU(0.2))
    discriminator.add(Dropout(0.3))
    discriminator.add(Flatten())
    discriminator.add(Dense(1, activation='sigmoid'))
    discriminator.compile(loss='binary_crossentropy', optimizer='adam')
    return discriminator

def generator_containing_discriminator(g, d):
    d.trainable = False
    ganInput = Input(shape=(100,))
    x = g(ganInput)
    ganOutput = d(x)
    gan = Model(inputs=ganInput, outputs=ganOutput)
    gan.compile(loss='binary_crossentropy', optimizer='adam')
    return gan

def train(BATCH_SIZE, X_train):
    d = discriminator_model()
    print("#### discriminator ######")
    d.summary()
    g = generator_model()
    print("#### generator ######")
    g.summary()
    d_on_g = generator_containing_discriminator(g, d)
    d.trainable = True
    for epoch in tqdm(range(200)):
        for index in range(int(X_train.shape[0]/BATCH_SIZE)):
            noise = np.random.uniform(0, 1, size=(BATCH_SIZE, 100))
            image_batch = X_train[index*BATCH_SIZE:(index+1)*BATCH_SIZE]
            generated_images = g.predict(noise, verbose=0)
            X = np.concatenate((image_batch, generated_images))
            y = np.array([1] * BATCH_SIZE + [0] * BATCH_SIZE)
            d_loss = d.train_on_batch(X, y)
            noise = np.random.uniform(0, 1, (BATCH_SIZE, 100))
            d.trainable = False
            g_loss = d_on_g.train_on_batch(noise, np.array([1] * BATCH_SIZE))
            d.trainable = True
        g.save_weights('assets/generator', True)
        d.save_weights('assets/discriminator', True)
    return d, g

def generate(BATCH_SIZE):
    g = generator_model()
    g.load_weights('assets/generator')
    noise = np.random.uniform(0, 1, (BATCH_SIZE, 100))
    generated_images = g.predict(noise)
    return generated_images

def sum_of_residual(y_true, y_pred):
    return tf.reduce_sum(abs(y_true - y_pred))

def feature_extractor():
    d = discriminator_model()
    d.load_weights('assets/discriminator')
    intermidiate_model = Model(inputs=d.layers[0].input, outputs=d.layers[-5].output)
    intermidiate_model.compile(loss='binary_crossentropy', optimizer='adam')
    return intermidiate_model

def anomaly_detector():
    g = generator_model()
    g.load_weights('assets/generator')
    g.trainable = False
    intermidiate_model = feature_extractor()
    intermidiate_model.trainable = False
    aInput = Input(shape=(100,))
    gInput = Dense((100))(aInput)
    G_out = g(gInput)
    D_out = intermidiate_model(G_out)
    model = Model(inputs=aInput, outputs=[G_out, D_out])
    model.compile(loss=sum_of_residual, loss_weights=[0.9, 0.1], optimizer='adam')
    return model

def compute_anomaly_score(model, x):
    z = np.random.uniform(0, 1, size=(1, 100))
    intermidiate_model = feature_extractor()
    d_x = intermidiate_model.predict(x)
    loss = model.fit(z, [x, d_x], epochs=500, verbose=0)
    similar_data, _ = model.predict(z)
    return loss.history['loss'][-1], similar_data

 

Some experiments with GAN-based anomaly detection

To run an anomaly detection experiment based on GANomaly, you need a large number of OK (normal) samples and a small number of NG (anomalous) samples. What if no suitable dataset is available? Simple: take any open-source classification dataset, treat one of its classes as the anomalous class and all remaining classes as normal samples. That is exactly what the experiments in the GANomaly paper do; a minimal sketch of this split is given below, and the reported results are discussed after it.
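A possible way to set up such a split on MNIST. The choice of digit 2 as the anomaly class and the variable names are illustrative assumptions, not taken from the paper; the pixel scaling mirrors the code listed above.

import numpy as np
from keras.datasets import mnist

ANOMALY_CLASS = 2  # arbitrary illustrative choice: treat one digit as "NG"

(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train = (x_train.astype(np.float32) - 127.5) / 127.5  # same scaling as the code above
x_test = (x_test.astype(np.float32) - 127.5) / 127.5

# Train only on "OK" samples: every class except the held-out one.
x_train_ok = x_train[y_train != ANOMALY_CLASS][:, :, :, None]

# Evaluate on the full test set; keep binary labels for scoring the detector.
x_test_all = x_test[:, :, :, None]
y_test_anomaly = (y_test == ANOMALY_CLASS).astype(int)  # 1 = anomaly, 0 = normal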

In terms of detection quality, the reported results show GANomaly outperforming the two earlier representative methods it is compared against. The authors also benchmarked runtime: as described above, GANomaly inference is just a forward pass followed by comparing the anomaly score against a threshold, so it is very fast.

In terms of computational cost, GANomaly also performs very well; the decision step itself is only a threshold comparison, as sketched below.
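A minimal sketch of that score-then-threshold decision, reusing the AnoGAN scoring functions listed earlier in this article. Note this is not the GANomaly implementation: AnoGAN performs an iterative latent-code search per image, whereas GANomaly scores with a single encoder forward pass. The threshold value is an arbitrary placeholder that would be calibrated on scores of held-out normal images.

import anogan  # the tkwoo/anogan-keras module listed above

THRESHOLD = 300.0  # placeholder; calibrate on anomaly scores of known-normal samples

def is_anomalous(test_img, model, threshold=THRESHOLD):
    # Score the image, then make the decision with a simple comparison.
    score, _ = anogan.compute_anomaly_score(model, test_img.reshape(1, 28, 28, 1))
    return score > threshold, score

# Usage sketch:
# model = anogan.anomaly_detector()
# flag, score = is_anomalous(test_img, model)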

Reposted from: https://www.cnblogs.com/bonelee/p/9882955.html
