zoukankan      html  css  js  c++  java
  • 巡风源码阅读与分析---nascan.py

    Nascan是巡风主要是做目标的资产识别(信息收集)

    nascan.py 文件位于 nascan/nascan.py

    # coding:utf-8
    # author:wolf@YSRC
    import thread
    from lib.common import *
    from lib.start import *
    if __name__ == "__main__":
        try:
            CONFIG_INI = get_config()  # 读取配置
            log.write('info', None, 0, u'获取配置成功')
            STATISTICS = get_statistics()  # 读取统计信息
            MASSCAN_AC = [0]
            NACHANGE = [0]
            thread.start_new_thread(monitor, (CONFIG_INI,STATISTICS,NACHANGE))  # 心跳线程
            thread.start_new_thread(cruise, (STATISTICS,MASSCAN_AC))  # 失效记录删除线程
            socket.setdefaulttimeout(int(CONFIG_INI['Timeout']) / 2)  # 设置连接超时
            ac_data = []
            while True:
                now_time = time.localtime()
                now_hour = now_time.tm_hour
                now_day = now_time.tm_mday
                now_date = str(now_time.tm_year) + str(now_time.tm_mon) + str(now_day)
                cy_day, ac_hour = CONFIG_INI['Cycle'].split('|')
                log.write('info', None, 0, u'扫描规则: ' + str(CONFIG_INI['Cycle']))
                if (now_hour == int(ac_hour) and now_day % int(cy_day) == 0 and now_date not in ac_data) or NACHANGE[0]:  # 判断是否进入扫描时段
                    ac_data.append(now_date)
                    NACHANGE[0] = 0
                    log.write('info', None, 0, u'开始扫描')
                    s = start(CONFIG_INI)
                    s.masscan_ac = MASSCAN_AC
                    s.statistics = STATISTICS
                    s.run()
                time.sleep(60)
        except Exception, e:
            print e

    读取了配置,get_config() 跟进去

    nascan/lib/common.py

    def get_config():
        config = {}
        config_info = mongo.na_db.Config.find_one({"type": "nascan"})
        for name in config_info['config']:
            if name in ['Discern_cms', 'Discern_con', 'Discern_lang', 'Discern_server']:
                config[name] = format_config(name, config_info['config'][name]['value'])
            else:
                config[name] = config_info['config'][name]['value']
        return config

    就是读取了mongodb里面Config表下的内容。

    回到nascan.py

    get_statistics()则是读取统计信息,返回时间。

    也是位于nascan/lib/common.py

    def get_statistics():
        date_ = datetime.datetime.now().strftime('%Y-%m-%d')
        now_stati = mongo.na_db.Statistics.find_one({"date": date_})
        if not now_stati:
            now_stati = {date_: {"add": 0, "update": 0, "delete": 0}}
            return now_stati
        else:
            return {date_: now_stati['info']}

    MASSCAN_AC 是系统来判断是否支持masscan扫描。为1的话就是masscan正在扫描。

     NACHANGE 是用来看现在的扫描列表和开始的列表有没有变化,有变化设为1

    thread.start_new_thread(monitor, (CONFIG_INI,STATISTICS,NACHANGE))  # 心跳线程
    thread.start_new_thread(cruise, (STATISTICS,MASSCAN_AC))  # 失效记录删除线程
    socket.setdefaulttimeout(int(CONFIG_INI['Timeout']) / 2)  # 设置连接超时

    进入monitor心跳线程

    位于nascan/lib/common.py

    def monitor(CONFIG_INI, STATISTICS, NACHANGE):
        while True:
            try:
                time_ = datetime.datetime.now()
                date_ = time_.strftime('%Y-%m-%d')
                mongo.na_db.Heartbeat.update({"name": "heartbeat"}, {"$set": {"up_time": time_}})
                if date_ not in STATISTICS: STATISTICS[date_] = {"add": 0, "update": 0, "delete": 0}
                mongo.na_db.Statistics.update({"date": date_}, {"$set": {"info": STATISTICS[date_]}}, upsert=True)
                new_config = get_config()
                if base64.b64encode(CONFIG_INI["Scan_list"]) != base64.b64encode(new_config["Scan_list"]):NACHANGE[0] = 1
                CONFIG_INI.clear()
                CONFIG_INI.update(new_config)
            except Exception, e:
                print e
            time.sleep(30)

    再次调用了get_config(),获取了配置信息,如果Config表的base64编码值如果有变化将NACHANGE[0]改成NACHANGE[1]。系统更新config,然后睡眠30秒,表示需要重新扫描。

    返回nascan.py

    Cruise()函数,位于nascan/lib/common.py

    def cruise(STATISTICS,MASSCAN_AC):
        while True:
            now_str = datetime.datetime.now()
            week = int(now_str.weekday())
            hour = int(now_str.hour)
            if week >= 1 and week <= 5 and hour >= 9 and hour <= 18:  # 非工作时间不删除
                try:
                    data = mongo.NA_INFO.find().sort("time", 1)
                    for history_info in data:
                        while True:
                            if MASSCAN_AC[0]:  # 如果masscan正在扫描即不进行清理
                                time.sleep(10)
                            else:
                                break
                        ip = history_info['ip']
                        port = history_info['port']
                        try:
                            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                            sock.connect((ip, int(port)))
                            sock.close()
                        except Exception, e:
                            time_ = datetime.datetime.now()
                            date_ = time_.strftime('%Y-%m-%d')
                            mongo.NA_INFO.remove({"ip": ip, "port": port})
                            log.write('info', None, 0, '%s:%s delete' % (ip, port))
                            STATISTICS[date_]['delete'] += 1
                            del history_info["_id"]
                            history_info['del_time'] = time_
                            history_info['type'] = 'delete'
                            mongo.NA_HISTORY.insert(history_info)
                except:
                    pass
            time.sleep(3600)

    记录失效目标并删除线程,对目标(ip:port)进行sock连接,如果连接不上就删除INFO里面ipport。然后写进history表里。

    回到nascan.py

    if (now_hour == int(ac_hour) and now_day % int(cy_day) == 0 and now_date not in ac_data) or NACHANGE[0]:  # 判断是否进入扫描时段

    (now_hour == int(ac_hour) and now_day % int(cy_day) == 0 and now_date not in ac_data)

    是判断是否到达扫描的周期时间。

    或者就是NACHANGE[0]的值为1,任何一个成立都可以重新扫描。

    进入Start()函数

    nascan/lib/start.py

    start类中,__init__初始化了传递过来的配置信息。直接看run(),处理目标IP地址和使用masscan进行初步扫描等。

    def run(self):
        global AC_PORT_LIST
        all_ip_list = []
        for ip in self.scan_list:
            if "/" in ip: ip = cidr.CIDR(ip)
            if not ip:continue
            ip_list = self.get_ip_list(ip)
            for white_ip in self.white_list:
                if white_ip in ip_list:
                    ip_list.remove(white_ip)
            if self.mode == 1:
                self.masscan_path = self.config_ini['Masscan'].split('|')[2]
                self.masscan_rate = self.config_ini['Masscan'].split('|')[1]
                ip_list = self.get_ac_ip(ip_list)
                self.masscan_ac[0] = 1
                AC_PORT_LIST = self.masscan(ip_list)  # 如果安装了Masscan即使用Masscan进行全端口扫描
                if not AC_PORT_LIST: continue
                self.masscan_ac[0] = 0
                for ip_str in AC_PORT_LIST.keys(): self.queue.put(ip_str)  # 加入队列
                self.scan_start()  # 开始扫描
            else:
                all_ip_list.extend(ip_list)
        if self.mode == 0:
            if self.icmp: all_ip_list = self.get_ac_ip(all_ip_list)
            for ip_str in all_ip_list: self.queue.put(ip_str)  # 加入队列
            self.scan_start()  # TCP探测模式开始扫描

    if "/" in ip: ip = cidr.CIDR(ip) ,支持这样的格式:127.0.0.1/24

    if self.mode == 1 判断是否支持masscan扫描,如果支持就使用Masscan进行全端口扫描。如果没有开启,将ip添加到all_ip_list这个列表中。

    masscan()函数

    nascan/lib/start.py

    def masscan(self, ip):
        try:
            if len(ip) == 0: return
            sys.path.append(sys.path[0] + "/plugin")
            m_scan = __import__("masscan")
            result = m_scan.run(ip, self.masscan_path, self.masscan_rate)
            return result
        except Exception, e:
            print e
            print 'No masscan plugin detected'

    调用了/plugin/masscan.py

    def run(ip_list,path,rate):
        try:
            ip_file = open('target.log','w')
            ip_file.write("
    ".join(ip_list))
            ip_file.close()
            path = str(path).translate(None, ';|&')
            rate = str(rate).translate(None, ';|&')
            if not os.path.exists(path):return
            os.system("%s -p1-65535 -iL target.log -oL tmp.log --randomize-hosts --rate=%s"%(path,rate))
            result_file = open('tmp.log', 'r')
            result_json = result_file.readlines()
            result_file.close()
            del result_json[0]
            del result_json[-1]
            open_list = {}
            for res in result_json:
                try:
                    ip = res.split()[3]
                    port = res.split()[2]
                    if ip in open_list:
                        open_list[ip].append(port)
                    else:
                        open_list[ip] = [port]
                except:pass
            os.remove('target.log')
            os.remove('tmp.log')
            return open_list
        except:
            pass

    先过滤了;|&三个特殊字符。然后拼接到命令中

    masscan -p1-65535 -iL target.log -oL tmp.log --randomize-hosts --rate=20000

    masscan扫描好了后保存tmp.log文件里然后读取结果。

    不管开没开masscan,都会进入scan_start()

    跟进到ThreadNum,位于/nascan/lib/start.py 

    class ThreadNum(threading.Thread):
        def __init__(self, queue):
            threading.Thread.__init__(self)
            self.queue = queue
    
        def run(self):
            while True:
                try:
                    task_host = self.queue.get(block=False)
                except:
                    break
                try:
                    if self.mode:
                        port_list = AC_PORT_LIST[task_host]
                    else:
                        port_list = self.config_ini['Port_list'].split('|')[1].split('
    ')
                    _s = scan.scan(task_host, port_list)
                    _s.config_ini = self.config_ini  # 提供配置信息
                    _s.statistics = self.statistics  # 提供统计信息
                    _s.run()
                except Exception, e:
                    print e
                finally:
                    self.queue.task_done()

    run()函数,把IP地址和端口号列表传到另一个scan()函数中。

    位于/nascan/lib/scan.py

    class scan:
        def __init__(self, task_host, port_list):
            self.ip = task_host
            self.port_list = port_list
            self.config_ini = {}
    
        def run(self):
            self.timeout = int(self.config_ini['Timeout'])
            for _port in self.port_list:
                self.server = ''
                self.banner = ''
                self.port = int(_port)
                self.scan_port()  # 端口扫描
                if not self.banner:continue
                self.server_discern()  # 服务识别
                if self.server == '':
                    web_info = self.try_web()  # 尝试web访问
                    if web_info:
                        log.write('web', self.ip, self.port, web_info)
                        time_ = datetime.datetime.now()
                        mongo.NA_INFO.update({'ip': self.ip, 'port': self.port},
                                             {"$set": {'banner': self.banner, 'server': 'web', 'webinfo': web_info,
                                                       'time': time_}})

    scan类的run函数。先进行了端口扫描,scan_port()函数

    位于/nascan/lib/scan.py

    def scan_port(self):
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.connect((self.ip, self.port))
            time.sleep(0.2)
        except Exception, e:
            return
        try:
            self.banner = sock.recv(1024)
            sock.close()
            if len(self.banner) <= 2:
                self.banner = 'NULL'
        except Exception, e:
            self.banner = 'NULL'
        log.write('portscan', self.ip, self.port, None)
        banner = ''
        hostname = self.ip2hostname(self.ip)
        time_ = datetime.datetime.now()
        date_ = time_.strftime('%Y-%m-%d')
        try:
            banner = unicode(self.banner, errors='replace')
            if self.banner == 'NULL': banner = ''
            mongo.NA_INFO.insert({"ip": self.ip, "port": self.port, "hostname": hostname, "banner": banner, "time": time_})
            self.statistics[date_]['add'] += 1
        except:
            if banner:
                history_info = mongo.NA_INFO.find_and_modify(
                    query={"ip": self.ip, "port": self.port, "banner": {"$ne": banner}}, remove=True)
                if history_info:
                    mongo.NA_INFO.insert(
                        {"ip": self.ip, "port": self.port, "hostname": hostname, "banner": banner, "time": time_})
                    self.statistics[date_]['update'] += 1
                    del history_info["_id"]
                    history_info['del_time'] = time_
                    history_info['type'] = 'update'
                    mongo.NA_HISTORY.insert(history_info)

    通过socket连接,获得端口服务返回的banner信息,然后进入server_discern()函数,通过正则表达式,依次比较,获得服务类型。

    server_discern()函数

    位于/nascan/lib/scan.py

    def server_discern(self):
        for mark_info in self.config_ini['Discern_server']: # 快速识别
            try:
                name, default_port, mode, reg = mark_info
                if mode == 'default':
                    if int(default_port) == self.port:
                        self.server = name
                elif mode == 'banner':
                    matchObj = re.search(reg, self.banner, re.I | re.M)
                    if matchObj:
                        self.server = name
                if self.server:break
            except:
                continue
        if not self.server and self.port not in [80,443,8080]:
            for mark_info in self.config_ini['Discern_server']:  # 发包识别
                try:
                    name, default_port, mode, reg = mark_info
                    if mode not in ['default','banner']:
                        dis_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                        dis_sock.connect((self.ip, self.port))
                        mode = mode.decode('string_escape')
                        reg = reg.decode('string_escape')
                        dis_sock.send(mode)
                        time.sleep(0.3)
                        dis_recv = dis_sock.recv(1024)
                        matchObj = re.search(reg, dis_recv, re.I | re.M)
                        if matchObj:
                            self.server = name
                            break
                except:
                    pass
        if self.server:
            log.write("server", self.ip, self.port, self.server)
            mongo.NA_INFO.update({"ip": self.ip, "port": self.port}, {"$set": {"server": self.server}})

    对于没识别出来的服务类型,端口号又不是常见端口号,会重新发包,发送特定包才会返回应答banner的服务类型。

    最后如果还没识别出来,进入try_web()函数

    位于/nascan/lib/scan.py

    def try_web(self):
        title_str, html = '', ''
        try:
            if self.port == 443:
                info = urllib2.urlopen("https://%s:%s" % (self.ip, self.port), timeout=self.timeout)
            else:
                info = urllib2.urlopen("http://%s:%s" % (self.ip, self.port), timeout=self.timeout)
            html = info.read()
            header = info.headers
        except urllib2.HTTPError, e:
            html = e.read()
            header = e.headers
        except:
            return
        if not header: return
        if 'Content-Encoding' in header and 'gzip' in header['Content-Encoding']:  # 解压gzip
            html_data = StringIO.StringIO(html)
            gz = gzip.GzipFile(fileobj=html_data)
            html = gz.read()
        try:
            html_code = self.get_code(header, html).strip()
            if html_code and len(html_code) < 12:
                html = html.decode(html_code).encode('utf-8')
        except: pass
        try:
            title = re.search(r'<title>(.*?)</title>', html, flags=re.I | re.M)
            if title: title_str = title.group(1)
        except: pass
        try:
            web_banner = str(header) + "
    
    " + html
            self.banner = web_banner
            history_info = mongo.NA_INFO.find_one({"ip": self.ip, "port": self.port})
            if 'server' not in history_info:
                tag = self.get_tag()
                web_info = {'title': title_str, 'tag': tag}
                return web_info
            else:
                if abs(len(history_info['banner'].encode('utf-8')) - len(web_banner)) > len(web_banner) / 60:
                    del history_info['_id']
                    history_info['del_time'] = datetime.datetime.now()
                    mongo.NA_HISTORY.insert(history_info)
                    tag = self.get_tag()
                    web_info = {'title': title_str, 'tag': tag}
                    date_ = datetime.datetime.now().strftime('%Y-%m-%d')
                    self.statistics[date_]['update'] += 1
                    log.write('info', None, 0, '%s:%s update web info'%(self.ip, self.port))
                    return web_info
        except:
            return
    
    def ip2hostname(self,ip):
        try:
            hostname = socket.gethostbyaddr(ip)[0]
            return hostname
        except:
            pass
        try:
            query_data = "x00x00x00x00x00x01x00x00x00x00x00x00x20x43x4bx41x41" + 
                         "x41x41x41x41x41x41x41x41x41x41x41x41x41x41x41x41x41" + 
                         "x41x41x41x41x41x41x41x41x41x41x41x00x00x21x00x01"
            dport = 137
            _s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            _s.settimeout(3)
            _s.sendto(query_data, (ip, dport))
            x = _s.recvfrom(1024)
            tmp = x[0][57:]
            hostname = tmp.split("x00", 2)[0].strip()
            hostname = hostname.split()[0]
            return hostname
        except:
            pass
    
    def get_code(self, header, html):
        try:
            m = re.search(r'<meta.*?charset=(.*?)"(>| |/)', html, flags=re.I)
            if m: return m.group(1).replace('"', '')
        except:
            pass
        try:
            if 'Content-Type' in header:
                Content_Type = header['Content-Type']
                m = re.search(r'.*?charset=(.*?)(;|$)', Content_Type, flags=re.I)
                if m: return m.group(1)
        except:
            pass

    这个函数就是尝试用web访问,如果有结果的话就保存下来,没有的话就不管了。

    回到nascan

    大概每隔一分钟探测是否要进行扫描。

    参考文章:

    https://landgrey.me/xunfeng-nascan-analysis/

    https://www.cnblogs.com/yangxiaodi/p/8011563.html

  • 相关阅读:
    如何在Eclipse中查看Java类库的源代码以及相应的api
    深入剖析ConcurrentHashMap
    Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析
    Quartz配置
    Spring 自动定时任务配置
    @Resource注解的官方解释
    @Resource 注解的使用
    扫地机器人会否抛弃激光雷达这位原配?
    女教授领军打造最耐用机器人,可从180米高空落下执行救援任务
    一文看懂80年“AI革命”简史
  • 原文地址:https://www.cnblogs.com/zhengjim/p/9436008.html
Copyright © 2011-2022 走看看