zoukankan      html  css  js  c++  java
  • kafka2.5.0自定义数据序列化类

    kafka只接收bytes字节数组,所以自定义序列化器内部实现需按照bytes字节数组转换为标准。

    重点:本例子只是提供参考怎样写自定义序列化器,因为关系到性能,一般默认使用StringSerializer即可,效率很高。

    小知识:Kafka支持Avro序列化器,比较适用于生产者和消费者在版本升级差距拉大时使用,但同时要注意性能。参考文章《使用kafka中提供的Avro序列化框架实现序列化

    1) 自定义序列化类,转换成bytes字节数组:

    import cn.enjoyedu.vo.DemoUser;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Serializer;
    import java.nio.ByteBuffer;
    import java.util.Map;
    
    public class MySerializer implements Serializer<DemoUser> {
        public void configure(Map<String, ?> configs, boolean isKey) {
            //do nothing
        }
    
        public byte[] serialize(String topic, DemoUser data) {
            try {
                byte[] name;
                int nameSize;
                if(data==null){
                    return null;
                }
                if(data.getName()!=null){
                    name = data.getName().getBytes("UTF-8");
                    //字符串的长度
                    nameSize = data.getName().length();
                }else{
                    name = new byte[0];
                    nameSize = 0;
                }
                /*id的长度4个字节,字符串的长度描述4个字节,
                字符串本身的长度nameSize个字节*/
                ByteBuffer buffer = ByteBuffer.allocate(4+4+nameSize);
                buffer.putInt(data.getId());//4
                buffer.putInt(nameSize);//4
                buffer.put(name);//nameSize
                return buffer.array();
            } catch (Exception e) {
                throw new SerializationException("Error serialize DemoUser:"+e);
            }
        }
    
        public void close() {
            //do nothing
        }
    }

    2) 自定义反序列化类,从bytes字节数组转换成自定义对象:

    import cn.enjoyedu.vo.DemoUser;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;
    import java.nio.ByteBuffer;
    import java.util.Map;
    
    public class MyDeserializer implements Deserializer<DemoUser> {
    
    
        public void configure(Map<String, ?> configs, boolean isKey) {
            //do nothing
        }
    
        public DemoUser deserialize(String topic, byte[] data) {
            try {
                if(data==null){
                    return null;
                }
                if(data.length<8){
                    throw new SerializationException("Error data size.");
                }
                ByteBuffer buffer = ByteBuffer.wrap(data);
                int id;
                String name;
                int nameSize;
                id = buffer.getInt();
                nameSize = buffer.getInt();
                byte[] nameByte = new byte[nameSize];
                buffer.get(nameByte);
                name = new String(nameByte,"UTF-8");
                return new DemoUser(id,name);
            } catch (Exception e) {
                throw new SerializationException("Error Deserializer DemoUser."+e);
            }
    
        }
    
        public void close() {
            //do nothing
        }
    }

    3) 配置序列化类

    定义好自定义数据序列化类,需配置到kafka的配置里(参考《kafka2.5.0生产者与消费者配置详解》):

    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, MySerializer.class

    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MySerializer.class

    end.

    
    
  • 相关阅读:
    查看Mysql版本
    Day03_SpringCloud2
    Day01_SpringBoot
    【Java面试题】如何判断一个字符串中某个字符出现的次数?
    你以为这样写Java代码很6,但我看不懂
    smart-socket实战:玩转心跳消息
    JVM 对象分配过程
    Spring Cloud基于Redis实现的分布式锁
    Python10行以内代码能有什么高端操作
    会话技术之Cookie详解
  • 原文地址:https://www.cnblogs.com/zhuwenjoyce/p/13191550.html
Copyright © 2011-2022 走看看