zoukankan      html  css  js  c++  java
  • Kafka和SpringBoot

    事先必备:

    kafka已安装完成

    1.目录结构

    2.父pom

    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.5.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>org.example</groupId>
        <artifactId>KafkaAndSpringBoot</artifactId>
        <packaging>pom</packaging>
        <version>1.0-SNAPSHOT</version>
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>
        <modules>
            <module>KafkaProducer</module>
            <module>KafkaConsumer</module>
        </modules>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
        </dependencies>
    </project>

    3.producer模块

    A.application.properties

    server.port=8081
    
    #kafka节点
    spring.kafka.bootstrap-servers=192.168.204.139:9092
    #kafka发送消息失败后的重试次数
    spring.kafka.producer.retries=0
    #当消息达到该值后再批量发送消息.16kb
    spring.kafka.producer.batch-size=16384
    #设置kafka producer内存缓冲区大小.32MB
    spring.kafka.producer.buffer-memory=33554432
    #kafka消息的序列化配置
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    #acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。??
    #acks=1 : 只要集群的leader节点收到消息,生产者就会收到一个来自服务器成功响应。
    #acks=-1: 表示分区leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为producer请求成功。
    #         这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的。
    spring.kafka.producer.acks=1

    B.producer代码

    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Component;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    
    @Component
    @Slf4j
    @Data
    public class KafkaProducerDemo {
        private final KafkaTemplate<String, Object> kafkaTemplate;
    
        public void sendMsg(String topic, Object object) {
            ListenableFuture<SendResult<String, Object>> send = kafkaTemplate.send(topic, object);
            send.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                @Override
                public void onFailure(Throwable ex) {
                    log.error("消息发送失败:{}", ex.toString());
                }
    
                @Override
                public void onSuccess(SendResult<String, Object> result) {
                    log.info("消息发送成功:{}", result.toString());
                }
            });
        }
    }

    C.启动类

     略

    D.producerTest

    import com.sakura.producer.KafkaProducerDemo;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class ProducerTest {
        @Autowired
        private KafkaProducerDemo kafkaProducerDemo;
    
        @Test
        public void send() throws InterruptedException {
            String topic = "firstTopic";
            for (int i = 0; i < 6; i++) {
                kafkaProducerDemo.sendMsg(topic, "Hello kafka," + i);
            }
            Thread.sleep(Integer.MAX_VALUE);
        }
    }

    4.consumer模块

     A.application.properties

    server.port=8082
    #kafka节点
    spring.kafka.bootstrap-servers=192.168.204.139:9092
    #consumer消息签收机制
    spring.kafka.consumer.enable-auto-commit=false
    spring.kafka.listener.ack-mode=manual
    #如果没有设置offset或者设置的offset不存在时(例如数据被删除)采取的策略:
    #earliest:使用最早的offset
    #latest:使用最新的offset
    #none:使用前一个offset,如果没有就向consumer抛异常
    #anything else:直接向consumer抛出异常
    spring.kafka.consumer.auto-offset-reset=earliest
    ## 序列化配置
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #监听消息消费的线程数,值范围在[1,partitionCounts]之间.
    #假如有3个partition,concurrency的值为4,@KafkaListener的数量为2.
    #其中一个@KafkaListener会启动两个线程分配到两个partition
    #另一个@KafkaListener会启动一个线程分配到另一个partition
    #当有一个@KafkaListener挂掉之后会触发broker的再均衡,由剩余的@KafkaListener启动线程重新分配至partition.
    #@KafkaListener就像是消费者一样的存在,当值为1时broker会认为只有一个消费者在消费topic.
    spring.kafka.listener.concurrency=1

    B.consumer代码

    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Component;
    
    @Slf4j
    @Component
    public class KafkaConsumerDemo {
        @KafkaListener(topics = "firstTopic",groupId = "groupDemo")
        public void receiveMsg(ConsumerRecord<String, Object> record,
                               Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
            log.info("消费消息:{}", record.value());
            //手动ack
            acknowledgment.acknowledge();
            consumer.commitAsync();
        }
    }

    C.启动类

  • 相关阅读:
    Windows下升级MySQL5.0到5.5
    聊聊MVC和模块化以及MVVM和组件化
    还有很多行业,并没有和互联网相加
    用React实现一个自动生成文章目录的组件
    一个Js开发者学习Python的第一天
    React弹窗组件
    React项目开发经验汇总
    Audio 标签的使用和自己封装一个强大的React音乐播放器
    你知道的和不知道的sass
    我眼中javascript的这些年
  • 原文地址:https://www.cnblogs.com/monument/p/12944702.html
Copyright © 2011-2022 走看看