zoukankan      html  css  js  c++  java
  • Kafka中使用Avro编码、解码消息

    1.消费者代码

    import com.twitter.bijection.Injection;
    import com.twitter.bijection.avro.GenericAvroCodecs;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    /**
     * Created by p on 2018/10/8.
     */
    public class AvroKafkaProducer {
        public static final String USER_SCHEMA = "{
    " +
                "    "type":"record",
    " +
                "    "name":"Customer",
    " +
                "    "fields":[
    " +
                "        {"name":"id","type":"int"},
    " +
                "        {"name":"name","type":"string"},
    " +
                "        {"name":"email","type":["null","string"],"default":"null"}
    " +
                "    ]
    " +
                "}";
    
    
        public static void main(String[] args){
    
            Properties kafkaProps = new Properties();
            kafkaProps.put("bootstrap.servers","ip:9092");
            kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
            kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.ByteArraySerializer");
            kafkaProps.put("partitioner.class","MyPartitioner");
    
            Schema.Parser parser = new Schema.Parser();
            Schema schema = parser.parse(USER_SCHEMA);
    
            Injection<GenericRecord,byte[]> injection = GenericAvroCodecs.toBinary(schema);
            KafkaProducer producer = new KafkaProducer<String,byte[]>(kafkaProps);
            for(int i = 0;i < 1000;i++){
                GenericData.Record record = new GenericData.Record(schema);
                record.put("id",i);
                record.put("name","name-"+i);
                record.put("email","email-"+i);
                byte[] bytes = injection.apply(record);
                ProducerRecord<String,byte[]> record1 = new ProducerRecord<String, byte[]>("Customer","customer-"+i,bytes);
                producer.send(record1);
            }
            producer.close();
            System.out.println(USER_SCHEMA);
        }
    }

    2. 消费者代码

    import com.twitter.bijection.Injection;
    import com.twitter.bijection.avro.GenericAvroCodecs;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.Collections;
    import java.util.Properties;
    
    /**
     * Created by p on 2018/10/14.
     */
    public class AvroKafkaConsumer {
    
        public static final String USER_SCHEMA = "{
    " +
                "    "type":"record",
    " +
                "    "name":"Customer",
    " +
                "    "fields":[
    " +
                "        {"name":"id","type":"int"},
    " +
                "        {"name":"name","type":"string"},
    " +
                "        {"name":"email","type":["null","string"],"default":"null"}
    " +
                "    ]
    " +
                "}";
    
        public static void main(String[] args){
            Properties kafkaProps = new Properties();
            kafkaProps.put("bootstrap.servers","ip:9092");
    
            kafkaProps.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
            kafkaProps.put("value.deserializer","org.apache.kafka.common.serialization.ByteArrayDeserializer");
    
            kafkaProps.put("group.id","DemoAvroKafkaConsumer");
    
            kafkaProps.put("auto.offset.reset","earliest");
    
            KafkaConsumer<String ,byte[]> consumer = new KafkaConsumer<String, byte[]>(kafkaProps);
    
            consumer.subscribe(Collections.singletonList("Customer"));
    
            Schema.Parser parser = new Schema.Parser();
            Schema schema = parser.parse(USER_SCHEMA);
    
            Injection<GenericRecord,byte[]> injection = GenericAvroCodecs.toBinary(schema);
    
            try {
                while (true){
                    ConsumerRecords<String,byte[]> records = consumer.poll(10);
                    for(ConsumerRecord<String,byte[]> record : records){
                        GenericRecord record1 = injection.invert(record.value()).get();
                        System.out.println(record.key() + ":" + record1.get("id") + "	" + record1.get("name") + "	" + record1.get("email"));
                    }
                }
            } finally {
                consumer.close();
            }
        }
    }

    3. pom依赖

    <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>1.0.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
                <version>1.7.6-cdh5.9.1</version>
            </dependency>
            <dependency>
                <groupId>com.twitter</groupId>
                <artifactId>bijection-avro_2.11</artifactId>
                <version>0.9.6</version>
            </dependency>
  • 相关阅读:
    MSSQL '20210806'转换成'2021-08-06'
    cxgrid 列内容居中显示
    CXGRID 导出EXCEL
    study PostgreSQL【3-get数据库中all表以及表的字段信息】
    study PostgreSQL【2-FireDAC连接PostgreSQL】
    高格-销售发票勾稽销售出货的赠品处理【14】
    study PostgreSQL【1-PostgreSQL对象】
    高格-负库存导致系统异常的处理【13】
    study Rust-9【组织管理】
    基础资料属性不符合目标组织要求:物料.允许库存,物料.来料检验
  • 原文地址:https://www.cnblogs.com/darange/p/9787139.html
Copyright © 2011-2022 走看看