zoukankan      html  css  js  c++  java
  • SpringBoot2整合kafka集群

    参考

    SpringBoot整合kafka集群
    SpringBoot整合kafka(实现producer和consumer)

    1. 在pom.xml中引入依赖

     <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>2.5.7.RELEASE</version>
            </dependency>
    

    完整的pom.xml文件如下

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.3.5.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.roncoo.eshop</groupId>
        <artifactId>cache</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>cache</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <spring.framework.version>5.2.10.RELEASE</spring.framework.version>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-thymeleaf</artifactId>
            </dependency>
    
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.49</version>
                <scope>runtime</scope>
            </dependency>
    
            <dependency>
                <groupId>org.mybatis.spring.boot</groupId>
                <artifactId>mybatis-spring-boot-starter</artifactId>
                <version>2.1.2</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
    
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>3.3.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.10</version>
                <scope>provided</scope>
            </dependency>
    
            <!-- Spring Boot 缓存支持启动器 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-cache</artifactId>
            </dependency>
            <!-- Ehcache 坐标 -->
            <dependency>
                <groupId>net.sf.ehcache</groupId>
                <artifactId>ehcache</artifactId>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>2.5.7.RELEASE</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka-test</artifactId>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

    2. 配置application.yml

    spring:
      kafka:
        bootstrap-servers: 127.0.0.1:9091,127.0.0.1:9092,127.0.0.1:9093
        producer:
          retries: 3
          acks: all
          batch-size: 16384
          buffer-memory: 33554432
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          group-id: eshop-cache-group
          auto-offset-reset: earliest
          enable-auto-commit: false
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          max-poll-records: 20000
        listener:
          concurrency: 3
          ack-mode: MANUAL
    

    采用Kafka提供的StringSerializer和StringDeserializer进行序列化和反序列化

    3. 编写bean,service,controller

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    import java.io.Serializable;
    
    /**
     * @author john
     * @date 2020/11/5 - 14:11
     */
    /**
     * Message envelope published to the "eshop-cache" topic.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class KafkaMessage implements Serializable {
        // Logical service that should handle this message,
        // e.g. "productInfoService" or "shopInfoService".
        private String serviceId;
        // Entity id the message refers to. Renamed from "Id" to "id" to
        // follow lowerCamelCase; Lombok still generates getId()/setId()
        // and the Jackson JSON property remains "id", so callers and the
        // wire format are unchanged.
        private Long id;
    }
    
    
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.roncoo.eshop.cache.model.KafkaMessage;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    // 生产者向kafka发送消息
    @RestController
    @Slf4j
    public class ProducerController {
        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;
    
        @GetMapping("/send_product")
        public String sendproductmsg() throws JsonProcessingException {
            KafkaMessage kafkaMessage = new KafkaMessage();
            kafkaMessage.setId(1L);
            kafkaMessage.setServiceId("productInfoService");
    
            ObjectMapper mapper = new ObjectMapper();
            String kafkaJsonMessage = mapper.writeValueAsString(kafkaMessage);
            kafkaTemplate.send("eshop-cache", kafkaJsonMessage); //使用kafka模板发送信息
            String res = "消息:【" + kafkaJsonMessage + "】发送成功 SUCCESS !";
            log.info(res);
            return res;
        }
    
        @GetMapping("/send_shop")
        public String sendshopmsg() throws JsonProcessingException {
            KafkaMessage kafkaMessage = new KafkaMessage();
            kafkaMessage.setId(1L);
            kafkaMessage.setServiceId("shopInfoService");
    
            ObjectMapper mapper = new ObjectMapper();
            String kafkaJsonMessage = mapper.writeValueAsString(kafkaMessage);
            kafkaTemplate.send("eshop-cache", kafkaJsonMessage); //使用kafka模板发送信息
            String res = "消息:【" + kafkaJsonMessage + "】发送成功 SUCCESS !";
            log.info(res);
            return res;
        }
    }
    
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    import java.io.Serializable;
    
    /**
     * Product information (商品信息) — immutable-shaped data carrier used as
     * the payload for product cache entries; Serializable so it can be
     * stored/transported by serializing caches.
     *
     * @author Administrator
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class ProductInfo implements Serializable {
    
        private Long id;       // product id
        private String name;   // product name
        private Double price;  // product price (NOTE(review): double for money loses precision — consider BigDecimal)
    }
    
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;

    import java.io.Serializable;
    
    /**
     * Shop information data carrier.
     *
     * Now implements {@link Serializable} for consistency with the other
     * model classes in this article (ProductInfo, KafkaMessage), so it can
     * equally be stored in serializing caches or sent through serializing
     * transports. Adding an interface is backward-compatible for callers.
     *
     * @author john
     * @date 2020/11/5 - 13:30
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class ShopInfo implements Serializable {
        private Long id;                 // shop id
        private String name;             // shop name
        private Integer level;           // shop level/tier
        private Double goodCommentRate;  // positive-review ratio
    }
    

    4. 编写消息消费者

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.roncoo.eshop.cache.model.KafkaMessage;
    import com.roncoo.eshop.cache.model.ProductInfo;
    import com.roncoo.eshop.cache.model.ShopInfo;
    import com.roncoo.eshop.cache.service.CacheService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Component;
    
    /**
     * Consumer: receives JSON messages from the "eshop-cache" topic,
     * dispatches by serviceId and manually acknowledges the offset
     * (listener ack-mode is MANUAL in application.yml).
     */
    @Component
    @Slf4j
    public class ConsumerListener {

        // ObjectMapper is thread-safe; reuse one shared instance instead of
        // constructing a new one for every consumed message.
        private static final ObjectMapper MAPPER = new ObjectMapper();

        @Autowired
        private CacheService cacheService;

        // @KafkaListener offers many more options; specific partitions can
        // be consumed via e.g.
        // topicPartitions = {@TopicPartition(topic = "topic1", partitions = { "0", "1" })}
        @KafkaListener(topics = "eshop-cache", groupId = "eshop-cache-group")
        public void listen(String message, Acknowledgment ack) throws JsonProcessingException {
            log.info("消息内容" + message);

            // Parse the JSON payload into a KafkaMessage envelope.
            KafkaMessage kafkaMessage = MAPPER.readValue(message, KafkaMessage.class);
            String serviceId = kafkaMessage.getServiceId();

            // Dispatch on the logical service id carried by the message.
            if ("productInfoService".equals(serviceId)) {
                // handle product-info change message
                //processProductInfoChangeMessage(kafkaMessage);
            } else if ("shopInfoService".equals(serviceId)) {
                // handle shop-info change message
               // processShopInfoChangeMessage(kafkaMessage);
            }

            // Manually commit the offset only after successful processing.
            ack.acknowledge();
            log.info("消费结束");
        }

    }
    
    

    5. 测试

    参考代码

    6. 采用自定义序列化和反序列化器进行实体类的序列化和反序列化(不推荐)

    和内置的StringSerializer字符串序列化一样,如果要自定义序列化方式,需要实现接口Serializer。下面以Java原生序列化为例,为KafkaMessage自定义序列化器和反序列化器:

    1. 创建KafkaMessage序列化器

    import com.roncoo.eshop.cache.model.KafkaMessage;
    import org.apache.kafka.common.serialization.Serializer;
    
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.util.Map;
    
    /**
     * Kafka value serializer that writes a {@link KafkaMessage} using
     * standard Java object serialization.
     *
     * Fixes a real bug in the original: {@code toByteArray()} was called
     * BEFORE the ObjectOutputStream was flushed/closed. ObjectOutputStream
     * buffers block data internally, so the returned array could be missing
     * the tail of the object data. try-with-resources closes (and thereby
     * flushes) the stream before the bytes are read out, and also replaces
     * the verbose manual finally/close boilerplate.
     *
     * @author john
     * @date 2020/11/5 - 17:00
     */
    public class KafkaMessageSerializable implements Serializer<KafkaMessage> {
        @Override
        public void configure(Map<String, ?> map, boolean b) {
            // No configuration needed.
        }

        /**
         * @param topic        topic the record is sent to (unused)
         * @param kafkaMessage value to serialize; null maps to null per the
         *                     Kafka Serializer contract (tombstone records)
         * @return serialized bytes, or null when the input is null
         * @throws RuntimeException wrapping any serialization IOException
         */
        @Override
        public byte[] serialize(String topic, KafkaMessage kafkaMessage) {
            if (kafkaMessage == null) {
                return null;
            }
            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
            try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream)) {
                objectOutputStream.writeObject(kafkaMessage);
            } catch (IOException e) {
                // Preserve the cause instead of swallowing it.
                throw new RuntimeException(e);
            }
            // Safe only now: the ObjectOutputStream has been closed/flushed,
            // so all buffered block data has been drained to outputStream.
            return outputStream.toByteArray();
        }

        @Override
        public void close() {
            // No resources held.
        }
    }
    

    2.创建KafkaMessage反序列化器

    import com.roncoo.eshop.cache.model.KafkaMessage;
    import org.apache.kafka.common.serialization.Deserializer;
    
    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.util.Map;
    
    /**
     * Kafka value deserializer that reads a {@link KafkaMessage} written by
     * {@code KafkaMessageSerializable} via Java object serialization.
     *
     * Fixes in this version: (1) a null payload (Kafka tombstone record)
     * caused a NullPointerException in {@code new ByteArrayInputStream(null)}
     * — per the Deserializer contract null now maps to null; (2) the manual
     * finally/close boilerplate is replaced with try-with-resources.
     *
     * SECURITY NOTE(review): native Java deserialization of untrusted bytes
     * is a well-known attack vector; if the topic can carry untrusted data,
     * prefer JSON or install an ObjectInputFilter.
     *
     * @author john
     * @date 2020/11/5 - 17:02
     */
    public class KafkaMessageDeserializer implements Deserializer<KafkaMessage> {
        @Override
        public void configure(Map<String, ?> map, boolean b) {
            // No configuration needed.
        }

        /**
         * @param topic topic the record came from (unused)
         * @param bytes serialized payload; null for tombstone records
         * @return the deserialized message, or null when bytes is null
         * @throws RuntimeException wrapping IO/ClassNotFound failures
         */
        @Override
        public KafkaMessage deserialize(String topic, byte[] bytes) {
            if (bytes == null) {
                return null;
            }
            try (ObjectInputStream objectInputStream =
                         new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return (KafkaMessage) objectInputStream.readObject();
            } catch (IOException | ClassNotFoundException e) {
                // Preserve the cause instead of swallowing it.
                throw new RuntimeException(e);
            }
        }

        @Override
        public void close() {
            // No resources held.
        }
    }
    

    3. 修改application.yml中生产者配置的value-serializer和修改消费者配置的value-deserializer配置

    spring:
      ...
      kafka:
        ...
        producer:
          ...
          value-serializer: com.roncoo.eshop.cache.serializable.KafkaMessageSerializable
        consumer:
          ...
          value-deserializer: com.roncoo.eshop.cache.serializable.KafkaMessageDeserializer
    
    

    4. 修改生产者控制器部分代码

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.roncoo.eshop.cache.model.KafkaMessage;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * Producer variant for the custom serializer setup: sends the
     * KafkaMessage object itself; the configured value-serializer
     * (KafkaMessageSerializable) turns it into bytes.
     */
    @RestController
    @Slf4j
    public class ProducerController {
        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;

        @GetMapping("/send_product")
        public String sendproductmsg() throws JsonProcessingException {
            return sendMessage("productInfoService", 1L);
        }

        @GetMapping("/send_shop")
        public String sendshopmsg() throws JsonProcessingException {
            return sendMessage("shopInfoService", 1L);
        }

        /**
         * Builds and sends a KafkaMessage to the "eshop-cache" topic.
         * Extracted to remove the copy-pasted bodies of the two handlers.
         *
         * @param serviceId logical service the consumer dispatches on
         * @param id        entity id carried by the message
         * @return human-readable result string (also logged)
         */
        private String sendMessage(String serviceId, Long id) {
            KafkaMessage kafkaMessage = new KafkaMessage();
            kafkaMessage.setId(id);
            kafkaMessage.setServiceId(serviceId);

            kafkaTemplate.send("eshop-cache", kafkaMessage); // custom value-serializer handles the object
            String res = "消息:【" + kafkaMessage.toString() + "】发送成功 SUCCESS !";
            log.info(res);
            return res;
        }
    }
    

    5. 修改消费者部分代码

    /**
     * Consumer variant for the custom deserializer setup: the configured
     * value-deserializer already yields a KafkaMessage, so no JSON parsing
     * is needed here. Offsets are committed manually (ack-mode MANUAL).
     */
    @Component
    @Slf4j
    public class ConsumerListener {

        @Autowired
        private CacheService cacheService;

        // @KafkaListener offers many more options; specific partitions can
        // be consumed via e.g.
        // topicPartitions = {@TopicPartition(topic = "topic1", partitions = { "0", "1" })}
        @KafkaListener(topics = "eshop-cache", groupId = "eshop-cache-group")
        public void listen(KafkaMessage kafkaMessage, Acknowledgment ack) throws JsonProcessingException {
            log.info("消息内容" + kafkaMessage.toString());

            final String serviceId = kafkaMessage.getServiceId();

            // Dispatch on the logical service id (constant-first equals is
            // null-safe should serviceId ever be absent).
            if ("productInfoService".equals(serviceId)) {
                // handle product-info change message
                //processProductInfoChangeMessage(kafkaMessage);
            } else if ("shopInfoService".equals(serviceId)) {
                // handle shop-info change message
                //processShopInfoChangeMessage(kafkaMessage);
            }

            // Manually commit the offset only after successful processing.
            ack.acknowledge();
            log.info("消费结束");
        }
    }
    

    测试

    参考代码

  • 相关阅读:
    每日日报
    Java学习
    Java学习
    Java学习
    Java学习
    Java学习
    Java学习
    Java学习
    Java学习
    JAVA日报
  • 原文地址:https://www.cnblogs.com/ifme/p/13932842.html
Copyright © 2011-2022 走看看