zoukankan      html  css  js  c++  java
  • SpringBoot

    前言

    RocketMQ 是阿里巴巴于 2012 年开源的分布式消息中间件。本文记录 Spring Boot 整合 RocketMQ 的方式;RocketMQ 的安装可参考:Windows下安装RocketMQ


    环境

    Spring Boot 2.5.3 + RocketMQ 4.7.0


    具体实现

    • pom.xml
    <!-- rocketmq -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.7.0</version>
    </dependency>
    
    • application.yml
    rocketmq:
      producer:
        producer-group: CoisiniProducerGroup
      consumer:
        consumer-group: CoisiniConsumerGroup
      namesrv-addr: 127.0.0.1:9876
    
    • MQ生产者
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import javax.annotation.PostConstruct;
    import java.util.Objects;
    
    /**
     * RocketMQ message producer exposed as a Spring component.
     *
     * <p>The producer group and the name server address are injected from the
     * {@code rocketmq.producer.producer-group} and {@code rocketmq.namesrv-addr}
     * configuration properties. The underlying {@link DefaultMQProducer} is created
     * and started in {@link #defaultMQProducer()}.
     *
     * @author coisini
     * @date Aug 25, 2021
     * @Version 1.0
     */
    @Component
    public class ProducerSchedule {

        /**
         * Delay level applied to every message sent by this class.
         * With the level table
         * {@code 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h}
         * level 4 corresponds to a 30 second delay.
         */
        private static final int DELAY_TIME_LEVEL = 4;

        private DefaultMQProducer producer;

        @Value("${rocketmq.producer.producer-group}")
        private String producerGroup;

        @Value("${rocketmq.namesrv-addr}")
        private String nameSrvAddr;

        /**
         * Creates (if necessary) and starts the producer.
         *
         * <p>Annotated with {@code @PostConstruct}, so it runs after the constructor and
         * after the {@code @Value} fields have been injected; the injected values are
         * therefore available here but would not be inside a constructor.
         *
         * <p>A start failure is only printed, not rethrown; in that case later calls to
         * {@link #send(String, String)} are expected to fail.
         */
        @PostConstruct
        public void defaultMQProducer() {
            if (Objects.isNull(this.producer)) {
                this.producer = new DefaultMQProducer(this.producerGroup);
                this.producer.setNamesrvAddr(this.nameSrvAddr);
            }

            try {
                this.producer.start();
                System.out.println("Producer start");
            } catch (MQClientException e) {
                // Best-effort start: keep the application running and report the problem.
                e.printStackTrace();
            }
        }

        /**
         * Publishes a delayed message.
         *
         * @param topic       topic the message is sent to
         * @param messageText message body as text
         * @return the message id reported by the broker
         * @throws IllegalStateException if the message could not be sent; the original
         *                               failure is attached as the cause
         */
        public String send(String topic, String messageText) {
            // NOTE: getBytes() uses the platform default charset; whoever decodes the
            // body must use the same charset.
            Message message = new Message(topic, messageText.getBytes());
            message.setDelayTimeLevel(DELAY_TIME_LEVEL);

            final SendResult result;
            try {
                result = this.producer.send(message);
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt status for code further up the stack.
                    Thread.currentThread().interrupt();
                }
                // Previously the exception was printed and execution continued into
                // result.getMsgId() with result == null, producing an NPE that hid the cause.
                throw new IllegalStateException("Failed to send message to topic " + topic, e);
            }

            System.out.println("MessageQueue: " + result.getMessageQueue());
            System.out.println("MsgId: " + result.getMsgId());
            System.out.println("SendStatus: " + result.getSendStatus());
            return result.getMsgId();
        }
    }
    
    • MQ消费者
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.Message;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.stereotype.Component;
    
    /**
     * RocketMQ push consumer exposed as a Spring component.
     *
     * <p>Implements {@link CommandLineRunner}; its {@link #run(String...)} callback
     * delegates to {@link #messageListener()}, which configures and starts the consumer.
     * The consumer group and name server address are injected from the
     * {@code rocketmq.consumer.consumer-group} and {@code rocketmq.namesrv-addr}
     * configuration properties.
     *
     * @author coisini
     * @date Aug 25, 2021
     * @Version 1.0
     */
    @Component
    public class ConsumerSchedule implements CommandLineRunner {

        @Value("${rocketmq.consumer.consumer-group}")
        private String consumerGroup;

        @Value("${rocketmq.namesrv-addr}")
        private String nameSrvAddr;

        /**
         * Builds a push consumer subscribed to every tag of topic {@code "Topic"},
         * registers a concurrent listener that prints each message body, and starts it.
         *
         * @throws MQClientException if subscribing or starting the consumer fails
         */
        public void messageListener() throws MQClientException {
            final DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(this.consumerGroup);
            pushConsumer.setNamesrvAddr(this.nameSrvAddr);

            // Subscribe to the topic, accepting all tags.
            pushConsumer.subscribe("Topic", "*");

            // Hand messages to the listener one at a time.
            pushConsumer.setConsumeMessageBatchMaxSize(1);

            // Listener: print every body in the batch, then acknowledge the batch.
            final MessageListenerConcurrently bodyPrinter = (batch, ctx) -> {
                batch.forEach(received ->
                        System.out.println("监听到消息:" + new String(received.getBody())));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            };
            pushConsumer.registerMessageListener(bodyPrinter);

            pushConsumer.start();
        }

        /**
         * {@link CommandLineRunner} callback; starts the message listener.
         *
         * @param args command line arguments (unused)
         * @throws Exception if the consumer cannot be started
         */
        @Override
        public void run(String... args) throws Exception {
            messageListener();
        }
    }
    
    • 测试接口
    /**
     * Test controller that publishes a sample message through {@link ProducerSchedule}.
     */
    @RestController
    @RequestMapping("/test")
    public class TestController {

        @Autowired
        private ProducerSchedule producerSchedule;

        /**
         * Handles {@code GET /test/push}: sends the text {@code "Coisini"} to topic
         * {@code "Topic"}. The message id returned by the producer is discarded.
         *
         * @throws Exception declared for any failure raised while sending
         */
        @GetMapping("/push")
        public void pushMessageToMQ() throws Exception {
            final String targetTopic = "Topic";
            final String payload = "Coisini";
            this.producerSchedule.send(targetTopic, payload);
        }
    }
    
    • 接口调用:

    在这里插入图片描述

    • 30s后延迟消息触发:

    在这里插入图片描述


    - End -
    梦想是咸鱼
    关注一下吧
    以上为本篇文章的主要内容,希望大家多提意见,如果喜欢记得点个推荐哦
    作者:Maggieq8324
    本文版权归作者和博客园共有,欢迎转载,转载时保留原作者和文章地址即可。
  • 相关阅读:
    splay
    开车旅行(2012day1T3)
    LCT入门
    最小瓶颈路
    poj 3041 Asteroids
    sql waitfor 延时执行
    [Microsoft][ODBC SQL Server Driver][DBNETLIB]SQL Server 不存在或访问被拒绝
    SQL Server中行列转换
    sql中 with rollup 、with cube、grouping 统计函数用法
    sql 分组后 组内排名
  • 原文地址:https://www.cnblogs.com/maggieq8324/p/15188150.html
Copyright © 2011-2022 走看看