zoukankan      html  css  js  c++  java
  • RabbitMQ译文 — 主题(Topics)

    (使用 php-amqplib)

      在上一篇教程中我们升级了我们的日志系统,用一个“direct”交换来代替仅能做模拟广播的“fanout”交换,以此来达到可以选择性接收日志的功能。

      尽管使用“direct”交换升级了我们的系统,它仍然有局限性——它不能基于多个标准做路由。

      在我们的日志系统内,我们可能想不仅基于严重性,还基于发送日志的来源来订阅日志,你可能从syslog unix 工具中了解这个概念,这个工具同时根据严重性(info/warn/crit...)和设备(auth/cron/kern...)两者来路由日志。

      这可能给我们很多便利——我们可能只想监听来自“cron”的致命错误,但也要监听来自“kern”的所有错误。

      为了在我们的日志系统内实现这个功能,我们需要学习一个更复杂的“topic”交换。

    一、Topic 交换(Topic exchange)

      消息发给一个“topic”交换不能只有一个任意的路由键——它必须是一个被点分割的单词列表。这些单词可以是任意的,但是同城它们被指定一些和消息有关的特征。例如:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”就是一些合法的路由键。根据你的需要,路由键中可以有很多的单词,但最多255个字节。

      绑定键也必须是相同的格式,“topic”交换逻辑的背后和一个“direct”交换相似——一条带有一个特定路由键的消息被发送后,将被传递给所有绑定一个匹配绑定键的队列。然而,对于绑定键,还有两个重要的他特殊情况。

    • * (星号) 可以代替一个单词。
    • # (井号) 可以代替零或多个单词。

      在下面这个例子里很容易的解释了这个概念:

     

       在这个例子里,我们打算发送描述动物的一些消息。这些带有由三个单词(两个点)组成的路由键的消息将被发送。在路由键里的第一个单词将描述速度,第二个描述颜色,第三个描述物种:“<speed>.<colour>.<species>”。

      我们建立了三个绑定:Q1用绑定键"*.orange.*"绑定 ,Q2绑定"*.*.rabbit"和"lazy.#"。

      这些绑定能被概括为:

    • Q1对所有橙色动物感兴趣。
    • Q2想监听关于兔子的所有消息,以及行动迟缓动物的每个消息。

       一条设置路由键“quick.orange.rabbit”的消息将被传递到两个队列。设置“lazy.orange.elephant“键的消息也将被传递到这个两个队列。另一方面,带有"quick.orange.fox"键的消息将仅仅被传递到第一个队列,而带有“lazy.brown.fox”键的消息仅被传到第二个。绑定“lazy.pink.rabbit”的消息将仅被传递给第二个队列一次,即使它匹配两个绑定。带有"quick.brown.fox"键的消息不匹配任何绑定,因此它将被丢弃。

      如果我们打破我们的约定,发送一条带有一个或四个单词的消息,例如“orange”或者“quick.orange.male.rabbit”会怎样呢?好吧,这些消息不匹配任何绑定将会丢失。

      另一方面,“lazy.orange.male.rabbit”即使它有四个单词,但会匹配最后那个绑定,将被发送到第二个队列。

    Topic 交换

    Topic 交换非常强大,可以像其它交换一样运行

    当一个队列被绑定“#”(井号)绑定键——它将接收所有消息,不论路由键是什么——就像是“fanout”交换。

    When special characters "*" (star) and "#" (hash) aren't used in bindings, the topic exchange will behave just like a direct one.

    当特殊字符“*”(星号)和“#”(井号)没有被用在绑定里,“topic”交换就像是“direct”交换一样了。

    二、合在一起(Putting it all together)

      我们将在我们的日志系统内使用一个“topic”交换。我们将首先假设日志的路由键有两个单词“<facility>.<severity>”。

      代码几乎和前面教程里的一样。

       emit_log_topic.php 代码如下:

    <?php
    
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    use PhpAmqpLibMessageAMQPMessage;
    
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    
    $channel->exchange_declare('topic_logs', 'topic', false, false, false);
    
    $routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
    $data = implode(' ', array_slice($argv, 2));
    if (empty($data)) {
        $data = "Hello World!";
    }
    
    $msg = new AMQPMessage($data);
    
    $channel->basic_publish($msg, 'topic_logs', $routing_key);
    
    echo ' [x] Sent ', $routing_key, ':', $data, "
    ";
    
    $channel->close();
    $connection->close();

       receive_logs_topic.php 代码如下:

    <?php
    
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    
    $channel->exchange_declare('topic_logs', 'topic', false, false, false);
    
    list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
    
    $binding_keys = array_slice($argv, 1);
    if (empty($binding_keys)) {
        file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]
    ");
        exit(1);
    }
    
    foreach ($binding_keys as $binding_key) {
        $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
    }
    
    echo " [*] Waiting for logs. To exit press CTRL+C
    ";
    
    $callback = function ($msg) {
        echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "
    ";
    };
    
    $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
    
    while ($channel->is_consuming()) {
        $channel->wait();
    }
    
    $channel->close();
    $connection->close();
    

      接收所有日志:

    php receive_logs_topic.php "#"
    

      接收所有来自设备“kern”的日志:

    php receive_logs_topic.php "kern.*"
    

      或者如果你只想要“critical”日志:

    php receive_logs_topic.php "*.critical"
    

      你可以建立多个绑定:

    php receive_logs_topic.php "kern.*" "*.critical"
    

      发送一个带有路由键“kern.critical”类型的日志:

    php emit_log_topic.php "kern.critical" "A critical kernel error"

      玩这些程序很开心吧,注意代码不会对路由和绑定键做出任何假设,或许你还想用多于两个的路由键的参数。

      (这是 emit_log_topic.php 和 receive_logs_topic.php 的完整源代码)

      下面,在第六节里来了解怎样做一个往返消息,来作为一个远程过程的调用。

      原文:https://www.rabbitmq.com/tutorials/tutorial-five-php.html

  • 相关阅读:
    [小技巧] micropython 如何执行 *.mpy 文件
    从零开始深入 Linux 底层(软件工程)
    从嵌套结构中取值时如何编写兜底逻辑
    学习JUC源码(2)——自定义同步组件
    学习JUC源码(1)——AQS同步队列(源码分析结合图文理解)
    Java多线程中的wait/notify通信模式
    详解Java锁的升级与对比(1)——锁的分类与细节(结合部分源码)
    认识Redis集群——Redis Cluster
    工作三年多的感慨与总结(二)
    工作三年多的感慨与总结(一)
  • 原文地址:https://www.cnblogs.com/penrodsheh/p/13201646.html
Copyright © 2011-2022 走看看