ELK Deployment and Usage Summary

    Preface

    I've been working on our ELK stack recently, so I want to write up a summary here of the many problems I ran into, both as a reference and as notes for myself.

    Contents

    Environment overview
    Kafka and ZooKeeper installation
    Logstash installation
    Elasticsearch installation
    Lucene syntax
    Kafka usage
    Elasticsearch plugin installation
    Common Elasticsearch operations
    Elasticsearch data processing

    Environment overview

    This deployment uses ELK and Filebeat, all at version 5.6.3. The overall data flow is: Filebeat -> Logstash -> Kafka -> Logstash -> Elasticsearch -> Grafana (Kibana).

    rpm -qf /etc/issue
    centos-release-7-3.1611.el7.centos.x86_64
    
    zookeeper-3.4.10
    kafka_2.11-0.10.0.1
    
    

    Kafka and ZooKeeper installation

    Parts of the installation overlap with my Ansible notes, so I won't list everything again here. I manage starting and stopping Kafka and ZooKeeper with supervisor; the supervisor configuration files are below.

    [root@jumpserver common]# cat templates/kafka.ini 
    [program:kafka]
    command = /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
    autorestart=true
    redirect_stderr = true
    stdout_logfile = /opt/kafka/supervisor_logs/kafka.log
    stopasgroup=true
    environment = JAVA_HOME=/opt/java
    
    [root@jumpserver common]# cat templates/zookeeper.ini 
    [program:zookeeper]
    command = /opt/zookeeper/bin/zkServer.sh start-foreground
    autorestart=true
    redirect_stderr = true
    stdout_logfile = /opt/zookeeper/supervisor_logs/zookeeper.log
    stopasgroup=true
    environment = JAVA_HOME=/opt/java
    
    

    Logstash installation

    Java needs to be available at /usr/bin/java (a symlink is fine); otherwise you have to change Logstash's startup configuration.

    ll /usr/bin/java
    lrwxrwxrwx 1 root root 18 Sep 6 17:05 /usr/bin/java -> /opt/java/bin/java
    
    

    Otherwise you will get an error like this:

    Using provided startup.options file: /etc/logstash/startup.options
    /usr/share/logstash/vendor/jruby/bin/jruby: line 388: /usr/bin/java: No such file or directory
    Unable to install system startup script for Logstash.
    
    

    Elasticsearch installation

    It's best to apply the settings from the official documentation: disable swap, and keep the JVM heap no larger than half of physical memory and no more than 32 GB (see the referenced documentation for how to set these up). Here are some of the files I use:

    cat templates/elasticsearch.yml |egrep -v "^#|^$"
    cluster.name: moji
    node.name: {{ ansible_hostname }}
    path.data: /opt/elasticsearch/data
    path.logs: /opt/elasticsearch/logs
    bootstrap.memory_lock: true
    network.host: 0.0.0.0
    discovery.zen.ping.unicast.hosts: [{{ cluster_list|map('regex_replace', '^(.*)$', '"\1"')|join(',') }}]
    http.cors.enabled: true 
    http.cors.allow-origin: "*"
    
    cat templates/elasticsearch.service |egrep -v "^#|^$"
    [Unit]
    Description=Elasticsearch
    Documentation=http://www.elastic.co
    Wants=network-online.target
    After=network-online.target
    [Service]
    Environment=ES_HOME=/usr/share/elasticsearch
    Environment=CONF_DIR=/etc/elasticsearch
    Environment=DATA_DIR=/var/lib/elasticsearch
    Environment=LOG_DIR=/var/log/elasticsearch
    Environment=PID_DIR=/var/run/elasticsearch
    EnvironmentFile=-/etc/sysconfig/elasticsearch
    WorkingDirectory=/usr/share/elasticsearch
    User=elasticsearch
    Group=elasticsearch
    ExecStartPre=/usr/share/elasticsearch/bin/elasticsearch-systemd-pre-exec
    ExecStart=/usr/share/elasticsearch/bin/elasticsearch \
                                                    -p ${PID_DIR}/elasticsearch.pid \
                                                    --quiet \
                                                    -Edefault.path.logs=${LOG_DIR} \
                                                    -Edefault.path.data=${DATA_DIR} \
                                                    -Edefault.path.conf=${CONF_DIR}
    StandardOutput=journal
    StandardError=inherit
    LimitNOFILE=65536
    LimitNPROC=2048
    LimitMEMLOCK=infinity
    TimeoutStopSec=0
    KillSignal=SIGTERM
    KillMode=process
    SendSIGKILL=no
    SuccessExitStatus=143
    [Install]
    WantedBy=multi-user.target
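
    Once a node is up, you can confirm that bootstrap.memory_lock actually took effect by asking the nodes API for the mlockall flag. A minimal sketch in Python, assuming the same cluster address used in the examples further down:

    # Sketch: verify memory locking on every node via the nodes API.
    import requests

    ES_URL = "http://192.168.3.3:9200"

    # filter_path trims the response down to each node's process.mlockall flag
    nodes = requests.get(ES_URL + "/_nodes?filter_path=**.mlockall").json()["nodes"]
    for node_id, info in nodes.items():
        print(node_id, info["process"]["mlockall"])  # should be True on every node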
    

    Lucene syntax

    Below are the Lucene filters I use in Grafana to match specific conditions:

    domain:$domain AND NOT http_code:499
    
    domain:$domain AND http_code:499
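
    The same expressions can be tested outside Grafana by sending them to Elasticsearch as a query_string query. A rough sketch; the index name and the domain value are illustrative placeholders (daily-summary-statistics-http-code, created by the scripts later in this post, is one index that has these fields):

    # Sketch: run a Grafana-style Lucene filter directly against Elasticsearch.
    # The index name and domain value below are placeholders.
    import requests

    ES_URL = "http://192.168.3.3:9200"
    headers = {"content-type": "application/json"}

    lucene = 'domain:api.example.com AND NOT http_code:499'
    body = {
        "size": 0,
        "query": {"query_string": {"query": lucene, "analyze_wildcard": True}},
    }
    res = requests.post(ES_URL + "/daily-summary-statistics-http-code/_search",
                        json=body, headers=headers).json()
    print(res["hits"]["total"])  # number of matching documents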
    
    

    Kafka usage

    Logstash now uses the new consumer, which stores its offsets in Kafka itself; a detailed introduction is here: https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/

    Check the consumption status (by default this covers all consumers):

    cd /opt/kafka/bin
    
    ./kafka-consumer-groups.sh --new-consumer --group logstash --bootstrap-server 172.16.21.7:9096 --describe
    
    
    Check the status with the old (ZooKeeper-based) consumer:
    
    cd /opt/kafka/bin
    ./kafka-topics.sh --list --zookeeper /ops|while read topic; do ./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group logstash --topic $topic --zookeeper /ops; done
    
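    The same check can be scripted with a thin Python wrapper around kafka-consumer-groups.sh; a rough sketch reusing the broker address and consumer group from the command above:

    # Sketch: shell out to kafka-consumer-groups.sh and print its offset/lag report.
    import subprocess

    cmd = [
        "/opt/kafka/bin/kafka-consumer-groups.sh",
        "--new-consumer",
        "--group", "logstash",
        "--bootstrap-server", "172.16.21.7:9096",
        "--describe",
    ]
    output = subprocess.check_output(cmd).decode()
    for line in output.splitlines():
        if line.strip():
            print(line)  # columns include TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET, LAG
    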

    Delete a topic

    ./kafka-topics.sh --delete --zookeeper / --topic nginx-log-wis
    

    Elasticsearch plugin installation

    I'm currently using two plugins, head and HQ. After cloning them from git you can simply serve them with nginx. Below is the config I use to serve AdminLTE, as a reference: just point root at the right directory and add a server block.

    cat /etc/nginx/conf.d/lte.conf 
        server {
            listen       2000 ;
            root         /application/AdminLTE;
            location / {
            }
        }
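
    If nginx feels like overkill for a static checkout like this, Python's built-in http.server can serve the same directory; a quick sketch (Python 3) using the same directory and port as the nginx config above:

    # Sketch: serve the same static directory with only the Python 3 standard library.
    import http.server
    import os
    import socketserver

    os.chdir("/application/AdminLTE")   # same directory as the nginx root above
    httpd = socketserver.TCPServer(("0.0.0.0", 2000), http.server.SimpleHTTPRequestHandler)
    httpd.serve_forever()

    The one-liner equivalent from a shell is: cd /application/AdminLTE && python3 -m http.server 2000.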
    
    

    Common Elasticsearch operations

    Delete data within a time range. size defaults to 10 and its maximum is capped by an ES setting, so we usually run the query first to see how many documents match; if the number isn't too large, we just set a big enough size. In this index the date field is named timestamp.

    post
    
    http://192.168.3.3:9200/daily-summary-statistics-http-code/_delete_by_query
    
    {
      "size": 100000,
      "query": {
        "bool": {
          "filter": [
            {
              "range": {
                "timestamp": {
                  "gte": "2018-01-09T00:00:00.000+08:00",
                  "lt": "2018-01-10T00:00:00.000+08:00"
                }
              }
            }
          ]
        }
      }
    }
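
    The same call from Python, which is handy when the cleanup runs from a cron job; a sketch reusing the URL, index, and query body above:

    # Sketch: run the _delete_by_query request above from Python.
    import requests

    ES_URL = "http://192.168.3.3:9200"
    headers = {"content-type": "application/json"}

    body = {
        "size": 100000,
        "query": {
            "bool": {
                "filter": [
                    {"range": {"timestamp": {
                        "gte": "2018-01-09T00:00:00.000+08:00",
                        "lt": "2018-01-10T00:00:00.000+08:00"}}}
                ]
            }
        },
    }
    res = requests.post(ES_URL + "/daily-summary-statistics-http-code/_delete_by_query",
                        json=body, headers=headers)
    print(res.json().get("deleted"))  # number of documents removed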
    

    Modify a template value

    get http://192.168.3.3:9200/_template/logstash
    

    Then take the logstash object out of the response, change what you want, and PUT it back; I changed refresh_interval:

    put http://192.168.3.3:9200/_template/logstash
    
    
    {
       "order": 0,
        "version": 50001,
        "template": "logstash-*",
        "settings": {
            "index": {
                "refresh_interval": "30s"
            }
        },
        "mappings": {
            "_default_": {
                "_all": {
                    "enabled": true,
                    "norms": false
                },
                "dynamic_templates": [
                    {
                        "message_field": {
                            "path_match": "message",
                            "match_mapping_type": "string",
                            "mapping": {
                                "type": "text",
                                "norms": false
                            }
                        }
                    },
                    {
                        "string_fields": {
                            "match": "*",
                            "match_mapping_type": "string",
                            "mapping": {
                                "type": "text",
                                "norms": false,
                                "fields": {
                                    "keyword": {
                                        "type": "keyword",
                                        "ignore_above": 256
                                    }
                                }
                            }
                        }
                    }
                ],
                "properties": {
                    "@timestamp": {
                        "type": "date",
                        "include_in_all": false
                    },
                    "@version": {
                        "type": "keyword",
                        "include_in_all": false
                    },
                    "geoip": {
                        "dynamic": true,
                        "properties": {
                            "ip": {
                                "type": "ip"
                            },
                            "location": {
                                "type": "geo_point"
                            },
                            "latitude": {
                                "type": "half_float"
                            },
                            "longitude": {
                                "type": "half_float"
                            }
                        }
                    }
                }
            }
        },
        "aliases": {}
    }
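
    To avoid copying the JSON around by hand, the round trip can also be scripted. A sketch, assuming only index.refresh_interval needs to change:

    # Sketch: fetch the logstash template, tweak refresh_interval, and put it back.
    import requests

    ES_URL = "http://192.168.3.3:9200"
    headers = {"content-type": "application/json"}

    tmpl = requests.get(ES_URL + "/_template/logstash", headers=headers).json()["logstash"]
    tmpl.setdefault("settings", {}).setdefault("index", {})["refresh_interval"] = "30s"

    res = requests.put(ES_URL + "/_template/logstash", json=tmpl, headers=headers)
    print(res.json())  # {"acknowledged": true} on success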
    
    

    Elasticsearch data processing

    After the data has been collected into ES, we extract the pieces we want and store them permanently.

    For now we only collect nginx logs, in the format below (it still needs tidying up):

    log_format  access_log_json      '{"remote_addr":"$remote_addr","host":"$host","time_iso8601":"$time_iso8601","request":"$request","status":"$status","body_bytes_sent":"$body_bytes_sent","http_referer":"$http_referer","http_user_agent":"$http_user_agent","http_x_forwarded_for":"$http_x_forwarded_for","upstream_response_time":"$upstream_response_time","uri":"$uri","request_time":"$request_time"}';
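
    Since every access-log line produced by this format is a JSON object, it can be parsed directly. A small sketch with a made-up sample line:

    # Sketch: each nginx access-log line from the format above is one JSON object.
    # The sample line is made up for illustration.
    import json

    sample = ('{"remote_addr":"1.2.3.4","host":"example.com",'
              '"time_iso8601":"2018-01-09T12:00:00+08:00","request":"GET / HTTP/1.1",'
              '"status":"200","body_bytes_sent":"612","http_referer":"-",'
              '"http_user_agent":"curl/7.29.0","http_x_forwarded_for":"-",'
              '"upstream_response_time":"0.001","uri":"/","request_time":"0.001"}')

    entry = json.loads(sample)
    print(entry["host"], entry["status"], entry["request_time"])

    One thing to watch: nginx only escapes embedded double quotes in these variables when the log_format uses escape=json (available in newer nginx versions), so an occasional line, for example one with quotes in the user agent, may not be valid JSON.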
    

    Below is a Python helper script that many of the later scripts use (they import it as settings); our index naming convention is logstash-nginx-log-<appname> followed by a date.

    from datetime import timedelta
    import datetime
    import requests
    
    G_URL = "http://192.168.3.3:9200"
    headers = {"content-type": "application/json"}
    ES_DATA_KEEP_DAYS = 7
    
    time_map = {
        "mappings" : {
            "doc" : {
                "properties" : {
                    "timestamp" : { "type" : "date" }
                }
            }
        }
    }
    
    def get_apps():
        res = requests.get(
            G_URL + "/_cat/indices?v",
            json={}
        ).text
        apps = set()
        for line in res.strip().split("\n"):
            lines = line.split()
            if lines[2].startswith("logstash-nginx-log"):
                index_name = lines[2]
                app_name = index_name.split("-")[-2] if index_name.split("-")[-2] != "log" else "whv3"
                apps.add(app_name)
        return list(apps)
    
    
    def get_iso_day(days_before_now=14):
        now = datetime.datetime.now()
        return (now - timedelta(days=days_before_now)).strftime('%Y-%m-%dT00:00:00.000+08:00')
    
    
    def get_one_day_qps_json_no_domain(days_before_now=14):
        return {
            "size": 0,
            "query": {
                "bool": {
                    "filter": [
                        {
                            "range": {
                                "time_iso8601": {
                                    "gte": get_iso_day(days_before_now),
                                    "lte": get_iso_day(days_before_now-1)
                                }
                            }
                        }
                    ]
                }
            },
            "aggs": {
                "count_per_interval": {
                    "date_histogram": {
                        "interval": "1s",
                        "field": "time_iso8601",
                        "min_doc_count": 0,
                        "time_zone": "Asia/Shanghai"
                    },
                    "aggs": {
                        "count_max": {
                            "sum": {
                                "script": {
                                    "source": "1"
                                }
                            }
                        }
                    }
                },
                "max_count": {
                    "max_bucket": {
                        "buckets_path": "count_per_interval>count_max"
                    }
                }
            }
    
        }
    
    
    def get_one_day_http_code(days_before_now=14):
        return {
            "size": 0,
            "query": {
                "bool": {
                    "filter": [
                        {
                            "range": {
                                "time_iso8601": {
                                    "gte": get_iso_day(days_before_now),
                                    "lt": get_iso_day(days_before_now-1)
                                }
                            }
                        }
                    ]
                }
            },
            "aggs": {
                "4": {
                    "terms": {
                        "field": "host.keyword",
                        "size": 10,
                        "order": {
                            "_count": "desc"
                        },
                        "min_doc_count": 1
                    },
                    "aggs": {
                        "5": {
                            "terms": {
                                "field": "status",
                                "size": 10,
                                "order": {
                                    "_count": "desc"
                                },
                                "min_doc_count": 1
                            },
                            "aggs": {
                                "2": {
                                    "date_histogram": {
                                        "interval": "1d",
                                        "field": "time_iso8601",
                                        "min_doc_count": 0,
                                        "time_zone": "Asia/Shanghai"
                                    },
                                    "aggs": { }
                                }
                            }
                        }
                    }
                }
            }
        }
    

    Calculate the daily maximum QPS, without distinguishing domains:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # Copyright (c) 2017 - hongzhi.wang <hongzhi.wang@moji.com>
    '''
    Author: hongzhi.wang
    Create Date: 2017/12/14
    Modify Date: 2017/12/14
    '''
    
    import sys
    import os
    
    file_root = os.path.dirname(os.path.abspath("__file__"))
    sys.path.append(file_root)
    
    import requests
    import json
    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk
    import elasticsearch
    from datetime import timedelta
    import time
    import datetime
    
    from settings import *
    
    daily_index_name = "daily-summary-statistics-qps-no-domain"
    
    if len(sys.argv) == 2:
        compute_days = int(sys.argv[1])
    else:
        compute_days = 1
    
    es = Elasticsearch(G_URL)
    
    
    def get_apps():
        res = requests.get(
            G_URL + "/_cat/indices?v",
            json={}
        ).text
        apps = set()
        for line in res.strip().split("\n"):
            lines = line.split()
            if lines[2].startswith("logstash") and 'soa' not in lines[2]:
                index_name = lines[2]
                app_name = index_name.split("-")[-2] if index_name.split("-")[-2] != "log" else "whv3"
                apps.add(app_name)
        return list(apps)
    
    apps = get_apps()
    
    import aiohttp
    import asyncio
    import async_timeout
    
    bodys = []
    
    async def fetch(session, app_name, days):
        index_pattern = "logstash-nginx-log-%s*" % app_name if app_name != "whv3" else "logstash-nginx-log-20*"
        if not es.indices.exists(daily_index_name):
            es.indices.create(index=daily_index_name, body=time_map)
        async with session.post(G_URL + "/%s/_search" % index_pattern, json=get_one_day_qps_json_no_domain(days), headers=headers) as response:
            es_result = json.loads(await response.text())
            try:
                item1 = es_result["aggregations"]
                if item1["max_count"]["value"] > 20:
                    max_qps = item1["max_count"]["value"]
                    max_qps_time = item1["max_count"]["keys"][0]
                    bodys.append({
                        "_index": daily_index_name,
                        "_type": app_name,
                        "_id": "%s-%s" % (app_name, max_qps_time),
                        "_source": {
                            "timestamp": max_qps_time,
                            "max_qps": max_qps,
                        }
                    })
            except Exception as e:
                print(G_URL + "/%s/_search" % index_pattern)
                print(get_one_day_qps_json_no_domain(days))
                print(app_name)
                print(e)
    
    async def main(app_name, days=compute_days):
        async with aiohttp.ClientSession() as session:
            await fetch(session, app_name, days=days)
    
    loop = asyncio.get_event_loop()
    
    tasks = [main(app_name) for app_name in apps]
    loop.run_until_complete(asyncio.wait(tasks))
    
    res = bulk(es, bodys)
    print(datetime.datetime.now())
    print(res)
    
    

    Calculate the percentage of each HTTP status code under every domain:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # Copyright (c) 2017 - hongzhi.wang <hongzhi.wang@moji.com> 
    '''
    Author: hongzhi.wang
    Create Date: 2017/12/5
    Modify Date: 2017/12/5
    '''
    import sys
    import os
    
    file_root = os.path.dirname(os.path.abspath("__file__"))
    sys.path.append(file_root)
    
    import requests
    import json
    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk
    import elasticsearch
    from datetime import timedelta
    import time
    import datetime
    
    
    from settings import *
    
    if len(sys.argv) == 2:
        day_to_process = int(sys.argv[1])
    else:
        day_to_process = 1
    
    headers = {"content-type": "application/json"}
    daily_index_name = "daily-summary-statistics-http-code"
    daily_percentage_index_name = "daily-summary-statistics-percentage-http-code"
    
    es = Elasticsearch(G_URL)
    apps = get_apps()
    
    
    def get_detail(app_name):
        index_pattern = "logstash-nginx-log-%s*" % app_name if app_name != "whv3" else "logstash-nginx-log-20*"
        print(index_pattern)
        if not es.indices.exists(daily_index_name):
            es.indices.create(index=daily_index_name, body=time_map)
        if not es.indices.exists(daily_percentage_index_name):
            es.indices.create(index=daily_percentage_index_name, body=time_map)
        res = requests.get(G_URL + "/%s/_search" % index_pattern, json=get_one_day_http_code(day_to_process), headers=headers)
        es_result = json.loads(res.text)
    
        for item in es_result["aggregations"]["4"]["buckets"]:
            domain = item["key"]
            all_sum = item["doc_count"]
            domain_dict = {}
            for detail in item["5"]["buckets"]:
                http_code = detail["key"]
                for final_detail in detail["2"]["buckets"]:
                    domain_dict[http_code] = final_detail["doc_count"]
                    yield {
                        "_index": daily_index_name,
                        "_type": app_name,
                        "_id": "%s-%s-%d-%s" %(app_name, domain, http_code, get_iso_day(day_to_process)),
                        "_source": {
                            "timestamp": get_iso_day(day_to_process),
                            "domain": domain,
                            "http_code": http_code,
                            "count": final_detail["doc_count"],
                        }
                    }
            count200 = domain_dict.get(200, 0)
            yield {
                "_index": daily_percentage_index_name,
                "_type": app_name,
                "_id": "%s-%s-%s" % (app_name, domain, get_iso_day(day_to_process)),
                "_source": {
                    "timestamp": get_iso_day(day_to_process),
                    "domain": domain,
                    "percent200": count200/all_sum,
                }
            }
    
    for i in range(len(apps)):
        print(datetime.datetime.now())
        print("current process is %f%%" % (i/len(apps)*100))
        app_name = apps[i]
        res = bulk(es, get_detail(app_name=app_name))
        print(res)
    
Original post: https://www.cnblogs.com/WisWang/p/8298736.html