zoukankan      html  css  js  c++  java
  • flinkSinkES

    import java.util
    
    import it.bigdata.flink.study.SensorReding
    import org.apache.flink.api.common.functions.RuntimeContext
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
    import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
    import org.apache.http.HttpHost
    import org.elasticsearch.client.Requests
    
    object EsSinkTest {
      def main(args: Array[String]): Unit = {
        //创建环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        //读取数据
        val inputPath ="D:\ideaDemo\maven_flink\src\main\resources\sensor.txt"
        val inputStream = env.readTextFile(inputPath)
    
        //简单的转换为
        val dataStream = inputStream.map(data => {
          val arr = data.split(",")
          SensorReding(arr(0), arr(1).toLong, arr(2).toDouble)
        })
    
        //定义HttpHosts
        val httpHosts = new util.ArrayList[HttpHost]()
        httpHosts.add(new HttpHost("127.0.0.0",9200))
    
        //自定义写入es的EsSinkFunction
        val myEsSinkFunc = new ElasticsearchSinkFunction[SensorReding] {
          override def process(t: SensorReding, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
            //包装也给Map作为data source
            val dataSource = new util.HashMap[String,String]()
            dataSource.put("id",t.id)
            dataSource.put("temperature",t.temperature.toString)
            dataSource.put("ts",t.timestamp.toString)
    
            //创建index reques,用于发送http请求
            val indexRequest = Requests.indexRequest()
              .index("sensor")
              .`type`("readingdata")
              .source(dataSource)
    
            //用index发送请求
            requestIndexer.add(indexRequest)
          }
        }
    
        dataStream.addSink(new ElasticsearchSink
        .Builder[SensorReding](httpHosts,myEsSinkFunc)
            .build()
        )
    
        env.execute("es sink test")
      }
    }
    author@nohert
  • 相关阅读:
    桥接模式
    单例模式
    SpringAOP aspectJ ProceedingJoinPoint 获取当前方法
    springMVC实现文件下载
    JAVA的变量初始化类成员变量和局部变量区别
    JAVA枚举类型的应用
    linux下svn命令大全
    转:shell脚本的一些注意事项
    转: linux下不同服务器间数据传输(rcp,scp,rsync,ftp,sftp,lftp,wget,curl)
    TCP三次握手/四次挥手详解
  • 原文地址:https://www.cnblogs.com/gzgBlog/p/14928228.html
Copyright © 2011-2022 走看看