zoukankan      html  css  js  c++  java
  • Kafka 消息的序列化与反序列化(一)

    在使用Kafka发送接收消息时,producer端需要序列化,consumer端需要反序列化,在大多数场景中,需要传输的是与业务规则相关的复杂类型,这就需要自定义数据结构。Avro是一种序列化框架,使用JSON来定义schema,shcema由原始类型(null,boolean,int,long,float,double,bytes,string)和复杂类型(record,enum,array,map,union,fixed)组成,schema文件以.avsc结尾,表示avro schema,有2种序列化方式

    • 二进制方式:也就是Specific方式,定义好schema asvc文件后,使用编译器(avro-tools.jar)编译生成相关语言(java)的业务类,类中会嵌入JSON schema
    • JSON方式:也就是Generic方式,在代码中动态加载schema asvc文件,将FieldName - FieldValue,以Map<K,V>的方式存储

     序列化后的数据,是schema和data同时存在的,如下图

    1:自定义序列化类:

    先定义序列化类,使用Specific序列化方式,下面方法中使用了SpecificDatumWriter类

    public class AvroWithSchemaSpecificSer<T, E> implements Serializer<T> {
       public byte[] serialize(String topic, T data) {
            SpecificData specificData = new SpecificData(); //用于日期和时间格式的转换
            specificData.addLogicalTypeConversion(new DateConversion());
            specificData.addLogicalTypeConversion(new TimeConversion());
            specificData.addLogicalTypeConversion(new TimestampConversion());
    
            DatumWriter<T> datumWriter = new SpecificDatumWriter(this.schema, specificData);
            byte[] bytes = new byte[0];
            if (data != null) {
    
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataFileWriter<T> dataFileWriter = new DataFileWriter(datumWriter);
                dataFileWriter.setCodec(CodecFactory.fromString(this.codecName)).create(this.schema, baos);
                dataFileWriter.append(data);
                dataFileWriter.flush();
                baos.flush();
                bytes = baos.toByteArray();                        
                dataFileWriter.close();                    
                baos.close();
            }
            return bytes;
       }
    }

    2:创建producer对象:

    KafkaProducer<K,V>是用于创建producer实例的线程安全的客户端类,多个线程可以共用一个producer实例,而且通常情况下共用一个实例比每个线程创建一个producer实例性能要好。在创建produce实例时,Properities必须配置的3个参数为:bootstrap.servers,key.serializer,value.serializer,关于KafkaProducer类,可参看官方文档KafkaProducer

    创建producer对象,并加入到虚拟机关闭钩子中,用于在虚拟机关闭是清理producer

    Serializer<K> keyDeserClass = (Serializer) Class.forName(props.getProperty("key.serializer")).newInstance();
    
    Class<?> cl = Class.forName(props.getProperty("value.serializer"));
    Constructor<?> cons = cl.getConstructor(Map.class);
    Serializer<V> valueSerClass = (Serializer)cons.newInstance(consumerConfig.get("pojo_class_name"), null);
    
    Producer<K,V> producer = new KafkaProducer<>(props, keySerClass, valueSerClass);
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            producer.close();
        }
    });

    函数原型如下:

    public class KafkaProducer<K, V> implements Producer<K, V> {
        public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer)
    }

    第一个参数是一个Properties类型,配置如下

    {
    	security.protocol=SASL_PLAINTEXT,
    	schema.registry.url=http://yourregistryurl.youcompany.com:8080,
    	bootstrap.servers=yourbootstrap1.youcompany.com:7788, yourbootstrap2.youcompany.com:7788,
    	key.serializer=org.apache.kafka.common.serialization.LongSerializer,	
    	value.serializer=com.youcompany.serialization.AvroSchemaSpecificSer,
    pojo_class_name=UserSecurity client.id=18324@xxx, acks=all }

    第二个参数是key的序列化实例,从第一个参数的key.serializer获取类名字 "org.apache.kafka.common.serialization.LongSerializer",并反射创建类实例

    第三个参数是value的序列化实例,从第一个参数的value.serializer获取类名字 "com.youcompany.serialization.AvroSchemaSpecificSer",应用反射创建类实例,这是一个自定义的序列化类,用于序列化pojo_class_name所指向的avro类(由avro schema定义并经过编译后的业务类)UserSecurityRequest

    Class<?> cl = Class.forName(props.getProperty("value.serializer"));
    Constructor<?> cons = cl.getConstructor(Map.class);
    Serializer<V> valueSerClass = (Serializer)cons.newInstance(produceConfig.get("pojo_class_name"), null);
    注:
    在生成producer实例过程中,将调用方法:ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values()); 
    因为上面第一个参数里配置的authentication为SASL_PLAINTEXT,所以此方法将创建一个SaslChannelBuilder 通道构造器,
    
    这个过程将使用kerberos方式登陆认证,可参见另外一篇博客

    3:producer发送消息:

    ProducerRecord<K, V> producerRecord = new ProducerRecord<>("mytopic",data);
    producer.send(producerRecord, new ProducerCallBack(requestId));

    先使用avro业务对象(UserSecurityRequest)data创建producerRecord,然后调用producer发送消息,有同步和异步2种发送方式,此处使用的是异步方式,消息将被存储在待发送的IO缓存后即刻返回,这样可以并行无阻塞的发送更多消息,提高producer性能,并在回调函数中获取消息的offset。在send的过程中,如果有拦截器,则先调用拦截器,再继续发送消息,send的源代码如下:

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
            ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
            return this.doSend(interceptedRecord, callback);
    }

    在do.Send函数中,可以看到调用了value的序列化

    byte[] serializedValue;
    try {
          serializedValue = this.valueSerializer.serialize(record.topic(), record.value());
    } catch (ClassCastException var16) {
          throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer");
    }

    而对于value的序列化,正是使用了自定义的对avro schema序列化的类

     4:架构图:

     参考:

    Apache Avro™ 1.8.1 Specification

  • 相关阅读:
    Spyder | 关于报错No module named 'PyQt5.QtWebKitWidgets'
    Java基础(11) | 接口
    Java基础(10) | 抽象
    Java基础(9) | 继承
    Java基础(7) | String
    Java基础(6) | ArrayList
    CodeBlocks17.12配置GNU GCC + 汉化
    图片懒加载
    Mac安装Mysql 超详细(转载)
    剑指 Offer 03. 数组中重复的数字
  • 原文地址:https://www.cnblogs.com/benfly/p/9188453.html
Copyright © 2011-2022 走看看