zoukankan      html  css  js  c++  java
  • spring-kafka

    spring-kafka

    使用spring-kafka的小伙伴,看过来。

    说明

    因为spring-kafka封装的比较厉害,可能跟你实际使用起来有很大的差别。

    一个简单的消费例子

    spring-boot基础上添加依赖:

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.2.3.RELEASE</version>
    </dependency>
    

    注意要检查下依赖的kakfa-clients,是否与你服务端的匹配。

    不要忘记了通过注解@EnableKafka开启自动配置。

    采用默认的配置:

    spring:
      kafka:
        consumer:
          bootstrap-servers:
            - 127.0.0.1:9092
          # 消费组
          group-id: myGroup
          # 消费者是否自动提交偏移量,默认为true
          enable-auto-commit: false
          # 消费者在读取一个没有偏移量或者偏移量无效的情况下,从起始位置读取partition的记录,默认是latest
          auto-offset-reset: earliest
          # 单次调用poll方法能够返回的消息数量
          max-poll-records: 50
    

    然后写个测试用例试试:

    package tk.fishfish.easyjava.kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.test.context.junit4.SpringRunner;
    
    /**
     * 消费者
     *
     * @author 奔波儿灞
     * @since 1.0
     */
    @SpringBootTest
    @RunWith(SpringRunner.class)
    public class ConsumerTest {
    
        private static final Logger LOG = LoggerFactory.getLogger(ConsumerTest.class);
    
        @KafkaListener(topics = "test")
        public void onMessage(ConsumerRecord<String, String> record) {
            LOG.info("record: {}", record);
            String value = record.value();
            if (value.length() % 2 == 0) {
                throw new RuntimeException("模拟业务出错");
            }
        }
    
        @Test
        public void run() {
            try {
                // 阻塞5分钟,方便调试
                Thread.sleep(5 * 60 * 1000);
            } catch (InterruptedException e) {
                LOG.warn("sleep error", e);
            }
        }
    
    }
    

    上面通过@KafkaListener来监听topic,处理消息。

    为了模拟业务会出现一些异常,我特意在判断value长度为偶数的情况下抛出异常,看在默认配置的情况下,如果业务出错,是否仍会提交offsets

    结果发现,仍提交了offsets

    spring-kafka-consumer-default

    下面是日志:

    INFO  2019-05-17 10:25:48.557 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] tk.fishfish.easyjava.kafka.ConsumerTest            - record: ConsumerRecord(topic = test, partition = 0, offset = 261, CreateTime = 1558059947687, serialized key size = -1, serialized value size = 0, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = )
    ERROR 2019-05-17 10:25:48.557 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler             - Error while processing: ConsumerRecord(topic = test, partition = 0, offset = 261, CreateTime = 1558059947687, serialized key size = -1, serialized value size = 0, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = )
    org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void tk.fishfish.easyjava.kafka.ConsumerTest.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>)' threw exception; nested exception is java.lang.RuntimeException: 模拟业务出错
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:302)
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79)
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1220)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1213)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1174)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1155)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1096)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:924)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:740)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.RuntimeException: 模拟业务出错
        at tk.fishfish.easyjava.kafka.ConsumerTest.onMessage(ConsumerTest.java:29)
        at sun.reflect.GeneratedMethodAccessor41.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:170)
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
        at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283)
        ... 13 common frames omitted
    DEBUG 2019-05-17 10:25:48.557 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.s.k.l.a.RecordMessagingMessageListenerAdapter    - Processing [GenericMessage [payload=d, headers={kafka_offset=262, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@f46d581, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=test, kafka_receivedTimestamp=1558059947851}]]
    INFO  2019-05-17 10:25:48.557 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] tk.fishfish.easyjava.kafka.ConsumerTest            - record: ConsumerRecord(topic = test, partition = 0, offset = 262, CreateTime = 1558059947851, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = d)
    DEBUG 2019-05-17 10:25:48.557 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {test-0=OffsetAndMetadata{offset=263, metadata=''}}
    DEBUG 2019-05-17 10:25:48.557 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing: {test-0=OffsetAndMetadata{offset=263, metadata=''}}
    DEBUG 2019-05-17 10:25:48.560 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.consumer.internals.ConsumerCoordinator     - [Consumer clientId=consumer-2, groupId=myGroup] Committed offset 263 for partition test-0
    

    从日志可以看到offset = 261的记录处理失败了,但最后仍提交了Committed offset 263 for partition test-0

    总结:如果你采取我这样的配置,当处理record出错的时候,仍会提交偏移量。那么我们就需要业务处理失败的情况了。比如try...catch之后保存错误的record,然后定时重试。

    出错的情况下不提交offsets

    那么,能不能在出错的情况下不提交咧?

    通过查看文档发现,发现可以使用Acknowledgment去确认该条record是否提交。

    修改下配置,配置spring.kafka.listener.*

    spring:
      kafka:
        consumer:
          bootstrap-servers:
            - 127.0.0.1:9092
          group-id: myGroup
          # 消费者是否自动提交偏移量,默认为true
          enable-auto-commit: false
          # 消费者在读取一个没有偏移量或者偏移量无效的情况下,从起始位置读取partition的记录,默认是latest
          auto-offset-reset: earliest
          # 单次调用poll方法能够返回的消息数量
          max-poll-records: 50
        listener:
          # Listener AckMode
          ack-mode: MANUAL_IMMEDIATE
          # 并发消费者
          concurrency: 1
    

    然后代码有些调整(就不贴全了),使用Acknowledgment

    @KafkaListener(topics = "test")
    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
        LOG.info("record: {}", record);
        String value = record.value();
        if (value.length() % 2 == 0) {
            throw new RuntimeException("模拟业务出错");
        }
        // 业务处理成功确认
        ack.acknowledge();
    }
    

    如果在业务出错的情况下,不会提交offsets,然而真的是这样的吗?

    测试发现,在业务出错的情况下,确实不会提交offsets,但是只要后面的记录处理成功,就会提交offsets,这样前面的失败的数据还是需要自己去手动处理。要么重新获取该offset的数据,要么记录错误record,业务重试。

    总结

    在错误的情况下,建议自己业务记录后,去重试;或者使用spring-kafkaErrorHandler,处理错误的情况。

  • 相关阅读:
    Hive—数据库级增、删、改、查
    Kafka—Bootstrap broker hadoop102:2181 (id: -1 rack: null) disconnected
    Kafka—命令行操作
    Kafka—Kafka安装部署
    Kafka—Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; error='Cannot allocate memory' (errno=12)
    mysql—Job for mysqld.service failed because the control process exited with error code. See "systemctl status mysqld.service" and "journalctl -xe" for details.
    Linux—安装MySQL数据库
    Linux—Yum源配置
    Hive-FAILED: SemanticException org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
    Linux—Crontab定时任务
  • 原文地址:https://www.cnblogs.com/bener/p/10881165.html
Copyright © 2011-2022 走看看