一、redis sink
对应jar包
<dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.0</version> </dependency>
将文件内容写入到hash中
代码:
object RedisSinkTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val streamFromFile = env.readTextFile("C:\Users\Mi\Documents\project\idea\FlinkTitorial\src\main\resources\sensor.txt") val dataStream: DataStream[SensorReading] = streamFromFile.map(d => { val arr = d.split(",") SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble) }) //redis sink val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("hadoop102").setPort(6379).build() dataStream.addSink(new RedisSink(config,new MyRedisMapper)) env.execute("redis sink test") } } class MyRedisMapper extends RedisMapper[SensorReading]{ //命令为hset,键为sensor_temperature override def getCommandDescription: RedisCommandDescription = { new RedisCommandDescription(RedisCommand.HSET,"sensor_temperature") } //field为传感器id override def getKeyFromData(t: SensorReading): String = t.id //value为温度 override def getValueFromData(t: SensorReading): String = t.temperature.toString }
redis查看结果
127.0.0.1:6379> hgetall sensor_temperature 1) "sensor_1" 2) "35.80018327300259" 3) "sensor_6" 4) "15.402984393403084" 5) "sensor_10" 6) "38.101067604893444" 7) "sensor_7" 8) "6.720945201171228"
二、es sink
对应jar包
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch6_2.11</artifactId> <version>1.7.2</version> </dependency>
将文件内容写入到es中
代码:
object EsSinkTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val streamFromFile = env.readTextFile("C:\Users\Mi\Documents\project\idea\FlinkTitorial\src\main\resources\sensor.txt") val dataStream: DataStream[SensorReading] = streamFromFile.map(d => { val arr = d.split(",") SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble) }) //es sink val httpHosts = new util.ArrayList[HttpHost]() httpHosts.add(new HttpHost("hadoop101",9200)) //创建一个es sink的builder val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](httpHosts, new ElasticsearchSinkFunction[SensorReading] { override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = { println("保存数据:" + t) //包装成map val map = new util.HashMap[String, String]() map.put("sensor_id", t.id) map.put("temperature", t.temperature.toString) map.put("ts", t.timestamp.toString) //创建index request,准备发送数据 val indexRequest: IndexRequest = Requests.indexRequest().index("sensor").`type`("redingdata").source(map) //利用requestIndexer发送请求,写入数据 requestIndexer.add(indexRequest) println("保存成功") } }) esSinkBuilder dataStream.addSink(esSinkBuilder.build()) env.execute("redis sink test") } }
es中查看结果
{ "took" : 148, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : 6, "max_score" : 1.0, "hits" : [ { "_index" : "sensor", "_type" : "redingdata", "_id" : "QXpZnnEBUwLRQchmepbT", "_score" : 1.0, "_source" : { "sensor_id" : "sensor_6", "temperature" : "15.402984393403084", "ts" : "1547718201" } }, { "_index" : "sensor", "_type" : "redingdata", "_id" : "RnpZnnEBUwLRQchme5ZS", "_score" : 1.0, "_source" : { "sensor_id" : "sensor_7", "temperature" : "6.720945201171228", "ts" : "1547718202" } }, { "_index" : "sensor", "_type" : "redingdata", "_id" : "Q3pZnnEBUwLRQchmepbr", "_score" : 1.0, "_source" : { "sensor_id" : "sensor_1", "temperature" : "35.80018327300259", "ts" : "1547718199" } }, { "_index" : "sensor", "_type" : "redingdata", "_id" : "QnpZnnEBUwLRQchmepbo", "_score" : 1.0, "_source" : { "sensor_id" : "sensor_1", "temperature" : "30.8", "ts" : "1547718200" } }, { "_index" : "sensor", "_type" : "redingdata", "_id" : "RHpZnnEBUwLRQchmepbs", "_score" : 1.0, "_source" : { "sensor_id" : "sensor_1", "temperature" : "40.8", "ts" : "1547718201" } }, { "_index" : "sensor", "_type" : "redingdata", "_id" : "RXpZnnEBUwLRQchmepbu", "_score" : 1.0, "_source" : { "sensor_id" : "sensor_10", "temperature" : "38.101067604893444", "ts" : "1547718205" } } ] } }
三、jdbc sink
①mysql驱动
<!-- mysql sink --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.44</version> </dependency>
②自定义mysql sink,继承RichSinkFunction,重写执行逻辑以及初始化和关闭资源的方法。
class MyJdbcSink() extends RichSinkFunction[SensorReading]{ //定义sql连接、预编译器 var conn:Connection = _ var insertStmt : PreparedStatement = _ var updateStmt:PreparedStatement=_ //初始化 override def open(parameters: Configuration): Unit = { super.open(parameters) conn = DriverManager.getConnection("jdbc:mysql:///test","root","123456") insertStmt = conn.prepareStatement("insert into temperatures(sensor,temp) values(?,?)") updateStmt = conn.prepareStatement("update temperatures set temp=? where sensor=?") } //调用连接,执行sql override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = { //这句必须删掉 //super.invoke(value, context) //执行更新语句 updateStmt.setDouble(1,value.temperature) updateStmt.setString(2,value.id) updateStmt.execute() //如果没有,则插入 if (updateStmt.getUpdateCount == 0){ insertStmt.setString(1,value.id) insertStmt.setDouble(2,value.temperature) insertStmt.execute() } } //关闭资源 override def close(): Unit = { updateStmt.close() insertStmt.close() conn.close() } }
③添加自定义的mysql sink并执行
object JdbcSinkTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val streamFromFile = env.readTextFile("C:\Users\Mi\Documents\project\idea\FlinkTitorial\src\main\resources\sensor.txt") val dataStream: DataStream[SensorReading] = streamFromFile.map(d => { val arr = d.split(",") SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble) }) //jdbc sink dataStream.addSink(new MyJdbcSink()) env.execute("jdbc sink test") } }