zoukankan      html  css  js  c++  java
  • kafka 高级应用 springboot2.1 (1)

    官网:http://kafka.apache.org/21/documentation.html

    依赖:

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            
    
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>

    配置:KafkaConfig

    package com.sea.common.config;
    
    import java.util.Map;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    import org.springframework.kafka.listener.ContainerProperties;
    
    import com.google.common.collect.Maps;
    
    @Configuration
    @EnableKafka
    public class KafkaConfig {
    
        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;
    
        @Value("${spring.kafka.consumer.group-id}")
        private String groupId;
    
        @Value("${spring.kafka.consumer.enable-auto-commit}")
        private Boolean autoCommit;
    
        @Value("${spring.kafka.consumer.auto-offset-reset}")
        private String autoOffsetReset;
    
        @Value("${spring.kafka.consumer.max-poll-records}")
        private Integer maxPollRecords;
    
        @Value("${spring.kafka.producer.retries}")
        private Integer retries;
    
        @Value("${spring.kafka.producer.batch-size}")
        private Integer batchSize;
    
        @Value("${spring.kafka.producer.buffer-memory}")
        private Integer bufferMemory;
    
    
    
        //############################# producer 的基本配置 ################################*/
        /**
         * producer 的基本配置
         * @return
         */
        @Bean
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = Maps.newHashMap();
            props.put(ProducerConfig.ACKS_CONFIG, "0");//推荐设置为1
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.RETRIES_CONFIG, retries);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
            props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }
    
        @Bean
        public ProducerFactory<String, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }
    
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    
        
        //############################# consumer 的基本配置 ################################*/
        /**
         * consumer基本属性配置
         * @return
         */
        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = Maps.newHashMap();
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);// #最早未被消费的offset earliest
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);//#批量消费一次最大拉取的数据量
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 180000);//#连接超时时间
            props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 180000);//#手动提交设置与poll的心跳数,如果消息队列中没有消息,等待毫秒后,调用poll()方法。如果队列中有消息,立即消费消息,每次消费的消息的多少可以通过max.poll.records配置。
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    //        props.put(ConsumerConfig.DEFAULT_FETCH_MAX_BYTES+"",15728640); //#设置拉取数据的大小,15M
            return props;
        }
        
    
        /**
         * 并发数3
         */
    //    @Bean //配置默认kafkaFactory
    //    @ConditionalOnMissingBean(name = "kafkaBatchListener3")
    //    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaBatchListener3() {
    //        ConcurrentKafkaListenerContainerFactory<String, String> factory = (ConcurrentKafkaListenerContainerFactory<String, String>) batchFactory();
    //        factory.setConcurrency(3);
    //        return factory;
    //    }
        
        /**
         * 配置为批量消费
         * @return
         */
        @Bean
        public KafkaListenerContainerFactory<?> batchFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
            //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
            factory.setBatchListener(true);
            
            //设置并发量为3
            factory.setConcurrency(3);
            // set the retry template 失败retry
    //        factory.setRetryTemplate(retryTemplate());
            //设置为手动ack
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
            return factory;
        }
    
    }

    application.xml

    ### kafka configure
    spring.kafka.bootstrap-servers=localhost:9092
    spring.kafka.consumer.group-id=seatest//该值任意,建议使用项目名
    spring.kafka.consumer.enable-auto-commit=false
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.max-poll-records=5
    spring.kafka.producer.retries=3
    spring.kafka.producer.batch-size=16384
    spring.kafka.producer.buffer-memory=33554432

    或者(参考调整):

    kafka:
      producer:
        bootstrap-servers: 10.161.11.222:6667,10.161.11.223:6667,10.161.11.224:6667
        batch-size: 16785                                   #一次最多发送数据量
        retries: 1                                          #发送失败后的重复发送次数
        buffer-memory: 33554432                             #32M批处理缓冲区
        linger: 1
      consumer:
        bootstrap-servers: 10.161.11.222:6667,10.161.11.223:6667,10.161.11.224:6667
        auto-offset-reset: latest                           #最早未被消费的offset earliest
        max-poll-records: 3100                              #批量消费一次最大拉取的数据量
        enable-auto-commit: false                           #是否开启自动提交
        auto-commit-interval: 1000                          #自动提交的间隔时间
        session-timeout: 20000                              #连接超时时间
        max-poll-interval: 15000                            #手动提交设置与poll的心跳数,如果消息队列中没有消息,等待毫秒后,调用poll()方法。如果队列中有消息,立即消费消息,每次消费的消息的多少可以通过max.poll.records配置。
        max-partition-fetch-bytes: 15728640                 #设置拉取数据的大小,15M
      listener:
        batch-listener: true                                #是否开启批量消费,true表示批量消费
        concurrencys: 3,6                                   #设置消费的线程数
        poll-timeout: 1500                                  #只限自动提交,

    发送数据:

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.test.context.junit4.SpringRunner;
    import org.springframework.util.concurrent.ListenableFuture;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class Producertester {
    
        private static Logger log = LoggerFactory.getLogger(Producertester.class);
    
        @Autowired
        KafkaTemplate<String, String> kafkaTemplate;
        private static String TOPIC = "sea";
    
        @Test
        public void testSender() throws Exception {
            /**
             * 参数1:topic 参数2: message
             */
            kafkaTemplate.send(TOPIC, "ni hao ma");
    
        }
    
        /**
         * * 带回调函数, 前提是 props.put(ProducerConfig.ACKS_CONFIG, "1");//设置为1 或者all
         * 
         * @param topic
         * @param message
         * @throws Exception
         */
        @Test
        public void testSenderwithCallBack() throws Exception {
    
            ListenableFuture<SendResult<String, String>> sender = kafkaTemplate.send("sea", "chifanle ");
    
            // 发送成功
    //    SuccessCallback successCallback = result -> log.info("数据发送成功!");
            // 发送失败回调
    //    FailureCallback failureCallback = ex -> log.error("数据发送失败!");
    //        void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);
            sender.addCallback(successCallback -> {
            }, failureCallback -> log.info("数据发送失败!"));
            SendResult<String, String> sendResult = sender.get();
            System.err.println(sendResult);
            // SendResult [producerRecord=ProducerRecord(topic=sea, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=chifanle , timestamp=null), recordMetadata=sea-0@-1]
    
        }
    
    }

    消费数据:

    @Component
    public class KafkaConsumer {
    
        /**
         * 方式二: 批量消费, 增大吞吐量
         * @param records
         * @param ack
         */
    @KafkaListener(topics = "sea", containerFactory = "batchFactory", errorHandler = "consumerAwareErrorHandler") public void listen(List<ConsumerRecord<String, String>> records, Acknowledgment ack){ System.err.println(records); System.err.println("&&&&&&&&&&&&&&&&&&"); ack.acknowledge(); } /** * 方式一:单条消费 * @param record * @param ack */
      //@KafkaListener(containerFactory = "batchFactory",topics = {"topic1","topic2"})
    @KafkaListener(topics = "sea1",errorHandler = "consumerAwareErrorHandler") public void listen(ConsumerRecord<?,String> record,Acknowledgment ack) { System.out.println(record);
          ack.acknowledge(); } }

    异常处理:

    @Component
    public class KafkaErrorListener {
    
    
        @Bean
        public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() 
        {
             return new ConsumerAwareListenerErrorHandler() 
                            {
                @Override
                public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {
                      System.err.println("consumer message occur error, "+ e);
                      //doing something
                    return null;
                }
            };
        }
    }

    分区批量消费:

    @Component
    public class KafkaPartitionConsumer {
    
    /**
     * 分区消费: 此处只测试批量分区消费
     * 
     * 说明: topic "sea" 有两个分区, 分区0,1  (同一个group 中的Consumer 如果不指定分区,或者指定的分区是一样的 那么消费的数据 一模一样, 毫无意义 )
     * 下面使用两个 Consumer 分别去消费两个不同的partition 的数据, 这样一条数据,只会被一个consumer 消费
     * 
     * @param records
     * @param ack
     */
        @KafkaListener(id = "id2",groupId="sea7", containerFactory = "batchFactory",topicPartitions = { @TopicPartition(topic = "sea", partitions = { "0" }) })
    //     @KafkaListener(id = "id1",groupId="sea8", containerFactory = "batchFactory",topicPartitions = { @TopicPartition(topic = "sea",partitionOffsets =  @PartitionOffset(partition = "0",initialOffset = "-1")) })
        public void listen2(List<ConsumerRecord<String, String>> records, Acknowledgment ack){
            System.err.println("方式2  方式2  方式2  方式2 方式2 方式2 方式2 方式2 ");
            System.err.println(records.get(0).value());
            ack.acknowledge();
        }
        
        
        
       
    //      @KafkaListener(id = "id1",groupId="sea9", containerFactory = "batchFactory",topicPartitions = { @TopicPartition(topic = "sea", partitions = { "1" }) })
          @KafkaListener(id = "id1",groupId="sea8", containerFactory = "batchFactory",topicPartitions = { @TopicPartition(topic = "sea",partitionOffsets =  @PartitionOffset(partition = "1",initialOffset = "-1")) })
          public void listen1(List<ConsumerRecord<String, String>> records, Acknowledgment ack){
            
            System.err.println("方式一 方式一 方式一 方式一 方式一 方式一 方式一 方式一 ");
            System.out.println(records.get(0).value());
            ack.acknowledge();
        }
    }
  • 相关阅读:
    centos7 安装openGauss极简版本
    postgresql 通过一个表创建一个新表
    postgresql字符串函数与操作符
    SQLServer查看各个表大小
    seata1.3 分布式事务集成 AT模式
    用户体验——以用户为中心的Web设计_Chapter1. 用户体验为什么如此重要
    用户体验——以用户为中心的Web设计_Chapter2. 认识这些要素
    用户体验——以用户为中心的Web设计_Chapter3. 战略层:网站目标和用户需求
    用户体验——以用户为中心的Web设计_Chapter4. 范围层:功能规格和内容需求
    lineheight 详解,及个别问题
  • 原文地址:https://www.cnblogs.com/lshan/p/11282426.html
Copyright © 2011-2022 走看看