zoukankan      html  css  js  c++  java
  • FLINK实例(12):CONNECTORS(11)elasticsearch 写入

    1 工程目录

     pom.xml

            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
                <version>1.10.0</version>
            </dependency>

    2 flink 写入 elasticsearch

    package com.atguigu.flink.app
    
    import java.util
    
    import com.atguigu.flink.bean.SensorReading
    import com.atguigu.flink.source.HbaseSource
    import org.apache.flink.api.common.functions.RuntimeContext
    import org.apache.flink.streaming.api.scala
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
    import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
    import org.apache.http.HttpHost
    import org.elasticsearch.client.Requests
    
    /**
     * Flink streaming job that takes `SensorReading` records from `HbaseSource`
     * and indexes each one into Elasticsearch, while also printing the stream.
     */
    object ESSinkApp {
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        // Input side of the job: the custom source is registered through addSource.
        val readings: DataStream[SensorReading] = env.addSource(new HbaseSource)
    
        // Converts every incoming reading into a single Elasticsearch index request.
        val readingIndexer = new ElasticsearchSinkFunction[SensorReading] {
          override def process(reading: SensorReading, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
            // Document body: one field holding the reading's string form.
            val document = new util.HashMap[String, String]()
            document.put("data", reading.toString)
    
            // Build the request step by step.
            val request = Requests.indexRequest()
            request.index("sensor") // target index "sensor" (roughly analogous to a database)
            request.`type`("readingData") // per the original author: ES 6 requires this line
            request.source(document) // payload
    
            // Hand the request over to the sink's indexer.
            indexer.add(request)
          }
        }
    
        // Elasticsearch nodes the sink connects to.
        val esNodes = new util.ArrayList[HttpHost]()
        esNodes.add(new HttpHost("127.0.0.1", 9200, "http"))
    
        val sinkBuilder = new ElasticsearchSink.Builder[SensorReading](esNodes, readingIndexer)
    
        // Number of actions collected into each bulk write to Elasticsearch.
        sinkBuilder.setBulkFlushMaxActions(1)
    
        readings.addSink(sinkBuilder.build())
    
        // Also print the stream.
        readings.print()
    
        // Launch the job.
        env.execute()
      }
    }

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13680673.html

  • 相关阅读:
    二叉树
    队列
    python3使用pdfminer3k解析pdf文件
    得到手机版新闻解析
    python连接redis并插入url
    Python使用requirements.txt安装类库
    (1366, "Incorrect string value: '\xF3\xB0\x84\xBC</...' for column 'content' at row 1")
    mysql中Incorrect string value乱码问题解决方案
    mysql命令
    requests ip代理池单ip和多ip设置方式
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13680673.html
Copyright © 2011-2022 走看看