zoukankan      html  css  js  c++  java
  • Kafka序列化和反序列化与示例

    1.  卡夫卡序列化和反序列化

    今天,在这篇Kafka SerDe文章中,我们将学习使用Kafka创建自定义序列化器和反序列化器的概念。此外,我们将了解序列化在Kafka中的工作原理以及为什么需要序列化。与此同时,我们将看到 Kafka序列化器示例和Kafka解串器示例。此外,这个Kafka序列化和反序列化教程为我们提供了 Kafka字符串序列化器和 Kafka  对象序列化器的知识。 

    基本上,Apache Kafka提供了我们可以轻松发布以及订阅记录流的功能。因此,我们可以灵活地创建自己的自定义序列化器以及解串器,这有助于使用它传输不同的数据类型。 那么,让我们开始Kafka序列化和反序列化 

    序列化和反序列化

    Kafka-序列化和反序列化(Kafka SerDe)

    2. Apache Kafka SerDe

    但是,为了传输而将对象转换为字节流的过程就是我们所说的序列化。虽然,Apache Kafka存储以及在队列中传输这些字节数组。
    阅读Apache Kafka用例| Kafka应用程序
    然而,序列化的反面是反序列化。在这里,我们将数组的字节转换为我们想要的数据类型。但是,请确保Kafka仅为少数数据类型提供序列化程序和反序列化程序,例如

    • 整数
    • 字节

    3.为什么在Kafka中使用Custom Serializer和Deserializer?

    基本上,为了准备从生产者传递到代理的消息,我们使用序列化器。换句话说,在将整个消息传输到代理之前,让生产者知道如何将消息转换为字节数组,我们使用序列化器。类似地,要将字节数组转换回对象,我们使用消费者的反序列化器。

    4. Kafka SerDe的实施

    实现org.apache.kafka.common.serialization.Serializer接口以创建序列化程序类非常重要。Ans,对于反序列化器类,重要的是实现org.apache.kafka.common.serialization.Deserializer接口。
    让我们来讨论Apache Kafka架构及其基本概念
    Kafka序列化和反序列化接口有3种方法:

    Kafka序列化和反序列化的实现方法

    Kafka序列化和反序列化的实现方法

    一个。配置

    在配置启动时,我们调用Configure方法。

    湾 序列化/反序列化

    出于Kafka序列化和反序列化的目的,我们使用此方法。

    C。

    在关闭Kafka会话时,我们使用Close方法。
    阅读如何创建Kafka客户端

    5.与Kafka的串行器接口

    1. public interface Serializer extends Closeable {
       void configure(Map<String, ?> var1, boolean var2);
       byte[] serialize(String var1, T var2);
       void close();
      }

    6.与Kafka的解串器接口

    1. public interface Deserializer extends Closeable {
       void configure(Map<String, ?> var1, boolean var2);
       T deserialize(String var1, byte[] var2);
       void close();
      }

    7. Serializer和Deserializer的示例

    这里的依赖关系是:
    让我们来探讨卡夫卡的优缺点

    • 卡夫卡(0.10.1.1)。
    • FasterXML Jackson(2.8.6)。
    1. user.java:
      public class User {
       private String firstname;
       private int age;
       public User() {
       }
       public User(String firstname, int age) {
         this.firstname = firstname;
         this.age = age;
       }
       public String getfirstName() {
         return this.firstname;
       }
       public int getAge() {
         return this.age;
       }
       @Override public String toString() {
         return "User(" + firstname + ", " + age + ")";
       }
      }
    1. userserializer.java:
      public class UserSerializer implements Serializer {
       @Override public void configure(Map<String, ?> map, boolean b) {
       }
       @Override public byte[] serialize(String arg0, User arg1) {
         byte[] retVal = null;
         ObjectMapper objectMapper = new ObjectMapper();
         try {
           retVal = objectMapper.writeValueAsString(arg1).getBytes();
         } catch (Exception e) {
           e.printStackTrace();
         }
         return retVal;
       }
       @Override public void close() {
       }
      }
    1. Userdeserializer.java:
      public class UserDeserializer implements Deserializer {
       @Override public void close() {
       }
       @Override public void configure(Map<String, ?> arg0, boolean arg1) {
       }
       @Override
       public User deserialize(String arg0, byte[] arg1) {
         ObjectMapper mapper = new ObjectMapper();
         User user = null;
         try {
           user = mapper.readValue(arg1, User.class);
         } catch (Exception e) {
           e.printStackTrace();
         }
         return user;
       }
      }

    此外,为了使用上面的序列化程序,我们必须使用此属性进行注册:
    使用命令学习Apache Kafka Operations

    1. props.put("value.serializer", "com.knoldus.serializers.UserSerializer");

    那么,制作人将是:

    1. try (Producer<String, User> producer = new KafkaProducer<>(props)) {
        producer.send(new ProducerRecord<String, User>("MyTopic", user));
        System.out.println("Message " + user.toString() + " sent !!");
      } catch (Exception e) {
        e.printStackTrace();
      }

    现在,我们再次需要为反序列化器注册此属性:

    1. props.put("value.deserializer", "com.knoldus.deserializer.UserDeserializer");

    因此,消费者将:

    1. try (KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props)) {
         consumer.subscribe(Collections.singletonList(topic));
         while (true) {
             ConsumerRecords<String, User> messages = consumer.poll(100);
             for (ConsumerRecord<String, User> message : messages) {
               System.out.println("Message received " + message.value().toString());
             }
         }
      } catch (Exception e) {
         e.printStackTrace();
      }

    所以,这就是Kafka序列化和反序列化。希望您喜欢并理解我们对Kafka的自定义序列化器和反序列化器的解释。
    让我们修改Apache Kafka Workflow | Kafka Pub-Sub Messaging

    8.结论

    因此,在这个Kafka序列化和反序列化教程中,我们学会了创建一个自定义的Kafka SerDe示例。此外,我们看到了对Kafka的串行器和解串器的需求。与此同时,我们学习了Kafka序列化和反序列化的实现方法

  • 相关阅读:
    Python_时间,日期,时间戳之间转换
    VirtualBox虚拟机网络设置
    Java_IO流
    获取ElasticSearch索引列表
    关闭ElasticSearch动态创建mapping
    关于elasticsearch输出默认限制最多一万条记录的问题
    linux下ElasticSearch安装及集群搭建
    linux下NFS远程目录挂载
    linux centos7 防火墙及端口开放相关命令
    linux命令
  • 原文地址:https://www.cnblogs.com/a00ium/p/10853005.html
Copyright © 2011-2022 走看看