zoukankan      html  css  js  c++  java
  • ELK Stack企业日志平台文档

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

    ELK Stack企业日志平台文档

     

     

     

     

     

    实验环境

    主机名

    IP地址

    配置

    系统版本

    用途

    controlnode

    172.16.1.120

    2核/4G/60G

    Centos7.4

    nginxtomcat

    filebeat

    redis

    logstash

    kibana

    slavenode1

    172.16.1.121

    2核/2G/60G

    Centos7.4

    ES-head

    elastic01

    slavenode2

    172.16.1.122

    2核/2G/60G

    Centos7.4

    elastic02

    slavenode3

    172.16.1.123

    2核/2G/60G

    Centos7.4

    elastic03

     

     

     

     

     

     

    目录

    一、ELK介绍 3

    ELK架构 3

    三、ElasticSearch 4

    3.1 基本概念 4

    3.2 集群部署 4

    3.3 数据操作 7

    3.4 常用查询 8

    示例数据 8

    match_all 9

    fromsize 10

    match 11

    bool 11

    range 12

    3.5 Head插件 13

    四、Logstash 15

    4.1 安装 16

    4.2 条件判断 17

    4.3 输入插件(Input) 17

    4.3.1 所有输入插件支持的配置选项 18

    4.3.1 Stdin 18

    4.3.1 File 19

    4.3.2 TCP 20

    4.3.4 Beats 21

    4.4 编码插件(Codec) 21

    4.4.1 json/json_lines 21

    4.4.2 multline 22

    4.4.3 rubydebug 25

    4.5 过滤器插件(Filter) 25

    4.5.1 json 25

    4.5.2 kv 26

    4.5.3 grok 27

    4.5.4 geoip 31

    4.5.5 date 33

    4.5.6 mutate 35

    4.6 输出插件(Output) 35

    4.6.1 ES 35

    五、Kibana 37

    引入Redis 38

    七、引入Filebeat 39

    八、生产实验 42

     

     

    一、ELK介绍

    ELKStacks是一个技术栈的组合,分别Elasticsearch、Logstash、Kibana 

     

    ELK Stack:

    1、扩展性:采用高扩展性分布式架构设计,可支持每日TB级数据

    2、简单易用:通过图形页面可对日志数据各种统计,可视化

    3、查询效率高:能做到秒级数据采集、处理和搜索

     

    https://www.elastic.co/cn/products/elasticsearch

    https://www.elastic.co/cn/products/kibana

    https://www.elastic.co/cn/products/beats/filebeat

    https://www.elastic.co/cn/products/beats/metricbeat

     

    ELK架构

    wps1 

     

    Logstash :开源的服务器端数据处理管道,能够同时从多个来源采集数据、转换数据,然后将数据存储到数据库中。

    Elasticsearch:搜索、分析和存储数据。

    Kibana:数据可视化。

    Beats :轻量型采集器的平台,从边缘机器向 Logstash 和 Elasticsearch 发送数据。

    Filebeat:轻量型日志采集器。

    https://www.elastic.co/cn/

    https://www.elastic.co/subscriptions

     

    Input:输入,输出数据可以是Stdin、File、TCP、Redis、Syslog等。

    Filter:过滤,将日志格式化。有丰富的过滤插件:Grok正则捕获、Date时间处理、Json编解码、Mutate数据修改等。

    Output:输出,输出目标可以是Stdout、File、TCP、Redis、ES等。

    三、ElasticSearch

    3.1 基本概念

    Node:运行单个ES实例的服务器

    Cluster:一个或多个节点构成集群

    Index:索引是多个文档的集合

    Document:Index里每条记录称为Document,若干文档构建一个Index

    Type:一个Index可以定义一种或多种类型,将Document逻辑分组

    Field:ES存储的最小单元

    Shards:ES将Index分为若干份,每一份就是一个分片

    Replicas:Index的一份或多份副本

     

    ES

    关系型数据库(比如Mysql)

    Index

    Database

    Type

    Table

    Document

    Row

    Field

    Column

     

    3.2 集群部署

    确认时区:

    date

    如果时区是EST,要修改为CST(中国时区)

    cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime

    172.16.1.121、172.16.1.122、172.16.1.123节点上配置elastic YUM仓库:

    # rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

    # vim /etc/yum.repos.d/elactic.repo

    [elasticsearch]

    name=Elasticsearch repository for 7.x packages

    baseurl=https://artifacts.elastic.co/packages/7.x/yum

    gpgcheck=1

    gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch

    enabled=0

    autorefresh=1

    type=rpm-md

    # yum clean all

    # yum install --enablerepo=elasticsearch elasticsearch -y

    提示:如果下载慢可以更换为清华源或者直接下载相应版本的rpm包进行安装

    sed -i 's#artifacts.elastic.co/packages/7.x/yum#mirrors.tuna.tsinghua.edu.cn/elasticstack/yum/elastic-7.x/#g' /etc/yum.repos.d/elactic.repo

     

    172.16.1.121节点

    # egrep -v "^$|#" /etc/elasticsearch/elasticsearch.yml

    cluster.name: elk-cluster

    node.name: node-1

    path.data: /var/lib/elasticsearch

    path.logs: /var/log/elasticsearch

    network.host: 172.16.1.121

    http.port: 9200

    discovery.seed_hosts: ["172.16.1.121", "172.16.1.122", "172.16.1.123"]

    cluster.initial_master_nodes: ["172.16.1.121", "172.16.1.122", "172.16.1.123"]

     

    172.16.1.122节点

    # egrep -v "^$|#" /etc/elasticsearch/elasticsearch.yml

    cluster.name: elk-cluster

    node.name: node-2

    path.data: /var/lib/elasticsearch

    path.logs: /var/log/elasticsearch

    network.host: 172.16.1.122

    http.port: 9200

    discovery.seed_hosts: ["172.16.1.121", "172.16.1.122", "172.16.1.123"]

    cluster.initial_master_nodes: ["172.16.1.121", "172.16.1.122", "172.16.1.123"]

     

    172.16.1.123 节点

    # egrep -v "^$|#" /etc/elasticsearch/elasticsearch.yml 

    cluster.name: elk-cluster

    node.name: node-3

    path.data: /var/lib/elasticsearch

    path.logs: /var/log/elasticsearch

    network.host: 172.16.1.123

    http.port: 9200

    discovery.seed_hosts: ["172.16.1.121", "172.16.1.122", "172.16.1.123"]

    cluster.initial_master_nodes: ["172.16.1.121", "172.16.1.122", "172.16.1.123"]

    cluster.name:  # 集群名称

    node.name:     # 节点名称

    path.data:     # 数据目录可以设置多个路径,这种情况下,所有的路径都会存储数据。

    network.host:  # elasticsearch 监听的IP地址

    http.port:     # elasticsearch 监听的端口号

     

    集群主要关注两个参数:

    discovery.seed_hosts: # 单播集群节点IP列表,提供了自动组织集群自动扫描端口9300-9305连接其他节点无需额外配置。

    cluster.initial_master_nodes: # 初始化引导集群节点,集群节点IP列表,初始化时只有一个集群。

     

    分别启动各elastic节点,并加入到开机自启动

    # systemctl restart elasticsearch.service

    # systemctl enable elasticsearch.service

    # netstat -tunlp | grep "java"

    tcp6       0      0 172.16.1.121:9200       :::*                    LISTEN      1810/java          

    tcp6       0      0 172.16.1.121:9300       :::*                    LISTEN      1810/java 

    查看集群节点

    # curl -X GET 'http://172.16.1.121:9200/_cat/nodes?v'

    wps2 

    查看集群健康状态:

    # curl -X GET 'http://172.16.1.121:9200/_cluster/health?pretty'

    wps3 

    green所有的主分片和副本分片都已分配。你的集群是 100% 可用的。

    yellow所有的主分片已经分片了,但至少还有一个副本是缺失的。不会有数据丢失,所以搜索结果依然是完整的。不过,你的高可用性在某种程度上被弱化。如果更多的分片消失,你就会丢数据了。把yellow想象成一个需要及时调查的警告。

    red至少一个主分片(以及它的全部副本)都在缺失中。这意味着你在缺少数据:搜索只能返回部分数据,而分配到这个分片上的写入请求会返回一个异常。

    green/yellow/red 状态是一个概览你的集群并了解眼下正在发生什么的好办法。

     

    补充:

    1修改内存限制

    172.16.1.121、172.16.1.122、172.16.1.123节点上配置

    锁定物理内存地址,防止es内存被交换出去,也就是避免es使用swap交换分区,频繁的交换,会导致IOPS变高。在elasticsearch.yml中开启"bootstrap.memory_lock: true"内存锁配置项需要修改以下配置文件,否则会导致elasticsearch启动失败

    1)jvm堆大小设置为大约可用内存的一半

    # vim /etc/elasticsearch/jvm.options

    -Xms512m

    -Xmx512m

    # 说明:默认锁定内存是1GB,一般设置为服务器内存50%最好,但是最大不能超过32G

    2)允许进程的所有者使用此限制

    # vim /usr/lib/systemd/system/elasticsearch.service

    LimitMEMLOCK=infinity

    # 说明:参数需要自己添加,要放到[install]标签的上面,不然启动不了

    3)重启elasticsearch

    # systemctl restart elasticsearch.service

    3.3 数据操作

    RestFul API格式

    curl -X <verb> '<protocol>://<host>:<port>/<path>?<query_string>' -d '<body>'

    参数

    描述

    verb

    HTTP方法,比如GET、POST、PUT、HEAD、DELETE

    host

    ES集群中的任意节点主机名

    port

    ES HTTP服务端口,默认9200

    path

    索引路径

    query_string

    可选的查询请求参数。例如?pretty参数将返回JSON格式数据

    -d

    里面放一个GET的JSON格式请求主体

    body

    自己写的 JSON格式的请求主体

     

    查看索引:

    curl -X GET 'http://172.16.1.121:9200/_cat/indices?v'  

    新建索引

    curl -X PUT '172.16.1.121:9200/logs-2020.8.05'

    删除索引:

    curl -X DELETE '172.16.1.121:9200/logs-2020.8.05'

    3.4 常用查询

    ES提供一种可用于执行查询JSON式的语言,被称为Query DSL

    示例数据

    使用官方提供的示例数据

    https://www.elastic.co/guide/en/elasticsearch/reference/current/_exploring_your_data.html

    wget https://raw.githubusercontent.com/elastic/elasticsearch/master/docs/src/test/resources/accounts.json

     

    导入数据

    curl -H "Content-Type: application/json" -X POST "172.16.1.121:9200/bank/_doc/_bulk?pretty&refresh" --data-binary "@accounts.json"

    # 说明:bank 表示索引(数据库)_doc 表示type(表)_bulk 表示API接口

    curl -X GET "172.16.1.121:9200/_cat/indices?v"

     

    curl -X GET "172.16.1.121:9200/bank/_search?q=*&sort=account_number:asc&pretty"

    _search:表示查询API接口

    q=* ES批量索引中的所有文档

    sort=account_number:asc 表示根据account_number按升序对结果排序

    以上方式是使用查询字符串替换请求主体

     

    curl -X GET "172.16.1.121:9200/bank/_search" -H 'Content-Type: application/json' -d'

    {

      "query": { "match_all": {} },

      "sort": [

        { "account_number": "asc" }

      ]

    }'

    这个区别在于不是传入的q=* URI,而是向 _search API提供JSON格式的查询请求体。

     

    导入前的json 数据文件

    wps4 

    导入json 文件后显示的数据

    wps5 

    match_all

    match_all匹配所有文档(行)默认查询

    示例:查询所有,默认只返回10个文档

    curl -X GET "localhost:9200/bank/_search?pretty" -H 'Content-Type: application/json' -d'

    {

      "query": { "match_all": {} }

    }'

    query告诉我们查询什么match_all使我们查询的类型match_all查询仅仅在指定的索引的所有文件进行搜索

     

    fromsize

    除了query参数外还可以传递其他参数影响查询结果比如上面的sortsize

    curl -X GET "localhost:9200/bank/_search" -H 'Content-Type: application/json' -d'

    {

      "query": { "match_all": {} },

      "size": 1

    }'

    注意size未指定默认为10

     

    返回10-19的文档

    curl -X GET "localhost:9200/bank/_search" -H 'Content-Type: application/json' -d'

    {

      "query": { "match_all": {} },

      "from": 10,

      "size": 10

    }'

    此功能实现分页功能非常有用如果from未指定默认为0

     

    返回_source字段中的几个字段:

    _source包含的是一行数据Document的所有字段Field

    curl -X GET "localhost:9200/bank/_search" -H 'Content-Type: application/json' -d'

    {

      "query": { "match_all": {} },

      "_source": ["account_number", "balance"]

    }'

    match

    基本搜索查询,针对特定字段或字段集合进行搜索

     

    查询编号为20的账户:

    curl -X GET "localhost:9200/bank/_search" -H 'Content-Type: application/json' -d'

    {

      "query": { "match": { "account_number": 20 } }

    }'

    返回地址中包含mill的账户:

    curl -X GET "localhost:9200/bank/_search" -H 'Content-Type: application/json' -d'

    {

      "query": { "match": { "address": "mill" } }

    }'

    返回地址有包含mill或lane的所有账户

    curl -X GET "localhost:9200/bank/_search" -H 'Content-Type: application/json' -d'

    {

      "query": { "match": { "address": "mill lane" } }

    }'

    bool

    查询包含mill和lane的所有账户

    curl -X GET "localhost:9200/bank/_search" -H 'Content-Type: application/json' -d'

    {

      "query": {

        "bool": {

          "must": [

            { "match": { "address": "mill" } },

            { "match": { "address": "lane" } }

          ]

        }

      }

    }'

    bool must指定了所有必须为真才匹配

    查询包含mill或lane的所有账户

    curl -X GET "localhost:9200/bank/_search" -H 'Content-Type: application/json' -d'

    {

      "query": {

        "bool": {

          "should": [

            { "match": { "address": "mill" } },

            { "match": { "address": "lane" } }

          ]

        }

      }

    }'

    range

    l range

    指定区间内的数字或者时间。

    操作符gt大于gte大于等于lt小于lte小于等于

     

    查询余额大于或等于20000且小于等于30000的账户:

    curl -X GET "localhost:9200/bank/_search" -H 'Content-Type: application/json' -d'

    {

      "query": {

        "bool": {

          "must": { "match_all": {} },

          "filter": {

            "range": {

              "balance": {

                "gte": 20000,

                "lte": 30000

              }

            }

          }

        }

      }

    }'

    3.5 Head插件

    172.16.1.121 节点上操作

     

    安装nodejs环境

    # wget https://npm.taobao.org/mirrors/node/latest-v14.x/node-v14.1.0-linux-x64.tar.gz

    # tar -xzf node-v14.1.0-linux-x64.tar.gz

    # mv node-v14.1.0-linux-x64/ /usr/local/node14.1/

    # vim /etc/profile

    export NODE_HOME=/usr/local/node14.1

    export PATH=$NODE_HOME/bin:$PATH

    # source /etc/profile

    # node -v

    v14.1.0

    # npm -v

    6.14.4

     

    以上步骤是二进制方式安装nodejs,也可以执行下面的命令进行yum安装nodejs

    # yum install nodejs npm -y

    安装elasticsearch-head

    # yum install git -y

    # mkdir -p /tmp/phantomjs/

    # cd /tmp/phantomjs/

    # wget https://github.com/Medium/phantomjs/releases/download/v2.1.1/phantomjs-2.1.1-linux-x86_64.tar.bz2

    # cd /usr/local/

    # git clone https://github.com/mobz/elasticsearch-head.git

    # cd elasticsearch-head/

    # npm install phantomjs-prebuilt@2.1.16 --ignore-scripts

    # npm install

    出现下面的问题正常

    wps6 

     

    修改elasticsearch-head监听地址

    # vim /usr/local/elasticsearch-head/Gruntfile.js

    wps7 

     

    运行elasticsearch-head

    # cd /usr/local/elasticsearch-head/ && npm run start &>/dev/null &

    # netstat -tunlp | grep "grunt"

    tcp        0      0 0.0.0.0:9100            0.0.0.0:*               LISTEN      1659/grunt

     

    加入开机自启动

    # echo 'source /etc/profile' >>/etc/rc.local

    # echo 'cd /usr/local/elasticsearch-head/ && npm run start &>/dev/null &' >>/etc/rc.local

    # chmod +x /etc/rc.d/rc.local

     

    172.16.1.121、172.16.1.122、172.16.1.123节点开启elastic跨域访问功能

    # vim /etc/elasticsearch/elasticsearch.yml

    # 配置文件末尾增加如下字段

    http.cors.enabled: true

    http.cors.allow-origin: "*"

    # 重启elasticsearch 服务

    # systemctl restart elasticsearch.service

    在浏览器中输入http://172.16.1.121:9100/地址进行访问

    wps8 

    四、Logstash

    172.16.1.120节点上操作

    4.1 安装

    https://www.elastic.co/guide/en/logstash/current/configuration.html

     

    # 安装 elk yum 源或在清华源下载elasticsearch对应版本的logstash rpm包

    # 安装 jdk

    # yum install java-1.8.0-openjdk.x86_64 -y

    # yum install logstash -y

    启动logstash并加入开机自启动

    # systemctl restart logstash.service

    # systemctl enable logstash.service

    # tailf /var/log/logstash/logstash-plain.log

    wps9 

    # netstat -tunlp | grep 9600

    tcp6       0      0 127.0.0.1:9600          :::*                    LISTEN      733/java

    # 注意:logstash启动后在不收集日志的情况下,监听的9600端口是出不来的。

    # 配置监听日志的文件放在 /etc/logstash/conf.d/ 目录下。

     

     

     

     

     

     

     

    参数设置

    # 通过指定收集日志的配置文件启动logstash,常用于日志收集调试。

    # /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/test.conf

    -f, --path.config CONFIG_PATH

     

    指定配置文件。使用文件,目录,或者通配符加载配置信息,如果指定目录或者通配符,按字符顺序加载。

    -e, --config.string CONFIG_STRING

     

    指定字符串输入

    -w, --pipeline.workers COUNT

    指定管道数量默认3

    --log.level LEVEL

    指定Logstash日志级别,fatal/error/warn/info/debug/trace

    -r--config.reload.automatic

     

    配置文件自动重新加载默认每3s检查一次配置文件更改

    --config.reload.interval <interval> 修改时间间隔

    如果没有启用自动加载可以向Logstash进程发送SIGHUP(信号挂起)信号重启管道例如:kill -1 14175

    -t, --config.test_and_exit

    检查配置文件是否正确

     

    4.2 条件判断

    使用条件来决定filter和output处理特定的事件

     

    比较操作:

    相等: ==, !=, <, >, <=, >=

    正则: =~(匹配正则), !~(不匹配正则)

    包含: in(包含), not in(不包含)

    布尔操作:

    and(与), or(或), nand(非与), xor(非或)

    一元运算符:

    !(取反)

    ()(复合表达式), !()(对复合表达式结果取反)

     

     

     

     

    可以像其他编程语言那样,条件if判断、多分支,嵌套

    if EXPRESSION {

      ...

    } else if EXPRESSION {

      ...

    } else {

      ...

    }

    4.3 输入插件(Input)

    Input:输入数据可以是Stdin、File、TCP、Syslog、Redis、Kafka等。

    https://www.elastic.co/guide/en/logstash/current/input-plugins.html

    4.3.1 所有输入插件支持的配置选项

    Setting

    Input type

    Required

    Default

    Description

    add_field

    hash

    No

    {}

    添加一个字段到一个事件

    codec

    codec

    No

    plain

    用于输入数据的编解码器

    enable_metric

    boolean

    No

    true

     

    id

    string

    No

     

    添加一个ID插件配置,如果没有指定ID,Logstash生成一个ID。强烈建议配置此ID,当两个或多个相同类型的插件时,这个非常有用的。例如两个文件输入,添加命名标识有助于监视

    tags

    array

    No

     

    添加任意数量的标签,有助于后期处理

    type

    string

    No

     

    为输入处理的所有事件添加一个字段,自已随便定义,比如linux系统日志,定义为syslog

     

     

     

     

     

     

     

    4.3.1 Stdin

    标准输入。

    示例:

    # cat /etc/logstash/conf.d/logstash.conf

    input {

      stdin {

      }

    }

    filter {

    }

    output {

      stdout{codec => rubydebug}

    }

    4.3.1 File

    https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html

    Setting

    Input type

    Required

    Default

    Description

    close_older

    number

    No

    3600

    单位打开文件多长时间关闭

    delimiter

    string

    No

    每行分隔符

    discover_interval

    number

    No

    15

    单位秒,多长时间检查一次path选项是否有新文件

    exclude

    array

    No

     

    排除监听的文件跟path一样,支持通配

    max_open_files

    number

    No

     

    打开文件最大数量

    path

    array

    YES

     

    输入文件的路径可以使用通配符

    例如/var/log/**/*.log,则会递归搜索

    sincedb_path

    string

    No

     

    sincedb数据库文件的路径,用于记录被监控的日志文件当前位置

    sincedb_write_interval

    number

    No

    15

    单位秒,被监控日志文件当前位置写入数据库的频率

    start_position

    string, one of ["beginning", "end"]

    No

    end

    指定从什么位置开始读取文件:开头或结尾。默认从结尾开始,如果要想导入旧数据,将其设置begin如果sincedb记录了此文件位置,那么选项不作用

    stat_interval

     

     

    number

    No

    1

    单位秒,统计文件的频率,判断是否被修改。增加此值会减少系统调用次数。

     

    修改/var/log/messages 权限

    # chmod 644 /var/log/messages

    # cat /etc/logstash/conf.d/file.conf

    input {

      file {

         path => "/var/log/messages"

         tags => ["syslog"]

         type => "syslog"

      }

    }

    filter {

    }

    output {

      stdout{codec => rubydebug}

    }

     

    日志收集

    wps10 

    4.3.2 TCP

    通过TCP套接字读取事件。标准输入和文件输入一样,每个事件都定位一行文本。

    # cat /etc/logstash/conf.d/tcp.conf

    input {

      tcp {

         port => 12345

         type => "nc"

      }

    }

    filter {

    }

    output {

      stdout{codec => rubydebug}

    }

    # /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/tcp.conf

     

     

     

    # netstat -tunlp | grep java

    wps11 

    # nc 172.16.1.120 12345

    4.3.4 Beats

    Elastic Beats框架接收事件

     

    # cat /etc/logstash/conf.d/beat.conf

    input {

      beats {

        port => 5044

      }

    }

    filter {

    }

    output {

      stdout {codec => rubydebug}

    }

    4.4 编码插件(Codec)

    Logstash处理流程:input->decode->filter->encode->output

    4.4.1 json/json_lines

    解码器可用于解码Input)和编码(Output)JSON消息。如果发送的数据是JSON数组,则会创建多个事件(每个元素一个)

    如果传输JSON消息以 分割,就需要使用json_lines

     

    # cat /etc/logstash/conf.d/json.conf

    input {

      stdin {

         codec => json {

            charset => ["UTF-8"]

         }

      }

    }

    filter {

    }

    output {

      stdout{codec => rubydebug}

    }

    # /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/json.conf

    {"a":123,"b":456,"c":[1,2,3]}

    wps12 

    # netstat -tunlp | grep java

    tcp6       0      0 127.0.0.1:9600          :::*                    LISTEN      5732/java

    4.4.2 multline

    匹配多行。

     

    Setting

    Input type

    Required

    Default

    Description

    auto_flush_interval

    number

    No

     

     

    charset

    string

    No

    UTF-8

    输入使用的字符编码

    max_bytes

    bytes

    No

    10M

    如果事件边界未明确定义则事件的积累可能会导致logstash退出,并出现内存不足。max_lines组合使用

    max_lines

     

     

    number

    No

    500

    如果事件边界未明确定义则事件的积累可能会导致logstash退出,并出现内存不足。max_bytes组合使用

    multiline_tag

    string

    No

    multiline

    给定标签标记多行事件

    negate

    boolean

    No

    false

    正则表达式模式设置正向匹配还是反向匹配。默认正向

    pattern

    string

    Yes

     

    正则表达式匹配

    patterns_dir

    array

    No

    []

    默认带的一堆模式

    what

    string, one of ["previous", "next"]

    Yes

    设置未匹配的内容是向前合并还是向后合并

     

    input {

      stdin {

        codec => multiline {

          pattern => "pattern, a regexp"

          negate => "true" or "false"

          what => "previous" or "next"

        }

      }

    }

    1:JAVA堆栈跟踪是多行的,通常从最左边开始,每个后续行都缩进

    实现思路:以任意空白开头的行都属于上一行

    # cat /etc/logstash/conf.d/java-exe.conf

    input {

      stdin {

        codec => multiline {

          pattern => "^s"

          what => "previous"

        }

      }

    }

    filter {

    }

    output {

      stdout{codec => rubydebug}

    }

     

    测试

    test1

      test2

      test3

    wps13 

     

     

    2:不以时间戳开始的行应与前一行合并

    # cat /etc/logstash/conf.d/multline.conf

    input {

      stdin {

        codec => multiline {

          pattern => "^["

          negate => true

          what => "previous"

        }

      }

    }

    filter {

    }

    output {

      stdout{codec => rubydebug}

    }

     

    测试

    [test1

      test2

      test3

    wps14 

     

    补充

    input {

      stdin {

        codec => multiline {

          # Grok pattern names are valid! :)

          pattern => "^%{TIMESTAMP_ISO8601}"

          negate => true

          what => "previous"

        }

      }

    }

    4.4.3 rubydebug

    采用ruby awsone print库来解析日志。

    output {

      stdout{codec => rubydebug}

    }

    4.5 过滤器插件(Filter)

    Filter:过滤,将日志格式化。有丰富的过滤插件:Grok正则捕获、date时间处理、JSON编解码、数据修改Mutate等。

     

    所有的过滤器插件都支持以下配置选项:

    Setting

    Input type

    Required

    Default

    Description

    add_field

    hash

    No

    {}

    如果过滤成功,添加任何field到这个事件。例如:add_field => [ "foo_%{somefield}", "Hello world, from %{host}" ],如果这个事件有一个字段somefiled,它的值是hello,那么我们会增加一个字段foo_hello,字段值则用%{host}代替。

    add_tag

    array

    No

    []

    过滤成功增加一个任意的标签到事件例如:add_tag => [ "foo_%{somefield}" ]

    enable_metric

    boolean

    No

    true

     

    id

    string

    No

     

     

    periodic_flush

    boolean

    No

    false

    定期调用过滤器刷新方法

    remove_field

    array

    No

    []

    过滤成功该事件中移除任意filed。例:remove_field => [ "foo_%{somefield}" ]

    remove_tag

    array

    No

    []

    过滤成功从该事件中移除任意标签,例如:remove_tag => [ "foo_%{somefield}" ]

     

    4.5.1 json

    JSON解析过滤器,接收一个JSON的字段将其展开为Logstash事件中的实际数据结构。

    事件解析失败时,这个插件有一个后备方案,那么事件将不会触发,而是标记为_jsonparsefailure可以使用条件来清楚数据可以使用tag_on_failure

     

    # cat filter-json.conf

    input {

      stdin {

      }

    }

    filter {

      json {

        source => "message"

        target => "content"

      }

    }

    output {

      stdout{codec => rubydebug}

    }

     

    测试

    {"a":123,"b":456,"c":[1,2,3]}

    wps15 

     

     

     

     

     

     

    4.5.2 kv

    自动解析key=value。也可以任意字符串分割数据。

    field_split  一串字符,指定分隔符分析键值对

    例如URL查询字符串拆分参数,根据&和?分隔:

    # cat filter-kv.conf

    input {

      stdin {

      }

    }

    filter {

      kv {

        field_split => "&?"

      }

    }

    output {

      stdout{codec => rubydebug}

    }

     

    测试

    www.baidu.com?pin=12345~0&d=123&e=foo@bar.com&oq=bobo&ss=12345

    wps16 

    4.5.3 grok

    grok是将非结构化数据解析为结构化

    这个工具非常适系统日志,mysql日志其他Web服务器日志以及通常人类无法编写任何日志的格式。

    默认情况下,Logstash附带约120个模式。也可以添加自己的模式patterns_dir

    https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns

    http://grokdebug.herokuapp.com/    Grok调试网站

    wps17 

     

    Setting

    Input type

    Required

    Default

    Description

    break_on_match

    boolean

    No

    true

     

    keep_empty_captures

     

    No

    false

    如果true将空保留为事件字段

    match

    hash

    No

    {}

    一个hash匹配字段=>值

    named_captures_only

    boolean

    No

    true

    如果true,存储

    overwrite

    array

    No

    []

    覆盖已存在的字段

    pattern_definitions

     

    No

    {}

     

    patterns_dir

    array

    No

    []

    自定义模式

    patterns_files_glob

    string

    No

    *

    Glob模式,用于匹配patterns_dir指定目录中的模式文件

    tag_on_failure

    array

    No

    _grokparsefailure

    tags没有匹配成功值附加到字段

    tag_on_timeout

    string

    No

    _groktimeout

    如果Grok正则表达式超时,则应用标记

    timeout_millis

    number

     

    30000

    正则表达式超时时间

     

    grok模式的语法是 %{SYNTAX:SEMANTIC}

    SYNTAX模式名称

    SEMANTIC匹配文本的标识符

    例如%{NUMBER:duration} %{IP:client}

     

    例如:虚构http请求日志抽出有用的字段

    55.3.244.1 GET /index.html 15824 0.043

    # cat /etc/logstash/conf.d/filter-grok.conf

    input {

      stdin {

      }

    }

    filter {

      grok {

        match => {

          "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"

        }

      }

    }

    output {

      stdout{codec => rubydebug}

    }

     

    测试

    wps18 

    自定义模式

    如果默认模式中没有匹配的,可以自己写正则表达式。

    # cat /etc/logstash/conf.d/patterns

    ID [0-9A-Z]{10,11}

    # cat /etc/logstash/conf.d/filter-grok-customize.conf

    input {

      stdin {

      }

    }

    filter {

      grok {

        patterns_dir => "/etc/logstash/conf.d/patterns"

        match => {

          "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration} %{ID:id}"

        }

      }

    }

    output {

      stdout{codec => rubydebug}

    }

     

    测试

    55.3.244.1 GET /index.html 15824 0.043 15BF7F3ABB

    wps19 

    模式匹配

    一个日志可能有多种格式,一个匹配可以有多规则匹配多种格式。

    一条匹配模式,如果匹配不到,只会message字段。

    例如:新版本项目日志需要添加日志字段需要兼容日志匹配

    # cat /etc/logstash/conf.d/filter-grok-MultiModule.conf

    input {

      stdin {

        }

    }

    filter {

      grok {

        patterns_dir =>"/etc/logstash/conf.d/patterns"

        match => [

          "message", "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration} %{ID:id}",

          "message", "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"

        ]

      }

    }

    output {

      stdout{codec => rubydebug}

    }

     

    测试

    55.3.244.1 GET /index.html 15824 0.043 15BF7F3ABB

    55.3.244.1 GET /index.html 15824 0.043

    wps20 

     

    4.5.4 geoip

    下载开源IP地址库:

    https://dev.maxmind.com/geoip/geoip2/geolite2/

     

    IP地址库使用示例:

    # cat /etc/logstash/conf.d/geoip1.conf

    input {

      stdin {

      }

    }

    filter {

      grok {

        match => {

          "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"

        }

      }

      geoip {

        source => "client"

        database => "/etc/logstash/conf.d/GeoLite2-City_20200804/GeoLite2-City.mmdb"

      }

    }

    output {

      stdout{codec => rubydebug}

    }

     

    测试

    139.226.141.24 GET /index.html 15824 0.043

    wps21 

    只保留关键字段

    # cat /etc/logstash/conf.d/geoip2.conf

    input {

      stdin {

      }

    }

    filter {

      grok {

        match => {

          "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"

        }

      }

      geoip {

        source => "client"

        database => "/etc/logstash/conf.d/GeoLite2-City_20200804/GeoLite2-City.mmdb"

        target => "geoip"

        fields => ["city_name", "country_code2", "country_name", "region_name"]

      }

    }

    output {

      stdout{codec => rubydebug}

    }

     

    测试

    139.226.141.24 GET /index.html 15824 0.043

    wps22 

     

     

     

     

    geoip 字段所代表的含义

    city_name, 城市名称

    continent_code, 大洲代码

    country_code2, 国家/地区代码2

    country_code3, 国家/地区代码3

    country_name, 国家/地区名称

    dma_code, dma代码

    ip, ip

    latitude, 纬度

    longitude, 经度

    postal_code, 邮政编码

    region_code, 地区代码

    region_name、地区名称

    timezone. 时区

    4.5.5 date

    日志时间过滤器用于从字段中解析日期然后使用日期或时间戳作为事件的logstash时间戳。

     

    如果不使用date插件,那么Logstash处理事件作为时间戳。时间戳字段Logstash自己添加到内置字段@timestamp默认是UTC时间,比北京时间少8个小时

    插入ES中保存的也UTC时间创建索引也是根据这个时间创建的Kibana是根据你当前浏览器的时区显示的(timestamp加减)

    nginx的访问日志做date处理

    # cat /etc/logstash/conf.d/filter-date.conf

    input {

      stdin {

      }

    }

    filter {

      grok {

        match => {

          "message" => "%{IPV4:remote_addr} - (%{USERNAME:remote_user}|-) [%{HTTPDATE:time_local}] "%{WORD:request_method} %{URIPATHPARAM:request_uri} HTTP/%{NUMBER:http_protocol}" %{NUMBE

    R:http_status} %{NUMBER:body_bytes_sent} "%{GREEDYDATA:http_referer}" "%{GREEDYDATA:http_user_agent}" "(%{GREEDYDATA:http_x_forwarded_for}|-)""    }

        overwrite => ["message"]

      }

      date {

        locale => "en"

          match => ["time_local", "dd/MMM/yyyy:HH:mm:ss Z"]

          # 默认target是@timestamp,所以time_local会更新@timestamp时间

        }

    }

    output {

      stdout{codec => rubydebug}

    }

     

    测试

    172.16.1.254 - - [07/Aug/2020:15:20:33 +0800] "GET /favicon.ico HTTP/1.1" 401 581 "http://172.16.1.120/" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.122 Safari/537.36" "-"

    wps23 

     

    还是以UTC时间存储,但@timestamp不是采集当前系统时间,而是根据匹配模式里的time_local时间,可以个别的时间测试

    4.5.6 mutate

    修改指定字段的内容。

    Setting

    Input type

    Required

    Default

    Description

    convert

    hash

    No

     

    字段的值转换为其他类型

    gsub

    array

    No

     

    字符串替换正则匹配的内容,只支持字符串类型。

    replace

    hash

    No

     

    新值替换一个字段

     

    比如字段类型转换:

    filter {

      mutate {

        convert => { "fieldname" => "integer" }

      }

    }

    4.6 输出插件(Output)

    Output:输出,输出目标可以是Stdout、ES、Redis、File、TCP等。

     

    4.6.1 ES

    Setting

    Input type

    Required

    Default

    Description

    hosts

    URL

    No

     

     

    index

    string

    No

    logstash-%{+YYYY.MM.dd}

    事件写入索引。默认按日期划分。

    user

    string

    No

     

    ES集群用户

    password

    password

    No

     

    ES集群密码

     

    output {

      elasticsearch {

        hosts => "localhost:9200"

        index => "ytjh-admin-%{+YYYY.MM.dd}"

      }

    }

    Lostash->ES

    收集audit.logmessages日志

    更改收集日志的权限

    # chmod 644 /var/log/audit/audit.log

    # chmod 644 /var/log/messages

     

    logstash配置文件

    # cat /etc/logstash/conf.d/logstash-es.conf

    input {

      file {

        path => ["/var/log/messages"]

        type => "system"

        tags => ["syslog"]

        start_position => "beginning"

      }

      file {

         path => ["/var/log/audit/audit.log"]

         type => "system"

         tags => ["auth"]

         start_position => "beginning"

      }

    }

    filter {

    }

    output {

      if [type] == "system" {

        if [tags][0] == "syslog" {

          elasticsearch {

            hosts  => ["http://172.16.1.121:9200","http://172.16.1.122:9200","http://172.16.1.123:9200"]

            index  => "logstash-system-syslog-%{+YYYY.MM.dd}"

          }

           stdout { codec=> rubydebug }

        }

        else if [tags][0] == "auth" {

          elasticsearch {

            hosts  => ["http://172.16.1.121:9200","http://172.16.1.122:9200","http://172.16.1.123:9200"]

            index  => "logstash-system-auth-%{+YYYY.MM.dd}"

          }

          stdout {codec=> rubydebug}

        }

      }

    }

     

    启动logstash,也可以通过调试命令启动查看日志收集状态,因为带有stdout {codec=> rubydebug}参数

    systemctl start logstash

     

    elastic-head中查看生成的索引

    wps24 

     

    五、Kibana

    172.16.1.120节点上进行操作

    安装kibana

    # 安装 elk 的yum 源或在清华源下载elasticsearch对应版本的kibana rpm包

    # yum install kibana y

    # vim /etc/kibana/kibana.yml

    server.port: 5601

    server.host: "0.0.0.0"

    elasticsearch.hosts: ["http://172.16.1.121:9200"]

    # systemctl start kibana

    # systemctl enable kibana

    # netstat -tunlp | grep "5601"

    tcp        0      0 0.0.0.0:5601            0.0.0.0:*               LISTEN      745/node

    访问

    http://172.16.1.120:5601

     

    使用nginx对 kibana登录进行验证

    安装 nginx

    # yum install nginx -y

     

    配置nginx 验证参数

    # openssl passwd -crypt 123456

    RbgCOrMsYkZCE

    # echo "liuchang:RbgCOrMsYkZCE" >>/etc/nginx/passwd.db

    # vim /etc/nginx/nginx.conf  修改nginx.conf server标签内容如下

    server {

            listen       80;

            server_name  _;

            location / {

                proxy_pass http://172.16.1.120:5601;

                auth_basic "Please input user and password";

                auth_basic_user_file /etc/nginx/passwd.db;

            }

    }

     

    启动nginx并加入开机自启动

    # systemctl start nginx

    # systemctl enable nginx

    # netstat -tunlp | grep nginx

    tcp        0      0 0.0.0.0:80              0.0.0.0:*               LISTEN      9722/nginx: master

    引入Redis

    wps25 

    优势:

    1、相比上图,在多台服务器,大量日志情况下可减少对ES压力,队列起到缓冲作用,也可以一定程度保护数据不丢失。(当Filetbeat收集数据能力超过ES处理能力时,可增加队列均衡网络传输)

    2、将收集的日志统一在Indexer中处理。

    如果日志量大可采用Kafka做缓冲队列,相比Redis更适合大吞吐量。

     

    172.16.1.120节点上操作

    安装redis

    # yum install redis -y

    # vim /etc/redis.conf

    bind 0.0.0.0

    requirepass 123456

    # systemctl start redis

    # systemctl enable redis

    # netstat -tunlp | grep redis

    tcp        0      0 0.0.0.0:6379            0.0.0.0:*               LISTEN      32688/redis-server

    七、引入Filebeat

    wps26 

    wps27 

    如果只采集日志,Filebeat比Logstash更适合,filebeat占用资源少,配置简单。

    采集系统日志示例:

    172.16.1.120节点上操作

    安装filebeat

    # 安装 elk 的yum 源或在清华源下载elasticsearch对应版本的filebeat rpm包

    # yum install filebeat -y

    # systemctl start filebeat.service

    # systemctl enable filebeat.service

    # grep -E -v "^$|#" filebeat.yml

    filebeat.inputs:

    - type: log

      enabled: true

      paths:

        - /var/log/messages

      tags: ["filebeat-system-log"]

      exclude_lines: ['^DBG','^$']

    - type: log

      enabled: true

      paths:

        - /var/log/tomcat/catalina.out

      tags: ["filebeat-catalina-out"]

      exclude_lines: ['^DBG','^$']

      fields:

        app: www

        type: tomcat-catalina

      fields_under_root: true

      multiline:

        pattern: '^['

        negate: true

        match: after

    filebeat.config.modules:

      path: ${path.config}/modules.d/*.yml

      reload.enabled: false

    setup.template.settings:

      index.number_of_shards: 1

    setup.kibana:

    output.logstash:

      hosts: ["172.16.1.91:5044"]

      enabled: true

      worker: 2

      compression_level: 3

    processors:

      - add_host_metadata: ~

      - add_cloud_metadata: ~

    paths # 指定监控的文件可通配符匹配。如果要对目录递归处理,例如/var/log/*/*.log

    encoding:# 指定监控的文件编码类型,使用plain和utf-8可以处理中文日志

    exclude_lines: ['^DEBUG'] # 排除 DEBUG开头的行

    include_lines: ['^ERR', '^WARN'] # 读取ERR,WARN开头的行

    fields # 新增字段,向输出的日志添加额外的信息,方面后面分组统计

    fields_under_root # 设置true,则新增字段为根目录否则fields.level

    # processors定义处理器在发送数据之前处理事件

    wps28 

    drop_fields # 指定删除的字段

    # 以JSON格式控制台输出,和processors同级

    output.console:

      pretty: true

     

    多行匹配参数:

    multiline:

        pattern: '^['

        negate: true

    match: after

    上面配置的意思是:不以 [ 开头的行都合并到上一行的末尾

    pattern:正则表达式

    negate:true 或 false;默认是false,匹配pattern的行合并到上一行;true,不匹配pattern的行合并到上一行

    match:after 或 before,合并到上一行的末尾或开头

    八、生产实验

    wps29 

    8.1 修改 nginx json格式

    1 配置文件

    vim /etc/nginx/nginx.conf  #在nginx配置文件的http模块进行如下操作;

    …………………………

    http {

        #log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '

        #                  '$status $body_bytes_sent "$http_referer" '

        #                  '"$http_user_agent" "$http_x_forwarded_for"';

        #access_log  /var/log/nginx/access.log  main;

        log_format access_json '{"@timestamp":"$time_iso8601",'

                               '"server_addr":"$server_addr",'

                               '"remote_addr":"$remote_addr",'

                               '"body_bytes_sent":"$body_bytes_sent",'

                               '"request_time":"$request_time",'

                               '"upstream_response_time":"$upstream_response_time",'

                               '"upstream_addr":"$upstream_addr",'

                               '"uri":"$uri",'

                               '"http_referer":"$http_referer",'

                               '"http_user_agent":"$http_user_agent",'

                               '"http_x_forwarded_for":"$http_x_forwarded_for",'

                               '"remote_user":"$remote_user",'

                               '"request":"$request",'

                               '"status":"$status"}';

        access_log  /var/log/nginx/access.log  access_json;

    …………………………

                  }

    2 检查配置文件

    nginx -t

    nginx: the configuration file /etc/nginx/nginx.conf syntax is ok

    nginx: configuration file /etc/nginx/nginx.conf test is successful

     

    3 启动nginx

    systemctl start nginx.service

     

    4 访问nginx

    yum install httpd-tools -y

    ab -n10000 -c100 http://172.16.1.120/index.html

     

    5 查看nginx 访问日志变为了json格式

    {"@timestamp":"2020-08-05T10:39:06+08:00","server_addr":"172.16.1.120","remote_addr":"120.204.241.100","body_bytes_sent":"15","request_time":"0.619","upstream_response_time":"0.619","upstream_addr":"172.16.1.120:5601","uri":"/api/ui_metric/report","http_referer":"http://172.16.1.120/app/kibana","http_user_agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.122 Safari/537.36","http_x_forwarded_for":"-","remote_user":"liuchang","request":"POST /api/ui_metric/report HTTP/1.1","status":"200"}

     

    转化为易看的json格式

    {

    "@timestamp": "2020-08-05T10:39:06+08:00",

    "server_addr": "172.16.1.120",

    "remote_addr": "120.204.241.100",

    "body_bytes_sent": "15",

    "request_time": "0.619",

    "upstream_response_time": "0.619",

    "upstream_addr": "172.16.1.120:5601",

    "uri": "/api/ui_metric/report",

    "http_referer": "http://172.16.1.120/app/kibana",

    "http_user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.122 Safari/537.36",

    "http_x_forwarded_for": "-",

    "remote_user": "liuchang",

    "request": "POST /api/ui_metric/report HTTP/1.1",

    "status": "200"

    }

     

    8.2 logstash 收集日志配置文件

    1 修改被收集日志文件的权限

    # chmod 644 /usr/local/tomcat/logs/catalina.out

    # chmod 644 /var/log/nginx/access.log

    # chmod 644 /var/log/nginx/error.log

     

    2 filebeat 配置文件

    # cat /etc/filebeat/filebeat.yml

    filebeat.inputs:

    - type: log

      enabled: true

      paths:

        - /var/log/nginx/access.log

      tags: ["nginx-access-log"]

      exclude_lines: ['^DBG','^$']

     

    - type: log

      enabled: true

      paths:

        - /var/log/nginx/error.log

      tags: ["nginx-error-log"]

      exclude_lines: ['^DBG','^$']

     

    - type: log

      enabled: true

      paths:

        - /usr/local/tomcat/logs/catalina.out

      tags: ["catalina-out-log"]

      exclude_lines: ['^DBG','^$']

      multiline:

        pattern: '^['

        negate: true

        match: after

     

    filebeat.config.modules:

      path: ${path.config}/modules.d/*.yml

      reload.enabled: false

     

    setup.template.settings:

      index.number_of_shards: 1

     

    setup.kibana:

    #output.logstash:

    #  hosts: ["172.16.1.91:5044"]

    #  enabled: true

    #  worker: 2

    #  compression_level: 3

     

    output.redis:

      hosts: ["172.16.1.120"]

      password: "123456"

      key: "filebeat"

      db: 0

      datatype: list

     

    processors:

      - add_host_metadata: ~

      - add_cloud_metadata: ~

     

    8.3 logstash redis中取数据配置文件

    # cat /etc/logstash/conf.d/logstash-to-es-nginx.conf

    input {

      redis {

        host => "172.16.1.120"

        port => 6379

        password => "123456"

        db => "0"

        data_type => "list"

        key => "filebeat"

        }

    }

     

    filter {

      if "nginx-access-log" in [tags] {

        json {

          source => "message"

          #remove_field => ["message"]

        }

        geoip {

          source => "remote_addr"

          target => "geoip"

          database => "/opt/GeoLite2-City.mmdb"

          add_field => ["[geoip][coordinates]", "%{[geoip][longitude]}"]

          add_field => ["[geoip][coordinates]", "%{[geoip][latitude]}"]

        }

        mutate {

          convert => ["[geoip][coordinates]", "float"]

        }

      }

    }

     

    output {

      if "nginx-access-log" in [tags] {

        elasticsearch {

          hosts  => ["172.16.1.121:9200","172.16.1.122:9200","172.16.1.123:9200"]

          index  => "logstash-nginx-access-log-%{+YYYY.MM.dd}"

        }

        stdout{codec => rubydebug}

      }

      if "nginx-error-log" in [tags] {

        elasticsearch {

          hosts  => ["172.16.1.121:9200","172.16.1.122:9200","172.16.1.123:9200"]

          index  => "logstash-nginx-error-log-%{+YYYY.MM.dd}"

        }

        stdout{codec => rubydebug}

      }

      if "catalina-out-log" in [tags] {

        elasticsearch {

          hosts  => ["172.16.1.121:9200","172.16.1.122:9200","172.16.1.123:9200"]

          index  => "logstash-catalina-out-log-%{+YYYY.MM.dd}"

        }

        stdout{codec => rubydebug}

      }

    }

     

     

     

    8.4 elastci-head 中查看收集的日志

    wps30 

     

    8.5 kibana nginx access.log 数据可视化设置

    1 NGINX-PV

    wps31 

     

     

     

     

     

     

     

     

     

     

     

    2 NGINX-UV

    wps32 

     

    3 NGINX-URI-TOP-10

    wps33 

     

     

     

     

     

     

     

     

     

     

    4 NGINX-HTTP-STATUS-TOP-10

    wps34 

     

    5 NGINX-REMOTE-IP-TOP-1O

    wps35 

     

     

     

     

     

     

     

     

     

     

     

     

     

    6 NGINX-AGENT-TOP-10

    wps36 

     

    7 NGINX-IP-MAP

    wps37 

     

    wps38 

     

    wps39 

     

     

     

     

     

     

     

     

     

     

    8 可视化图表

    wps40 

     

    8.6 kibana nginx access.log仪表板设置

    1将上面创建的可视化图表加入到仪表板中

    wps41 

     

    2 仪表板显示

    wps42 

    wps43 

     

    8.7  kibana 中添加 日志索引

    1 创建索引模式

    wps44 

     

    2 定义索引模式

    wps45 

    3 通过数据中的时间戳字段对索引进行过滤

    wps46 

     

    4 在discover中查看数据

    wps47 

     

    8.8 nginx access.log 一条日志在elasticsearch集群中存储的字段数据

    wps48 

    wps49 

    wps50 

     

    8.9 小结

    1 logstash/filebeat将处理事件作为时间戳时间戳字段是logstash/filebeat自己添加到内置字段@timestamp(该时间戳可以被后来指定的时间戳覆盖),默认是UTC时间,比北京时间少8个小时,插入到ES中保存的也是UTC时间,创建索引(index  => "*-log-%{+YYYY.MM.dd})也是根据这个时间创建的Kibana是根据你当前浏览器的时区显示的对timestamp加减。

     

    2 es 中以xxx-<时间格式> 的索引方式对一类日志按天进行存储,在kibana中通过xxx-* 的方式把该类索引匹配为一个数据集,然后以每条数据中存在的时间戳对该类索引的数据集合进行查询。

     

     

     

     

     

  • 相关阅读:
    Win64 驱动内核编程-12.回调监控进线程创建和退出
    我的主站 SHARELIST -分享列表 (功能持续完善中 2019-11-24 版本0.3)
    Caddy-基于go的微型serve用来做反向代理和Gateway
    Docker Swarm删除节点
    Docker Swarm集群搭建
    Docker Machine-Windows
    Docker Compose
    Docker网络配置进阶
    Docker网络相关
    Docker数据管理
  • 原文地址:https://www.cnblogs.com/LiuChang-blog/p/14702941.html
Copyright © 2011-2022 走看看