zoukankan      html  css  js  c++  java
  • RabbitMQ学习12延迟队列

    1、从官网链接下载插件:

    Releases · rabbitmq/rabbitmq-delayed-message-exchange (github.com)

    本文下载的是3.9.0版本。

    2、安装插件

    将下载的文件rabbitmq_delayed_message_exchange-3.9.0.ez,上传到服务器rabbitmq的安装目录下,执行如下命令:

    cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.9.8/plugins/
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

    3、重启rabbitmq

    service rabbitmq-server restart

    4、验证插件是否已安装

    重新刷新15672端口的管理界面,在交换机页面,可以看到,增加了一种x-delayed-message的消息类型:

    5、使用SpringBoot实现延迟队列

    先使用idea建立一个spring boot 的项目,并在pom中添加如下依赖:

    <dependencies>
    <!--RabbitMQ 依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--swagger-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
        <version>2.9.2</version>
        </dependency>
        <!--RabbitMQ 测试依赖-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    application.yml文件配置:

    spring:
      rabbitmq:
        host: ubu
        port: 5672
        username: admin
        password: 123456
        virtual-host: /

    swagger配置类:

     1 package com.yas.rabbitmqboot.config;
     2 
     3 import org.springframework.context.annotation.Bean;
     4 import org.springframework.context.annotation.Configuration;
     5 import springfox.documentation.builders.ApiInfoBuilder;
     6 import springfox.documentation.service.ApiInfo;
     7 import springfox.documentation.service.Contact;
     8 import springfox.documentation.spi.DocumentationType;
     9 import springfox.documentation.spring.web.plugins.Docket;
    10 import springfox.documentation.swagger2.annotations.EnableSwagger2;
    11 
    12 @Configuration
    13 @EnableSwagger2
    14 public class SwaggerConfig {
    15     @Bean
    16     public Docket webApiConfig() {
    17         return new Docket(DocumentationType.SWAGGER_2)
    18                 .groupName("webApi")
    19                 .apiInfo(webApiInfo())
    20                 .select()
    21                 .build();
    22     }
    23 
    24     private ApiInfo webApiInfo() {
    25         return new ApiInfoBuilder()
    26                 .title("rabbitmq 接口文档")
    27                 .description("本文档描述了 rabbitmq 微服务接口定义")
    28                 .version("1.0")
    29                 .contact(new Contact("asenyang", "http://asenyang.com", "123456@qq.com")).build();
    30     }
    31 }

    生产者代码:

     1 package com.yas.rabbitmqboot.listen;
     2 
     3 import com.rabbitmq.client.Channel;
     4 import lombok.extern.slf4j.Slf4j;
     5 import org.springframework.amqp.core.Message;
     6 import org.springframework.amqp.rabbit.annotation.RabbitListener;
     7 import org.springframework.amqp.rabbit.core.RabbitTemplate;
     8 import org.springframework.beans.factory.annotation.Autowired;
     9 import org.springframework.stereotype.Component;
    10 import org.springframework.web.bind.annotation.GetMapping;
    11 import org.springframework.web.bind.annotation.PathVariable;
    12 import org.springframework.web.bind.annotation.RequestMapping;
    13 import org.springframework.web.bind.annotation.RestController;
    14 
    15 import java.io.IOException;
    16 import java.util.Date;
    17 import java.util.concurrent.TimeUnit;
    18 
    19 @RestController
    20 @Slf4j
    21 @RequestMapping("/ttl")
    22 public class DelayedController {
    23 
    24     public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    25     public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
    26 
    27     @Autowired
    28     RabbitTemplate rabbitTemplate;
    29 
    30     @GetMapping("/sendDelayMsg/{message}/{delayTime}")
    31     public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
    32         rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,
    33                 correlationData -> {
    34                     correlationData.getMessageProperties().setDelay(delayTime);
    35                     return correlationData;
    36                 });
    37         log.info(" 当 前 时 间 : {}, 发 送 一 条 延 迟 {} 毫秒的信息给队列 delayed.queue:{}", new Date(), delayTime, message);
    38     }
    39 }

    消费者代码:

     1 package com.yas.rabbitmqboot.listen;
     2 
     3 import com.rabbitmq.client.Channel;
     4 import lombok.extern.slf4j.Slf4j;
     5 import org.springframework.amqp.core.Message;
     6 import org.springframework.amqp.rabbit.annotation.RabbitListener;
     7 import org.springframework.beans.factory.annotation.Autowired;
     8 import org.springframework.data.redis.core.StringRedisTemplate;
     9 import org.springframework.stereotype.Component;
    10 
    11 import java.io.IOException;
    12 import java.util.Date;
    13 import java.util.concurrent.TimeUnit;
    14 
    15 @Component
    16 @Slf4j
    17 public class DelayedConsumer {
    18 
    19     public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    20 
    21     @RabbitListener(queues = DELAYED_QUEUE_NAME)
    22     public void receiveDelayedQueue(Message message) {
    23         String msg = new String(message.getBody());
    24         log.info("当前时间:{},收到延时队列的消息:{}", new Date().toString(), msg);
    25     }
    26 }

    测试:

    使用postman请求如下地址:

    http://localhost:8080/ttl/sendDelayMsg/hello1/20000
    http://localhost:8080/ttl/sendDelayMsg/hello2/2000

    在控制台查看效果:

  • 相关阅读:
    dubbo服务的运行方式(2)
    朱砂掌健身养生功
    吴清忠养生网
    易筋经十二式
    dubbo入门(1)
    jquery ajax error函数和及其参数详细说明
    com.rabbitmq.client.ShutdownSignalException
    centos 安装rabbitMQ
    SpringMVC @RequestBody接收Json对象字符串
    跨域
  • 原文地址:https://www.cnblogs.com/asenyang/p/15519946.html
Copyright © 2011-2022 走看看