zoukankan      html  css  js  c++  java
  • RabbitMQ入门_07_Fanout 与 Topic

    A. 用广播的方式实现发布订阅

    参考资料:https://www.rabbitmq.com/tutorials/tutorial-three-java.html

    Fanout 类型的 Exchange 以广播的方式向所有绑定到该 Exchange 的队列推送消息

    下面样例代码试图使用 fanout 将状态变更消息推送给所有接入系统:

    gordon.study.rabbitmq.fanout.Fanout.java

    public class Fanout {
     
        private static final String EXCHANGE_NAME = "StatusUpdateFanout";
     
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
     
            final Channel senderChannel = connection.createChannel();
            senderChannel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            final CountDownLatch latch = new CountDownLatch(1);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        for (int i = 0; i < 10;) {
                            String message = "NO. " + ++i;
                            TimeUnit.MILLISECONDS.sleep(100);
                            senderChannel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                            System.out.printf("(%1$s)[===>%2$s    ] %3$s
    ", "S", EXCHANGE_NAME + ":", message);
                            if (i == 4) {
                                latch.countDown();
                            }
                        }
                        senderChannel.close();
                    } catch (Exception e) {
                    }
                }
            }).start();
     
            final Channel consumerChannel1 = connection.createChannel();
            consumerChannel1.queueDeclare("SystemA", false, false, true, null);
            consumerChannel1.queueBind("SystemA", EXCHANGE_NAME, "");
            consumerChannel1.basicQos(3);
            Consumer consumer1 = new DefaultConsumer(consumerChannel1) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.printf(" [    %2$s<===](%1$s) %3$s
    ", "A", "SystemA", message);
                    try {
                        TimeUnit.MILLISECONDS.sleep(200);
                    } catch (InterruptedException e) {
                    }
                    consumerChannel1.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            consumerChannel1.basicConsume("SystemA", false, consumer1);
     
            latch.await();
            final Channel consumerChannel2 = connection.createChannel();
            consumerChannel2.queueDeclare("SystemB", false, false, true, null);
            consumerChannel2.queueBind("SystemB", EXCHANGE_NAME, "");
            consumerChannel2.basicQos(3);
            Consumer consumer2 = new DefaultConsumer(consumerChannel2) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.printf(" [    %2$s<===](%1$s) %3$s
    ", "B -- won't receive first 4 messages", "SystemB", message);
                    try {
                        TimeUnit.MILLISECONDS.sleep(200);
                    } catch (InterruptedException e) {
                    }
                    consumerChannel2.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            consumerChannel2.basicConsume("SystemB", false, consumer2);
        }
    }
    

    代码第11行申明了名字叫 StatusUpdateFanout 的 fanout 类型 Exchange。

    第33行申明了 SystemA 队列,第34行将 SystemA 队列绑定到 StatusUpdateFanout Exchange。注意到绑定路由键为空,路由键对于 fanout 类型 Exchange 无意义

    同理,第20行代码发送消息时,也不用设置路由键。所有发向 fanout 类型 Exchange 的消息都会无视路由键,广播给每个绑定队列

    B. 用 Topic 实现发布订阅

    参考资料:https://www.rabbitmq.com/tutorials/tutorial-five-java.html

    Topic 类型的 Exchange 通过支持通配符的路由键管理复杂的发布订阅关系

    发送消息时指定的路由键必须是点号(.)分隔的单词,例如 sourceA.statusUpdate.systemA。

    队列绑定 Exchange 时指定的路由键可以使用通配符:

    • *(星号)替代一个单词
    • #(井号)替代 0~n 个单词

    对每一条消息,Exchange 会遍历所有的绑定关系,确认消息指定的路由键(例如 sourceA.statusUpdate.systemA)是否匹配绑定关系中的路由键,如果匹配,则将消息推送到相应队列(例如 sourceA.statusUpdate.systemA、*.*.systemA、*.statusUpdate.* 和 sourceA.# 都匹配,但是 sourceB.# 和 *.systemA 都不匹配)。

    gordon.study.rabbitmq.topic.Topic.java

    senderChannel.exchangeDeclare(EXCHANGE_NAME, "topic");
    
    consumerChannel1.queueBind("SystemA", EXCHANGE_NAME, "#.SystemA");
    
    consumerChannel2.queueBind("SystemB", EXCHANGE_NAME, "*.*.SystemB");
    
    senderChannel.basicPublish(EXCHANGE_NAME, "preOrder.statusUpdate.SystemA", null, message.getBytes("UTF-8"));
    

    C. 没有历史数据?

    对于中途创建的队列(例如上面的 SystemB 队列),是没有办法获得之前的消息的。但是如果队列提前创建好,就算没有消费者,队列里依然会有全量的数据。

  • 相关阅读:
    ....
    CodeForces 375A(同余)
    POJ 2377 Bad Cowtractors (最小生成树)
    POJ 1258 AgriNet (最小生成树)
    HDU 1016 Prime Ring Problem(全排列)
    HDU 4460 Friend Chains(bfs)
    POJ 2236 Wireless Network(并查集)
    POJ 2100 Graveyard Design(尺取)
    POJ 2110 Mountain Walking(二分/bfs)
    CodeForces 1059B Forgery(模拟)
  • 原文地址:https://www.cnblogs.com/gordonkong/p/6953251.html
Copyright © 2011-2022 走看看