zoukankan      html  css  js  c++  java
  • kafka自定义序列化器

    《kafka权威指南》

    Customer.java
    public class Customer {
        private int customId;
        private String customerName;
    
        public Customer(int customId, String customerName) {
            this.customId = customId;
            this.customerName = customerName;
        }
    
        public int getCustomId() {
            return customId;
        }
    
        public void setCustomId(int customId) {
            this.customId = customId;
        }
    
        public String getCustomerName() {
            return customerName;
        }
    
        public void setCustomerName(String customerName) {
            this.customerName = customerName;
        }
    }
    CustomerSerializer.java
    import org.apache.commons.lang.SerializationException;
    import org.apache.kafka.common.serialization.Serializer;
    
    import java.nio.ByteBuffer;
    import java.util.Map;
    
    /**
     * 自定义序列化器
     * */
    public class CustomerSerializer implements Serializer<Customer> {
    
    
        public void configure(Map<String, ?> configs, boolean isKey) {
            //不做任何配置
        }
    
        /**
         * Customer对象被序列化成:
         * 1. 表示customerId的4字节整数
         * 2. 表示customerName长度的4字节整数(如果customerName为空,则长度为0)
         * 3. 表示customerName的N个字节
         *
         * */
        public byte[] serialize(String topic, Customer data) {
            try{
                byte[] serializedName;
                int stringSize;
                if(data == null)
                    return null;
                else{
                    if(data.getCustomerName() != null){
                        serializedName = data.getCustomerName().getBytes("UTF-8");
                        stringSize = serializedName.length;
    
                    } //if
                    else{
                        serializedName = new byte[0];
                        stringSize = 0;
                    }
                    ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
                    buffer.putInt(data.getCustomId());
                    buffer.putInt(stringSize);
                    buffer.put(serializedName);
                    return buffer.array();
                } //else
            } catch (Exception e){
                e.printStackTrace();
                throw new SerializationException("Error when serizlizing custromer to byte[]" +
                e);
            }
        }
    
        public void close() {
    
        }
    }
  • 相关阅读:
    053-1
    多项式ADT笔记(数据结构c版)
    052-188
    052-187
    052-186
    052-185
    052-184
    052-183
    052-182
    JS中的垃圾回收(GC)
  • 原文地址:https://www.cnblogs.com/luckygxf/p/9065579.html
Copyright © 2011-2022 走看看