  • Example: Writing Data from Flink to Elasticsearch

    Versions: Flink 1.11, Elasticsearch 7.9

    1. Add the Maven dependencies

            <!-- elasticsearch connector -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
                <version>1.11.0</version>
            </dependency>
    
            <!-- jackson: ObjectMapper lives in jackson-databind, which pulls in jackson-core -->
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.11.1</version>
            </dependency>

    2. Configure the Builder

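            // Imports used by this snippet
            import java.util.ArrayList;
            import java.util.List;
            import java.util.Map;
            import com.fasterxml.jackson.databind.ObjectMapper;
            import lombok.SneakyThrows;
            import org.apache.flink.api.common.functions.RuntimeContext;
            import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
            import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
            import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
            import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
            import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
            import org.apache.http.Header;
            import org.apache.http.HttpHost;
            import org.apache.http.message.BasicHeader;
            import org.elasticsearch.action.index.IndexRequest;
            import org.elasticsearch.client.Requests;

            List<HttpHost> elsearchHosts = new ArrayList<>();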
            elsearchHosts.add(new HttpHost("192.168.32.36", 9200, "http"));
            elsearchHosts.add(new HttpHost("192.168.32.37", 9200, "http"));
            elsearchHosts.add(new HttpHost("192.168.32.38", 9200, "http"));
    
            ObjectMapper mapper = new ObjectMapper(); // Jackson ObjectMapper
    
            // ResultCollector is the type of the objects you want to store; substitute your own class
            ElasticsearchSink.Builder<ResultCollector> esSinkBuilder = new ElasticsearchSink.Builder<>(
                    elsearchHosts,
                    new ElasticsearchSinkFunction<ResultCollector>() {
    
                        private static final long serialVersionUID = -6797861015704600807L;
    
                        public IndexRequest createIndexRequest(ResultCollector collector) throws Exception {
                            return Requests.indexRequest()
                                    .index("flink-test-index") // set the index
                                    .id(collector.getId()) // set the document ID
                                    // Pay special attention here: pass a Map to source()
                                    .source(mapper.readValue(mapper.writeValueAsString(collector), Map.class));
                        }

                        @SneakyThrows
                        @Override
                        public void process(ResultCollector collector, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                            requestIndexer.add(createIndexRequest(collector));
                        }
                    }
            );

            esSinkBuilder.setBulkFlushMaxActions(1); // flush after every single request (no batching)
            esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());
            esSinkBuilder.setRestClientFactory((RestClientFactory) restClientBuilder -> {
                Header[] headers = new BasicHeader[]{new BasicHeader("Content-Type", "application/json")};
                restClientBuilder.setDefaultHeaders(headers);
            });
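
The Jackson round trip in createIndexRequest (object to JSON string, then back to a Map) is there only to produce the Map that source() receives. As a rough alternative, the Map can be built by hand. The sketch below uses plain Java only; the ResultCollector class shown here is a hypothetical stand-in (the original post only reveals getId()), and the word and count fields are assumptions made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for ResultCollector; only getId() appears in the original post.
class ResultCollector {
    private final String id;
    private final String word;  // assumed field, for illustration only
    private final long count;   // assumed field, for illustration only

    ResultCollector(String id, String word, long count) {
        this.id = id;
        this.word = word;
        this.count = count;
    }

    String getId() {
        return id;
    }

    // Builds the field-name -> value Map that could be handed to IndexRequest.source(Map).
    Map<String, Object> toSourceMap() {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("id", id);
        source.put("word", word);
        source.put("count", count);
        return source;
    }
}

class SourceMapSketch {
    public static void main(String[] args) {
        ResultCollector collector = new ResultCollector("doc-1", "flink", 3L);
        // prints {id=doc-1, word=flink, count=3}
        System.out.println(collector.toSourceMap());
    }
}
```

With something like this in place, the sink function could call .source(collector.toSourceMap()) instead of going through the ObjectMapper. The trade-off is that every new field has to be added to toSourceMap() by hand.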

    3. Add the sink (addSink)

            // stream is assumed to be your existing DataStream<ResultCollector>
            stream.addSink(esSinkBuilder.build());
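
Once the sink is attached, the setBulkFlushMaxActions(1) setting from step 2 decides how many index requests are buffered before they are sent. The following is a plain-Java model of that "flush after N buffered actions" rule, not the connector's actual implementation. The class name BulkBufferSketch and its methods are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of count-based bulk flushing; hypothetical, not Flink or Elasticsearch code.
class BulkBufferSketch {
    private final int maxActions;
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> flushedBatches = new ArrayList<>();

    BulkBufferSketch(int maxActions) {
        if (maxActions < 1) {
            throw new IllegalArgumentException("maxActions must be >= 1");
        }
        this.maxActions = maxActions;
    }

    // Buffers one request and flushes as soon as the threshold is reached.
    void add(String request) {
        pending.add(request);
        if (pending.size() >= maxActions) {
            flush();
        }
    }

    // Moves everything currently buffered into one "sent" batch.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushedBatches.add(new ArrayList<>(pending));
        pending.clear();
    }

    int flushCount() {
        return flushedBatches.size();
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        BulkBufferSketch perRecord = new BulkBufferSketch(1); // like setBulkFlushMaxActions(1)
        BulkBufferSketch batched = new BulkBufferSketch(2);
        for (String request : new String[]{"a", "b", "c"}) {
            perRecord.add(request);
            batched.add(request);
        }
        // prints "3 flushes, 0 pending"
        System.out.println(perRecord.flushCount() + " flushes, " + perRecord.pendingCount() + " pending");
        // prints "1 flushes, 1 pending"
        System.out.println(batched.flushCount() + " flushes, " + batched.pendingCount() + " pending");
    }
}
```

A threshold of 1 makes every record visible in Elasticsearch right away, which is convenient for a test like this one. A larger value trades that immediacy for fewer, bigger bulk requests.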

     Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/elasticsearch.html

  • Original post: https://www.cnblogs.com/dotqin/p/13656983.html