导入依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>1.7.2</version>
</dependency>
package com.wyh.streamingApi.sink
import java.util
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests
//温度传感器读数样例类
case class SensorReading(id: String, timestamp: Long, temperature: Double)
object Sink2ES {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
//Source操作
val inputStream = env.readTextFile("F:\flink-study\wyhFlinkSD\data\sensor.txt")
//Transform操作
val dataStream: DataStream[SensorReading] = inputStream.map(data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
})
val httpHosts = new util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("192.168.230.30", 9200))
//创建一个ES Sink的builder
val esSinkBuilder: ElasticsearchSink.Builder[SensorReading] = new ElasticsearchSink.Builder[SensorReading](
httpHosts,
new ElasticsearchSinkFunction[SensorReading] {
override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
println("saving data:" + t)
//包装成一个Map或者JsonObject格式
val json = new util.HashMap[String, String]()
json.put("sensor_id", t.id)
json.put("timestamp", t.timestamp.toString)
json.put("temperature", t.temperature.toString)
//创建indexRequest准备发送数据
val indexRequest = Requests.indexRequest()
.index("sensor")
.`type`("readingdata")
.source(json)
//利用requestIndexer进行发送请求,写入数据
requestIndexer.add(indexRequest)
println("data 写入完成。。。")
}
}
)
//Sink操作
dataStream.addSink(esSinkBuilder.build())
env.execute("sink ES test")
}
}
启动ES
启动kibana
运行
查看结果