zoukankan      html  css  js  c++  java
  • RabbitMQ译文 — 路由(Routing)

    一、路由(Routing)

    (使用 php-amqplib)

      在前一篇教程里,我们建立了一个简单的日志系统,我们能够广播日志信息给多个接受者。

      在这篇教程中我们将给它添加一个功能——我们将仅订阅一个消息的子集。例如,我们仅将致命错误消息发给日志文件(以节省磁盘空间),并且仍旧可以打印所有的日志消息到控制台。

    二、绑定(Bindings)

      在前面的例子里,我们已经建立了绑定,你可以回想一下如下的代码:

    $channel->queue_bind($queue_name, 'logs');

      一个绑定就是一个交换和一个队列关系,这可以简单的理解为:这个队列仅对来自这个交换的信息感兴趣。

      绑定可以使用一个额外的routing_key参数。为了避免和$channel::basic_publish参数混淆,我们将称他为绑定键(binding key)。这是我们如何用一个键来建立一个绑定:

    $binding_key = 'black';
    $channel->queue_bind($queue_name, $exchange_name, $binding_key);

      绑定键的意义依赖于交换的类型。我们使用前面的fanout交换,仅忽略它的值。

    三、Direct 交换(Direct exchange)

      我们的前面教程里的日志系统发送所有的消息给所有的消费者。我们想扩展这个系统,可以按照他们的严重程度来过滤消息。例如我们可能会想让正在写日志消息到磁盘的代码仅接收致命错误,而不浪费磁盘空间在警告或一般信息的日志消息上。

      我们当时用了一个fanout交换,这个不能让我们有更多的灵活性——因为它仅能盲目的广播信息。

      我们将使用给一个direct交换来代替它。在这个交换的背后有一个简单的路由算法——一条消息进入一个队列,这个队列的绑定键将精确匹配这个消息的路由键。

      为了演示这个,请考虑下面的图示:

      在这个图示里,我们可以看到direct交换“X”有两个绑定到它的队列。第一个队列用了“orange”这个绑定键,第二个有两个绑定,一个用“black”绑定键,另一个用“green”。

      在这样的图示里,一条带有路由键“orange”的消息发给这个交换后将会被发给“Q1”队列。而带有“black”或者“green”路由键的消息将走向“Q2”队列。而其它所有消息将被舍弃。

    四、多重绑定(Direct exchange)

     

      用同一个绑定键来绑定多个队列也是完全可以的。在我们的例子里我们也可以用“black”这个键在“X”和“Q1”之间添加一个绑定。在那种情况下,direct交换将会像fanout一样发送消息给所有匹配的队列,即一条带有“black”路由键的消息将被传给“Q1”和“Q2”两个队列。

    五、发送日志(Emitting logs)

      我们将这个模式应用于我们的日志系统。我们将用“direct”交换代替“fanout”交换来发送消息。我们以日志消息的严重程度作为它的路由键,在这种方式下,接收程序将能够选择它想接收的严重程度来接收。首先让我们来看日志消息的发送。

      跟往常一样,我们首先需要建立一个交换:

    $channel->exchange_declare('direct_logs', 'direct', false, false, false);

       然后我们准备发送一条消息:

    $channel->exchange_declare('direct_logs', 'direct', false, false, false);
    $channel->basic_publish($msg, 'direct_logs', $severity);

      为了简化程序,我们假设“严重程度”仅为“消息(info)”, “警告(warning)”, “错误(error)”三种之一。

    六、订阅(Subscribing)

      目前收到的信息将像前面教程中讲述的一样工作,不同的是,我们将为我们关注的每一个严重程度建立一个新的绑定。

    foreach ($severities as $severity) {
        $channel->queue_bind($queue_name, 'direct_logs', $severity);
    }

    七、合在一起(Putting it all together)

       “emit_log_direct.php” 类的代码如下:

    <?php
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    use PhpAmqpLibMessageAMQPMessage;
    
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    
    $channel->exchange_declare('direct_logs', 'direct', false, false, false);
    
    $severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';
    
    $data = implode(' ', array_slice($argv, 2));
    if (empty($data)) {
        $data = "Hello World!";
    }
    
    $msg = new AMQPMessage($data);
    
    $channel->basic_publish($msg, 'direct_logs', $severity);
    
    echo ' [x] Sent ', $severity, ':', $data, "
    ";
    
    $channel->close();
    $connection->close();

     “receive_logs_direct.php”的代码如下:

    <?php
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    
    $channel->exchange_declare('direct_logs', 'direct', false, false, false);
    
    list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
    
    $severities = array_slice($argv, 1);
    if (empty($severities)) {
        file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]
    ");
        exit(1);
    }
    
    foreach ($severities as $severity) {
        $channel->queue_bind($queue_name, 'direct_logs', $severity);
    }
    
    echo " [*] Waiting for logs. To exit press CTRL+C
    ";
    
    $callback = function ($msg) {
        echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "
    ";
    };
    
    $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
    
    while ($channel->is_consuming()) {
        $channel->wait();
    }
    
    $channel->close();
    $connection->close();

      如果你想仅保存“警告”和“错误”(不保存“信息”)日志信息到一个文件,敲入以下代码即可

    php receive_logs_direct.php warning error > logs_from_rabbit.log
    

      如果你想看所有的日志信息,打开一个新的控制台输入以下代码:

    php receive_logs_direct.php info warning error
    # => [*] Waiting for logs. To exit press CTRL+C

      例如:发送一个错误日志,输入如下:

    php emit_log_direct.php error "Run. Run. Or it will explode."
    # => [x] Sent 'error':'Run. Run. Or it will explode.'

      (完整代码请查看 emit_log_direct.php sourcereceive_logs_direct.php source)

      下一篇教程,看一看怎样基于一种模式来监听消息。

       原文:https://www.rabbitmq.com/tutorials/tutorial-four-php.html

  • 相关阅读:
    disruptor和ArrayBlockingQueue和LinkedBlockingQueue队列性能对比
    守护线程的作用和前台线程的区别
    tomcat导入idea作为maven项目
    百度网盘不限速
    netty ChannelOption参数 backlog 和 somaxconn同时设置才会生效
    dubbo的初探
    IDEA的常用快捷键
    Lucene简单了解和使用
    Hadoop的简单了解与安装
    Nginx的简单了解与使用
  • 原文地址:https://www.cnblogs.com/penrodsheh/p/13188370.html
Copyright © 2011-2022 走看看