参考《web安全之深度学习实战》这本书
前言
目前常用的木马病毒等的检测工具主要有两种:一种是静态检测,通过匹配一些特征码或者危险函数来识别webshell等,但是这种方式需要完善的匹配规则,而且只能识别已知的webshell。另一种是动态检测,通过检测文件执行时表现出来的特征,查看它是否是一个webshell的文件。
本程序采用机器学习算法的朴素贝叶斯算法和深度学习的MLP算法,实现两种对webshell的检测方式,结合两种算法的预测结果,对一个文件是否是webshell进行判断。
使用到了scikit-learn,安装教程https://blog.csdn.net/weixin_42886817/article/details/102716587
原理
TF-IDF
TF是词频,表示一个词在一个文档中出现的频率,它的计算公式是:
(TF=frac{ ext {该词在该文档中出现的次数}}{ ext {文档中所有词的次数和}})
IDF是逆文档率,表示一个词在所有文档中出现的频率,它的计算公式是:
(IDF=log frac{ ext {文档总数}}{1+ ext {包含该词的所有文档数}})
TF-IDF实际就是TF×IDF,表示一个词对与文档集中一个文件的重要程度。主要思想是,如果一个词对一个文件重要,那么它在这个文件出现的频率高,并且在其它文件出现的频率低。那么这个词就可以代表这个文档的特征,可以用来分类。
Ngram
Ngram是一个基于概率的判别模型,它输入一句话,即单词的顺序,输出是这句话的概率,即这些单词的联合概率。而该模型基于这样一种假设,第N个词的出现只与前面N-1个词相关,而与其它任何词都不相关,整句的概率就是各个词出现概率的乘积。
朴素贝叶斯算法
https://www.cnblogs.com/Qi-Lin/p/12274001.html
MLP
MLP是一种神经网络,它的层与层之间的神经元是全连接的。激活函数一种是Sigmoid函数。公式如下:
(s(x)=frac{1}{1+e^{-x}})
训练过程和预测过程其实本质如下:
Opcode
opcode为中间代码,当解释器完成对脚本代码的分析后,便将它们生成可以直接运行的中间代码。对于php文件执行时,通常流程为虚拟机从文件系统读取php文件、扫描其词典和表达式、解析文件、创建要执行的opcode,最后执行opcode。
项目设计之前首先要配置好php的vld扩展相关环境。
vld的安装:http://www.asarea.cn/diary/265
vld下载地址:https://windows.php.net/
通过命令‘php.exe -dvld.active=1 -dvld.execute=0 文件名’进行分析,如下图所示,分析的opcode为:FETCH_R FETCH_DIM_R SEND_VAR DO_FCALL ECHO RETURN
在python中执行系统命令采用subprocess模块https://www.jianshu.com/p/430c411160f8
处理流程
贝叶斯模型
其处理流程为:对webshell文件和正常的php文件提取关键词,关键词的特征最大采用10000个。之后通过TF-IDF进行计算。然后将数据集分为训练集和测试集,使用朴素贝叶斯算法在训练集上进行训练,之后通过测试集在训练好的模型上进行测试,验证效果。
MLP模型
其处理流程为:对webshell文件和正常的php文件进行编译分析,得到其对应的opcode。Opcode的特征数最大采用10000个。之后通过NGram进行处理。然后将数据集分为训练集和测试集,使用MLP在训练集上进行训练,之后通过测试集在训练好的模型上进行测试,验证效果。
使用opcode和NGram处理时,设定的NGram参数为2,即每个opcode只考虑它与前后有关系。其过程如下示例:
MLP网络隐藏层使用两层,第一层五个神经元,第二层两个。示例如下:
扫描器设计
通过将训练好的模型进行保存,当进行扫描时,先对扫描文件进行提取关键词,进行TF-IDF处理后,用保存的朴素贝叶斯模型进行预测。之后再对扫描文件分析opcode,进行2gram的处理,用保存的MLP模型进行预测。最后将两次预测结果进行与操作,得到最准确的扫描结果,进行报警。其过程如下:
数据收集
对《web安全之深度学习实战》这本书所带的php文件,对它们进行初步处理,按照序号重命名,其中正常文件825个,webshell文件614个
分析数据
朴素贝叶斯模型
通过load_word函数加载文件内容,通过os.walk读取文件夹下所有文件,通过遍历,判断是否是php文件,如果是进行读取。在读取前首先使用chardet包的detect函数检测文件的编码方式,之后以该编码方式打开。读取后进行初步的处理,将换行符等字符替换为空格,将其加入到读取的内容列表中,列表中每一个元素代表一个文档的内容,同时函数返回读取的对应的文档名的列表。其函数流程如下:
通过B_get_feature函数得到用于朴素贝叶斯训练的特征。
该函数首先通过调用load_word函数得到了正常文件和webshell文件的内容,并且进行标记,webshell文件标记为1,正常文件标记为0。然后通过CountVectorizer的fit_transform函数提取词向量,并且最多只保留10000个特征,即10000个词,得到词频结果。得到的词频矩阵如图所示,如第一行表示,第0个文档的在词集中序号为1843的词出现了2次,之后可以通过toarray转换为矩阵形式。
CountVectorizer的方法参数说明https://blog.csdn.net/weixin_38278334/article/details/82320307
同时要保存相应的词集,以便应用模型时,对未知文件的特征的提取,词集包含10000个作为特征的词,以json格式保存。
最后对得到的矩阵进行TF-IDF的处理。通过TfidfTransformer的fit_transform得到各个文件的各个词的TF-IDF的值。结果如图所示,如第一行表示,第0个文件在词集中第9342个词的TF-IDF值为0.048694,之后可以通过toarray转换为矩阵形式。
MLP模型
通过get_opcode函数编译php文件,得到对应的opcode。编译环境我选择的为php5.6.27的版本。通过php扩展进行编译分析,命令为php.exe -dvld.active=1 -dvld.execute=0 文件名。执行命令采用的subprocess包的getstatusoutput函数,该函数返回两个值,一个为执行的状态,一个为返回的结果。但是返回的结果如图所示,包含很多无用的信息,我通过正则匹配进行处理提取。而opcode为带下划线的大写字母,所以匹配规则为:s([A-Z_]{2,})s,提取的opcode至少为两个字符。最后将opcode以空格拼接为一个序列写入对应的文件中。
通过MLP_get_feature函数得到用于MLP模型的特征。
该函数首先通过调用load_opcode函数得到了正常文件和webshell文件分析后的opcode,并且进行标记,webshell文件标记为1,正常文件标记为0。然后通过CountVectorizer的fit_transform函数提取词向量,并且最多只保留10000个特征,即10000个opcode,得到opcode词频结果,这些特征首先采用2Gram处理,即要考虑每个单独的opcode和前一个和后一个的关系,即统计时,统计前后两个opcode一起出现的概率。其余与朴素贝叶斯的结果大致相同。
最后要保存相应的词集,以便应用模型时,对未知文件的特征的提取,词集包含10000个作为特征的词,以json格式保存
训练和测试算法
两个模型都通过交叉验证进行验证,将数据集的40%分为测试集,60%用于训练。
朴素贝叶斯模型
验证结果,在预测为webshell的文件中,有329个为真webshell文件,4个为正常文件,而有21个webshell文件没有检测出来。
- | webshell文件 | 正常文件 |
---|---|---|
检索到 | 329 | 4 |
未检索到 | 21 | 219 |
准确率为:0.95
精确率为:0.98
召回率为:0.91
MLP模型
验证结果,在预测为webshell的文件中,有323个为真webshell文件,8个为正常文件,而有4个webshell文件没有检测出来。
- | webshell文件 | 正常文件 |
---|---|---|
检索到 | 323 | 8 |
未检索到 | 4 | 236 |
准确率为:0.98
精确率为:0.97
召回率为:0.98
使用模型
界面设计
界面采用pyqt拖拽设计,之后再自动生成对应的代码,设计如图所示
openFile函数为选择文件按钮的槽函数,通过点击按钮,显示选择文件窗口,选择文件后,将文件目录显示在条形编辑框区域。
打开文件或文件夹参考https://www.jianshu.com/p/98e8218b2309
train_btn函数为训练模型按钮的槽函数,通过点击按钮,为train函数创建新线程,进行模型的训练,防止界面卡死。
Train函数为训练模型的函数,该函数调用前文所述的一些列函数,进行训练,并且为了以后扫描的方便,在前文所述函数中,使用dump函数,保存已经训练好的模型,扫描预测时直接调用即可。
sklearn模型的保存https://www.cnblogs.com/zichun-zeng/p/4761602.html
dump使用需要导入from sklearn.externals import joblib
但出现报错,直接import joblib
导入
scan_btn函数为开始扫描按钮的槽函数,通过点击按钮,为scan函数创建新线程,进行模型的训练,防止界面卡死。
scan函数为扫描函数,通过读取条形编辑框区域的文件路径,遍历路径下的文件,同样使用两种模型,采用前文所述方法对文件进行处理,处理时读取保存的词集进行提取特征。之后读取保存的模型,对文件进行预测。不同的是预测结果结合两种模型的预测结果,通过与两种模型的预测结果给出最终的预测结果。将可能是webshell的文件显示出来。
扫描演示
在扫描时需要先训练模型,模型训练后会进行保存,如果模型已经训练完成,就可以直接进行扫描。扫描速度还是十分快的,主要影响扫描速度的因素是,在进行MLP预测时,需要对文件进行编译,分析opcode,占用了扫描的时间。
源代码
getShell.py
import os
import re
import subprocess
import json
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.naive_bayes import GaussianNB
from sklearn.model_selection import train_test_split
from sklearn import metrics
from sklearn.neural_network import MLPClassifier
import joblib
import chardet
import time
# 获取文件的opcode
def get_opcode(path, rpath):
for filepath, dirnames, filenames in os.walk(path):
for filename in filenames:
filetype = os.path.splitext(filename)[1]
filetype = filetype.lower()
if filetype == '.php':
cmd = 'D:/phpstudy/PHPTutorial/php/php-5.6.27-nts/php.exe -dvld.active=1 -dvld.execute=0 ' + os.path.join(
filepath, filename)
try:
status, output = subprocess.getstatusoutput(cmd)
#print(output)
if status == 0:
opcode_list = re.findall(r's([A-Z_]{2,})s', output)
opcode = ' '.join(opcode_list)
pre = str(time.time()) + '.txt'
file = os.path.join(rpath, pre)
with open(file, 'w') as f2:
f2.write(str(opcode))
else:
print("执行失败" + filename)
except:
print("执行失败" + filename)
# 对php文件进行处理
def get_shell(path, rpath, i):
for filepath, dirnames, filenames in os.walk(path):
for filename in filenames:
i = i + 1
filetype = os.path.splitext(filename)[1]
filetype = filetype.lower()
if filetype == '.php':
file = os.path.join(filepath, filename)
with open(file, 'r') as f:
result = f.read()
pre = str(i) + '.php'
file = os.path.join(rpath, pre)
with open(file, 'w') as f2:
f2.write(result)
return i
# 得到文件的内容
def load_word(path):
result = []
filelist = []
for filepath, dirnames, filenames in os.walk(path):
for filename in filenames:
filetype = os.path.splitext(filename)[1]
filetype = filetype.lower()
if filetype == '.php':
with open(os.path.join(filepath, filename), 'rb') as f:
r = f.read()
de = chardet.detect(r)['encoding']
try:
result.append(r.decode(encoding=de).replace('
', ' ').replace('
', ' ').replace(' ', ' '))
filelist.append(os.path.join(filepath, filename))
except:
print("失败" + filename)
return result, filelist
# 得到opcode文件的opcode
def load_opcode(path):
result = []
filelist = []
for filepath, dirnames, filenames in os.walk(path):
for filename in filenames:
with open(os.path.join(filepath, filename), 'r') as f:
r = f.read()
try:
result.append(r)
filelist.append(os.path.join(filepath, filename))
except:
print("失败" + filename)
return result, filelist
# 得到用于MLP训练的特征
def MLP_get_feature():
webshell, list1 = load_opcode(r'C:UsersDesktopphpwebshellopt')
y1 = [1] * len(webshell)
normal, list2 = load_opcode(r'C:UsersDesktopphp
ormalopt')
y2 = [0] * len(normal)
x = webshell + normal
y = y1 + y2
CV = CountVectorizer(ngram_range=(2, 2), decode_error="ignore", max_features=10000, token_pattern=r'w+',
min_df=1, max_df=1.0)
#print(CV.fit_transform(x))
x = CV.fit_transform(x).toarray()
x_2 = CV.vocabulary_
x_2 = json.dumps(x_2)
with open(r'C:UsersDesktopphpx_2.json', 'w') as f:
f.write(x_2)
clf = MLPClassifier(solver='lbfgs', alpha=1e-5, hidden_layer_sizes=(5, 2), random_state=1)
mo = clf.fit(x, y)
joblib.dump(mo, 'xx.model')
return x, y
# 得到用于贝叶斯训练的特征
def B_get_feature():
webshell, list1 = load_word(r'C:UsersDesktopphpwebshell')
y1 = [1] * len(webshell)
normal, list2 = load_word(r'C:UsersDesktopphp
ormal')
y2 = [0] * len(normal)
x_1 = webshell + normal
y = y1 + y2
CV = CountVectorizer(decode_error="ignore", max_features=10000, token_pattern=r'w+', min_df=1, max_df=1.0)
x = CV.fit_transform(x_1).toarray()
#print(CV.fit_transform(x_1))
#保存词集
x_1 = CV.vocabulary_
x_1 = json.dumps(x_1)
with open(r'C:UsersDesktopphpx_1.json', 'w') as f:
f.write(x_1)
transformer = TfidfTransformer(smooth_idf=False)
x_tfidf = transformer.fit_transform(x)
print(x_tfidf)
x = x_tfidf.toarray()
gnb = GaussianNB()
mo = gnb.fit(x, y)
#保存模型
joblib.dump(mo, 'x.model')
return x, y
# 统计模型好坏的函数
def do_metrics(y_test, y_pred):
print("分类准确率为:")
print(metrics.accuracy_score(y_test, y_pred))
print("混淆矩阵为:")
print(metrics.confusion_matrix(y_test, y_pred))
print("精确率为:")
print(metrics.precision_score(y_test, y_pred))
print("召回率为:")
print(metrics.recall_score(y_test, y_pred))
# 进行贝叶斯预测
def do_nb(x, y):
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.4, random_state=0)
gnb = GaussianNB()
gnb.fit(x_train, y_train)
y_pred = gnb.predict(x_test)
do_metrics(y_test, y_pred)
# 进行MLP预测
def do_mlp(x, y):
clf = MLPClassifier(solver='lbfgs', alpha=1e-5, hidden_layer_sizes=(5, 2), random_state=1)
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.4, random_state=0)
clf.fit(x_train, y_train)
y_pred = clf.predict(x_test)
do_metrics(y_test, y_pred)
# 对收集的数据进行处理
def dealDate():
# 处理数据集
path = 'C:/Users/Desktop/data/webshell/w/'
rpath = r'C:UsersGuoyangDesktop网安设计php
ormal'
i = 0
i = get_shell(path, rpath, i)
path = 'C:/Users/Desktop/data/webshell/normal/'
i = get_shell(path, rpath, i)
print('正常文件有' + str(i))
path = 'C:/Users/Desktop/data/webshell/b/'
rpath = r'C:UsersGuoyangDesktop网安设计phpwebshell'
i = 0
i = get_shell(path, rpath, i)
path = 'C:/Users/Desktop/data/webshell/webshell/'
i = get_shell(path, rpath, i)
print('木马文件有' + str(i))
# #得到opcode
# path = r'C:UsersDesktopphp
ormal'
# rpath = r'C:UsersDesktopphp
ormalopt1'
# get_opcode(path,rpath)
# path = r'C:UsersDesktopphpwebshell'
# rpath = r'C:UsersDesktopphpwebshellopt'
# get_opcode(path,rpath)
# #训练贝叶斯,预测
# x,y=B_get_feature()
# do_nb(x,y)
#
# 训练MLP,预测
x,y=MLP_get_feature()
do_mlp(x,y)
webshellScan.py
# -*- coding: utf-8 -*-
# Form implementation generated from reading ui file 'webshellScan.ui'
#
# Created by: PyQt5 UI code generator 5.13.2
#
# WARNING! All changes made in this file will be lost!
from PyQt5 import QtCore, QtGui, QtWidgets
class Ui_Form(object):
def setupUi(self, Form):
Form.setObjectName("Form")
Form.resize(1000, 661)
self.widget = QtWidgets.QWidget(Form)
self.widget.setGeometry(QtCore.QRect(20, 40, 961, 591))
self.widget.setObjectName("widget")
self.verticalLayout = QtWidgets.QVBoxLayout(self.widget)
self.verticalLayout.setContentsMargins(0, 0, 0, 0)
self.verticalLayout.setObjectName("verticalLayout")
self.horizontalLayout = QtWidgets.QHBoxLayout()
self.horizontalLayout.setObjectName("horizontalLayout")
self.lineEdit = QtWidgets.QLineEdit(self.widget)
self.lineEdit.setObjectName("lineEdit")
self.horizontalLayout.addWidget(self.lineEdit)
self.pushButton_3 = QtWidgets.QPushButton(self.widget)
self.pushButton_3.setObjectName("pushButton_3")
self.horizontalLayout.addWidget(self.pushButton_3)
self.pushButton = QtWidgets.QPushButton(self.widget)
self.pushButton.setObjectName("pushButton")
self.horizontalLayout.addWidget(self.pushButton)
self.pushButton_2 = QtWidgets.QPushButton(self.widget)
self.pushButton_2.setObjectName("pushButton_2")
self.horizontalLayout.addWidget(self.pushButton_2)
self.verticalLayout.addLayout(self.horizontalLayout)
self.label = QtWidgets.QLabel(self.widget)
self.label.setText("")
self.label.setObjectName("label")
self.verticalLayout.addWidget(self.label)
self.textBrowser = QtWidgets.QTextBrowser(self.widget)
self.textBrowser.setObjectName("textBrowser")
self.verticalLayout.addWidget(self.textBrowser)
self.retranslateUi(Form)
QtCore.QMetaObject.connectSlotsByName(Form)
def retranslateUi(self, Form):
_translate = QtCore.QCoreApplication.translate
Form.setWindowTitle(_translate("Form", "webshell扫描工具"))
self.pushButton_3.setText(_translate("Form", "选择文件"))
self.pushButton.setText(_translate("Form", "训练模型"))
self.pushButton_2.setText(_translate("Form", "开始扫描"))
webshellScan_ui.py
from webshellScan import Ui_Form
from PyQt5 import QtWidgets
from PyQt5.QtWidgets import *
import sys
from getShell import *
import joblib
import threading
class myWin(QtWidgets.QWidget, Ui_Form):
def __init__(self):
super(myWin, self).__init__()
self.setupUi(self)
self.pushButton_3.clicked.connect(self.openFile)
self.pushButton_2.clicked.connect(self.scan_btn)
self.pushButton.clicked.connect(self.train_btn)
def openFile(self):
dir = QFileDialog.getExistingDirectory(self.widget, "打开目录", '')
self.lineEdit.setText(dir)
def train(self):
self.label.setText('正在训练模型……')
global x1_train, x2_train, y1_train, y2_train
x1_train, y1_train = B_get_feature()
x2_train, y2_train = MLP_get_feature()
self.label.setText('训练完成!')
# self.textBrowser.append('训练完成')
def train_btn(self):
t = threading.Thread(target=self.train)
t.start()
def scan(self):
path = self.lineEdit.text()
self.label.setText('加载词集……')
# 加载词集
with open(r'C:UsersGuoyangDesktopphpx_1.json', 'r') as f:
x_1 = json.load(f)
print(x_1)
with open(r'C:UsersDesktopphpx_2.json', 'r') as f:
x_2 = json.load(f)
self.label.setText('进行贝叶斯预测……')
# 贝叶斯预测
x1, filelist = load_word(path)
CV = CountVectorizer(decode_error="ignore", max_features=10000, token_pattern=r'w+', min_df=1, max_df=1.0,
vocabulary=x_1)
x1 = CV.fit_transform(x1).toarray()
transformer = TfidfTransformer(smooth_idf=False)
x_tfidf = transformer.fit_transform(x1)
x1 = x_tfidf.toarray()
# 导入训练好的模型
gnb = joblib.load('x.model')
y1_pred = gnb.predict(x1)
# MLP预测
self.label.setText('进行MLP训练……')
self.label.setText('得到opcode……')
if os.path.exists(r'C:UsersDesktopopcode') == False:
os.mkdir(r'C:UsersDesktopopcode')
get_opcode(path, r'C:UsersDesktopopcode')
x2, filelist2 = load_opcode(r'C:UsersDesktopopcode')
CV = CountVectorizer(ngram_range=(2, 2), decode_error="ignore", max_features=10000, token_pattern=r'w+',
min_df=1, max_df=1.0, vocabulary=x_2)
x2 = CV.fit_transform(x2).toarray()
# 导入训练好的模型
clf = joblib.load('xx.model')
y2_pred = clf.predict(x2)
# 最终预测结果
self.label.setText('最终预测结果')
y = y1_pred & y2_pred
y = list(y)
self.textBrowser.append('可能存在威胁的文件')
for i in range(len(y)):
if y[i] == 1:
self.textBrowser.append(filelist[i])
def scan_btn(self):
t = threading.Thread(target=self.scan)
t.start()
if __name__ == "__main__":
app = QtWidgets.QApplication(sys.argv)
Widget = myWin()
Widget.show()
sys.exit(app.exec_())