zoukankan      html  css  js  c++  java
  • RabbitMq学习4-发布/订阅(Publish/Subscribe)

    一、发布/订阅

    分发一个消息给多个消费者(consumers)。这种模式被称为“发布/订阅”。

    为了描述这种模式,我们将会构建一个简单的日志系统。它包括两个程序——第一个程序负责发送日志消息,第二个程序负责获取消息并输出内容。

    在这个日志系统中,所有正在运行的接收方程序都会接受消息。我们用其中一个接收者(receiver)把日志写入硬盘中,另外一个接受者(receiver)把日志输出到屏幕上。

    最终,日志消息被广播给所有的接受者(receivers)。

    二、交换器(Exchanges)

    • 发布者(producer)是发布消息的应用程序。
    • 队列(queue)用于消息存储的缓冲。
    • 消费者(consumer)是接收消息的应用程序。

    RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。

    发布者(producer)只需要把消息发送给一个交换器(exchange)。交换器非常简单,它一边从发布者方接收消息,一边把消息推入队列。交换器必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过exchange type来定义的。

    有几个可供选择的交换器类型:

    AMQPEXTYPEDIRECT

    AMQPEXTYPEFANOUT

    AMQPEXTYPEHEADER  or AMQPEXTYPETOPIC

    这里主要说明AMQPEXTYPE_FANOUT。先创建一个fanout类型的交换器,命名为logs:

    $exchange->setName('logs');
    $exchange->setType(AMQP_EX_TYPE_FANOUT);
    $exchange->declareExchange();
    

    fanout交换器很简单,你可能从名字上就能猜测出来,它把消息发送给它所知道的所有队列。这正是我们的日志系统所需要的。

    现在,我们就可以发送消息到一个具名交换器了:

    $exchange->publish($message, '');

    三、临时队列

    给一个队列命名是很重要的——我们需要把工作者(workers)指向正确的队列。如果你打算在发布者(producers)和消费者(consumers)之间共享同队列的话,给队列命名是十分重要的。

    但是这并不适用于我们的日志系统。我们打算接收所有的日志消息,而不仅仅是一小部分。我们关心的是最新的消息而不是旧的。为了解决这个问题,我们需要做两件事情。

    首先,当我们连接上RabbitMQ的时候,我们需要一个全新的、空的队列。我们可以手动创建一个随机的队列名,或者让服务器为我们选择一个随机的队列名(推荐)。我们只要在调用$queue->declareQueue();方法的时候,不提供queue参数就可以了:

    $queue = new AMQPQueue($channel);
    $queue->setFlags(AMQP_EXCLUSIVE);
    $queue->declareQueue();
    

    这时候我们可以通过$queue->getName();获得已经生成的随机队列名。它可能是这样子的:amq.gen-U0srCoW8TsaXjNh73pnVAw==。

    第二步,当与消费者(consumer)断开连接的时候,这个队列应当被删除。我们可以使用exclusive标识。$queue->setFlags(AMQP_EXCLUSIVE);

    四、绑定

    我们已经创建了一个fanout类型的交换器和一个队列。现在我们需要告诉交换器如何发送消息给我们的队列。交换器和队列之间的关系我们称之为绑定(binding)。

    $queue->bind($exchangeName, $queue->getName());
    

    现在,logs交换器将会把消息添加到我们的队列中。

    五、代码整合

      1、生产者:send_log.php

      

    <?php
    
    /**
     * PHP amqp(RabbitMQ) Demo-3
     */
    
    $exchangeName = 'logs';
    $message = empty($argv[1]) ? 'info:Hello World!' : ' '.$argv[1];
    
    $connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'));
    $connection->connect() or die("Cannot connect to the broker!
    ");
    
    $channel = new AMQPChannel($connection);
    $exchange = new AMQPExchange($channel);
    $exchange->setName($exchangeName);
    $exchange->setType(AMQP_EX_TYPE_FANOUT);//把消息发送给它所知道的所有队列
    $exchange->declareExchange();
    
    $exchange->publish($message, '');
    var_dump("[x] Sent $message");
    
    $connection->disconnect();
    View Code

      2、消费者:receive_logs.php

      

    <?php
    
    /**
     * PHP amqp(RabbitMQ) Demo-3
     */
    $exchangeName = 'logs';
    
    $connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'));
    $connection->connect() or die("Cannot connect to the broker!
    ");
    
    $channel = new AMQPChannel($connection);
    
    $exchange = new AMQPExchange($channel);
    $exchange->setName($exchangeName);
    $exchange->setType(AMQP_EX_TYPE_FANOUT);//把消息发送给它所知道的所有队列
    $exchange->declareExchange();
    
    $queue = new AMQPQueue($channel);
    $queue->setFlags(AMQP_EXCLUSIVE);//当与消费者(consumer)断开连接的时候,这个队列应当被删除
    $queue->declareQueue();//创建一个随机的队列名
    $queue->bind($exchangeName, '');
    
    var_dump('[*] Waiting for messages. To exit press CTRL+C');  
    while (TRUE) {  
            $queue->consume('callback');
    }
    $connection->disconnect();
    
    function callback($envelope, $queue) {  
            $msg = $envelope->getBody();
            var_dump(" [x] Received:" . $msg);
            $queue->nack($envelope->getDeliveryTag());
    }
    View Code

    这样我们就完成了。如果你想把日志保存到文件中,只需要打开控制台输入:

    $ php receive_logs.php > logs_from_rabbit.log
    

    如果你想在屏幕中查看日志,那么打开一个新的终端然后运行:

    $ php receive_logs.php
    

    当然还要发送日志:

    $ php send_log.php
     
  • 相关阅读:
    Ubuntu下通过makefile生成静态库和动态库简单实例
    C++获取Windows7 32位系统中所有进程名(类似于任务管理器中的进程)
    剑指offer(一)
    Leetcode题解(一)
    C Run-Time Error R6034问题的解决
    windows环境中利用NMake工具编译连接C++源代码
    通过命令行使用cl.exe编译器
    [bzoj3709] [PA2014]Bohater
    [bzoj3714] [PA2014]Kuglarz
    [bzoj2724] [Violet 6]蒲公英
  • 原文地址:https://www.cnblogs.com/ivy-zheng/p/10967884.html
Copyright © 2011-2022 走看看