zoukankan      html  css  js  c++  java
  • 最新RocketMq与SpringBoot整合

    最新版的RocketMqSpringBoot2.X进行整合可以利用rocketmq-spring-boot-starter来简化配置,本文采用了最新版的jar包来整合,并且略微做了封装,以便于其他模块引用,适合于多生产者多消费者的情况。

    项目依赖

    主要用到了rocketmq的包和lombok的包,具体的依赖如下所示:

            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-spring-boot-starter</artifactId>
                <version>2.1.0</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.12</version>
            </dependency>
    

    配置项

    rocketmq.name-server=127.0.0.1:9876
    rocketmq.producer.enable=true
    
    rocketmq.producer.group=test-group-e
    rocketmq.producer.topic=test-topic-e
    
    rocketmq.producer.group2=test-group-f
    rocketmq.producer.topic2=test-topic-f
    
    
    
    rocketmq.consume.group=consumer-group-3
    rocketmq.consume.topic=test-topic-e
    
    rocketmq.consume.group2=consumer-group-4
    rocketmq.consume.topic2=test-topic-f
    

    最重要的是rocketmq.name-serverrocketmq.producer.group这两个属性一定要配置,否则项目无法启动,在封装成中间件的时候rocketmq.producer.group可以随意指定一个,亲测即便指定了也不会在启动的时候就生成这个生产者的group.rocketmq.producer.enable这个是我自己加的,为了让rocketmq能按需加载,因为封装成中间件的话其他模块引入,有的可能用不到,所以bean需要按条件加载。 剩下的group和topic我分别都指定了两个,为了模拟多消费者和多生产者的情况。没有用tag来区分,因为Tag使用不当会引来不必要的麻烦,不同的功能严格按照topic和group来区分。

    封装的消息类

    package com.rocket.mq.demo.controller;
    
    import lombok.Data;
    
    import java.io.Serializable;
    
    /**
     * @author : zhangwei
     * @description : 消息
     * @date: 2020-08-20 11:03
     */
    
    @Data
    public class RocketMqMessage<T> implements Serializable {
        /**
         * 消息内容
         */
        private T content;
    
    
        /**
         * 消息的key
         */
        private String msgKey;
    
        /**
         * topic
         */
        private String producerTopic;
        /**
         * group
         */
        private String producerGroup;
        /**
         * tag
         */
        private String producerTag;
    }
    

    发送消息的公共方法

    package com.rocket.mq.demo.controller;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;
    
    import java.util.UUID;
    
    /**
     * @author : zhangwei
     * @description : Mq发送消息的类
     * @date: 2020-08-21 09:54
     */
    @Component
    @Slf4j
    @ConditionalOnProperty(name = "rocketmq.producer.enable", havingValue = "true")
    public class MqSendService {
    
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        /**
         * 发送带tag的消息
         *
         * @param msg
         * @param topic
         * @param group
         * @param tag
         * @author: zhangwei
         * @date: 2020/8/21 10:54
         * @return: org.apache.rocketmq.client.producer.SendResult
         **/
        private <T> SendResult send(T msg, String topic, String group, String tag) {
            if (StringUtils.isBlank(topic) || StringUtils.isBlank(group)) {
                new Throwable("发送方topic或者group不能为空");
            }
            String uuid = UUID.randomUUID().toString().replaceAll("-", "");
            RocketMqMessage message = new RocketMqMessage();
            message.setProducerTopic(topic);
            message.setProducerGroup(group);
            message.setProducerTag(tag);
            message.setContent(msg);
            message.setMsgKey(uuid);
            // 发送消息
            Message messageFinal = MessageBuilder.withPayload(message).setHeader("KEYS", uuid).build();
            String destination = topic;
            if (StringUtils.isNotBlank(tag)) {
                destination = topic + ":" + tag;
            }
            SendResult result = rocketMQTemplate.syncSend(destination, messageFinal);
            log.info("成功发送消息,消息内容为:{},返回值为:{}", message, result);
            return result;
        }
    
        /**
         * 发送不带tag的消息
         *
         * @param msg
         * @param topic
         * @param group
         * @author: zhangwei
         * @date: 2020/8/21 10:54
         * @return: org.apache.rocketmq.client.producer.SendResult
         **/
        public <T> SendResult send(T msg, String topic, String group) {
            return this.send(msg, topic, group, null);
        }
    
    
    }
    
    

    多生产者发送消息示例

    package com.rocket.mq.demo.controller;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Service;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * @author : zhangwei
     * @description : mq消息发送服务
     * @date: 2020-08-20 19:11
     */
    @Slf4j
    @Service
    @RestController
    public class Producer {
    
        @Value(value = "${rocketmq.producer.topic}")
        private String topic;
    
        @Value(value = "${rocketmq.producer.group}")
        private String group;
    
        @Autowired
        private MqSendService mqSendService;
    
        @GetMapping("/test-rocketmq/sendMsg")
        public String testSendMsg() {
            List<String> list=new ArrayList<>();
            list.add("1");
            list.add("2");
            list.add("3");
            mqSendService.send(list,topic,group);
            return "send message success";
        }
    }
    
    
    package com.rocket.mq.demo.controller;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Service;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * @author : zhangwei
     * @description : mq消息发送服务
     * @date: 2020-08-20 19:11
     */
    @Slf4j
    @Service
    @RestController
    public class Producer2 {
    
        @Value(value = "${rocketmq.producer.topic2}")
        private String topic;
    
        @Value(value = "${rocketmq.producer.group2}")
        private String group;
    
        @Autowired
        private MqSendService mqSendService;
    
        @GetMapping("/test-rocketmq/sendMsg2")
        public String testSendMsg() {
            List<String> list=new ArrayList<>();
            list.add("1");
            list.add("2");
            list.add("3");
            mqSendService.send(list,topic,group);
            return "send message success";
        }
    }
    
    

    多消费者消费消息示例

    package com.rocket.mq.demo.controller;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;
    
    
    /**
     * @author : zhangwei
     * @description : ed
     * @date: 2020-08-20 16:29
     */
    @Slf4j
    @Component
    // topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意
    @RocketMQMessageListener(nameServer = "${rocketmq.name-server}", topic = "${rocketmq.consume.topic}", consumerGroup = "${rocketmq.consume.group}")
    public  class Consumer implements  RocketMQListener<RocketMqMessage> {
    
        @Override
        public void onMessage(RocketMqMessage message) {
            log.info("======我收到了消息,消息内容为:{}",message);
        }
    }
    
    
    package com.rocket.mq.demo.controller;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;
    
    
    /**
     * @author : zhangwei
     * @description : ed
     * @date: 2020-08-20 16:29
     */
    @Slf4j
    @Component
    // topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意
    @RocketMQMessageListener(nameServer = "${rocketmq.name-server}", topic = "${rocketmq.consume.topic2}", consumerGroup = "${rocketmq.consume.group2}")
    public  class Consumer2 implements  RocketMQListener<RocketMqMessage> {
    
        @Override
        public void onMessage(RocketMqMessage message) {
            log.info("======我收到了消息,消息内容为:{}",message);
        }
    }
    

    镜像地址

    https://www.zhangwei.wiki/#/posts/16

    pay

  • 相关阅读:
    cesium 学习(七) HeadingPitchRoll
    cesium 学习(六) 坐标转换
    cesium 学习(五) 加载场景模型
    Cesium 学习(一)环境搭建
    Cesium 学习(二)所支持的模型数据类型,以及转换
    cesium 学习(四) Hello World
    Cesium 学习(三)各种资源链接
    【Windows编程】系列第十一篇:多文档界面框架
    【Windows编程】系列第十篇:文本插入符
    【Windows编程】系列第八篇:通用对话框
  • 原文地址:https://www.cnblogs.com/coderzhw/p/13589737.html
Copyright © 2011-2022 走看看