zoukankan      html  css  js  c++  java
  • 5 交换机-direct (路由)

    路由

    上文中的exchange类型是fanout的,该类型的exchange会把消息发送给所有绑定到该exchangequeue

    现在我们需要添加一个特性,只订阅(subscribe)一部分指定的消息(Message)

    1、绑定

    在前面的例子中,我们已经在创建绑定。您可能会记得以下代码:

    // 绑定消息路由和消息队列
    // 第一个参数:队列名称
    // 第二个参数:交换器名称
    channel.queueBind(queueName, EXCHANGE_NAME, "");
    

    绑定(bindings)是交换(exchange)和队列(queue)之间的关系。这可以简单地理解为:队列对来自此交换的消息感兴趣;

    Binding可以使用一个已经存在的routingKey参数。为了避免和basic_publish参数混淆,我们称之为binding key。下边就是我们怎么用key来创建一个binding

    // 绑定消息路由和消息队列
    // 第一个参数:队列名称
    // 第二个参数:交换器名称
    // 第三个参数:路由键
    channel.queueBind(queueName, EXCHANGE_NAME, "");
    

    binding key的意义有时候取决于exchange的类型。对于Fanout类型的exchange,会忽略binding key

    2、direct 类型exchange

    上文中的日志系统会把所有的log消息广播给所有的消费者。我们想扩展来根据他们的日志级别来过滤log消息。例如:我们只想把error级别的日志写到磁盘文件中,而其它级别的日志消息则过滤掉

    我们之前使用的fanout类型的exchange,但这样就不会有太多的灵活性

    在这里我们将要使用direct类型的exchangeDirect类型exchange的路由算法是很简单的:要想一个消息能到达这个队列,需要binding keyrouting key正好能匹配得上。

    为了说明这个道理,可以看看下边的描述:

    image

    在这样的结构中,我们可以看到direct类型的exchange X,有两个queue绑定到它。第一个queue是以orangebinding key绑定到exchange X上的,第二个queue是由两个binding keyblackgreen)绑定到exchange X的。

    在这样的设置中,一条消息被推送到exchange,如果使用的routing keyorange,那么消息就会被路由到Q1中;如果使用的routing keyblackgreen,那么该消息将会被路由到Q2中。其它的消息都将会被丢弃掉

    3、多重绑定

    此处输入图片的描述

    用同一个binding来把多个queue绑定到同一个exchange也是可行的。例如在之前例子的基础上,在X和Q1之间添加binding key名字为black,这样的话,这里的direct类型的exchange就和fanout类型的一样了,可以把消息推送给所有的queue。带有routing keyblack的消息将会被推送到Q1和Q2中

    4、示例代码

    4.1、生产者

    package com.demo.java.消息队列.路由;
    
    import com.demo.utils.MqConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author zhaodi
     * @description 路由
     * @date 2018/9/29 10:51
     */
    public class Producer {
    
        private static final String EXCHANGE_NAME = "my-exchange-2";
    
        private static final String ROUTE_KEY = "error";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            Connection connection = MqConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            // 申明交换器
            channel.exchangeDeclare("EXCHANGE_NAME", "direct");
    
            for (int i = 0; i < 30; i++) {
                String msg = "P--->INFO:" + i;
                channel.basicPublish(EXCHANGE_NAME, ROUTE_KEY, null, new String(msg).getBytes());
                System.out.println(msg);
                Thread.sleep(500);
            }
            channel.close();
            connection.close();
        }
    }
    
    
    

    4.2、消费者

     package com.demo.java.消息队列.路由;
    
    import com.demo.utils.MqConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    /**
     * @author zhaodi
     * @description 路由
     * @date 2018/9/29 10:52
     */
    public class ConsumerInfo {
    
        private static final String EXCHANGE_NAME = "my-exchange-2";
    
        private static final String ROUTE_KEY = "info";
    
        public static void main(String[] args) throws IOException {
    
    
            Connection connection = MqConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    
            String queueName = channel.queueDeclare().getQueue();
    
            // exchange和queue绑定
            // 第一个参数:队列名字;第二个参数:交换机名称;第三个参数:路由键
            channel.queueBind(queueName, EXCHANGE_NAME, ROUTE_KEY);
    
            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
    
    
                    String msg = new String(body);
                    System.out.println("c1--->:" + msg);
    
                    channel.basicAck(envelope.getDeliveryTag(), false);
    
                }
            };
            boolean autoAck = false;
            channel.basicConsume(queueName, autoAck, consumer);
    
    
        }
    }
    
    一个小小的程序员
  • 相关阅读:
    Linux 终端常用快捷键
    问题集
    数据库
    mysql数据库知识点
    IntelliJ IDEAj集成开发环境
    Windows最全DOS的CMD命令
    DB2移植到Oracle数据库完整的图解教程
    169.254是什么IP地址 169.254的解决方法(添加局域网地址时)
    解析xml时报错
    童年乐趣
  • 原文地址:https://www.cnblogs.com/zhaod/p/11391476.html
Copyright © 2011-2022 走看看