zoukankan      html  css  js  c++  java
  • Flink中使用Avro格式的自定义序列化反序列化传输

    生产者配置:

    FlinkKafkaProducer09<DoubtEventPreformatDataAvro> convertOutTopicProducer = new FlinkKafkaProducer09<>(
            outputTopic,
            ConfluentRegistryAvroSerializationSchema.<DoubtEventPreformatDataAvro>ofValue(outputTopic, jobConfig.getKafkaMasterConfig()),
            jobConfig.getKafkaMasterConfig(), 
         (FlinkKafkaPartitioner)
    null);

    ConfluentRegistryAvroSerializationSchema 实现自定义序列化方法:
    public class ConfluentRegistryAvroSerializationSchema <V extends SpecificRecord> implements SerializationSchema<V> {
    
        private transient KafkaAvroSerializer kafkaAvroSerializer;
    
    
        private String topic;
    
        private Map<String, Object> config;
    
        private boolean isKey;
    
        private ConfluentRegistryAvroSerializationSchema(String topic, boolean isKey, Map<String, Object> config) {
            this.topic = topic;
            this.isKey = isKey;
            this.config = config;
            initKafkaSerializer();
        }
    
        public static ConfluentRegistryAvroSerializationSchema ofValue(String topic, Properties config) {
            return new ConfluentRegistryAvroSerializationSchema(topic, false, config);
        }
        public static ConfluentRegistryAvroSerializationSchema ofKey(String topic, Properties config) {
            return new ConfluentRegistryAvroSerializationSchema(topic, true, config);
        }
        private void initKafkaSerializer(){
            kafkaAvroSerializer = new KafkaAvroSerializer();
            kafkaAvroSerializer.configure(config, isKey);
        }
    
        @Override
        public byte[] serialize(V element) {
            if(kafkaAvroSerializer == null){
                initKafkaSerializer();
            }
            return kafkaAvroSerializer.serialize(topic, element);
        }
    }

    生产者的数据源:

    private DoubtEventPreformatDataAvro convert(JSONObject jsonValue){

    DoubtEventPreformatDataAvro.Builder builder = DoubtEventPreformatDataAvro .newBuilder() .setVersionNumber(jsonValue.getString("xxx")) .setPlatform(jsonValue.getIntValue("zzz"))
    return builder.build();
    }

    avro格式的反序列化:

            FlinkKafkaConsumer09<RetryKeyPreformatAvroValue> inputPreformatTopicConsumer = new FlinkKafkaConsumer09<>(
                    jobConfig.getKafkaInputTopicName(), new RetryKeyPreformatAvroValueDeserializationSchema(schemaUrl), kafkaMasterConfig);
            JobUtils.setStartupMode(jobConfig.getStartModeOfInputTopic(), inputPreformatTopicConsumer);
            inputPreformatTopicConsumer.setCommitOffsetsOnCheckpoints(true);

    自定义实现反序列化的函数:

    public class RetryKeyPreformatAvroValueDeserializationSchema
            extends AbstractAvroKeyValueDeserializationSchema<KafkaRetryKeyMeta, DoubtEventPreformatDataAvro, RetryKeyPreformatAvroValue>{
    
    
        public RetryKeyPreformatAvroValueDeserializationSchema(String schemaRegisterUrl) {
            super(KafkaRetryKeyMeta.class, DoubtEventPreformatDataAvro.class, RetryKeyPreformatAvroValue.class, schemaRegisterUrl);
        }
    
        @Override
        protected RetryKeyPreformatAvroValue newInstance() {
            return new RetryKeyPreformatAvroValue();
        }
    
    }
    public abstract class AbstractAvroKeyValueDeserializationSchema<K extends SpecificRecord, V extends SpecificRecord, R extends KeyValueBase<K, V>>  extends AbstractKeyValueDeserializationSchema<K, V, R> {
    
        private static final long serialVersionUID = 1509391548173891955L;
    
    
        public AbstractAvroKeyValueDeserializationSchema() {
    
        }
        public AbstractAvroKeyValueDeserializationSchema(Class<K> kClass, Class<V> vClass, Class<R> kvClass, String schemaRegisterUrl) {
            this.kClass = kClass;
            this.vClass = vClass;
            this.kvClass = kvClass;
            this.schemaRegisterUrl = schemaRegisterUrl;
        }
    
        @Override
        DeserializationSchema<K> newKeyDeserializer() {
            return ConfluentRegistryAvroDeserializationSchema.forSpecific(kClass, schemaRegisterUrl);
        }
    
        @Override
        DeserializationSchema<V> newValueDeserializer() {
            return ConfluentRegistryAvroDeserializationSchema.forSpecific(vClass, schemaRegisterUrl);
        }
    }
    public abstract class AbstractKeyValueDeserializationSchema<K, V, R extends KeyValueBase<K, V>>  implements KafkaDeserializationSchema<R> {
    
        private static final long serialVersionUID = 1509391548173891955L;
    
        private DeserializationSchema<K> keyDeserializer;
        private DeserializationSchema<V> valueDeserializer ;
    
        protected Class<K> kClass;
        protected Class<V> vClass;
        protected Class<R> kvClass;
    
        protected String schemaRegisterUrl;
        public AbstractKeyValueDeserializationSchema() {
    
        }
        public AbstractKeyValueDeserializationSchema(Class<K> kClass, Class<V> vClass, Class<R> kvClass, String schemaRegisterUrl) {
            this.kClass = kClass;
            this.vClass = vClass;
            this.kvClass = kvClass;
            this.schemaRegisterUrl = schemaRegisterUrl;
            initDeserializer();
        }
    
    
        private void initDeserializer(){
            keyDeserializer = newKeyDeserializer();
            valueDeserializer = newValueDeserializer();
        }
    
        abstract DeserializationSchema<K> newKeyDeserializer();
        abstract DeserializationSchema<V> newValueDeserializer();
    
        @Override
        public R deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
    
            if(keyDeserializer == null || valueDeserializer == null){
                initDeserializer();
            }
    
            R keyValue = newInstance();
    
            if(record.key() != null){
                try {
                    keyValue.key = keyDeserializer.deserialize(record.key());
                } catch (Exception e) {
                }
            }
    
            if (record.value() != null) {
                try{
                    keyValue.value = valueDeserializer.deserialize(record.value());
                } catch (Exception e) {
    
                }
            }
            return keyValue;
        }
    
        protected abstract R newInstance();
    
    
        @Override
        public boolean isEndOfStream(R nextElement) {
            return false;
        }
    
        @Override
        public TypeInformation<R> getProducedType() {
            return getForClass(kvClass);
        }
    }
    
    
  • 相关阅读:
    学习:组件生命周期(2)
    学习:组件生命周期(3)
    学习:深入分析布局文件(HelloWorld)
    wap webapp app区别
    TCP的数据传输
    SET ANSI_NULLS ON SET QUOTED_IDENTIFIER ON 详解
    未能加载文件或程序集“SqlServerDal”或它的某一个依赖项。系统找不到指定的文件。
    人生的十个不要等
    asp.net网站三层架构详解和反射知识
    工厂模式概况
  • 原文地址:https://www.cnblogs.com/gxyandwmm/p/12218166.html
Copyright © 2011-2022 走看看