zoukankan      html  css  js  c++  java
  • php调用rabbitmq实现订单消费队列,和延时消费队列

    2020年10月19日15:57:24

     个人一点学习和使用rabbitmq,先理解其中概念,不然使用起来十分混乱

    php使用rabbitmq的相关博客还是相对较少的,java的偏多一些,我也是参考一些java博客才算是搞清楚

    环境php7.3 laravel 8.0 一部分原因也是测试一下 laravel 8.0的改变

    安装参考

    composer require php-amqplib/php-amqplib

    https://www.cnblogs.com/zx-admin/p/13825182.html

    先贴代码

    BaseRabbitmqService

    <?php
    
    namespace AppService;
    
    use PhpAmqpLibConnectionAMQPStreamConnection;
    use PhpAmqpLibExchangeAMQPExchangeType;
    use PhpAmqpLibWireAMQPTable;
    
    class BaseRabbitmqService {
    
        //死信队列和交换机
        public static $dlxQueue = 'dlx.queue';
        public static $dlxExchange = 'dlx.exchange';
        public static $dlxKey = 'dlxKey';
        //死信之后的队列和交换机
        public static $normalQueue = 'normal.queue';
        public static $normalExchange = 'normal.exchange';
        public static $normalKey = 'normalKey';
        //消息发布者的routing_key
        public static $msgKey = 'msgkey';
    
        private static function getConfig() {
            $isOnline = config('system.is_online');
            if ($isOnline) {
                return config('system.online');
            } else {
                return config('system.offline');
            }
        }
    
        public static function getConnection() {
            $config = self::getConfig();
    
            $connection = new AMQPStreamConnection($config['host'], $config['port'], $config['name'], $config['password']);
            self::init($connection);
            return $connection;
        }
    
        //初始化一些队列信息
        private static function init(&$connection) {
            $channel = $connection->channel();
    
            //定义交换机
            $channel->exchange_declare(self::$dlxExchange, AMQPExchangeType::DIRECT, false, true);
            $channel->exchange_declare(self::$normalExchange, AMQPExchangeType::FANOUT, false, true);
    
            //定义队列,在正常队列超时之后就送去死信队列
            $args = new AMQPTable();
            // 消息过期方式:设置 queue.normal 队列中的消息5s之后过期,毫秒单位
            $args->set('x-message-ttl', 5000);
            // 设置队列最大长度方式: x-max-length
            //$args->set('x-max-length', 1);
            $args->set('x-dead-letter-exchange', self::$dlxExchange);
            $args->set('x-dead-letter-routing-key', self::$msgKey);
            $channel->queue_declare(self::$normalQueue, false, true, false, false, false, $args);
            $channel->queue_declare(self::$dlxQueue, false, true, false, false);
    
            $channel->queue_bind(self::$normalQueue, self::$normalExchange);
            $channel->queue_bind(self::$dlxQueue, self::$dlxExchange, self::$msgKey);
        }
    
    }
    View Code

    生产者代码 ProducerService

    <?php
    
    namespace AppService;
    
    use AppServiceBaseRabbitmqService;
    use AppModelsOrder;
    use PhpAmqpLibMessageAMQPMessage;
    
    class ProducerService extends BaseRabbitmqService {
    
        public static function doTask() {
    //        echo 'ProducerService';
            $connection = self::getConnection();
            $channel = $connection->channel();
    
            $data = [];
            //生成5条数数据
            for ($i = 0; $i < 5; $i++) {
    
                $data['user_id'] = mt_rand(1, 100);
                $data['order_amount'] = mt_rand(10000, 99999);
                $data['order_number'] = mt_rand(100, 999);
    
    //            $msg = new AMQPMessage(json_encode($data),
    //                    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) # 使消息持久化
    //            );
                $msg = new AMQPMessage(json_encode($data));
    
                echo " [x] Send  ", date('Y-m-d H:i:s') . '--' . json_encode($data), "
    ";
                $channel->basic_publish($msg, self::$normalExchange);
            }
    
            $channel->close();
            $connection->close();
        }
    
    }
    View Code

    消费者代码

    <?php
    
    namespace AppService;
    
    use AppServiceBaseRabbitmqService;
    use IlluminateSupportFacadesRedis;
    use IlluminateSupportFacadesDB;
    use PhpAmqpLibConnectionAMQPStreamConnection;
    use PhpAmqpLibMessageAMQPMessage;
    
    class ConsumerService extends BaseRabbitmqService {
    
        public static function doTask() {
    //        echo 'ConsumerService';
            $connection = self::getConnection();
            $channel = $connection->channel();
    
            $callback = function($msg) {
                echo " [x] Received ", date('Y-m-d H:i:s') . '--' . $msg->body, "
    ";
                //主动确认信息处理完
    //            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
                //没有确认就手动丢给死信队列
                sleep(10);
                $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']);
            };
            //发送一个未处理完就不发送下一个
    //        $channel->basic_qos(null, 1, null);
            $channel->basic_consume(self::$normalQueue, 'ConsumerService', false, false, false, false, $callback);
    
            while (count($channel->callbacks)) {
                $channel->wait();
            }
        }
    
    }
    View Code

    消费者代码t

    <?php
    
    namespace AppService;
    
    use AppServiceBaseRabbitmqService;
    use IlluminateSupportFacadesRedis;
    use IlluminateSupportFacadesDB;
    use PhpAmqpLibConnectionAMQPStreamConnection;
    use PhpAmqpLibMessageAMQPMessage;
    
    class ConsumerServicet extends BaseRabbitmqService {
    
        public static function doTask() {
    //        echo 'ConsumerServicet';
            $connection = self::getConnection();
            $channel = $connection->channel();
    
            $callback = function($msg) {
                echo " [x] Received ", date('Y-m-d H:i:s') . '--' . $msg->body, "
    ";
                //主动确认信息处理完
    //            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            };
            //发送一个未处理完就不发送下一个
    //        $channel->basic_qos(null, 1, null);
            $channel->basic_consume(self::$dlxQueue, 'ConsumerServicet', false, false, false, false, $callback);
    
            while (count($channel->callbacks)) {
                $channel->wait();
            }
        }
    
    }
    View Code

    注意点:

    1,AMQPExchangeType::DIRECT和AMQPExchangeType::FANOUT 交换机类型的区别,也就是订阅分发布的关系

    2,x-dead-letter-routing-key 死信key也就是死信订阅交换机需要关注的key,不然交换不过去,在绑定死信交换机和死信队列的时候绑定同一个key

    3,注意如何手动确认消息到达,和手动拒绝消息,这个再处理业务逻辑的时候,就需要

     //主动确认信息处理完
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    //没有确认就手动丢给死信队列$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']);

    4,对于死信队列的里面什么情况下才会丢给死信交换机,

    1,消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false
    2,消息过期
    3,队列达到最大长度
    4.当消息在一个队列中变成了死信消息后,可以被发送到另一个交换机,这个交换机就是DLX,绑定DLX的队列成为死信队列。当这个队列中存在死信时, RabbitMQ 就会立即自动地将这个消息重新发布到设置的DLX 上去,进而被路由到绑定该DLX的死信队列上。可以监听这个队列中的消息、以进行相应的处理,这个特性与将消息的TTL 设置为0 配合使用可以弥补imrnediate 参数的功能

    这里需要注意的是,你在监听正常消费的设置死信的队列的时候,即使设置的时间到了也是不会丢给死信队列的,如果你不开启正常消费队列的监听,这个设置了死信的队列就成了延迟队列的效果,再次强调 理解概念

    5,手动丢给死信队列

    $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']);

    为啥在此说这个问题,因为4,5你需要多次尝试之后才能理解,所以在你想实现 延时消费队列的时候就可以不去监听正常消费队列,直接去监听死信队列,就可以实现延时效果,

    你也可以通过延迟插件来实现,但是在代码里就需要非常注意,不然就容易出现逻辑混乱的问题了

    6,Consumer必须在cli模式下执行,但是Producer就不必要

    7,逻辑梳理

    发布消息->正常交换机->设置了死信属性的队列->超时,拒绝,无人监听->死信交换机—>死信队列

    根据逻辑处理不同可以分为死信队列,也可以是延迟队列

    参考资料

    https://www.bbsmax.com/A/QV5Z36WZdy/

    https://www.cnblogs.com/wudequn/p/11198427.html

    https://xiaoxiami.gitbook.io/rabbitmq_into_chinese_php/ying-yong-jiao-cheng/php-ban/1-hello_world

  • 相关阅读:
    关于Oracle数据库字符集
    NK3C:关于svg文件使用
    NK3C:异常处理(前端)
    关于返回值问题
    NK3C开发要点
    velocity模板使用建议
    样本随机抽样、局号抽样逻辑
    样本回收逻辑
    NKUI框架使用
    解决chrome,下载在文件夹中显示,调用错误的关联程序
  • 原文地址:https://www.cnblogs.com/zx-admin/p/13840669.html
Copyright © 2011-2022 走看看