zoukankan      html  css  js  c++  java
  • RabbitMQ的工作模式

    RabbitMQ的六种工作模式(含PHP代码实现)

         接着上一篇文章RabbitMQ入门,我们再来看下RabbitMQ的工作模式有哪些。

         1、简单队列模式(simple queue)-最简单的收发模式

         1)只包含一个生产者和一个消费者

         2)生产者将消息发送到队列中,消费者从队列中接收消息

          

         工作过程:

         消息的消费者(consumer) 监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除。

         可能存在的问题:

         消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失。

         解决办法:

         这里可以设置成手动的ack,但如果设置成手动ack,处理完后及时发送ack消息给队列,否则会造成内存溢出。

         扩展一下:消息确认-自动应答和手动应答

         noack:true 自动应答 ,false(手动应答) 默认为false-关闭

         noack=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。

         注意:

         生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。

         实际的情况是:当你手动创建一个队列时,后台会自动将这个队列绑定到一个名称为空,类型为default的交换机(exchange)上,绑定 RoutingKey 与队列名称相同。有了这个默认的交换机和绑定,使我们只关心队列这一层即可,这个比较适合做一些简单的应用。

         下面我们从代码层面来感受一下:

         1)生产

     1 <?php
     2 
     3 namespace consolecontrollers;
     4 
     5 use PhpAmqpLibConnectionAMQPStreamConnection;
     6 use PhpAmqpLibMessageAMQPMessage;
     7 use yiiconsoleController;
     8 
     9 class SendController extends Controller
    10 {
    11     public function actionIndex()
    12     {
    13         // RabbitMQ: 简单的收发模式
    14 
    15         // 打开一个连接和通道
    16         $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    17         $channel = $connection->channel();
    18         // 声明一个队列
    19         $channel->queue_declare('hxq', false, false, false, false);
    20 
    21         // 向队列发布消息
    22         $msg = new AMQPMessage('Hello World!');
    23 
    24         // 注意这个路由key一定要设置跟队列匹配,交换机名称为空,类型为default
    25         $channel->basic_publish($msg, '', 'hxq');
    26         echo "[x] Sent 'Hello World!'
    ";
    27 
    28         // 关闭通道和连接
    29         $channel->close();
    30         $connection->close();
    31     }
    32 }    

        2)费端

     1 <?php
     2 
     3 namespace consolecontrollers;
     4 
     5 use PhpAmqpLibConnectionAMQPStreamConnection;
     6 use yiiconsoleController;
     7 
     8 class ReceiveController extends Controller
     9 {
    10     public function actionIndex()
    11     {
    12         // RabbitMQ: 简单的收发模式
    13 
    14         // 打开一个连接和通道
    15         $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    16         $channel = $connection->channel();
    17         // 声明一个队列
    18         $channel->queue_declare('hxq', false, false, false, false);
    19         echo '[*] Waiting for messages. To exit press CTRL+C', "
    ";
    20 
    21         // 接收消息进行处理的回调函数
    22         $callback = function ($msg) {
    23             echo "[x] Received ", $msg->body, "
    ";
    24         };
    25 
    26         // basic_consume是一个阻塞函数,在接收消息的时候调用$callback函数
    27         $channel->basic_consume('hxq', '', false, true, false, false, $callback);
    28         while ($channel->is_consuming()) {
    29             $channel->wait();
    30         }
    31 
    32         // 关闭通道和连接
    33         $channel->close();
    34         $connection->close();
    35     }
    36 } 

      下面我们开两个命令窗口,模拟发送端和消费端:

         a) 消费者在队列端监听:

          

         b)发送端发送消息

          

         c)消费者获取到消息,进行消费

          

         2、工作队列模式(work Queues)-资源的竞争

         工作队列是为了避免等待一些占用大量资源、时间的操作。当我们把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。
         这个概念在网络应用中是非常有用的,它可以在短暂的HTTP请求中处理一些复杂的任务

         特点:

         1) 一个生产者对应多个消费者,一条消息只被一个消费者进行消费

         2)工作队列有轮询分发公平分发两种模式

          

         工作过程:

         消息产生者将消息放入队列消费者可以有多个,消费者C1,消费者C2同时监听同一个队列。

         C1、C2共同争抢当前的消息队列内容,,一条消息只能由一个消费者消费,这样就形成了资源竞争,谁的资源空闲大争抢到的可能性就大,谁先拿到谁负责消费消息。

         可能存在的问题:

         高并发情况下,默认会产生某一个消息被多个消费者共同使用。

         解决办法:

         可以设置一个开关(syncronize) 保证一条消息只能被一个消费者使用。

         注意:

        上图中虽然没有画出交换机的部分,但是原理同simple queue中阐述的一样,不再赘述。

         下面是参考代码:

         1)生产端

     1 <?php
     2 
     3 namespace consolecontrollers;
     4 
     5 use PhpAmqpLibConnectionAMQPStreamConnection;
     6 use PhpAmqpLibMessageAMQPMessage;
     7 use yiiconsoleController;
     8 
     9 class SendController extends Controller
    10 {
    11     public function actionIndex2($argv)
    12     {
    13         // RabbitMQ: 工作队列-资源的竞争
    14         // 打开一个连接和通道
    15         $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    16         $channel = $connection->channel();
    17         // 声明一个队列,第三个参数durable(是否消息持久化):true是 false否
    18         $channel->queue_declare('task_queue', false, true, false, false);
    19 
    20 
    21         if (empty($argv)) {
    22             $argv = "Hello World!";
    23         }
    24         // 向队列发布消息
    25         $msg = new AMQPMessage($argv, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
    26 
    27         // 注意这个路由key一定要设置跟队列匹配
    28         $channel->basic_publish($msg, '', 'task_queue');
    29         echo "[x] Sent ", $argv, "
    ";
    30 
    31         // 关闭通道和连接
    32         $channel->close();
    33         $connection->close();
    34     }
    35 }

         2) 消费端

     1 <?php
     2 
     3 namespace consolecontrollers;
     4 
     5 use PhpAmqpLibConnectionAMQPStreamConnection;
     6 use yiiconsoleController;
     7 
     8 class ReceiveController extends Controller
     9 {
    10     public function actionIndex2()
    11     {
    12         // RabbitMQ: 工作队列-资源的竞争
    13 
    14         // 打开一个连接和通道,声明一个队列
    15         $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    16         $channel = $connection->channel();
    17         // 第三个参数durable为true  消息持久化
    18         $channel->queue_declare('task_queue', false, true, false, false);
    19         echo '[*] Waiting for messages. To exit press CTRL+C', "
    ";
    20 
    21         // 接收消息进行处理的回调函数
    22         $callback = function ($msg) {
    23             echo "[x] Received ", $msg->body, "
    ";
    24             // 模拟耗时操作
    25             sleep(substr_count($msg->body, '.'));
    26             echo "[x] Done", "
    ";
    27 
    28             // 手动ack
    29             $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    30 
    31         };
    32  
    33         // 公平调度-只有consumer已经处理并确认了上一条message时queue才分派新的message给它,为了测试使用前后的效果,我们先把下面代码屏蔽掉,等会儿再打开
    34         // $channel->basic_qos(null, 1, null);
    35 
    36         // basic_consume第四个参数:no_ack true 自动应答 false手动应答
    37         $channel->basic_consume('task_queue', '', false, false, false, false, $callback);
    38         while ($channel->is_consuming()) {
    39             $channel->wait();
    40         }
    41 
    42         // 关闭通道和连接
    43         $channel->close();
    44         $connection->close();
    45     }
    46 }    

        为了模拟多个worker,我们这里开三个命令窗口,依次开local,local2作为消费端,local3为消费端,然后进行如下操作:

        

        我们从上面图中可以发现,它仍旧没有按照我们期望的那样进行分发。比如图上有两个工作者(workers),local1中工作者处理的第一条消息耗时5秒,local2中工作者处理的第一条消息耗时1秒。明明local1中的工作者还没有处理完,响应消息。然而RabbitMQ并不知道这些,它仍然一如既往的派发消息,将sleep(3)的这条消息依然发给了local1。

        这是因为RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有作出响应。它盲目的把第n-th条消息发给第n-th个消费者。

        公平调度

        

         

         


         为了解决上面的问题,我们可以使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,在同一时刻,不要发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)。

         于是我们把消费者端的这段代码注释打开:

         $channel->basic_qos(null, 1, null);

         再测试下:

         

         这就比较符合我们的预期了,大家可以自行测试下。  

         队列大小

         如果所有的工作者都处于繁忙状态,那么队列就会被填满。此时需要留意这个问题,要么添加更多的工作者(workers),要么使用其他策略。

         3、发布-订阅模式(Publish/SubScribe)

         1)一个生产者,多个消费者

         2)生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。

     

         工作过程:

         RabbitMQ把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,无视binding key。一个消息可以被多个消费者消费。

         注意:

         如果消息发送到没有队列绑定的交换机时,消息将会消失,因为交换机没有存储消息的能力只有队列才有存储消息的能力。

         应用场景

         为了描述这种模式,我们将会构建一个简单的日志系统。它包括两个程序——第一个程序负责发送日志消息,第二个程序负责获取消息并输出内容。在我们的这个日志系统中,所有正在运行的接收方程序都会接收消息。我们用其中一个接收者(receiver)把日志写入硬盘中,另外一个接收者(receiver)把日志输出到屏幕上。最终,日志消息被广播给所有的接收者(receivers)。

         下面是参考代码:

         1)生产端

     1 <?php
     2 
     3 namespace consolecontrollers;
     4 
     5 use PhpAmqpLibConnectionAMQPStreamConnection;
     6 use PhpAmqpLibMessageAMQPMessage;
     7 use yiiconsoleController;
     8 
     9 class SendController extends Controller
    10 {
    11     public function actionIndex3($argv)
    12     {
    13         // RabbitMQ: 工作队列-发布/订阅(fanout)
    14         $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    15         $channel = $connection->channel();
    16         // 声明交换机
    17         $channel->exchange_declare('logs', 'fanout', false, false, false);
    18 
    19         if (empty($argv)) {
    20             $argv = "info: Hello World!";
    21         }
    22 
    23         // 发送消息到我们命名为logs的交换机
    24         $msg = new AMQPMessage($argv);
    25         $channel->basic_publish($msg, 'logs');
    26         echo "[x] Sent ", $argv, "
    ";
    27 
    28         // 关闭通道和连接
    29         $channel->close();
    30         $connection->close();
    31     }
    32 }

         2)消费端

     1 <?php
     2 
     3 namespace consolecontrollers;
     4 
     5 use PhpAmqpLibConnectionAMQPStreamConnection;
     6 use yiiconsoleController;
     7 
     8 class ReceiveController extends Controller
     9 {
    10     public function actionIndex3()
    11     {
    12         // RabbitMQ: 工作队列-发布/订阅(fanout)
    13         $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    14         $channel = $connection->channel();
    15 
    16         // 声明交换机
    17         $channel->exchange_declare('logs', 'fanout', false, false, false);
    18         list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
    19 
    20         // 绑定队列到交换机
    21         $channel->queue_bind($queue_name, 'logs');
    22         echo ' [*] Waiting for logs. To exit press CTRL+C', "
    ";
    23 
    24         // 定义接收消息进行处理的回调函数
    25         $callback = function ($msg) {
    26             echo "[x] Received ", $msg->body, "
    ";
    27         };
    28 
    29         $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
    30 
    31         while (count($channel->callbacks)) {
    32             $channel->wait();
    33         }
    34 
    35         // 关闭通道和连接
    36         $channel->close();
    37         $connection->close();
    38     }
    39 }   

          我们来模拟实现一个日志系统,分别开两个窗口来作为消费端监听,其中一个将监听消息写入日志,另外一个直接命令窗口输出,如下图:

          

         再开一个窗口来发送消息:

         

        4、路由模式(routing)

        生产者将消息发送到direct交换机,它会把消息路由到那些binding key 与 routing key 完全匹配的queue中。在相应队列监听的消费者才能消费消息。这样就能实现消费者有选择的去消费消息。

        

       说明: 如果routing key为black或者green,那么交换机会将消息路由到队列Q2中。

       

       说明: 如果routing key为black,那么交换机会将消息路由到队列Q1和队列Q2中。

       应用场景

       我们的日志系统广播所有的消息给所有的消费者(consumers)。我们打算扩展它,使其基于日志的严重程度进行消息过滤。例如我们也许只是希望将比较严重的错误(error)日志写入磁盘,以免在警告(warning)或者信息(info)日志上浪费磁盘空间。

       下面参考代码:

       1)生产端

     1 <?php
     2 
     3 namespace consolecontrollers;
     4 
     5 use PhpAmqpLibConnectionAMQPStreamConnection;
     6 use PhpAmqpLibMessageAMQPMessage;
     7 use yiiconsoleController;
     8 
     9 class SendController extends Controller
    10 {
    11     public function actionIndex4($argv)
    12     {
    13         // RabbitMQ: 路由模式
    14         $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    15         $channel = $connection->channel();
    16 
    17         // 声明交换机
    18         $channel->exchange_declare('direct_logs', 'direct', false, false, false);
    19         if (strpos($argv, '/') !== false) {
    20             $arr = explode('/', $argv);
    21             // routing_key
    22             $severity = $arr[0];
    23             // 消息内容
    24             $data = $arr[1];
    25         } else {
    26             echo '参数错误';
    27             return;
    28         }
    29         // 发送消息到我们命名为direct_logs的交换机
    30         $msg = new AMQPMessage($data);
    31         $channel->basic_publish($msg, 'direct_logs', $severity);
    32         echo "[x] Sent ", $data, "
    ";
    33 
    34         // 关闭通道和连接
    35         $channel->close();
    36         $connection->close();
    37     }
    38 }

        2)消费端

     1 <?php
     2 
     3 namespace consolecontrollers;
     4 
     5 use PhpAmqpLibConnectionAMQPStreamConnection;
     6 use yiiconsoleController;
     7 
     8 class ReceiveController extends Controller
     9 {
    10     public function actionIndex4($argv)
    11     {
    12         // RabbitMQ: 路由模式
    13 
    14         $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    15         $channel = $connection->channel();
    16 
    17         // 声明交换机
    18         $channel->exchange_declare('direct_logs', 'direct', false, false, false);
    19         list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
    20 
    21         // 为我们感兴趣的每个严重级别分别创建一个新的绑定
    22         if (strpos($argv, '/') !== false) {
    23             $severities = explode('/', $argv);
    24             foreach ($severities as $severity) {
    25                 // 第三个参数为routing_key
    26                 $channel->queue_bind($queue_name, 'direct_logs', $severity);
    27             }
    28         } else {
    29             $severity = $argv;
    30             $channel->queue_bind($queue_name, 'direct_logs', $severity);
    31         }
    32 
    33         echo ' [*] Waiting for logs. To exit press CTRL+C', "
    ";
    34 
    35         // 接收消息进行处理的回调函数
    36         $callback = function ($msg) {
    37             echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "
    ";
    38         };
    39         $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
    40         while (count($channel->callbacks)) {
    41             $channel->wait();
    42         }
    43 
    44         // 关闭通道和连接
    45         $channel->close();
    46         $connection->close();
    47     }
    48 }    

        消费端监听消息:

         

        生产端发送消息:

        

       5、主题模式(Topic) - 路由模式的一种

          主题交换机是很强大的,它可以表现出跟其他交换机类似的行为。

    •       当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
    •       当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。

         1)"#" 和 "*" 符号代表通配符

         2)""#"代表零个或者多个单词, "*" 代表一个单词

         3)路由功能添加模糊匹配

         4)消息产生者产生消息,把消息交给交换机

         5)单词之间用英文句点"."隔开

          

         工作过程:

         交换机根据binding key的规则,使用routing Key来模糊匹配到对应的队列,由队列的监听消费者接收消息消费。如果没有匹配到相应队列,则消息被丢弃。

         1)生产端

     1 <?php
     2 
     3 namespace consolecontrollers;
     4 
     5 use PhpAmqpLibConnectionAMQPStreamConnection;
     6 use PhpAmqpLibMessageAMQPMessage;
     7 use yiiconsoleController;
     8 
     9 class SendController extends Controller
    10 {
    11     public function actionIndex5($argv)
    12     {
    13         // RabbitMQ: 主题模式
    14         $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    15         $channel = $connection->channel();
    16 
    17         // 声明交换机
    18         $channel->exchange_declare('topic_logs', 'topic', false, false, false);
    19         if (strpos($argv, '/') !== false) {
    20             $arr = explode('/', $argv);
    21             // routing_key
    22             $routing_key = $arr[0];
    23             // 消息内容
    24             $data = $arr[1];
    25         } else {
    26             echo '参数错误';
    27             return;
    28         }
    29 
    30         // 发送消息
    31         $msg = new AMQPMessage($data);
    32         $channel->basic_publish($msg, 'topic_logs', $routing_key);
    33         echo "[x] Sent ", $routing_key, ':', $data, "
    ";
    34 
    35         // 关闭通道和连接
    36         $channel->close();
    37         $connection->close();
    38     }
    39 }

         2)消费端

     1 <?php
     2 
     3 namespace consolecontrollers;
     4 
     5 use PhpAmqpLibConnectionAMQPStreamConnection;
     6 use yiiconsoleController;
     7 
     8 class ReceiveController extends Controller
     9 {
    10     public function actionIndex5($argv)
    11     {
    12         // RabbitMQ: 主题模式
    13         $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    14         $channel = $connection->channel();
    15 
    16         // 声明交换机
    17         $channel->exchange_declare('topic_logs', 'topic', false, false, false);
    18         list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
    19 
    20         // 多个队列和这个名称为topic_logs的交换机绑定
    21         if (strpos($argv, '/') !== false) {
    22             $binding_keys = explode('/', $argv);
    23             foreach ($binding_keys as $binding_key) {
    24                 // 第三个参数为routing_key
    25                 $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
    26             }
    27         } else {
    28             $binding_key = $argv;
    29             $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
    30         }
    31 
    32         echo ' [*] Waiting for logs. To exit press CTRL+C', "
    ";
    33 
    34         // 接收消息进行处理的回调函数
    35         $callback = function ($msg) {
    36             echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "
    ";
    37         };
    38         $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
    39         while (count($channel->callbacks)) {
    40             $channel->wait();
    41         }
    42 
    43         // 关闭通道和连接
    44         $channel->close();
    45         $connection->close();
    46     }
    47 
    48 }    

        我们分别开4个窗口,监听消息

       

        发送消息后

        

       6、RPC模式- 远程过程调用

        这里面有两个重要的概念:

        1) replyTo:  存储回调队列的名称

        2) correlationId:  唯一标识本次的请求,主要用于RPC调用。

     

        工作过程:

         使用 RabbitMQ 实现 RPC,相应的角色是由生产者来作为客户端,消费者作为服务端。

         但 RPC 调用一般是同步的,客户端和服务器也是紧密耦合的。即客户端通过 IP/域名和端口链接到服务器,向服务器发送请求后等待服务器返回响应信息。

         但 MQ 的生产者和消费者是完全解耦的,那么如何用 MQ 实现 RPC 呢?很明显就是把 MQ 当作中间件,实现一次双向的消息传递

         客户端和服务端既是生产者也是消费者。客户端发布请求,消费响应;服务端消费请求,发布响应。

     

         纸上谈兵终觉浅,后面有空再来补充一些RabbitMQ在项目中使用的案例。

         

    参考链接:

    https://www.rabbitmq.com/getstarted.html

  • 相关阅读:
    SGU 271 Book Pile (双端队列)
    POJ 3110 Jenny's First Exam (贪心)
    HDU 4310 Hero (贪心)
    ZOJ 2132 The Most Frequent Number (贪心)
    POJ 3388 Japanese Puzzle (二分)
    UVaLive 4628 Jack's socks (贪心)
    POJ 2433 Landscaping (贪心)
    CodeForces 946D Timetable (DP)
    Android Studio教程从入门到精通
    Android Tips – 填坑手册
  • 原文地址:https://www.cnblogs.com/hld123/p/14687401.html
Copyright © 2011-2022 走看看