zoukankan      html  css  js  c++  java
  • RabbitMQ进阶

    MQ进阶:
        
        1.springboot整合rabbitMQ:
            为了方便,这里只是模拟使用场景,所以不创建多个系统了,自己给自己发消息;
            1.创建工程it-springboot-rabbitmq,添加依赖:
                <?xml version="1.0" encoding="UTF-8"?>
                <project xmlns="http://maven.apache.org/POM/4.0.0"
                         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
                    <modelVersion>4.0.0</modelVersion>
    
                    <groupId>com.it</groupId>
                    <artifactId>it-springboot-rabbitmq</artifactId>
                    <version>1.0-SNAPSHOT</version>
    
                    <!--父工程-->
                    <parent>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-parent</artifactId>
                        <version>2.1.4.RELEASE</version>
                    </parent>
                    <dependencies>
                        <dependency>
                            <groupId>org.springframework.boot</groupId>
                            <artifactId>spring-boot-starter-amqp</artifactId>
                        </dependency>
                        <dependency>
                            <groupId>org.springframework.boot</groupId>
                            <artifactId>spring-boot-starter-test</artifactId>
                        </dependency>
                    </dependencies>
    
                    <repositories>
                        <repository>
                            <id>alimaven</id>
                            <name>aliyun maven</name>
                            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
                            <releases>
                                <enabled>true</enabled>
                            </releases>
                            <snapshots>
                                <enabled>false</enabled>
                            </snapshots>
                        </repository>
                    </repositories>
    
    
                </project>
            2.配置文件application.yml(实际使用过程中,生产者端和消费者端都要配置连接信息):
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    virtual-host: /lyle
                    username: lyle
                    password: lyle
                  application:
                    name: springboot-rabbitmq
                    
            3.创建启动类,并在启动类中利用注解@Bean注册队列,注册交换机,将队列绑定到交换机
                package it.com;
    
                import org.springframework.amqp.core.Binding;
                import org.springframework.amqp.core.BindingBuilder;
                import org.springframework.amqp.core.Queue;
                import org.springframework.amqp.core.TopicExchange;
                import org.springframework.boot.SpringApplication;
                import org.springframework.boot.autoconfigure.SpringBootApplication;
                import org.springframework.context.annotation.Bean;
    
                /**
                 * ToDo
                 *
                 * @author Lyle
                 * @date 2020/4/5
                 */
                @SpringBootApplication
                public class RabbitMQApplication {
                    public static void main(String[] args) {
                        SpringApplication.run(RabbitMQApplication.class,args);
                    }
                    /**
                     * 发送消息选择通配符模式
                     */
    
                    //创建队列,指定队列名称
                    @Bean
                    public Queue createQueue(){
                        return new Queue("topic_springboot_queue");
                    }
    
                    //创建交换机,指定交换机名称
                    @Bean
                    public TopicExchange createExchange(){
                        return new TopicExchange("topic_springboot_exchange");
                    }
    
                    //将队列绑定到交换机
                    @Bean
                    public Binding createBinding(){
                        return BindingBuilder.bind(createQueue()).to(createExchange()).with("item.*");
                    }
                }
            4.创建类,使用RabbitTemplate发送消息:
                package it.com;
    
                import org.junit.Test;
                import org.junit.runner.RunWith;
                import org.springframework.amqp.rabbit.core.RabbitTemplate;
                import org.springframework.beans.factory.annotation.Autowired;
                import org.springframework.boot.test.context.SpringBootTest;
                import org.springframework.test.context.junit4.SpringRunner;
    
                /**
                 * ToDo
                 *
                 * @author Lyle
                 * @date 2020/4/5
                 */
                @RunWith(SpringRunner.class)
                @SpringBootTest
                public class RabbitMQTest {
                    //用于发送MQ消息
                    @Autowired
                    private RabbitTemplate rabbitTemplate;
                    @Test
                    public void testCreateMessage(){
                        rabbitTemplate.convertAndSend("topic_springboot_exchange", "item.insert", "商品新增,routing key 为item.insert");
                        rabbitTemplate.convertAndSend("topic_springboot_exchange", "item.update", "商品修改,routing key 为item.update");
                        rabbitTemplate.convertAndSend("topic_springboot_exchange", "item.delete", "商品删除,routing key 为item.delete");
                        try {
                            Thread.sleep(5000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
    
                }
            5.创建类并交给spring管理,监听并处理消息
                注解:@RabbitListener(queues = "topic_springboot_queue"),用在类上,指定要监听的队列
                注解:@RabbitHandler,用在方法上,该注解可以根据不同的参数类型,调用响应的方法处理消息.
                package it.com.listener;
    
                import org.springframework.amqp.rabbit.annotation.RabbitHandler;
                import org.springframework.amqp.rabbit.annotation.RabbitListener;
                import org.springframework.stereotype.Component;
    
                /**
                 * ToDo
                 *
                 * @author Lyle
                 * @date 2020/4/5
                 */
                @Component
                @RabbitListener(queues = "topic_springboot_queue")
                public class MyMessageListener {
                    /**
                     * 接收并处理消息
                     * @param msg
                     */
                    @RabbitHandler
                    public void receiveMessage(String msg){
    
                        System.out.println("message:"+msg);
                    }
    
    
                }
                
    2.rabbitMQ高级特性:
        生产者可靠性消息投递:
            confirm模式:生产者发送消息到交换机的处理模式;
                        只确认消息是否有到达交换机,至于消息是否从交换机到达队列,不管;
            
            return:生产者发送的消息已经到了交换机,确认消息是否从交换机到达对列;
        confirm机制演示:
            1.创建工程,导入依赖:
                <?xml version="1.0" encoding="UTF-8"?>
                <project xmlns="http://maven.apache.org/POM/4.0.0"
                         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
                    <modelVersion>4.0.0</modelVersion>
    
                    <groupId>com.it</groupId>
                    <artifactId>it-springboot-rabbitMQ-day02</artifactId>
                    <version>1.0-SNAPSHOT</version>
    
                    <!--父工程-->
                    <parent>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-parent</artifactId>
                        <version>2.1.4.RELEASE</version>
                    </parent>
                    <dependencies>
                        <dependency>
                            <groupId>org.springframework.boot</groupId>
                            <artifactId>spring-boot-starter-web</artifactId>
                        </dependency>
    
                        <dependency>
                            <groupId>org.springframework.boot</groupId>
                            <artifactId>spring-boot-starter-amqp</artifactId>
                        </dependency>
                        <dependency>
                            <groupId>org.springframework.boot</groupId>
                            <artifactId>spring-boot-starter-test</artifactId>
    
                        </dependency>
                    </dependencies>
    
                </project>
            2.创建配置文件,配置连接信息,并配置开启confirm机制:
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    virtual-host: /lyle
                    username: lyle
                    password: lyle
                    #开启confirm机制
                    publisher-confirms: true
    
                  application:
                    name: springboot-rabbitmq
            3.创建启动类,并在启动类中注册队列,注册交换机,将队列绑定到交换机
                package com.it;
    
                import org.springframework.amqp.core.Binding;
                import org.springframework.amqp.core.BindingBuilder;
                import org.springframework.amqp.core.DirectExchange;
                import org.springframework.amqp.core.Queue;
                import org.springframework.boot.SpringApplication;
                import org.springframework.boot.autoconfigure.SpringBootApplication;
                import org.springframework.context.annotation.Bean;
    
                /**
                 * ToDo
                 *
                 * @author Lyle
                 * @date 2020/4/5
                 */
                @SpringBootApplication
                public class RabbitmqApplication {
                    public static void main(String[] args) {
                        SpringApplication.run(RabbitmqApplication.class,args);
                    }
    
                    //创建队列,指定交换机名称
                    @Bean
                    public Queue createQueue(){
                        return new Queue("queue_demo01");
                    }
                    //创建交换机,指定交换机名称
                    @Bean
                    public DirectExchange createExchange(){
                        return new DirectExchange("exchange_direct_demo01");
                    }
                    //创建绑定,将队列绑定到交换机
                    @Bean
                    public Binding createBinding(){
                        return BindingBuilder.bind(createQueue()).to(createExchange()).with("item.insert");
                    }
                }
            4.创建类交给spring管理,该类实现RabbitTemplate.ConfirmCallback,重写接口方法用于处理回调逻辑
                package com.it.confirm;
    
                import org.springframework.amqp.rabbit.connection.CorrelationData;
                import org.springframework.amqp.rabbit.core.RabbitTemplate;
                import org.springframework.stereotype.Component;
    
                /**
                 * ConfirmCallback,该接口用于confirm机制中的回调
                 *
                 * @author Lyle
                 * @date 2020/4/6
                 */
                @Component
                public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback {
                    /**
                     *
                     * @param correlationData:数据信息
                     * @param b:标记交换机是否已经收到消息,true则表示收到消息
                     * @param cause:错误日志,发送成功则为null
                     */
                    @Override
                    public void confirm(CorrelationData correlationData, boolean b, String cause) {
                        //处理回调逻辑
                        if (b){
                            //值为true,消息发送成功
                            System.out.println("消息发送成功...");
                        }else {
                            //消息发送失败,并打印错误信息
                            System.out.println("消息发送失败.."+cause);
                            //消息失败后的处理逻辑
    
                        }
                    }
                }
            5.创建Controller类,注入RabbitTemplate,在浏览器中发送请求,模拟发送消息到交换机;
                注入RabbitTemplate.ConfirmCallback接口的实现类,并在发送消息前设置回调函数;
                发送消息会自动调用回调函数进行处理;
                package com.it.controller;
    
                import com.it.confirm.MyConfirmCallback;
                import org.springframework.amqp.rabbit.core.RabbitTemplate;
                import org.springframework.beans.factory.annotation.Autowired;
                import org.springframework.web.bind.annotation.RequestMapping;
                import org.springframework.web.bind.annotation.RestController;
    
                /**
                 * ToDo
                 *
                 * @author Lyle
                 * @date 2020/4/5
                 */
                @RestController
                @RequestMapping("/test")
                public class TestController {
    
                    @Autowired
                    private RabbitTemplate rabbitTemplate;
    
                    @Autowired
                    private MyConfirmCallback myConfirmCallback;
    
                    @RequestMapping("/send1")
                    public String send1(){
                        System.out.println("更新了数据....");
                        //在发送消息前设置回调函数:myConfirmCallback
                        rabbitTemplate.setConfirmCallback(myConfirmCallback);
    
                        rabbitTemplate.convertAndSend("exchange_direct_demo01","item.insert","hello insert...");
                        return "ok";
                    }
                }
                
        return 机制的演示:
            1.在配置文件中配置开启return机制:
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    virtual-host: /lyle
                    username: lyle
                    password: lyle
                    #开启confirm机制
                    publisher-confirms: true
                    # 开启return机制
                    publisher-returns: true
    
                  application:
                    name: springboot-rabbitmq
            2.创建类交给spring管理,该类实现接口RabbitTemplate.ReturnCallback,重写接口方法用于处理回调逻辑:
                package com.it.returncall;
    
                import org.springframework.amqp.core.Message;
                import org.springframework.amqp.rabbit.core.RabbitTemplate;
                import org.springframework.stereotype.Component;
    
                /**
                 * RabbitTemplate.ReturnCallback:该接口用于处理交换机路由消息到队列的异常处理
                 *
                 * @author Lyle
                 * @date 2020/4/6
                 */
                @Component
                public class MyReturnCallback implements RabbitTemplate.ReturnCallback {
                    /**
                     *
                     * @param message:消息本身
                     * @param replyCode:错误码
                     * @param replyText:错误信息
                     * @param exchange:发生错误的交换机
                     * @param routingkey:发生错误时的routingkey
                     */
                    @Override
                    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingkey) {
                        //出现错误后,处理相关回滚业务
                        System.out.println("消息本身:"+new String(message.getBody()));
                        System.out.println("错误码:"+replyCode);
                        System.out.println("错误信息:"+replyText);
                        System.out.println("交换机:"+exchange);
                        System.out.println("routingkey:"+routingkey);
                        //回滚数据
                        System.out.println("回滚了数据...");
                    }
                }
            3.模拟发送消息,在发送消息前设置回调函数:
                @RequestMapping("/send2")
                public String send2(){
                    System.out.println("更新了数据....");
                    //在发送消息前设置回调函数:myReturnCallback
                    //写错routingkey,交换机路由消息到对列会出错,会执行回调函数
                    rabbitTemplate.setReturnCallback(myReturnCallback);
    
                    rabbitTemplate.convertAndSend("exchange_direct_demo01","item.insert1234","hello insert...");
                    return "ok";
                }
                
        消费者ACK确认机制:
            自动确认:acknowledge="none",默认的机制
            手动确认:acknowledge="manual"
            根据异常情况来确认:acknowledge="auto"
        
            手动确认机制:
                手动签收策略:
                    确认收到;
                    拒绝签收,并丢弃/或者重回队列; .....可以批量
                    拒绝签收,并丢弃/或者重回队列; .....不能批量处理
                    
                1.在配置文件中配置ACK确认机制为手动模式:
                    spring:
                      rabbitmq:
                        host: localhost
                        port: 5672
                        virtual-host: /lyle
                        username: lyle
                        password: lyle
                        #开启confirm机制
                        publisher-confirms: true
                        # 开启return机制
                        publisher-returns: true
                        # 配置ACK确认机制为手动模式:
                        listener:
                          direct:
                            acknowledge-mode: manual
    
                      application:
                        name: springboot-rabbitmq
                2.编写类和方法进行手动签收确认/或者拒绝签收
                    package com.it.listener;
    
                    import com.rabbitmq.client.Channel;
                    import org.springframework.amqp.core.Message;
                    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
                    import org.springframework.amqp.rabbit.annotation.RabbitListener;
                    import org.springframework.stereotype.Component;
    
                    import java.io.IOException;
    
                    /**
                     * ToDo
                     *
                     * @author Lyle
                     * @date 2020/4/6
                     */
                    @Component
                    @RabbitListener(queues = "queue_demo01")
                    public class MyMessageListener {
    
                        @RabbitHandler
                        public void receiveMessage(Message message, String msg, Channel channel){
                            //接收到了消息
                            System.out.println("message:"+msg);
                            try {
                                //处理业务
                                System.out.println("哈哈哈收到了1000块");
                                //int i=1/0;
                                //业务处理完成,手动签收
                                /**
                                 *参数2:是否批量处理
                                 */
                                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//确认签收,不批量处理
                            } catch (Exception e) {
    
                                e.printStackTrace();
                                try {
                                    /**
                                     * //出现异常,拒绝签收 ,直接丢弃
                                     * 参数2: 是否批量
                                     * 参数3:是否重回队列
                                     * 若是出错后,选择重回队列,该消息会在队列的最前面,重新被消费,这样会陷入死循环
                                     */
                                    //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
                                    channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
                                } catch (Exception e1) {
                                    e1.printStackTrace();
                                }
    
                            }
    
    
                        }
                    }
                    
        消费端限流设置:
            在配置文件中进行配置prefetch的值,默认值为250:
                listener:
                  simple:
                    # 配置ACK确认机制为手动模式:
                    acknowledge-mode: manual
                    # 设置每一个消费端可以处理未确认的消息的最大数量
                    prefetch: 2
                    
        TTL:全称 Time To Live(指消息的存活时间/过期时间)
            当消息到达存活时间后,还没有被消费,会被自动清除。
            RabbitMQ设置过期时间有两种:
                - 针对某一个队列设置过期时间 ;队列中的所有消息在过期时间到之后,如果没有被消费则被全部清除
                - 针对某一个特定的消息设置过期时间;队列中的消息设置过期时间之后,如果这个消息没有被消费则被清除。
                需要指出的是,针对特定消息设置时间,是指从这个消息在队列头部开始计时.
            
            TTL演示:
            创建配置类,配置队列/交换机,绑定的信息,指定消息过期时间
                package com.it.config;
    
                import org.springframework.amqp.core.*;
                import org.springframework.context.annotation.Bean;
                import org.springframework.context.annotation.Configuration;
    
    
                @Configuration
                public class TtlConfig {
    
                    //创建过期队列
                    @Bean
                    public Queue createqueuettl1(){
                        //设置队列过期时间为10000 10S钟
                        return QueueBuilder.durable("queue_demo02").withArgument("x-message-ttl",10000).build();
                    }
    
                    //创建交换机
    
                    @Bean
                    public DirectExchange createExchangettl(){
                        return new DirectExchange("exchange_direct_demo02");
                    }
    
                    //创建绑定
                    @Bean
                    public Binding createBindingttl(){
                        return BindingBuilder.bind(createqueuettl1()).to(createExchangettl()).with("item.ttl");
                    }
                }
            模拟发送消息,10秒后,队列中的消息会自动清除:    
                @RequestMapping("/send3")
                public String send3() {
                    //设置回调函数
                    //发送消息
                    rabbitTemplate.convertAndSend("exchange_direct_demo02", "item.ttl", "hello ttl哈哈");
                    return "ok";
                }
                
          
        
                

    死信队列:
      死信队列原理图:
        


    成为死信的三种条件:
      队列消息长度到达限制;
      消费者拒接消费消息,并且该消息没有重回队列;
      原队列存在消息过期设置,消息到达超时时间未被消费;

    延迟队列:
      延迟队列原理图:

        

     延迟队列的典型应用场景:

      典型的案例:下订单之后,30分钟如果还未支付则,取消订单回滚库存。

  • 相关阅读:
    LeetCode OJ String to Integer (atoi) 字符串转数字
    HDU 1005 Number Sequence(AC代码)
    HDU 1004 Let the Balloon Rise(AC代码)
    HDU 1003 Max Sum(AC代码)
    012 Integer to Roman 整数转换成罗马数字
    011 Container With Most Water 盛最多水的容器
    010 Regular Expression Matching 正则表达式匹配
    007 Reverse Integer 旋转整数
    006 ZigZag Conversion
    005 Longest Palindromic Substring 最长回文子串
  • 原文地址:https://www.cnblogs.com/lyle-liu/p/12642856.html
Copyright © 2011-2022 走看看