zoukankan      html  css  js  c++  java
  • 【RabbitMQ学习之二】RabbitMQ四种交换机模式应用

    环境
      win7
      rabbitmq-server-3.7.17
      Erlang 22.1

    一、概念
    1、队列
    队列用于临时存储消息和转发消息。
    队列类型有两种,即时队列和延时队列。
      即时队列:队列中的消息会被立即消费;
      延时队列:队列中的消息会在指定的时间延时之后被消费。

    2、交换机
    交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。

    交换机有四种类型:Direct, topic, Headers and Fanout。
    Direct[精确匹配类型]:Direct是RabbitMQ默认的交换机模式,先匹配, 再投送。即创建消息队列的时候,指定一个BindingKey.当发送者发送消息的时候,指定对应的Key.当Key和消息队列的BindingKey一致的时候,消息将会被发送到该消息队列中.

    Topic[模式匹配]:按通配符匹配规则转发消息(最灵活),队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中.

    Headers[键值对匹配]:设置header attribute参数类型的交换机。
    消息队列和交换机绑定的时候会指定一组键值对规则,而发送消息的时候也会指定一组键值对规则,当两组键值对规则相匹配的时候,消息会被发送到匹配的消息队列中.

    Fanout[转发消息最快]:
    路由广播的形式,简单的将队列绑定到交换机上将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略.

    3、使用spring boot和rabbitmq整合   搭建演示工程

    二、Direct Exchange-Work模式

    配置类:

    package com.wjy.direct;
    
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    //@Configuration这个注解是必须的,保证在基本类实例化之前该类已经被实例化
    @Configuration
    public class RabbitConfig {
    
        /**
        * @Desc:  配置一个消息队列(routingKey=q_hello)
        */
        @Bean
        public Queue queue() {
            return new Queue("q_hello");
        }
    
        /**
         * @Desc:  配置一个消息队列(routingKey=notify.refund)
         */
        @Bean
        public Queue refundNotifyQueue() {
            return new Queue("notify.refund");
        }
    
        /**
         * @Desc:  配置一个消息队列(routingKey=query.order) 测试RPC
         */
        @Bean
        public Queue queryOrderQueue() {
            return new Queue("query.order");
        }
    }

    生产者:

    package com.wjy.direct;
    
    
    import com.wjy.mojo.Order;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    /**
    * @Desc: 生产者
    */
    @Component
    public class MqSender {
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        /**
        * @Desc: 将消息发送至默认的交换机且routingKey为q_hello
        */
        public void send() {
            //24小时制
            String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
            String context = "hello " + date;
            System.err.println("Sender : " + context);
            //简单对列的情况下routingKey即为Q名
            this.rabbitTemplate.convertAndSend("q_hello", context);
        }
    
        /**
         * @Desc: 将消息发送至默认的交换机且routingKey为q_hello
         */
        public void send(String i) {
            //24小时制
            String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
            String context = "hello " + i + " " + date;
            System.err.println("Sender : " + context);
            //简单对列的情况下routingKey即为Q名
            this.rabbitTemplate.convertAndSend("q_hello", context);
        }
    
        /**
         * @Desc: 将发送对象
         */
        public void sender(Order order){
            System.err.println("notify.refund send message: "+order);
            rabbitTemplate.convertAndSend("notify.refund", order);
        }
    
        /**
         * @Desc: 测试RPC
         */
        public void sender(String orderId){
            System.err.println("query.order send message: "+orderId);
            Order order = (Order) rabbitTemplate.convertSendAndReceive("query.order", orderId);
            System.err.println("query.order return message: "+order);
        }
    }
    View Code

    消费者:

    package com.wjy.direct;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
    * @Desc:  消费者
    */
    @Component
    @RabbitListener(queues = "q_hello")
    public class Receiver {
    
        /**
        * @Desc: 监听routingKey为nq_hello的队列消息
        */
        @RabbitHandler
        public void process(String hello) {
            System.err.println("Receiver1  : " + hello);
        }
    }
    package com.wjy.direct;
    
    import com.wjy.mojo.Order;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "notify.refund")
    public class RefundNotifyReceive {
        @RabbitHandler
        public void receive(Order order) {
            System.err.println("notify.refund receive message: "+order);
        }
    }
    View Code
    package com.wjy.direct;
    
    import com.wjy.mojo.Order;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.math.BigDecimal;
    import java.util.Date;
    
    @Component
    @RabbitListener(queues = "query.order")
    public class QueryOrderReceive {
        @RabbitHandler
        public Order receive(String orderId) {
            System.err.println("notify.refund receive message: "+orderId);
    
            Order order = new Order();
            order.setId(100001);
            order.setOrderId(orderId);
            order.setAmount(new BigDecimal("2999.99"));
            order.setCreateTime(new Date());
            return order;
        }
    }
    View Code

    测试类:

    package com.wjy.direct;
    
    import com.wjy.mojo.Order;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import java.math.BigDecimal;
    import java.util.Date;
    
    /**
    * @Desc: 测试类
    */
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class DirectExchangeTest {
        @Autowired
        private MqSender mqSender;
    
        @Test
        public void hello() throws Exception {
            mqSender.send();
        }
    
        /**
        * @Desc:  一对多
        * 应用场景:系统通常会做集群、分布式或灾备部署
        */
        @Test
        public void oneToMany() throws Exception {
            for (int i=0;i<100;i++){
                mqSender.send(i+"");
                Thread.sleep(200);
            }
        }
    
        /**
         * @Desc:  多对一 请求参数为偶数
         * 应用场景:系统通常会做集群、分布式或灾备部署
         */
        @Test
        public void test_sender_many2one_1() throws Exception {
            for (int i = 0; i < 20; i+=2) {
                mqSender.send("支付订单号:"+i);
                Thread.sleep(1000);
            }
        }
    
        /**
         * @Desc:  多对一 请求参数为奇数
         * 应用场景:系统通常会做集群、分布式或灾备部署
         */
        @Test
        public void test_sender_many2one_2() throws Exception {
            for (int i = 1; i < 20; i+=2) {
                mqSender.send("支付订单号:"+i);
                Thread.sleep(1000);
            }
        }
    
    
        /**
         * @Desc:  测试发送对象
         */
        @Test
        public void test_sender() {
            Order order = new Order();
            order.setId(100001);
            order.setOrderId(String.valueOf(System.currentTimeMillis()));
            order.setAmount(new BigDecimal("1999.99"));
            order.setCreateTime(new Date());
            mqSender.sender(order);
        }
    
        /**
         * @Desc:  测试RPC
         * RabbitMQ支持RPC远程调用,同步返回结果。
         * 虽然RabbitMQ支持RPC接口调用,但不推荐使用
         * 原因:
         * 1)RPC默认为单线程阻塞模型,效率极低。
         * 2)需要手动实现多线程消费。
         */
        @Test
        public void test_rpc() {
            mqSender.sender("900000001");
        }
    }
    View Code

    三、Topic Exchange-主题模式


    符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词.

    配置类:

    package com.wjy.topic;
    
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class TopicRabbitConfig {
    
        final static String message = "api.core";
        final static String messages = "api.payment";
    
        /**
         * 配置一个routingKey为api.core的消息队列
         */
        @Bean
        public Queue coreQueue() {
            return new Queue(TopicRabbitConfig.message);
        }
    
        /**
        * @Desc: 配置一个routingKey为api.payment的消息队列
        */
        @Bean
        public Queue paymentQueue() {
            return new Queue(TopicRabbitConfig.messages);
        }
    
        /**
         * @Desc: coreExchange交换机
         */
        @Bean
        public TopicExchange coreExchange() {
            return new TopicExchange("coreExchange");
        }
    
        /**
         * @Desc: paymentExchange交换机
         */
        @Bean
        public TopicExchange paymentExchange() {
            return new TopicExchange("paymentExchange");
        }
    
        /**
         * 配置一个routingKey为api.core的消息队列并绑定在coreExchange交换机上(交换机的匹配规则为api.core.*)
         */
        @Bean
        public Binding bindingCoreExchange(Queue coreQueue, TopicExchange coreExchange) {
            return BindingBuilder.bind(coreQueue).to(coreExchange).with("api.core.*");
        }
    
        /**
         * @Desc: 配置一个routingKey为api.payment的消息队列并绑定在paymentExchange交换机上(交换机的匹配规则为api.payment.#)
         */
        @Bean
        public Binding bindingPaymentExchange(Queue paymentQueue, TopicExchange paymentExchange) {
            return BindingBuilder.bind(paymentQueue).to(paymentExchange).with("api.payment.#");
        }
    }

    生产者:

    package com.wjy.topic;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class ApiCoreSender {
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        /**
        * @Desc: 发送消息至coreExchange交换机且routingKey为api.core.user
        */
        public void user(String msg){
            System.err.println("api.core.user send message: "+msg);
            rabbitTemplate.convertAndSend("coreExchange", "api.core.user", msg);
        }
    
        /**
         * @Desc: 发送消息至coreExchange交换机且routingKey为api.core.user.query
         */
        public void userQuery(String msg){
            System.err.println("api.core.user.query send message: "+msg);
            rabbitTemplate.convertAndSend("coreExchange", "api.core.user.query", msg);
        }
    }
    View Code
    package com.wjy.topic;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class ApiPaymentSender {
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        /**
        * @Desc: 添加一个order()方法,发送消息至paymentExchange交换机且routingKey为api.payment.order
        */
        public void order(String msg){
            System.err.println("api.payment.order send message: "+msg);
            rabbitTemplate.convertAndSend("paymentExchange", "api.payment.order", msg);
        }
    
        /**
         * @Desc: 添加一个orderQuery()方法,发送消息至paymentExchange交换机且routingKey为api.payment.order.query
         */
        public void orderQuery(String msg){
            System.err.println("api.payment.order.query send message: "+msg);
            rabbitTemplate.convertAndSend("paymentExchange", "api.payment.order.query", msg);
        }
    
        /**
         * @Desc: 添加一个orderDetailQuery()方法,发送消息至paymentExchange交换机且routingKey为api.payment.order.detail.query
         */
        public void orderDetailQuery(String msg){
            System.err.println("api.payment.order.detail.query send message: "+msg);
            rabbitTemplate.convertAndSend("paymentExchange", "api.payment.order.detail.query", msg);
        }
    }
    View Code

    消费者:

    package com.wjy.topic;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class ApiCoreReceive {
        @RabbitHandler
        @RabbitListener(queues = "api.core")
        public void handle(String msg) {
            System.err.println("api.core receive message: "+msg);
        }
    }
    View Code
    package com.wjy.topic;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class ApiPaymentReceive {
        @RabbitHandler
        @RabbitListener(queues = "api.payment")
        public void handle(String msg) {
            System.err.println("api.payment.order receive message: "+msg);
        }
    }
    View Code

    测试类:

    package com.wjy.topic;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class ApiCoreSenderTests {
        @Autowired
        private ApiCoreSender sender;
    
        @Test
        public void test_user() {
            sender.user("用户管理!");
        }
    
        @Test
        public void test_userQuery() {
            sender.userQuery("查询用户信息!");
        }
    }
    View Code
    package com.wjy.topic;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class ApiPaymentSenderTests {
        @Autowired
        private ApiPaymentSender sender;
    
        @Test
        public void test_order() {
            sender.order("订单管理!");
        }
    
        @Test
        public void test_orderQuery() {
            sender.orderQuery("查询订单信息!");
        }
    
        @Test
        public void test_orderDetailQuery() {
            sender.orderDetailQuery("查询订单详情信息!");
        }
    }
    View Code

    四、Fanout Exchange-订阅模式

    配置类:

    package com.wjy.fanout;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class FanoutConfig {
    
        /**
        * @Desc:  配置一个routingKey为api.report.payment的消息队列
        */
        @Bean
        public Queue reportPaymentQueue() {
            return new Queue("api.report.payment");
        }
    
        /**
         * @Desc:  配置一个routingKey为api.report.refund的消息队列
         */
        @Bean
        public Queue reportRefundQueue() {
            return new Queue("api.report.refund");
        }
    
        /**
         * @Desc:  配置一个reportExchange交换机
         */
        @Bean
        public FanoutExchange reportExchange() {
            return new FanoutExchange("reportExchange");
        }
    
    
        /**
         * @Desc:  配置routingKey为api.report.payment的消息队列并绑定在reportExchange交换机上
         */
        @Bean
        public Binding bindingReportPaymentExchange(Queue reportPaymentQueue, FanoutExchange reportExchange) {
            return BindingBuilder.bind(reportPaymentQueue).to(reportExchange);
        }
    
        /**
         * @Desc:  配置routingKey为api.report.refund的消息队列并绑定在reportExchange交换机上
         */
        @Bean
        public Binding bindingReportRefundExchange(Queue reportRefundQueue, FanoutExchange reportExchange) {
            return BindingBuilder.bind(reportRefundQueue).to(reportExchange);
        }
    }
    View Code

    生产者:

    package com.wjy.fanout;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class ApiReportSender {
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public void generateReports(String msg){
            System.err.println("api.generate.reports send message: "+msg);
            rabbitTemplate.convertAndSend("reportExchange", "api.generate.reports", msg);
        }
    }
    View Code

    消费者:

    package com.wjy.fanout;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class ApiReportReceive {
        @RabbitHandler
        @RabbitListener(queues = "api.report.payment")
        public void payment(String msg) {
            System.err.println("api.report.payment receive message: "+msg);
        }
    
        @RabbitHandler
        @RabbitListener(queues = "api.report.refund")
        public void refund(String msg) {
            System.err.println("api.report.refund receive message: "+msg);
        }
    }
    View Code

    测试类:

    package com.wjy.fanout;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class ApiReportSenderTests {
        @Autowired
        private ApiReportSender sender;
    
        @Test
        public void test_generateReports() {
            sender.generateReports("开始生成报表!");
        }
    }
    View Code

    五、Headers Exchange

    配置类:

    package com.wjy.headers;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.HeadersExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class HeadersConfig {
        /**
        * @Desc: 配置一个routingKey为credit.bank的消息队列
        */
        @Bean
        public Queue creditBankQueue() {
            return new Queue("credit.bank");
        }
    
        /**
         * @Desc: 配置一个routingKey为credit.finance的消息队列
         */
        @Bean
        public Queue creditFinanceQueue() {
            return new Queue("credit.finance");
        }
    
        /**
         * @Desc: 配置一个creditBankExchange交换机
         */
        @Bean
        public HeadersExchange creditBankExchange() {
            return new HeadersExchange("creditBankExchange");
        }
    
        /**
         * @Desc: 配置一个creditFinanceExchange交换机
         */
        @Bean
        public HeadersExchange creditFinanceExchange() {
            return new HeadersExchange("creditFinanceExchange");
        }
    
        /**
         * @Desc: 配置一个routingKey为credit.bank的消息队列并绑定在creditBankExchange交换机上
         */
        @Bean
        public Binding bindingCreditAExchange(Queue creditBankQueue, HeadersExchange creditBankExchange) {
            Map<String,Object> headerValues = new HashMap<>();
            headerValues.put("type", "cash");
            headerValues.put("aging", "fast");
            //whereall 完全匹配
            return BindingBuilder.bind(creditBankQueue).to(creditBankExchange).whereAll(headerValues).match();
        }
    
        /**
        * @Desc: 配置一个routingKey为credit.finance的消息队列并绑定在creditFinanceExchange交换机上
        */
        @Bean
        public Binding bindingCreditBExchange(Queue creditFinanceQueue, HeadersExchange creditFinanceExchange) {
            Map<String,Object> headerValues = new HashMap<>();
            headerValues.put("type", "cash");
            headerValues.put("aging", "fast");
            //whereany 其中一项匹配即可
            return BindingBuilder.bind(creditFinanceQueue).to(creditFinanceExchange).whereAny(headerValues).match();
        }
    }
    View Code

    生产者:

    package com.wjy.headers;
    
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.amqp.support.converter.SimpleMessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.Map;
    
    @Component
    public class ApiCreditSender {
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public void creditBank(Map<String, Object> head, String msg){
            System.err.println("credit.bank send message: "+msg);
            rabbitTemplate.convertAndSend("creditBankExchange", "credit.bank", getMessage(head, msg));
        }
    
        public void creditFinance(Map<String, Object> head, String msg){
            System.err.println("credit.finance send message: "+msg);
            rabbitTemplate.convertAndSend("creditFinanceExchange", "credit.finance", getMessage(head, msg));
        }
    
        private Message getMessage(Map<String, Object> head, Object msg){
            MessageProperties messageProperties = new MessageProperties();
            for (Map.Entry<String, Object> entry : head.entrySet()) {
                messageProperties.setHeader(entry.getKey(), entry.getValue());
            }
            MessageConverter messageConverter = new SimpleMessageConverter();
            return messageConverter.toMessage(msg, messageProperties);
        }
    }
    View Code

    消费者:

    package com.wjy.headers;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class ApiCreditReceive {
        @RabbitHandler
        @RabbitListener(queues = "credit.bank")
        public void creditBank(String msg) {
            System.err.println("credit.bank receive message: "+msg);
        }
    
        @RabbitHandler
        @RabbitListener(queues = "credit.finance")
        public void creditFinance(String msg) {
            System.err.println("credit.finance receive message: "+msg);
        }
    }
    View Code

    测试类:

    package com.wjy.headers;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class ApiCreditSenderTests {
        @Autowired
        private ApiCreditSender sender;
    
        @Test
        public void test_creditBank_type() {
            Map<String,Object> head = new HashMap<>();
            head.put("type", "cash");
            sender.creditBank(head, "银行授信(部分匹配)");
        }
    
        @Test
        public void test_creditBank_all() {
            Map<String,Object> head = new HashMap<>();
            head.put("type", "cash");
            head.put("aging", "fast");
            sender.creditBank(head, "银行授信(全部匹配)");
        }
    
        @Test
        public void test_creditFinance_type() {
            Map<String,Object> head = new HashMap<>();
            head.put("type", "cash");
            sender.creditFinance(head, "金融公司授信(部分匹配)");
        }
    
        @Test
        public void test_creditFinance_all() {
            Map<String,Object> head = new HashMap<>();
            head.put("type", "cash");
            head.put("aging", "fast");
            sender.creditFinance(head, "金融公司授信(全部匹配)");
        }
    }
    View Code

    六、延时队列

    配置类:

    package com.wjy.delaymq;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class QueueConfiguration {
    
        /**
         * @Desc:消息队列app.queue.hello
         */
        @Bean
        public Queue helloQueue() {
            Queue queue = new Queue("app.queue.hello", true, false, false);
            return queue;
        }
    
        /**
         * 默认即时消息交换机
         */
        @Bean("defaultDirectExchange")
        public DirectExchange defaultDirectExchange() {
            return new DirectExchange("default.direct.exchange", true, false);
        }
    
        /**
         * @Desc:消息队列app.queue.hello绑定到默认队列上
         * 交换机匹配规则:app.queue.hello
         */
        @Bean
        public Binding helloBinding() {
            return BindingBuilder.bind(helloQueue()).to(defaultDirectExchange()).with("app.queue.hello");
        }
    
        /**
         * 配置延迟消息死信队列
         */
        @Bean
        public Queue defaultDeadLetterQueue() {
            Map<String, Object> arguments = new HashMap<>();
            //设置交换机路由
            arguments.put("x-dead-letter-exchange", "default.direct.exchange");
            //设置转发队列名称
            arguments.put("x-dead-letter-routing-key", "default.repeat.trade.queue");
            Queue queue = new Queue("default.dead.letter.queue", true, false, false, arguments);
            return queue;
        }
    
        /**
        * @Desc:将延迟消息队列绑定到延迟交换机上
         * 交换机匹配规则:default.dead.letter.queue
        */
        @Bean
        public Binding defaultDeadLetterBinding() {
            Binding bind = BindingBuilder.bind(defaultDeadLetterQueue()).to(defaultDirectExchange()).with("default.dead.letter.queue");
            return bind;
        }
    
    
        /**
         * 配置转发消息队列default.repeat.trade.queue
         * @return
         */
        @Bean
        public Queue defaultRepeatTradeQueue() {
            Queue queue = new Queue("default.repeat.trade.queue", true, false, false);
            return queue;
        }
    
        /**
         * 转发队列和默认交换机的绑定;
         * 交换机匹配规则:default.repeat.trade.queue
         */
        @Bean
        public Binding defaultRepeatTradeBinding() {
            return BindingBuilder
                    .bind(defaultRepeatTradeQueue())
                    .to(defaultDirectExchange())
                    .with("default.repeat.trade.queue");
        }
    
    
    }
    View Code

    生产者:

    package com.wjy.delaymq;
    
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class Sender {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void send(QueueMessage message) {
            //即时消息
            if(message.getType() == 10){
                sendMessage(message.getExchange(),message.getQueueName(),message.getMessage());
            }
            //延时消息
            if(message.getType() == 20){
                sendTimeMessage(message);
            }
        }
    
        //发送即时消息;
        private void sendMessage(String exchange,String queueName,String msg){
            rabbitTemplate.convertAndSend(exchange,queueName, msg);
        }
    
        //发送延时消息;
        public void sendTimeMessage(QueueMessage message) {
            int seconds = message.getSeconds();
            // 直接发送,无需进入死信队列
            if(seconds <= 0){
                sendMessage(message.getExchange(),message.getQueueName(), message.getMessage());
            }else{
                //rabbit默认为毫秒级
                long times = seconds * 1000;
                //这里需要字符定义延时处理器;
                MessagePostProcessor processor = new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setExpiration(times + "");
                        return message;
                    }
                };
                //注意传送的消息必须是字串串或者 字节或者实现序列化的对象
                //否则报错:Execution of Rabbit message listener failed
                //改完后将之前的队列数据清除 否则还会报错
                rabbitTemplate.convertAndSend("default.direct.exchange","default.dead.letter.queue", "转发消息", processor);
            }
        }
    
    }
    View Code

    消费者:

    package com.wjy.delaymq;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "default.repeat.trade.queue")
    public class TradeRecever {
        @Autowired
        private Sender sender;
    
        @RabbitHandler
        public void process(String content) {
            System.err.println("-----------延时结束--------------"+content);
            QueueMessage message = new QueueMessage("app.queue.hello", "转发消息...");
            message.setType(10);
            sender.send(message);
        }
    }
    View Code
    package com.wjy.delaymq;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "app.queue.hello")
    public class HelloRecever {
    
        @RabbitHandler
        public void process(String content) {
    
            System.err.println("hello 接受消息:" + content);
        }
    }

    POJO:

    package com.wjy.delaymq;
    
    import java.io.Serializable;
    import java.util.Date;
    
    public class QueueMessage implements Serializable {
        private String exchange;
    
        private String queueName;
    
        private Integer type;
    
        private Integer group;
    
        private Date timestamp;
    
        private String message;
    
        private Integer status;
    
        private int retry = 0;
    
        private int maxRetry = 10;
    
        private int seconds = 1;
    
        public QueueMessage() {
            super();
        }
    
        public QueueMessage(String queueName, String message) {
            super();
            this.queueName = queueName;
            this.message = message;
            this.exchange = "default.direct.exchange";
            this.type = 10;
            this.group = 10;
            this.timestamp = new Date();
            this.status = 10;
        }
    
        public String getExchange() {
            return exchange;
        }
    
        public void setExchange(String exchange) {
            this.exchange = exchange;
        }
    
        public String getQueueName() {
            return queueName;
        }
    
        public void setQueueName(String queueName) {
            this.queueName = queueName;
        }
    
        public Integer getType() {
            return type;
        }
    
        public void setType(Integer type) {
            this.type = type;
        }
    
        public Integer getGroup() {
            return group;
        }
    
        public void setGroup(Integer group) {
            this.group = group;
        }
    
        public Date getTimestamp() {
            return timestamp;
        }
    
        public void setTimestamp(Date timestamp) {
            this.timestamp = timestamp;
        }
    
        public String getMessage() {
            return message;
        }
    
        public void setMessage(String message) {
            this.message = message;
        }
    
        public Integer getStatus() {
            return status;
        }
    
        public void setStatus(Integer status) {
            this.status = status;
        }
    
        public int getRetry() {
            return retry;
        }
    
        public void setRetry(int retry) {
            this.retry = retry;
        }
    
        public int getMaxRetry() {
            return maxRetry;
        }
    
        public void setMaxRetry(int maxRetry) {
            this.maxRetry = maxRetry;
        }
    
        public int getSeconds() {
            return seconds;
        }
    
        public void setSeconds(int seconds) {
            this.seconds = seconds;
        }
    }
    View Code

    测试类:

    package com.wjy.delaymq;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class DelayTest {
    
        @Autowired
        private Sender sender;
    
        @Test
        public void delaySendTest() {
            System.err.println("发送延迟消息...");
            QueueMessage message = new QueueMessage("app.queue.hello", "测试延时消息...");
            //20代表延时消息队列;
            message.setType(20);
            //设置延时时间,单位为毫秒;
            message.setSeconds(6);
            sender.send(message);
            try {
                Thread.sleep(600000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    View Code

    七、消息确认机制

    配置类:

    package com.wjy.ack;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class AckRabbitConfig {
    
    
        /**
         * 定义一个hello的队列
         * Queue 可以有4个参数
         *      1.队列名
         *      2.durable       持久化消息队列 ,rabbitmq重启的时候不需要创建新的队列 默认true
         *      3.auto-delete   表示消息队列没有在使用时将被自动删除 默认是false
         *      4.exclusive     表示该消息队列是否只在当前connection生效,默认是false
         */
        @Bean
        public Queue helloQueue() {
            return new Queue("queue-test");
        }
    
        /** ======================== 定制一些处理策略 =============================*/
    
         /**
         * Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
         * @return
         */
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange("ABExchange");
        }
    
    
        @Bean
        public Binding bindingExchangeA(Queue helloQueue, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(helloQueue).to(fanoutExchange);
        }
    
    }
    View Code

    生产者:

    package com.wjy.ack;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    @Component
    public class Producer implements RabbitTemplate.ReturnCallback  {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 给hello队列发送消息
         */
        public void send() {
            String context = "你好现在是 " + new Date() +"";
            System.err.println("HelloSender发送内容 : " + context);
            this.rabbitTemplate.setReturnCallback(this);
            this.rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                if (!ack) {
                    System.out.println("HelloSender消息发送失败" + cause + correlationData.toString());
                } else {
                    System.out.println("HelloSender 消息发送成功 ");
                }
            });
    
            rabbitTemplate.convertAndSend("queue-test", context);
        }
    
        @Override
        public void returnedMessage(Message message, int i, String s, String s1, String s2) {
            System.err.println("sender return success" + message.toString()+"==="+i+"==="+s1+"==="+s2);
        }
    }
    View Code

    消费者:

    package com.wjy.ack;
    
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    @Component
    @RabbitListener(queues = "queue-test")
    public class Comsumer {
    
        @RabbitHandler
        public void process(String msg,Message message, Channel channel) throws IOException {
            try {
                // 采用手动应答模式, 手动确认应答更为安全稳定
                /*channel.basicAck(deliveryTag,ack)
                deliveryTag-当前消息的类似编号的号码,服务端为每一个消息生成的类似编号的号码
                ack-false只确认当前一个消息收到,true确认所有consumer获得的消息
                */
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
                System.err.println("receive: " + new String(message.getBody()));
            }
            catch (Exception e){
                //丢弃这条消息
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
                //拒绝这条消息
                //channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
            }
        }
    }
    View Code

    测试类:

    package com.wjy.ack;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitmqAckTests {
        @Autowired
        private Producer producer;
    
        /**
        * @Desc: 测试之前需在application.yml开启消息确认的配置
        */
        @Test
        public void send() {
            producer.send();
        }
    }
    View Code

    演示代码

    参考:

    springboot集成rabbitmq(实战)

    延时队列

    5种消息队列

    RabbitMQ四种交换机类型介绍

  • 相关阅读:
    tcp粘包解决
    socket网络编程
    logging模块
    异常处理
    hashlib configparser模块
    列表推导式和生成器表达式和内置函数
    迭代器与生成器
    装饰器
    函数
    文件操作
  • 原文地址:https://www.cnblogs.com/cac2020/p/11662433.html
Copyright © 2011-2022 走看看