zoukankan      html  css  js  c++  java
  • Flink学习(十一) Sink到Elasticsearch

    导入依赖

            <!-- Flink connector for Elasticsearch 6 (artifact built for Scala 2.11).
                 NOTE(review): presumably the version should match the project's Flink version; confirm. -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
                <version>1.7.2</version>
            </dependency>
    package com.wyh.streamingApi.sink
    
    import java.util
    
    import org.apache.flink.api.common.functions.RuntimeContext
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
    import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
    import org.apache.http.HttpHost
    import org.elasticsearch.client.Requests
    
    /**
      * Sample case class for one temperature-sensor reading.
      *
      * @param id          sensor identifier
      * @param timestamp   time of the reading (unit is not shown here; presumably epoch seconds, confirm)
      * @param temperature measured temperature
      */
    case class SensorReading(
      id: String,
      timestamp: Long,
      temperature: Double
    )
    
    /**
      * Demo job: reads comma-separated sensor readings from a text file, parses each line
      * into a [[SensorReading]] and sends every reading to Elasticsearch as an index request.
      */
    object Sink2ES {

      /**
        * Builds and runs the streaming job.
        *
        * @param args command-line arguments (not used)
        */
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        // Source: read the input file line by line.
        // Backslashes must be escaped inside a normal string literal: a bare "\w" or "\d"
        // is an invalid escape (compile error) and "\f" would become a form-feed character.
        val inputStream = env.readTextFile("F:\\flink-study\\wyhFlinkSD\\data\\sensor.txt")

        // Transform: each line is expected to be "id,timestamp,temperature".
        val dataStream: DataStream[SensorReading] = inputStream.map(data => {
          val dataArray = data.split(",")
          SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
        })

        // Elasticsearch HTTP endpoint(s) the sink connects to.
        val httpHosts = new util.ArrayList[HttpHost]()
        httpHosts.add(new HttpHost("192.168.230.30", 9200))

        // Create the builder for the ES sink.
        val esSinkBuilder: ElasticsearchSink.Builder[SensorReading] = new ElasticsearchSink.Builder[SensorReading](
          httpHosts,
          new ElasticsearchSinkFunction[SensorReading] {
            override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
              println("saving data:" + t)

              // Wrap the reading as a Map (a JSON object would also work) to use as the document source.
              // All values are stored as strings here.
              val json = new util.HashMap[String, String]()

              json.put("sensor_id", t.id)
              json.put("timestamp", t.timestamp.toString)
              json.put("temperature", t.temperature.toString)

              // Build the index request targeting index "sensor", type "readingdata".
              val indexRequest = Requests.indexRequest()
                .index("sensor")
                .`type`("readingdata")
                .source(json)

              // Hand the request to the indexer, which is responsible for sending it.
              // NOTE(review): the message below is printed right after add(); nothing here
              // checks that Elasticsearch has actually accepted the document.
              requestIndexer.add(indexRequest)
              println("data 写入完成。。。")
            }
          }
        )

        // Sink: attach the Elasticsearch sink to the parsed stream.
        dataStream.addSink(esSinkBuilder.build())

        env.execute("sink ES test")
      }

    }

    启动ES

     启动kibana

     运行

     查看结果

  • 相关阅读:
    关于JDK和JRE的一些总结
    Jackson 格式化日期问题
    CentOS6.8安装mysql5.6
    CentOS6.8安装JDK1.7
    VMware NAT方式 CentOS 6.8配置静态IP
    CentOS6.8使用源码安装Git
    关于SourceTree License
    记录平时遇到的问题
    使用React-Router遇到的那些坑
    移动端响应式布局好文收集
  • 原文地址:https://www.cnblogs.com/wyh-study/p/12924829.html
Copyright © 2011-2022 走看看