zoukankan      html  css  js  c++  java
  • RabbitMQ

    总结一下几种ExchangeTypes。


    之前写发布/订阅模式时第一次提到了exchange type。
    即producer不是将消息直接放到队列中,而是先到exchange中,exchange主要用于控制消息到队列的路由,根据具体的exchange type将消息传给需要的队列或者直接废弃。
    在这一篇中总结一下那些用到的exchange type。


    一.Direct Exchange
    direct exchange算是最基本的了。
    direct exchange用于将带上routing key的消息传值拥有相同routing key的队列中。


    当我们想用一个简单的标识符区别所有传入同一个exchange中的消息时direct exchange就非常合适。

    
    private static String DIRECT_EXCHANGE = "DIRECT_EXCHAGNE";
     
        static class FanoutProducer {
            public static void main(String[] args) throws IOException {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                Connection connection = connectionFactory.newConnection();
                Channel channel= connection.createChannel();;
     
                String content = "I miss the conversation";
                channel.exchangeDeclare(DIRECT_EXCHANGE, ExchangeTypes.DIRECT);
                channel.basicPublish(DIRECT_EXCHANGE, "alvez", null, content.getBytes());
            }
        }
     
        static class FanoutConsumer {
            public static void main(String[] args) throws IOException, InterruptedException {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                Connection connection = connectionFactory.newConnection();
                Channel channel= connection.createChannel();
     
                String queueName = channel.queueDeclare().getQueue();
                channel.queueBind(queueName, DIRECT_EXCHANGE, "alvez");
     
                QueueingConsumer consumer = new QueueingConsumer(channel);
                String s = channel.basicConsume(queueName, true, consumer);
                System.out.println(s);
                while (true) {
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    String message = new String(delivery.getBody());
                    String routingKey = delivery.getEnvelope().getRoutingKey();
     
                    System.out.println("From:" + routingKey + "':'" + message + "'");
                }
     
            }
      }
    

    二.Fanout Exchange
    fanout和routing key无关,它将消息无差别地(indiscriminately)传送给所有队列。

    fanout exchange通常用于发布/订阅模式。
    将消息传送给不同的队列,不同的队列对同一种消息采取不同的行为。
    比如,现在有一个客户订单消息被三个队列接收,队列1完成该订单,队列2将订单写入日志,队列3将订单发给别的部门什么的。
    比如下面的代码,消费者可以获得routing key并输出,但能否获取与routing key无关:

    
        private static String FANOUT_EXCHANGE = "FANOUT_EXCHANGE";
     
        static class DirectProducer {
            public static void main(String[] args) throws IOException {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                Connection connection = connectionFactory.newConnection();
                Channel channel= connection.createChannel();;
     
                String content = "I miss the conversation";
                channel.exchangeDeclare(FANOUT_EXCHANGE, ExchangeTypes.FANOUT);
                channel.basicPublish(FANOUT_EXCHANGE, "alvez", null, content.getBytes());
            }
        }
     
        static class DirectConsumer {
            public static void main(String[] args) throws IOException, InterruptedException {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                Connection connection = connectionFactory.newConnection();
                Channel channel= connection.createChannel();
     
                String queueName = channel.queueDeclare().getQueue();
                channel.queueBind(queueName, FANOUT_EXCHANGE, "");
     
                QueueingConsumer consumer = new QueueingConsumer(channel);
                String s = channel.basicConsume(queueName, true, consumer);
                System.out.println(s);
                while (true) {
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    String message = new String(delivery.getBody());
                    String routingKey = delivery.getEnvelope().getRoutingKey();
     
                    System.out.println("From:" + routingKey + "':'" + message + "'");
                }
     
            }
     
        }
    

    三.Topic Exchange
    如果根据topic exchange用法说明其特征的话反而更麻烦。
    topic exchange正如其名,就是根据某种主题而不是特定的标题,也就是可以匹配routing key的一部分或者全部。
    topic exchange的routing key可以有多个词组成,词用'.'分隔。
    routing key中可以包括'*'或者'#','*'表示一个词,'#'表示0~N个词。

    比如消息发布时routing key为"honda.civic.navy",
    能接收消息的队列的routing key可以是"honda.civic.navy"或"*.civic.*"或"honda.#"或"#",
    但不能是"honda.accord.navy"或"honda.accord.silver"或"*.accord.*"或"ford.#"。

    
        private static String TOPIC_EXCHANGE = "TOPIC_EXCHAGNE";
     
        static class TopicProducer {
            public static void main(String[] args) throws IOException {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                Connection connection = connectionFactory.newConnection();
                Channel channel= connection.createChannel();;
     
                String content = "I miss the conversation";
                channel.exchangeDeclare(TOPIC_EXCHANGE, ExchangeTypes.TOPIC);
                channel.basicPublish(TOPIC_EXCHANGE, "alvez.dep.FBI.map", null, content.getBytes());
            }
        }
     
        static class TopicConsumer {
            public static void main(String[] args) throws IOException, InterruptedException {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                Connection connection = connectionFactory.newConnection();
                Channel channel= connection.createChannel();
     
                String queueName = channel.queueDeclare().getQueue();
                channel.queueBind(queueName, TOPIC_EXCHANGE, "alvez.#");
     
                QueueingConsumer consumer = new QueueingConsumer(channel);
                String s = channel.basicConsume(queueName, true, consumer);
                System.out.println(s);
                while (true) {
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    String message = new String(delivery.getBody());
                    String routingKey = delivery.getEnvelope().getRoutingKey();
     
                    System.out.println("From:" + routingKey + "':'" + message + "'");
                }
     
            }
      }
    

    四.Headers Exchange
    即消息头和队列中声明的消息头匹配时可以通信,似乎不是很常用。
    就可以定义多个条件进行匹配这一点来说,headers exchange和topic exchange有些相似。
    有时候会给人"为什么会有这种东西?"的感觉,相比topic exchage有什么优势?
    如果仅仅说"headers exchange是基于headers的,topic exchange是基于routing key的",这种回答没什么意义。

    代码如下,可以看到producer和consumer的routing key是不同的,producer的header通过properties对象传输:

    
      private static String HEADERS_EXCHANGE = "HEADERS_EXCHANGE";
     
        static class HeadersProducer {
            public static void main(String[] args) throws IOException {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                Connection connection = connectionFactory.newConnection();
                Channel channel = connection.createChannel();
     
                String content = "I miss the conversation";
                channel.exchangeDeclare(HEADERS_EXCHANGE, ExchangeTypes.HEADERS);
                AMQP.BasicProperties properties = new AMQP.BasicProperties();
                Map<String,Object> map = new HashMap<>();
                map.put("key1","val1");
                properties.setHeaders(map);
     
                channel.basicPublish(HEADERS_EXCHANGE, "alvez", properties, content.getBytes());
     
            }
        }
     
        static class HeadersConsumer {
            public static void main(String[] args) throws IOException, InterruptedException {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                Connection connection = connectionFactory.newConnection();
                Channel channel = connection.createChannel();
     
                String queueName = channel.queueDeclare().getQueue();
                Map<String, Object> headers = new HashMap<>();
                headers.put("key1","val1");
                headers.put("key2","val2");
                headers.put("key3","val3");
                headers.put("key4","val4");
                channel.queueBind(queueName, HEADERS_EXCHANGE, "",headers);
     
                QueueingConsumer consumer = new QueueingConsumer(channel);
                String s = channel.basicConsume(queueName, true, consumer);
                System.out.println(s);
                while (true) {
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    String message = new String(delivery.getBody());
                    String routingKey = delivery.getEnvelope().getRoutingKey();
     
                    System.out.println("From:" + routingKey + "':'" + message + "'");
                }
     
            }
     
        }
    

    (ps:图不错,感谢图片作者。)

  • 相关阅读:
    leetcode 268. Missing Number
    DBSCAN
    python二维数组初始化
    leetcode 661. Image Smoother
    leetcode 599. Minimum Index Sum of Two Lists
    Python中的sort() key含义
    leetcode 447. Number of Boomerangs
    leetcode 697. Degree of an Array
    滴滴快车奖励政策,高峰奖励,翻倍奖励,按成交率,指派单数分级(1月3日)
    北京Uber优步司机奖励政策(1月2日)
  • 原文地址:https://www.cnblogs.com/kavlez/p/4117295.html
Copyright © 2011-2022 走看看