zoukankan      html  css  js  c++  java
  • Kafka 消息的序列化与反序列化(二)

    自定义反序列化类:

    对于自定义的avro schema结构,需要有自定义的类在consumer时反序列化,反序列化类实例在consumer构造的时候通过参数传入

    public class AvroWithSchemaSpecificDeser<T,E> implements Deserializer<T> {
        private Class<T> typeClass;
        private transient Schema schema;
        private String codecName;
        
        /**
         * Simple constructor 
         * 
         * @param pojoClassName The pojo class name to be deserialized
         * @param codecName The codec used for compression, if null, no compression is applied
         */
        public AvroWithSchemaSpecificDeser(final String pojoClassName, final String codecName) {
            try {
                Class<T> payloadClassType = (Class<T>) Class.forName(pojoClassName);
    
                typeClass = payloadClassType;
                schema = (Schema) payloadClassType.getField("SCHEMA$").get(null);
    this.codecName = codecName != null ? codecName : "null";
               
            } catch (AvroRuntimeException ex) {
                throw new IllegalStateException(String.format("Not able to initialize avro object. Details: %s", ex.getMessage()), ex);
            }         
        }
           
        @Override
        public T deserialize(String topic, byte[] data) {
            T pojoObject= null;
            if(data != null && data.length > 0) {
                DatumReader<T> datumReader = null;
                DataFileReader<T> dataFileReader = null;
                try {
                    SpecificData specificData = new SpecificData(); //用于日期和时间格式的转换
                    specificData.addLogicalTypeConversion(new DateConversion());
                    specificData.addLogicalTypeConversion(new TimeConversion());
                    specificData.addLogicalTypeConversion(new TimestampConversion());
            
                    pojoObject = typeClass.newInstance();
                    datumReader = new SpecificDatumReader<>(null, schema, specificData);
                    dataFileReader = new DataFileReader<>(new SeekableByteArrayInput(data), datumReader);
                    while (dataFileReader.hasNext()) {
                        pojoObject = dataFileReader.next(pojoObject);
                    }
    
                } catch(Exception ex) {
                    SerializationException serex = new SerializationException(String.format("Error when deserializing byte[] to this class (%s)  from this topic (%s)",typeClass.toString(), topic), ex);
                } finally {
                    if(dataFileReader != null) {
                        dataFileReader.close();
                    }
                }
            }
            return pojoObject;
        }
    }
     
    创建consumer对象:

    首先在RunnableConsumer中需要创建kafka consumer实例,需要传入consumer的属性列表及反序列化对象,在下面创建反序列化实例时,只传入了pojo_class_name,codec使用了null,也就是没有使用任何压缩编码

    Deserializer<K> keyDeserClass = (Deserializer) Class.forName(props.getProperty("key.deserializer")).newInstance();
    
    Class<?> cl = Class.forName(props.getProperty("value.deserializer"));
    Constructor<?> cons = cl.getConstructor(Map.class);
    Deserializer<V> valueSerClass = (Deserializer)cons.newInstance(consumerConfig.get("pojo_class_name"), null);
    
    consumer = new KafkaConsumer<>(props, keyDeserClass, valueDeserClass);

    consumer的props属性从配置服务器中读取,其值为类似以下的k-v,其中关键的字段为bootstrap.servers,key.deserializer,value.deserializer,group.id和需要反序列化的pojo_class_name

    {
    	security.protocol=SASL_PLAINTEXT,
    	schema.registry.url=http://yourregistryurl.youcompany.com:8080,
    	bootstrap.servers=yourbootstrap1.youcompany.com:7788, yourbootstrap2.youcompany.com:7788,
    	key.deserializer=org.apache.kafka.common.serialization.LongDeserializer,	
    	value.deserializer=com.youcompany.serialization.AvroSchemaSpecificDeser,
    	client.id=20353@xxx,
    	group.id=yourgroupid,
    pojo_class_name=UserSecurity }

    第二个参数是key的反序列化对象,这是一个kafka的标准的反序列化类 LongDeserializer

    第三个参数是value的反序列化对象,反射创建时,需要读取pojo_class_name参数

    订阅和消费消息:

    在consumer对象创建好后,就可以从线程池中启动consumer了,订阅指定的topic,并poll消息,如果有拉取到消息,这将消息notify给监听者

                consumer.subscribe(topics);
                ConsumerGroup.this.isRunning = true;
    
                while (true) {
                    ConsumerRecords<K,V> records = null;
                    try {
    
                        processCommit(SyncMode.ASYNC);
    
                        records = consumer.poll(isPolling ? Long.MAX_VALUE : 0);
                        if(records != null && records.count() > 0) {
                            listener.notify(records);
                        }
                    } catch(WakeupException wex) {
                        LOGGER.trace("Got a WakeupException. Doing nothing. Exception Details:",wex);
                    } 
                    isPolling = true;
                }

     

  • 相关阅读:
    猜数字游戏(补)
    团队项目五(项目回顾)
    项目回顾
    第二次阶段冲刺
    团队项目(任务三):第一次冲刺
    个人项目(一):新猜数字
    课后作业(一)
    团队任务二
    团队任务(一)
    课后作业(一)
  • 原文地址:https://www.cnblogs.com/benfly/p/9204777.html
Copyright © 2011-2022 走看看