zoukankan      html  css  js  c++  java
  • Kafka-序列化器与反序列化器的使用(自定义消息类型)

    Kafka-序列化器与反序列化器的使用(自定义消息类型)

    代码如下

    Customer

    /**
     * @Author FengZhen
     * @Date 2020-03-30 22:49
     * @Description Entity class used to demonstrate a custom Kafka serializer/deserializer.
     */
    public class Customer {
        /** Numeric identifier of the customer. */
        private int customerID;
        /** Name of the customer; stored as given, no validation is performed. */
        private String customerName;

        /**
         * Creates a customer.
         *
         * @param customerID   numeric identifier
         * @param customerName customer name
         */
        public Customer(int customerID, String customerName) {
            this.customerID = customerID;
            this.customerName = customerName;
        }

        /** @return the customer's numeric identifier */
        public int getCustomerID() {
            return customerID;
        }

        /** @param customerID the new numeric identifier */
        public void setCustomerID(int customerID) {
            this.customerID = customerID;
        }

        /** @return the customer's name, exactly as it was set */
        public String getCustomerName() {
            return customerName;
        }

        /** @param customerName the new customer name */
        public void setCustomerName(String customerName) {
            this.customerName = customerName;
        }

        /**
         * Returns a readable representation so that logging or printing a record value
         * shows the field contents instead of the default {@code ClassName@hash} form.
         *
         * @return a string containing both fields
         */
        @Override
        public String toString() {
            return "Customer{customerID=" + customerID + ", customerName='" + customerName + "'}";
        }
    }

    序列化器

    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Serializer;
    
    import java.nio.ByteBuffer;
    import java.util.Map;
    
    /**
     * @Author FengZhen
     * @Date 2020-03-30 22:49
     * @Description 自定义序列化器:不建议使用,因为如果修改序列化器,就会出现新旧消息不兼容。
     * 建议使用已有的序列化器和反序列化器,如JSON、Avro、Thrift或Protobuf
     */
    public class CustomerSerializer implements Serializer<Customer> {
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            //不做任何配置
        }
    
        /**
         * Customer对象被序列化成:
         * 表示customerID的4字节整数
         * 表示customerName长度的4字节整数(如果customerName为空,则长度为0)
         * 表示customerName的N个字节
         * @param topic
         * @param data
         * @return
         */
        @Override
        public byte[] serialize(String topic, Customer data) {
            try {
                byte[] serializedName;
                int stringSize;
                if (null == data){
                    return null;
                }else{
                    if (data.getCustomerName() != ""){
                        serializedName = data.getCustomerName().getBytes("UTF-8");
                        stringSize = serializedName.length;
                    }else{
                        serializedName = new byte[0];
                        stringSize = 0;
                    }
                }
                ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
                buffer.putInt(data.getCustomerID());
                buffer.putInt(stringSize);
                buffer.put(serializedName);
                return buffer.array();
            } catch (Exception e){
                throw new SerializationException("Error when serializing Customer to byte[] " + e);
            }
        }
    
        @Override
        public void close() {
            //不需要关闭任何东西
        }
    }

    反序列化器

    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.common.serialization.Deserializer;
    
    import java.nio.ByteBuffer;
    import java.util.Map;
    
    /**
     * @Author FengZhen
     * @Date 2020-04-06 15:08
     * @Description 自定义反序列化器
     */
    public class CustomerDeserializer implements Deserializer<Customer> {
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
    
        }
    
        @Override
        public Customer deserialize(String topic, byte[] data) {
            int id;
            int nameSize;
            String name;
            try {
                if (null == data){
                    return null;
                }
                if (data.length < 8){
                    throw  new SerializationException("Size of data received by IntegerDeserializer is shorter than expected");
                }
                ByteBuffer buffer = ByteBuffer.wrap(data);
                id = buffer.getInt();
                nameSize = buffer.getInt();
                byte[] nameBytes = new byte[nameSize];
                buffer.get(nameBytes);
                name = new String(nameBytes, "UTF-8");
                return new Customer(id, name);
            } catch (Exception e){
                throw new SerializationException("Error when serializing Customer to byte[]" + e);
            }
        }
    
        @Override
        public void close() {
    
        }
    }

    生产者发送消息(代码中用到的 DemonProducerCallback 是一个实现了 Callback 接口的回调类,本文未给出其代码)

    import com.chinaventure.kafka.serializer.Customer;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Properties;
    
    /**
     * @Author FengZhen
     * @Date 2020-03-29 12:21
     * @Description Demonstrates a Kafka producer that sends {@link Customer} values using a
     * custom value serializer.
     */
    public class KafkaProducerTest {

        /** Producer configuration shared by the demo methods of this class. */
        private static final Properties kafkaProps = new Properties();
        static {
            kafkaProps.put("bootstrap.servers", "localhost:9092");
            kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        }

        public static void main(String[] args) {
            udfSerializer();
        }

        /**
         * Sends ten {@link Customer} records to the {@code test_udf_serializer} topic using the
         * custom value serializer, then waits for the sends to complete and releases the producer.
         */
        public static void udfSerializer(){
            kafkaProps.put("value.serializer", "com.chinaventure.kafka.serializer.CustomerSerializer");
            // try-with-resources guarantees the producer is closed even if a send fails.
            try (KafkaProducer<String, Customer> producer = new KafkaProducer<>(kafkaProps)) {
                for (int i = 0; i < 10; i++){
                    String key = i % 3 == 0 ? "Apple" : "Banana" + i;
                    ProducerRecord<String, Customer> record =
                            new ProducerRecord<>("test_udf_serializer", key, new Customer(i, "我是" + i));
                    // DemonProducerCallback is not defined in this class; it has to be supplied
                    // elsewhere as a Callback implementation.
                    producer.send(record, new DemonProducerCallback());
                }
                // send() is asynchronous: block until every buffered record has been sent and its
                // callback invoked, instead of keeping the JVM alive with an endless sleep loop.
                producer.flush();
            }
        }

    }

    消费者读取数据

    import com.chinaventure.kafka.serializer.Customer;
    import com.chinaventure.util.ExceptionUtil;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;
    
    import java.time.Duration;
    import java.util.*;
    
    /**
     * @Author FengZhen
     * @Date 2020-04-06 11:07
     * @Description Demonstrates a Kafka consumer that reads {@link Customer} values using a
     * custom value deserializer.
     */
    public class KafkaConsumerTest {
        /** Consumer configuration shared by the demo methods of this class. */
        private static Properties kafkaProps = new Properties();
        static {
            kafkaProps.put("bootstrap.servers", "localhost:9092");
            kafkaProps.put("group.id", "test");
            kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        }

        // Not referenced by any code visible in this class.
        private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

        // Not referenced by any code visible in this class.
        private static KafkaConsumer<String, String> consumer;

        public static void main(String[] args) {
            udfDeserializer();
        }

        /**
         * Subscribes to the {@code test_udf_serializer} topic with the custom value deserializer
         * and prints every record received, polling in an endless loop.
         */
        public static void udfDeserializer(){
            kafkaProps.put("value.deserializer", "com.chinaventure.kafka.serializer.CustomerDeserializer");
            KafkaConsumer<String, Customer> customerConsumer = new KafkaConsumer<>(kafkaProps);
            // Subscribe to topics. A list of topics or a regular expression may be passed; if someone
            // creates a new topic matching the expression, a rebalance is triggered immediately and
            // the consumer can read the newly added topic. Example: "test.*" subscribes to every
            // topic related to test.
            customerConsumer.subscribe(Collections.singleton("test_udf_serializer"));
            System.out.println("==== subscribe success ====");
            try {
                for (;;) {
                    // The consumer must keep polling Kafka, otherwise it is considered dead and its
                    // partitions are handed over to other consumers in the group.
                    // The argument to poll() is a timeout controlling how long poll() blocks (blocking
                    // happens when no data is available in the consumer's buffer). If it is 0, poll()
                    // returns immediately; otherwise it waits up to the given time for the broker to
                    // return data.
                    // poll() returns a list of records. Each record carries its topic, its partition,
                    // its offset within the partition, and the record's key/value pair.
                    ConsumerRecords<String, Customer> batch = customerConsumer.poll(Duration.ofMillis(100));
                    System.out.println("==== data get ====");
                    for (ConsumerRecord<String, Customer> rec : batch) {
                        System.out.println(describe(rec));
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Close the consumer before the application exits.
                // Network connections and sockets are closed with it and a rebalance is triggered
                // right away, instead of waiting for the group coordinator to notice the missing
                // heartbeats and declare the consumer dead, which takes longer and leaves the whole
                // group unable to read messages for a while.
                customerConsumer.close();
            }
        }

        /** Builds the single-line text that is printed for one consumed record. */
        private static String describe(ConsumerRecord<String, Customer> rec) {
            return String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s",
                    rec.topic(), rec.partition(), rec.offset(), rec.key(), rec.value());
        }
    }

    生产者打印内容

    topic:test_udf_serializer
    partition:0
    offset:0
    metaData:test_udf_serializer-0@0
    topic:test_udf_serializer
    partition:0
    offset:1
    metaData:test_udf_serializer-0@1
    topic:test_udf_serializer
    partition:0
    offset:2
    metaData:test_udf_serializer-0@2
    topic:test_udf_serializer
    partition:0
    offset:3
    metaData:test_udf_serializer-0@3
    topic:test_udf_serializer
    partition:0
    offset:4
    metaData:test_udf_serializer-0@4
    topic:test_udf_serializer
    partition:0
    offset:5
    metaData:test_udf_serializer-0@5
    topic:test_udf_serializer
    partition:0
    offset:6
    metaData:test_udf_serializer-0@6
    topic:test_udf_serializer
    partition:0
    offset:7
    metaData:test_udf_serializer-0@7
    topic:test_udf_serializer
    partition:0
    offset:8
    metaData:test_udf_serializer-0@8
    topic:test_udf_serializer
    partition:0
    offset:9
    metaData:test_udf_serializer-0@9

    消费者打印内容(Customer 未重写 toString() 时,value 会显示为默认的“类名@哈希码”形式)

    topic=test_udf_serializer, partition=0, offset=0, key=Apple, value=com.chinaventure.kafka.serializer.Customer@63798ca7
    topic=test_udf_serializer, partition=0, offset=1, key=Banana1, value=com.chinaventure.kafka.serializer.Customer@4612b856
    topic=test_udf_serializer, partition=0, offset=2, key=Banana2, value=com.chinaventure.kafka.serializer.Customer@22875539
    topic=test_udf_serializer, partition=0, offset=3, key=Apple, value=com.chinaventure.kafka.serializer.Customer@5674e1f2
    topic=test_udf_serializer, partition=0, offset=4, key=Banana4, value=com.chinaventure.kafka.serializer.Customer@79c7532f
    topic=test_udf_serializer, partition=0, offset=5, key=Banana5, value=com.chinaventure.kafka.serializer.Customer@2a448449
    topic=test_udf_serializer, partition=0, offset=6, key=Apple, value=com.chinaventure.kafka.serializer.Customer@32f232a5
    topic=test_udf_serializer, partition=0, offset=7, key=Banana7, value=com.chinaventure.kafka.serializer.Customer@43f82e78
    topic=test_udf_serializer, partition=0, offset=8, key=Banana8, value=com.chinaventure.kafka.serializer.Customer@e54303
    topic=test_udf_serializer, partition=0, offset=9, key=Apple, value=com.chinaventure.kafka.serializer.Customer@e8df99a

    Done.

  • 相关阅读:
    iOS让软键盘消失的简单方法
    苹果使用蓝汛CDN网络分发ios8
    -pie can only be used when targeting iOS 4.2 or later
    java并发容器(Map、List、BlockingQueue)具体解释
    SQL SERVER 服务启动失败
    Android5.0新特性-Material Design
    java对象和json数据转换实现方式3-使用jackson实现
    XML Publiser For Excel Template
    Unity3D之Mecanim动画系统学习笔记(六):使用脚本控制动画
    Unity3D之Mecanim动画系统学习笔记(五):Animator Controller
  • 原文地址:https://www.cnblogs.com/EnzoDin/p/12642303.html
Copyright © 2011-2022 走看看