zoukankan      html  css  js  c++  java
  • 对巡风vulscan的理解

    # coding:utf-8
    # 漏洞检测引擎
    import urllib2
    import thread
    import time
    import pymongo
    import sys
    import datetime
    import hashlib
    import json
    import re
    import uuid
    import os
    from kunpeng import kunpeng
    
    
    sys.path.append(sys.path[0] + '/vuldb')
    # 加载漏洞插件的目录
    sys.path.append(sys.path[0] + "/../")
    
    from config import ProductionConfig
    
    db_conn = pymongo.MongoClient(ProductionConfig.DB, ProductionConfig.PORT)
    na_db = getattr(db_conn, ProductionConfig.DBNAME)
    na_db.authenticate(ProductionConfig.DBUSERNAME, ProductionConfig.DBPASSWORD)
    na_task = na_db.Task
    na_result = na_db.Result
    na_plugin = na_db.Plugin
    na_config = na_db.Config
    na_heart = na_db.Heartbeat
    na_update = na_db.Update
    lock = thread.allocate()
    PASSWORD_DIC = []
    THREAD_COUNT = 50
    TIMEOUT = 10
    PLUGIN_DB = {}
    TASK_DATE_DIC = {}
    WHITE_LIST = []
    kp = kunpeng()
    
    #巡风的漏洞扫描技术要先看初始化的方法,看完初始化的方法,再从main函数开始看,这样容易理解,分析完main函数之后,再从start函数慢慢看
    class vulscan():
        # 初始化操作
        def __init__(self, task_id, task_netloc, task_plugin):
            self.task_id = task_id #任务id
            self.task_netloc = task_netloc #任务端口
            self.task_plugin = task_plugin #任务插件
            self.result_info = '' #任务结果
            self.start() #任务开始
    
    # start ---> 开始检测
        def start(self):
            self.get_plugin_info()
            #获取插件列表,这个在80多行有个get_plugin_info方法,主要就是info = xxx。
            #这个是获取.json格式的漏洞库,主要用来是探测,并不能起到exp的作用
            if '.json' in self.plugin_info['filename']:  # 标示符检测模式
                self.load_json_plugin()  # 读取漏洞标示
                # 跟踪load_json_plugin()函数,读取的时候,是字符串。所以需要转换成json的形式。
                self.set_request()  # 标示符转换为请求
                # 这个就是发送request的请求了
                self.poc_check()  # 检测
            #     进行poc验证,这个地方需要是check(),里面有py,md5,json三种格式的请求,所以要分别的验证一下
    
            # 如果时kunpeng的脚本开始验证,分为两种一种是web端的http,https,一种是除了web端。没咋懂,就这样吧,有空再看
            elif 'KP-' in self.plugin_info['filename']:
                self.log(str(self.task_netloc) + 'call kunpeng - ' + self.plugin_info['filename'])
                kp.set_config(TIMEOUT, PASSWORD_DIC)
                if self.task_netloc[1] != 80:
                    self.result_info = kp.check('service', '{}:{}'.format(
                        self.task_netloc[0], self.task_netloc[1]), self.plugin_info['filename'])
                if not self.result_info:
                    scheme = 'http'
                    if self.task_netloc[1] == 443:
                        scheme = 'https'
                    self.result_info = kp.check('web', '{}://{}:{}'.format(
                        scheme, self.task_netloc[0], self.task_netloc[1]), self.plugin_info['filename'])
    
            else:  # 脚本检测模式,这个利用的py脚本的poc,还是比较有意思的,可能能说到说到的就是这一个了,这个和我们平常写的跑py脚本差不多。
                plugin_filename = self.plugin_info['filename']
                # log里面封装的是一个print方法
                self.log(str(self.task_netloc) + 'call ' + self.task_plugin)
                if task_plugin not in PLUGIN_DB:
                    
                    plugin_res = __import__(plugin_filename)
                    setattr(plugin_res, "PASSWORD_DIC", PASSWORD_DIC)  # 给插件声明密码字典
                    PLUGIN_DB[plugin_filename] = plugin_res
                self.result_info = PLUGIN_DB[plugin_filename].check(
                    str(self.task_netloc[0]), int(self.task_netloc[1]), TIMEOUT)
            self.save_request()  # 保存结果
    
        def get_plugin_info(self):
            info = na_plugin.find_one({"name": self.task_plugin})
            self.plugin_info = info
    
        def load_json_plugin(self):
            json_plugin = open(sys.path[0] + '/vuldb/' +
                               self.plugin_info['filename']).read()
            self.plugin_info['plugin'] = json.loads(json_plugin)['plugin']
    
        def set_request(self):
            url = 'http://' + 
                self.task_netloc[0] + ":" + 
                str(self.task_netloc[1]) + self.plugin_info['plugin']['url']
            if self.plugin_info['plugin']['method'] == 'GET':
                request = urllib2.Request(url)
            else:
                request = urllib2.Request(url, self.plugin_info['plugin']['data'])
            self.poc_request = request
    
        def get_code(self, header, html):
            try:
                m = re.search(r'<meta.*?charset=(.*?)"(>| |/)', html, flags=re.I)
                if m:
                    return m.group(1).replace('"', '')
            except:
                pass
            try:
                if 'Content-Type' in header:
                    Content_Type = header['Content-Type']
                    m = re.search(r'.*?charset=(.*?)(;|$)',
                                  Content_Type, flags=re.I)
                    if m:
                        return m.group(1)
            except:
                pass
    
        def poc_check(self):
            try:
                res = urllib2.urlopen(self.poc_request, timeout=30)
                res_html = res.read(204800)
                header = res.headers
                # res_code = res.code
            except urllib2.HTTPError, e:
                # res_code = e.code
                header = e.headers
                res_html = e.read(204800)
            except Exception, e:
                return
            try:
                html_code = self.get_code(header, res_html).strip()
                if html_code and len(html_code) < 12:
                    res_html = res_html.decode(html_code).encode('utf-8')
            except:
                pass
            an_type = self.plugin_info['plugin']['analyzing']
            vul_tag = self.plugin_info['plugin']['tag']
            analyzingdata = self.plugin_info['plugin']['analyzingdata']
            if an_type == 'keyword':
                # print poc['analyzingdata'].encode("utf-8")
                if analyzingdata.encode("utf-8") in res_html:
                    self.result_info = vul_tag
            elif an_type == 'regex':
                if re.search(analyzingdata, res_html, re.I):
                    self.result_info = vul_tag
            elif an_type == 'md5':
                md5 = hashlib.md5()
                md5.update(res_html)
                if md5.hexdigest() == analyzingdata:
                    self.result_info = vul_tag
    
        def save_request(self):
            if self.result_info:
                time_ = datetime.datetime.now()
                self.log(str(self.task_netloc) + " " + self.result_info)
                v_count = na_result.find(
                    {"ip": self.task_netloc[0], "port": self.task_netloc[1], "info": self.result_info}).count()
                if not v_count:
                    na_plugin.update({"name": self.task_plugin},
                                     {"$inc": {'count': 1}})
                vulinfo = {"vul_name": self.plugin_info['name'], "vul_level": self.plugin_info['level'],
                           "vul_type": self.plugin_info['type']}
                w_vul = {"task_id": self.task_id, "ip": self.task_netloc[0], "port": self.task_netloc[1],
                         "vul_info": vulinfo, "info": self.result_info, "time": time_,
                         "task_date": TASK_DATE_DIC[str(self.task_id)]}
                na_result.insert(w_vul)
                # self.wx_send(w_vul)  # 自行定义漏洞提醒
    
        def log(self, info):
            lock.acquire()
            try:
                time_str = time.strftime('%X', time.localtime(time.time()))
                print "[%s] %s" % (time_str, info)
            except:
                pass
            lock.release()
    
    
    def queue_get():
        global TASK_DATE_DIC
        task_req = na_task.find_and_modify(query={"status": 0, "plan": 0}, update={
                                           "$set": {"status": 1}}, sort={'time': 1})
        if task_req:
            TASK_DATE_DIC[str(task_req['_id'])] = datetime.datetime.now()
            return task_req['_id'], task_req['plan'], task_req['target'], task_req['plugin']
        else:
            task_req_row = na_task.find({"plan": {"$ne": 0}})
            if task_req_row:
                for task_req in task_req_row:
                    if (datetime.datetime.now() - task_req['time']).days / int(task_req['plan']) >= int(task_req['status']):
                        if task_req['isupdate'] == 1:
                            task_req['target'] = update_target(
                                json.loads(task_req['query']))
                            na_task.update({"_id": task_req['_id']}, {
                                           "$set": {"target": task_req['target']}})
                        na_task.update({"_id": task_req['_id']}, {
                                       "$inc": {"status": 1}})
                        TASK_DATE_DIC[str(task_req['_id'])
                                      ] = datetime.datetime.now()
                        return task_req['_id'], task_req['plan'], task_req['target'], task_req['plugin']
            return '', '', '', ''
    
    
    def update_target(query):
        target_list = []
        try:
            result_list = na_db.Info.find(query)
            for result in result_list:
                target = [result["ip"], result["port"]]
                target_list.append(target)
        except:
            pass
        return target_list
    
    
    def monitor():
        global PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST
        while True:
            queue_count = na_task.find({"status": 0, "plan": 0}).count()
            if queue_count:
                load = 1
            else:
                ac_count = thread._count()
                load = float(ac_count - 6) / THREAD_COUNT
            if load > 1:
                load = 1
            if load < 0:
                load = 0
            na_heart.update({"name": "load"}, {
                            "$set": {"value": load, "up_time": datetime.datetime.now()}})
            PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST = get_config()
            if load > 0:
                time.sleep(8)
            else:
                time.sleep(60)
    
    
    def get_config():
        try:
            config_info = na_config.find_one({"type": "vulscan"})
            pass_row = config_info['config']['Password_dic']
            thread_row = config_info['config']['Thread']
            timeout_row = config_info['config']['Timeout']
            white_row = config_info['config']['White_list']
            password_dic = pass_row['value'].split('
    ')
            thread_count = int(thread_row['value'])
            timeout = int(timeout_row['value'])
            white_list = white_row['value'].split('
    ')
            return password_dic, thread_count, timeout, white_list
        except Exception, e:
            print e
    
    def install_kunpeng_plugin():
        time_ = datetime.datetime.now()
        for plugin in kp.get_plugin_list():
            level_list = ['紧急','高危','中危','低危','提示']
            plugin_info = {
                '_id': plugin['references']['kpid'],
                'name': 'Kunpeng -' + plugin['name'],
                'info': plugin['remarks'] + ' ' + plugin['references']['cve'],
                'level': level_list[int(plugin['level'])],
                'type': plugin['type'],
                'author': plugin['author'],
                'url': plugin['references']['url'],
                'source': 1,
                'keyword': '',
                'add_time': time_,
                'filename': plugin['references']['kpid'],
                'count': 0
            }
            na_plugin.insert(plugin_info)
    
    def init():
        time_ = datetime.datetime.now()
        if na_plugin.find().count() >= 1:
            return
        script_plugin = []
        json_plugin = []
        print 'init plugins'
        file_list = os.listdir(sys.path[0] + '/vuldb')
        for filename in file_list:
            try:
                if filename.split('.')[1] == 'py':
                    script_plugin.append(filename.split('.')[0])
                if filename.split('.')[1] == 'json':
                    json_plugin.append(filename)
            except:
                pass
        for plugin_name in script_plugin:
            try:
                res_tmp = __import__(plugin_name)
                plugin_info = res_tmp.get_plugin_info()
                plugin_info['add_time'] = time_
                plugin_info['filename'] = plugin_name
                plugin_info['count'] = 0
                na_plugin.insert(plugin_info)
            except:
                pass
        for plugin_name in json_plugin:
            try:
                json_text = open(sys.path[0] + '/vuldb/' + plugin_name, 'r').read()
                plugin_info = json.loads(json_text)
                plugin_info['add_time'] = time_
                plugin_info['filename'] = plugin_name
                plugin_info['count'] = 0
                del plugin_info['plugin']
                na_plugin.insert(plugin_info)
            except:
                pass
        install_kunpeng_plugin()
    
    
    def kp_check():
        while True:
            try:
                new_release = kp.check_version()
                print new_release
                if new_release:
                    info = new_release['body']
                    if '###' in new_release['body']:
                        info = new_release['body'].split('###')[1]
                    row = {
                        'info': info,
                        'isInstall': 0,
                        'name': new_release['name'],
                        'author': new_release['author']['login'],
                        'pushtime': new_release['published_at'],
                        'location': "",
                        'unicode': new_release['tag_name'],
                        'coverage': 0,
                        'source': 'kunpeng'
                    }
                    na_update.insert(row)
                    time.sleep(60 * 60 * 48)
            except Exception as e:
                print e
            time.sleep(60 * 30)
    
    
    def kp_update():
        while True:
            try:
                row = na_update.find_one_and_delete(
                    {'source': 'kunpeng', 'isInstall': 1})
                if row:
                    kp.update_version(row['unicode'])
                    na_plugin.delete_many({'_id':re.compile('^KP')})
                    install_kunpeng_plugin()
            except Exception as e:
                print e
            time.sleep(10)
    
    
    if __name__ == '__main__':
        init()
        PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST = get_config() #从数据库里面获得配置参数,比如白名单,引擎的线程
        thread.start_new_thread(monitor, ())  #巡风的心跳线程
        thread.start_new_thread(kp_check, ())  #巡风的Kunpeng库,检查更新去情况,跟踪kp_check-->check_version-->_get_release_lates,这个函数发现的是与云上的库进行对比,如果有更新的话,就提示更新
        # 如果没有更新的话,就不提示了
        thread.start_new_thread(kp_update, ()) #kunpeng库更新,上面那个只是推送出来,但是还没有更新,这个是巡风更新的线程,就是如果你把kunpeng库确定更新了之后,然后才能是执行这个线程
        while True:
            try:
                task_id, task_plan, task_target, task_plugin = queue_get()  #获取队列,简单来说就是获得任务的参数
                #如果status : 0 ,这个是未执行的状态,如果是status : 1就是正在执行的状态
                if task_id == '':
                    time.sleep(10)
                    # 每个间隔10秒钟
                    continue
                if PLUGIN_DB:
                    del sys.modules[PLUGIN_DB.keys()[0]]  # 清理插件缓存,这个我也没太懂学长讲的怎么回事...
                    PLUGIN_DB.clear()
                for task_netloc in task_target:
                    while True:
                        if int(thread._count()) < THREAD_COUNT:
                            #如果任务的线程数<50
                            if task_netloc[0] in WHITE_LIST:
                                #如果探测的资产是在白名单里面,那就break,结束  (^-^)
                                break
                            try:
                                thread.start_new_thread(
                                    vulscan, (task_id, task_netloc, task_plugin))
                            except Exception as e:
                                print e
                            break
                        else:
                            time.sleep(2)
                if task_plan == 0:
                    na_task.update({"_id": task_id}, {"$set": {"status": 2}})
            except Exception as e:
                print e

    巡风如果说作为一个检测内网的手段,真的是非常好用。另外分析下巡风的扫描,也能学到不少东西,比如我之前以为扫描摄像头的时候,以为是扫描的数据流,其实扫描的路径,拼接路径就可以了。还有巡风打poc的地方,也可以学习学习。不过自我感觉,还是巡风的资产探测比较好用,主要的还是要把巡风的资产探测学好,巡风的资产探测还是比较好的

  • 相关阅读:
    Failed to convert WOFF 2.0 font to SFNT 字体图标显示不出来
    每日一字:困
    每日一字:biáng
    sqlserver数据类型简记
    每日一字:天
    可以作为今生目标的一句话
    每日一字:惑
    [转载]优秀程序员的12中能力
    每日一字:怒
    [转载]像树一样活着
  • 原文地址:https://www.cnblogs.com/Triangle-security/p/11703741.html
Copyright © 2011-2022 走看看