  • Flink custom source and sink: reading the Kafka key and writing a specified key


    Update: that was silly of me. You can simply use JSONKeyValueDeserializationSchema, which emits ObjectNode records; if the message has a key, it is placed in the ObjectNode:

    if (record.key() != null) {
        node.set("key", mapper.readValue(record.key(), JsonNode.class));
    }
    if (record.value() != null) {
        node.set("value", mapper.readValue(record.value(), JsonNode.class));
    }
    if (includeMetadata) {
        node.putObject("metadata")
            .put("offset", record.offset())
            .put("topic", record.topic())
            .put("partition", record.partition());
    }
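    A minimal sketch of how this schema might be wired into a job. It assumes the Flink 1.8-era universal Kafka connector used elsewhere in this post; the broker address, the group id, and the `describe` helper are placeholders that are not part of the original. Note that, as the excerpt above shows, the key bytes are parsed with `mapper.readValue`, so this assumes the keys are valid JSON (for example a quoted string); a bare text key would presumably fail to parse.

```scala
import java.util.Properties

import org.apache.flink.api.scala._
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema

object JsonKeyValueDemo {
  // Hypothetical helper: render one deserialized record.
  // The "key" field is absent when the Kafka message had no key.
  def describe(node: ObjectNode): String = {
    val key = Option(node.get("key")).map(_.toString).getOrElse("<no key>")
    "key : " + key + ", value : " + node.get("value")
  }

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // assumed broker address
    props.setProperty("group.id", "json_key_value_demo")     // assumed group id

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // true => also attach a "metadata" object with offset, topic and partition
    val source = new FlinkKafkaConsumer[ObjectNode](
      "kafka_key", new JSONKeyValueDeserializationSchema(true), props)

    env.addSource(source).map(node => describe(node)).print()
    env.execute("JsonKeyValueDemo")
  }
}
```

    Passing `false` to the constructor skips the metadata object, leaving only the `key` and `value` fields.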


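    When Flink's FlinkKafkaConsumer and FlinkKafkaProducer are used with value-only schemas, as in the snippet below, the key cannot be read or set when consuming or producing Kafka data, yet sometimes we need that key.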

    val kafkaSource = new FlinkKafkaConsumer[ObjectNode]("kafka_demo", new JsonNodeDeserializationSchema(), Common.getProp)
    val sink = new FlinkKafkaProducer[String]("kafka_demo_out", new SimpleStringSchema(), Common.getProp)
    env.addSource(kafkaSource)
      .map(node => {
        node.put("token", System.currentTimeMillis())
        node.toString // the stream carries only the value; the Kafka key is not available here
      })
      .addSink(sink)
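    If the goal is only to write records with a chosen key, one option that may be enough is the connector's `KeyedSerializationSchema` interface. The sketch below is an assumption-laden illustration, not the approach this post goes on to build: `KeyValueStringSchema` and `KeyedSinkDemo` are hypothetical names, the broker address is a placeholder, and the `FlinkKafkaProducer` constructor taking a `KeyedSerializationSchema` is assumed to be available in the connector version in use (it is deprecated in later releases).

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Properties

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema

// Hypothetical helper: treats each stream element as a (key, value) pair of strings.
class KeyValueStringSchema(topic: String) extends KeyedSerializationSchema[(String, String)] {
  override def serializeKey(element: (String, String)): Array[Byte] = element._1.getBytes(UTF_8)
  override def serializeValue(element: (String, String)): Array[Byte] = element._2.getBytes(UTF_8)
  override def getTargetTopic(element: (String, String)): String = topic
}

object KeyedSinkDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // assumed broker address

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val sink = new FlinkKafkaProducer[(String, String)](
      "kafka_demo_out", new KeyValueStringSchema("kafka_demo_out"), props)

    env.fromElements("a", "b", "c")
      .map(v => ("key-" + v, v)) // first field becomes the Kafka key, second the value
      .addSink(sink)
    env.execute("KeyedSinkDemo")
  }
}
```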

    Below, a custom Flink source and sink are used to read the key of each record when consuming Kafka data and to write records with different keys when producing.

    Idea: use Kafka's native API, KafkaConsumer and KafkaProducer, to consume and produce the Kafka data; the key is then directly available.

    Kafka producer:

    object KafkaKeyMaker {
      val topic = "kafka_key"
      def main(args: Array[String]): Unit = {
        val producer = new KafkaProducer[String, String](Common.getProp)
        while (true) {
          val map = Map("user"->"venn", "name"->"venn","pass"->System.currentTimeMillis())
          val jsonObject: JSONObject = new JSONObject(map)
          // key: "msgKey" + current timestamp (long)
          val msg = new ProducerRecord[String, String](topic, "msgKey" + System.currentTimeMillis(), jsonObject.toString())
          producer.send(msg)
          producer.flush()
          Thread.sleep(3000)
        }
      }
    }

    Kafka consumer:

    object KafkaKeyReceive{
      val topic = "kafka_key"
      def main(args: Array[String]): Unit = {
        val consumer = new KafkaConsumer[String, String](Common.getProp)
        consumer.subscribe(util.Arrays.asList(topic + "_out"))
        while (true) {
          val records = consumer.poll(500)
          val tmp = records.iterator()
          while (tmp.hasNext){
            val record = tmp.next()
            val key = record.key()
            val value = record.value()
            println("receive -> key : " + key + ", value : " + value)
          }
        }
      }
    }

    Flink code with the custom source and sink:

    import com.venn.common.Common
    import org.apache.flink.api.scala._
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
    import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import scala.collection.JavaConversions._

    /**
      * Created by venn on 19-4-26.
      */
    object KafkaSourceKey {
      def main(args: Array[String]): Unit = {
        // environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        env.addSource(new RichSourceFunction[String] {
          // Kafka consumer object
          var consumer: KafkaConsumer[String, String] = null
          // set to false by cancel() so that the loop in run() can exit
          @volatile var running = true

          // initialization
          override def open(parameters: Configuration): Unit = {
            consumer = new KafkaConsumer[String, String](Common.getProp)
            // subscribe to the topic
            val list = List("kafka_key")
            consumer.subscribe(list)
          }

          // main loop: poll records; everything collected goes into the source's buffer
          override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
            while (running) {
              val records = consumer.poll(500)
              val tmp = records.iterator()
              while (tmp.hasNext) {
                val record = tmp.next()
                val key = record.key()
                val value = record.value()
                ctx.collect("key : " + key + ", value " + value)
              }
            }
          }

          override def cancel(): Unit = {
            running = false
          }
        }).map(s => s + "map")
          .addSink(new RichSinkFunction[String] {
            // Kafka producer object
            var producer: KafkaProducer[String, String] = null

            // initialization
            override def open(parameters: Configuration): Unit = {
              producer = new KafkaProducer[String, String](Common.getProp)
            }

            override def close(): Unit = {
              // release the producer only if open() actually created it
              if (producer != null) {
                producer.flush()
                producer.close()
              }
            }

            // writes one record; called once per result. Under high throughput, flush only as needed
            override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
              println("flink : " + value)
              val msg = new ProducerRecord[String, String]("kafka_key_out", "key" + System.currentTimeMillis(), value)
              producer.send(msg)
              producer.flush()
            }
          })

        // execute job
        env.execute("KafkaSourceKey")
      }
    }

    Data sent by the Kafka producer:

    {"user" : "venn", "name" : "venn", "pass" : 1561355358148}
    {"user" : "venn", "name" : "venn", "pass" : 1561355361271}
    {"user" : "venn", "name" : "venn", "pass" : 1561355364276}
    {"user" : "venn", "name" : "venn", "pass" : 1561355367279}
    {"user" : "venn", "name" : "venn", "pass" : 1561355370283}

    Flink output:

    flink : key : msgKey1561355358180, value {"user" : "venn", "name" : "venn", "pass" : 1561355358148}map
    flink : key : msgKey1561355361271, value {"user" : "venn", "name" : "venn", "pass" : 1561355361271}map
    flink : key : msgKey1561355364276, value {"user" : "venn", "name" : "venn", "pass" : 1561355364276}map
    flink : key : msgKey1561355367279, value {"user" : "venn", "name" : "venn", "pass" : 1561355367279}map
    flink : key : msgKey1561355370283, value {"user" : "venn", "name" : "venn", "pass" : 1561355370283}map
    flink : key : msgKey1561355373289, value {"user" : "venn", "name" : "venn", "pass" : 1561355373289}map
    flink : key : msgKey1561355376293, value {"user" : "venn", "name" : "venn", "pass" : 1561355376293}map

    Kafka consumer output:

    receive -> key : key1561355430411, value : key : msgKey1561355430356, value {"user" : "venn", "name" : "venn", "pass" : 1561355430356}map
    receive -> key : key1561355433427, value : key : msgKey1561355433359, value {"user" : "venn", "name" : "venn", "pass" : 1561355433359}map
    receive -> key : key1561355436441, value : key : msgKey1561355436364, value {"user" : "venn", "name" : "venn", "pass" : 1561355436364}map
    receive -> key : key1561355439456, value : key : msgKey1561355439367, value {"user" : "venn", "name" : "venn", "pass" : 1561355439367}map
    receive -> key : key1561355442473, value : key : msgKey1561355442370, value {"user" : "venn", "name" : "venn", "pass" : 1561355442370}map
    receive -> key : key1561355445391, value : key : msgKey1561355445374, value {"user" : "venn", "name" : "venn", "pass" : 1561355445374}map


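      1. For exactly-once on the source, you can use Kafka's low-level API: read from a specified offset each time, commit the new offset, and store the current offset in state. Then even if the job fails and restarts from the last checkpoint, the data is not duplicated.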

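      2. The sink is more troublesome. It follows the "two-phase commit" approach described on the official site to commit the producer's data. Put simply, each processed record is sent to Kafka without being truly committed; only some predefined state variables are written. When the checkpoint succeeds, Flink commits these writes; otherwise they are aborted.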
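    Point 1 above could be sketched roughly as follows. This is a simplified illustration under several assumptions, not a complete exactly-once source: `OffsetTrackingSource` is a hypothetical name, it reads a single partition and is meant to run with parallelism 1, it starts from offset 0 on a fresh start, and it uses `poll(Duration)`, which needs kafka-clients 2.0 or later. The supplied properties are assumed to contain the broker address and String deserializers.

```scala
import java.time.Duration
import java.util.{Collections, Properties}

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

import scala.collection.JavaConverters._

class OffsetTrackingSource(props: Properties, topic: String, partition: Int)
  extends RichSourceFunction[String] with CheckpointedFunction {

  @volatile private var running = true
  // offset of the next record to read; saved with every checkpoint
  private var nextOffset = 0L
  @transient private var offsetState: ListState[java.lang.Long] = _

  override def initializeState(context: FunctionInitializationContext): Unit = {
    offsetState = context.getOperatorStateStore.getListState(
      new ListStateDescriptor[java.lang.Long]("next-offset", classOf[java.lang.Long]))
    // after a restart, resume from the offset stored in the restored checkpoint
    offsetState.get().asScala.foreach(o => nextOffset = o)
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    offsetState.clear()
    offsetState.add(nextOffset)
  }

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val consumer = new KafkaConsumer[String, String](props)
    val tp = new TopicPartition(topic, partition)
    try {
      // assign + seek instead of subscribe: the job, not the consumer group, owns the offset
      consumer.assign(Collections.singletonList(tp))
      consumer.seek(tp, nextOffset)
      while (running) {
        val records = consumer.poll(Duration.ofMillis(500)).asScala
        for (record <- records) {
          // emit and advance the offset atomically with respect to checkpoints
          ctx.getCheckpointLock.synchronized {
            ctx.collect("key : " + record.key() + ", value " + record.value())
            nextOffset = record.offset() + 1
          }
        }
      }
    } finally {
      consumer.close()
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}
```

    Holding the checkpoint lock while emitting and updating `nextOffset` is what keeps the saved offset consistent with the records already sent downstream; checkpointing must be enabled on the environment for the state to be saved at all.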

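    References, from zhisheng's blog: "Learning Flink from 0 to 1: How to Customize a Data Source"

    "Learning Flink from 0 to 1: How to Customize a Data Sink"

    A translated article on two-phase commit: "[Translation] Implementing End-to-End Exactly-Once Processing Semantics with Flink + Kafka 0.11"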

