zoukankan      html  css  js  c++  java
  • 公有云厂商自建威胁情报系统

    公有云厂商自建威胁情报系统

    2017年8月23日发布

    52,237
    0
    12

    导语:公有云厂商为什么要建立威胁情报系统?威胁情报的价值在哪里?我们不做会有什么损失?威胁情报交换能给我们带来什么附加价值?

    0x00、公有云厂商安全威胁

    Q:公有云厂商为什么要建立威胁情报系统?威胁情报的价值在哪里?我们不做会有什么损失?威胁情报交换能给我们带来什么附加价值?

    各个问题都很尖锐~

    业务层面

    根据自己的公有云厂商业务特点(提供云主机、云存储、云数据库、云GPU),建立威胁情报体系,那么面临的威胁 主要集中在DDoS攻击和主机入侵。

    1、有效的做到事中及时防御,降低用户遭受DDoS攻击总量,减少DDoS用户攻击成本。

    2、DDoS攻击溯源,云安全增值服务。

    3、内部针对性攻击事件溯源。

    技术层面

    1、历史追溯:通过聚类关联分析内部外部历史威胁情报数据,获得攻击事件

    domain/IP/URL/samples/analysis report

    2、及时阻断:能够对DDoS发起点(botnet主控端)、中间过程发起点(反射源/匿名代理TORP2P节点等)、到达点(直接攻击源),造成攻击无法持续。侧重点在前两部分。

    3、DDoS溯源,找到攻击源关联的Domain注册信息、Email信息、云主机注册信息等任何通过身份信息注册的账号。

    下面是一些攻击证据展示:

    Threat Top 1、公有云平台业务系统和云上用户遭受DDoS攻击

    Proof 1、没一天停过,在黑洞之前,攻击峰值总和维持在30G左右。

    公有云厂商自建威胁情报系统

    Proof 2、云上用户占比7成、云基础设施占3成。

    公有云厂商自建威胁情报系统

    Threat Top 2、云内主机中马

    Proof 1、对外DDoS攻击告警

    公有云厂商自建威胁情报系统

    Proof 2、CDN网络遭受入侵

    Proof3、DGA DNS Analytics解析监控,Xshell 感染终端监控。

    当然还有很多威胁,我就不在这里赘述。

    0x01、如何自建公有云厂商威胁情报系统

    首先,需要建立一整套威胁情报生命周期管理:

    在保护云上用户的隐私的前提下,有效的建立一套自动化情报收集、情报处理(特征提取工程、关联分析)、情报应用(抗D 、云WAF黑白名单、云基础设施定向攻击排查),以及情报共享(与拥有情报数据资源的厂商交换数据)。

    其次,在带宽和服务器资源允许的情况下,使用DPI/DFI技术、蜜罐技术、沙箱技术建立自身的威胁情报收集系统,提供高质量的威胁情报(注意,是带证据的IOC情报)、通过AI技术提高检测速度和降低误报率。

    自动化情报收集系统

    @1、云数据中心NIDS部署,集中收集日志。

    Suricata IDS + ELK

    网络拓扑

    公有云厂商自建威胁情报系统

    主要关注:shellcode监控、mysqlRDPSSH登陆尝试、木马活动、DoS等

    @2、DDoS攻击数据源抓取

    先说说技术方案选型问题,考虑到性能问题,主要是抓5元组的数据,那么DPI方案不成,xflow流量采样的方式(例如:绿盟NTA解决方案),1:1000抓包,在高速DDoS攻击的时候是无法抓到瞬间脉冲攻击包的。好吧,只有自己开发基于DPDK的数据统计和抓包系统。

    公有云厂商自建威胁情报系统

    @3、Web威胁数据收集

    基础架构是复用DDoS Pcapdump服务器,根据云平台基础设施IP列表,抓取7层数据,存储到redis,然后汇总到数据仓库中。

    @4、云平台基础设施服务器部署HIDS系统

    有两种选择,一是使用传统的HIDS软件的方式收集,二是使用EDR机器学习的方法分析,我选择了后者。

    公有云厂商自建威胁情报系统

    公有云厂商自建威胁情报系统

    详细描述一下在这一领域的实践之路:

    1、训练数据选择

    选择800万良性文件和400万恶意文件进行训练,Top 10 恶意家族是:

    公有云厂商自建威胁情报系统

    2、训练模型选择

    (1)    通过https://github.com/erocarrera/pefile.git(VirusTotal和Cuckoo都在使用)访问PE文件(PE头信息、PE导入表、PE导出表、PE节数据、PE版本信息) 哈希所有部分。

    Linux系统、MacOSX系统使用https://github.com/lief-project/LIEF 获取 elf文件和MachO文件头相关信息,并且哈希。

    (2)    通过对准确率、漏报率、性能、模型大小、查询执行时间的评估来选择模型。(目前使用sklearn库,将来打算移植到ML Toolkit for TensorFlow,听说训练速度可以提升50倍,毕竟GPU越来越吊)

    公有云厂商自建威胁情报系统

    通过对文件判断的malware分类结果上报数据,提供基于主机的威胁情报源。

    @5、公网蜜罐系统

    公有云厂商自建威胁情报系统

    有关公网蜜罐有以下思考:

    (1)需要用高交互蜜罐,直接修改真实服务加入log记录系统,当然log系统尽量做的隐藏些。

    蜜罐本身有一定威胁情报产生能力(降低误报(3次以上持续攻击才记录)、生成有价值情报(例如:web蜜罐产生sqlixss等情报数据,而不是只收集access log))

    (2)有一套完善的基础服务管理系统,可以快速部署和销毁蜜罐系统,符合大规模部署要求。

    情报处理

    其实就是建立一套威胁溯源平台,平台提供查询接口,提供一个可视化的关系图图,例如:

    公有云厂商自建威胁情报系统

    单更重要的情报逻辑处理部分:

    这里以DDoS一次攻击事件分析说起:

    (1)    首先通过自动化收集系统中收集到的攻击数据:

    Data1:{攻击目的IP/攻击时间/攻击类型/攻击峰值/攻击持续时间}

    Data2: {攻击目的IP /攻击目的端口/攻击时间/攻击源IP/攻击源端口/地理位置/攻击IP运营商/包大小}  取Top 1000数据

    (2)    集合历史攻击数据 ,获取超过2次攻击源IP

    (3)    通过端口扫描,回扫。确定IP存活状态和开放端口状态。

    (4)    通过外联威胁情报查询IP信誉。确定IP状态排除TOR/匿名代理等

    (5)    通过内外情报源查询攻击IP对应malware样本。

    (6)    关联蜜罐系统收集样本,获取botnet Server地址。

    (7)    对发生攻击的botnet Server发起反制措施。

    情报共享

    建立提高情报数据共享接口,同时需要对对外提供数据做脱敏处理。

    @1、进入到ELK数据,建立威胁情报对外接口

    (1)入库:

    #!/usr/bin/env python
    #coding=utf-8
    import traceback
    from elasticsearch import Elasticsearch
    import sys
    import json
    import datetime,time
    import psycopg2
    connC = psycopg2.connect(database="postgres", user="xxx", password="xxx", host="127.0.0.1",port="5432")
    conn1 = psycopg2.connect(database="postgres", user="xxx", password="xxx", host="127.0.0.1",port="5432")
    def GetNum():
             es = Elasticsearch('x.x.x.x')
             data = es.search(index=index_day, body={"query": {"match_all": {}}},size=1)
             return data["hits"]["total"]
    def Getidslog():
             es = Elasticsearch('x.x.x.x')
             tmp_str = datetime.datetime.now().strftime('%Y.%m.%d')
             index_day = 'logstash-' + tmp_str
             es.indices.put_settings(
                                                     {"index": {
                                                            "max_result_window": 500000
                                                     }})
             num = GetNum()
             data = es.search(index=index_day, body={"query": {"match_all": {}}}, size=num)
             datalen=len(data["hits"]["hits"])
             tableName = "%s_%s" % ("idslog", time.strftime("%Y%m%d"))
             cur1 = conn1.cursor()
             try:
                  for c in xrange(0,datalen):
    #                   print c
                         if data["hits"]["hits"][c]:
                                m_ctime=data["hits"]["hits"][c]["_source"]["@timestamp"]
                                m_src_ip=data["hits"]["hits"][c]["_source"]["src_ip"]
                                m_category=data["hits"]["hits"][c]["_source"]["alert"]["category"]
                                m_signature=data["hits"]["hits"][c]["_source"]["alert"]["signature"]
                                sql = "INSERT INTO %s (ctime,src_ip, category,signature) VALUES ('%s','%s','%s','%s')"
                                sqlCmd = sql % (tableName, m_ctime, m_src_ip, m_category, m_signature)
                                print sqlCmd
                                cur1.execute(sqlCmd)
                                conn1.commit()
             except Exception,e:
                  traceback.print_exc()
    def CreateTable():
        curC = connC.cursor()
        sqlCreate = "create table if not exists %s ( 
                     ctime TEXT,
                     src_ip TEXT,
                     category TEXT,
                     signature TEXT
                     )"
        tableName = "%s_%s"%("idslog", time.strftime("%Y%m%d"))
        sqlCmd = sqlCreate%tableName
        curC.execute(sqlCmd)
        curC.close()
        connC.commit()
    if __name__ == '__main__':
             CreateTable()
             Getidslog()

    (2)通过django web方式提供API接口。

      url(r'^api/outxxx/id$', outputAPI.as_view()),//获取某个IP对应的威胁情报,(只提供必要的情报,内部数据需要脱敏)
    url(r'^api/outxxx/IPlist$', outputIPlistAPI.as_view()), //获得所有ip列表
    class outputAPI(APIView):
        def get(self, request, format=None):
            m_src_ip = request.GET.get("ip") //安全机制已屏蔽
            print m_src_ip
            tableName ='idslog_20170814'
            conn1 = psycopg2.connect(database="postgres", user="xxx", password="xxx", host="127.0.0.1",
                                    port="5432")
            cur1 = conn1.cursor()
            SQL1="select * from %s WHERE src_ip=%s" %(tableName,m_src_ip)
            cur1.execute(SQL1)
            rows = cur1.fetchall()
            list =[]
            for row in rows:
                m = {"ctime": row[0], "src_ip": row[1],"category":row[2],"signature":row[3]}
                list.append(m)
            b=json.dumps(list)
            return HttpResponse(b)
    class outputIPlistAPI(APIView):
        def get(self, request, format=None):
            tableName = 'idslog_20170814'
            conn1 = psycopg2.connect(database="postgres", user="xxx", password="xxx", host="127.0.0.1",
                                     port="5432")
            cur1 = conn1.cursor()
            SQL1 = "select DISTINCT(src_ip) from {} ".format(tableName)
            cur1.execute(SQL1)
            rows = cur1.fetchall()
            list = []
            for row in rows:
                m = {"src_ip": row[0]}
                list.append(m)
            b = json.dumps(list)
            return HttpResponse(b)

    0x03、总结

    公有云厂商自建威胁情报系统,是一个长期的投入,虽然短期看不出来多少效果,但是针对数据情报的积累到一定量的时候会有质的变化。

  • 相关阅读:
    MyBatis——调用存储过程
    企业信息化快速开发平台JeeSite
    JavaWeb网页聊天室(WebSocket即时通讯)
    Java用webSocket实现tomcat的日志实时输出到web页面
    Java用WebSocket + tail命令实现Web实时日志
    linux 跨IP拷贝命令 scp
    在map中根据value获取key
    mysql 常用函数
    Nexus中自定义私服,每个项目都用独立的工厂,仓库
    button 默认类型是submit
  • 原文地址:https://www.cnblogs.com/zafu/p/8623974.html
Copyright © 2011-2022 走看看