zoukankan      html  css  js  c++  java
  • RabbitMQ的使用

    一、docker下安装

    1.下载镜像

    docker pull rabbitmq:management

    2.运行容器

    docker run -di --name=rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 15671:15671 -p 15672:15672 -p 25672:25672 rabbitmq:management

    3.浏览器访问

    http://192.168.25.129:15672/#/

    默认用户名和密码都是guest

    4.修改密码

    rabbitmqctl  change_password  guest  '123456'

    二、用java代码操作rabbitmq

    1.直接模式

    1)继承springboot并添加相关依赖

        <parent>
            <artifactId>tenpower-parent</artifactId>
            <groupId>com.tenpower</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>

    2)在application.yml中配置rabbitmq

    spring:
      rabbitmq:
        host: 192.168.25.129

    3)在main目录下编写springboot启动类

    4)在test目录下编写发送消息测试类

    @RunWith(SpringRunner.class)    //替代junit原生的运行器
    @SpringBootTest(classes = RabbitmqApplication.class)
    public class ProducerTest {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void sendMsgDirectly() {
            rabbitTemplate.convertAndSend("red","直接模式测试");
        }
    }

    5)在main目录下写消息消费类

    @Component  //加入spring容器
    @RabbitListener(queues = "red")
    public class Customer1 {
        @RabbitHandler
        public void showMsg(String msg) {   //方法名称随意
            System.out.println("red接收到的消息:" + msg);
        }
    }

    6)在http://192.168.25.129:15672/#/里创建一个队列,取名为red

    7)运行测试方法发现浏览器rabbitmq页面出现消息,运行启动类后消息消失,且控制台打印出消息

    2.分列模式(不常用,略)

    3.主题模式

    1)新建1个Exchange,name为color,类型为topic

    2)新建3个queue,name分别为red、blue、green

    3.点击新建的交换器color,binding新建的3个队列,并写上routing key

    4.分别写3个消费者类

    @Component  //加入spring容器
    @RabbitListener(queues = "red")
    public class Customer1 {
        @RabbitHandler
        public void showMsg(String msg) {   //方法名称随意
            System.out.println("red接收到的消息:" + msg);
        }
    }
    @Component
    @RabbitListener(queues = "blue")
    public class Customer2 {
        @RabbitHandler
        public void showMsg(String msg) {
            System.out.println("blue接收到的消息:"+msg);
        }
    }
    @Component
    @RabbitListener(queues = "green")
    public class Customer3 {
        @RabbitHandler
        public void showMsg(String msg) {
            System.out.println("green接收到的消息:"+msg);
        }
    }

    5.以主题模式生产消息

       @Test
        public void sendMsgTopic1() {
            rabbitTemplate.convertAndSend("color", "goods.aaa", "主题模式测试1");
        }
        @Test
        public void sendMsgTopic2() {
            rabbitTemplate.convertAndSend("color", "goods.bbb.log", "主题模式测试2");
        }
        @Test
        public void sendMsgTopic3() {
            rabbitTemplate.convertAndSend("color", "goods.log", "主题模式测试3");
        }

    运行测试方法和启动类,发现topic1方法输出red,topic2输出red和blue,topic3输出red、blue、green

    三、利用rabbitmq执行延时任务

    1.安装插件

    1)下载

    wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez

    2)复制到rabbitmq插件目录

    docker cp rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq:/plugins

    3)使插件生效

    cd /sbin
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

    4)重启rabbitmq

    docker restart rabbitmq

    2.代码使用

    1)添加配置类,绑定延时队列(可以绑定多个)

    package com.lbh360.order.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.CustomExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class RabbitConfig {
        public static final String EXCHANGE_DELAY = "delay.exchange";
        public static final String FINISH_COMPLETE = "finish.complete";
    
    
        @Bean
        public CustomExchange customExchange() {
            Map<String, Object> args = new HashMap<>();
            args.put("x-delayed-type", "direct");
            return new CustomExchange(EXCHANGE_DELAY, "x-delayed-message", true, false, args);
        }
        
    
        @Bean
        public Queue finishCompleteQueue() {
            return new Queue(FINISH_COMPLETE);
        }
        
    
        @Bean
        public Binding finishCompleteBinding(@Qualifier("finishCompleteQueue") Queue queue,
                                              @Qualifier("customExchange") CustomExchange customExchange) {
            return BindingBuilder.bind(queue).to(customExchange).with(FINISH_COMPLETE).noargs();
        }
    }

    2)生产者

    package com.lbh360.order;
    
    import com.lbh360.OrderApp;
    import com.lbh360.order.config.RabbitConfig;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import javax.annotation.Resource;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = OrderApp.class)
    public class OrderTest {
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void test() {
            rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_DELAY, RabbitConfig.FINISH_COMPLETE, "1270618586310995968", message -> {
                message.getMessageProperties().setDelay(3 * 1000);
                return message;
            });
        }
    }

    3)消费者

    package com.lbh360.order.listener;
    
    import com.lbh360.order.config.RabbitConfig;
    import com.lbh360.order.model.po.Order;
    import com.lbh360.order.model.response.OrderResp;
    import com.lbh360.order.service.OrderService;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    
    /**
     * @Description rabbitmq延时任务监听类
     * @Author bofeng
     * @Date 2019/4/8 21:50
     * @Version 1.0
     */
    @Component
    public class AmqDelayListener {
        private static final Logger logger = LoggerFactory.getLogger(AmqDelayListener.class);
    
        @Resource
        private OrderService orderService;
        
    
        /**
         * 完成订单后超时结单
         *
         * @param message
         */
        @RabbitListener(queues = RabbitConfig.FINISH_COMPLETE)
        public void completeOrder(Message message) {
            String orderNo = new String(message.getBody());
            logger.info("completeOrder, orderNo:{}", orderNo);
    
            orderService.completeOrder(orderNo);
        }
    }

    4)若消费端抛异常消息无法被消费,会导致rabbitmq不停回调。application.ym可l配置最大重试次数和间隔时间

    spring:
      rabbitmq:
        listener:
          simple:
            retry:
              enabled: true
              max-attempts: 3
              initial-interval: 1000ms
  • 相关阅读:
    Windows Message ID 常量列表大全
    C#中Thread与ThreadPool的比较
    HTML元素隐藏和显示
    Metrics.net + influxdb + grafana 构建WebAPI的自动化监控和预警
    Windbg DUMP分析(原创汇总)
    计算密集型分布式内存存储和运算平台架构
    从.net到java,从基础架构到解决方案。
    C# 泛型集合
    你该怎么选Offer
    C++ 虚拟桌面
  • 原文地址:https://www.cnblogs.com/naixin007/p/10654536.html
Copyright © 2011-2022 走看看