zoukankan      html  css  js  c++  java
  • flinkSinkES

    import java.util
    
    import it.bigdata.flink.study.SensorReding
    import org.apache.flink.api.common.functions.RuntimeContext
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
    import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
    import org.apache.http.HttpHost
    import org.elasticsearch.client.Requests
    
    object EsSinkTest {
      def main(args: Array[String]): Unit = {
        //创建环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        //读取数据
        val inputPath ="D:\ideaDemo\maven_flink\src\main\resources\sensor.txt"
        val inputStream = env.readTextFile(inputPath)
    
        //简单的转换为
        val dataStream = inputStream.map(data => {
          val arr = data.split(",")
          SensorReding(arr(0), arr(1).toLong, arr(2).toDouble)
        })
    
        //定义HttpHosts
        val httpHosts = new util.ArrayList[HttpHost]()
        httpHosts.add(new HttpHost("127.0.0.0",9200))
    
        //自定义写入es的EsSinkFunction
        val myEsSinkFunc = new ElasticsearchSinkFunction[SensorReding] {
          override def process(t: SensorReding, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
            //包装也给Map作为data source
            val dataSource = new util.HashMap[String,String]()
            dataSource.put("id",t.id)
            dataSource.put("temperature",t.temperature.toString)
            dataSource.put("ts",t.timestamp.toString)
    
            //创建index reques,用于发送http请求
            val indexRequest = Requests.indexRequest()
              .index("sensor")
              .`type`("readingdata")
              .source(dataSource)
    
            //用index发送请求
            requestIndexer.add(indexRequest)
          }
        }
    
        dataStream.addSink(new ElasticsearchSink
        .Builder[SensorReding](httpHosts,myEsSinkFunc)
            .build()
        )
    
        env.execute("es sink test")
      }
    }
    author@nohert
  • 相关阅读:
    经验大奉献。收集您的经验之你用什么方法提高.NET网站的性能!
    [书]:asp.net 2.0 高级编程(微软技术丛书)
    VS 2008 快捷键
    [书]:《Improving ASP.NET Performance》提高系统性能
    [书]:<<软件工程导论>> 听说很好,不知是真的否.
    [转] C#编码好习惯,献给所有热爱c#的同志
    [书]:UML和模式应用
    在后台代码里写 JS语句.
    查看和修改MTU值
    Lucene.NET搜索多个索引文件
  • 原文地址:https://www.cnblogs.com/gzgBlog/p/14928228.html
Copyright © 2011-2022 走看看