2020年10月19日15:57:24
个人一点学习和使用rabbitmq,先理解其中概念,不然使用起来十分混乱
php使用rabbitmq的相关博客还是相对较少的,java的偏多一些,我也是参考一些java博客才算是搞清楚
环境php7.3 laravel 8.0 一部分原因也是测试一下 laravel 8.0的改变
安装参考
composer require php-amqplib/php-amqplib
https://www.cnblogs.com/zx-admin/p/13825182.html
先贴代码
BaseRabbitmqService
<?php namespace AppService; use PhpAmqpLibConnectionAMQPStreamConnection; use PhpAmqpLibExchangeAMQPExchangeType; use PhpAmqpLibWireAMQPTable; class BaseRabbitmqService { //死信队列和交换机 public static $dlxQueue = 'dlx.queue'; public static $dlxExchange = 'dlx.exchange'; public static $dlxKey = 'dlxKey'; //死信之后的队列和交换机 public static $normalQueue = 'normal.queue'; public static $normalExchange = 'normal.exchange'; public static $normalKey = 'normalKey'; //消息发布者的routing_key public static $msgKey = 'msgkey'; private static function getConfig() { $isOnline = config('system.is_online'); if ($isOnline) { return config('system.online'); } else { return config('system.offline'); } } public static function getConnection() { $config = self::getConfig(); $connection = new AMQPStreamConnection($config['host'], $config['port'], $config['name'], $config['password']); self::init($connection); return $connection; } //初始化一些队列信息 private static function init(&$connection) { $channel = $connection->channel(); //定义交换机 $channel->exchange_declare(self::$dlxExchange, AMQPExchangeType::DIRECT, false, true); $channel->exchange_declare(self::$normalExchange, AMQPExchangeType::FANOUT, false, true); //定义队列,在正常队列超时之后就送去死信队列 $args = new AMQPTable(); // 消息过期方式:设置 queue.normal 队列中的消息5s之后过期,毫秒单位 $args->set('x-message-ttl', 5000); // 设置队列最大长度方式: x-max-length //$args->set('x-max-length', 1); $args->set('x-dead-letter-exchange', self::$dlxExchange); $args->set('x-dead-letter-routing-key', self::$msgKey); $channel->queue_declare(self::$normalQueue, false, true, false, false, false, $args); $channel->queue_declare(self::$dlxQueue, false, true, false, false); $channel->queue_bind(self::$normalQueue, self::$normalExchange); $channel->queue_bind(self::$dlxQueue, self::$dlxExchange, self::$msgKey); } }
生产者代码 ProducerService
<?php namespace AppService; use AppServiceBaseRabbitmqService; use AppModelsOrder; use PhpAmqpLibMessageAMQPMessage; class ProducerService extends BaseRabbitmqService { public static function doTask() { // echo 'ProducerService'; $connection = self::getConnection(); $channel = $connection->channel(); $data = []; //生成5条数数据 for ($i = 0; $i < 5; $i++) { $data['user_id'] = mt_rand(1, 100); $data['order_amount'] = mt_rand(10000, 99999); $data['order_number'] = mt_rand(100, 999); // $msg = new AMQPMessage(json_encode($data), // array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) # 使消息持久化 // ); $msg = new AMQPMessage(json_encode($data)); echo " [x] Send ", date('Y-m-d H:i:s') . '--' . json_encode($data), " "; $channel->basic_publish($msg, self::$normalExchange); } $channel->close(); $connection->close(); } }
消费者代码
<?php namespace AppService; use AppServiceBaseRabbitmqService; use IlluminateSupportFacadesRedis; use IlluminateSupportFacadesDB; use PhpAmqpLibConnectionAMQPStreamConnection; use PhpAmqpLibMessageAMQPMessage; class ConsumerService extends BaseRabbitmqService { public static function doTask() { // echo 'ConsumerService'; $connection = self::getConnection(); $channel = $connection->channel(); $callback = function($msg) { echo " [x] Received ", date('Y-m-d H:i:s') . '--' . $msg->body, " "; //主动确认信息处理完 // $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); //没有确认就手动丢给死信队列 sleep(10); $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']); }; //发送一个未处理完就不发送下一个 // $channel->basic_qos(null, 1, null); $channel->basic_consume(self::$normalQueue, 'ConsumerService', false, false, false, false, $callback); while (count($channel->callbacks)) { $channel->wait(); } } }
消费者代码t
<?php namespace AppService; use AppServiceBaseRabbitmqService; use IlluminateSupportFacadesRedis; use IlluminateSupportFacadesDB; use PhpAmqpLibConnectionAMQPStreamConnection; use PhpAmqpLibMessageAMQPMessage; class ConsumerServicet extends BaseRabbitmqService { public static function doTask() { // echo 'ConsumerServicet'; $connection = self::getConnection(); $channel = $connection->channel(); $callback = function($msg) { echo " [x] Received ", date('Y-m-d H:i:s') . '--' . $msg->body, " "; //主动确认信息处理完 // $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; //发送一个未处理完就不发送下一个 // $channel->basic_qos(null, 1, null); $channel->basic_consume(self::$dlxQueue, 'ConsumerServicet', false, false, false, false, $callback); while (count($channel->callbacks)) { $channel->wait(); } } }
注意点:
1,AMQPExchangeType::DIRECT和AMQPExchangeType::FANOUT 交换机类型的区别,也就是订阅分发布的关系
2,x-dead-letter-routing-key 死信key也就是死信订阅交换机需要关注的key,不然交换不过去,在绑定死信交换机和死信队列的时候绑定同一个key
3,注意如何手动确认消息到达,和手动拒绝消息,这个再处理业务逻辑的时候,就需要
//主动确认信息处理完 $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); //没有确认就手动丢给死信队列$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']);
4,对于死信队列的里面什么情况下才会丢给死信交换机,
1,消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false
2,消息过期
3,队列达到最大长度
4.当消息在一个队列中变成了死信消息后,可以被发送到另一个交换机,这个交换机就是DLX,绑定DLX的队列成为死信队列。当这个队列中存在死信时, RabbitMQ 就会立即自动地将这个消息重新发布到设置的DLX 上去,进而被路由到绑定该DLX的死信队列上。可以监听这个队列中的消息、以进行相应的处理,这个特性与将消息的TTL 设置为0 配合使用可以弥补imrnediate 参数的功能
这里需要注意的是,你在监听正常消费的设置死信的队列的时候,即使设置的时间到了也是不会丢给死信队列的,如果你不开启正常消费队列的监听,这个设置了死信的队列就成了延迟队列的效果,再次强调 理解概念
5,手动丢给死信队列
$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']);
为啥在此说这个问题,因为4,5你需要多次尝试之后才能理解,所以在你想实现 延时消费队列的时候就可以不去监听正常消费队列,直接去监听死信队列,就可以实现延时效果,
你也可以通过延迟插件来实现,但是在代码里就需要非常注意,不然就容易出现逻辑混乱的问题了
6,Consumer必须在cli模式下执行,但是Producer就不必要
7,逻辑梳理
发布消息->正常交换机->设置了死信属性的队列->超时,拒绝,无人监听->死信交换机—>死信队列
根据逻辑处理不同可以分为死信队列,也可以是延迟队列
参考资料
https://www.bbsmax.com/A/QV5Z36WZdy/
https://www.cnblogs.com/wudequn/p/11198427.html
https://xiaoxiami.gitbook.io/rabbitmq_into_chinese_php/ying-yong-jiao-cheng/php-ban/1-hello_world