zoukankan      html  css  js  c++  java
  • Kafka消费者——结合spring开发

    Kafka消费者端

    可靠性保证

    作为消费端,消费数据需要考虑的是:

    1、不重复消费消息

    2、不缺失消费消息

    自动提交 offset 的相关参数:

    enable.auto.commit: 是否开启自动提交 offset 功能(true)
    auto.commit.interval.ms: 自动提交 offset 的时间间隔 (1000ms = 1s)

    手动提交offset 的相关参数:

    enable.auto.commit: 是否开启自动提交 offset 功能(false)

    异步提交也个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。

    虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。

    无论是同步提交还是异步提交 offset,都有可能会造成数据的漏消费或者重复消费。先提交 offset 后消费,有可能造成数据的漏消费;而先消费后提交 offset,有可能会造成数据的重复消费 。所以,在保证数据完整性的前提下,选择同步提交同时尽量能在消费端进行消息去重的操作。

    spring-kafka消费者端

    spring-consumer.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
    	http://www.springframework.org/schema/beans/spring-beans.xsd
    	http://www.springframework.org/schema/context
    	http://www.springframework.org/schema/context/spring-context.xsd
    	http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd">
    
        <context:component-scan base-package="listener" />
        <!--<context:component-scan base-package="concurrent" />-->
    
    
        <bean id="consumerProperties" class="java.util.HashMap">
            <constructor-arg>
                <map>
                    <!--broker集群-->
                    <entry key="bootstrap.servers" value="192.168.25.10:9092,192.168.25.11:9092,192.168.25.12:9092"/>
                    <!--groupid-->
                    <entry key="group.id" value="group1"/>
                    <!--
                    earliest 
                    当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 
                    latest 
                    当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 
                    none 
                    topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
                    -->
                    <entry key="auto.offset.reset" value="earliest "/>
                    <!--自动提交-->
                    <entry key="enable.auto.commit" value="false"/>
                    <!--自动提交重试等待时间-->
                    <entry key="auto.commit.interval.ms" value="1000"/>
                    <!--检测消费者故障的超时-->
                    <entry key="session.timeout.ms" value="30000"/>
                    <!--key反序列化-->
                    <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>
                    <!--value反序列化-->
                    <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                </map>
            </constructor-arg>
        </bean>
        <!--consumer工厂-->
        <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <ref bean="consumerProperties"/>
            </constructor-arg>
        </bean>
        <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg  >
                <list>
                    <value>topic1</value>
                    <value>topic2</value>
                </list>
            </constructor-arg>
            <property name="messageListener" ref="kafkaConsumerListener"/>
    		<property name="pollTimeout" value="1000"/>
    		<property name="AckMode" value="MANUAL"/>
        </bean>
    
        <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" >
            <constructor-arg ref="consumerFactory"/>
            <constructor-arg ref="containerProperties"/>
        </bean>
    
        <!-- 并发消息监听容器,执行doStart()方法 -->
    <!--    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart" >
            <constructor-arg ref="consumerFactory" />
            <constructor-arg ref="containerProperties" />
            &lt;!&ndash;#消费监听器容器并发数&ndash;&gt;
            &lt;!&ndash;concurrency = 3&ndash;&gt;
            <property name="concurrency" value="3" />
        </bean>-->
    </beans>
    

    AckMode
    RECORD每处理一条commit一次

    BATCH(默认)每次poll的时候批量提交一次,频率取决于每次poll的调用频率

    TIME 每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)

    COUNT 累积达到ackCount次的ack去commit

    COUNT_TIMEackTime或ackCount哪个条件先满足,就commit

    MANUAL listener负责ack,但是背后也是批量上去

    MANUAL_IMMEDIATE listner负责ack,每调用一次,就立即commit

    KafkaConsumerListener类

    (同步提交)

    @Component
    public class KafkaConsumerListener implements AcknowledgingMessageListener<String, String> {
        @Override
        public void onMessage(ConsumerRecord<String, String> stringStringConsumerRecord, Acknowledgment acknowledgment) {
            System.out.printf("offset= %d, key= %s, value= %s,topic= %s,partition= %s
    ",
                    stringStringConsumerRecord.offset(),
                    stringStringConsumerRecord.key(),
                    stringStringConsumerRecord.value(),
                    stringStringConsumerRecord.topic(),
                    stringStringConsumerRecord.partition());
                    acknowledgment.acknowledge();
        }
    }
    

    测试

        @Test
        public  void consumer() {
            ApplicationContext context = new ClassPathXmlApplicationContext("listener.xml");
            System.out.printf("启动listener");
            while (true) {
    
            }
        }
    

    结果:

    offset= 57, key= null, value= 2019-11-19 03:40:45,topic= topic1,partition= 0
    offset= 4929, key= null, value= 2019-11-19 03:40:47,topic= topic2,partition= 2
    

    kafka消费者如何才能从头开始消费某个topic的全量数据

    消费者要从头开始消费某个topic的全量数据,需要满足2个条件(spring-kafka):

    (1)使用一个全新的"group.id"(就是之前没有被任何消费者使用过);
    
    (2)指定"auto.offset.reset"参数的值为earliest;
    

    对应的spring-kafka消费者客户端配置参数为:

    <!-- 指定消费组名 -->
    <entry key="group.id" value="fg11"/>
    <!-- 从何处开始消费,latest 表示消费最新消息,earliest 表示从头开始消费,none表示抛出异常,默认latest -->
    <entry key="auto.offset.reset" value="earliest"/>
  • 相关阅读:
    Hashmap实现原理
    策略模式
    Google Drive ubuntu
    numix Docky
    Google Drive 和 Dropbox 同步同一个文件夹目录
    sublime text 2
    matlab cell
    liteide
    taglist and nerdtree
    codeblocks
  • 原文地址:https://www.cnblogs.com/luckyhui28/p/12003650.html
Copyright © 2011-2022 走看看