zoukankan      html  css  js  c++  java
  • 【转载】一个人的安全部之ELK接收Paloalto日志并用钉钉告警

    转载:站着洗澡

    http://ksowo.com/2018/02/01/ELK%E6%8E%A5%E6%94%B6paloalto%E6%97%A5%E5%BF%97%E5%B9%B6%E7%94%A8%E9%92%89%E9%92%89%E5%91%8A%E8%AD%A6/

    https://www.freebuf.com/articles/others-articles/161905.html

    起因

    通报漏洞后,开发未能及时修复漏洞,导致被攻击,领导说我发现被攻击的时间晚了,由于一个人安全部精力有限未能及时看IPS告警,于是做了个钉钉告警。

    本人环境介绍

    ubuntu 14.04
    python 2.7
    kibana-5.5.2
    logstash-5.5.2
    elasticsearch-5.5.2
    paloalto软件版本7.1.14
    

    1、ELK安装

    elasticsearch下载地址:https://www.elastic.co/cn/downloads/elasticsearch

    kibana下载地址:https://www.elastic.co/cn/downloads/kibana

    Logstash下载地址:https://www.elastic.co/cn/downloads/logstash

    elasticsearch和kibana配置就不多说了,比较简单。

    2、logstash的配置

    新建syslog.conf文件,此版本的paloalto的有50多个字段,暴力配置如下,所有转发过来的日志直接丢到elasticsearch里面,最后用kibana展示(可以大屏装X)

    启动logstash:nohup ./bin/logstash -f syslog.conf &

    input{
        syslog{#输入syslog
        type => "syslog"
        port => 514
        }
    }
    
    filter {  
        grok {  
            match => ["message", "%{DATA:Domain}\,%{DATA:Receive-Time}\,%{DATA:Serial}\,%{DATA:Type}\,%{DATA:Threat-Type}\,%{DATA:Config-Version}\,%{DATA:Generate-Time}\,%{IP:Source-address}\,%{IP:Destination-address}\,%{DATA:NAT-Source-IP}\,%{DATA:NAT-Destination-IP}\,%{DATA:Rule}\,%{DATA:Source-User}\,%{DATA:Destination-User}\,%{DATA:Application}\,%{DATA:Virtual-System}\,%{DATA:Source-Zone}\,%{DATA:Destination-Zone}\,%{DATA:Inbound-Interface}\,%{DATA:Outbound-Interface}\,%{DATA:Log-Action}\,%{DATA:Time-Logged}\,%{DATA:Session-ID}\,%{DATA:Repeat-Count}\,%{DATA:Source-Port}\,%{DATA:Destination-Port}\,%{DATA:NAT-Source-Port}\,%{DATA:NAT-Destination-Port}\,%{DATA:Flags}\,%{DATA:IP-Protocol}\,%{DATA:Action}\,%{DATA:URL}\,%{DATA:Threat-Content-Name}\,%{DATA:Category}\,%{DATA:Severity}\,%{DATA:Direction}\,%{DATA:Sequence-Number}\,%{DATA:Action-Flags}\,%{DATA:Source-Country}\,%{DATA:Destination-Country}\,%{DATA:cpadding}\,%{DATA:contenttype}\,%{DATA:pcap_id}\,%{DATA:filedigest}\,%{DATA:cloud}\,%{DATA:url_idx}\,%{DATA:user_agent}\,%{DATA:filetype}\,%{DATA:xff}\,%{DATA:referer}\,%{DATA:sender}\,%{DATA:subject}\,%{DATA:recipient}\,%{DATA:reportid}\,%{DATA:dg_hier_level_1}\,%{DATA:dg_hier_level_2}\,%{DATA:dg_hier_level_3}\,%{DATA:dg_hier_level_4}\,%{DATA:Virtual-System-Name}\,%{DATA:Device-Name}\,%{DATA:file_url}"]  
        }  
    } 
    
    output{
        elasticsearch{
          hosts => ["x.x.x.x:9200"]
          index => "syslog"
        }
        # stdout{#控制台打印输出
        #     codec => rubydebug
        # }
    }
    

    3、paloalto设置

    一共有寄个地方要注意一下,否则日志转发不成功

    1、创建syslog,转发到logstash服务器

    一个人的安全部之ELK接收paloalto日志并用钉钉告警

    2、配置转发用syslog

    一个人的安全部之ELK接收paloalto日志并用钉钉告警

    3、配置你想要的日志类型和严重性

    一个人的安全部之ELK接收paloalto日志并用钉钉告警

    4、在安全策略出匹配设置的日志转发

    一个人的安全部之ELK接收paloalto日志并用钉钉告警

    一个人的安全部之ELK接收paloalto日志并用钉钉告警

    5、最后记得提交配置,否则不生效

    一个人的安全部之ELK接收paloalto日志并用钉钉告警

    kibana最终效果

    一个人的安全部之ELK接收paloalto日志并用钉钉告警

    一个人的安全部之ELK接收paloalto日志并用钉钉告警

    4、elastalert设置

    elastalert:https://github.com/Yelp/elastalert.git钉钉告警:https://github.com/xuyaoqiang/elastalert-dingtalk-plugin部分依赖:

    sudo apt-get install python-dev libffi-dev
    

    1、安装过程如下

    pip install elastalert
    或者
    git clone https://github.com/Yelp/elastalert.git 
    cd elastalert
    sudo python setup.py install
    sudo pip install -r requirements.txt
    

    其中有部分依赖可能安装错误,请单独下载安装既可。

    2、安装完继续在elasticsearch中创建elastalert的日志索引

    sudo elastalert-create-index --index elastalert
    

    根据自己的情况,填入elasticsearch的相关信息,关于 elastalert_status部分直接回车默认的即可。 如下所示:

    Enter Elasticsearch host: localhost
    Enter Elasticsearch port: 9200
    Use SSL? t/f: 
    Enter optional basic-auth username (or leave blank): 
    Enter optional basic-auth password (or leave blank): 
    Enter optional Elasticsearch URL prefix (prepends a string to the URL of every request): 
    Name of existing index to copy? (Default None) 
    Elastic Version:5
    Mapping used for string:{'index': 'not_analyzed', 'type': 'string'}
    New index elastalert created
    Done!
    

    3、创建配置文件

    3.1、修改elastalert的配置文件

    下载https://github.com/xuyaoqiang/elastalert-dingtalk-plugin

    把elastalert-dingtalk-plugin中的elastalert_modules、rules和config.yaml复制到elastalert下

    修改config.yaml对应配置

    es_host: elasticsearch 地址
    es_port: elasticsearch 端口
    

    3.2、修改rules的配置文件

    官方有很多rules规则可以去看官方文档:http://elastalert.readthedocs.io/en/latest/ruletypes.html#rule-types

    修改xxx.yaml对应配置:

    name: IPS安全告警
    
    #唯一值重复告警规则type: cardinality
    
    #es_host: localhost
    
    #es_port: 9200
    
    # Index to search, wildcard supported
    
    index: syslog
    
    cardinality_field: Source-address.keyword
    
    #最小5次触发规则min_cardinality: 5
    
    #max_cardinality: 5
    
    # 60秒内
    
    timeframe: seconds: 60
    
    #5分钟内重复告警不告警realert: minutes: 30
    
    # ES 查询,用以过滤
    
    #filter:
    
    #- term:
    
    # Severity: "high"
    
    # (Required)
    
    # The alert is use when a match is found
    
    alert:
    
    *   "debug"#你自己定义的钉钉告警脚本
    *   "elastalert_modules.dingtalk_alert.DingTalkAlerter"
    
    dingtalk_webhook: 在钉钉群中添加机器人可以获取dingtalk_msgtype: "text"
    

    4、钉钉的配置

    一个人的安全部之ELK接收paloalto日志并用钉钉告警

    一个人的安全部之ELK接收paloalto日志并用钉钉告警

    一个人的安全部之ELK接收paloalto日志并用钉钉告警

    5、配置elastalert变为自定义告警内容

    5.1、启用钉钉的报警

    原生告警比较不友好

    一个人的安全部之ELK接收paloalto日志并用钉钉告警

    修改为:

    一个人的安全部之ELK接收paloalto日志并用钉钉告警

    修改了elastalert-dingtalk-plugin-masterelastalert_modulesdingtalk_alert.py里面的代码,

    为了获取告警时间,然后对这个时间变为时间段,到es里面查询,获取对应的字段值,这个里的时间转换比较乱,代码写的渣,大佬可以忽略

    # 获取对应时间段,并查询到对应可疑ip地址
    get_time = body.split('
    ')[4].split(' ')[1]
    times = get_time.split('.')[0].split(':')[:2]
    t = times[0]
    t1 = int(times[1]) - 2
    t2 = int(times[1]) + 2
    # 将其转换为时间数组 
    timeStruct = time.strptime(t + ':' + str(t1), "%Y-%m-%dT%H:%M") 
    # 转换为时间戳: 
    timeStamp1 = int(time.mktime(timeStruct))
    # 时间戳转换为指定格式日期
    localTime = time.localtime(timeStamp1) 
    gt = time.strftime("%Y-%m-%dT%H:%M", localTime) 
    timeStruct = time.strptime(t + ':' + str(t2), "%Y-%m-%dT%H:%M") 
    # 转换为时间戳: 
    timeStamp2 = int(time.mktime(timeStruct))
    # 时间戳转换为指定格式日期
    localTime = time.localtime(timeStamp2) 
    lt = time.strftime("%Y-%m-%dT%H:%M", localTime)
    # print(gt+'
    '+lt)
    es = Elasticsearch("10.11.10.245:9200")
    body = {
        "query": {
            "range" : {
                "@timestamp" : {
                    "gt" : gt,
                    "lt": lt
                                }
                        }
                }
        }
    res = es.search(index="syslog", body=body)
    text = res['hits']['hits']
    if len(text) != 0:
        sip = text[0]['_source']['Source-address']
        dip = text[0]['_source']['Destination-address']
        dport = text[0]['_source']['Destination-Port']
        atype = text[0]['_source']['Threat-Content-Name']
        ntime = text[0]['_source']['Time-Logged']
        payload = {
            "msgtype": self.dingtalk_msgtype,
            "text": {
                "content": "IPS安全告警
    发现源ip地址: %s 在30秒内,对服务器ip:%s 的 %s 端口进行了5次攻击,攻击类型为 %s,请排除或确认攻击!
    (攻击时间点:%s)" % (sip, dip, dport, atype, ntime) 
            },
            "at": {
                "isAtAll":False
            }
        }
    

    6、最后开启所有组件

    ./elasticsearch-5.5.2/bin/elasticsearch &
    ./kibana-5.5.2-linux-x86_64/bin/kibana &
    ./logstash-5.5.2/bin/logstash -f /xxxx/logstash-5.5.2/syslog.conf &
    python -m ./elastalert/elastalert.elastalert --verbose &
    

    至此大功告成,其实elk还可以接收各种日志,自己做分析,然后告警,本文只是其中一个场景,大家可以收集所有日志一起做集中告警。谢谢各位大佬捧场。

  • 相关阅读:
    zabbix+grafana使用
    其它工具网址
    IntelliJ IDEA 进行多线程调试
    mac外接显示器 双屏同时滑动问题
    wacher和acl
    zookeeper介绍
    iterm2用法与技巧
    linux下ssh公钥验证的设置和远程登录
    不变模式
    单例模式创建的三种方式
  • 原文地址:https://www.cnblogs.com/h2zZhou/p/8425205.html
Copyright © 2011-2022 走看看