zoukankan      html  css  js  c++  java
  • Kafka API

    一、原生API

    (一)生产者

      1、生产者配置

    @Data
    public class DemoProducer {
        private KafkaProducer<Integer, String> producer;
    
        public DemoProducer(){
            Properties properties = new Properties();
            properties.put("bootstrap.servers","192.168.206.131:9092,192.168.206.132:9092,192.168.206.133:9092");
            properties.put("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
            properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
            this.producer = new KafkaProducer<Integer, String>(properties);
        }
    
        public Future<RecordMetadata> send(ProducerRecord<Integer, String> record) {
            return producer.send(record, (Callback)null);
        }
    
    }

      2、发送消息

        private final String topic = "cities";
    
        public void sendMsg() throws Exception {
    
            DemoProducer producer = new DemoProducer();
            int partition = 0;
            int key = 1;
            String cityName = "beijing";
            //指定主题及消息
            ProducerRecord<Integer,String> record = new ProducerRecord<>(topic,cityName);
            //指定主题、key、消息
            //ProducerRecord<Integer,String> record = new ProducerRecord<>(topic,key,cityName);
            //指定主题、partition、key、消息
            //ProducerRecord<Integer,String> record = new ProducerRecord<>(topic,partition,key,cityName);
            Future<RecordMetadata> future = producer.send(record);
            RecordMetadata recordMetadata = future.get();
            log.info("=====================【{}】", recordMetadata.offset());
            log.info("=====================【{}】", recordMetadata.partition());
            log.info("=====================【{}】", recordMetadata.timestamp());
            log.info("=====================【{}】", recordMetadata.topic());
        }

      3、发送消息测试类

        @Test
        void sendTest()  throws Exception{
            NativeService nativeService = new NativeService();
            nativeService.sendMsg();
        }

    (二)批量发送消息

      1、配置生产者

    @Data
    public class DemoBatchProducer {
        private KafkaProducer<Integer, String> producer;
    
        public DemoBatchProducer(){
            Properties properties = new Properties();
            properties.put("bootstrap.servers","192.168.206.131:9092,192.168.206.132:9092,192.168.206.133:9092");
            properties.put("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
            properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
            properties.put("batch.size",16384);
            properties.put("linger",50);
            this.producer = new KafkaProducer<Integer, String>(properties);
        }
    
        public Future<RecordMetadata> send(ProducerRecord<Integer, String> record) {
            return producer.send(record, (Callback)null);
        }
    
    }

      2、消息发送

        public void sendMsg1() throws Exception {
            DemoBatchProducer producer = new DemoBatchProducer();
            int partition = 0;
            int key = 1;
            String cityName = "beijing";
            for(int i=0; i<50; i++) {
                ProducerRecord<Integer,String> record = new ProducerRecord<>(topic,cityName + i*1000);
                Future<RecordMetadata> future = producer.send(record);
                RecordMetadata recordMetadata = future.get();
                log.info("【{}】=====================【{}】", i+1, recordMetadata.offset());
                log.info("【{}】=====================【{}】", i+1, recordMetadata.partition());
                log.info("【{}】=====================【{}】", i+1, recordMetadata.timestamp());
                log.info("【{}】=====================【{}】", i+1, recordMetadata.topic());
            }
        }

      3、测试类

        @Test
        void batchSendTest()  throws Exception{
            NativeService nativeService = new NativeService();
            nativeService.sendMsg1();
        }

    (三)消费者

      1、配置消费者

    @Data
    public class DemoConsumer{
        private KafkaConsumer<Integer, String> consumer;
    
        public DemoConsumer(){
            Properties properties = new Properties();
            properties.put("bootstrap.servers","192.168.206.131:9092,192.168.206.132:9092,192.168.206.133:9092");
            properties.put("group.id", "mygroup1");
            properties.put("enable.auto.commit",  "true");
            properties.put("max.poll.records", "500");
            properties.put("auto.commit.interval,ms","1000");
            properties.put("session.timeout.ms","30000");
            properties.put("heartbeat.interval.ms","10000");
            properties.put("auto.offset.reset","earliest");
            properties.put("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer");
            properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
            this.consumer = new KafkaConsumer<Integer, String>(properties);
        }
    }

      2、具体的调用方法

        public void autoDoWork(){
            DemoConsumer consumer = new DemoConsumer();
            consumer.getConsumer().subscribe(Collections.singletonList(topic));
            ConsumerRecords<Integer, String> records = consumer.getConsumer().poll(1000);
            for (ConsumerRecord<Integer, String> consumerRecord: records) {
                log.info("============topic={},partition={},key={},value={}===============",consumerRecord.topic(), consumerRecord.partition(), consumerRecord.key(), consumerRecord.value());
            }
        }

      3、测试类

        @Test
        void consumerTest()  throws Exception{
            NativeService nativeService = new NativeService();
            nativeService.doWork();
        }

    (四)手动提交

      上面的消费是自动提交offset的方式对broker中的消息进行消费的,但是自动提交可能出现消息重复消费的问题,所以在生产情况下,一般都对消息进行手动提交。

      手动提交可以分为同步提交、异步提交和同异步联合提交三种情况。

      无论是哪种情况,手动提交都需要修改消费者配置,需要设置自动提交标识为false,同时设置手动提交最大值。

    @Data
    public class DemoConsumer{
        private KafkaConsumer<Integer, String> consumer;
        public DemoConsumer(){
            Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.206.131:9092,192.168.206.132:9092,192.168.206.133:9092");
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup1");
            // properties.put("enable.auto.commit",  "true");
            properties.put("max.poll.records", "500");
            properties.put("auto.commit.interval,ms","1000");
            properties.put("session.timeout.ms","30000");
            properties.put("heartbeat.interval.ms","10000");
            properties.put("auto.offset.reset","earliest");
            properties.put("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer");
            properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    
            //设置是否手动提交及最大提交数
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
            properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,10);
    
            this.consumer = new KafkaConsumer<Integer, String>(properties);
        }
    }

      1、同步提交

        同步提交与自动提交就多了一次调用commitSync函数。

        同步提交是消费者向broker提交offset后等待broker成功响应。若没有收到broker的响应,则会重新提交,直到获取到响应。但是在整个过程中,消费者是处于阻塞状态,这样就严重的影响了消费者的吞吐量。

        public void syncDoWork(){
            DemoConsumer consumer = new DemoConsumer();
            consumer.getConsumer().subscribe(Collections.singletonList(topic));
            ConsumerRecords<Integer, String> records = consumer.getConsumer().poll(1000);
            for (ConsumerRecord<Integer, String> consumerRecord: records) {
                log.info("============topic={},partition={},key={},value={}===============",consumerRecord.topic(), consumerRecord.partition(), consumerRecord.key(), consumerRecord.value());
                consumer.getConsumer().commitSync();
            }
        }

      2、异步提交

        由于同步提交操作会影响消费者的吞吐量,因此就有了异步提交。异步提交就是提交后不再等待broker的响应,直接开始做后续处理,提高了消费者的吞吐量。

        异步提交与同步提交就是调用的提交方法改为commitAsync,该方法可以有回调函数,回调函数中可以对异常信息做判断和输出等操作。

        public void asyncDoWork(){
            DemoConsumer consumer = new DemoConsumer();
            consumer.getConsumer().subscribe(Collections.singletonList(topic));
            ConsumerRecords<Integer, String> records = consumer.getConsumer().poll(1000);
            for (ConsumerRecord<Integer, String> consumerRecord: records) {
                log.info("============topic={},partition={},key={},value={}===============",consumerRecord.topic(), consumerRecord.partition(), consumerRecord.key(), consumerRecord.value());
                consumer.getConsumer().commitAsync((offsets, e) -> {
                    if(e != null){
                        log.info("提交失败,offsets=【{}】,失败原因【{}】", offsets, e);
                    }
                });
            }
        }

      3、同异步手动提交

        上面提到得异步提交可能会造成重复消费,因此可以使用同异步手动提交的方式进行提交。

        在同异步手动提交的情况下,如果出现提交失败,后续提交会将这次提交失败的offset提交,因此不会影响消费者的消费。

        同异步手动提交与异步提交代码的唯一区别就是需要在回调函数中判断如果提交失败,则需要同步方式进行再次提交。

        public void syncasyncDoWork(){
            DemoConsumer consumer = new DemoConsumer();
            consumer.getConsumer().subscribe(Collections.singletonList(topic));
            ConsumerRecords<Integer, String> records = consumer.getConsumer().poll(1000);
            for (ConsumerRecord<Integer, String> consumerRecord: records) {
                log.info("============topic={},partition={},key={},value={}===============",consumerRecord.topic(), consumerRecord.partition(), consumerRecord.key(), consumerRecord.value());
                consumer.getConsumer().commitAsync((offsets, e) -> {
                    if(e != null){
                        log.info("提交失败,offsets=【{}】,失败原因【{}】", offsets, e);
                        consumer.getConsumer().commitSync();
                    }
                });
            }
        }

    二、Spring Boot Kafka

      使用SpringBoot与原生的API主要调整三点,分别是:producer和consumer等配置项直接放入配置文件、发送消息使用KafkaTemplate、消费者使用KafkaListener注解即可。

      1、配置项

    kafka:
      topic: cities
    spring:
      kafka:
        bootstrap-servers: 192.168.206.131:9092,192.168.206.132:9092,192.168.206.133:9092
        producer:
          key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          batch-size: 16384
    
        consumer:
          group-id: mygroup1
          enable-auto-commit: false
          key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          max-poll-records: 10

      2、发送消息

    @RestController
    @RequestMapping("/boot")
    @Slf4j
    public class BootProducerApi {
    
        @Autowired
        private KafkaTemplate kafkaTemplate;
    
        @Value("${kafka.topic}")
        private String topic;
    
        @GetMapping("/send")
        public void sendMsg() throws Exception {
            String cityName = "LY";
            for(int i=0; i<50; i++) {
                ProducerRecord<Integer,String> record = new ProducerRecord<>(topic,cityName + i*1000);
                ListenableFuture<SendResult> future = kafkaTemplate.send(record);
                RecordMetadata recordMetadata = future.get().getRecordMetadata();
                log.info("producer=======【{}】=======【{}】", i+1, recordMetadata.offset());
                log.info("producer=======【{}】=======【{}】", i+1, recordMetadata.partition());
                log.info("producer=======【{}】=======【{}】", i+1, recordMetadata.timestamp());
                log.info("producer=======【{}】=======【{}】", i+1, recordMetadata.topic());
            }
        }
    }

      3、消费者

    @Component
    @Slf4j
    public class BootConsumer {
        @KafkaListener(topics = "${kafka.topic}")
        public void onMsg(String msg){
            log.info("consumer============msg=【{}】",msg);
        }
    }
    ------------------------------------------------------------------
    -----------------------------------------------------------
    ---------------------------------------------
    朦胧的夜 留笔~~
  • 相关阅读:
    数据库中两张表之间的数据同步实现思路(增加、删除、更新)Mysql、sqlserver
    多台服务器之间如何让sqlserver,mysql数据库进行数据同步?
    mysql数据库同步时数据一致性的配置优化
    mysql 、sqlserver数据库,实时同步,增量同步(脚本模式)
    SyncNavigator 注册机 使用教程
    SyncNavigator数据库同步软件8.4.1 中文版
    HKROnline SyncNavigator破解版企业版 8.4.1 注册机使用教程
    SyncNavigator 破解版8.4.1 企业版 授权注册流程
    浅谈数据库高可用性(HA)技术
    软件工程第七周总结
  • 原文地址:https://www.cnblogs.com/liconglong/p/14571634.html
Copyright © 2011-2022 走看看