zoukankan      html  css  js  c++  java
  • RabbitMQ六种队列模式-主题模式

    前言

    RabbitMQ六种队列模式-简单队列
    RabbitMQ六种队列模式-工作队列
    RabbitMQ六种队列模式-发布订阅
    RabbitMQ六种队列模式-路由模式
    RabbitMQ六种队列模式-主题模式 [本文]

    从前面的几篇我们依次经历了 exchange 模式从 fanout > direct 的转变过程,在 fanout 时,我们只能进行简单的广播,对应类型比较单一,使用 direct 后,消费者则可以进行一定程度的选择,但是,direct 还是有局限性,路由不支持多个条件。

    怎么讲呢?

    direct 不支持匹配 routingKey,一但绑定了就是绑定了,而 topic 主题模式支持规则匹配,只要符合 routingKey 就能发送到绑定的队列上。

    文章目录

    1. 什么是主题模式2. 代码部分2.1 生产者2.2 *消费者2.3 #消费者2.4 运行截图3. 总结

    1. 什么是主题模式

    官方链接:http://www.rabbitmq.com/tutorials/tutorial-five-java.html

    topics 主题模式跟 routing 路由模式类似,只不过路由模式是指定固定的路由键 routingKey,而主题模式是可以模糊匹配路由键 routingKey,类似于SQL中 = 和 like 的关系。


    P 表示为生产者、 X 表示交换机、C1C2 表示为消费者,红色表示队列。

     

    topics 模式与 routing 模式比较相近,topics 模式不能具有任意的 routingKey,必须由
    一个英文句点号“.”分隔的字符串(我们将被句点号“.”分隔开的每一段独立的字符串称为一个单词),比如 "lazy.orange.fox"。topics routingKey 中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。

    "*" 表示任何一个词
    "#" 表示0或1个词

    以上图中的配置为例:

    如果一个消息的 routingKey 设置为 “xxx.orange.rabbit”,那么该消息会同时路由到 Q1 与 Q2,routingKey="lazy.orange.fox”的消息会路由到Q1与Q2;

    routingKey="lazy.brown.fox”的消息会路由到 Q2;

    routingKey="lazy.pink.rabbit”的消息会路由到 Q2(只会投递给Q2一次,虽然这个routingKey 与 Q2 的两个 bindingKey 都匹配);

    routingKey="quick.brown.fox”、routingKey="orange”、routingKey="quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何bindingKey。

    接下来代码为例:

    2. 代码部分

    2.1 生产者

    public class ProducerTopic {

        private static final String EXCHANGE_NAME = "my_topic_exchange";

        public static void main(String[] args) throws IOException, TimeoutException {
            /** 1.创建新的连接 */
            Connection connection = MQConnectionUtils.newConnection();
            /** 2.创建通道 */
            Channel channel = connection.createChannel();
            /** 3.绑定的交换机 参数1交互机名称 参数2 exchange类型 */
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            /** 4.发送消息 */
            String routingKey = "log.info.error";
            String msg = "topic_exchange_msg:" + routingKey;
            System.out.println("[send] = " + msg);
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
            /** 5.关闭通道、连接 */
            channel.close();
            connection.close();
            /** 注意:如果消费没有绑定交换机和队列,则消息会丢失 */
        }
    }

    2.2 *消费者

    public class ConsumerLogXTopic {

        private static final String QUEUE_NAME = "topic_consumer_info";
        private static final String EXCHANGE_NAME = "my_topic_exchange";

        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("log * 消费者启动");
            /* 1.创建新的连接 */
            Connection connection = MQConnectionUtils.newConnection();
            /* 2.创建通道 */
            Channel channel = connection.createChannel();
            /* 3.消费者关联队列 */
            channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);
            /* 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey */
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.*");
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException 
    {
                    String msg = new String(body, "UTF-8");
                    System.out.println("消费者获取生产者消息:" + msg);
                }
            };
            /* 5.消费者监听队列消息 */
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }

    }

    2.3 #消费者

    public class ConsumerLogJTopic {

        private static final String QUEUE_NAME = "topic_consumer_info";
        private static final String EXCHANGE_NAME = "my_topic_exchange";

        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("log # 消费者启动");
            /* 1.创建新的连接 */
            Connection connection = MQConnectionUtils.newConnection();
            /* 2.创建通道 */
            Channel channel = connection.createChannel();
            /* 3.消费者关联队列 */
            channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);
            /* 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey */
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.#");
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException 
    {
                    String msg = new String(body, "UTF-8");
                    System.out.println("消费者获取生产者消息:" + msg);
                }
            };
            /* 5.消费者监听队列消息 */
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }

    }

    2.4 运行截图

    生产者

    星花*消费者

    #消费者

    3. 总结

    1、topic 相对于之前几种算是比较复杂了,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(RouteKey),exchange 会将消息转发到所有关注主题能与 routeKey 模糊匹配的队列。

    2、在进行绑定时,要提供一个该队列关心的主题,如“#.sscai.#”表示该队列关心所有涉及 sscai 的消息(一个 routeKey 为 "club.sscai.tmax”的消息会被转发到该队列)。

    3、"#”表示0个或若干个关键字,“”表示一个关键字。如“club.”能与“club.sscai”匹配,无法与“club.sscai.xxx”匹配;但是“club.#”能与上述两者匹配。

    4、同样,如果 exchange 没有发现能够与 routeKey 匹配的 Queue,则会抛弃此消息。

    案例代码:https://www.lanzous.com/i5ydu6d

    我创建了一个java相关的公众号,用来记录自己的学习之路,感兴趣的小伙伴可以关注一下微信公众号哈:niceyoo

  • 相关阅读:
    boost::asio在VS2008下的编译错误
    Java集合框架——接口
    ACM POJ 3981 字符串替换(简单题)
    ACM HDU 1042 N!(高精度计算阶乘)
    OneTwoThree (Uva)
    ACM POJ 3979 分数加减法(水题)
    ACM HDU 4004 The Frog's Games(2011ACM大连赛区第四题)
    Hexadecimal View (2011ACM亚洲大连赛区现场赛D题)
    ACM HDU 4002 Find the maximum(2011年大连赛区网络赛第二题)
    ACM HDU 4001 To Miss Our Children Time (2011ACM大连赛区网络赛)
  • 原文地址:https://www.cnblogs.com/niceyoo/p/11448089.html
Copyright © 2011-2022 走看看