zoukankan      html  css  js  c++  java
  • kafka2.5.0生产者与消费者配置详解

    1)引入maven依赖

    我这里使用的是springboot 2.1.3.RELEASE 版本:

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    会引入一对的kafka包:

     2)生产者配置:

    所有配置参考kafka-clients-2.5.0.jar包里的org.apache.kafka.clients.producer.ProducerConfig类,并且在该类中可以查看所有配置项的默认值: CONFIG = (new ConfigDef()).define(  这里的define方法的第三个参数就是默认值

    application.properties里可以这样配置:

    #####################  重要配置  ######################
    spring.kafka.producer.bootstrap.servers=192.168.2.60:9092,192.168.2.62:9092
    spring.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
    # acks=0  如果设置为0,生产者将不等待任何来自服务器的确认。每个记录返回的偏移量将始终设置为-1。
    # acks=1  这意味着leader确认消息即可,但不等待所有副本的完全确认的情况下进行响应。在这种情况下,如果leader在确认记录后立即失败,但是在副本复制它之前,那么记录将丢失。
    # acks=all  不仅需要leader确认收到消息,还将等待全部的副本确认。这保证了只要至少有一个副本保持活动状态,记录就不会丢失。这是最有力的保证。这相当于ack =-1设置。
    # acks=-1   跟集群有关
    # 默认 1
    spring.kafka.producer.acks=1
    # 一个批次发送的大小,默认16KB,超过这个大小就会发送数据
    spring.kafka.producer.batch.size=16384
    # 一个批次最长等待多久就发送数据,默认0,即马上发送
    spring.kafka.producer.linger.ms=5000
    # 控制生产者最大发送大小,默认 1MB。这个值必须小于kafka服务器server.properties配置文件里的最大可接收数据大小配置:socket.request.max.bytes=104857600 (默认104857600 = 100MB)
    spring.kafka.producer.max.request.size=1048576
    
    #####################  非重要配置  ######################
    # 生产者内存缓冲区大小。默认33554432bytes=32MB
    spring.kafka.producer.buffer.memory=33554432
    # 发送重试次数,默认 2147483647,接近无限大
    spring.kafka.producer.retries=3
    # 请求超时时间,默认30秒
    spring.kafka.producer.request.timeout.ms=30000
    # 默认值5。并发状态下,kafka生产者允许存在最大的kafka服务端未确认接收的消息个数最大值。
    # 注意,如果该值设置为1,并且开启重试机制,则会在允许的重试次数内,阻塞其他消息发送到kafka Server端。并且为1的话,会严重影响生产者的吞吐量。仅适用于对数据有严格顺序要求的场景。
    spring.kafka.producer.max.in.flight.requests.per.connection=5
    # 最大阻塞时间,超过则抛出异常。默认60秒
    spring.kafka.max.block.ms=60000
    # 数据压缩类型:none、gzip、snappy、lz4、zstd,默认none什么都不做
    spring.kafka.compression.type=none
    # 客户端在进行发送和消费的时候,会缓存kafka的元数据。默认30秒
    spring.kafka.producer.metadata.max.age.ms=30000
    

      

    在springboot框架里,手动封装kafka生产者对象,并@bean对象注入到SpringBoot容器中去:

    @Configuration
    @EnableKafka
    public class KafkaProducerConfig {
        @Value("${kafka.producer.servers}")
        private String servers;
        @Value("${kafka.producer.retries}")
        private int retries;
        @Value("${kafka.producer.batch.size}")
        private int batchSize;
        @Value("${kafka.producer.linger}")
        private int linger;
        @Value("${kafka.producer.buffer.memory}")
        private int bufferMemory;
    
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            props.put(ProducerConfig.RETRIES_CONFIG, retries);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
            props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 
            DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(props);
            KafkaTemplate kafkaTemplate
                    = new KafkaTemplate<String, String>(factory) ;
            //kafkaTemplate.setProducerListener();
            return kafkaTemplate;
        }
    }
    

    key和value可以自定义序列化类,参考《kafka2.5.0自定义数据序列化类

     重要知识:

    如果该topic的分区大于1,那么生产者生产的数据存放到哪个分区,完全取决于key值,比如key=A,那么存到分区0,key=B,那么存到分区1,如果key为null,那么负载均衡存储到每个分区!

    再均衡监听器:无论分区个数还是消费者个数发生变化,都会触发再均衡,重新分配分区的消费者。如果需要自定义分区,请参考《kafka2.5.0自定义分区器

    3)消费者配置:

     所有配置参考kafka-clients-2.5.0.jar包里的org.apache.kafka.clients.consumer.ConsumerConfig类,并且在该类中可以查看所有配置项的默认值: CONFIG = (new ConfigDef()).define(  这里的define方法的第三个参数就是默认值

    kafka.consumer.bootstrap-servers=192.168.2.61:9092,192.168.2.61:9093
    # 注意:相同的Topic下,相同的群组ID,只有一个消费者能消费到消息
    #kafka.consumer.group-id=myGroupId1
    # 消费者在读取一个没有偏移量的分区或者偏移量无效的情况下,读取设置。
    # latest: (默认)读取最新的,earliest: 读取最早的,none: 如果没有为使用者的组找到偏移量,则consumer抛出异常,anything else: consumer抛出异常
    kafka.consumer.auto-offset-reset=latest
    # 是否自动提交偏移,默认true。偏移量自己控制,可以有效避免重复读、漏读
    kafka.consumer.enable-auto-commit=false
    # 自动提交间隔,默认5秒。从开始消费一条数据到业务结束,必须在5秒内完成,否则会造成提前提交偏移量,如果出现事务失败,将会漏掉该条消费
    #kafka.consumer.auto.commit.interval.ms=5000
    
    # 把分区分配给消费者的策略。RangeAssignor:默认。采用大部分分区都分配给消费者群组里的群主(即消费者0)的策略。RoundRobinAssignor:采用所有消费者平均分配分区策略
    # 注意:无论分区个数变化或者消费者个数变化,都会触发再分配
    kafka.consumer.partition-assignment-strategy=org.apache.kafka.clients.consumer.RangeAssignor.class
    # 客户端在进行发送和消费的时候,会缓存kafka的元数据。默认30秒
    kafka.consumer.metadata-max-age-ms=30000
    # consumer最小拉取多大的数据,默认值1,就是立即发送。达不到这个数据就等待。注意:这里不是根据消费数据条数,而是数据大小,这样设计主要避免每个数据之间大小差距过大。
    kafka.consumer.fetch.min.bytes=1
    # consumer最多等待10秒就消费一次数据,默认500ms
    kafka.consumer.fetch.max.wait.ms=10000
    # 控制每次poll方法返回的记录数量,默认500。这个配置仅仅作用于手动 poll消费的情况下,在springboot中由于使用 @KafkaListener注解消费所以基本没用
    kafka.consumer.max-poll-records=500

    在springboot框架里,手动封装kafka生产者对象,并@bean对象注入到SpringBoot容器中去:

     先定义pojo 类:

    @Component
    @ConfigurationProperties(prefix = "kafka.consumer")
    public class KafkaConsumerConfigModel {
        
         // 这里就是一个简单的pojo类,定义application.properties配置文件的kafka.consumer开头的所有字段.
         private String bootstrapServers;
         ......
         
         // getter and setter
    
    }

    再定义kafka consumer 工厂类:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.RangeAssignor;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    import org.springframework.kafka.listener.ContainerProperties;
    import org.springframework.kafka.support.TopicPartitionOffset;
    import org.springframework.util.CollectionUtils;
    
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class KafkaConsumerConfig {
        private Logger logger = LoggerFactory.getLogger(KafkaProducerConfig.class);
    
        @Autowired
        KafkaConsumerConfigModel config;
    
        @Bean("consumerFactory")
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> propsMap = new HashMap<>();
            propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
            propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
            propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, Arrays.asList( RangeAssignor.class));
            propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, config.getFetchMinBytes());
            propsMap.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, config.getFetchMaxWaitMs());
            propsMap.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, config.getMetadataMaxAgeMs());
            ConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(propsMap);
            return factory;
        }
    
        @Bean("kafkaListenerContainerFactory")
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
        getKafkaListenerContainerFactory(
                @Autowired ConsumerFactory<String, String> consumerFactory
        ) {
            ConcurrentKafkaListenerContainerFactory<String, String> factory
                    = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory);
            factory.setConcurrency(1);
            factory.getContainerProperties().setPollTimeout(1500);
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    //        factory.createContainer( new TopicPartitionOffset(Constant.TOPIC, 0));
            return factory;
        }
    }

    最后kafka consumer消费者长这样:

    import com.joyce.kafka.Constant;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Component;
    
    @Component
    public class KafkaConsumer {
    
        private Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
    
        // 相同的groupId的消费者只能有一个接收到消息
        @KafkaListener(groupId="mygroup-1",topics = Constant.TOPIC )
        public void listen1(String data) {
            logger.info("消费到消息1: [{}]", data);
        }
    
        @KafkaListener(groupId="mygroup-2",topics =  Constant.TOPIC)
        public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) {
            logger.info("消费到消息2: [{}]", record.value());
            logger.info("消费到消息2|"+String.format(
                    "主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",
                    record.topic(),record.partition(),record.offset(),
                    record.key(),record.value()));//提交offset
            ack.acknowledge();
        }
    
        @KafkaListener(groupId="mygroup-3", topics =  Constant.TOPIC)
        public void test(String data, Acknowledgment ack) { // ConsumerRecord<String, String> record
            logger.info("消费到消息3: [{}]", data);
            //提交offset
            ack.acknowledge();
        }
    
    }

    end.

  • 相关阅读:
    收集一些dos网络配置命令,从新获取ip刷新dns
    多个线程访问共享对象和数据的方式
    Oracle rownum 分页, 排序
    ORACLE中用rownum分页并排序的SQL语句
    CentOS 6.5安装MongoDB 2.6(多yum数据源)
    【编程练习】收集的一些c++代码片,算法排序,读文件,写日志,快速求积分等等
    java枚举使用详解
    PHP+MySQL动态网站开发从入门到精通(视频教学版)
    Premiere Pro CS6标准教程
    黑客攻防:实战加密与解密
  • 原文地址:https://www.cnblogs.com/zhuwenjoyce/p/13191163.html
Copyright © 2011-2022 走看看