zoukankan      html  css  js  c++  java
  • 使用Flink时从Kafka中读取Array[Byte]类型的Schema

    使用Flink时,如果从Kafka中读取输入流,默认提供的是String类型的Schema:

    val myConsumer = new FlinkKafkaConsumer08[String]("Topic名称", new SimpleStringSchema(), properties);

    如果存入Kafka中的数据不是JSON,而是Protobuf类型的数据,需要用二进制的Schema进行接收,可以自己实现一个类,很简单,只有一行代码

     class ByteArrayDeserializationSchema[T] extends AbstractDeserializationSchema[Array[Byte]]{
      @throws[IOException]
      override def deserialize(message: Array[Byte]): Array[Byte] = message
    }

    然后使用时,如下所示:

    val myConsumer = new FlinkKafkaConsumer08[String]("Topic名称", new ByteArrayDeserializationSchema[Array[Byte]](), properties);
  • 相关阅读:
    0603 学术诚信与道德
    0601 新的冲刺
    0525 Scrum 项目7.0
    0523 Scrum 项目6.0
    0518 Scrum项目5.0
    0512 Scrum 4.0
    0512 操作系统之进程调度
    0511 backlog
    0506 Scrum 项目1.0
    复利计算再升级
  • 原文地址:https://www.cnblogs.com/liugh/p/7448554.html
Copyright © 2011-2022 走看看