  • Integrating Kafka with Spring

    This post mainly follows the Spring open-source project Spring for Apache Kafka.

    It covers only the consumer and the producer. For other features such as connectors and streams, see the official documentation.

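    • Kafka depends heavily on ZooKeeper (true of the Kafka versions used here).
    • When you create a topic, you can specify the number of partitions (data is distributed across brokers for horizontal scaling) and the number of replicas (for fault tolerance).
    • The number of consumers in a consumer group should be less than or equal to the number of partitions. Each partition is consumed by only one consumer in the group, so extra consumers sit idle. If a consumer fails, other consumers in the same group take over its partitions, which provides fault tolerance.
    • Messages within a single partition are ordered. Ordering is not guaranteed across partitions.
    • A message can normally be consumed by several consumer groups, but within one consumer group only one consumer consumes it.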
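    The one-consumer-per-partition rule above can be illustrated with a small model. The sketch below assigns one topic's partitions to the consumers of one group in a simplified range style, loosely modeled on Kafka's default range assignor. The class and method names are hypothetical, and this is not the client's actual code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified range-style assignment of ONE topic's partitions
// to the consumers of ONE group. Illustrative only; not Kafka's client code.
class PartitionAssignmentSketch {

    // Returns, for each consumer index, the list of partitions it owns.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> result = new ArrayList<>();
        int perConsumer = numPartitions / numConsumers; // base share for everyone
        int withExtra = numPartitions % numConsumers;   // the first withExtra consumers get one more
        for (int i = 0; i < numConsumers; i++) {
            int start = perConsumer * i + Math.min(i, withExtra);
            int length = perConsumer + (i < withExtra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                owned.add(p);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // 3 partitions, 2 consumers: one consumer owns two partitions
        System.out.println(assign(3, 2)); // [[0, 1], [2]]
        // 2 partitions, 3 consumers: the third consumer owns nothing and stays idle
        System.out.println(assign(2, 3)); // [[0], [1], []]
    }
}
```

    With 2 partitions and 3 consumers the third consumer receives nothing. Adding consumers beyond the partition count therefore does not increase parallelism; the extra consumers only act as standbys.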

    First, add the Kafka dependency to the pom file:

    <!--spring for kafka-->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.1.6.RELEASE</version>
    </dependency>

    Version compatibility:

        Apache Kafka Clients 1.0.0
        Spring Framework 5.0.x
        Minimum Java version: 8 

    Next, here is how to consume messages from Kafka.

    First, write the consumer listener class KafkaConsumerListen.java (the callback does not receive the Consumer instance):

    package com.yinz.kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.listener.MessageListener;
    
    public class KafkaConsumerListen implements MessageListener<String, String>{
    
        // Called by the listener container once for every record it polls
        @Override
        public void onMessage(ConsumerRecord<String, String> data) {
            System.out.println("--------------" + data.value());
        }
    
    }

    KafkaConcurConsumerListen.java (the callback also receives the Consumer instance):

    package com.yinz.kafka;
    
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.listener.ConsumerAwareMessageListener;
    
    public class KafkaConcurConsumerListen implements ConsumerAwareMessageListener<String, String>{
    
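        // 'consumer' is the Kafka Consumer that polled this record; with a concurrent
        // container, its hashCode shows which consumer handled which partition
        @Override
        public void onMessage(ConsumerRecord<String, String> data, Consumer<?, ?> consumer) {
            System.out.println(consumer.hashCode() +">>>>>>" + data.partition() +">>>>>>>>>>" + data.value());
        }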
    
    }

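    Next, configure the beans. All Kafka-related configuration here lives in kafka.xml.

    The contents of kafka.xml are shown below. The file includes both a single-consumer setup and a concurrent-consumer setup; in practice you only need one of them: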

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:context="http://www.springframework.org/schema/context"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd">
    
        <!-- Producer definitions -->
        <!-- Producer properties -->
        <bean id="producerProperties" class="java.util.HashMap">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="47.98.131.199:9092" />
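                <entry key="retries" value="1" />              <!-- number of retries when a send fails -->
                <entry key="batch.size" value="16384" />       <!-- records are sent in batches; a batch does not exceed this many bytes -->
                <entry key="linger.ms" value="1" />            <!-- wait this many ms before sending; under heavy load this lets several records share one request, reducing the number of requests -->
                <entry key="buffer.memory" value="33554432" />  <!-- producer buffer for records not yet sent to the broker; when it is full, send() blocks and eventually throws an exception (after max.block.ms) -->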
                    <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
                    <entry key="value.serializer"  value="org.apache.kafka.common.serialization.StringSerializer" />
                </map>
            </constructor-arg>
        </bean>
        
        <!-- ProducerFactory bean used by the KafkaTemplate -->
        <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg ref="producerProperties"></constructor-arg>
        </bean>
    
        <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
            <constructor-arg ref="producerFactory" />
            <constructor-arg name="autoFlush" value="false" />  <!-- true flushes after every send and significantly reduces throughput -->
            <property name="defaultTopic" value="test" />    <!-- default topic, used by sendDefault() -->
        </bean>
        
        <!-- Consumer properties -->
        <bean id="consumerProperties" class="java.util.HashMap">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="47.98.131.199:9092" />
                    <entry key="group.id" value="0" />        <!-- consumer group id -->
                <entry key="enable.auto.commit" value="true" />  <!-- commit offsets automatically -->
                <entry key="auto.commit.interval.ms" value="1000" />  <!-- interval between automatic offset commits -->
                <entry key="session.timeout.ms" value="15000" />  <!-- if no heartbeat is received for 15 seconds, the consumer is considered dead -->
                    <entry key="key.deserializer"
                        value="org.apache.kafka.common.serialization.StringDeserializer" />
                    <entry key="value.deserializer"
                        value="org.apache.kafka.common.serialization.StringDeserializer" />
                </map>
            </constructor-arg>
        </bean>
        
        <!-- ConsumerFactory bean -->
        <bean id="consumerFactory"
            class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <ref bean="consumerProperties" />
            </constructor-arg>
        </bean>
    
        <!-- Single-consumer setup (disabled): the listener that handles messages; its callback has no Consumer instance -->
        <!--
        <bean id="messageListernerConsumerService" class="com.yinz.kafka.KafkaConsumerListen" />
        -->

        <!-- Container properties: the topic to subscribe to and the listener to call -->
        <!--
        <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg value="test"/>
            <property name="messageListener" ref="messageListernerConsumerService"/>
        </bean>
        -->

        <!-- Only one consumer in the group, so there is no load sharing and no failover -->
        <!--
        <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
            <constructor-arg ref="consumerFactory"/>
            <constructor-arg ref="containerProperties"/>
        </bean>
        -->

        <!-- Concurrent setup: the listener that handles messages; its callback receives the Consumer instance -->
        <bean id="messageConcurListernerConsumerService" class="com.yinz.kafka.KafkaConcurConsumerListen" />

        <bean id="containerConcurProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg value="test-par"/>
            <property name="messageListener" ref="messageConcurListernerConsumerService"/>
        </bean>

        <!-- Runs "concurrency" consumers in the same group, giving load sharing and failover.
             If concurrency exceeds the topic's partition count, the extra consumers get no partitions and stay idle. -->
        <bean id="messageConcurListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart">
            <constructor-arg ref="consumerFactory"/>
            <constructor-arg ref="containerConcurProperties"/>
            <property name="concurrency" value="3"></property>
        </bean>
    </beans>
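    The batch.size and linger.ms comments in the producer properties above describe how the producer groups records into requests. Below is a hypothetical, simplified model of that rule for a single partition. A batch is sent when the next record would push it past batch.size bytes, or when linger.ms has elapsed since the batch's first record arrived. The class name is illustrative. The real producer's accumulator also accounts for buffer.memory, retries and in-flight requests, and none of that is modeled here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of producer batching for one partition.
// Illustrative only; not the Kafka producer's actual RecordAccumulator logic.
class BatchingSketch {

    // arrivalMs must be ascending; returns the number of records in each send request.
    static List<Integer> batches(long[] arrivalMs, int[] sizeBytes, int batchSize, long lingerMs) {
        List<Integer> sent = new ArrayList<>();
        int count = 0;      // records in the open batch
        int bytes = 0;      // bytes in the open batch
        long openedAt = 0;  // arrival time of the open batch's first record
        for (int i = 0; i < arrivalMs.length; i++) {
            boolean lingerExpired = count > 0 && arrivalMs[i] - openedAt >= lingerMs;
            boolean wouldOverflow = count > 0 && bytes + sizeBytes[i] > batchSize;
            if (lingerExpired || wouldOverflow) {
                sent.add(count); // the open batch goes out as one request
                count = 0;
                bytes = 0;
            }
            if (count == 0) {
                openedAt = arrivalMs[i]; // this record opens a new batch
            }
            count++;
            bytes += sizeBytes[i];
        }
        if (count > 0) {
            sent.add(count); // the last batch is sent once linger.ms expires
        }
        return sent;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 0, 0, 5, 5};
        int[] sizes = {100, 100, 100, 100, 100};
        System.out.println(batches(arrivals, sizes, 16384, 1)); // [3, 2]: linger.ms groups records
        System.out.println(batches(arrivals, sizes, 250, 1000)); // [2, 2, 1]: batch.size caps each request
    }
}
```

    In this model linger.ms=0 sends every record on its own. In the real client, records that pile up while a request is in flight are still batched, so batching also happens under load without any linger.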

    On committing offsets, the official documentation says:

    Committing Offsets
    
    Several options are provided for committing offsets. If the enable.auto.commit consumer property is true, kafka will auto-commit the offsets according to its configuration. If it is false, the containers support the following AckModes.
    
    The consumer poll() method will return one or more ConsumerRecords; the MessageListener is called for each record; the following describes the action taken by the container for each AckMode :
    
        RECORD - commit the offset when the listener returns after processing the record.
        BATCH - commit the offset when all the records returned by the poll() have been processed.
        TIME - commit the offset when all the records returned by the poll() have been processed as long as the ackTime since the last commit has been exceeded.
        COUNT - commit the offset when all the records returned by the poll() have been processed as long as ackCount records have been received since the last commit.
        COUNT_TIME - similar to TIME and COUNT but the commit is performed if either condition is true.
        MANUAL - the message listener is responsible to acknowledge() the Acknowledgment; after which, the same semantics as BATCH are applied.
        MANUAL_IMMEDIATE - commit the offset immediately when the Acknowledgment.acknowledge() method is called by the listener. 
    
    MANUAL, and MANUAL_IMMEDIATE require the listener to be an AcknowledgingMessageListener or a BatchAcknowledgingMessageListener.
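    For the non-manual modes, the rules quoted above amount to a decision made after each poll's records have been processed. The hypothetical model below counts how many commits each mode would perform for a given sequence of polls. It follows the documented descriptions, not the container's source. MANUAL and MANUAL_IMMEDIATE are left out because they depend on the listener calling acknowledge().

```java
// Hypothetical model of the documented AckMode commit rules; not the container's code.
class AckModeSketch {

    enum Mode { RECORD, BATCH, TIME, COUNT, COUNT_TIME }

    // pollSizes[i] = records returned by poll i; pollDoneMs[i] = time (ms) at which
    // those records finished processing. Returns the number of offset commits.
    static int commits(Mode mode, int[] pollSizes, long[] pollDoneMs, int ackCount, long ackTimeMs) {
        int commits = 0;
        int sinceCommit = 0;   // records processed since the last commit
        long lastCommitMs = 0; // time of the last commit (container start is time 0)
        for (int i = 0; i < pollSizes.length; i++) {
            if (mode == Mode.RECORD) {
                commits += pollSizes[i]; // one commit after each record
                continue;
            }
            sinceCommit += pollSizes[i];
            boolean countReached = sinceCommit >= ackCount;
            boolean timeExceeded = pollDoneMs[i] - lastCommitMs > ackTimeMs;
            boolean commit;
            if (mode == Mode.BATCH) {
                commit = true;            // after every poll's records
            } else if (mode == Mode.COUNT) {
                commit = countReached;
            } else if (mode == Mode.TIME) {
                commit = timeExceeded;
            } else {                      // COUNT_TIME: either condition
                commit = countReached || timeExceeded;
            }
            if (commit) {
                commits++;
                sinceCommit = 0;
                lastCommitMs = pollDoneMs[i];
            }
        }
        return commits;
    }

    public static void main(String[] args) {
        int[] sizes = {2, 2, 2, 2};
        long[] done = {100, 200, 300, 400};
        for (Mode m : Mode.values()) {
            // ackCount = 3, ackTime = 250 ms
            System.out.println(m + " -> " + commits(m, sizes, done, 3, 250));
        }
    }
}
```

    With four polls of two records each, RECORD commits 8 times, BATCH 4 times, TIME once, and COUNT and COUNT_TIME twice each. Fewer commits mean less overhead, but a crash can replay more records.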

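    The AckMode can be configured with a snippet like the one below. With MANUAL, offsets are not committed automatically; the listener must call Acknowledgment.acknowledge(). Records that were processed but not yet acknowledged are delivered again when the consumer reconnects, so duplicates are possible. Per the quoted documentation, the AckMode only applies when enable.auto.commit is false (the consumerProperties above set it to true). MANUAL also requires an AcknowledgingMessageListener, which KafkaConsumerListen does not implement.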

    <!-- Expose the AckMode enum constant as a bean -->
    <bean id="manual" class="org.springframework.beans.factory.config.FieldRetrievingFactoryBean">
        <property name="staticField" value="org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL" />
    </bean>

    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="test-par"/>
        <property name="messageListener" ref="messageListernerConsumerService"/>
        <property name="ackMode" ref="manual"></property>
    </bean>
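    The duplicate-delivery effect of manual acknowledgment can be shown with a small model of one partition. Here the committed offset only advances when a record is acknowledged, and a restarted consumer resumes from the committed offset. This is a hypothetical sketch, not Spring or Kafka code. It treats each acknowledge() as an immediate commit, which matches MANUAL_IMMEDIATE; with MANUAL the commit is deferred, which can only widen the redelivery window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of one partition consumed with manual acknowledgment.
class ManualAckSketch {

    private final List<String> log; // partition contents; list index = offset
    private int committed = 0;      // committed offset = next offset to read after a restart

    ManualAckSketch(List<String> log) {
        this.log = log;
    }

    // One consumer session: reads from the committed offset to the end,
    // acknowledges only the first ackFirstN records it received, then stops
    // (for example, a crash). Returns the records delivered in this session.
    List<String> session(int ackFirstN) {
        List<String> delivered = new ArrayList<>();
        int start = committed;
        for (int offset = start; offset < log.size(); offset++) {
            delivered.add(log.get(offset));
            if (offset - start < ackFirstN) {
                committed = offset + 1; // Kafka commits the offset of the next record to read
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        ManualAckSketch partition = new ManualAckSketch(Arrays.asList("a", "b", "c", "d"));
        System.out.println(partition.session(2)); // [a, b, c, d]: c and d processed but not acknowledged
        System.out.println(partition.session(4)); // [c, d]: delivered again after the restart
    }
}
```

    Because c and d are processed twice, a listener using manual acknowledgment should be idempotent, or should acknowledge as soon as each record's processing is durable.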
    

    The producer is simpler: inject the kafkaTemplate bean and call its send method to send messages.

    Notes:

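    When configuring the producer, you can set the partitioner.class property to control which partition each message is sent to (default: org.apache.kafka.clients.producer.internals.DefaultPartitioner.class).

    By default, a message with a key is assigned to a partition by hashing the key. A message without a key is sent to the partitions in round-robin order.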
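    The default strategy described above can be sketched as follows. This is a hypothetical, simplified partitioner. Kafka's DefaultPartitioner hashes the serialized key with murmur2; here Arrays.hashCode stands in for it, so the partition numbers will not match real Kafka. Newer Kafka clients (2.4 onward) also changed how keyless records are spread, using a "sticky" strategy instead of plain round-robin.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical, simplified partitioner; NOT Kafka's DefaultPartitioner.
// Arrays.hashCode is a stand-in for murmur2, so partition numbers differ from real Kafka.
class PartitionerSketch {

    private int nextRoundRobin = 0; // position for records that have no key

    int partition(String key, int numPartitions) {
        if (key == null) {
            // No key: cycle through the partitions (floorMod also handles counter overflow)
            return Math.floorMod(nextRoundRobin++, numPartitions);
        }
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Same key -> same hash -> same partition; the mask keeps the hash non-negative
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        PartitionerSketch p = new PartitionerSketch();
        System.out.println(p.partition("order-42", 3) == p.partition("order-42", 3)); // true
        for (int i = 0; i < 4; i++) {
            System.out.print(p.partition(null, 3) + " "); // 0 1 2 0
        }
        System.out.println();
    }
}
```

    Because equal keys always map to the same partition, and a partition is ordered, messages that share a key (for example, all events for one order) are consumed in the order they were sent.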

  • Original article: https://www.cnblogs.com/yinz/p/9151675.html