  • Spring Ecosystem Study [3]: Spring-Kafka

    1. Background


    A Spring-based Kafka application is very simple to set up, provided you already have a Kafka broker cluster. I introduced and built such a broker environment in an earlier post; see Kafka Research [1]: bring up the environment.

    Also note that building a Kafka application on the Spring framework requires matching versions. The official requirements are:

    Apache Kafka Clients 1.0.0
    Spring Framework 5.0.x
    Minimum Java version: 8

    The example introduced here is built on Spring Boot, so the versions do not follow the above requirements exactly, but overall they still satisfy the compatibility constraints.

    2. Building a Kafka application on Spring Boot

    2.1 Create a Maven project in IDEA

    Configure the pom.xml; the complete pom.xml for the project is:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.roomdis</groupId>
        <artifactId>kafka</artifactId>
        <version>1.0-SNAPSHOT</version>
        <packaging>jar</packaging>
    
        <name>kafka</name>
        <description>kafka project with Spring Boot</description>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.5.4.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-freemarker</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
                <exclusions>
                    <exclusion>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-logging</artifactId>
                    </exclusion>
                    <exclusion>
                        <artifactId>log4j-over-slf4j</artifactId>
                        <groupId>org.slf4j</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
            <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
            <dependency>
                <groupId>com.google.code.gson</groupId>
                <artifactId>gson</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
        <!-- log4j dependency -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-log4j</artifactId>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>

    Next come the actual message producer and consumer. The topic here is fixed and the partition count is the default of 1; the point is to show how to build a Kafka application under the Spring framework, and dynamic topic creation will be covered in depth in the next post. The basic flow is: messages are sent asynchronously (async), and the consuming side uses application-level acknowledgment. Generally speaking, production-grade Kafka applications put the acknowledgment logic under application control to guarantee safe message handling: no message is lost and none is consumed twice. (The Config class referenced by the code below is sketched right after this paragraph.)
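
    The producer and consumer code below reference a Config constants class that the original post never shows. A minimal sketch of it, assuming it only holds the topic name (kefuLogger, taken from the runtime logs in section 3):

    public class Config {
        // Topic name as it appears in the runtime logs; adjust to your environment.
        public static final String TOPIC = "kefuLogger";
    }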

    2.2 Project configuration

    I use a YAML configuration file here, which is straightforward and actually clearer than the equivalent properties format. The configuration is:

    server:
      port: 8899
      contextPath: /kafka
    spring:
      application:
        name: kafka
      kafka:
        bootstrapServers: 10.90.7.2:9092,10.90.2.101:9092,10.90.2.102:9092
        consumer:
          groupId: kefu-logger
          enable-auto-commit: false
          keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
          valueDeserializer: org.apache.kafka.common.serialization.StringDeserializer
        producer:
          retries: 3
          buffer-memory: 20480
          keySerializer: org.apache.kafka.common.serialization.StringSerializer
          valueSerializer: org.apache.kafka.common.serialization.StringSerializer
        listener:
          ack-mode: MANUAL_IMMEDIATE

    A few points worth highlighting:

    A. The application listens on port 8899, and the project's external context path is kafka, i.e. every URL starts with /kafka.

    B. The serializers for producing and the deserializers for consuming are both the String implementations.

    C. The consumer group groupId is kefu-logger. Note that group.id is the mechanism for scaling consumption: consumers sharing the same groupId load-balance the partitions of a topic among themselves, so each record in a partition is handled by only one member of the group. (group.id is a consumer-side setting; producers do not belong to consumer groups, which is why there is no groupId under producer above.)

    D. Most importantly, the listener's ackMode is set to MANUAL_IMMEDIATE, i.e. manual, immediate acknowledgment. This requires the consumer's enable-auto-commit to be false, and the consuming logic must explicitly acknowledge each message it processes; otherwise, the next time the consumer starts it will consume the records at those offsets again, causing duplicate consumption. (A Java-config equivalent of this listener setup is sketched below.)
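
    For reference, here is roughly what the same manual-ack setup looks like expressed as Java configuration instead of YAML. This is only a sketch against the spring-kafka 1.x API (where AckMode is a nested enum of AbstractMessageListenerContainer); the bean layout is mine, not the original post's:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.AbstractMessageListenerContainer;

    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {

        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "10.90.7.2:9092,10.90.2.101:9092,10.90.2.102:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "kefu-logger");
            // Manual acknowledgment requires auto-commit to be off.
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return new DefaultKafkaConsumerFactory<>(props);
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            // MANUAL_IMMEDIATE commits the offset as soon as Acknowledgment.acknowledge() runs.
            factory.getContainerProperties().setAckMode(
                    AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
            return factory;
        }
    }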

    2.3 Message definition

    Since the longer-term plan is centralized log handling, the DTO is defined around log messages. It contains the following fields:

    public class LogMessage {
        /*
         * Service type, e.g. IMS, BI
         */
        private String serviceType;
        /*
         * Server address as IP:PORT, e.g. 10.130.207.221:8080
         */
        private String serverAddr;
        /*
         * Full path of the class that produced the log entry
         */
        private String fullClassPath;
        /*
         * Time the message was produced
         */
        private String messageTime;
        /*
         * The message body. Important: it is a JSON string, which keeps the
         * format compatible across different services.
         */
        private String content;
        /*
         * Log level: INFO, WARN, ERROR, DEBUG, etc.
         */
        private String level;
    
        public String getServiceType() {
            return serviceType;
        }
    
        public void setServiceType(String serviceType) {
            this.serviceType = serviceType;
        }
    
        public String getServerAddr() {
            return serverAddr;
        }
    
        public void setServerAddr(String serverAddr) {
            this.serverAddr = serverAddr;
        }
    
        public String getFullClassPath() {
            return fullClassPath;
        }
    
        public void setFullClassPath(String fullClassPath) {
            this.fullClassPath = fullClassPath;
        }
    
        public String getMessageTime() {
            return messageTime;
        }
    
        public void setMessageTime(String messageTime) {
            this.messageTime = messageTime;
        }
    
        public String getContent() {
            return content;
        }
    
        public void setContent(String content) {
            this.content = content;
        }
    
        public String getLevel() {
            return level;
        }
    
        public void setLevel(String level) {
            this.level = level;
        }
    }

    Of course, the DTO's setters, getters, toString and so on could be generated with annotations instead; to keep the example explicit, I am not using the Lombok annotations here (a Lombok version is sketched below for comparison).
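
    With Lombok (already declared in the pom) the whole DTO shrinks to a few lines; a sketch:

    import lombok.Data;

    // @Data generates the getters, setters, equals/hashCode, and toString.
    @Data
    public class LogMessage {
        private String serviceType;
        private String serverAddr;
        private String fullClassPath;
        private String messageTime;
        private String content;
        private String level;
    }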

    2.4 Message producer

    The key point here is asynchronous production: the message is delivered to the broker asynchronously, which matters a great deal for throughput.

    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;
    import org.apache.log4j.Logger;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Service;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.util.concurrent.ListenableFutureCallback;

    @Service
    public class MessageProducer {
        private Logger logger = Logger.getLogger(MessageProducer.class);

        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        private Gson gson = new GsonBuilder().create();

        public void send(LogMessage logMessage) {
            String msg = gson.toJson(logMessage);
            // The send below is asynchronous: delivery happens in the background,
            // and success or failure each triggers a callback for follow-up handling.
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(Config.TOPIC, msg);
            future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onSuccess(SendResult<String, String> sendResult) {
                    long offset = sendResult.getRecordMetadata().offset();
                    String cont = sendResult.getProducerRecord().toString();
                    logger.info("cont: " + cont + ", offset: " + offset);
                }

                @Override
                public void onFailure(Throwable throwable) {
                    logger.error(throwable.getMessage());
                }
            });
        }
    }
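
    The original post does not show the application entry point, but the startup logs in section 3 suggest a KafkaApplication class that emits a test message every two seconds, with a UUID-plus-timestamp content field. A possible sketch of such a driver; the CommandLineRunner loop is my assumption, not the author's code:

    import java.util.Date;
    import java.util.UUID;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;

    @SpringBootApplication
    public class KafkaApplication {

        public static void main(String[] args) {
            SpringApplication.run(KafkaApplication.class, args);
        }

        // Emits a test LogMessage every two seconds, mirroring the message
        // shape seen in the logs (content = "<uuid>:<date>").
        @Bean
        public CommandLineRunner demoSender(MessageProducer producer) {
            return args -> {
                while (true) {
                    LogMessage msg = new LogMessage();
                    msg.setServerAddr("10.90.9.20:8899");
                    msg.setFullClassPath(KafkaApplication.class.toString());
                    msg.setMessageTime(new Date().toString());
                    msg.setContent(UUID.randomUUID() + ":" + new Date());
                    producer.send(msg);
                    Thread.sleep(2000);
                }
            };
        }
    }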

    2.5 Message consumer

    In the consumer logic below, the onMessage methods must take an Acknowledgment parameter; without it there is no way to perform the MANUAL, application-level acknowledgment.

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.log4j.Logger;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Service;

    @Service
    public class MessageConsumer {

        private Logger logger = Logger.getLogger(MessageConsumer.class);

        // Note: each @KafkaListener gets its own container, and both containers
        // share the kefu-logger group, so with a single partition only one of
        // the two methods receives records at any given time.
        @KafkaListener(topics = Config.TOPIC)
        public void onMessage(@Payload String msg, Acknowledgment ack){
            logger.info(msg);
            ack.acknowledge();
        }

        @KafkaListener(topics = Config.TOPIC)
        public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack){
            logger.info(record);
            long offset = record.offset();
            long partition = record.partition();
            String content = record.value();
            logger.info("offset: " + offset + ", partition: " + partition + ", payload: " + content);
            // Manually confirm that the message has been consumed. This is the key
            // step: it gives the application full control over offset commits.
            ack.acknowledge();
        }
    }

    3. Runtime verification

    The goal here is to compare running the consumer with and without ack.acknowledge(), to see concretely how skipping the acknowledgment leads to duplicate consumption.

    3.1 With acknowledge

    The effect is that after each restart, the offset continues incrementing from where it left off, and the payloads are all new messages. I omit the log for this case.

    3.2 Without acknowledge
    For comparison, here is a log excerpt from just before stopping the application:

      .   ____          _            __ _ _
     /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
    ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
     \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
      '  |____| .__|_| |_|_| |_\__, | / / / /
     =========|_|==============|___/=/_/_/_/
     :: Spring Boot ::        (v1.5.4.RELEASE)
    
    2018-08-01 19:45:06.181  INFO 14264 --- [           main] c.roomdis.micros.kafka.KafkaApplication  : Starting KafkaApplication on 60-361-0008 with PID 14264 (D:\Knowledge\SOURCE\springboot-kafka\target\classes started by chengsh05 in D:\Knowledge\SOURCE\springboot-kafka)
    2018-08-01 19:45:06.184  INFO 14264 --- [           main] c.roomdis.micros.kafka.KafkaApplication  : No active profile set, falling back to default profiles: default
    2018-08-01 19:45:06.236  INFO 14264 --- [           main] ationConfigEmbeddedWebApplicationContext : Refreshing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@48fa0f47: startup date [Wed Aug 01 19:45:06 CST 2018]; root of context hierarchy
    2018-08-01 19:45:07.194  INFO 14264 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.kafka.annotation.KafkaBootstrapConfiguration' of type [org.springframework.kafka.annotation.KafkaBootstrapConfiguration$$EnhancerBySpringCGLIB$$2d472f92] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
    2018-08-01 19:45:07.655  INFO 14264 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat initialized with port(s): 8899 (http)
    2018-08-01 19:45:07.672  INFO 14264 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
    2018-08-01 19:45:07.673  INFO 14264 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet Engine: Apache Tomcat/8.5.15
    2018-08-01 19:45:07.786  INFO 14264 --- [ost-startStop-1] o.a.c.c.C.[Tomcat].[localhost].[/kafka]  : Initializing Spring embedded WebApplicationContext
    2018-08-01 19:45:07.786  INFO 14264 --- [ost-startStop-1] o.s.web.context.ContextLoader            : Root WebApplicationContext: initialization completed in 1552 ms
    2018-08-01 19:45:07.942  INFO 14264 --- [ost-startStop-1] o.s.b.w.servlet.ServletRegistrationBean  : Mapping servlet: 'dispatcherServlet' to [/]
    2018-08-01 19:45:07.947  INFO 14264 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean   : Mapping filter: 'characterEncodingFilter' to: [/*]
    2018-08-01 19:45:07.947  INFO 14264 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean   : Mapping filter: 'hiddenHttpMethodFilter' to: [/*]
    2018-08-01 19:45:07.947  INFO 14264 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean   : Mapping filter: 'httpPutFormContentFilter' to: [/*]
    2018-08-01 19:45:07.947  INFO 14264 --- [ost-startStop-1] o.s.b.w.servlet.FilterRegistrationBean   : Mapping filter: 'requestContextFilter' to: [/*]
    2018-08-01 19:45:08.354  INFO 14264 --- [           main] s.w.s.m.m.a.RequestMappingHandlerAdapter : Looking for @ControllerAdvice: org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@48fa0f47: startup date [Wed Aug 01 19:45:06 CST 2018]; root of context hierarchy
    2018-08-01 19:45:08.419  INFO 14264 --- [           main] s.w.s.m.m.a.RequestMappingHandlerMapping : Mapped "{[/error]}" onto public org.springframework.http.ResponseEntity<java.util.Map<java.lang.String, java.lang.Object>> org.springframework.boot.autoconfigure.web.BasicErrorController.error(javax.servlet.http.HttpServletRequest)
    2018-08-01 19:45:08.420  INFO 14264 --- [           main] s.w.s.m.m.a.RequestMappingHandlerMapping : Mapped "{[/error],produces=[text/html]}" onto public org.springframework.web.servlet.ModelAndView org.springframework.boot.autoconfigure.web.BasicErrorController.errorHtml(javax.servlet.http.HttpServletRequest,javax.servlet.http.HttpServletResponse)
    2018-08-01 19:45:08.448  INFO 14264 --- [           main] o.s.w.s.handler.SimpleUrlHandlerMapping  : Mapped URL path [/webjars/**] onto handler of type [class org.springframework.web.servlet.resource.ResourceHttpRequestHandler]
    2018-08-01 19:45:08.448  INFO 14264 --- [           main] o.s.w.s.handler.SimpleUrlHandlerMapping  : Mapped URL path [/**] onto handler of type [class org.springframework.web.servlet.resource.ResourceHttpRequestHandler]
    2018-08-01 19:45:08.478  INFO 14264 --- [           main] o.s.w.s.handler.SimpleUrlHandlerMapping  : Mapped URL path [/**/favicon.ico] onto handler of type [class org.springframework.web.servlet.resource.ResourceHttpRequestHandler]
    2018-08-01 19:45:08.623  INFO 14264 --- [           main] o.s.ui.freemarker.SpringTemplateLoader   : SpringTemplateLoader for FreeMarker: using resource loader [org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@48fa0f47: startup date [Wed Aug 01 19:45:06 CST 2018]; root of context hierarchy] and template loader path [classpath:/templates/]
    2018-08-01 19:45:08.624  INFO 14264 --- [           main] o.s.w.s.v.f.FreeMarkerConfigurer         : ClassTemplateLoader for Spring macros added to FreeMarker configuration
    2018-08-01 19:45:08.644  WARN 14264 --- [           main] o.s.b.a.f.FreeMarkerAutoConfiguration    : Cannot find template location(s): [classpath:/templates/] (please add some templates, check your FreeMarker configuration, or set spring.freemarker.checkTemplateLocation=false)
    2018-08-01 19:45:08.717  INFO 14264 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
    2018-08-01 19:45:08.734  INFO 14264 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
    2018-08-01 19:45:08.748  INFO 14264 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [10.90.7.2:9092, 10.90.2.101:9092, 10.90.2.102:9092]
        check.crcs = true
        client.id = 
        connections.max.idle.ms = 540000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = kefu-logger
        heartbeat.interval.ms = 3000
        interceptor.classes = null
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.ms = 50
        request.timeout.ms = 305000
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    
    2018-08-01 19:45:08.751  INFO 14264 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [10.90.7.2:9092, 10.90.2.101:9092, 10.90.2.102:9092]
        check.crcs = true
        client.id = consumer-1
        connections.max.idle.ms = 540000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = kefu-logger
        heartbeat.interval.ms = 3000
        interceptor.classes = null
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.ms = 50
        request.timeout.ms = 305000
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    
    2018-08-01 19:45:08.796  INFO 14264 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.10.1.1
    2018-08-01 19:45:08.797  INFO 14264 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : f10ef2720b03b247
    2018-08-01 19:45:08.841  INFO 14264 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8899 (http)
    2018-08-01 19:45:08.848  INFO 14264 --- [           main] c.roomdis.micros.kafka.KafkaApplication  : Started KafkaApplication in 3.079 seconds (JVM running for 3.484)
    2018-08-01 19:45:08.859  INFO 14264 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
        acks = 1
        batch.size = 16384
        block.on.buffer.full = false
        bootstrap.servers = [10.90.7.2:9092, 10.90.2.101:9092, 10.90.2.102:9092]
        buffer.memory = 20480
        client.id = 
        compression.type = none
        connections.max.idle.ms = 540000
        interceptor.classes = null
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.fetch.timeout.ms = 60000
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 3
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        timeout.ms = 30000
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer
    
    2018-08-01 19:45:08.859  INFO 14264 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
        acks = 1
        batch.size = 16384
        block.on.buffer.full = false
        bootstrap.servers = [10.90.7.2:9092, 10.90.2.101:9092, 10.90.2.102:9092]
        buffer.memory = 20480
        client.id = producer-1
        compression.type = none
        connections.max.idle.ms = 540000
        interceptor.classes = null
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.fetch.timeout.ms = 60000
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 3
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        timeout.ms = 30000
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer
    
    2018-08-01 19:45:08.873  INFO 14264 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.10.1.1
    2018-08-01 19:45:08.873  INFO 14264 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : f10ef2720b03b247
    2018-08-01 19:45:08.932  INFO 14264 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator 10.90.2.102:9092 (id: 2147483644 rack: null) for group kefu-logger.
    2018-08-01 19:45:08.936  INFO 14264 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group kefu-logger
    2018-08-01 19:45:08.936  INFO 14264 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
    2018-08-01 19:45:08.936  INFO 14264 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group kefu-logger
    2018-08-01 19:45:08.947  INFO 14264 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group kefu-logger with generation 7
    2018-08-01 19:45:08.948  INFO 14264 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [kefuLogger-0] for group kefu-logger
    2018-08-01 19:45:08.958  INFO 14264 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[kefuLogger-0]
    2018-08-01 19:45:09.085  INFO 14264 --- [ad | producer-1] c.r.m.kafka.producer.MessageProducer     : cont: ProducerRecord(topic=kefuLogger, partition=null, key=null, value={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:08 CST 2018","content":"a89aca04-f6d0-4f70-93e9-fd0471165497:Wed Aug 01 19:45:08 CST 2018"}, timestamp=null), offset: 112
    2018-08-01 19:45:09.092  INFO 14264 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer     : ConsumerRecord(topic = kefuLogger, partition = 0, offset = 112, CreateTime = 1533123909071, checksum = 2908956415, serialized key size = -1, serialized value size = 221, key = null, value = {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:08 CST 2018","content":"a89aca04-f6d0-4f70-93e9-fd0471165497:Wed Aug 01 19:45:08 CST 2018"})
    2018-08-01 19:45:09.093  INFO 14264 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer     : offset: 112, partition: 0, payload: {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:08 CST 2018","content":"a89aca04-f6d0-4f70-93e9-fd0471165497:Wed Aug 01 19:45:08 CST 2018"}
    2018-08-01 19:45:11.080  INFO 14264 --- [ad | producer-1] c.r.m.kafka.producer.MessageProducer     : cont: ProducerRecord(topic=kefuLogger, partition=null, key=null, value={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:11 CST 2018","content":"f907347d-6582-452e-8bcb-4b4f490e5675:Wed Aug 01 19:45:11 CST 2018"}, timestamp=null), offset: 113
    2018-08-01 19:45:11.081  INFO 14264 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer     : ConsumerRecord(topic = kefuLogger, partition = 0, offset = 113, CreateTime = 1533123911078, checksum = 843723551, serialized key size = -1, serialized value size = 221, key = null, value = {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:11 CST 2018","content":"f907347d-6582-452e-8bcb-4b4f490e5675:Wed Aug 01 19:45:11 CST 2018"})
    2018-08-01 19:45:11.081  INFO 14264 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer     : offset: 113, partition: 0, payload: {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:11 CST 2018","content":"f907347d-6582-452e-8bcb-4b4f490e5675:Wed Aug 01 19:45:11 CST 2018"}
    2018-08-01 19:45:13.082  INFO 14264 --- [ad | producer-1] c.r.m.kafka.producer.MessageProducer     : cont: ProducerRecord(topic=kefuLogger, partition=null, key=null, value={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:13 CST 2018","content":"9ce5ce56-b66e-4952-9053-26a32c2b16de:Wed Aug 01 19:45:13 CST 2018"}, timestamp=null), offset: 114
    2018-08-01 19:45:13.083  INFO 14264 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer     : ConsumerRecord(topic = kefuLogger, partition = 0, offset = 114, CreateTime = 1533123913080, checksum = 2420940286, serialized key size = -1, serialized value size = 221, key = null, value = {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:13 CST 2018","content":"9ce5ce56-b66e-4952-9053-26a32c2b16de:Wed Aug 01 19:45:13 CST 2018"})
    2018-08-01 19:45:13.083  INFO 14264 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer     : offset: 114, partition: 0, payload: {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:13 CST 2018","content":"9ce5ce56-b66e-4952-9053-26a32c2b16de:Wed Aug 01 19:45:13 CST 2018"}
    2018-08-01 19:45:15.084  INFO 14264 --- [ad | producer-1] c.r.m.kafka.producer.MessageProducer     : cont: ProducerRecord(topic=kefuLogger, partition=null, key=null, value={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:15 CST 2018","content":"ad188d68-c9d2-49ba-be2a-f33b90a45404:Wed Aug 01 19:45:15 CST 2018"}, timestamp=null), offset: 115
    2018-08-01 19:45:15.084  INFO 14264 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer     : ConsumerRecord(topic = kefuLogger, partition = 0, offset = 115, CreateTime = 1533123915082, checksum = 2206983395, serialized key size = -1, serialized value size = 221, key = null, value = {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:15 CST 2018","content":"ad188d68-c9d2-49ba-be2a-f33b90a45404:Wed Aug 01 19:45:15 CST 2018"})
    2018-08-01 19:45:15.084  INFO 14264 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer     : offset: 115, partition: 0, payload: {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:15 CST 2018","content":"ad188d68-c9d2-49ba-be2a-f33b90a45404:Wed Aug 01 19:45:15 CST 2018"}
    
    Process finished with exit code 1

    And the log after stopping and restarting the application:

    2018-08-01 19:45:57.562  INFO 8632 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[kefuLogger-0]
    2018-08-01 19:45:57.580  INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer     : ConsumerRecord(topic = kefuLogger, partition = 0, offset = 112, CreateTime = 1533123909071, checksum = 2908956415, serialized key size = -1, serialized value size = 221, key = null, value = {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:08 CST 2018","content":"a89aca04-f6d0-4f70-93e9-fd0471165497:Wed Aug 01 19:45:08 CST 2018"})
    2018-08-01 19:45:57.580  INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer     : offset: 112, partition: 0, payload: {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:08 CST 2018","content":"a89aca04-f6d0-4f70-93e9-fd0471165497:Wed Aug 01 19:45:08 CST 2018"}
    2018-08-01 19:45:57.580  INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer     : ConsumerRecord(topic = kefuLogger, partition = 0, offset = 113, CreateTime = 1533123911078, checksum = 843723551, serialized key size = -1, serialized value size = 221, key = null, value = {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:11 CST 2018","content":"f907347d-6582-452e-8bcb-4b4f490e5675:Wed Aug 01 19:45:11 CST 2018"})
    2018-08-01 19:45:57.580  INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer     : offset: 113, partition: 0, payload: {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:11 CST 2018","content":"f907347d-6582-452e-8bcb-4b4f490e5675:Wed Aug 01 19:45:11 CST 2018"}
    2018-08-01 19:45:57.580  INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer     : ConsumerRecord(topic = kefuLogger, partition = 0, offset = 114, CreateTime = 1533123913080, checksum = 2420940286, serialized key size = -1, serialized value size = 221, key = null, value = {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:13 CST 2018","content":"9ce5ce56-b66e-4952-9053-26a32c2b16de:Wed Aug 01 19:45:13 CST 2018"})
    2018-08-01 19:45:57.580  INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer     : offset: 114, partition: 0, payload: {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:13 CST 2018","content":"9ce5ce56-b66e-4952-9053-26a32c2b16de:Wed Aug 01 19:45:13 CST 2018"}
    2018-08-01 19:45:57.580  INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer     : ConsumerRecord(topic = kefuLogger, partition = 0, offset = 115, CreateTime = 1533123915082, checksum = 2206983395, serialized key size = -1, serialized value size = 221, key = null, value = {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:15 CST 2018","content":"ad188d68-c9d2-49ba-be2a-f33b90a45404:Wed Aug 01 19:45:15 CST 2018"})
    2018-08-01 19:45:57.580  INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer     : offset: 115, partition: 0, payload: {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:15 CST 2018","content":"ad188d68-c9d2-49ba-be2a-f33b90a45404:Wed Aug 01 19:45:15 CST 2018"}
    2018-08-01 19:45:57.738  INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer     : ConsumerRecord(topic = kefuLogger, partition = 0, offset = 116, CreateTime = 1533123957726, checksum = 2375523911, serialized key size = -1, serialized value size = 221, key = null, value = {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:57 CST 2018","content":"b22aa35d-2bff-4e9e-9832-56145415b075:Wed Aug 01 19:45:57 CST 2018"})
    2018-08-01 19:45:57.738  INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer     : offset: 116, partition: 0, payload: {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:57 CST 2018","content":"b22aa35d-2bff-4e9e-9832-56145415b075:Wed Aug 01 19:45:57 CST 2018"}
    2018-08-01 19:45:57.738  INFO 8632 --- [ad | producer-1] c.r.m.kafka.producer.MessageProducer     : cont: ProducerRecord(topic=kefuLogger, partition=null, key=null, value={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:57 CST 2018","content":"b22aa35d-2bff-4e9e-9832-56145415b075:Wed Aug 01 19:45:57 CST 2018"}, timestamp=null), offset: 116
    2018-08-01 19:45:59.735  INFO 8632 --- [ad | producer-1] c.r.m.kafka.producer.MessageProducer     : cont: ProducerRecord(topic=kefuLogger, partition=null, key=null, value={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:59 CST 2018","content":"b9a9148b-d0d6-49c5-ac2a-8cfac03dad90:Wed Aug 01 19:45:59 CST 2018"}, timestamp=null), offset: 117
    2018-08-01 19:45:59.736  INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer     : ConsumerRecord(topic = kefuLogger, partition = 0, offset = 117, CreateTime = 1533123959733, checksum = 2508549365, serialized key size = -1, serialized value size = 221, key = null, value = {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:59 CST 2018","content":"b9a9148b-d0d6-49c5-ac2a-8cfac03dad90:Wed Aug 01 19:45:59 CST 2018"})
    2018-08-01 19:45:59.736  INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer     : offset: 117, partition: 0, payload: {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:59 CST 2018","content":"b9a9148b-d0d6-49c5-ac2a-8cfac03dad90:Wed Aug 01 19:45:59 CST 2018"}
    2018-08-01 19:46:01.736  INFO 8632 --- [ad | producer-1] c.r.m.kafka.producer.MessageProducer     : cont: ProducerRecord(topic=kefuLogger, partition=null, key=null, value={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:46:01 CST 2018","content":"3cbd6443-9617-4ac3-8985-f0b494187f0a:Wed Aug 01 19:46:01 CST 2018"}, timestamp=null), offset: 118
    2018-08-01 19:46:01.736  INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer     : ConsumerRecord(topic = kefuLogger, partition = 0, offset = 118, CreateTime = 1533123961734, checksum = 3825449208, serialized key size = -1, serialized value size = 221, key = null, value = {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:46:01 CST 2018","content":"3cbd6443-9617-4ac3-8985-f0b494187f0a:Wed Aug 01 19:46:01 CST 2018"})
    2018-08-01 19:46:01.736  INFO 8632 --- [ntainer#0-0-L-1] c.r.m.kafka.consumer.MessageConsumer     : offset: 118, partition: 0, payload: {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:46:01 CST 2018","content":"3cbd6443-9617-4ac3-8985-f0b494187f0a:Wed Aug 01 19:46:01 CST 2018"}
    
    Process finished with exit code 1

    The records at offsets 112 through 115 above (highlighted in red in the original post) had clearly already been consumed before the restart. In other words, when enable-auto-commit is false, the application itself must perform the acknowledgment; otherwise the same records are consumed again, i.e. duplicate consumption.

    4. Problems encountered

    These issues came up while implementing application-level acknowledgment. Note that enable-auto-commit defaults to true. To control acknowledgment at the application level, you must set enable-auto-commit to false and, at the same time, set ack-mode to MANUAL or MANUAL_IMMEDIATE. If the two are not configured together, the consumer side throws an error. For example, I initially set only enable-auto-commit to false and left ack-mode unconfigured, which produced the following error:

    2018-08-01 19:49:49.469  INFO 19828 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : f10ef2720b03b247
    2018-08-01 19:49:49.541  INFO 19828 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator 10.90.2.102:9092 (id: 2147483644 rack: null) for group kefu-logger.
    2018-08-01 19:49:49.543  INFO 19828 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group kefu-logger
    2018-08-01 19:49:49.544  INFO 19828 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
    2018-08-01 19:49:49.544  INFO 19828 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group kefu-logger
    2018-08-01 19:49:49.557  INFO 19828 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group kefu-logger with generation 11
    2018-08-01 19:49:49.558  INFO 19828 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [kefuLogger-0] for group kefu-logger
    2018-08-01 19:49:49.566  INFO 19828 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[kefuLogger-0]
    2018-08-01 19:49:49.587 ERROR 19828 --- [ntainer#0-0-L-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = kefuLogger, partition = 0, offset = 112, CreateTime = 1533123909071, checksum = 2908956415, serialized key size = -1, serialized value size = 221, key = null, value = {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:08 CST 2018","content":"a89aca04-f6d0-4f70-93e9-fd0471165497:Wed Aug 01 19:45:08 CST 2018"})
    
    org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
    Endpoint handler details:
    Method [public void com.roomdis.micros.kafka.consumer.MessageConsumer.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)]
    Bean [com.roomdis.micros.kafka.consumer.MessageConsumer@27068a50]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:08 CST 2018","content":"a89aca04-f6d0-4f70-93e9-fd0471165497:Wed Aug 01 19:45:08 CST 2018"}, headers={kafka_offset=112, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=kefuLogger}], failedMessage=GenericMessage [payload={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:08 CST 2018","content":"a89aca04-f6d0-4f70-93e9-fd0471165497:Wed Aug 01 19:45:08 CST 2018"}, headers={kafka_offset=112, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=kefuLogger}]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:178) ~[spring-kafka-1.1.6.RELEASE.jar:na]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72) ~[spring-kafka-1.1.6.RELEASE.jar:na]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) ~[spring-kafka-1.1.6.RELEASE.jar:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:794) [spring-kafka-1.1.6.RELEASE.jar:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:738) [spring-kafka-1.1.6.RELEASE.jar:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2200(KafkaMessageListenerContainer.java:245) [spring-kafka-1.1.6.RELEASE.jar:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:1031) [spring-kafka-1.1.6.RELEASE.jar:na]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_77]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_77]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]
    Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:08 CST 2018","content":"a89aca04-f6d0-4f70-93e9-fd0471165497:Wed Aug 01 19:45:08 CST 2018"}, headers={kafka_offset=112, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=kefuLogger}], failedMessage=GenericMessage [payload={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:08 CST 2018","content":"a89aca04-f6d0-4f70-93e9-fd0471165497:Wed Aug 01 19:45:08 CST 2018"}, headers={kafka_offset=112, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=kefuLogger}]
        ... 10 common frames omitted
    Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload={"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:08 CST 2018","content":"a89aca04-f6d0-4f70-93e9-fd0471165497:Wed Aug 01 19:45:08 CST 2018"}, headers={kafka_offset=112, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=kefuLogger}]
        at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:142) ~[spring-messaging-4.3.9.RELEASE.jar:4.3.9.RELEASE]
        at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112) ~[spring-messaging-4.3.9.RELEASE.jar:4.3.9.RELEASE]
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:135) ~[spring-messaging-4.3.9.RELEASE.jar:4.3.9.RELEASE]
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107) ~[spring-messaging-4.3.9.RELEASE.jar:4.3.9.RELEASE]
        at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-1.1.6.RELEASE.jar:na]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:174) ~[spring-kafka-1.1.6.RELEASE.jar:na]
        ... 9 common frames omitted
    
    2018-08-01 19:49:49.592 ERROR 19828 --- [ntainer#0-0-L-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = kefuLogger, partition = 0, offset = 113, CreateTime = 1533123911078, checksum = 843723551, serialized key size = -1, serialized value size = 221, key = null, value = {"serverAddr":"10.90.9.20:8899","fullClassPath":"class com.roomdis.micros.kafka.KafkaApplication","messageTime":"Wed Aug 01 19:45:11 CST 2018","content":"f907347d-6582-452e-8bcb-4b4f490e5675:Wed Aug 01 19:45:11 CST 2018"})

    Supplementary notes on the ack-mode configuration.
    According to the official documentation, when enable-auto-commit is false, ackMode can take the following values:

    RECORD - commit the offset when the listener returns after processing the record.
    BATCH - commit the offset when all the records returned by the poll() have been processed.
    TIME - commit the offset when all the records returned by the poll() have been processed as long as the ackTime since the last commit has been exceeded.
    COUNT - commit the offset when all the records returned by the poll() have been processed as long as ackCount records have been received since the last commit.
    COUNT_TIME - similar to TIME and COUNT but the commit is performed if either condition is true.
    MANUAL - the message listener is responsible to acknowledge() the Acknowledgment; after which, the same semantics as BATCH are applied.
    MANUAL_IMMEDIATE - commit the offset immediately when the Acknowledgment.acknowledge() method is called by the listener.

    And these are the related properties that work together with the ackMode values (see the tuning example after this list):

    spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
    spring.kafka.listener.ack-mode=  # Listener AckMode. See the spring-kafka documentation.
    spring.kafka.listener.ack-time=  # Time between offset commits when ackMode is "TIME" or "COUNT_TIME".
    spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.
    spring.kafka.listener.poll-timeout= # Timeout to use when polling the consumer.
    spring.kafka.listener.type=single # Listener type.
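
    For example, if you wanted batch-style commits every 100 records or every 5 seconds, whichever comes first, the listener block might look like this (a sketch; ack-time is in milliseconds, and enable-auto-commit must still be false):

    spring:
      kafka:
        listener:
          ack-mode: COUNT_TIME
          ack-count: 100
          ack-time: 5000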

    Finally, before any new technology goes into production, every key link in the chain must be understood; otherwise risk, or outright disaster, is only a matter of time.

  • Original article: https://www.cnblogs.com/shihuc/p/9403731.html