zoukankan      html  css  js  c++  java
  • RabbitMQ的使用三_Java Client方式使用路由模式

    RabbitMQ的使用三_Java Client方式使用路由模式

    官方文档地址:https://www.rabbitmq.com/tutorials/tutorial-four-java.html

    路由模式:生产者将消息发送到了 type 为 direct 模式的交换机,消费者的队列在将自己绑定到路由的时候会给自己设置一个绑定key,只有生产者发送对应 key 格式的消息, 队列才会收到消息。

    前面的文章中,队列和交换机的之间的绑定关系,我们使用如下代码:

    channel.queueBind(queueName, EXCHANGE_NAME, "");

    这里的第三个参数,称之为routing_key.也就是绑定键

    绑定键的含义取决于交换机的类型。我们以前使用的Fanout交换机时,忽略了它的价值。

    Direct exchange交换机

    官网提供的配图

    解释:

    在上图中,我们可以看到direct交换机X与两个队列绑定在一起。第一个队列绑定键为error,第二个队列有三个绑定,一个绑定键为info,第二个绑定键为error,第三个绑定键为warning。

    绑定键为error的消息会发布到队列一和队列二。绑定键为info、warning的消息将会被路由到队列二。其他没有匹配到的消息会被丢弃。队列一和队列二有相同的绑定键error,在这种情况下,direct exchange的行为类似于fanout交换机

    direct交换机的声明

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");

    代码案例

    创建一个生产者:生产者发送三条消息,一条绑定key为error的消息,一条绑定key为info的数据,一条绑定key为trace的数据

    创建2个消费者:消费者1把direct交换机和队列1绑定。绑定key为error

            消费者2把direct交换机和队列2绑定,绑定key为info,error,warning

    预期结果:生产者发送绑定键error的数据,会被队列一和队列二同时接收

           生产者发送绑定键为info的数据,只会被队列二接收

         生产者发送绑定键为trace的数据,交换机找不到匹配队列,就会丢弃。队列一和队列二都接收不到消息

    生产者

    public class DirectExchangeSender {
    
        // 1.声明一个交换机
        private static final String DIRECT_EXCHANGE_NAME = "direct_exchange_name";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
            Channel channel = rabbitMQConnections.createChannel();
            channel.exchangeDeclare(DIRECT_EXCHANGE_NAME, "direct");
            StringBuilder errorMsg = new StringBuilder("小河流水哗啦啦");
            StringBuilder infoMsg = new StringBuilder("天街小雨润如酥");
            StringBuilder traceMsg = new StringBuilder("草色要看近却无");
            channel.basicPublish(DIRECT_EXCHANGE_NAME, "error", null, errorMsg.append("_error").toString().getBytes());
            channel.basicPublish(DIRECT_EXCHANGE_NAME, "info", null, infoMsg.append("_info").toString().getBytes());
            channel.basicPublish(DIRECT_EXCHANGE_NAME, "trace", null, traceMsg.append("_trace").toString().getBytes());
            channel.close();
            rabbitMQConnections.close();
            System.out.println("消息发送完成");
        }
    }
    View Code

    消费者1

    public class DirectExchangeReceive {
        private static final String DIRECT_EXCHANGE_NAME = "direct_exchange_name";
        private static final String QUEUE_NAME_NUM_ONE = "queue_name_num_one";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
            Channel channel = rabbitMQConnections.createChannel();
            // 1.声明一个direct交换机
            channel.exchangeDeclare(DIRECT_EXCHANGE_NAME, "direct");
            // 2.声明队列
            channel.queueDeclare(QUEUE_NAME_NUM_ONE, false, false, false, null);
            // 3绑定队列和交换机的名字
            channel.queueBind(QUEUE_NAME_NUM_ONE, DIRECT_EXCHANGE_NAME, "error");
    
            // 消费回调
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery delivery) throws IOException {
                    String receiverMessage = new String(delivery.getBody(), "UTF-8");
                    System.out.println(new Date() + "一号队列接收到的消息=======>" + receiverMessage);
                    // 手动提供一个应答
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };
            channel.basicConsume(QUEUE_NAME_NUM_ONE, false, deliverCallback, consumerTag -> {
            });
        }
    }
    View Code

    消费者2

    public class DirectExchangeReceive2 {
        private static final String DIRECT_EXCHANGE_NAME = "direct_exchange_name";
        private static final String QUEUE_NAME_NUM_TWO = "queue_name_num_two";
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
            Channel channel = rabbitMQConnections.createChannel();
            // 1.声明一个direct交换机
            channel.exchangeDeclare(DIRECT_EXCHANGE_NAME,"direct");
            // 2.声明队列
            channel.queueDeclare(QUEUE_NAME_NUM_TWO,false,false,false,null);
            // 3绑定队列和交换机的名字
            channel.queueBind(QUEUE_NAME_NUM_TWO,DIRECT_EXCHANGE_NAME,"error");
            channel.queueBind(QUEUE_NAME_NUM_TWO,DIRECT_EXCHANGE_NAME,"info");
            channel.queueBind(QUEUE_NAME_NUM_TWO,DIRECT_EXCHANGE_NAME,"warn");
    
            // 消费回调
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery delivery) throws IOException {
                    String receiverMessage = new String(delivery.getBody(),"UTF-8");
                    System.out.println(new Date()+"二号队列接收到的消息=======>"+receiverMessage);
                    // 手动提供一个应答
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
                }
            };
            channel.basicConsume(QUEUE_NAME_NUM_TWO,false,deliverCallback,consumerTag -> {});
        }
    }
    View Code

    执行结果截图

                            图一:消费者1号                                          图二:消费者2号

    根据结果可知,和估计的结果一样。

  • 相关阅读:
    Codeforces 960B(优先队列)
    “景驰科技杯”2018年华南理工大学程序设计竞赛 H-对称与反对称(逆元)
    AcWing
    POJ
    POJ
    AtCoder
    HRBUST
    CodeForces
    HYSBZ
    HDU
  • 原文地址:https://www.cnblogs.com/yingxiaocao/p/13303504.html
Copyright © 2011-2022 走看看