zoukankan      html  css  js  c++  java
  • MQ的使用

    笔记来自
    基础使用
    延迟队列
    确认机制

    全过程

    • 连接mq服务器
    • 生成template对象
    • 我们的角色是admin
    • 设置好几个队列queue
    • 交换机exchange,有4种交换机
    • 设置处理队列的方法,也叫做消费者,这是自动处理的
    • 通过接口往队列添加内容

    pom.xml

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    application.properties

    #rabbitmq
    spring.rabbitmq.host=192.168.89.168
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=fzb
    spring.rabbitmq.password=fzb2019
    spring.rabbitmq.virtual-host=/
    #消费者数量
    spring.rabbitmq.listener.simple.concurrency=10
    #最大消费者数量
    spring.rabbitmq.listener.simple.max-concurrency=10
    #消费者每次从队列获取的消息数量。写多了,如果长时间得不到消费,数据就一直得不到处理
    spring.rabbitmq.listener.simple.prefetch=1
    #消费者自动启动
    spring.rabbitmq.listener.simple.auto-startup=true
    #消费者消费失败,自动重新入队
    spring.rabbitmq.listener.simple.default-requeue-rejected=true
    #启用发送重试 队列满了发不进去时启动重试
    spring.rabbitmq.template.retry.enabled=true 
    #1秒钟后重试一次
    spring.rabbitmq.template.retry.initial-interval=1000 
    #最大重试次数 3次
    spring.rabbitmq.template.retry.max-attempts=3
    #最大间隔 10秒钟
    spring.rabbitmq.template.retry.max-interval=10000
    #等待间隔 的倍数。如果为2  第一次 乘以2 等1秒, 第二次 乘以2 等2秒 ,第三次 乘以2 等4秒
    spring.rabbitmq.template.retry.multiplier=1.0
    

    交换机模式

    • direct,全匹配的点对点
    @Configuration
    public class SenderConf {
        @Bean
        public Queue queue() {
            return new Queue("queue");
        }
    }
    
    
    @Service
    public class HelloSender {
        @Autowired
        private AmqpTemplate template;
     
        public void send() {
            template.convertAndSend("queue", "hello,rabbit666~");
        }
    }
    
    @Component
    public class MyListner 
        @RabbitListener(queues = "queue")
        public void msg(String msg){
            System.out.println("消费者消费消息了:"+msg);
        }
    }
    
    • topic,模糊匹配,需要判断
    @Configuration
    public class SenderConf1 {
     
        @Bean(name="message")
        public Queue queueMessage() {
            return new Queue("topic.message");
        }
     
        @Bean(name="messages")
        public Queue queueMessages() {
            return new Queue("topic.messages");
        }
     
        @Bean
        public TopicExchange exchange() {
            return new TopicExchange("exchange");
        }
        @Bean
        Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) {
            return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
        }
     
        @Bean
        Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange) {
            return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//*表示一个词,#表示零个或多个词
        }
    }
    
    
    @Service
    public class HelloSender {
        @Autowired
        private AmqpTemplate template;
     
        public void send() {
            template.convertAndSend("exchange","topic.message","hello,rabbit~~~11");
            template.convertAndSend("exchange","topic.messages","hello,rabbit~~~22");
        }
    }
    
    @Component
    public class MyListner{
        @RabbitListener(queues="topic.message")    //监听器监听指定的Queue
        public void process1(String str) {
            System.out.println("message:"+str);
        }
        @RabbitListener(queues="topic.messages")    //监听器监听指定的Queue
        public void process2(String str) {
            System.out.println("messages:"+str);
        }
    }
    
    • faout,全广播
    @Configuration
    public class SenderConf2 {
     
        @Bean(name="Amessage")
        public Queue AMessage() {
            return new Queue("fanout.A");
        }
     
        @Bean(name="Bmessage")
        public Queue BMessage() {
            return new Queue("fanout.B");
        }
     
        @Bean(name="Cmessage")
        public Queue CMessage() {
            return new Queue("fanout.C");
        }
     
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");//配置广播路由器
        }
     
        @Bean
        Binding bindingExchangeA(@Qualifier("Amessage") Queue AMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(AMessage).to(fanoutExchange);
        }
     
        @Bean
        Binding bindingExchangeB(@Qualifier("Bmessage") Queue BMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(BMessage).to(fanoutExchange);
        }
     
        @Bean
        Binding bindingExchangeC(@Qualifier("Cmessage") Queue CMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(CMessage).to(fanoutExchange);
        }
    }
    
    @Service
    public class HelloSender {
        @Autowired
        private AmqpTemplate template;
     
        public void send() {
            template.convertAndSend("fanoutExchange","","xixi,haha");//参数2忽略
        }
    }
    
    @Component
    public class MyListner{
        @RabbitListener(queues="fanout.A")
        public void processA(String str1) {
            System.out.println("ReceiveA:"+str1);
        }
        @RabbitListener(queues="fanout.B")
        public void processB(String str) {
            System.out.println("ReceiveB:"+str);
        }
        @RabbitListener(queues="fanout.C")
        public void processC(String str) {
            System.out.println("ReceiveC:"+str);
        }
    }
    

    过期时间

    • 设置了过期时间,过期了就没了,有两种方式
    • 如果两个同时设置已最早过期时间为准
    // 在发送消息时设置过期时间
    @Test
    public void ttlMessageTest(){
       MessageProperties messageProperties = new MessageProperties();
       //设置消息的过期时间,3秒
       messageProperties.setExpiration("3000");
       Message message = new Message("测试过期消息,3秒钟过期".getBytes(), messageProperties);
       //路由键与队列同名
       rabbitTemplate.convertAndSend("my_ttl_queue", message);
    }
    
    // 设置整个队列的过期时间
    @Configuration
    public class SenderConf3 {
        // 新建业务队列,添加死信配置,
        @Bean("redirectQueue")
        public Queue redirectQueue() {
            Map<String, Object> args = new HashMap<>(1);
            // 过期时间
            args.put("x-message-ttl", 10*1000);
            return QueueBuilder.durable("REDIRECT_QUEUE").withArguments(args).build();
        }
        // 需要普通业务交换机和绑定,这里省略
    }
    

    死信队列

    • 上面过期了没了,可以让他们去到死信的队列
    @Configuration
    public class SenderConf3 {
        @Bean("deadLetterExchange")
        public Exchange deadLetterExchange() {
            return ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();
        }
    
        @Bean("deadLetterQueue")
        public Queue deadLetterQueue() {
            return QueueBuilder.durable("DL_QUEUE").build();
        }
    
        @Bean
        public Binding deadLetterBinding() {
            return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null);
        }
    
        // 新建业务队列,添加死信配置,
        @Bean("redirectQueue")
        public Queue redirectQueue() {
            Map<String, Object> args = new HashMap<>(2);
    //       x-dead-letter-exchange    声明  死信交换机
            args.put("x-dead-letter-exchange", "DL_EXCHANGE");
    //       x-dead-letter-routing-key    声明 死信路由键
            args.put("x-dead-letter-routing-key", "DL_KEY");
            return QueueBuilder.durable("REDIRECT_QUEUE").withArguments(args).build();
        }
    
        // 需要普通业务交换机和绑定,这里省略
    }
    

    延迟队列

    • 需要给rabbitmq安装插件,放在pugins文件夹下重启服务
    1. 查看yum 安装的软件路径
       查找安装包:rpm -qa|grep rabbitmq
       查找位置: rpm -ql rabbitmq-server-3.6.15-1.el6.noarch
       卸载yum安装:yum remove rabbitmq-server-3.6.15-1.el6.noarch
    2. 上传到plugins文件夹
    3. 停止服务器
       service rabbitmq-server stop
    4. 开启插件
       rabbitmq-plugins enable rabbitmq_delayed_message_exchange
       (关闭插件)
       rabbitmq-plugins disable rabbitmq_delayed_message_exchange
    5. 启动服务器
       service rabbitmq-server start
    6. 查看插件
       rabbitmq-plugins list
    
    @Configuration
    public class DelayQueueConfig {
    
        public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";
        public static final String DELAY_QUEUE = "DELAY_QUEUE";
        public static final String DELAY_ROUTING_KEY = "DELAY_ROUTING_KEY";
    
        @Bean("delayExchange")
        public Exchange delayExchange() {
            Map<String, Object> args = new HashMap<>(1);
    //       x-delayed-type    声明 延迟队列Exchange的类型
            args.put("x-delayed-type", "direct");
            return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message",true, false,args);
        }
    
        @Bean("delayQueue")
        public Queue delayQueue() {
            return QueueBuilder.durable(DELAY_QUEUE).build();
        }
        
        @Bean
        public Binding delayQueueBindExchange() {
            return new Binding(DELAY_QUEUE, Binding.DestinationType.QUEUE, DELAY_EXCHANGE, DELAY_ROUTING_KEY, null);
        }
    
    }
    
    
    @Component
    public class DelayQueueSender {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void sendDelayQueue(int number) {
            rabbitTemplate.convertAndSend(
                    "textExchange",
                    "textKey",
                    number, (message) -> {
                        // 设置延迟的毫秒数
                        message.getMessageProperties().setDelay(number);
                        log.info("Now : {}", ZonedDateTime.now());
                        return message;
                    });
        }
    }
    
    // 监听textKey对应的队列等消息就行
    

    确认机制

    • 配置
    # 发送确认
    spring.rabbitmq.publisher-confirms=true
    # 发送回调
    spring.rabbitmq.publisher-returns=true
    # 消费手动确认
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    
    • 生产者发送消息确认机制
      • 其实这个也不能叫确认机制,只是起到一个监听的作用,监听生产者是否发送消息到exchange和queue。
      • 生产者和消费者代码不改变。
      • 新建配置类 MQProducerAckConfig.java 实现ConfirmCallback和ReturnCallback接口,@Component注册成组件。
      • ConfirmCallback只确认消息是否到达exchange,已实现方法confirm中ack属性为标准,true到达,反之进入黑洞。
      • ReturnCallback消息没有正确到达队列时触发回调,如果正确到达队列不执行
    @Component
    public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @PostConstruct
        public void init() {
            rabbitTemplate.setConfirmCallback(this);            //指定 ConfirmCallback
            rabbitTemplate.setReturnCallback(this);             //指定 ReturnCallback
    
        }
    
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack) {
                System.out.println("消息发送成功" + correlationData);
            } else {
                System.out.println("消息发送失败:" + cause);
            }
        }
    
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            // 反序列化对象输出
            System.out.println("消息主体: " + SerializationUtils.deserialize(message.getBody()));
            System.out.println("应答码: " + replyCode);
            System.out.println("描述:" + replyText);
            System.out.println("消息使用的交换器 exchange : " + exchange);
            System.out.println("消息使用的路由键 routing : " + routingKey);
        }
    }
    
    • 如果消费者消息是默认auto
      • 如果消息成功被消费(成功的意思是在消费的过程中没有抛出异常),则自动确认
      • 当抛出 AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝,且 requeue = false(不重新入队列)
      • 当抛出 ImmediateAcknowledgeAmqpException 异常,则消费者会被确认
      • 其他的异常,则消息会被拒绝,且 requeue = true,此时会发生死循环,可以通过 setDefaultRequeueRejected(默认是true)去设置抛弃消息
    • 消费者消息手动确认manual,一定要对消息做出应答,否则rabbit认为当前队列没有消费完成,将不再继续向该队列发送消息
    @Component
    public class MyListner 
        @RabbitListener(queues = "queue")
        public void msg(Channel channel,String msg) throws IOException {
            System.out.println("消费者消费消息了:"+msg);
            // 多了个channel,还要监听错误
            // channel有三个方法,一个是成功,一个是拒绝,一个是重新入队
            try {
                // 模拟执行任务
                Thread.sleep(1000);
                // 模拟异常
                String is = null;
                is.toString();
                // 确认收到消息,false只确认当前consumer一个消息收到,true确认所有consumer获得的消息
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception e) {
                if (message.getMessageProperties().getRedelivered()) {
                    System.out.println("消息已重复处理失败,拒绝再次接收" + user.getName());
                    // 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
                } else {
                    System.out.println("消息即将再次返回队列处理" + user.getName());
                    // requeue为是否重新回到队列,true重新入队
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                }
                //e.printStackTrace();
            }
        }
    }
    
    • 这个就是手动事务了

    持久化

    • 交换机持久化
    @Bean
    public DirectExchange testDirectExchange(){
       //第二个参数就是是否持久化,第三个参数就是是否自动删除
       return new DirectExchange("direct.Exchange",true,false);
    }
    
    • 队列持久化
    @Bean
    public Queue txQueue(){
       //第二个参数就是durable,是否持久化
       return new Queue("txQueue",true);
    }
    

    高级知识

    • 集群
    • HAProxy
    • KeepAlived

    面试问题

    • 消息堆积
    // 原因
    太多入队,消费不及时,队列占满
    // 解决方案
    增加消费者
    
    • 消息丢失
    // 原因一
    消息在生产者丢失
    // 解决方案一
    信息被MQ接受后需要给生产者发送一个确认消息(确认机制)
    在confirm方法里的信息发送失败后面添加重发机制
    
    // 原因二
    消息在MQ宕机丢失
    // 解决方案二
    启动持续化
    
    // 原因三
    消息在消费者丢失
    // 解决方案二
    消费者确认机制,事务机制
    
    • 有序消费
    // 目的
    有ABC三个消息,想要顺序执行ABC,但是有多个消费者,ABC会被瞬间平分
    // 解决方案
    改成多个队列,一个队列一个消费者,信息由hash值放到对应队列
    
    • 重复消费
    // 原因
    为了防止消息在消费者丢失开启了手动回复,但是如果在消费者执行成功了,但是回复的时候出了问题,mq就以为消息没成功又给下一个消费者发送一次,同个消息执行多次
    // 解决
    每个消息都添加id,redis也添加id,消费者接受信息后判断这个信息是不是用过了,用过了直接返回成功
    
  • 相关阅读:
    【科普】.NET 泛型
    吐槽,青岛科技大学真他妈操蛋
    c# 数组和集合精讲
    c# System.Text.Json 精讲
    .NET 5的System.Text.Json的JsonDocument类讲解
    c#中Array,ArrayList 与List<T>的区别、共性与转换
    代码是怎么运行的?
    .NET6使用DOCFX根据注释自动生成开发文档
    spring通过注解注册bean的方式+spring生命周期
    莫比乌斯反演
  • 原文地址:https://www.cnblogs.com/pengdt/p/13523324.html
Copyright © 2011-2022 走看看