zoukankan      html  css  js  c++  java
  • Beats、Logstash与Kibana

    今日内容:
    1) JAVA API 操作 ES 集群
    2) ES的架构原理
    3) ES的 sql操作
    4) Beats基本概念及其使用
    5) logstash基本概念及其使用
    6) kibana基本概念及其使用

    1) JAVA API 操作 ES 集群 : 根据关键词查询 分页查询(浅分页 和 深分页) 高亮展示数据

     

    2) 构建索引库的时候, 除了可以指定 mapping信息以外, 还可以指定 分片和副本
    PUT /job_idx_shard
    {
    "mappings": {
    "properties": {
    "id": { "type": "long", "store": true },
    "area": { "type": "keyword", "store": true },
    "exp": { "type": "keyword", "store": true },
    "edu": { "type": "keyword", "store": true },
    "salary": { "type": "keyword", "store": true },
    "job_type": { "type": "keyword", "store": true },
    "cmp": { "type": "keyword", "store": true },
    "pv": { "type": "keyword", "store": true },
    "title": { "type": "text", "store": true },
    "jd": { "type": "text"}

    }
    },
    "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 2
    }
    }


    3) beats基本介绍:
    本质上是一个数据发送器, 用于将本地服务器中数据发生到logstash或者elasticSearch

    fileBeats 主要的作用 监控本地服务器中日志数据, 将监控的数据采集下来, 然后发生到 logstash或者elasticSearch

    fileBeats的组成:
    input : 输入 用于监控指定的文件

    output: 输出 用于将数据输出指定的目的的

    4) logstash基本介绍:
    数据采集工具, 主要用于将各个数据源中数据进行采集, 对数据进行处理, 将处理后数据, 发生到目的地

    Flume : 数据采集的工具, 跟logstash是同类型软件 都是数据采集的工具

    flume和 logstash的对比:
    flume 是一款通用的采集工具 , 关注数据的传输, 更加纯粹的数据传输(采集)工具
    logstash 更多时候是和beats 以及 elasticSearch组合使用 , 更多关注数据的预处理
    logstash 和 fileBeats的对比:
    logstash 和filebeat都具有日志收集功能,Filebeat更轻量,占用资源更少
    logstash基于JVM运行, 占用资源比较大
    logstash可以基于filter对数据进行过滤处理工作, 而fileBeats更多关注数据的传递工作

    在生产中, 一般关于 elastic Stack的使用:
    首先在需要进行采集的服务器上挂在beat组件, 然后beats组件数据数据采集到后, 发生到logstash,
    然后logstash通过filter对进行处理, 处理后, 将数据传递到elasticSearch中, 后续基于kibana对
    数据进行探索对数据进行图标展示


    可能出现的问题:
    1) 安装好fileBeats(logstash | kibana)后, 开始执行, 然后使用ctrl+C 无法退出问题: finalShell
    如何解决:
    ctrl +z 但是此操作可以回到命令行, 但是不会将 fileBeats关闭

    此时如果直接在此启动 fileBeats 回报一个文件以及被占用错误: data path ....

    处理的方式:
    ps -ef | grep filebeat 找到进程ID 直接 kill -9 杀死即可
    2) 路径上的问题: 在配置文件中, 路径指向不对
    解决方案: 看到任何路径 都去验证一下是否正常
    3) kibana无法启动问题, 即使在 kibana根目录下启动, 依然无法启动
    怀疑 配置文件修改错误: kibana.yml
    server.host: "node1.itcast.cn" 没有修改, 或者改了以后前面的#没删除

    第二章 Beats、Logstash与Kibana

    学习目标

    l 能够安装部署FileBeat、Logstash和Kibana

    l 能够使用FileBeat采集数据

    l 能够使用Logstash采集、解析数据

    l 能够使用Kibana进行数据探索和可视化

    1.  Beats

    Beats是一个开放源代码的数据发送器。我们可以把Beats作为一种代理安装在我们的服务器上,这样就可以比较方便地将数据发送到Elasticsearch或者Logstash中。Elastic Stack提供了多种类型的Beats组件。

    审计数据

    AuditBeat

    日志文件

    FileBeat

    云数据

    FunctionBeat

    可用性数据

    HeartBeat

    系统日志

    JournalBeat

    指标数据

    MetricBeat

    网络流量数据

    PacketBeat

    Windows事件日志

    Winlogbeat

     

     

    Beats可以直接将数据发送到Elasticsearch或者发送到Logstash,基于Logstash可以进一步地对数据进行处理,然后将处理后的数据存入到Elasticsearch,最后使用Kibana进行数据可视化。

    1.1  FileBeat简介  阿善看到

    FileBeat专门用于转发和收集日志数据的轻量级采集工具。它可以为作为代理安装在服务器上,FileBeat监视指定路径的日志文件,收集日志数据,并将收集到的日志转发到Elasticsearch或者Logstash。

    1.2  FileBeat的工作原理

    启动FileBeat时,会启动一个或者多个输入(Input),这些Input监控指定的日志数据位置。FileBeat会针对每一个文件启动一个Harvester(收割机)。Harvester读取每一个文件的日志,将新的日志发送到libbeat,libbeat将数据收集到一起,并将数据发送给输出(Output)。

     

    1.3  安装FileBeat

    安装FileBeat只需要将FileBeat Linux安装包上传到Linux系统,并将压缩包解压到系统就可以了。FileBeat官方下载地址:https://www.elastic.co/cn/downloads/past-releases/filebeat-7-6-1

     

    上传FileBeat安装到Linux,并解压。

    tar -xvzf filebeat-7.6.1-linux-x86_64.tar.gz

    1.4  使用FileBeat采集Kafka日志到Elasticsearch

    1.4.1  需求分析

    在资料中有一个kafka_server.log.tar.gz压缩包,里面包含了很多的Kafka服务器日志,现在我们为了通过在Elasticsearch中快速查询这些日志,定位问题。我们需要用FileBeats将日志数据上传到Elasticsearch中。

     

    问题:

    l 首先,我们要指定FileBeat采集哪些Kafka日志,因为FileBeats中必须知道采集存放在哪儿的日志,才能进行采集。

    l 其次,采集到这些数据后,还需要指定FileBeats将采集到的日志输出到Elasticsearch,那么Elasticsearch的地址也必须指定。

    1.4.2  配置FileBeats

    FileBeats配置文件主要分为两个部分。

    1. inputs
    2. output

    从名字就能看出来,一个是用来输入数据的,一个是用来输出数据的。

    1.4.2.1  input配置

    filebeat.inputs:

    - type: log

      enabled: true

      paths:

        - /var/log/*.log

        #- c:programdataelasticsearchlogs*

    在FileBeats中,可以读取一个或多个数据源。

     

    1.4.2.2  output配置

     

    默认FileBeat会将日志数据放入到名称为:filebeat-%filebeat版本号%-yyyy.MM.dd 的索引中。

     

    PS:

    FileBeats中的filebeat.reference.yml包含了FileBeats所有支持的配置选项。

    1.4.3  配置文件

    1. 创建配置文件

    cd /export/server/es/filebeat-7.6.1-linux-x86_64

    vim filebeat_kafka_log.yml

    1. 复制一下到配置文件中

    filebeat.inputs:

    - type: log

      enabled: true

      paths:

        - /export/server/es/data/kafka/server.log.*

     

    output.elasticsearch:

        hosts: ["node1.itcast.cn:9200", "node2.itcast.cn:9200", "node3.itcast.cn:9200"]

    1.4.4  运行FileBeat

    1. 运行FileBeat

    ./filebeat  -c filebeat_kafka_log.yml -e

    1. 将日志数据上传到/var/kafka/log,并解压

    mkdir -p /export/server/es/data/kafka/

    tar -xvzf kafka_server.log.tar.gz

     

    注意: 文件权限的报错

    如果在启动fileBeat的时候, 报了一个配置文件权限的错误, 请修改其权限为 -rw-r--r--

    1.4.5  查询数据

    1. 查看索引信息

    GET /_cat/indices?v

        {

            "health": "green",

            "status": "open",

            "index": "filebeat-7.6.1-2020.05.29-000001",

            "uuid": "dplqB_hTQq2XeSk6S4tccQ",

            "pri": "1",

            "rep": "1",

            "docs.count": "213780",

            "docs.deleted": "0",

            "store.size": "71.9mb",

            "pri.store.size": "35.8mb"

        }

     

    1. 查询索引库中的数据

    GET /filebeat-7.6.1-2020.05.29-000001/_search

                {

                    "_index": "filebeat-7.6.1-2020.05.29-000001",

                    "_type": "_doc",

                    "_id": "-72pX3IBjTeClvZff0CB",

                    "_score": 1,

                    "_source": {

                        "@timestamp": "2020-05-29T09:00:40.041Z",

                        "log": {

                            "offset": 55433,

                            "file": {

                                "path": "/var/kafka/log/server.log.2020-05-02-16"

                            }

                        },

                        "message": "[2020-05-02 16:01:30,682] INFO Socket connection established, initiating session, client: /192.168.88.100:46762, server: node1.itcast.cn/192.168.88.100:2181 (org.apache.zookeeper.ClientCnxn)",

                        "input": {

                            "type": "log"

                        },

                        "ecs": {

                            "version": "1.4.0"

                        },

                        "host": {

                            "name": "node1.itcast.cn"

                        },

                        "agent": {

                            "id": "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64",

                            "version": "7.6.1",

                            "type": "filebeat",

                            "ephemeral_id": "b8fbf7ab-bc37-46dd-86c7-fa7d74d36f63",

                            "hostname": "node1.itcast.cn"

                        }

                    }

                }

    FileBeat自动给我们添加了一些关于日志、采集类型、Host各种字段。

    1.4.6  解决一个日志涉及到多行问题

    我们在日常日志的处理中,经常会碰到日志中出现异常的情况。类似下面的情况:

    [2020-04-30 14:00:05,725] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error when sending leader epoch request for Map(test_10m-2 -> (currentLeaderEpoch=Optional[161], leaderEpoch=158)) (kafka.server.ReplicaFetcherThread)

    java.io.IOException: Connection to node2.itcast.cn:9092 (id: 1 rack: null) failed.

            at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)

            at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:102)

            at kafka.server.ReplicaFetcherThread.fetchEpochEndOffsets(ReplicaFetcherThread.scala:310)

            at kafka.server.AbstractFetcherThread.truncateToEpochEndOffsets(AbstractFetcherThread.scala:208)

            at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:173)

            at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)

            at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)

    [2020-04-30 14:00:05,725] INFO [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Retrying leaderEpoch request for partition test_10m-2 as the leader reported an error: UNKNOWN_SERVER_ERROR (kafka.server.ReplicaFetcherThread)

    [2020-04-30 14:00:08,731] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Connection to node 1 (node2.itcast.cn/192.168.88.101:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

    在FileBeat中,Harvest是逐行读取日志文件的。但上述的日志会出现一条日志,跨多行的情况。有异常信息时,肯定会出现多行。我们先来看一下,如果默认不处理这种情况会出现什么问题。

    1.4.6.1  导入错误日志
    1. 在/export/server/es/data/kafka/中创建名为server.log.2020-09-10的日志文件
    2. 将资料中的err.txt日志文本贴入到该文件中

     

    观察FileBeat,发现FileBeat已经针对该日志文件启动了Harvester,并读取到数据数据。

    2020-05-29T19:11:01.236+0800    INFO    log/harvester.go:297    Harvester started for file: /var/kafka/log/server.log.2020-09-10

     

    1. 在Elasticsearch检索该文件。

    GET /filebeat-7.6.1-2020.05.29-000001/_search

    {

        "query": {

            "match": {

                "log.file.path": "/var/kafka/log/server.log.2020-09-10"

            }

        }

    }

     

    我们发现,原本是一条日志中的异常信息,都被作为一条单独的消息来处理了~

     

    这明显是不符合我们的预期的,我们想要的是将所有的异常消息合并到一条日志中。那针对这种情况该如何处理呢?

    1.4.6.2  问题分析

    每条日志都是有统一格式的开头的,就拿Kafka的日志消息来说,[2020-04-30 14:00:05,725]这是一个统一的格式,如果不是以这样的形式开头,说明这一行肯定是属于某一条日志,而不是独立的一条日志。所以,我们可以通过日志的开头来判断某一行是否为新的一条日志。

    1.4.6.3  FileBeat多行配置选项

    在FileBeat的配置中,专门有一个解决一条日志跨多行问题的配置。主要为以下三个配置:

    multiline.pattern: ^[

    multiline.negate: false

    multiline.match: after

    multiline.pattern表示能够匹配一条日志的模式,默认配置的是以[开头的才认为是一条新的日志。

    multiline.negate:配置该模式是否生效,默认为false。

    multiline.match:表示是否将未匹配到的行追加到上一日志,还是追加到下一个日志。

    1.4.6.4  重新配置FileBeat
    1. 修改filebeat.yml,并添加以下内容

    filebeat.inputs:

    - type: log

      enabled: true

      paths:

        - /var/kafka/log/server.log.*

      multiline.pattern: '^['

      multiline.negate: true

      multiline.match: after

     

    output.elasticsearch:

        hosts: ["node1.itcast.cn:9200", "node2.itcast.cn:9200", "node3.itcast.cn:9200"]

    1. 修改「注册表」/data.json,将server.log.2020-09-10对应的offset设置为0

    cd /export/server/es/filebeat-7.6.1-linux-x86_64/data/registry/filebeat

    vim data.json

    1. 删除之前创建的文档

    // 删除指定文件的文档

    POST /filebeat-7.6.1-2020.05.29-000001/_delete_by_query

    {

        "query": {

            "match": {

                "log.file.path": "/var/kafka/log/server.log.2020-09-10"

            }

        }

    }

     

    1. 重新启动FileBeat

    ./filebeat -e

     

     

     

    1.5  FileBeat是如何工作的

    FileBeat主要由input和harvesters(收割机)组成。这两个组件协同工作,并将数据发送到指定的输出。

    1.5.1  input和harvester

    1.5.1.1  inputs(输入)

    input是负责管理Harvesters和查找所有要读取的文件的组件。如果输入类型是 log,input组件会查找磁盘上与路径描述的所有文件,并为每个文件启动一个Harvester,每个输入都独立地运行。

    1.5.1.2  Harvesters(收割机)

    Harvesters负责读取单个文件的内容,它负责打开/关闭文件,并逐行读取每个文件的内容,并将读取到的内容发送给输出,每个文件都会启动一个Harvester。但Harvester运行时,文件将处于打开状态。如果文件在读取时,被移除或者重命名,FileBeat将继续读取该文件。

    1.5.2  FileBeats如何保持文件状态

    FileBeat保存每个文件的状态,并定时将状态信息保存在磁盘的「注册表」文件中,该状态记录Harvester读取的最后一次偏移量,并确保发送所有的日志数据。如果输出(Elasticsearch或者Logstash)无法访问,FileBeat会记录成功发送的最后一行,并在输出(Elasticsearch或者Logstash)可用时,继续读取文件发送数据。在运行FileBeat时,每个input的状态信息也会保存在内存中,重新启动FileBeat时,会从「注册表」文件中读取数据来重新构建状态。

     

    在/export/server/es/filebeat-7.6.1-linux-x86_64/data目录中有一个Registry文件夹,里面有一个data.json,该文件中记录了Harvester读取日志的offset。

     

    2.  Logstash

    2.1  简介

    Logstash是一个开源的数据采集引擎。它可以动态地将不同来源的数据统一采集,并按照指定的数据格式进行处理后,将数据加载到其他的目的地。最开始,Logstash主要是针对日志采集,但后来Logstash开发了大量丰富的插件,所以,它可以做更多的海量数据的采集。

    它可以处理各种类型的日志数据,例如:Apache的web log、Java的log4j日志数据,或者是系统、网络、防火墙的日志等等。它也可以很容易的和Elastic Stack的Beats组件整合,也可以很方便的和关系型数据库、NoSQL数据库、Kafka、RabbitMQ等整合。

     

    2.1.1  经典架构

     

    2.1.2  对比Flume

    1. Apache Flume是一个通用型的数据采集平台,它通过配置source、channel、sink来实现数据的采集,支持的平台也非常多。而Logstash结合Elastic Stack的其他组件配合使用,开发、应用都会简单很多
    2. Logstash比较关注数据的预处理,而Flume跟偏重数据的传输,几乎没有太多的数据解析预处理,仅仅是数据的产生,封装成Event然后传输。

    2.1.3  对比FileBeat

    l logstash是jvm跑的,资源消耗比较大

    l 而FileBeat是基于golang编写的,功能较少但资源消耗也比较小,更轻量级

    l logstash 和filebeat都具有日志收集功能,Filebeat更轻量,占用资源更少

    l logstash 具有filter功能,能过滤分析日志

    l 一般结构都是filebeat采集日志,然后发送到消息队列,redis,kafka中然后logstash去获取,利用filter功能过滤分析,然后存储到elasticsearch中

    2.2  安装Logstash

    1. 切换到itcast用户
    2. 下载Logstash

    https://www.elastic.co/cn/downloads/past-releases/logstash-7-6-1

    1. 解压Logstash到指定目录
    2. 运行测试

    cd /export/server/es/logstash-7.6.1/

    bin/logstash -e 'input { stdin { } } output { stdout {} }'

    等待一会,让Logstash启动完毕。

    Sending Logstash logs to /export/server/es/logstash-7.6.1/logs which is now configured via log4j2.properties

    [2020-05-28T16:31:44,159][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified

    [2020-05-28T16:31:44,264][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.6.1"}

    [2020-05-28T16:31:45,631][INFO ][org.reflections.Reflections] Reflections took 37 ms to scan 1 urls, producing 20 keys and 40 values

    [2020-05-28T16:31:46,532][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge][main] A gauge metric of an unknown type (org.jruby.RubyArray) has been create for key: cluster_uuids. This may result in invalid serialization.  It is recommended to log an issue to the responsible developer/development team.

    [2020-05-28T16:31:46,560][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>250, "pipeline.sources"=>["config string"], :thread=>"#<Thread:0x3ccbc15b run>"}

    [2020-05-28T16:31:47,268][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}

    The stdin plugin is now waiting for input:

    [2020-05-28T16:31:47,348][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}

    [2020-05-28T16:31:47,550][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

    然后,随便在控制台中输入内容,等待Logstash的输出。

    {

    "host" => "node1.itcast.cn",

    "message" => "hello world",

    "@version" => "1",

    "@timestamp" => 2020-05-28T08:32:31.007Z

    }

     

    ps:

    -e选项表示,直接把配置放在命令中,这样可以有效快速进行测试

    2.3  采集Apache Web服务器日志

    2.3.1  需求

    Apache的Web Server会产生大量日志,当我们想要对这些日志检索分析。就需要先把这些日志导入到Elasticsearch中。此处,我们就可以使用Logstash来实现日志的采集。

     

    打开这个文件,如下图所示。我们发现,是一个纯文本格式的日志。如下图所示:

     

    这个日志其实由一个个的字段拼接而成,参考以下表格。

    字段名

    说明

    client IP

    浏览器端IP

    timestamp

    请求的时间戳

    method

    请求方式(GET/POST)

    uri

    请求的链接地址

    status

    服务器端响应状态

    length

    响应的数据长度

    reference

    从哪个URL跳转而来

    browser

    浏览器

     

    因为最终我们需要将这些日志数据存储在Elasticsearch中,而Elasticsearch是有模式(schema)的,而不是一个大文本存储所有的消息,而是需要将字段一个个的保存在Elasticsearch中。所以,我们需要在Logstash中,提前将数据解析好,将日志文本行解析成一个个的字段,然后再将字段保存到Elasticsearch中。

    2.3.2  准备日志数据

    将Apache服务器日志上传到 /export/server/es/data/apache/ 目录

     mkdir -p /export/server/es/data/apache/

    2.3.3  使用FileBeats将日志发送到Logstash

    在使用Logstash进行数据解析之前,我们需要使用FileBeat将采集到的数据发送到Logstash。之前,我们使用的FileBeat是通过FileBeat的Harvester组件监控日志文件,然后将日志以一定的格式保存到Elasticsearch中,而现在我们需要配置FileBeats将数据发送到Logstash。FileBeat这一端配置以下即可:

    #----------------------------- Logstash output ---------------------------------

    #output.logstash:

      # Boolean flag to enable or disable the output module.

      #enabled: true

     

      # The Logstash hosts

      #hosts: ["localhost:5044"]

     

    hosts配置的是Logstash监听的IP地址/机器名以及端口号。

    114.113.220.255

    准备FileBeat配置文件

    cd /export/server/es/filebeat-7.6.1-linux-x86_64

    vim filebeat-logstash.yml

    因为Apache的web log日志都是以IP地址开头的,所以我们需要修改下匹配字段。

    filebeat.inputs:

    - type: log

      enabled: true

      paths:

        - /var/apache/log/access.*

      multiline.pattern: '^d+.d+.d+.d+ '

      multiline.negate: true

      multiline.match: after

     

    output.logstash:

      enabled: true

      hosts: ["node1.itcast.cn:5044"]

     

    启动FileBeat,并指定使用新的配置文件

    ./filebeat -e -c filebeat-logstash.yml

     

    FileBeat将尝试建立与Logstash监听的IP和端口号进行连接。但此时,我们并没有开启并配置Logstash,所以FileBeat是无法连接到Logstash的。

    2020-06-01T11:28:47.585+0800    ERROR   pipeline/output.go:100  Failed to connect to backoff(async(tcp://node1.itcast.cn:5044)): dial tcp 192.168.88.100:5044: connect: connection refused

    2.3.4  配置Logstash接收FileBeat数据并打印

    Logstash的配置文件和FileBeat类似,它也需要有一个input、和output。基本格式如下:

    # #号表示添加注释

    # input表示要接收的数据

    input {

    }

     

    # file表示对接收到的数据进行过滤处理

    filter {

     

    }

     

    # output表示将数据输出到其他位置

    output {

    }

     

    配置从FileBeat接收数据

    cd /export/server/es/logstash-7.6.1/config

    vim filebeat-print.conf

    input {

      beats {

        port => 5044

      }

    }

     

    output {

      stdout {

        codec => rubydebug

      }

    }

     

    测试logstash配置是否正确

    bin/logstash -f config/filebeat-print.conf --config.test_and_exit

    [2020-06-01T11:46:33,940][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash

    启动logstash

    bin/logstash -f config/filebeat-print.conf --config.reload.automatic

    reload.automatic:修改配置文件时自动重新加载

    测试

    创建一个access.log.1文件,使用cat test >> access.log.1往日志文件中追加内容。

    test文件中只保存一条日志:

    [root@node1 log]# cat test

    235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] "POST /itcast.cn/bigdata.html 200 45 "www.baidu.com" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249

     

    当我们启动Logstash之后,就可以发现Logstash会打印出来从FileBeat接收到的数据:

    {

               "log" => {

              "file" => {

                "path" => "/var/apache/log/access.log.1"

            },

            "offset" => 825

        },

             "input" => {

            "type" => "log"

        },

             "agent" => {

            "ephemeral_id" => "d4c3b652-4533-4ebf-81f9-a0b78c0d4b05",

                 "version" => "7.6.1",

                    "type" => "filebeat",

                      "id" => "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64",

                "hostname" => "node1.itcast.cn"

        },

        "@timestamp" => 2020-06-01T09:07:55.236Z,

               "ecs" => {

            "version" => "1.4.0"

        },

              "host" => {

            "name" => "node1.itcast.cn"

        },

              "tags" => [

            [0] "beats_input_codec_plain_applied"

        ],

           "message" => "235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] "POST /itcast.cn/bigdata.html 200 45 "www.baidu.com" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249",

          "@version" => "1"

    }

    2.3.5  Logstash输出数据到Elasticsearch

    通过控制台,我们发现Logstash input接收到的数据没有经过任何处理就发送给了output组件。而其实我们需要将数据输出到Elasticsearch。所以,我们修改Logstash的output配置。配置输出Elasticsearch只需要配置以下就可以了:

    output {

        elasticsearch {

            hosts => [ "localhost:9200" ]

        }}

     

    操作步骤:

    1. 重新拷贝一份配置文件

    cp filebeat-print.conf filebeat-es.conf

    1. 将output修改为Elasticsearch

    input {

      beats {

        port => 5044

      }

    }

     

    output {

     elasticsearch {

       hosts => [ "node1.itcast.cn:9200","node2.itcast.cn:9200","node3.itcast.cn:9200"]

     }

    }

    1. 重新启动Logstash

    bin/logstash -f config/filebeat-es.conf --config.reload.automatic

    1. 追加一条日志到监控的文件中,并查看Elasticsearch中的索引、文档

    cat test >> access.log.1

    // 查看索引数据

    GET /_cat/indices?v

    我们在Elasticsearch中发现一个以logstash开头的索引。

        {

            "health": "green",

            "status": "open",

            "index": "logstash-2020.06.01-000001",

            "uuid": "147Uwl1LRb-HMFERUyNEBw",

            "pri": "1",

            "rep": "1",

            "docs.count": "2",

            "docs.deleted": "0",

            "store.size": "44.8kb",

            "pri.store.size": "22.4kb"

        }

    // 查看索引库的数据

    GET /logstash-2020.06.01-000001/_search?format=txt

    {

        "from": 0,

        "size": 1

    }

    我们可以获取到以下数据:

                        "@timestamp": "2020-06-01T09:38:00.402Z",

                        "tags": [

                            "beats_input_codec_plain_applied"

                        ],

                        "host": {

                            "name": "node1.itcast.cn"

                        },

                        "@version": "1",

                        "log": {

                            "file": {

                                "path": "/var/apache/log/access.log.1"

                            },

                            "offset": 1343

                        },

                        "agent": {

                            "version": "7.6.1",

                            "ephemeral_id": "d4c3b652-4533-4ebf-81f9-a0b78c0d4b05",

                            "id": "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64",

                            "hostname": "node1.itcast.cn",

                            "type": "filebeat"

                        },

                        "input": {

                            "type": "log"

                        },

                        "message": "235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] "POST /itcast.cn/bigdata.html 200 45 "www.baidu.com" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249",

                        "ecs": {

                            "version": "1.4.0"

                        }

     

    从输出返回结果,我们可以看到,日志确实已经保存到了Elasticsearch中,而且我们看到消息数据是封装在名为message中的,其他的数据也封装在一个个的字段中。我们其实更想要把消息解析成一个个的字段。例如:IP字段、时间、请求方式、请求URL、响应结果,这样。

    2.3.6  Logstash过滤器

    在Logstash中可以配置过滤器Filter对采集到的数据进行中间处理,在Logstash中,有大量的插件供我们使用。参考官网:

    https://www.elastic.co/guide/en/logstash/7.6/filter-plugins.html

    此处,我们重点来讲解Grok插件。

    2.3.6.1  查看Logstash已经安装的插件

    bin/logstash-plugin list

    2.3.6.2  Grok插件

    Grok是一种将非结构化日志解析为结构化的插件。这个工具非常适合用来解析系统日志、Web服务器日志、MySQL或者是任意其他的日志格式。

    Grok官网:https://www.elastic.co/guide/en/logstash/7.6/plugins-filters-grok.html

    2.3.6.3  Grok语法

    Grok是通过模式匹配的方式来识别日志中的数据,可以把Grok插件简单理解为升级版本的正则表达式。它拥有更多的模式,默认,Logstash拥有120个模式。如果这些模式不满足我们解析日志的需求,我们可以直接使用正则表达式来进行匹配。

    官网:https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns

     

    grok模式的语法是:%{SYNTAX:SEMANTIC}

    SYNTAX指的是Grok模式名称,SEMANTIC是给模式匹配到的文本字段名。例如:

    %{NUMBER:duration} %{IP:client}

    duration表示:匹配一个数字,client表示匹配一个IP地址。

    默认在Grok中,所有匹配到的的数据类型都是字符串,如果要转换成int类型(目前只支持int和float),可以这样:%{NUMBER:duration:int} %{IP:client}

     

    以下是常用的Grok模式:

    NUMBER

    匹配数字(包含:小数)

    INT

    匹配整形数字

    POSINT

    匹配正整数

    WORD

    匹配单词

    DATA

    匹配所有字符

    IP

    匹配IP地址

    PATH

    匹配路径

    2.3.6.4  用法

     

        filter {

          grok {

            match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }

          }

        }

    2.3.7  匹配日志中的IP、日期并打印

    235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] "POST /itcast.cn/bigdata.html 200 45 "www.baidu.com" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249

    我们使用IP就可以把前面的IP字段匹配出来,使用HTTPDATE可以将后面的日期匹配出来。

     

    配置Grok过滤插件

    1. 配置Logstash

    input {

        beats {

            port => 5044

        }

    }

     

    filter {

        grok {

            match => {

                "message" => "%{IP:ip} - - [%{HTTPDATE:date}]"

            }

        }   

    }

     

    output {

        stdout {

            codec => rubydebug

        }

    }

    1. 启动Logstash

    bin/logstash -f config/filebeat-filter-print.conf --config.reload.automatic

    {

               "log" => {

            "offset" => 1861,

              "file" => {

                "path" => "/var/apache/log/access.log.1"

            }

        },

             "input" => {

            "type" => "log"

        },

              "tags" => [

            [0] "beats_input_codec_plain_applied"

        ],

              "date" => "15/Apr/2015:00:27:19 +0849",

               "ecs" => {

            "version" => "1.4.0"

        },

        "@timestamp" => 2020-06-01T11:02:05.809Z,

           "message" => "235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] "POST /itcast.cn/bigdata.html 200 45 "www.baidu.com" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249",

              "host" => {

            "name" => "node1.itcast.cn"

        },

                "ip" => "235.9.200.242",

             "agent" => {

                "hostname" => "node1.itcast.cn",

                 "version" => "7.6.1",

            "ephemeral_id" => "d4c3b652-4533-4ebf-81f9-a0b78c0d4b05",

                      "id" => "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64",

                    "type" => "filebeat"

        },

          "@version" => "1"

    }

     

    我们看到,经过Grok过滤器插件处理之后,我们已经获取到了ip和date两个字段。接下来,我们就可以继续解析其他的字段。

    2.3.8  解析所有字段

    将日志解析成以下字段:

    字段名

    说明

    client IP

    浏览器端IP

    timestamp

    请求的时间戳

    method

    请求方式(GET/POST)

    uri

    请求的链接地址

    status

    服务器端响应状态

    length

    响应的数据长度

    reference

    从哪个URL跳转而来

    browser

    浏览器

     

    1. 修改Logstash配置文件

    input {

        beats {

            port => 5044

        }

    }

     

    filter {

        grok {

            match => {

                "message" => "%{IP:ip} - - [%{HTTPDATE:date}] "%{WORD:method} %{PATH:uri} %{DATA}" %{INT:status} %{INT:length} "%{DATA:reference}" "%{DATA:browser}""

            }

        }   

    }

     

    output {

        stdout {

            codec => rubydebug

        }

    }

    1. 测试并启动Logstash

    我们可以看到,8个字段都已经成功解析。

    {

         "reference" => "www.baidu.com",

          "@version" => "1",

               "ecs" => {

            "version" => "1.4.0"

        },

        "@timestamp" => 2020-06-02T03:30:10.048Z,

                "ip" => "235.9.200.241",

            "method" => "POST",

               "uri" => "/itcast.cn/bigdata.html",

             "agent" => {

                      "id" => "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64",

            "ephemeral_id" => "734ae9d8-bcdc-4be6-8f97-34387fcde972",

                 "version" => "7.6.1",

                "hostname" => "node1.itcast.cn",

                    "type" => "filebeat"

        },

            "length" => "45",

            "status" => "200",

               "log" => {

              "file" => {

                "path" => "/var/apache/log/access.log"

            },

            "offset" => 1

        },

             "input" => {

            "type" => "log"

        },

              "host" => {

            "name" => "node1.itcast.cn"

        },

              "tags" => [

            [0] "beats_input_codec_plain_applied"

        ],

           "browser" => "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900",

              "date" => "15/Apr/2015:00:27:19 +0849",

           "message" => "235.9.200.241 - - [15/Apr/2015:00:27:19 +0849] "POST /itcast.cn/bigdata.html HTTP/1.1" 200 45 "www.baidu.com" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900""

    }

     

    2.3.9  将数据输出到Elasticsearch

    到目前为止,我们已经通过了Grok Filter可以将日志消息解析成一个一个的字段,那现在我们需要将这些字段保存到Elasticsearch中。我们看到了Logstash的输出中,有大量的字段,但如果我们只需要保存我们需要的8个,该如何处理呢?而且,如果我们需要将日期的格式进行转换,我们又该如何处理呢?

    2.3.9.1  过滤出来需要的字段

    要过滤出来我们需要的字段。我们需要使用mutate插件。mutate插件主要是作用在字段上,例如:它可以对字段进行重命名、删除、替换或者修改结构。

     

    官方文档:https://www.elastic.co/guide/en/logstash/7.6/plugins-filters-mutate.html

    例如,mutate插件可以支持以下常用操作

     

     

    配置:

    注意:此处为了方便进行类型的处理,将status、length指定为int类型。

    input {

        beats {

            port => 5044

        }

    }

     

    filter {

        grok {

            match => {

                "message" => "%{IP:ip} - - [%{HTTPDATE:date}] "%{WORD:method} %{PATH:uri} %{DATA}" %{INT:status:int} %{INT:length:int} "%{DATA:reference}" "%{DATA:browser}""

            }

        }

        mutate {

            enable_metric => "false"

            remove_field => ["message", "log", "tags", "@timestamp", "input", "agent", "host", "ecs", "@version"]

        }

    }

     

    output {

        stdout {

            codec => rubydebug

        }

    }

    2.3.9.2  转换日期格式

    要将日期格式进行转换,我们可以使用Date插件来实现。该插件专门用来解析字段中的日期,官方说明文档:https://www.elastic.co/guide/en/logstash/7.6/plugins-filters-date.html

     

    用法如下:

     

     

    将date字段转换为「年月日 时分秒」格式。默认字段经过date插件处理后,会输出到@timestamp字段,所以,我们可以通过修改target属性来重新定义输出字段。

     

    Logstash配置修改为如下:

    input {

        beats {

            port => 5044

        }

    }

     

    filter {

        grok {

            match => {

                "message" => "%{IP:ip} - - [%{HTTPDATE:date}] "%{WORD:method} %{PATH:uri} %{DATA}" %{INT:status:int} %{INT:length:int} "%{DATA:reference}" "%{DATA:browser}""

            }

        }

        mutate {

            enable_metric => "false"

            remove_field => ["message", "log", "tags", "@timestamp", "input", "agent", "host", "ecs", "@version"]

        }

        date {

            match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"]

            target => "date"

        }

    }

     

    output {

        stdout {

            codec => rubydebug

        }

    }

     

    启动Logstash:

    bin/logstash -f config/filebeat-filter-print.conf --config.reload.automatic

     

    {

           "status" => "200",

        "reference" => "www.baidu.com",

           "method" => "POST",

          "browser" => "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900",

               "ip" => "235.9.200.241",

           "length" => "45",

              "uri" => "/itcast.cn/bigdata.html",

             "date" => 2015-04-14T15:38:19.000Z

    }

    2.3.9.3  输出到Elasticsearch指定索引

    我们可以通过    

    elasticsearch {

            hosts => ["node1.itcast.cn:9200" ,"node2.itcast.cn:9200" ,"node3.itcast.cn:9200"]

            index => "xxx"

    }

    index来指定索引名称,默认输出的index名称为:logstash-%{+yyyy.MM.dd}。但注意,要在index中使用时间格式化,filter的输出必须包含 @timestamp字段,否则将无法解析日期。

     

    input {

        beats {

            port => 5044

        }

    }

     

    filter {

        grok {

            match => {

                "message" => "%{IP:ip} - - [%{HTTPDATE:date}] "%{WORD:method} %{PATH:uri} %{DATA}" %{INT:status:int} %{INT:length:int} "%{DATA:reference}" "%{DATA:browser}""

            }

        }

        mutate {

            enable_metric => "false"

            remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"]

        }

        date {

            match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"]

            target => "date"

        }

    }

     

    output {

        stdout {

            codec => rubydebug

        }

        elasticsearch {

            hosts => ["node1.itcast.cn:9200" ,"node2.itcast.cn:9200" ,"node3.itcast.cn:9200"]

            index => "apache_web_log_%{+YYYY-MM}"

        }

    }

     

     

    启动Logstash

    bin/logstash -f config/filebeat-apache-weblog.conf --config.test_and_exit

    bin/logstash -f config/filebeat-apache-weblog.conf --config.reload.automatic

     

    注意:

    l index名称中,不能出现大写字符

    3.  Kibana

    3.1  简介

     

    通过上面的这张图就可以看到,Kibana可以用来展示丰富的图表。

    l Kibana是一个开源的数据分析和可视化平台,使用Kibana可以用来搜索Elasticsearch中的数据,构建漂亮的可视化图形、以及制作一些好看的仪表盘

    l Kibana是用来管理Elastic stack组件的可视化平台。例如:使用Kibana可以进行一些安全设置、用户角色设置、对Elasticsearch进行快照等等

    l Kibana提供统一的访问入口,不管是日志分析、还是查找文档,Kibana提供了一个使用这些功能的统一访问入口

    l Kibana使用的是Elasticsearch数据源,Elasticsearch是存储和处理数据的引擎,而Kibana就是基于Elasticsearch之上的可视化平台

     

    主要功能:

    l 探索和查询Elasticsearch中的数据

     

    l 可视化与分析

     

    3.2  安装Kibana

    在Linux下安装Kibana,可以使用Elastic stack提供 tar.gz压缩包。官方下载地址:

    https://www.elastic.co/cn/downloads/past-releases/kibana-7-6-1

     

    1. 解压Kibana gz压缩包

    tar -xzf kibana-7.6.1-linux-x86_64.tar.gz

    1. 进入到Kibana目录

    cd kibana-7.6.1-linux-x86_64/

    1. 配置Kibana: config/kibana.yml

    server.host: "node1.itcast.cn"

    # The Kibana server's name.  This is used for display purposes.

    server.name: "itcast-kibana"

     

    # The URLs of the Elasticsearch instances to use for all your queries.

    elasticsearch.hosts: ["http://node1.itcast.cn:9200","http://node2.itcast.cn:9200","http://node3.itcast.cn:9200"]

    1. 运行Kibana

    ./bin/kibana

    3.2.1  查看Kibana状态

    输入以下网址,可以查看到Kibana的运行状态:

    http://node1.itcast.cn:5601/status

     

    3.2.2  查看Elasticsearch的状态

    点击

     按钮,再点击 「Index Management」,可以查看到Elasticsearch集群中的索引状态。

     

     

    点击索引的名字,可以进一步查看索引更多的信息。

     

    点击「Manage」按钮,还可以用来管理索引。

     

    3.3  添加Elasticsearch数据源

    3.3.1  Kibana索引模式

    在开始使用Kibana之前,我们需要指定想要对哪些Elasticsearch索引进行处理、分析。在Kibana中,可以通过定义索引模式(Index Patterns)来对应匹配Elasticsearch索引。在第一次访问Kibana的时候,系统会提示我们定义一个索引模式。或者我们可以通过点击按钮,再点击Kibana下方的Index Patterns,来创建索引模式。参考下图:

     

     

    1. 定义索引模式,用于匹配哪些Elasticsearch中的索引。点击「Next step」

     

    1. 选择用于进行时间过滤的字段

     

    1. 点击「Create Index Pattern」按钮,创建索引模式。创建索引模式成功后,可以看到显示了该索引模式对应的字段。里面描述了哪些可以用于搜索、哪些可以用来进行聚合计算等。

     

    3.4  探索数据(Discovery)

    通过Kibana中的Discovery组件,我们可以快速地进行数据的检索、查询。

    3.4.1  使用探索数据功能

    点击按钮可以打开Discovery页面。

     

    我们发现没有展示任何的数据。但我们之前已经把数据导入到Elasticsearch中了。

     

    Kibana提示,让我们扩大我们的查询的时间范围。

     

    默认Kibana是展示最近15分钟的数据。我们把时间范围调得更长一些,就可以看到数据了。

     

    将时间范围选择为1年范围内的,我们就可以查看到Elasticsearch中的数据了。

     

    3.4.2  导入更多的Apache Web日志数据

    1. 将资料中的 access.log 文件上传到Linux
    2. 将access.log移动到/var/apache/log,并重命名为access.log.2

    mv access.log /var/apache/log/access.log.2

    1. 启动FileBeat

    ./filebeat -e -c filebeat-logstash.yml

    1. 启动Logstash

    bin/logstash -f config/filebeat-es.conf --config.reload.automatic

    3.4.3  基于时间过滤查询

    3.4.3.1  选择时间范围

     

    3.4.3.2  指定查询某天的数据

    查询2020年5月6日的所有日志数据。

     

    3.4.3.3  从直方图
    上选择日期更细粒度范围

    如果要选择查看某一天的日志,上面这种方式会有一些麻烦,我们有更快更容易的方式。

     

     

    3.4.4  使用Kibana搜索数据

    在Kibana的Discovery组件中,可以在查询栏中输入搜索条件。默认情况下,可以使用Kibana内置的标准查询语言,来进行快速查询。还有一种是遗留的基于Lucene的查询语法目前暂时可用,这种查询语法也可以使用基于JSON的Elasticsearch DSL也是可用的。当我们在Discovery搜索数据时,对应的直方图、文档列表都会随即更新。默认情况下,优先展示最新的文档,按照时间倒序排序的。

    3.4.4.1  Kibana查询语言(KQL)

    在7.0中,Kibana上线了新的查询语言。这种语言简洁、易用,有利于快速查询。

    查询语法:

    「字段:值」,如果值是字符串,可以用双引号括起来。

    查询包含zhihu的请求

    *zhihu*

    查询页面不存在的请求

    status : 404

    查询请求成功和不存在的请求

    status: (404 or 200)

    查询方式为POST请求,并请求成功的日志

    status: 200 and method: post

    查询方式为GET成功的请求,并且响应数据大于512的日志

    status: 200 and method: get and length > 512

    查询请求成功的且URL为「/itcast.cn开头的日志

    uri: "/itcast.cn/*"

    注意:因为/为特殊字符,需要使用反斜杠进行转义

    3.4.4.2  过滤字段

    Kibana的Discovery组件提供各种各样的筛选器,这样可以筛选出来我们关注的数据上。例如:我们只想查询404的请求URI。

     

     

    指定过滤出来404以及请求的URI、从哪儿跳转来的日志

     

     

    将查询保存下来,方便下次直接查看

     

     

    下次直接点击Open就可以直接打开之前保存的日志了

     

    3.5  数据可视化(Visualize)

    Kibana中的Visualize可以基于Elasticsearch中的索引进行数据可视化,然后将这些可视化图表添加到仪表盘中。

    3.5.1  数据可视化的类型

    l Lens

    n 通过简单地拖拽数据字段,快速构建基本的可视化

    l 常用的可视化对象

    n 线形图(Line)、面积图(Area)、条形图(Bar):可以用这些带X/Y坐标的图形来进行不同分类的比较

    n 饼图(Pie):可以用饼图来展示占比

    n 数据表(Data Table):以数据表格的形式展示

    n 指标(Metrics):以数字的方式展示

    n 目标和进度:显示带有进度指标的数字

    n 标签云/文字云(Tag Cloud):以文字云方式展示标签,文字的大小与其重要性相关

    l Timelion

    n 从多个时间序列数据集来展示数据

    l 地图

    n 展示地理位置数据

    l 热图

    n 在矩阵的单元格展示数据

     

    l 仪表盘工具

    n Markdown部件:显示一些MD格式的说明

    n 控件:在仪表盘中添加一些可以用来交互的组件

    l Vega

    3.5.2  以饼图展示404与200的占比

    效果图:

     

    操作步骤:

    1. 创建可视化

     

    1. 选择要进行可视化图形类型,此处我们选择Pie(饼图类型)

     

    1. 选择数据源

     

    1. 添加分桶、分片(还记得吗?我们在Elasticsearch进行分组聚合都是以分桶方式进行的,可以把它理解为分组)

     

    1. 配置分桶以及指标计算方式

     

    1. 点击蓝色播放按钮执行。

     

    1. 保存图形(取名为:apache_log@404_200)

    3.5.3  以条形图方式展示2020年5月每日请求数

    效果如下:

     

     

    开发步骤:

     

     

     

     

    我们还可以修改图形的样式,例如:以曲线、面积图的方式展示。

     

     

    3.5.4  以TSVB可视化不同访问来源的数据

    TSVB是一个基于时间序列的数据可视化工具,它可以使用Elasticsearch聚合的所有功能。使用TSVB,我们可以轻松地完成任意聚合方式来展示复杂的数据。它可以让我们快速制作效果的图表:

    1. 基于时间序列的图形展示

     

    1. 展示指标数据

     

    1. TopN

     

    1. 类似油量表的展示

     

    1. Markdown自定义数据展示

     

    1. 以表格方式展示数据

     

    操作步骤:

    1. 创建TSVB可视化对象

     

    1. 配置Time Series数据源分组条件

     

     

    1. 配置Metric

     

     

    1. TopN

     

    3.5.5  制作用户选择请求方式、响应字节大小控制组件

    3.5.5.1  控制组件

    在Kibana中,我们可以使用控件来控制图表的展示。例如:提供一个下列列表,供查看图表的用户只展示比较关注的数据。我们可以添加两个类型的控制组件:

    1. 选项列表

    l 根据一个或多个指定选项来筛选内容。例如:我们先筛选某个城市的数据,就可以通过选项列表来选择该城市

    1. 范围选择滑块

    l 筛选出来指定范围的数据。例如:我们筛选某个价格区间的商品等。

     

    3.5.5.2  Kibana开发

     

     

    3.6  制作Dashboard

    接下来,我们把前面的几个图形放到一个看板中。这样,我们就可以在一个看板中,浏览各类数据了。

     

    1. 点击第三个组件图标,并创建一个新的Dashboard。

     

    1. 点击Edit编辑Dashboard。

     

    1. 依次添加我们之前制作好的图表。

     

     

  • 相关阅读:
    PAT2019顶级7-2:美丽的序列(线段树+DP)
    ZOJ2112 Dynamic Rank(可持久化线段树套树状数组)
    CF1353E K-periodic Garland(动态规划)
    CF1353D Constructing the array(优先队列)
    HDU2069 Coin Change(基础DP)
    surf(树状数组+DP)
    双倍快乐(回文树)
    ZOJ3591 Nim(博弈论)
    HDU6601 Keep On EveryThing But Triangle(可持久化线段树)
    HDU6599 I Love Palindrome String(回文树)
  • 原文地址:https://www.cnblogs.com/shan13936/p/14176285.html
Copyright © 2011-2022 走看看