zoukankan      html  css  js  c++  java
  • Spring-Boot集成Kafka

    这篇文章不讲Kafka相关概念,只是实战。具体了解请参考:

    《分布式消息中间件实践》

    《Kafka权威指南》

    《spring-kafka-reference》spring集成kafka官方文档。

    以下为SpringBoot集成Kafka

    pom.xml

    <!-- Maven build descriptor for the Spring Boot + Kafka sample application. -->
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.niugang</groupId>
        <artifactId>kafka-spring-boot</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>
        <url>http://maven.apache.org</url>
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
        <!-- Inherit dependency and plugin management from the Spring Boot parent POM -->
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.5.10.RELEASE</version>
            <relativePath></relativePath>
        </parent>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!-- Test-only libraries: keep them off the runtime classpath and out of the packaged jar -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <!-- No explicit version: it is expected to come from the parent POM's dependency management -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
        </dependencies>
        <!-- Maven build plugins -->
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>

    application.properties

    server.port=8086
    ###########################################kafka about config #######################################################
    spring.kafka.bootstrap-servers=localhost:9092
    ##########################producer about config##############################
    spring.kafka.producer.acks=1
    spring.kafka.producer.batch-size=16384
    spring.kafka.producer.retries=0
    spring.kafka.producer.buffer-memory=33554432
    #spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    #spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    ##########################consumer about config##############################
    spring.kafka.consumer.enable-auto-commit=true
    spring.kafka.consumer.group-id=kafka_group_2
    spring.kafka.consumer.auto-commit-interval=100
    #spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

    发送消息

    package com.niugang.controller;
    
    import java.util.concurrent.ExecutionException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    /**
     * REST controller used to verify message sending to Kafka through
     * {@link KafkaTemplate}, once synchronously and once asynchronously.
     *
     * <p>Both endpoints publish {@code MESSAGE_COUNT} records with the values
     * {@code foo0}, {@code foo1}, ... to the {@code kafka-boot} topic, all with
     * the record key {@code "0"}.
     *
     * @author niugang
     */
    @RestController
    public class SenderConttoller {
        private static final Logger logger = LoggerFactory.getLogger(SenderConttoller.class);

        /** Topic every demo record is published to. */
        private static final String TOPIC = "kafka-boot";

        /** Record key shared by all demo records. */
        private static final String MESSAGE_KEY = "0";

        /** Number of records published per endpoint invocation. */
        private static final int MESSAGE_COUNT = 100;

        @Autowired
        private KafkaTemplate<String, String> template;

        /**
         * Sends the demo records one at a time, blocking on each send's future
         * before issuing the next one.
         *
         * <p>A failed send is logged and the loop moves on to the next record.
         * If the calling thread is interrupted while waiting, the interrupt flag
         * is restored and the remaining records are not sent.
         *
         * @return the literal {@code "success"}; failures are only reported in the log
         */
        @RequestMapping("syncSendMessage")
        public String syncSendMessage() {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                try {
                    template.send(TOPIC, MESSAGE_KEY, "foo" + i).get();
                } catch (InterruptedException e) {
                    // Restore the flag so code further up the stack can observe the interruption,
                    // and stop: every further blocking get() would fail immediately anyway.
                    Thread.currentThread().interrupt();
                    logger.error("sync send message fail [{}]", e.getMessage(), e);
                    break;
                } catch (ExecutionException e) {
                    // Pass the exception itself as the last argument so the cause and stack trace are logged.
                    logger.error("sync send message fail [{}]", e.getMessage(), e);
                }
            }
            return "success";
        }

        /**
         * Sends the demo records without waiting for the results; the outcome of
         * each send is reported to the log by a callback.
         *
         * @return the literal {@code "success"}, returned as soon as all sends have
         *         been handed to the template
         */
        @RequestMapping("asyncSendMessage")
        public String sendMessageAsync() {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                // On success the callback receives a SendResult; its getRecordMetadata()
                // exposes the partition and offset that are logged below.
                ListenableFuture<SendResult<String, String>> send = template.send(TOPIC, MESSAGE_KEY, "foo" + i);
                send.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

                    @Override
                    public void onSuccess(SendResult<String, String> result) {
                        logger.info("async send message success partition [{}]", result.getRecordMetadata().partition());
                        logger.info("async send message success offest[{}]", result.getRecordMetadata().offset());
                    }

                    @Override
                    public void onFailure(Throwable ex) {
                        // Trailing throwable argument makes the logger print the stack trace as well.
                        logger.error("async send message fail [{}]", ex.getMessage(), ex);
                    }
                });
            }
            return "success";
        }
    }

    消费者

    package com.niugang.config;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    
    /**
     * Kafka consumer component that logs each message it receives from the
     * {@code kafka-boot} topic.
     *
     * @author niugang
     */
    @Component
    public class ConsumerListener {
        // One shared, immutable logger per class instead of one mutable field per instance.
        private static final Logger logger = LoggerFactory.getLogger(ConsumerListener.class);

        /**
         * Logs the content of a received message. The method is registered via
         * {@code @KafkaListener} with listener id {@code foo} on topic {@code kafka-boot}.
         *
         * @param foo the received message content
         */
        @KafkaListener(id = "foo", topics = "kafka-boot")
        public void listen1(String foo) {
            logger.info("message content [{}]", foo);
        }
    }

    启动类

    package com.niugang;
    
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.web.support.SpringBootServletInitializer;
    
    
    /**
     * Application entry point for the Kafka Spring Boot sample.
     *
     * @author niugang
     */
    @SpringBootApplication
    public class KafkaApplication extends SpringBootServletInitializer {

        /**
         * Starts the Spring application using this class as its source.
         *
         * @param args command-line arguments passed through to Spring Boot
         */
        public static void main(String[] args) {
            SpringApplication application = new SpringApplication(KafkaApplication.class);
            application.run(args);
        }

    }

    源码:https://gitee.com/niugangxy/kafka/tree/master/kafka-spring-boot

    微信公众号

                              
  • 相关阅读:
    阿里云安装Mono 发生错误解决方法
    在Entity Framework 中执行Tsql语句
    WinRT app guide
    开源稳定的消息队列 RabbitMQ
    Catpic: OpenSocial Container on .NET
    MSDTC 故障排除
    HTML5 canvas图形库RGraph
    《我的WCF之旅》博文系列汇总
    TSQL Enhancement in SQL Server 2005[下篇]
    谈谈基于SQL Server 的Exception Handling[上篇]
  • 原文地址:https://www.cnblogs.com/niugang0920/p/12187135.html
Copyright © 2011-2022 走看看