zoukankan      html  css  js  c++  java
  • php调用rabbitmq实现订单消费队列,和延时消费队列

    2020年10月19日15:57:24

     个人一点学习和使用rabbitmq,先理解其中概念,不然使用起来十分混乱

    php使用rabbitmq的相关博客还是相对较少的,java的偏多一些,我也是参考一些java博客才算是搞清楚

    环境php7.3 laravel 8.0 一部分原因也是测试一下 laravel 8.0的改变

    安装参考

    composer require php-amqplib/php-amqplib

    https://www.cnblogs.com/zx-admin/p/13825182.html

    先贴代码

    BaseRabbitmqService

    <?php
    
    namespace AppService;
    
    use PhpAmqpLibConnectionAMQPStreamConnection;
    use PhpAmqpLibExchangeAMQPExchangeType;
    use PhpAmqpLibWireAMQPTable;
    
    class BaseRabbitmqService {
    
        //死信队列和交换机
        public static $dlxQueue = 'dlx.queue';
        public static $dlxExchange = 'dlx.exchange';
        public static $dlxKey = 'dlxKey';
        //死信之后的队列和交换机
        public static $normalQueue = 'normal.queue';
        public static $normalExchange = 'normal.exchange';
        public static $normalKey = 'normalKey';
        //消息发布者的routing_key
        public static $msgKey = 'msgkey';
    
        private static function getConfig() {
            $isOnline = config('system.is_online');
            if ($isOnline) {
                return config('system.online');
            } else {
                return config('system.offline');
            }
        }
    
        public static function getConnection() {
            $config = self::getConfig();
    
            $connection = new AMQPStreamConnection($config['host'], $config['port'], $config['name'], $config['password']);
            self::init($connection);
            return $connection;
        }
    
        //初始化一些队列信息
        private static function init(&$connection) {
            $channel = $connection->channel();
    
            //定义交换机
            $channel->exchange_declare(self::$dlxExchange, AMQPExchangeType::DIRECT, false, true);
            $channel->exchange_declare(self::$normalExchange, AMQPExchangeType::FANOUT, false, true);
    
            //定义队列,在正常队列超时之后就送去死信队列
            $args = new AMQPTable();
            // 消息过期方式:设置 queue.normal 队列中的消息5s之后过期,毫秒单位
            $args->set('x-message-ttl', 5000);
            // 设置队列最大长度方式: x-max-length
            //$args->set('x-max-length', 1);
            $args->set('x-dead-letter-exchange', self::$dlxExchange);
            $args->set('x-dead-letter-routing-key', self::$msgKey);
            $channel->queue_declare(self::$normalQueue, false, true, false, false, false, $args);
            $channel->queue_declare(self::$dlxQueue, false, true, false, false);
    
            $channel->queue_bind(self::$normalQueue, self::$normalExchange);
            $channel->queue_bind(self::$dlxQueue, self::$dlxExchange, self::$msgKey);
        }
    
    }
    View Code

    生产者代码 ProducerService

    <?php
    
    namespace AppService;
    
    use AppServiceBaseRabbitmqService;
    use AppModelsOrder;
    use PhpAmqpLibMessageAMQPMessage;
    
    class ProducerService extends BaseRabbitmqService {
    
        public static function doTask() {
    //        echo 'ProducerService';
            $connection = self::getConnection();
            $channel = $connection->channel();
    
            $data = [];
            //生成5条数数据
            for ($i = 0; $i < 5; $i++) {
    
                $data['user_id'] = mt_rand(1, 100);
                $data['order_amount'] = mt_rand(10000, 99999);
                $data['order_number'] = mt_rand(100, 999);
    
    //            $msg = new AMQPMessage(json_encode($data),
    //                    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) # 使消息持久化
    //            );
                $msg = new AMQPMessage(json_encode($data));
    
                echo " [x] Send  ", date('Y-m-d H:i:s') . '--' . json_encode($data), "
    ";
                $channel->basic_publish($msg, self::$normalExchange);
            }
    
            $channel->close();
            $connection->close();
        }
    
    }
    View Code

    消费者代码

    <?php
    
    namespace AppService;
    
    use AppServiceBaseRabbitmqService;
    use IlluminateSupportFacadesRedis;
    use IlluminateSupportFacadesDB;
    use PhpAmqpLibConnectionAMQPStreamConnection;
    use PhpAmqpLibMessageAMQPMessage;
    
    class ConsumerService extends BaseRabbitmqService {
    
        public static function doTask() {
    //        echo 'ConsumerService';
            $connection = self::getConnection();
            $channel = $connection->channel();
    
            $callback = function($msg) {
                echo " [x] Received ", date('Y-m-d H:i:s') . '--' . $msg->body, "
    ";
                //主动确认信息处理完
    //            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
                //没有确认就手动丢给死信队列
                sleep(10);
                $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']);
            };
            //发送一个未处理完就不发送下一个
    //        $channel->basic_qos(null, 1, null);
            $channel->basic_consume(self::$normalQueue, 'ConsumerService', false, false, false, false, $callback);
    
            while (count($channel->callbacks)) {
                $channel->wait();
            }
        }
    
    }
    View Code

    消费者代码t

    <?php
    
    namespace AppService;
    
    use AppServiceBaseRabbitmqService;
    use IlluminateSupportFacadesRedis;
    use IlluminateSupportFacadesDB;
    use PhpAmqpLibConnectionAMQPStreamConnection;
    use PhpAmqpLibMessageAMQPMessage;
    
    class ConsumerServicet extends BaseRabbitmqService {
    
        public static function doTask() {
    //        echo 'ConsumerServicet';
            $connection = self::getConnection();
            $channel = $connection->channel();
    
            $callback = function($msg) {
                echo " [x] Received ", date('Y-m-d H:i:s') . '--' . $msg->body, "
    ";
                //主动确认信息处理完
    //            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            };
            //发送一个未处理完就不发送下一个
    //        $channel->basic_qos(null, 1, null);
            $channel->basic_consume(self::$dlxQueue, 'ConsumerServicet', false, false, false, false, $callback);
    
            while (count($channel->callbacks)) {
                $channel->wait();
            }
        }
    
    }
    View Code

    注意点:

    1,AMQPExchangeType::DIRECT和AMQPExchangeType::FANOUT 交换机类型的区别,也就是订阅分发布的关系

    2,x-dead-letter-routing-key 死信key也就是死信订阅交换机需要关注的key,不然交换不过去,在绑定死信交换机和死信队列的时候绑定同一个key

    3,注意如何手动确认消息到达,和手动拒绝消息,这个再处理业务逻辑的时候,就需要

     //主动确认信息处理完
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    //没有确认就手动丢给死信队列$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']);

    4,对于死信队列的里面什么情况下才会丢给死信交换机,

    1,消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false
    2,消息过期
    3,队列达到最大长度
    4.当消息在一个队列中变成了死信消息后,可以被发送到另一个交换机,这个交换机就是DLX,绑定DLX的队列成为死信队列。当这个队列中存在死信时, RabbitMQ 就会立即自动地将这个消息重新发布到设置的DLX 上去,进而被路由到绑定该DLX的死信队列上。可以监听这个队列中的消息、以进行相应的处理,这个特性与将消息的TTL 设置为0 配合使用可以弥补imrnediate 参数的功能

    这里需要注意的是,你在监听正常消费的设置死信的队列的时候,即使设置的时间到了也是不会丢给死信队列的,如果你不开启正常消费队列的监听,这个设置了死信的队列就成了延迟队列的效果,再次强调 理解概念

    5,手动丢给死信队列

    $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']);

    为啥在此说这个问题,因为4,5你需要多次尝试之后才能理解,所以在你想实现 延时消费队列的时候就可以不去监听正常消费队列,直接去监听死信队列,就可以实现延时效果,

    你也可以通过延迟插件来实现,但是在代码里就需要非常注意,不然就容易出现逻辑混乱的问题了

    6,Consumer必须在cli模式下执行,但是Producer就不必要

    7,逻辑梳理

    发布消息->正常交换机->设置了死信属性的队列->超时,拒绝,无人监听->死信交换机—>死信队列

    根据逻辑处理不同可以分为死信队列,也可以是延迟队列

    参考资料

    https://www.bbsmax.com/A/QV5Z36WZdy/

    https://www.cnblogs.com/wudequn/p/11198427.html

    https://xiaoxiami.gitbook.io/rabbitmq_into_chinese_php/ying-yong-jiao-cheng/php-ban/1-hello_world

  • 相关阅读:
    The formatter threw an exception while trying to deserialize the message in WCF
    通过Web Deploy方式部署WCF
    The Managed Metadata Service or Connection is currently not available
    How to create Managed Metadata Column
    冒泡算法
    asp.net core 实战项目(一)——ef core的使用
    Vue学习笔记入门篇——安装及常用指令介绍
    Vue学习笔记入门篇——数据及DOM
    Vue学习笔记目录
    Chart.js在Laravel项目中的应用
  • 原文地址:https://www.cnblogs.com/zx-admin/p/13840669.html
Copyright © 2011-2022 走看看