zoukankan      html  css  js  c++  java
  • Spring Kafka整合Spring Boot创建生产者客户端案例

    每天学习一点点 编程PDF电子书、视频教程免费下载:
    http://www.shitanlife.com/code

    创建一个kafka-producer-master的maven工程。整个项目结构如下:

    Maven的依赖

    复制代码
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.gzh.kafka.producer</groupId>
        <artifactId>producer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>
    
        <name>kafka-producer-master</name>
        <description>demo project for kafka producer</description>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.5.9.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <spring-kafka.version>2.1.5.RELEASE</spring-kafka.version>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            
            <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>${spring-kafka.version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka-test</artifactId>
                <version>${spring-kafka.version}</version>
                <scope>test</scope>
            </dependency>
            
            <!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger2 -->
            <dependency>
                <groupId>io.springfox</groupId>
                <artifactId>springfox-swagger2</artifactId>
                <version>2.8.0</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger-ui -->
            <dependency>
                <groupId>io.springfox</groupId>
                <artifactId>springfox-swagger-ui</artifactId>
                <version>2.8.0</version>
            </dependency>
    
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    复制代码

     使用application.properties配置应用程序

    当然,根据个人喜好,你也可以使用application.yml属性文件重写配置。Spring Boot会尝试根据pom.xml文件中指定的依赖关系自动配置应用程序,并设置合理的默认值。

    复制代码
    server.port=8000
    spring.application.name=kafka-producer
    #kafka configuration
    spring.kafka.producer.bootstrap-servers=192.168.1.130:9092,192.168.1.101:9093,192.168.1.101:9094
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    #topic
    kafka.app.topic.foo=test20180430
    复制代码

    在上面的配置中,我给生产者分配的端口号是8000,服务器有3台,采用先前window环境搭建zookeeper,kafka集群 中配置的服务器。想了解关于kafka生产者相关的更多配置的话,可以阅读关于Spring Boot Kafka Properties的配置信息。

    使用Spring Boot发送Spring Kafka消息

    SpringKafka提供了使用Producer的KafkaTemplate类发送消息,并提供将数据发送到Kafka主题的高级操作。 提供异步和同步方法,异步方法返回Future。Spring Boot根据application.properties属性文件中配置的属性自动配置并初始化KafkaTemplate。为了方便测试发送消息,使用了Spring的定时任务,在类上使用@EnableScheduling 注解开启定时任务,通过@Scheduled注解指定发送消息规则。

    复制代码
    package com.gzh.kafka.producer.component;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    import org.springframework.util.concurrent.ListenableFuture;
    
    @Component
    @EnableScheduling
    public class KafkaMessageProducer {
    
        private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageProducer.class);
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        @Value("${kafka.app.topic.foo}")
        private String topic;
    
        @Scheduled(cron = "00/5 * * * * ?")
        public void send() {
            String message = "Hello World---" + System.currentTimeMillis();
            LOG.info("topic="+topic+",message="+message);
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
            future.addCallback(success -> LOG.info("KafkaMessageProducer 发送消息成功!"),
                    fail -> LOG.error("KafkaMessageProducer 发送消息失败!"));
        }
    }
    复制代码

    创建消息生产者启动类

    复制代码
    package com.gzh.kafka.producer;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.context.properties.EnableConfigurationProperties;
    
    @SpringBootApplication
    @EnableConfigurationProperties
    public class KafkaProducerApplication{
        
        public static void main(String[] args) {
            SpringApplication.run(KafkaProducerApplication.class, args);
        }
    }
    复制代码

    至此,Spring Boot整合Spring Kafka消息生产者应用已经整合完毕。启动zookeeper、kafka各个服务器。启动生产者应用,查看消息生产者应用控制台日志,如下图说明整合OK。

     当然在创建消息生产者类时,我们可以更加灵活,可以不使用定时任务,通过界面请求的方式,发送我们想要发送的内容。简单案例如下:

    • 消息发送者类
    复制代码
    package com.gzh.kafka.producer.service;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Service;
    import org.springframework.util.concurrent.ListenableFuture;
    
    @Service
    public class KafkaMessageSendService {
    
        private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageSendService.class);
        
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
        
        @Value("${kafka.app.topic.foo}")
        private String topic;
        
        public void send(String message){
            LOG.info("topic="+topic+",message="+message);
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
            future.addCallback(success -> LOG.info("KafkaMessageProducer 发送消息成功!"),
                    fail -> LOG.error("KafkaMessageProducer 发送消息失败!"));
        }
    }
    复制代码
    • 界面请求处理controller类
    复制代码
    package com.gzh.kafka.producer.controller;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.http.MediaType;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    import com.gzh.kafka.producer.service.KafkaMessageSendService;
    
    @RestController
    @RequestMapping(value="send",produces=MediaType.APPLICATION_JSON_UTF8_VALUE)
    public class KafkaMessageSendController {
    
        @Autowired
        private KafkaMessageSendService kafkaMessageSendService;
        
        @RequestMapping(value="/sendMessage",method=RequestMethod.POST)
        public String send(@RequestParam(required=true) String message){
            try {
                kafkaMessageSendService.send(message);
            } catch (Exception e) {
                return "send failed.";
            }
            return message;
        }
    }
    复制代码
    • 通过Swagger访问测试Controller服务请求

     Spring Kafka整合Spring Boot创建消费者客户端案例:

    创建一个kafka-consumer-master的maven工程。整个项目结构如下:

    Maven的依赖

    复制代码
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.gzh.kafka.consumer</groupId>
        <artifactId>consumer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>
    
        <name>kafka-consumer-master</name>
        <description>demo project for kafka consumer</description>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.5.9.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <spring-kafka.version>1.3.4.RELEASE</spring-kafka.version>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>${spring-kafka.version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka-test</artifactId>
                <version>${spring-kafka.version}</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    
    </project>
    复制代码

    注意,这是使用Spring-Kafka时一定要注意版本问题,否则会报各种奇葩错误。Spring官方网站上给出了SpringKafka和kafka-client版本(它的版本号要和kafka服务器的版本保持一致)的对应关系:

    使用application.properties配置应用程序

    Spring Boot会尝试根据pom.xml文件中指定的依赖关系自动配置应用程序,并设置合理的默认值。

    复制代码
    server.port=8001
    spring.application.name=kafka-consumer
    
    #kafka configuration
    #指定消息被消费之后自动提交偏移量,以便下次继续消费
    spring.kafka.consumer.enable-auto-commit=true
    #指定消息组
    spring.kafka.consumer.group-id=guan
    #指定kafka服务器地址
    spring.kafka.consumer.bootstrap-servers=192.168.1.130:9092,192.168.1.101:9093,192.168.1.101:9094
    #指定从最近地方开始消费(earliest)
    spring.kafka.consumer.auto-offset-reset=latest
    
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #topic
    kafka.app.topic.foo=test20180430
    复制代码

    在上面的配置中,我给生产者分配的端口号是8000,服务器有3台,采用先前window环境搭建zookeeper,kafka集群 中配置的服务器。想了解关于kafka生产者相关的更多配置的话,可以阅读关于Spring Boot Kafka Properties的配置信息。

    使用Spring Boot消费Spring Kafka消息

    通过使用@KafkaListener来注解一个方法Spring Kafka会自动创建一个消息监听器容器。使用该注解,并指定要消费的topic(也可以指定消费组以及分区号,支持正则表达式匹配),这样,消费者一旦启动,就会监听kafka服务器上的topic,实时进行消费消息。

    复制代码
    package com.gzh.kafka.consumer.service;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.messaging.MessageHeaders;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    
    
    @Component
    public class KafkaMessageConsumer {
    
        private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageConsumer.class);
        
        @KafkaListener(topics={"${kafka.app.topic.foo}"})
        public void receive(@Payload String message, @Headers MessageHeaders headers){
            LOG.info("KafkaMessageConsumer 接收到消息:"+message);
            headers.keySet().forEach(key->LOG.info("{}: {}",key,headers.get(key)));
        }
    }
    复制代码

    创建消息消费者启动类

    复制代码
    package com.gzh.kafka.consumer;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.context.properties.EnableConfigurationProperties;
    
    @SpringBootApplication
    @EnableConfigurationProperties
    public class KafkaConsumerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(KafkaConsumerApplication.class, args);
        }
    }
    复制代码

    消费者应用已经完成,接下来让我们验证Spring Kafka消息发送和接收效果。先依次启动zookeeper、kafka服务器,然后在启动生产者(kafka-producer-master)应用,再启动消费者(kafka-consumer-master)应用,然后观察生产者和消费者启动类日志:

    每天学习一点点 编程PDF电子书、视频教程免费下载:
    http://www.shitanlife.com/code

  • 相关阅读:
    慢sql
    drf 和django 字段参数解析
    django uwsgi
    django 中间件原理图和实现方法
    解决 控制台console导入模型报错 django.core.exceptions.ImproperlyConfigured: Requested setting INSTALLED_APPS, but settings are not configured.
    版本控制器 django全局和局部配置
    极客论坛Cpu瓶颈定位思路
    jmeter grpc 自定义开发java请求案例
    论文阅读笔记四十七:Generalized Intersection over Union: A Metric and A Loss for Bounding Box Regression(CVPR2019)
    论文阅读笔记四十六:Feature Selective Anchor-Free Module for Single-Shot Object Detection(CVPR2019)
  • 原文地址:https://www.cnblogs.com/scode2/p/8984937.html
Copyright © 2011-2022 走看看