zoukankan      html  css  js  c++  java
  • springboot整合kafka

    一、启动zookeeper和kafka

    bin/zkServer.sh start
    bin/kafka-server-start.sh config/server.properties

    二、新建项目

    新建一个SpringBoot项目,引入所需jar包。

     1         <dependency>
     2             <groupId>org.springframework.kafka</groupId>
     3             <artifactId>spring-kafka</artifactId>
     4         </dependency>
     5 
     6         <dependency>
     7             <groupId>org.projectlombok</groupId>
     8             <artifactId>lombok</artifactId>
     9             <optional>true</optional>
    10         </dependency>
    11 
    12         <dependency>
    13             <groupId>com.google.code.gson</groupId>
    14             <artifactId>gson</artifactId>
    15             <version>2.8.2</version>
    16         </dependency>

    这是主要用到的,注意版本问题,开始我就是spring-kafka版本写错了一直报错(技巧是这里不写具体版本,它会自动引入),具体参考这里

    配置文件application.properties,配置在bootstrap.yml上不行。

    #============== kafka ===================
    # 指定kafka 代理地址,可以多个
    spring.kafka.bootstrap-servers=192.168.75.132:9092
    
    #=============== provider  =======================
    
    spring.kafka.producer.retries=0
    # 每次批量发送消息的数量
    spring.kafka.producer.batch-size=16384
    spring.kafka.producer.buffer-memory=33554432
    
    # 指定消息key和消息体的编解码方式
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    
    #=============== consumer  =======================
    # 指定默认消费者group id
    spring.kafka.consumer.group-id=test-consumer-group
    
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.enable-auto-commit=true
    spring.kafka.consumer.auto-commit-interval=100
    
    # 指定消息key和消息体的编解码方式
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

    代码部分:

     1 package cn.sp.component;
     2 
     3 import cn.sp.entity.Message;
     4 import com.google.gson.Gson;
     5 import com.google.gson.GsonBuilder;
     6 import lombok.extern.slf4j.Slf4j;
     7 import org.springframework.beans.factory.annotation.Autowired;
     8 import org.springframework.boot.CommandLineRunner;
     9 import org.springframework.kafka.core.KafkaTemplate;
    10 import org.springframework.stereotype.Component;
    11 
    12 import java.util.Date;
    13 import java.util.UUID;
    14 
    15 /**
    16  * @Author: 2YSP
    17  * @Description:
    18  * @Date: Created in 2018/5/2
    19  */
    20 @Component
    21 //如果不想每次都写private  final Logger logger = LoggerFactory.getLogger(XXX.class); 可以用注解@Slf4j
    22 @Slf4j
    23 public class KafkaSender implements CommandLineRunner{
    24 
    25     @Autowired
    26     private KafkaTemplate<String,String> kafkaTemplate;
    27 
    28     private Gson gson = new GsonBuilder().create();
    29 
    30     public void send(){
    31         Message message = new Message();
    32         message.setId(System.currentTimeMillis());
    33         message.setMsg(UUID.randomUUID().toString());
    34         message.setSendTime(new Date());
    35         log.info("++++++++++++++message:{}",gson.toJson(message));
    36         kafkaTemplate.send("ship",gson.toJson(message));
    37     }
    38 
    39     @Override
    40     public void run(String... strings) throws Exception {
    41         for(int i=0;i<3;i++){
    42             send();
    43             try {
    44                 Thread.sleep(1000);
    45             }catch (InterruptedException e){
    46                 e.printStackTrace();
    47             }
    48         }
    49     }
    50 }

    项目一启动就会发送3次消息。

     1 package cn.sp.component;
     2 
     3 import lombok.extern.slf4j.Slf4j;
     4 import org.apache.kafka.clients.consumer.ConsumerRecord;
     5 import org.springframework.kafka.annotation.KafkaListener;
     6 import org.springframework.stereotype.Component;
     7 
     8 import java.util.Optional;
     9 
    10 /**
    11  * @Author: 2YSP
    12  * @Description:
    13  * @Date: Created in 2018/5/2
    14  */
    15 @Component
    16 @Slf4j
    17 public class KafkaReceiver {
    18 
    19   @KafkaListener(topics = {"ship"})
    20   public void listen(ConsumerRecord<?,?> record){
    21       Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    22       if (kafkaMessage.isPresent()){
    23           Object message = kafkaMessage.get();
    24           log.info("===========record:{}",record);
    25           log.info("===========message:{}",message);
    26       }
    27   }
    28 }

    这里可以指定多个主题,topics={"ship","test"}。

    三、启动测试

    启动项目可以看到控制台日志输出如下:

    消费了3次,生产消息3个。

    代码地址:点击这里

     

  • 相关阅读:
    设计模式
    刷新所有视图存储过程
    js杨辉三角控制台输出
    2018申请淘宝客AppKey
    w3c标准 dom对象 事件冒泡和事件捕获
    promise原理
    vue virtual Dom
    css学习
    seo优化
    新概念学习
  • 原文地址:https://www.cnblogs.com/2YSP/p/8982663.html
Copyright © 2011-2022 走看看