zoukankan      html  css  js  c++  java
  • kafka自定义序列化器

    《kafka权威指南》

    Customer.java
    public class Customer {
        private int customId;
        private String customerName;
    
        public Customer(int customId, String customerName) {
            this.customId = customId;
            this.customerName = customerName;
        }
    
        public int getCustomId() {
            return customId;
        }
    
        public void setCustomId(int customId) {
            this.customId = customId;
        }
    
        public String getCustomerName() {
            return customerName;
        }
    
        public void setCustomerName(String customerName) {
            this.customerName = customerName;
        }
    }
    CustomerSerializer.java
    import org.apache.commons.lang.SerializationException;
    import org.apache.kafka.common.serialization.Serializer;
    
    import java.nio.ByteBuffer;
    import java.util.Map;
    
    /**
     * 自定义序列化器
     * */
    public class CustomerSerializer implements Serializer<Customer> {
    
    
        public void configure(Map<String, ?> configs, boolean isKey) {
            //不做任何配置
        }
    
        /**
         * Customer对象被序列化成:
         * 1. 表示customerId的4字节整数
         * 2. 表示customerName长度的4字节整数(如果customerName为空,则长度为0)
         * 3. 表示customerName的N个字节
         *
         * */
        public byte[] serialize(String topic, Customer data) {
            try{
                byte[] serializedName;
                int stringSize;
                if(data == null)
                    return null;
                else{
                    if(data.getCustomerName() != null){
                        serializedName = data.getCustomerName().getBytes("UTF-8");
                        stringSize = serializedName.length;
    
                    } //if
                    else{
                        serializedName = new byte[0];
                        stringSize = 0;
                    }
                    ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
                    buffer.putInt(data.getCustomId());
                    buffer.putInt(stringSize);
                    buffer.put(serializedName);
                    return buffer.array();
                } //else
            } catch (Exception e){
                e.printStackTrace();
                throw new SerializationException("Error when serizlizing custromer to byte[]" +
                e);
            }
        }
    
        public void close() {
    
        }
    }
  • 相关阅读:
    LTS版本的解析
    symfony中twig的模板过滤器
    symfony中twig的模板载入
    symfony中twig的流程控制if,for用法
    symfony中twig的模板变量与注释
    使用Symfony 2在三小时内开发一个寻人平台
    symfony在模板中生成url
    模板中引入其他的模板
    劳务派遣有新规章,劳务工有保障了|中山劳务派
    Symfony启动过程详细学习
  • 原文地址:https://www.cnblogs.com/luckygxf/p/9065579.html
Copyright © 2011-2022 走看看