zoukankan      html  css  js  c++  java
  • Flink输出到Elasticsearch

    1.代码

    import java.util
    import org.apache.flink.api.common.functions.RuntimeContext
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
    import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
    import org.apache.http.HttpHost
    import org.elasticsearch.client.Requests

    /**
     * One reading taken from a temperature sensor.
     *
     * @param id          identifier of the sensor
     * @param timestamp   time of the reading, as a `Long`
     * @param temperature measured temperature
     */
    case class SensorReading(
      id: String,
      timestamp: Long,
      temperature: Double
    )

    /**
     * Demo job: reads comma-separated sensor readings from the text file
     * `sensor1.txt`, converts each line into a [[SensorReading]] and sends it
     * to the `sensor` index of an Elasticsearch node at `localhost:9200`.
     */
    object EsSinkTest {

      /**
       * Parses one `id,timestamp,temperature` line.
       *
       * Fields are trimmed before conversion; any fields after the third are
       * ignored.
       *
       * @param line raw line read from the input file
       * @return the parsed reading, or `None` when the line has fewer than
       *         three fields or a non-numeric timestamp/temperature
       */
      private def parseReading(line: String): Option[SensorReading] =
        line.split(",").map(_.trim) match {
          case Array(id, ts, temp, _*) =>
            try Some(SensorReading(id, ts.toLong, temp.toDouble))
            catch { case _: NumberFormatException => None }
          case _ => None
        }

      /**
       * Builds and runs the pipeline: file source -> parse -> Elasticsearch sink.
       *
       * @param args command-line arguments (unused)
       */
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        // source
        val inputStream = env.readTextFile("sensor1.txt")

        // transform
        import org.apache.flink.api.scala._
        // A blank or malformed line is reported and skipped instead of throwing
        // inside the operator, which would fail the whole job.
        val dataStream = inputStream.flatMap { (line: String) =>
          val reading = parseReading(line)
          if (reading.isEmpty) Console.err.println(s"skipping malformed line: $line")
          reading.toList
        }

        // sink
        val httpHosts = new util.ArrayList[HttpHost]()
        httpHosts.add(new HttpHost("localhost", 9200))

        // Create the builder for the Elasticsearch sink.
        val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](
          httpHosts,
          new ElasticsearchSinkFunction[SensorReading] {
            override def process(
                t: SensorReading,
                runtimeContext: RuntimeContext,
                requestIndexer: RequestIndexer): Unit = {
              print("saving data" + t)
              // Wrap the reading in a Map (a JSON object would also work);
              // every value is stored as a string.
              val hashMap = new util.HashMap[String, String]()
              hashMap.put("sensor_id", t.id)
              hashMap.put("temperature", t.temperature.toString)
              hashMap.put("timestamp", t.timestamp.toString)
              // Create the index request that carries the document.
              val indexRequest =
                Requests.indexRequest().index("sensor").`type`("readingData").source(hashMap)
              // Hand the request to the indexer.
              // NOTE(review): the indexer presumably batches requests, so the
              // message below may precede the actual write - confirm.
              requestIndexer.add(indexRequest)
              println("data saved successfully")
            }
          }
        )

        dataStream.addSink(esSinkBuilder.build())

        env.execute("es sink test")
      }
    }

    2.启动Elasticsearch
    3.访问 http://127.0.0.1:9200/sensor/_search?pretty,结果如下:

      

    有帮助的欢迎评论打赏哈,谢谢!

  • 相关阅读:
    解决Cannot download "https://github.com/sass/node-sass/releases/download/binding.nod的问题
    wid是一个字符串 必须转化成整型
    如何获取内联样式的width值
    onresize方法
    jquery中$("#afui").get(0)为什么要加get(0)呢?
    jquery $(document).ready() 与window.onload的区别
    鼠标点击
    添加二级菜单颜色
    homepage左边的导航菜单怎么做的?
    center
  • 原文地址:https://www.cnblogs.com/wddqy/p/12176053.html
Copyright © 2011-2022 走看看