zoukankan      html  css  js  c++  java
  • Flink学习(十一) Sink到Elasticsearch

    导入依赖

            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
                <version>1.7.2</version>
            </dependency>
    package com.wyh.streamingApi.sink
    
    import java.util
    
    import org.apache.flink.api.common.functions.RuntimeContext
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
    import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
    import org.apache.http.HttpHost
    import org.elasticsearch.client.Requests
    
    /**
      * Sample case class for a single temperature-sensor reading.
      *
      * @param id          identifier of the sensor
      * @param timestamp   timestamp of the reading (unit not shown here — TODO confirm)
      * @param temperature temperature value of the reading
      */
    case class SensorReading(
      id: String,
      timestamp: Long,
      temperature: Double
    )
    
    /**
      * Demo job: reads sensor readings from a text file, parses each line into a
      * [[SensorReading]] and writes every record to Elasticsearch through Flink's
      * Elasticsearch 6 sink.
      */
    object Sink2ES {

      /**
        * Builds and runs the streaming job.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        // Source: read the raw text lines.
        // Backslashes in the Windows path must be escaped: "\w" and "\d" are not valid
        // Scala escape sequences (compile error) and "\f" would become a form feed.
        val inputStream = env.readTextFile("F:\\flink-study\\wyhFlinkSD\\data\\sensor.txt")

        // Transform: each line is split on "," into id, timestamp, temperature.
        val dataStream: DataStream[SensorReading] = inputStream.map(data => {
          val dataArray = data.split(",")
          SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
        })

        // Elasticsearch hosts the sink connects to.
        val httpHosts = new util.ArrayList[HttpHost]()
        httpHosts.add(new HttpHost("192.168.230.30", 9200))

        // Create the builder for the ES sink.
        val esSinkBuilder: ElasticsearchSink.Builder[SensorReading] = new ElasticsearchSink.Builder[SensorReading](
          httpHosts,
          new ElasticsearchSinkFunction[SensorReading] {
            /** Converts one reading into an index request and hands it to the indexer. */
            override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
              println("saving data:" + t)

              // Wrap the record as a Map (a JSON object would also work) to use as the document source.
              val json = new util.HashMap[String, String]()

              json.put("sensor_id", t.id)
              json.put("timestamp", t.timestamp.toString)
              json.put("temperature", t.temperature.toString)

              // Build the index request targeting index "sensor", type "readingdata".
              val indexRequest = Requests.indexRequest()
                .index("sensor")
                .`type`("readingdata")
                .source(json)

              // Hand the request to the indexer, which sends it to Elasticsearch.
              // NOTE(review): the message below is printed right after the request is
              // added; presumably the actual write happens later in the sink — confirm.
              requestIndexer.add(indexRequest)
              println("data 写入完成。。。")
            }
          }
        )


        // Sink: attach the Elasticsearch sink to the stream.
        dataStream.addSink(esSinkBuilder.build())

        env.execute("sink ES test")
      }

    }

    启动ES

     启动kibana

     运行

     查看结果

  • 相关阅读:
    LVS 模式
    修改RocketMQ的NameServer端口
    一次清理Hbase的oldWALs的过程
    Linux下删除文件系统空间不释放的问题
    HBase 强制删除表
    关闭Found duplicated code
    Java操作HDFS代码样例
    RocketMQ:Cannot allocate memory
    Storm的StreamID使用样例(版本1.0.2)
    android studio 编译sdk版降低报错解决方法
  • 原文地址:https://www.cnblogs.com/wyh-study/p/12924829.html
Copyright © 2011-2022 走看看