zoukankan      html  css  js  c++  java
  • RabbitMQ学习笔记五:RabbitMQ之优先级消息队列

    RabbitMQ优先级队列注意点:

    1、只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效

    2、RabbitMQ3.5以后才支持优先级队列

    代码在博客:RabbitMQ学习笔记三:Java实现RabbitMQ之与Spring集成 最后面有下载地址,只是做了少许改变,改变的代码如下:

    消费者 spring-config.xml(还需要增加一个QueueListener监听器,代码就不复制到这里了,可以参考项目中的其他监听器)

    <!-- ========================================RabbitMQ========================================= -->
        <!-- 连接工厂 -->
        <rabbit:connection-factory id="connectionFactory" host="localhost" publisher-confirms="true" virtual-host="/" username="guest" password="guest" />
        <!-- 监听器 -->
        <rabbit:listener-container connection-factory="connectionFactory">
            <!-- queues是队列名称,可填多个,用逗号隔开, method是ref指定的Bean调用Invoke方法执行的方法名称 -->
            <rabbit:listener queues="red" method="onMessage" ref="redQueueListener" />
            <rabbit:listener queues="blue" method="onMessage" ref="blueQueueListener" />
            <rabbit:listener queues="queue" method="queueList" ref="queueListener" />
        </rabbit:listener-container>
        <!-- 队列声明 -->
        <rabbit:queue name="red" durable="true" />
        <rabbit:queue name="blue" durable="true" />
        <rabbit:queue name="queue" durable="true" />
        <!-- 红色监听处理器 -->
        <bean id="redQueueListener" class="com.aitongyi.customer.RedQueueListener" />
        <!-- 颜色监听处理器 -->
        <bean id="blueQueueListener" class="com.aitongyi.customer.BlueQueueListener" />
        <!-- 优先级队列监听处理器 -->
        <bean id="queueListener" class="com.aitongyi.customer.QueueListener" />

    生产者增加一个主方法:

    public static void main(String[] args)
        {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setAddresses("127.0.0.1:5672");
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setVirtualHost("/");
            connectionFactory.setPublisherConfirms(true); // 必须要设置
    
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
    
            for (final int i : priority)
            {
                template.convertAndSend("queue", (Object) ("queue" + i), new MessagePostProcessor() {
    
                    @Override
                    public Message postProcessMessage(Message arg0) throws AmqpException
                    {
                        arg0.getMessageProperties().setPriority(i);
                        return arg0;
                    }
                });
            }
        }

    当然,还需要在客户端创建一个优先级队列:

    注意,x-max-length = 9999,这个值最后不要写太大了,否则你电脑的内存会一直处于100%的使用状态(至于出现这种状况怎么解决,请点击:RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决)),并且这个取值范围在0~255之间,超过了可能会出现问题,我测试了设置9999时,部分有问题,后面会把有问题的地方贴上来。

    好了,所有的事情准备完毕了,我们先启动消费者,然后再运行主方法,此时的队列优先级设置为,private static final int[] priority = { 1, 5, 1, 2, 3, 4, 5, 5, 0, 3, 6, 10, 4, 100, 99, 98 };执行结果如下:

    2017-05-16 09:51:48 399 [INFO] c.a.c.QueueListener - queueList Receved:queue1
    2017-05-16 09:51:48 417 [INFO] c.a.c.QueueListener - queueList Receved:queue5
    2017-05-16 09:51:48 435 [INFO] c.a.c.QueueListener - queueList Receved:queue1
    2017-05-16 09:51:48 493 [INFO] c.a.c.QueueListener - queueList Receved:queue2
    2017-05-16 09:51:48 514 [INFO] c.a.c.QueueListener - queueList Receved:queue3
    2017-05-16 09:51:48 596 [INFO] c.a.c.QueueListener - queueList Receved:queue4
    2017-05-16 09:51:48 950 [INFO] c.a.c.QueueListener - queueList Receved:queue5
    2017-05-16 09:51:48 975 [INFO] c.a.c.QueueListener - queueList Receved:queue5
    2017-05-16 09:51:49 015 [INFO] c.a.c.QueueListener - queueList Receved:queue0
    2017-05-16 09:51:49 039 [INFO] c.a.c.QueueListener - queueList Receved:queue3
    2017-05-16 09:51:49 058 [INFO] c.a.c.QueueListener - queueList Receved:queue6
    2017-05-16 09:51:49 084 [INFO] c.a.c.QueueListener - queueList Receved:queue10
    2017-05-16 09:51:49 102 [INFO] c.a.c.QueueListener - queueList Receved:queue4
    2017-05-16 09:51:49 140 [INFO] c.a.c.QueueListener - queueList Receved:queue100
    2017-05-16 09:51:49 561 [INFO] c.a.c.QueueListener - queueList Receved:queue99
    2017-05-16 09:51:49 595 [INFO] c.a.c.QueueListener - queueList Receved:queue98

    很奇怪,没有按照优先级执行?文章前面已经提到,只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效。所以,我们需要先将所有的消息队列发送至服务器,然后再启动消费者处理消息,这样,就可以看到效果了。为了达到这个目的,则需要先运行主方法,然后再启动消费者,队列优先级设置和上方一样,执行结果如下:

    2017-05-16 09:56:23 296 [INFO] c.a.c.QueueListener - queueList Receved:queue100
    2017-05-16 09:56:23 359 [INFO] c.a.c.QueueListener - queueList Receved:queue99
    2017-05-16 09:56:23 438 [INFO] c.a.c.QueueListener - queueList Receved:queue98
    2017-05-16 09:56:23 500 [INFO] c.a.c.QueueListener - queueList Receved:queue10
    2017-05-16 09:56:23 594 [INFO] c.a.c.QueueListener - queueList Receved:queue6
    2017-05-16 09:56:23 656 [INFO] c.a.c.QueueListener - queueList Receved:queue5
    2017-05-16 09:56:23 765 [INFO] c.a.c.QueueListener - queueList Receved:queue5
    2017-05-16 09:56:23 874 [INFO] c.a.c.QueueListener - queueList Receved:queue5
    2017-05-16 09:56:24 077 [INFO] c.a.c.QueueListener - queueList Receved:queue4
    2017-05-16 09:56:24 202 [INFO] c.a.c.QueueListener - queueList Receved:queue4
    2017-05-16 09:56:24 262 [INFO] c.a.c.QueueListener - queueList Receved:queue3
    2017-05-16 09:56:24 311 [INFO] c.a.c.QueueListener - queueList Receved:queue3
    2017-05-16 09:56:24 389 [INFO] c.a.c.QueueListener - queueList Receved:queue2
    2017-05-16 09:56:24 422 [INFO] c.a.c.QueueListener - queueList Receved:queue1
    2017-05-16 09:56:24 500 [INFO] c.a.c.QueueListener - queueList Receved:queue1
    2017-05-16 09:56:24 547 [INFO] c.a.c.QueueListener - queueList Receved:queue0

    这样,优先级队列就实现了,至于相同优先级的队列,执行顺序应该是随机的吧,我也没有测试,有兴趣的同学可以自己研究。

    附:队列优先级设置为:private static final int[] priority = { 1, 5, 1, 2, 9998, 3, 4, 5, 999, 5, 0, 3, 6, 10, 4, 1000, 9999, 100, 99, 98, 899 };运行结果为:

    2017-05-16 09:59:29 839 [INFO] c.a.c.QueueListener - queueList Receved:queue1000
    2017-05-16 09:59:29 917 [INFO] c.a.c.QueueListener - queueList Receved:queue999
    2017-05-16 09:59:29 995 [INFO] c.a.c.QueueListener - queueList Receved:queue899
    2017-05-16 09:59:30 073 [INFO] c.a.c.QueueListener - queueList Receved:queue100
    2017-05-16 09:59:30 182 [INFO] c.a.c.QueueListener - queueList Receved:queue99
    2017-05-16 09:59:30 275 [INFO] c.a.c.QueueListener - queueList Receved:queue98
    2017-05-16 09:59:30 353 [INFO] c.a.c.QueueListener - queueList Receved:queue9999
    2017-05-16 09:59:30 431 [INFO] c.a.c.QueueListener - queueList Receved:queue9998
    2017-05-16 09:59:30 509 [INFO] c.a.c.QueueListener - queueList Receved:queue10
    2017-05-16 09:59:30 619 [INFO] c.a.c.QueueListener - queueList Receved:queue6
    2017-05-16 09:59:30 670 [INFO] c.a.c.QueueListener - queueList Receved:queue5
    2017-05-16 09:59:30 733 [INFO] c.a.c.QueueListener - queueList Receved:queue5
    2017-05-16 09:59:30 826 [INFO] c.a.c.QueueListener - queueList Receved:queue5
    2017-05-16 09:59:31 138 [INFO] c.a.c.QueueListener - queueList Receved:queue4
    2017-05-16 09:59:31 282 [INFO] c.a.c.QueueListener - queueList Receved:queue4
    2017-05-16 09:59:31 316 [INFO] c.a.c.QueueListener - queueList Receved:queue3
    2017-05-16 09:59:31 379 [INFO] c.a.c.QueueListener - queueList Receved:queue3
    2017-05-16 09:59:31 410 [INFO] c.a.c.QueueListener - queueList Receved:queue2
    2017-05-16 09:59:31 472 [INFO] c.a.c.QueueListener - queueList Receved:queue1
    2017-05-16 09:59:31 712 [INFO] c.a.c.QueueListener - queueList Receved:queue1
    2017-05-16 09:59:31 745 [INFO] c.a.c.QueueListener - queueList Receved:queue0

    恩,排序结果是有点乱。。。

  • 相关阅读:
    javaweb(五)——Servlet开发(一)
    JDK1.8改为JDK1.7过程
    javaweb(四)——Http协议(请求头,响应头详解)
    JavaWeb(三)——Tomcat服务器(二)
    JavaWeb(二)——Tomcat服务器(一)
    JavaWeb——JavaWeb开发入门
    java学习之异常总结
    java学习之异常之覆盖
    java学习之异常之格式
    java学习之异常之finally
  • 原文地址:https://www.cnblogs.com/wuzhiyuan/p/6859864.html
Copyright © 2011-2022 走看看