  • Apache Kafka (7) - Kafka ElasticSearch Consumer

    Kafka ElasticSearch Consumer

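    For the Kafka Consumer, we will write an example that consumes data from Kafka and forwards it to ElasticSearch.

    1. Basic ElasticSearch code

    The following code builds an ElasticSearch client and writes one document to ES, which creates the kafkademo index if it does not exist yet: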

    import org.apache.http.HttpHost;
    import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.action.index.IndexResponse;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestClientBuilder;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.common.xcontent.XContentType;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    
    public class ElasticSearchConsumer {
    
    
        public static void main(String[] args) throws IOException {
            Logger logger = LoggerFactory.getLogger(ElasticSearchConsumer.class.getName());
            RestHighLevelClient client = createClient();
    
    
            String jsonString = "{\"foo\": \"bar\"}"; // inner quotes must be escaped
    
            // index one JSON document; ES creates the kafkademo index on first write
    
            IndexRequest indexRequest = new IndexRequest (
                    "kafkademo"
            ).source(jsonString, XContentType.JSON);
    
            IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
            String id = indexResponse.getId();
    
            logger.info(id);
    
            // close the client
            client.close();
        }
    
        public static RestHighLevelClient createClient(){
            String hostname = "xxxxx";
    
            RestClientBuilder builder = RestClient.builder(
                    new HttpHost(hostname, 443, "https"))
                    .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                        @Override
                        public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
                            return httpAsyncClientBuilder;
                        }
                    });
    
            RestHighLevelClient client = new RestHighLevelClient(builder);
    
            return client;
        }
    }
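
For orientation, the client.index(...) call above corresponds roughly to an HTTP POST of the JSON body to the index's _doc endpoint, with ES generating the document id. The sketch below only builds such a request with the JDK's java.net.http API (Java 11+) and does not send it. The class name, the method name, and the host es.example.com are hypothetical and not part of the original article.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Sketch only: builds (but does not send) a REST request equivalent to
// indexing one JSON document without an explicit id.
final class IndexRequestSketch {

    // host and index are supplied by the caller; "_doc" is the typeless
    // document endpoint used by ElasticSearch 7.x.
    static HttpRequest buildIndexRequest(String host, String index, String json) {
        return HttpRequest.newBuilder()
                .uri(URI.create("https://" + host + "/" + index + "/_doc"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) {
        // Hypothetical host; the article keeps its real hostname hidden as "xxxxx".
        HttpRequest request =
                buildIndexRequest("es.example.com", "kafkademo", "{\"foo\": \"bar\"}");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Seeing the request in this form can help when debugging with curl: the same body posted to the same path should produce the same kind of document as the Java client.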
    

    Check the index and its entries on the ES side:

    > curl https://xxx/_cat/indices?v

    health status index     uuid                   pri rep docs.count docs.deleted store.size pri.store.size
    green  open   .kibana_1 tQuukokDTbWg9OyQI8Bh4A   1   1          0            0       566b           283b
    green  open   .kibana_2 025DtfBLR3CUexrUkX9x9Q   1   1          0            0       566b           283b
    green  open   kafkademo elXjncvwQPam7dqMd5gedg   5   1          1            0      9.3kb          4.6kb
    green  open   .kibana   ZvzR21YqSOi-8nbjffSuTA   5   1          1            0     10.4kb          5.2kb

    > curl https://xxx/kafkademo/

    {"kafkademo":{"aliases":{},"mappings":{"properties":{"foo":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}}}},"settings":{"index":{"creation_date":"1566985949656","number_of_shards":"5","number_of_replicas":"1","uuid":"elXjncvwQPam7dqMd5gedg","version":{"created":"7010199"},"provided_name":"kafkademo"}}}}

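    2. Producing messages to Kafka

    To simulate messages arriving in Kafka, we use the open-source json-data-generator, available on GitHub:

    https://github.com/everwatchsolutions/json-data-generator

    This tool makes it easy to produce random JSON data to Kafka.

    After downloading the tool, configure the Kafka broker list address and start producing messages to Kafka: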

    > java -jar json-data-generator-1.4.0.jar jackieChanSimConfig.json

     

    3. Sending messages to ElasticSearch

    On top of the existing Kafka Consumer, we add the following code:

    // poll for new data
    while (true) {
        ConsumerRecords<String, String> records =
                consumer.poll(Duration.ofMillis(100)); // wait up to 100 ms for records
    
        // typed records: record.value() is a String, so source(String, XContentType) is chosen
        for (ConsumerRecord<String, String> record : records) {
            // where we insert data into ElasticSearch
            IndexRequest indexRequest = new IndexRequest(
                    "kafkademo"
            ).source(record.value(), XContentType.JSON);
    
            IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
            String id = indexResponse.getId(); // _id generated by ES
    
            logger.info(id);
    
            try {
                Thread.sleep(1000); // introduce a small delay
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
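
The loop above assumes that a consumer object already exists. The article does not show how it is created, so the following is only a sketch of the settings a String/String consumer typically needs, collected in a plain java.util.Properties object. The class and method names, the broker address localhost:9092, the group id, and the choice of "earliest" for auto.offset.reset are all assumptions made for illustration.

```java
import java.util.Properties;

// Sketch only: gathers typical settings for a Kafka consumer that reads
// String keys and String values.
final class ConsumerConfigSketch {

    static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Assumption: start from the oldest available offset when the group has none committed.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical broker address and group id.
        Properties props = consumerProperties("localhost:9092", "kafka-demo-elasticsearch");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
        // With kafka-clients on the classpath, the next steps would be:
        //   KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //   consumer.subscribe(Collections.singletonList("<topic>"));
    }
}
```

The topic to subscribe to is the one the data generator writes to; it is left as a placeholder here because the article does not name it.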
    

    The messages are delivered to ElasticSearch as expected; the random string logged for each record is the _id that ES assigned on insert.

     

  • Original article: https://www.cnblogs.com/zackstang/p/11428045.html