Redis-Sink
使用Flink内嵌 RedisSink
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
package com.text.sink
import java.net.InetSocketAddress
import java.util
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.{FlinkJedisClusterConfig, FlinkJedisPoolConfig}
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
object RedisSinkDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("ke01", 8899)
val result = stream.flatMap(_.split(" "))
.map((_, 1)).keyBy(0).sum(1)
result.print().setParallelism(1);
// redis是单机
val config = new FlinkJedisPoolConfig.Builder().setDatabase(3).setHost("192.168.75.91").setPort(6390).setPassword("aa123456").build()
result.addSink(new RedisSink[(String, Int)](config, new RedisMapper[(String, Int)] {
override def getCommandDescription: RedisCommandDescription = {
// 选择redis存储方式
new RedisCommandDescription(RedisCommand.HSET, "wc")
}
override def getKeyFromData(t: (String, Int)): String = {
println(t._1)
t._1
}
override def getValueFromData(t: (String, Int)): String = {
println(t._2)
t._2.toString
}
}))
env.execute()
}
}
Kafka-Sink
使用Fink内嵌:FlinkKafkaProducer
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.9.2</version>
</dependency>
package com.text.sink
import java.lang
import java.util.Properties
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord
object KafkaSinkDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("ke01", 8890)
val result = stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
val props = new Properties()
props.setProperty("bootstrap.servers", "ke01:9092,ke02:9092,ke03:9092")
// 因为下面序列化了,所以这里没必要序列化
result.addSink(new FlinkKafkaProducer[(String, Int)]("wc", new KafkaSerializationSchema[(String, Int)] {
override def serialize(element: (String, Int), timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
new ProducerRecord("wc", element._1.getBytes(), (element._2.toString).getBytes())
}
}, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE))
env.execute()
}
}
Mysql-Sink
1.Fink不支持mysql内嵌,所以自定义:RichSinkFunction
2. MySQL需要幂等性,就是没有数据的时候可正常插入,有数据的时候正常累加
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
package com.text.sink
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringSerializer
case class CarInfo(monitorId: String, carId: String, eventTime: String, speed: Long)
object MysqlSinkDemo {
def main(args: Array[String]): Unit = {
//需要保证幂等性,幂等性:就是多次写入,如何保证数据准确
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "ke04:9092,ke02:9092,ke03:9092")
props.setProperty("group.id", "group_id")
props.setProperty("key.deserializer", classOf[StringSerializer].getName)
props.setProperty("value.deserializer", classOf[StringSerializer].getName)
val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka", new KafkaDeserializationSchema[(String, String)] {
//什么时候停止,停止条件是什么
override def isEndOfStream(nextElement: (String, String)): Boolean = false
//要进行序列化的字节流
override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
val key = new String(consumerRecord.key(), "UTF-8")
val value = new String(consumerRecord.value(), "UTF-8")
(key, value)
}
override def getProducedType: TypeInformation[(String, String)] = {
createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
}
}, props))
stream.map(data => {
val value = data._2
val splits = value.split(" ")
val monitorId = splits(0)
(monitorId, 1)
}).keyBy(_._1).reduce(new ReduceFunction[(String, Int)] {
// /value1:上次聚合完的结果 value2:当前的数据
override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
(value1._1, value1._2 + value2._2)
}
}).addSink(new MySQLCustomSink)
env.execute()
}
}
class MySQLCustomSink extends RichSinkFunction[(String, Int)] {
var conn: Connection = _
var insertPst: PreparedStatement = _
var updatePst: PreparedStatement = _
override def open(parameters: Configuration): Unit = {
conn = DriverManager.getConnection("jdbc:mysql://ke01:3306/test", "root", "aa123456")
insertPst = conn.prepareStatement("insert into car_flow(monitorId, count) values(?, ?)")
updatePst = conn.prepareStatement("update car_flow set count = ? where monitorId = ?")
}
override def close(): Unit = {
insertPst.close()
updatePst.close()
conn.close()
}
override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
println(value)
updatePst.setInt(1, value._2)
updatePst.setString(2, value._1)
updatePst.execute()
println(updatePst.getUpdateCount)
if (updatePst.getUpdateCount == 0) {
println("insert")
insertPst.setString(1, value._1)
insertPst.setInt(2, value._2)
insertPst.execute()
}
}
}
Socket-Sink
自定义
package com.text.sink import java.io.PrintStream import java.net.{InetAddress, Socket} import java.util.Properties import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction} import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTuple2TypeInformation, createTypeInformation} import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema} import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringSerializer object SocketSink { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val props = new Properties() props.setProperty("bootstrap.servers", "ke02:9092,ke03:9092,ke04:9092") props.setProperty("group.id", "flink-kafka-002") props.setProperty("key.deserializer", classOf[StringSerializer].getName) props.setProperty("value.deserializer", classOf[StringSerializer].getName) val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka", new KafkaDeserializationSchema[(String, String)] { // 什么时候停止,停止条件是什么 override def isEndOfStream(nextElement: (String, String)): Boolean = false // 要进行序列化的字节流 override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = { val key = new String(consumerRecord.key(), "UTF-8") val value = new String(consumerRecord.value(), "UTF-8") (key, value) } // 指定一下返回的数据类型 flink提供的类型 override def getProducedType: TypeInformation[(String, String)] = { createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String]) } }, props)) stream.map(data => { val value = data._2 val splits = value.split(" ") val monitorId = splits(0) (monitorId, 1) }).keyBy(_._1).reduce(new ReduceFunction[(String, Int)] { override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = { (value1._1, value1._2 + value2._2) } }).addSink(new SocketCustomSink("ke01", 8891)) env.execute() } } class SocketCustomSink(host: String, port: Int) extends RichSinkFunction[(String, Int)] { var socket: Socket = _ var writer: PrintStream = _ override def open(parameters: Configuration): Unit = { socket = new Socket(InetAddress.getByName(host), port) writer = new PrintStream(socket.getOutputStream) } override def close(): Unit = { socket.close() writer.close() } override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = { writer.println(value._1 + " " + value._2) writer.flush() } }
File-Sink
支持分桶写入,每一个桶就是一个目录,默认每隔一个小时会产生一个分桶,每个桶下面会存储每一个
Thread的处理结果,可以设置一些文件滚动的策略(文件打开、文件大小等),防止出现大量的小文件
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<version>1.9.2</version>
</dependency>
package com.text.sink import java.util.Properties import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.api.common.serialization.SimpleStringEncoder import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.core.fs.Path import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema} import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringSerializer object FileSinkDemo { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val props = new Properties() props.setProperty("bootstrap.servers", "ke04:9092,ke02:9092,ke03:9092") props.setProperty("group.id", "group_id_03") props.setProperty("key.deserializer", classOf[StringSerializer].getName) props.setProperty("value.deserializer", classOf[StringSerializer].getName) val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka", new KafkaDeserializationSchema[(String, String)] { override def isEndOfStream(nextElement: (String, String)): Boolean = false override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = { val key = new String(consumerRecord.key(), "UTF-8") val value = new String(consumerRecord.value(), "UTF-8") (key, value) } override def getProducedType: TypeInformation[(String, String)] = { createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String]) } }, props)) val restStream = stream.map(data => { val value = data._2 val splits = value.split(" ") val monitorId = splits(0) (monitorId, 1) }).keyBy(_._1).reduce(new ReduceFunction[(String, Int)] { override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = { (value1._1, value2._2 + value1._2) } }).map(x => x._1 + " " + x._2) val rolling:DefaultRollingPolicy[String,String] = DefaultRollingPolicy.create() //当文件大小超过256 则滚动产生一个小文件 .withMaxPartSize(256*1024*1024) //当文件超过2s没有写入新数据,则滚动产生一个小文件 .withInactivityInterval(2000) //文件打开时间超过2s 则滚动产生一个小文件 每隔2s产生一个小文件 .withRolloverInterval(2000) .build() val sink = StreamingFileSink.forRowFormat(new Path("D:\code\scala\test\test07\data\tmp"), new SimpleStringEncoder[String]("UTF-8")) .withBucketCheckInterval(1000) .withRollingPolicy(rolling) .build() restStream.addSink(sink) env.execute() } }
HBase-Sink
计算结果写入sink 两种实现方式:
1. map算子写入 频繁创建hbase连接
2. process写入 适合批量写入hbase
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.3</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>1.3.3</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.3</version>
</dependency>
package com.text.sink
import java.util.{Date, Properties} import akka.remote.serialization.StringSerializer import com.text.utils.DataUtils import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.{HTable, Put} import org.apache.hadoop.hbase.util.Bytes object HBaseSinkDemo { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val props = new Properties() props.setProperty("bootstrap.servers", "ke02:9092,ke03:9092,ke04:9092") props.setProperty("group.id", "flink-kafka-003") props.setProperty("key.deserializer", classOf[StringSerializer].getName) props.setProperty("value.deserializer", classOf[StringSerializer].getName) val stream = env.addSource(new FlinkKafkaConsumer[String]("flink-kafka", new SimpleStringSchema(), props)) stream.map(row => { val arr = row.split(" ") (arr(0), 1) }).keyBy(_._1) .reduce((v1: (String, Int), v2: (String, Int)) => { (v1._1, v1._2 + v2._2) }).process(new ProcessFunction[(String, Int), (String, Int)] { var htab: HTable = _ override def open(parameters: Configuration): Unit = { val conf = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum", "ke02:2181,ke03:2181,ke04:2181") val hbaseName = "car_flow" htab = new HTable(conf, hbaseName) } override def close(): Unit = { htab.close() } override def processElement(value: (String, Int), ctx: ProcessFunction[(String, Int), (String, Int)]#Context, out: Collector[(String, Int)]): Unit = { val min = DataUtils.getDateStrByDate(new Date()) val put = new Put(Bytes.toBytes(value._1)) put.addColumn(Bytes.toBytes("count"), Bytes.toBytes(min), Bytes.toBytes(value._2)) htab.put(put) } }) env.execute() } }