RabbitMQ的六种工作模式(含PHP代码实现)
接着上一篇文章《RabbitMQ入门》,我们再来看下RabbitMQ的工作模式有哪些。
1、简单队列模式(simple queue)-最简单的收发模式
1)只包含一个生产者和一个消费者
2)生产者将消息发送到队列中,消费者从队列中接收消息
工作过程:
消息的消费者(consumer) 监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除。
可能存在的问题:
消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失。
解决办法:
这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出。
扩展一下:消息确认-自动应答和手动应答
noack:true 自动应答 ,false(手动应答) 默认为false-关闭
noack=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。
注意:
生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。
实际的情况是:当你手动创建一个队列时,后台会自动将这个队列绑定到一个名称为空,类型为default的交换机(exchange)上,绑定 RoutingKey 与队列名称相同。有了这个默认的交换机和绑定,使我们只关心队列这一层即可,这个比较适合做一些简单的应用。
下面我们从代码层面来感受一下:
1)生产端
1 <?php 2 3 namespace consolecontrollers; 4 5 use PhpAmqpLibConnectionAMQPStreamConnection; 6 use PhpAmqpLibMessageAMQPMessage; 7 use yiiconsoleController; 8 9 class SendController extends Controller 10 { 11 public function actionIndex() 12 { 13 // RabbitMQ: 简单的收发模式 14 15 // 打开一个连接和通道 16 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); 17 $channel = $connection->channel(); 18 // 声明一个队列 19 $channel->queue_declare('hxq', false, false, false, false); 20 21 // 向队列发布消息 22 $msg = new AMQPMessage('Hello World!'); 23 24 // 注意这个路由key一定要设置跟队列匹配,交换机名称为空,类型为default 25 $channel->basic_publish($msg, '', 'hxq'); 26 echo "[x] Sent 'Hello World!' "; 27 28 // 关闭通道和连接 29 $channel->close(); 30 $connection->close(); 31 } 32 }
2)消费端
1 <?php 2 3 namespace consolecontrollers; 4 5 use PhpAmqpLibConnectionAMQPStreamConnection; 6 use yiiconsoleController; 7 8 class ReceiveController extends Controller 9 { 10 public function actionIndex() 11 { 12 // RabbitMQ: 简单的收发模式 13 14 // 打开一个连接和通道 15 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); 16 $channel = $connection->channel(); 17 // 声明一个队列 18 $channel->queue_declare('hxq', false, false, false, false); 19 echo '[*] Waiting for messages. To exit press CTRL+C', " "; 20 21 // 接收消息进行处理的回调函数 22 $callback = function ($msg) { 23 echo "[x] Received ", $msg->body, " "; 24 }; 25 26 // basic_consume是一个阻塞函数,在接收消息的时候调用$callback函数 27 $channel->basic_consume('hxq', '', false, true, false, false, $callback); 28 while ($channel->is_consuming()) { 29 $channel->wait(); 30 } 31 32 // 关闭通道和连接 33 $channel->close(); 34 $connection->close(); 35 } 36 }
下面我们开两个命令窗口,模拟发送端和消费端:
a) 消费者在队列端监听:
b)发送端发送消息
c)消费者获取到消息,进行消费
2、工作队列模式(work Queues)-资源的竞争
工作队列是为了避免等待一些占用大量资源、时间的操作。当我们把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。
这个概念在网络应用中是非常有用的,它可以在短暂的HTTP请求中处理一些复杂的任务。
特点:
1) 一个生产者对应多个消费者,一条消息只被一个消费者进行消费
2)工作队列有轮询分发和公平分发两种模式
工作过程:
消息产生者将消息放入队列消费者可以有多个,消费者C1,消费者C2同时监听同一个队列。
C1、C2共同争抢当前的消息队列内容,,一条消息只能由一个消费者消费,这样就形成了资源竞争,谁的资源空闲大,争抢到的可能性就大,谁先拿到谁负责消费消息。
可能存在的问题:
高并发情况下,默认会产生某一个消息被多个消费者共同使用。
解决办法:
可以设置一个开关(syncronize) 保证一条消息只能被一个消费者使用。
注意:
上图中虽然没有画出交换机的部分,但是原理同simple queue中阐述的一样,不再赘述。
下面是参考代码:
1)生产端
1 <?php 2 3 namespace consolecontrollers; 4 5 use PhpAmqpLibConnectionAMQPStreamConnection; 6 use PhpAmqpLibMessageAMQPMessage; 7 use yiiconsoleController; 8 9 class SendController extends Controller 10 { 11 public function actionIndex2($argv) 12 { 13 // RabbitMQ: 工作队列-资源的竞争 14 // 打开一个连接和通道 15 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); 16 $channel = $connection->channel(); 17 // 声明一个队列,第三个参数durable(是否消息持久化):true是 false否 18 $channel->queue_declare('task_queue', false, true, false, false); 19 20 21 if (empty($argv)) { 22 $argv = "Hello World!"; 23 } 24 // 向队列发布消息 25 $msg = new AMQPMessage($argv, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); 26 27 // 注意这个路由key一定要设置跟队列匹配 28 $channel->basic_publish($msg, '', 'task_queue'); 29 echo "[x] Sent ", $argv, " "; 30 31 // 关闭通道和连接 32 $channel->close(); 33 $connection->close(); 34 } 35 }
2) 消费端
1 <?php 2 3 namespace consolecontrollers; 4 5 use PhpAmqpLibConnectionAMQPStreamConnection; 6 use yiiconsoleController; 7 8 class ReceiveController extends Controller 9 { 10 public function actionIndex2() 11 { 12 // RabbitMQ: 工作队列-资源的竞争 13 14 // 打开一个连接和通道,声明一个队列 15 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); 16 $channel = $connection->channel(); 17 // 第三个参数durable为true 消息持久化 18 $channel->queue_declare('task_queue', false, true, false, false); 19 echo '[*] Waiting for messages. To exit press CTRL+C', " "; 20 21 // 接收消息进行处理的回调函数 22 $callback = function ($msg) { 23 echo "[x] Received ", $msg->body, " "; 24 // 模拟耗时操作 25 sleep(substr_count($msg->body, '.')); 26 echo "[x] Done", " "; 27 28 // 手动ack 29 $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); 30 31 }; 32 33 // 公平调度-只有consumer已经处理并确认了上一条message时queue才分派新的message给它,为了测试使用前后的效果,我们先把下面代码屏蔽掉,等会儿再打开 34 // $channel->basic_qos(null, 1, null); 35 36 // basic_consume第四个参数:no_ack true 自动应答 false手动应答 37 $channel->basic_consume('task_queue', '', false, false, false, false, $callback); 38 while ($channel->is_consuming()) { 39 $channel->wait(); 40 } 41 42 // 关闭通道和连接 43 $channel->close(); 44 $connection->close(); 45 } 46 }
为了模拟多个worker,我们这里开三个命令窗口,依次开local,local2作为消费端,local3为消费端,然后进行如下操作:
我们从上面图中可以发现,它仍旧没有按照我们期望的那样进行分发。比如图上有两个工作者(workers),local1中工作者处理的第一条消息耗时5秒,local2中工作者处理的第一条消息耗时1秒。明明local1中的工作者还没有处理完,响应消息。然而RabbitMQ并不知道这些,它仍然一如既往的派发消息,将sleep(3)的这条消息依然发给了local1。
这是因为RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有作出响应。它盲目的把第n-th条消息发给第n-th个消费者。
公平调度
为了解决上面的问题,我们可以使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,在同一时刻,不要发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)。
于是我们把消费者端的这段代码注释打开:
$channel->basic_qos(null, 1, null);
再测试下:
这就比较符合我们的预期了,大家可以自行测试下。
队列大小
如果所有的工作者都处于繁忙状态,那么队列就会被填满。此时需要留意这个问题,要么添加更多的工作者(workers),要么使用其他策略。
3、发布-订阅模式(Publish/SubScribe)
1)一个生产者,多个消费者
2)生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
工作过程:
RabbitMQ把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,无视binding key。一个消息可以被多个消费者消费。
注意:
如果消息发送到没有队列绑定的交换机时,消息将会消失,因为交换机没有存储消息的能力,只有队列才有存储消息的能力。
应用场景:
为了描述这种模式,我们将会构建一个简单的日志系统。它包括两个程序——第一个程序负责发送日志消息,第二个程序负责获取消息并输出内容。在我们的这个日志系统中,所有正在运行的接收方程序都会接收消息。我们用其中一个接收者(receiver)把日志写入硬盘中,另外一个接收者(receiver)把日志输出到屏幕上。最终,日志消息被广播给所有的接收者(receivers)。
下面是参考代码:
1)生产端
1 <?php 2 3 namespace consolecontrollers; 4 5 use PhpAmqpLibConnectionAMQPStreamConnection; 6 use PhpAmqpLibMessageAMQPMessage; 7 use yiiconsoleController; 8 9 class SendController extends Controller 10 { 11 public function actionIndex3($argv) 12 { 13 // RabbitMQ: 工作队列-发布/订阅(fanout) 14 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); 15 $channel = $connection->channel(); 16 // 声明交换机 17 $channel->exchange_declare('logs', 'fanout', false, false, false); 18 19 if (empty($argv)) { 20 $argv = "info: Hello World!"; 21 } 22 23 // 发送消息到我们命名为logs的交换机 24 $msg = new AMQPMessage($argv); 25 $channel->basic_publish($msg, 'logs'); 26 echo "[x] Sent ", $argv, " "; 27 28 // 关闭通道和连接 29 $channel->close(); 30 $connection->close(); 31 } 32 }
2)消费端
1 <?php 2 3 namespace consolecontrollers; 4 5 use PhpAmqpLibConnectionAMQPStreamConnection; 6 use yiiconsoleController; 7 8 class ReceiveController extends Controller 9 { 10 public function actionIndex3() 11 { 12 // RabbitMQ: 工作队列-发布/订阅(fanout) 13 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); 14 $channel = $connection->channel(); 15 16 // 声明交换机 17 $channel->exchange_declare('logs', 'fanout', false, false, false); 18 list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); 19 20 // 绑定队列到交换机 21 $channel->queue_bind($queue_name, 'logs'); 22 echo ' [*] Waiting for logs. To exit press CTRL+C', " "; 23 24 // 定义接收消息进行处理的回调函数 25 $callback = function ($msg) { 26 echo "[x] Received ", $msg->body, " "; 27 }; 28 29 $channel->basic_consume($queue_name, '', false, true, false, false, $callback); 30 31 while (count($channel->callbacks)) { 32 $channel->wait(); 33 } 34 35 // 关闭通道和连接 36 $channel->close(); 37 $connection->close(); 38 } 39 }
我们来模拟实现一个日志系统,分别开两个窗口来作为消费端监听,其中一个将监听消息写入日志,另外一个直接命令窗口输出,如下图:
再开一个窗口来发送消息:
4、路由模式(routing)
生产者将消息发送到direct交换机,它会把消息路由到那些binding key 与 routing key 完全匹配的queue中。在相应队列监听的消费者才能消费消息。这样就能实现消费者有选择的去消费消息。
说明: 如果routing key为black或者green,那么交换机会将消息路由到队列Q2中。
说明: 如果routing key为black,那么交换机会将消息路由到队列Q1和队列Q2中。
应用场景:
我们的日志系统广播所有的消息给所有的消费者(consumers)。我们打算扩展它,使其基于日志的严重程度进行消息过滤。例如我们也许只是希望将比较严重的错误(error)日志写入磁盘,以免在警告(warning)或者信息(info)日志上浪费磁盘空间。
下面参考代码:
1)生产端
1 <?php 2 3 namespace consolecontrollers; 4 5 use PhpAmqpLibConnectionAMQPStreamConnection; 6 use PhpAmqpLibMessageAMQPMessage; 7 use yiiconsoleController; 8 9 class SendController extends Controller 10 { 11 public function actionIndex4($argv) 12 { 13 // RabbitMQ: 路由模式 14 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); 15 $channel = $connection->channel(); 16 17 // 声明交换机 18 $channel->exchange_declare('direct_logs', 'direct', false, false, false); 19 if (strpos($argv, '/') !== false) { 20 $arr = explode('/', $argv); 21 // routing_key 22 $severity = $arr[0]; 23 // 消息内容 24 $data = $arr[1]; 25 } else { 26 echo '参数错误'; 27 return; 28 } 29 // 发送消息到我们命名为direct_logs的交换机 30 $msg = new AMQPMessage($data); 31 $channel->basic_publish($msg, 'direct_logs', $severity); 32 echo "[x] Sent ", $data, " "; 33 34 // 关闭通道和连接 35 $channel->close(); 36 $connection->close(); 37 } 38 }
2)消费端
1 <?php 2 3 namespace consolecontrollers; 4 5 use PhpAmqpLibConnectionAMQPStreamConnection; 6 use yiiconsoleController; 7 8 class ReceiveController extends Controller 9 { 10 public function actionIndex4($argv) 11 { 12 // RabbitMQ: 路由模式 13 14 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); 15 $channel = $connection->channel(); 16 17 // 声明交换机 18 $channel->exchange_declare('direct_logs', 'direct', false, false, false); 19 list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); 20 21 // 为我们感兴趣的每个严重级别分别创建一个新的绑定 22 if (strpos($argv, '/') !== false) { 23 $severities = explode('/', $argv); 24 foreach ($severities as $severity) { 25 // 第三个参数为routing_key 26 $channel->queue_bind($queue_name, 'direct_logs', $severity); 27 } 28 } else { 29 $severity = $argv; 30 $channel->queue_bind($queue_name, 'direct_logs', $severity); 31 } 32 33 echo ' [*] Waiting for logs. To exit press CTRL+C', " "; 34 35 // 接收消息进行处理的回调函数 36 $callback = function ($msg) { 37 echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, " "; 38 }; 39 $channel->basic_consume($queue_name, '', false, true, false, false, $callback); 40 while (count($channel->callbacks)) { 41 $channel->wait(); 42 } 43 44 // 关闭通道和连接 45 $channel->close(); 46 $connection->close(); 47 } 48 }
消费端监听消息:
生产端发送消息:
5、主题模式(Topic) - 路由模式的一种
主题交换机是很强大的,它可以表现出跟其他交换机类似的行为。
- 当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
- 当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
1)"#" 和 "*" 符号代表通配符
2)""#"代表零个或者多个单词, "*" 代表一个单词
3)路由功能添加模糊匹配
4)消息产生者产生消息,把消息交给交换机
5)单词之间用英文句点"."隔开
工作过程:
交换机根据binding key的规则,使用routing Key来模糊匹配到对应的队列,由队列的监听消费者接收消息消费。如果没有匹配到相应队列,则消息被丢弃。
1)生产端
1 <?php 2 3 namespace consolecontrollers; 4 5 use PhpAmqpLibConnectionAMQPStreamConnection; 6 use PhpAmqpLibMessageAMQPMessage; 7 use yiiconsoleController; 8 9 class SendController extends Controller 10 { 11 public function actionIndex5($argv) 12 { 13 // RabbitMQ: 主题模式 14 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); 15 $channel = $connection->channel(); 16 17 // 声明交换机 18 $channel->exchange_declare('topic_logs', 'topic', false, false, false); 19 if (strpos($argv, '/') !== false) { 20 $arr = explode('/', $argv); 21 // routing_key 22 $routing_key = $arr[0]; 23 // 消息内容 24 $data = $arr[1]; 25 } else { 26 echo '参数错误'; 27 return; 28 } 29 30 // 发送消息 31 $msg = new AMQPMessage($data); 32 $channel->basic_publish($msg, 'topic_logs', $routing_key); 33 echo "[x] Sent ", $routing_key, ':', $data, " "; 34 35 // 关闭通道和连接 36 $channel->close(); 37 $connection->close(); 38 } 39 }
2)消费端
1 <?php 2 3 namespace consolecontrollers; 4 5 use PhpAmqpLibConnectionAMQPStreamConnection; 6 use yiiconsoleController; 7 8 class ReceiveController extends Controller 9 { 10 public function actionIndex5($argv) 11 { 12 // RabbitMQ: 主题模式 13 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); 14 $channel = $connection->channel(); 15 16 // 声明交换机 17 $channel->exchange_declare('topic_logs', 'topic', false, false, false); 18 list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); 19 20 // 多个队列和这个名称为topic_logs的交换机绑定 21 if (strpos($argv, '/') !== false) { 22 $binding_keys = explode('/', $argv); 23 foreach ($binding_keys as $binding_key) { 24 // 第三个参数为routing_key 25 $channel->queue_bind($queue_name, 'topic_logs', $binding_key); 26 } 27 } else { 28 $binding_key = $argv; 29 $channel->queue_bind($queue_name, 'topic_logs', $binding_key); 30 } 31 32 echo ' [*] Waiting for logs. To exit press CTRL+C', " "; 33 34 // 接收消息进行处理的回调函数 35 $callback = function ($msg) { 36 echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, " "; 37 }; 38 $channel->basic_consume($queue_name, '', false, true, false, false, $callback); 39 while (count($channel->callbacks)) { 40 $channel->wait(); 41 } 42 43 // 关闭通道和连接 44 $channel->close(); 45 $connection->close(); 46 } 47 48 }
我们分别开4个窗口,监听消息
发送消息后
6、RPC模式- 远程过程调用
这里面有两个重要的概念:
1) replyTo: 存储回调队列的名称
2) correlationId: 唯一标识本次的请求,主要用于RPC调用。
工作过程:
使用 RabbitMQ 实现 RPC,相应的角色是由生产者来作为客户端,消费者作为服务端。
但 RPC 调用一般是同步的,客户端和服务器也是紧密耦合的。即客户端通过 IP/域名和端口链接到服务器,向服务器发送请求后等待服务器返回响应信息。
但 MQ 的生产者和消费者是完全解耦的,那么如何用 MQ 实现 RPC 呢?很明显就是把 MQ 当作中间件,实现一次双向的消息传递。
客户端和服务端既是生产者也是消费者。客户端发布请求,消费响应;服务端消费请求,发布响应。
纸上谈兵终觉浅,后面有空再来补充一些RabbitMQ在项目中使用的案例。
参考链接:
https://www.rabbitmq.com/getstarted.html