zoukankan      html  css  js  c++  java
  • filebeat收集json日志到elasticsearch笔记

    前言

    为了方便测试,本文例子是在windows上搭建的,linux大同小异。搭建前需要准备windows环境,springboot应用和windows版docker。

    一.logback配置json格式日志

    1.pom.xml配置增加依赖

            <dependency>
                <groupId>net.logstash.logback</groupId>
                <artifactId>logstash-logback-encoder</artifactId>
                <version>6.1</version>
            </dependency>

    2.修改logback.xml配置

                <encoder charset="UTF-8" class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
                    <providers class="net.logstash.logback.composite.loggingevent.LoggingEventJsonProviders">
                        <pattern>
                            <pattern>
                                {
                                "date":"%d{yyyy-MM-dd HH:mm:ss.SSS}",
                                "project":"test",
                                "app":"${APP_NAME}",
                                "env":"${ENV:-dev}",
                                "level":"%level",
                                "logger":"%logger",
                                "thread":"%thread",
                                "traceId":"%X{traceId}",
                                "message": "%msg  %exception{20}"
                                }
                            </pattern>
                        </pattern>
                    </providers>
                </encoder>

    二.elasticsearch

    1.docker安装

    docker run -d --name elasticsearch  -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.10.1

    2.安装ik中文分词器

    下载对应版本的ik分词器,地址https://github.com/medcl/elasticsearch-analysis-ik/releases

    解压后,使用docker命令cp复制到容器内,重启elasticsearch容器

    docker cp C:\Users\DELL\Desktop\ik elasticsearch:/usr/share/elasticsearch/plugins

     3.浏览器打开http://127.0.0.1:9200,会出现如下信息

    {
        "name": "08485dfd8dd0",
        "cluster_name": "docker-cluster",
        "cluster_uuid": "wZqRjN7nTb66aFMeAz1Hug",
        "version": {
            "number": "7.10.1",
            "build_flavor": "default",
            "build_type": "docker",
            "build_hash": "1c34507e66d7db1211f66f3513706fdf548736aa",
            "build_date": "2020-12-05T01:00:33.671820Z",
            "build_snapshot": false,
            "lucene_version": "8.7.0",
            "minimum_wire_compatibility_version": "6.8.0",
            "minimum_index_compatibility_version": "6.0.0-beta1"
        },
        "tagline": "You Know, for Search"
    }

    三.filebeat收集日志

    1.下载filebeat,地址https://www.elastic.co/cn/downloads/beats/filebeat

    2.解压后,修改filebeat.yml文件

    filebeat.inputs:
    - type: log
      enabled: true
      paths:
        - D:\data\log\**-root.log
    
      fields_under_root: true
    
      tail_files: true
      
      json.keys_under_root: true
    
      json.overwrite_keys: true
    
      json.add_error_key: true
    
    
    # ======================= Elasticsearch template setting =======================
    setup.ilm.enabled: false
    
    setup.template.name: "filebeat"
    setup.template.fields: "fields.yml"
    setup.template.pattern: "*"
    setup.template.enabled: true
    setup.template.overwrite: false
    #setup.template.append_fields:
    #- name: levelee
    #  type: keyword
    setup.template.settings:
      index.number_of_shards: 1
      index.number_of_replicas: 0
      index.codec: best_compression
      #_source.enabled: false
    
    # ---------------------------- Elasticsearch Output ----------------------------
    output:
      elasticsearch:
      # Array of hosts to connect to.
        hosts: ["localhost:9200"]
        index: "filebeat_%{+yyyy-MM-dd}"
        indices:
        - index: "%{[env]}_%{[project]}_%{[app]}_%{+yyyy-MM-dd}"
    
    # ================================= Processors =================================
    processors:
      - add_host_metadata:
          netinfo: 
             enabled: true
      - timestamp:
          field: date
          layouts:
            - '2006-01-02 15:04:05.999'
          test:
            - '2021-01-18 14:06:18.452'
          timezone: Asia/Shanghai
      - drop_fields:
          fields: ["ecs", "agent", "log","input"]

    修改fields.yml文件

    - key: ecs
      title: ECS
      description: ECS Fields.
      fields:
      - name: '@timestamp'
        level: core
        required: true
        type: date
        description: 'Date/time when the event originated.'
        example: '2016-05-23T08:05:34.853Z'
      - name: message
        level: core
        type: text
        analyzer: ik_max_word
        description: 'For log events the message field contains the log message, optimized for viewing in a log viewer.'
        example: Hello World

           这两个文件主要定义了es的索引模板:模板中显式定义了message字段并采用ik分词器,而日志中的其他字段则由es的动态映射(dynamic mapping)自动生成

    3.启动filebeat,打开cmd定位到filebeat.exe所在的目录,执行 .\filebeat.exe

    4.浏览器打开127.0.0.1:9200/_search,将会看到上传到es的数据

    四.java查询es日志

    pom.xml增加依赖

      <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-high-level-client</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
            </dependency>

    按照日志时间倒序,调用代码如下

    /**
     * Searches the per-day log indices in Elasticsearch and renders the hits as plain text,
     * sorted by {@code @timestamp} in descending order.
     *
     * <p>Index names are built as {@code <env>_<project>_<app>_<yyyy-MM-dd>}. One name is
     * produced for every day from {@code startDate} up to (and including) today and for every
     * requested application; missing indices are tolerated through
     * {@link IndicesOptions#LENIENT_EXPAND_OPEN}.
     *
     * @param env       environment prefix of the index name; used as given
     * @param project   project part of the index name; blank means any project ({@code *})
     * @param size      maximum number of hits; ignored when {@code null} or not positive
     * @param levels    comma-separated log levels to match on the {@code level} field; may be blank
     * @param apps      comma-separated application names; blank means any application ({@code *})
     * @param keywords  comma-separated keywords; every keyword must occur in {@code message}
     *                  (wildcard match {@code *keyword*}); may be blank
     * @param startDate lower bound for {@code @timestamp} and first day of the index range;
     *                  when {@code null} no time filter is applied and the indices of all days
     *                  are searched
     * @return one entry per hit in the form {@code date level message}, each followed by a
     *         separator line; an empty string when nothing matched
     * @throws IOException if the request to Elasticsearch fails
     */
    public String search(String env, String project, Integer size, String levels, String apps, String keywords, Date startDate) throws IOException {

        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
                .sort("@timestamp", SortOrder.DESC)
                .timeout(new TimeValue(60, TimeUnit.SECONDS))
                .fetchSource(new String[]{"message", "date", "level", "thread", "stackTrace", "app", "traceId", "logger", "@timestamp", "host.ip"}, null);

        if (size != null && size > 0) {
            sourceBuilder.from(0);
            sourceBuilder.size(size);
        }

        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        if (startDate != null) {
            boolQueryBuilder.filter(QueryBuilders.rangeQuery("@timestamp").gte(DateFormatUtils.format(startDate, "yyyy-MM-dd'T'HH:mm:ss.SSSZZ")));
        }
        if (StringUtils.isNotBlank(levels)) {
            boolQueryBuilder.must(QueryBuilders.termsQuery("level", levels.split(",")));
        }
        if (StringUtils.isNotBlank(keywords)) {
            for (String keyword : keywords.split(",")) {
                boolQueryBuilder.must(QueryBuilders.wildcardQuery("message", "*" + keyword + "*"));
            }
        }
        sourceBuilder.query(boolQueryBuilder);
        System.out.println(sourceBuilder.toString());

        String projectPattern = StringUtils.isNotBlank(project) ? project : "*";
        // Split once instead of on every day of the range; a blank value selects every app.
        String[] appPatterns = StringUtils.isNotBlank(apps) ? apps.split(",") : new String[]{"*"};

        List<String> indexList = new ArrayList<>();
        if (startDate == null) {
            // No lower bound: match the indices of every day instead of dereferencing a null date.
            for (String app : appPatterns) {
                indexList.add(env + "_" + projectPattern + "_" + app + "_*");
            }
        } else {
            // NOTE(review): the day below is formatted in the JVM default time zone; if the
            // shipper names its daily indices in UTC the first day may need to be widened
            // by one -- confirm against the index naming on the ingest side.
            Date tomorrow = DateUtils.ceiling(new Date(), Calendar.DATE);
            Date day = startDate;
            while (day.before(tomorrow)) {
                String daySuffix = DateFormatUtils.format(day, "yyyy-MM-dd");
                for (String app : appPatterns) {
                    indexList.add(env + "_" + projectPattern + "_" + app + "_" + daySuffix);
                }
                day = DateUtils.addDays(day, 1);
            }
        }

        SearchRequest searchRequest = new SearchRequest();
        if (CollectionUtils.isNotEmpty(indexList)) {
            searchRequest.indices(indexList.toArray(new String[0]));
            searchRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
        }

        searchRequest.source(sourceBuilder);
        System.out.println(searchRequest);
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        if (searchResponse == null) {
            return null;
        }

        StringBuilder sb = new StringBuilder();
        for (SearchHit hit : searchResponse.getHits().getHits()) {
            Map<String, Object> map = hit.getSourceAsMap();
            // String.valueOf keeps a hit that lacks one of the fields from aborting the whole result.
            String message = String.valueOf(map.get("message"));
            String date = String.valueOf(map.get("date"));
            String level = String.valueOf(map.get("level"));
            sb.append(date).append(" ").append(level).append(" ").append(message).append(" ")
                    .append(System.lineSeparator())
                    .append("-----------------------------------------------------------")
                    .append(System.lineSeparator());
        }
        return sb.toString();
    }

    常用es的api

    http://127.0.0.1:9200/_analyze 查询字段的token分析   {"analyzer" : "ik_smart",  "text" : "bsp.supplier.saleOrder.query"}
    http://127.0.0.1:9200/indexname/_mapping 主要是看索引的字段类型
    http://127.0.0.1:9200/indexname/_settings 查看索引的设置,例如分片
    http://127.0.0.1:9200/indexname delete方式请求,删除索引
    http://127.0.0.1:9200/_template/indexname 查看索引模板
    http://127.0.0.1:9200/indexname/_termvectors/文档id?fields=message 查看索引字段的分词情况
    http://127.0.0.1:9200/indexname/_delete_by_query 删除文档
  • 相关阅读:
    C#引用类型详细剖析(转)
    wcf问题集锦
    Emgu CV 初试
    C#语言使用习惯
    多线程和消息机制
    ArrayAdapter
    SimpleAdapter
    删除对话框
    HTML制作个人简历
    冒泡排序
  • 原文地址:https://www.cnblogs.com/caizl/p/14344081.html
Copyright © 2011-2022 走看看