zoukankan      html  css  js  c++  java
  • Kafka自定义序列化器

    《Kafka权威指南》

    Customer.java
    /**
     * Mutable holder for a customer's numeric id and name.
     *
     * <p>Neither value is validated; the name may be {@code null}.
     */
    public class Customer {

        /** Numeric identifier of the customer. */
        private int customId;

        /** Name of the customer; may be {@code null}. */
        private String customerName;

        /**
         * Creates a customer with the given id and name.
         *
         * @param customId     numeric identifier to store
         * @param customerName name to store; stored as-is, including {@code null}
         */
        public Customer(int customId, String customerName) {
            this.customerName = customerName;
            this.customId = customId;
        }

        // ---- accessors ----

        /**
         * @return the stored numeric identifier
         */
        public int getCustomId() {
            return this.customId;
        }

        /**
         * @return the stored name, possibly {@code null}
         */
        public String getCustomerName() {
            return this.customerName;
        }

        // ---- mutators ----

        /**
         * Replaces the stored identifier.
         *
         * @param customId new numeric identifier
         */
        public void setCustomId(int customId) {
            this.customId = customId;
        }

        /**
         * Replaces the stored name.
         *
         * @param customerName new name; stored as-is, including {@code null}
         */
        public void setCustomerName(String customerName) {
            this.customerName = customerName;
        }
    }
    CustomerSerializer.java
    import org.apache.commons.lang.SerializationException;
    import org.apache.kafka.common.serialization.Serializer;
    
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    
    /**
     * 自定义序列化器
     * */
    public class CustomerSerializer implements Serializer<Customer> {
    
    
        public void configure(Map<String, ?> configs, boolean isKey) {
            //不做任何配置
        }
    
        /**
         * Customer对象被序列化成:
         * 1. 表示customerId的4字节整数
         * 2. 表示customerName长度的4字节整数(如果customerName为空,则长度为0)
         * 3. 表示customerName的N个字节
         *
         * */
        public byte[] serialize(String topic, Customer data) {
            try{
                byte[] serializedName;
                int stringSize;
                if(data == null)
                    return null;
                else{
                    if(data.getCustomerName() != null){
                        serializedName = data.getCustomerName().getBytes("UTF-8");
                        stringSize = serializedName.length;
    
                    } //if
                    else{
                        serializedName = new byte[0];
                        stringSize = 0;
                    }
                    ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
                    buffer.putInt(data.getCustomId());
                    buffer.putInt(stringSize);
                    buffer.put(serializedName);
                    return buffer.array();
                } //else
            } catch (Exception e){
                e.printStackTrace();
                throw new SerializationException("Error when serizlizing custromer to byte[]" +
                e);
            }
        }
    
        public void close() {
    
        }
    }
  • 相关阅读:
    fatal error C1083: 无法打开包括文件:“iostream.h”: No such file or directory
    Dan Saks
    '=' : left operand must be lvalue 左值和右值
    sizeof使用
    stream.js :一个新的JavaScript数据结构
    Kibo:键盘事件捕捉高手
    c中不能用引用的办法
    分布式版本控制工具:git与Mercurial
    非常好的BASH脚本编写教程
    Handler让主线程和子线程进行通信
  • 原文地址:https://www.cnblogs.com/luckygxf/p/9065579.html
Copyright © 2011-2022 走看看