zoukankan      html  css  js  c++  java
  • rabbitmq的消费端限流模式(二)

    rabbitmq的消费端限流模式

    rabbitmq-high-producer项目

    application.properties文件

    server.port=8081
    # ip
    spring.rabbitmq.host=127.0.0.1
    #默认5672
    spring.rabbitmq.port=5672
    #用户名
    spring.rabbitmq.username=guest
    #密码
    spring.rabbitmq.password=guest
    #连接到代理时用的虚拟主机
    spring.rabbitmq.virtual-host=/
    #是否启用【发布确认】,默认false
    #spring.rabbitmq.publisher-confirm-type=correlated替换spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-confirm-type=correlated
    #是否启用【发布返回】,默认false
    spring.rabbitmq.publisher-returns=true
    #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    #rabbitmq限流,必须在ack确认才能使用
    #消费者最小数量
    spring.rabbitmq.listener.simple.concurrency=1
    #最大的消费者数量
    spring.rabbitmq.listener.simple.max-concurrency=10
    #在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量)
    spring.rabbitmq.listener.simple.prefetch=2

    controller层

    package com.qingfeng.rabbitmqhighproducer.qsq.controller;
    
    import com.qingfeng.rabbitmqhighproducer.qsq.service.QsqRabbitService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.UUID;
    
    /**
     * 消费端限流
     */
    @RestController
    @RequestMapping("/qsq")
    public class QsqDirectSendMessageController {
    
        @Autowired
        private QsqRabbitService qsqRabbitService;
    
        //http://127.0.0.1:8081/qsq/sendDirectMessage
        @GetMapping("/sendDirectMessage")
        public String sendDirectMessage() {
            for (int i=1;i<100;i++) {
                String messageId = String.valueOf(UUID.randomUUID());
                //将消息携带绑定键值:路由键MqConst.ROUTING_MSM_ITEM 发送到交换机MqConst.EXCHANGE_DIRECT_MSM
                qsqRabbitService.sendMessageQsq("qsq_direct_exchange", "qsq_direct_routing", messageId);
            }
            return "ok";
        }
    
        @GetMapping("/sendDirectMessage02")
        public String sendDirectMessage02() {
            String messageId = String.valueOf(UUID.randomUUID());
    
            //将消息携带绑定键值:路由键MqConst.ROUTING_MSM_ITEM 发送到交换机MqConst.EXCHANGE_DIRECT_MSM
            qsqRabbitService.sendMessageQsq("qsq_direct_exchange","qsq_direct_routing",messageId);
            return "ok";
        }
    
    }
    QsqRabbitConfig类
    package com.qingfeng.rabbitmqhighproducer.qsq.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     *限流机制
     */
    @Configuration
    public class QsqRabbitConfig {
    
        //交换机名称
        public static final String QSQ_DIRECT_EXCHANGE = "qsq_direct_exchange";
        //队列名称
        public static final String QSQ_DIRECT__QUEUE = "qsq_direct_queue";
        //路由
        public static final String QSQ_DIRECT__ROUTING = "qsq_direct_routing";
    
        /**
         * 交换机
         */
        @Bean(name = "qsqDirectExchange")
        public DirectExchange qsqDirectExchange() {
            // 参数意义:
            // name: 名称
            // durable: true
            // autoDelete: 自动删除
            return new DirectExchange(QSQ_DIRECT_EXCHANGE, true, false);
        }
    
        /**
         * 队列
         */
        @Bean(name ="qsqDirectQueue" )
        public Queue qsqDirectQueue() {
            // 参数意义:
            // name: 名称
            // durable: true为持久化
            return new Queue(QSQ_DIRECT__QUEUE, true);
        }
    
        /**
         * 绑定队列和交换机
         */
        @Bean
        public Binding bindingDirectQsq() {
            return BindingBuilder.bind(qsqDirectQueue())
                    .to(qsqDirectExchange())
                    .with(QSQ_DIRECT__ROUTING);
        }
    
    
    }
    QsqRabbitService类
    package com.qingfeng.rabbitmqhighproducer.qsq.service;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import java.util.UUID;
    
    @Service
    public class QsqRabbitService implements RabbitTemplate.ConfirmCallback {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 消息从 producer生产者 到 exchange交换机 则会返回一个 confirmCallback 。
         *  producer发送消息confirm 确认模式
         * @param exchange 交换机
         * @param routingKey 路由键
         * @param msg 消息
         */
        public void sendMessageQsq(String exchange, String routingKey, String msg){
            /**
             * 确认模式:
             * 步骤:
             * 1. 确认模式开启:spring.rabbitmq.publisher-confirm-type=correlated
             * 2. 在rabbitTemplate定义ConfirmCallBack回调函数
             */
            //2. 定义回调
            rabbitTemplate.setConfirmCallback(this);
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            System.out.println("发送的消息为"+msg);
            this.rabbitTemplate.convertAndSend(exchange,routingKey,msg,correlationData);
        }
    
        /**
         *
         * @param correlationData 相关配置信息
         * @param ack   exchange交换机 是否成功收到了消息。true 成功,false代表失败
         * @param cause 失败原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("消息id:" + correlationData.getId());
            if (ack) {
                System.out.println("消息发送确认成功");
            } else {
                System.out.println("消息发送确认失败:" + cause);
            }
        }
    
    }
    rabbitmq-high-consumer项目

    application.properties文件

    server.port=8082
    
    # ip
    spring.rabbitmq.host=127.0.0.1
    #默认5672
    spring.rabbitmq.port=5672
    #用户名
    spring.rabbitmq.username=guest
    #密码
    spring.rabbitmq.password=guest
    #连接到代理时用的虚拟主机
    spring.rabbitmq.virtual-host=/
    #是否启用【发布确认】,默认false
    #spring.rabbitmq.publisher-confirm-type=correlated替换spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-confirm-type=correlated
    #是否启用【发布返回】,默认false
    spring.rabbitmq.publisher-returns=true
    #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    #rabbitmq限流,必须在ack确认才能使用
    #消费者最小数量
    spring.rabbitmq.listener.simple.concurrency=1
    #最大的消费者数量
    spring.rabbitmq.listener.simple.max-concurrency=10
    #在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量)
    spring.rabbitmq.listener.simple.prefetch=2
    QsqRabbitConfig类
    package com.qingfeng.rabbitmqhighconsumer.qsq;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     *限流机制
     */
    @Configuration
    public class QsqRabbitConfig {
    
        //交换机名称
        public static final String QSQ_DIRECT_EXCHANGE = "qsq_direct_exchange";
        //队列名称
        public static final String QSQ_DIRECT__QUEUE = "qsq_direct_queue";
        //路由
        public static final String QSQ_DIRECT__ROUTING = "qsq_direct_routing";
    
        /**
         * 交换机
         */
        @Bean(name = "qsqDirectExchange")
        public DirectExchange qsqDirectExchange() {
            // 参数意义:
            // name: 名称
            // durable: true
            // autoDelete: 自动删除
            return new DirectExchange(QSQ_DIRECT_EXCHANGE, true, false);
        }
    
        /**
         * 队列
         */
        @Bean(name ="qsqDirectQueue" )
        public Queue qsqDirectQueue() {
            // 参数意义:
            // name: 名称
            // durable: true为持久化
            return new Queue(QSQ_DIRECT__QUEUE, true);
        }
    
        /**
         * 绑定队列和交换机
         */
        @Bean("bindingDirectQsq")
        public Binding bindingDirectQsq() {
            return BindingBuilder.bind(qsqDirectQueue())
                    .to(qsqDirectExchange())
                    .with(QSQ_DIRECT__ROUTING);
        }
    
    
    
    
    }
    QsqListener类
    package com.qingfeng.rabbitmqhighconsumer.qsq;
    
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Consumer 限流机制
     *  1. 确保ack机制为手动确认。 spring.rabbitmq.listener.simple.acknowledge-mode=manual
     *  2. listener-container配置属性  spring.rabbitmq.listener.simple.prefetch=1
     *      perfetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。
     */
    @Component
    public class QsqListener {
    
        //限流机制
        @RabbitHandler
        @RabbitListener(queues = "qsq_direct_queue")
        public void onMessage(Message message, Channel channel) throws Exception {
            Thread.sleep(5000);
            //1.获取消息
            System.out.println(new String(message.getBody()));
    
            //2. 处理业务逻辑
            System.out.println("处理业务逻辑");
    
            //3. 签收
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
        }
    }
    启动两个项目测试:正确的测试
    访问http://127.0.0.1:8081/qsq/sendDirectMessage
    rabbitmq-high-provider输出结果
    会把生产的消息全发送到MQ服务器



    rabbitmq-high-consumer输出结果
     
    在配置文件里配置了spring.rabbitmq.listener.simple.prefetch=2
     
    consumer两个两个的消费

     

     




  • 相关阅读:
    大数加法、乘法实现的简单版本
    hdu 4027 Can you answer these queries?
    zoj 1610 Count the Colors
    2018 徐州赛区网赛 G. Trace
    1495 中国好区间 尺取法
    LA 3938 动态最大连续区间 线段树
    51nod 1275 连续子段的差异
    caioj 1172 poj 2823 单调队列过渡题
    数据结构和算法题
    一个通用分页类
  • 原文地址:https://www.cnblogs.com/Amywangqing/p/14694960.html
Copyright © 2011-2022 走看看