
This post follows the paper "Malware Detection by Eating a Whole EXE" and implements malware classification in TensorFlow, using the MalConv model that the paper proposes.



The dataset I use is the Kaggle Microsoft Malware Classification Challenge, which contains nine classes of malware.



Section 4 of the paper, "Model Architecture", describes the MalConv model. The relevant part of that section is quoted below:

To best capture such high level location invariance, we choose to use a convolution network architecture. Combining the convolutional activations with a global max pooling before going to fully connected layers allows our model to produce its activation regardless of the location of the detected features. Rather than perform convolutions on the raw byte values (i.e., using a scaled version of a byte’s value from 0 to 255), we use an embedding layer to map each byte to a fixed length (but learned) feature vector. We avoid the raw byte value as it implies an interpretation that certain byte values are intrinsically “closer” to each-other than other byte values, which we know a priori to be false, as byte value meaning is dependent on context. Training the embedding jointly with the convolution allows even our shallow network to activate for a wider breadth of input patterns. This also gives it a degree of robustness in the face of minor alterations in byte values. Prior work using byte n-grams lack this quality, as they are dependent on exact byte matches (Kolter and Maloof 2006; Raff et al. 2016).
We note a number of difficult design choices that had to be made in developing a neural network architecture for such long input sequences. One of the primary limitations in practice was GPU memory consumption in the first convolution layer. Regardless of convolution size, storing the activations after the first convolution for forward propagation can easily lead to out-of-memory errors during back-propagation. We chose to use large convolutional filters and strides to control the memory used by activations in these early layers.
Attempts to build deep architectures on such long sequences requires aggressive pooling between layers for our data, which results in lopsided memory use. This makes model parallelism in frameworks like Tensorflow difficult to achieve. Instead we chose to create a shallow architecture with a large filter width of 500 bytes combined with an aggressive stride of 500. This allowed us to better balance computational workload in a data-parallel manner using PyTorch (Paszke, Gross, and Chintala 2016). Our convolutional architecture uses the gated convolution approach following Dauphin et al. (2016), with 128 filters.
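To put numbers on the memory argument in the excerpt, here is a rough sketch of the activation sizes for a 2,000,000-byte input. It assumes float32 activations and no padding, and it counts only the output of one convolution for a single sample. The helper names are my own, not from the paper.

```python
def conv_output_length(input_len, kernel, stride):
    """Number of output positions of a 1-D convolution without padding."""
    return (input_len - kernel) // stride + 1


def activation_megabytes(positions, filters, bytes_per_value=4):
    """Memory for one sample's activations, assuming float32 values."""
    return positions * filters * bytes_per_value / 1e6


INPUT_LEN = 2_000_000
FILTERS = 128

# Filter width 500 with stride 500, as in the paper.
strided = conv_output_length(INPUT_LEN, kernel=500, stride=500)
# Same filter width with stride 1, for comparison.
dense = conv_output_length(INPUT_LEN, kernel=500, stride=1)

print(strided, activation_megabytes(strided, FILTERS))
print(dense, activation_megabytes(dense, FILTERS))
```

With stride 500 the convolution yields 4000 positions, roughly 2 MB of activations per sample. With stride 1 it yields almost two million positions, roughly 1 GB per sample before the gating branch or gradients are counted.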



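  1. Because the model has to "eat" every byte of the malware, it inevitably uses a lot of GPU memory and cannot handle very large files, so only files smaller than 2 MB are selected for training.
  2. Read the file's byte stream and convert it to an integer array. If it is shorter than 2,000,000 values, pad the tail with 0.
  3. Pass the array through an 8-dimensional embedding to get E.
  4. Split E into two 4-dimensional halves, A and B.
  5. Apply a one-dimensional convolution to A and to B separately, with kernel size = 500, stride = 500, filters = 128.
  6. Pass B through the sigmoid function and multiply the result by A to get G.
  7. Max-pool G to get P.
  8. Feed P into the fully connected layer.
  9. Finally, a softmax classifier outputs the result.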
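Steps 3 to 7 above can be traced at toy scale without any framework. The following is a minimal plain-Python sketch, not the TensorFlow implementation: the sizes (12 input bytes, width 4, stride 4, 3 filters) and the random weights are assumptions chosen only to keep it small, and all function names are mine.

```python
import math
import random


def conv1d(seq, kernels, width, stride):
    """1-D convolution without padding.

    seq:     list of positions, each a list of channel values
    kernels: list of filters, each shaped [width][channels]
    returns: list of output positions, each with one value per filter
    """
    out = []
    for start in range(0, len(seq) - width + 1, stride):
        window = seq[start:start + width]
        out.append([
            sum(w * v
                for k_row, x_row in zip(kernel, window)
                for w, v in zip(k_row, x_row))
            for kernel in kernels
        ])
    return out


def sigmoid(v):
    return 1.0 / (1.0 + math.exp(-v))


def forward(byte_values, embedding, kernels_a, kernels_b, width, stride):
    e = [embedding[b] for b in byte_values]          # step 3: embedding lookup
    half = len(e[0]) // 2
    a_in = [vec[:half] for vec in e]                 # step 4: split into A and B
    b_in = [vec[half:] for vec in e]
    a = conv1d(a_in, kernels_a, width, stride)       # step 5: two convolutions
    b = conv1d(b_in, kernels_b, width, stride)
    g = [[x * sigmoid(z) for x, z in zip(ra, rb)]    # step 6: gating
         for ra, rb in zip(a, b)]
    return [max(column) for column in zip(*g)]       # step 7: global max pooling


def random_kernels(rng, filters, width, channels):
    return [[[rng.gauss(0, 0.1) for _ in range(channels)]
             for _ in range(width)]
            for _ in range(filters)]


rng = random.Random(0)
embedding = [[rng.gauss(0, 1) for _ in range(8)] for _ in range(256)]
kernels_a = random_kernels(rng, filters=3, width=4, channels=4)
kernels_b = random_kernels(rng, filters=3, width=4, channels=4)

pooled = forward(list(range(12)), embedding, kernels_a, kernels_b, width=4, stride=4)
print(len(pooled))  # one pooled value per filter
```

Because the pooling takes the maximum over all positions, reordering whole windows of the input leaves the result unchanged. In this sketch that holds only for shifts that are a multiple of the stride.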




tensorflow-gpu 1.14.0


Because I want to run the training on a server, I store the data as TFRecord files.

import os

import pandas as pd
import tensorflow as tf

source = 'D:/TEMP/data/train/'
label_Path = 'D:/TEMP/data/trainLabels.csv'
file_size = 2000000
# Padding token. It is not a valid byte (bytes are 00..FF); parsed as hex it is 256.
pad_token = '100'


def string_to_hexsarray(text):
    # Keep only two-character tokens and skip "??".
    return [s for s in text.split() if len(s) == 2 and s != "??"]


def read_file(entry):
    with open(entry, 'r') as f:
        return f.read()


def hexarray_to_bytes(hexarray):
    # Pad to file_size tokens, then join into one space-separated string.
    hexarray = hexarray + [pad_token] * (file_size - len(hexarray))
    return " ".join(hexarray)


def string_to_bytes(text):
    return hexarray_to_bytes(string_to_hexsarray(text))


def open_files_and_get_label(dir_path, label_path, length=2000, size=2000000, end="bytes"):
    """Collect up to `length` files that end in `end` and are at most `size` bytes, with labels."""
    files = []
    labels = []
    df = pd.read_csv(label_path)
    with os.scandir(dir_path) as entries:
        for entry in entries:
            if entry.name.endswith(end) and os.path.getsize(entry) <= size:
                file_name = entry.name.split('.')[0]
                print(file_name)
                index = df[df['Id'] == file_name].index.tolist()[0]
                files.append(entry)
                labels.append(int(df.iat[index, 1]))
                if len(files) == length:
                    break
    return files, labels


def produce_data(source_path, label_path, length=2000, size=2000000, end="bytes"):
    """Build the dataset as a list of (sample, label) pairs.

    source_path: dataset directory
    label_path:  path of the CSV file that stores the labels
    length:      number of samples, 2000 by default
    size:        maximum file size
    end:         file name suffix
    """
    files, labels = open_files_and_get_label(source_path, label_path, length, size, end)
    data = [string_to_bytes(read_file(f)) for f in files]
    return list(zip(data, labels))


def produce_TFRecord(data, filename):
    """Write the (sample, label) pairs in `data` to a TFRecord file at `filename`."""
    print("write tfrecord")
    writer = tf.python_io.TFRecordWriter(filename)
    for train, label in data:
        train = bytes(train, encoding="utf8")
        example = tf.train.Example(features=tf.train.Features(feature={
            "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[label])),
            "train": tf.train.Feature(bytes_list=tf.train.BytesList(value=[train])),
        }))
        # Serialize the example to a string and write it.
        writer.write(example.SerializeToString())
    writer.close()


def _parse_function(example):
    features = {
        'label': tf.FixedLenFeature([], tf.int64),
        'train': tf.FixedLenFeature([], tf.string),
    }
    parsed_features = tf.parse_single_example(example, features)
    label = tf.cast(parsed_features["label"], tf.int32)
    train = parsed_features["train"]
    return train, label


def string_to_int(s):
    return [int(i, 16) for i in str(s, encoding='utf-8').split()]


# Example
if __name__ == "__main__":
    data = produce_data(source, label_Path, length=1000)
    produce_TFRecord(data, 'D:/TEMP/1000_test.tfrecord')
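The tokenizing step is easy to check on its own. As far as I know, the `.bytes` files in this dataset are hexdump text: each line starts with an address, followed by two-digit hex values, with `??` standing in for bytes that could not be read. The sketch below assumes that layout; the sample lines are made up for illustration and the function names are mine.

```python
def hex_tokens(text):
    """Keep two-character tokens only; this drops the address column and '??'."""
    return [t for t in text.split() if len(t) == 2 and t != "??"]


def tokens_to_ints(tokens):
    """Parse each hex token into an integer in 0..255."""
    return [int(t, 16) for t in tokens]


# Two invented hexdump lines: an address, then four byte values each.
sample = (
    "00401000 56 8D 44 24\n"
    "00401004 ?? ?? FF 00\n"
)

tokens = hex_tokens(sample)
print(tokens)                  # ['56', '8D', '44', '24', 'FF', '00']
print(tokens_to_ints(tokens))  # [86, 141, 68, 36, 255, 0]
```

The length filter is what removes the address: it is longer than two characters, so it never reaches the hex parser.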


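I said earlier that "if it is shorter than 2,000,000 values, pad the tail with 0", but the code does not actually do that. It pads with a different token, "100", which never occurs in a file (file bytes take values in [0x00, 0xff]). The model trained this way performed better.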
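A small sketch of what this padding does to the integer ids. The function name and the toy target length of 6 are assumptions for illustration; the real code pads to 2,000,000 tokens.

```python
PAD_TOKEN = "100"  # not a valid byte; real bytes are "00".."FF"


def pad_tokens(tokens, target_len, pad=PAD_TOKEN):
    """Return a copy of tokens padded on the right up to target_len."""
    return tokens + [pad] * (target_len - len(tokens))


tokens = ["4D", "5A", "00"]          # the real byte 0x00 is part of the data
padded = pad_tokens(tokens, 6)
ids = [int(t, 16) for t in padded]

print(ids)            # [77, 90, 0, 256, 256, 256]
print(max(ids) + 1)   # 257 distinct ids are possible: 0..255 plus the padding id
```

Parsed as hexadecimal, the padding token becomes 256, so padding can no longer be confused with a genuine zero byte. One consequence worth noting: any lookup table indexed by these ids needs 257 rows rather than 256.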


import numpy as np
import tensorflow as tf

file_size = 2000000
padding = 'VALID'


# Initialize weights
def weight_variable(shape):
    initial = tf.truncated_normal(shape=shape, stddev=0.1)  # truncated normal distribution
    return tf.Variable(initial)


# Initialize biases
def bias_variable(shape):
    initial = tf.constant(0.1, shape=shape)
    return tf.Variable(initial)


def conv1d(x, kernel):
    # x: [batch, in_width, in_channels]
    return tf.nn.conv1d(x, kernel, stride=500, padding=padding)


def max_pool(x):
    # The pool covers all 4000 positions, so this is a global max pooling.
    return tf.layers.max_pooling1d(x, pool_size=4000, strides=1, padding=padding,
                                   data_format='channels_last')


def _parse_function(example):
    features = {
        'label': tf.FixedLenFeature([], tf.int64),
        'train': tf.FixedLenFeature([], tf.string),
    }
    parsed_features = tf.parse_single_example(example, features)
    label = tf.cast(parsed_features["label"], tf.int32)
    train = parsed_features["train"]
    return train, label


def string_to_int(s):
    return [int(i, 16) for i in str(s, encoding='utf-8').split()]


def to_arrays(batch_xs, batch_ys):
    xs = np.asarray([string_to_int(s) for s in batch_xs])
    # Labels are in [1, 9]; shift them to [0, 8], then one-hot encode.
    ys = np.eye(9)[batch_ys.reshape(-1) - 1]
    return xs, ys


# Note: lr is decayed every epoch below, but the optimizer uses a fixed rate of 1e-4,
# so the decay has no effect unless lr is passed to AdamOptimizer.
lr = tf.Variable(0.001, dtype=tf.float32)

# input_data.shape = [batch, 2000000]
input_data = tf.placeholder(tf.int32, shape=[None, file_size], name="input")
y = tf.placeholder(tf.float32, shape=[None, 9], name="y")
keep_prob = tf.placeholder(tf.float32)

# 8-dimensional embedding.
# 257 rows: byte values 0..255 plus the padding token 0x100 (256).
embedding = tf.Variable(tf.random_normal([257, 8]), name="embedding")
x = tf.nn.embedding_lookup(embedding, input_data)

# Each slice has shape [batch, 2000000, 4]
sliceA = tf.slice(x, [0, 0, 0], [-1, -1, 4])
sliceB = tf.slice(x, [0, 0, 4], [-1, -1, 4])

# Weights and biases of the two parallel convolutions
W_conv1 = weight_variable([500, 4, 128])
b_conv1 = bias_variable([128])
V_conv1 = weight_variable([500, 4, 128])
c_conv1 = bias_variable([128])

# Convolve A and B separately
A = conv1d(sliceA, W_conv1) + b_conv1
B = conv1d(sliceB, V_conv1) + c_conv1

# Gating. G0.shape = [batch, 4000, 128]
G0 = tf.nn.relu(A * tf.nn.sigmoid(B))

# Global max pooling, then drop the length-1 axis. P.shape = [batch, 128]
P = tf.reshape(max_pool(G0), [-1, 128])

# FC1
W_fc1 = weight_variable([128, 128])
b_fc1 = bias_variable([128])
h_fc1 = tf.nn.relu(tf.matmul(P, W_fc1) + b_fc1)
h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)

# FC2
W_fc2 = weight_variable([128, 9])
b_fc2 = bias_variable([9])
prediction = tf.nn.softmax(tf.matmul(h_fc1_drop, W_fc2) + b_fc2)

# Clip before the log so that a zero probability cannot produce NaN.
cross_entropy = -tf.reduce_sum(y * tf.log(tf.clip_by_value(prediction, 1e-10, 1.0)))
train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)
init = tf.global_variables_initializer()

correct_prediction = tf.equal(tf.argmax(prediction, 1), tf.argmax(y, 1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

# Training set
dataset = tf.data.TFRecordDataset('/input/3000_train.tfrecord')
dataset = dataset.map(_parse_function)
dataset = dataset.repeat()
dataset = dataset.batch(75)
iterator = dataset.make_initializable_iterator()
next_element = iterator.get_next()

# Test set
test = tf.data.TFRecordDataset('/input/1000_test.tfrecord')
test = test.map(_parse_function)
test = test.repeat()
test = test.batch(75)
test_iterator = test.make_initializable_iterator()
test_next_element = test_iterator.get_next()

with tf.Session() as sess:
    sess.run(init)
    sess.run([iterator.initializer, test_iterator.initializer])
    for epoch in range(21):
        sess.run(tf.assign(lr, 0.001 * (0.95 ** epoch)))
        for batch in range(40):
            batch_xs, batch_ys = sess.run(next_element)
            out, batch_ys = to_arrays(batch_xs, batch_ys)
            sess.run(train_step, feed_dict={input_data: out, y: batch_ys, keep_prob: 0.7})
        # Evaluate on one test batch per epoch.
        batch_xs, batch_ys = sess.run(test_next_element)
        out, batch_ys = to_arrays(batch_xs, batch_ys)
        test_acc = sess.run(accuracy, feed_dict={input_data: out, y: batch_ys, keep_prob: 1.0})
        print("Iter " + str(epoch) + " Testing Accuracy " + str(test_acc))
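The label handling in the training loop and the accuracy metric can be checked in isolation. The following plain-Python sketch mirrors what the loop does with labels in [1, 9] (shift to [0, 8], then one-hot) and what the argmax comparison computes; the function names and the toy inputs are mine.

```python
def one_hot(labels, num_classes=9):
    """Turn labels in 1..num_classes into one-hot rows indexed 0..num_classes-1."""
    rows = []
    for label in labels:
        index = label - 1
        if not 0 <= index < num_classes:
            raise ValueError("label out of range: %r" % (label,))
        row = [0.0] * num_classes
        row[index] = 1.0
        rows.append(row)
    return rows


def argmax(values):
    return max(range(len(values)), key=lambda i: values[i])


def accuracy(predictions, targets):
    """Fraction of rows where the predicted class matches the target class."""
    hits = sum(argmax(p) == argmax(t) for p, t in zip(predictions, targets))
    return hits / len(targets)


targets = one_hot([1, 9])
print(targets[0].index(1.0), targets[1].index(1.0))  # 0 8

# Two toy predictions over two classes; only the first matches its target.
print(accuracy([[0.1, 0.9], [0.8, 0.2]], [[0.0, 1.0], [0.0, 1.0]]))  # 0.5
```

Raising on an out-of-range label is a choice made for this sketch; it catches a forgotten shift early instead of silently indexing the wrong class.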


Finally, the code can be downloaded from GitHub.


Malware Detection by Eating a Whole EXE

MalConv: Lessons learned from Deep Learning on executables






