zoukankan      html  css  js  c++  java
  • RabbitMQ进阶

    MQ进阶:
        
        1.springboot整合rabbitMQ:
            为了方便,这里只是模拟使用场景,所以不创建多个系统了,自己给自己发消息;
            1.创建工程it-springboot-rabbitmq,添加依赖:
                <?xml version="1.0" encoding="UTF-8"?>
                <project xmlns="http://maven.apache.org/POM/4.0.0"
                         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
                    <modelVersion>4.0.0</modelVersion>
    
                    <groupId>com.it</groupId>
                    <artifactId>it-springboot-rabbitmq</artifactId>
                    <version>1.0-SNAPSHOT</version>
    
                    <!--父工程-->
                    <parent>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-parent</artifactId>
                        <version>2.1.4.RELEASE</version>
                    </parent>
                    <dependencies>
                        <dependency>
                            <groupId>org.springframework.boot</groupId>
                            <artifactId>spring-boot-starter-amqp</artifactId>
                        </dependency>
                        <dependency>
                            <groupId>org.springframework.boot</groupId>
                            <artifactId>spring-boot-starter-test</artifactId>
                        </dependency>
                    </dependencies>
    
                    <repositories>
                        <repository>
                            <id>alimaven</id>
                            <name>aliyun maven</name>
                            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
                            <releases>
                                <enabled>true</enabled>
                            </releases>
                            <snapshots>
                                <enabled>false</enabled>
                            </snapshots>
                        </repository>
                    </repositories>
    
    
                </project>
            2.配置文件application.yml(实际使用过程中,生产者端和消费者端都要配置连接信息):
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    virtual-host: /lyle
                    username: lyle
                    password: lyle
                  application:
                    name: springboot-rabbitmq
                    
            3.创建启动类,并在启动类中利用注解@Bean注册队列,注册交换机,将队列绑定到交换机
                package it.com;
    
                import org.springframework.amqp.core.Binding;
                import org.springframework.amqp.core.BindingBuilder;
                import org.springframework.amqp.core.Queue;
                import org.springframework.amqp.core.TopicExchange;
                import org.springframework.boot.SpringApplication;
                import org.springframework.boot.autoconfigure.SpringBootApplication;
                import org.springframework.context.annotation.Bean;
    
                /**
                 * ToDo
                 *
                 * @author Lyle
                 * @date 2020/4/5
                 */
                @SpringBootApplication
                public class RabbitMQApplication {
                    public static void main(String[] args) {
                        SpringApplication.run(RabbitMQApplication.class,args);
                    }
                    /**
                     * 发送消息选择通配符模式
                     */
    
                    //创建队列,指定队列名称
                    @Bean
                    public Queue createQueue(){
                        return new Queue("topic_springboot_queue");
                    }
    
                    //创建交换机,指定交换机名称
                    @Bean
                    public TopicExchange createExchange(){
                        return new TopicExchange("topic_springboot_exchange");
                    }
    
                    //将队列绑定到交换机
                    @Bean
                    public Binding createBinding(){
                        return BindingBuilder.bind(createQueue()).to(createExchange()).with("item.*");
                    }
                }
            4.创建类,使用RabbitTemplate发送消息:
                package it.com;
    
                import org.junit.Test;
                import org.junit.runner.RunWith;
                import org.springframework.amqp.rabbit.core.RabbitTemplate;
                import org.springframework.beans.factory.annotation.Autowired;
                import org.springframework.boot.test.context.SpringBootTest;
                import org.springframework.test.context.junit4.SpringRunner;
    
                /**
                 * ToDo
                 *
                 * @author Lyle
                 * @date 2020/4/5
                 */
                @RunWith(SpringRunner.class)
                @SpringBootTest
                public class RabbitMQTest {
                    //用于发送MQ消息
                    @Autowired
                    private RabbitTemplate rabbitTemplate;
                    @Test
                    public void testCreateMessage(){
                        rabbitTemplate.convertAndSend("topic_springboot_exchange", "item.insert", "商品新增,routing key 为item.insert");
                        rabbitTemplate.convertAndSend("topic_springboot_exchange", "item.update", "商品修改,routing key 为item.update");
                        rabbitTemplate.convertAndSend("topic_springboot_exchange", "item.delete", "商品删除,routing key 为item.delete");
                        try {
                            Thread.sleep(5000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
    
                }
            5.创建类并交给spring管理,监听并处理消息
                注解:@RabbitListener(queues = "topic_springboot_queue"),用在类上,指定要监听的队列
                注解:@RabbitHandler,用在方法上,该注解可以根据不同的参数类型,调用响应的方法处理消息.
                package it.com.listener;
    
                import org.springframework.amqp.rabbit.annotation.RabbitHandler;
                import org.springframework.amqp.rabbit.annotation.RabbitListener;
                import org.springframework.stereotype.Component;
    
                /**
                 * ToDo
                 *
                 * @author Lyle
                 * @date 2020/4/5
                 */
                @Component
                @RabbitListener(queues = "topic_springboot_queue")
                public class MyMessageListener {
                    /**
                     * 接收并处理消息
                     * @param msg
                     */
                    @RabbitHandler
                    public void receiveMessage(String msg){
    
                        System.out.println("message:"+msg);
                    }
    
    
                }
                
    2.rabbitMQ高级特性:
        生产者可靠性消息投递:
            confirm模式:生产者发送消息到交换机的处理模式;
                        只确认消息是否有到达交换机,至于消息是否从交换机到达队列,不管;
            
            return:生产者发送的消息已经到了交换机,确认消息是否从交换机到达对列;
        confirm机制演示:
            1.创建工程,导入依赖:
                <?xml version="1.0" encoding="UTF-8"?>
                <project xmlns="http://maven.apache.org/POM/4.0.0"
                         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
                    <modelVersion>4.0.0</modelVersion>
    
                    <groupId>com.it</groupId>
                    <artifactId>it-springboot-rabbitMQ-day02</artifactId>
                    <version>1.0-SNAPSHOT</version>
    
                    <!--父工程-->
                    <parent>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-parent</artifactId>
                        <version>2.1.4.RELEASE</version>
                    </parent>
                    <dependencies>
                        <dependency>
                            <groupId>org.springframework.boot</groupId>
                            <artifactId>spring-boot-starter-web</artifactId>
                        </dependency>
    
                        <dependency>
                            <groupId>org.springframework.boot</groupId>
                            <artifactId>spring-boot-starter-amqp</artifactId>
                        </dependency>
                        <dependency>
                            <groupId>org.springframework.boot</groupId>
                            <artifactId>spring-boot-starter-test</artifactId>
    
                        </dependency>
                    </dependencies>
    
                </project>
            2.创建配置文件,配置连接信息,并配置开启confirm机制:
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    virtual-host: /lyle
                    username: lyle
                    password: lyle
                    #开启confirm机制
                    publisher-confirms: true
    
                  application:
                    name: springboot-rabbitmq
            3.创建启动类,并在启动类中注册队列,注册交换机,将队列绑定到交换机
                package com.it;
    
                import org.springframework.amqp.core.Binding;
                import org.springframework.amqp.core.BindingBuilder;
                import org.springframework.amqp.core.DirectExchange;
                import org.springframework.amqp.core.Queue;
                import org.springframework.boot.SpringApplication;
                import org.springframework.boot.autoconfigure.SpringBootApplication;
                import org.springframework.context.annotation.Bean;
    
                /**
                 * ToDo
                 *
                 * @author Lyle
                 * @date 2020/4/5
                 */
                @SpringBootApplication
                public class RabbitmqApplication {
                    public static void main(String[] args) {
                        SpringApplication.run(RabbitmqApplication.class,args);
                    }
    
                    //创建队列,指定交换机名称
                    @Bean
                    public Queue createQueue(){
                        return new Queue("queue_demo01");
                    }
                    //创建交换机,指定交换机名称
                    @Bean
                    public DirectExchange createExchange(){
                        return new DirectExchange("exchange_direct_demo01");
                    }
                    //创建绑定,将队列绑定到交换机
                    @Bean
                    public Binding createBinding(){
                        return BindingBuilder.bind(createQueue()).to(createExchange()).with("item.insert");
                    }
                }
            4.创建类交给spring管理,该类实现RabbitTemplate.ConfirmCallback,重写接口方法用于处理回调逻辑
                package com.it.confirm;
    
                import org.springframework.amqp.rabbit.connection.CorrelationData;
                import org.springframework.amqp.rabbit.core.RabbitTemplate;
                import org.springframework.stereotype.Component;
    
                /**
                 * ConfirmCallback,该接口用于confirm机制中的回调
                 *
                 * @author Lyle
                 * @date 2020/4/6
                 */
                @Component
                public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback {
                    /**
                     *
                     * @param correlationData:数据信息
                     * @param b:标记交换机是否已经收到消息,true则表示收到消息
                     * @param cause:错误日志,发送成功则为null
                     */
                    @Override
                    public void confirm(CorrelationData correlationData, boolean b, String cause) {
                        //处理回调逻辑
                        if (b){
                            //值为true,消息发送成功
                            System.out.println("消息发送成功...");
                        }else {
                            //消息发送失败,并打印错误信息
                            System.out.println("消息发送失败.."+cause);
                            //消息失败后的处理逻辑
    
                        }
                    }
                }
            5.创建Controller类,注入RabbitTemplate,在浏览器中发送请求,模拟发送消息到交换机;
                注入RabbitTemplate.ConfirmCallback接口的实现类,并在发送消息前设置回调函数;
                发送消息会自动调用回调函数进行处理;
                package com.it.controller;
    
                import com.it.confirm.MyConfirmCallback;
                import org.springframework.amqp.rabbit.core.RabbitTemplate;
                import org.springframework.beans.factory.annotation.Autowired;
                import org.springframework.web.bind.annotation.RequestMapping;
                import org.springframework.web.bind.annotation.RestController;
    
                /**
                 * ToDo
                 *
                 * @author Lyle
                 * @date 2020/4/5
                 */
                @RestController
                @RequestMapping("/test")
                public class TestController {
    
                    @Autowired
                    private RabbitTemplate rabbitTemplate;
    
                    @Autowired
                    private MyConfirmCallback myConfirmCallback;
    
                    @RequestMapping("/send1")
                    public String send1(){
                        System.out.println("更新了数据....");
                        //在发送消息前设置回调函数:myConfirmCallback
                        rabbitTemplate.setConfirmCallback(myConfirmCallback);
    
                        rabbitTemplate.convertAndSend("exchange_direct_demo01","item.insert","hello insert...");
                        return "ok";
                    }
                }
                
        return 机制的演示:
            1.在配置文件中配置开启return机制:
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    virtual-host: /lyle
                    username: lyle
                    password: lyle
                    #开启confirm机制
                    publisher-confirms: true
                    # 开启return机制
                    publisher-returns: true
    
                  application:
                    name: springboot-rabbitmq
            2.创建类交给spring管理,该类实现接口RabbitTemplate.ReturnCallback,重写接口方法用于处理回调逻辑:
                package com.it.returncall;
    
                import org.springframework.amqp.core.Message;
                import org.springframework.amqp.rabbit.core.RabbitTemplate;
                import org.springframework.stereotype.Component;
    
                /**
                 * RabbitTemplate.ReturnCallback:该接口用于处理交换机路由消息到队列的异常处理
                 *
                 * @author Lyle
                 * @date 2020/4/6
                 */
                @Component
                public class MyReturnCallback implements RabbitTemplate.ReturnCallback {
                    /**
                     *
                     * @param message:消息本身
                     * @param replyCode:错误码
                     * @param replyText:错误信息
                     * @param exchange:发生错误的交换机
                     * @param routingkey:发生错误时的routingkey
                     */
                    @Override
                    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingkey) {
                        //出现错误后,处理相关回滚业务
                        System.out.println("消息本身:"+new String(message.getBody()));
                        System.out.println("错误码:"+replyCode);
                        System.out.println("错误信息:"+replyText);
                        System.out.println("交换机:"+exchange);
                        System.out.println("routingkey:"+routingkey);
                        //回滚数据
                        System.out.println("回滚了数据...");
                    }
                }
            3.模拟发送消息,在发送消息前设置回调函数:
                @RequestMapping("/send2")
                public String send2(){
                    System.out.println("更新了数据....");
                    //在发送消息前设置回调函数:myReturnCallback
                    //写错routingkey,交换机路由消息到对列会出错,会执行回调函数
                    rabbitTemplate.setReturnCallback(myReturnCallback);
    
                    rabbitTemplate.convertAndSend("exchange_direct_demo01","item.insert1234","hello insert...");
                    return "ok";
                }
                
        消费者ACK确认机制:
            自动确认:acknowledge="none",默认的机制
            手动确认:acknowledge="manual"
            根据异常情况来确认:acknowledge="auto"
        
            手动确认机制:
                手动签收策略:
                    确认收到;
                    拒绝签收,并丢弃/或者重回队列; .....可以批量
                    拒绝签收,并丢弃/或者重回队列; .....不能批量处理
                    
                1.在配置文件中配置ACK确认机制为手动模式:
                    spring:
                      rabbitmq:
                        host: localhost
                        port: 5672
                        virtual-host: /lyle
                        username: lyle
                        password: lyle
                        #开启confirm机制
                        publisher-confirms: true
                        # 开启return机制
                        publisher-returns: true
                        # 配置ACK确认机制为手动模式:
                        listener:
                          direct:
                            acknowledge-mode: manual
    
                      application:
                        name: springboot-rabbitmq
                2.编写类和方法进行手动签收确认/或者拒绝签收
                    package com.it.listener;
    
                    import com.rabbitmq.client.Channel;
                    import org.springframework.amqp.core.Message;
                    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
                    import org.springframework.amqp.rabbit.annotation.RabbitListener;
                    import org.springframework.stereotype.Component;
    
                    import java.io.IOException;
    
                    /**
                     * ToDo
                     *
                     * @author Lyle
                     * @date 2020/4/6
                     */
                    @Component
                    @RabbitListener(queues = "queue_demo01")
                    public class MyMessageListener {
    
                        @RabbitHandler
                        public void receiveMessage(Message message, String msg, Channel channel){
                            //接收到了消息
                            System.out.println("message:"+msg);
                            try {
                                //处理业务
                                System.out.println("哈哈哈收到了1000块");
                                //int i=1/0;
                                //业务处理完成,手动签收
                                /**
                                 *参数2:是否批量处理
                                 */
                                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//确认签收,不批量处理
                            } catch (Exception e) {
    
                                e.printStackTrace();
                                try {
                                    /**
                                     * //出现异常,拒绝签收 ,直接丢弃
                                     * 参数2: 是否批量
                                     * 参数3:是否重回队列
                                     * 若是出错后,选择重回队列,该消息会在队列的最前面,重新被消费,这样会陷入死循环
                                     */
                                    //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
                                    channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
                                } catch (Exception e1) {
                                    e1.printStackTrace();
                                }
    
                            }
    
    
                        }
                    }
                    
        消费端限流设置:
            在配置文件中进行配置prefetch的值,默认值为250:
                listener:
                  simple:
                    # 配置ACK确认机制为手动模式:
                    acknowledge-mode: manual
                    # 设置每一个消费端可以处理未确认的消息的最大数量
                    prefetch: 2
                    
        TTL:全称 Time To Live(指消息的存活时间/过期时间)
            当消息到达存活时间后,还没有被消费,会被自动清除。
            RabbitMQ设置过期时间有两种:
                - 针对某一个队列设置过期时间 ;队列中的所有消息在过期时间到之后,如果没有被消费则被全部清除
                - 针对某一个特定的消息设置过期时间;队列中的消息设置过期时间之后,如果这个消息没有被消费则被清除。
                需要指出的是,针对特定消息设置时间,是指从这个消息在队列头部开始计时.
            
            TTL演示:
            创建配置类,配置队列/交换机,绑定的信息,指定消息过期时间
                package com.it.config;
    
                import org.springframework.amqp.core.*;
                import org.springframework.context.annotation.Bean;
                import org.springframework.context.annotation.Configuration;
    
    
                @Configuration
                public class TtlConfig {
    
                    //创建过期队列
                    @Bean
                    public Queue createqueuettl1(){
                        //设置队列过期时间为10000 10S钟
                        return QueueBuilder.durable("queue_demo02").withArgument("x-message-ttl",10000).build();
                    }
    
                    //创建交换机
    
                    @Bean
                    public DirectExchange createExchangettl(){
                        return new DirectExchange("exchange_direct_demo02");
                    }
    
                    //创建绑定
                    @Bean
                    public Binding createBindingttl(){
                        return BindingBuilder.bind(createqueuettl1()).to(createExchangettl()).with("item.ttl");
                    }
                }
            模拟发送消息,10秒后,队列中的消息会自动清除:    
                @RequestMapping("/send3")
                public String send3() {
                    //设置回调函数
                    //发送消息
                    rabbitTemplate.convertAndSend("exchange_direct_demo02", "item.ttl", "hello ttl哈哈");
                    return "ok";
                }
                
          
        
                

    死信队列:
      死信队列原理图:
        


    成为死信的三种条件:
      队列消息长度到达限制;
      消费者拒接消费消息,并且该消息没有重回队列;
      原队列存在消息过期设置,消息到达超时时间未被消费;

    延迟队列:
      延迟队列原理图:

        

     延迟队列的典型应用场景:

      典型的案例:下订单之后,30分钟如果还未支付则,取消订单回滚库存。

  • 相关阅读:
    Python for i 循环
    Python 输入分数并评
    用户名和密码的输入
    cocos2d-x 3.0学习
    VS2008 ShotKey
    Cocos2d-x 3.0的安装方法
    VFC
    一、在WIN7 64位系统平台,VS2013环境下安装WTL90_4090_RC1(2014-04-01)
    http://www.vcf-online.org/
    Win7 64位 VS2012 安装 Qt5
  • 原文地址:https://www.cnblogs.com/lyle-liu/p/12642856.html
Copyright © 2011-2022 走看看