  • Example: Writing Data from Flink to Elasticsearch

    Versions: Flink 1.11, Elasticsearch 7.9

    1. Add the Maven dependencies

            <!-- elasticsearch connector -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
                <version>1.11.0</version>
            </dependency>
    
            <!-- jackson: ObjectMapper lives in jackson-databind, which pulls in jackson-core transitively -->
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.11.1</version>
            </dependency>

    2. Configure the Builder

            List<HttpHost> elsearchHosts = new ArrayList<>(); 
            elsearchHosts.add(new HttpHost("192.168.32.36", 9200, "http"));
            elsearchHosts.add(new HttpHost("192.168.32.37", 9200, "http"));
            elsearchHosts.add(new HttpHost("192.168.32.38", 9200, "http"));
    
            ObjectMapper mapper = new ObjectMapper(); // Jackson ObjectMapper
    
            // ResultCollector is the type of the objects you want to store; replace it with your own class
            ElasticsearchSink.Builder<ResultCollector> esSinkBuilder = new ElasticsearchSink.Builder<>(
                    elsearchHosts,
                    new ElasticsearchSinkFunction<ResultCollector>() {
    
                        private static final long serialVersionUID = -6797861015704600807L;
    
                        public IndexRequest createIndexRequest(ResultCollector collector) throws Exception {
                            return Requests.indexRequest()
                                    .index("flink-test-index") // target index
                                    .id(collector.getId()) // document ID
                                    // Important: source() must be given a Map here, so the object is round-tripped through JSON
                                    .source(mapper.readValue(mapper.writeValueAsString(collector), Map.class));
                        }
    
                        @SneakyThrows // Lombok: rethrows the checked Exception declared by createIndexRequest
                        @Override
                        public void process(ResultCollector collector, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                            requestIndexer.add(createIndexRequest(collector));
                        }
                    }
            );
    
            esSinkBuilder.setBulkFlushMaxActions(1); // flush after every element instead of buffering requests
            esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());
            esSinkBuilder.setRestClientFactory((RestClientFactory) restClientBuilder -> {
                Header[] headers = new BasicHeader[]{new BasicHeader("Content-Type", "application/json")};
                restClientBuilder.setDefaultHeaders(headers);
            });
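
    The setBulkFlushMaxActions(1) call above makes the sink send every request as soon as it arrives. As a rough illustration of what a count-based flush threshold means, here is a minimal plain-Java sketch. The class BulkBufferSketch and its methods are hypothetical names made up for this example; it is not the connector's actual implementation, which also supports other triggers such as setBulkFlushMaxSizeMb and setBulkFlushInterval.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration only: NOT the Flink connector's real code.
// It buffers actions and hands them to a sender once maxActions is reached.
public class BulkBufferSketch<T> {
    private final int maxActions;                 // flush threshold (assumed to be a simple count)
    private final Consumer<List<T>> bulkSender;   // stands in for "send one bulk request"
    private final List<T> pending = new ArrayList<>();

    public BulkBufferSketch(int maxActions, Consumer<List<T>> bulkSender) {
        if (maxActions < 1) {
            throw new IllegalArgumentException("maxActions must be >= 1");
        }
        this.maxActions = maxActions;
        this.bulkSender = bulkSender;
    }

    // Buffer one action; flush when the threshold is reached.
    public void add(T action) {
        pending.add(action);
        if (pending.size() >= maxActions) {
            flush();
        }
    }

    // Send whatever is buffered as one batch (no-op when the buffer is empty).
    public void flush() {
        if (pending.isEmpty()) {
            return;
        }
        bulkSender.accept(new ArrayList<>(pending)); // copy so the batch survives clear()
        pending.clear();
    }

    public static void main(String[] args) {
        // Threshold 1: every add() produces its own batch.
        List<List<String>> sent = new ArrayList<>();
        BulkBufferSketch<String> perRecord = new BulkBufferSketch<>(1, sent::add);
        perRecord.add("doc-1");
        perRecord.add("doc-2");
        System.out.println(sent); // [[doc-1], [doc-2]]

        // Threshold 3: nothing is sent until the third add().
        List<List<String>> batched = new ArrayList<>();
        BulkBufferSketch<String> perThree = new BulkBufferSketch<>(3, batched::add);
        perThree.add("doc-1");
        perThree.add("doc-2");
        System.out.println(batched); // []
        perThree.add("doc-3");
        System.out.println(batched); // [[doc-1, doc-2, doc-3]]
    }
}
```

    A threshold of 1 is convenient for testing because documents show up in the index immediately; for production workloads a larger value would normally reduce the number of HTTP round trips.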

    3. addSink

            // stream is your DataStream<ResultCollector>
            stream.addSink(esSinkBuilder.build());

     Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/elasticsearch.html

  • Original article: https://www.cnblogs.com/dotqin/p/13656983.html