zoukankan      html  css  js  c++  java
  • thinkphp 6 消息队列

    1.安装think-queue

    composer require topthink/think-queue

    2.配置消息队列,将config/queue.php将’default’ => ‘sync’改为’default’ => ‘redis’,使用Redis驱动

    如选择database,需创建表

    CREATE TABLE `prefix_jobs` ( `id` int(11) NOT NULL AUTO_INCREMENT, `queue` varchar(255) NOT NULL, `payload` longtext NOT NULL, `attempts` tinyint(3) unsigned NOT NULL, `reserve_time` int(10) unsigned DEFAULT NULL, `available_time` int(10) unsigned NOT NULL, `create_time` int(10) unsigned NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    3.创建生产者

    class Index extends BaseController
    {
        
        /**
         * 单任务
         */
        public function singleTask()
        {
            //当前任务将由哪个类来负责处理
            $jobHandlerClassName = 'appjobJob1';
            //业务数据 对象需要手动转序列化
            $jobData = ['ts' => time()];
            //队列名称
            $jobQueueName = "createOrderJob";
            //入队列,later延时发送,单位秒。push立即发送
            $isPushed = Queue::later(2, $jobHandlerClassName, $jobData,$jobQueueName);
            //$isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );
            // database 驱动时,返回值为 1|false  ;   redis 驱动时,返回值为 随机字符串|false
            if( $isPushed !== false ){
                echo '执行成功';
            }else{
                echo '执行失败';
            }
            //php think queue:listen --queue createOrderJob  执行队列
            //nohup php think queue:listen --queue createOrderJob &  不以守护进程执行
        }
     
        /**
         * 多任务
         */
        public function multiTask(){
            $taskType = $_GET['taskType'];
            switch ($taskType) {
                case 'taskA':
                    $jobHandlerClassName  = 'appjobMultiTask@taskA';
                    $jobDataArr = ['a'   => '1'];
                    $jobQueueName = "multiTaskJobQueue";
                    break;
                case 'taskB':
                    $jobHandlerClassName  = 'appjobMultiTask@taskB';
                    $jobDataArr = ['b'   => '2'];
                    $jobQueueName = "multiTaskJobQueue";
                    break;
                default:
                    break;
            }
     
            $isPushed = Queue::push($jobHandlerClassName, $jobDataArr, $jobQueueName);
            if ($isPushed !== false) {
                echo("the $taskType of MultiTask Job has been Pushed to ".$jobQueueName ."<br>");
            }else{
                echo "push a new $taskType of MultiTask Job Failed!";
            }
        }
     
    }

    4.创建消费者

    单任务

    class Job1
    {
        public function fire(Job $job, $data)
        {
            //业务处理代码,具体不贴出来了
            $isJobDone = $this->jobDone($data);
            //执行成功删除
            if($isJobDone){
                $job->delete();
                print("任务已经被执行成功并且删除");
            }else{
                $job->release(3); //$delay为延迟时间 表示该任务延迟3秒后再执行
                print("任务3s后再次被执行");
            }
            //通过这个方法可以检查任务重试了几次
            if ($job->attempts() > 3) {
                print("Job has been retried more than 3 times!");
                $job->delete();
            }
        }
     
        public function failed($data)
        {
            // ...任务达到最大重试次数后,失败了
        }
     
        private function jobDone($data){
            Log::write('这是数据 ' . json_encode($data));
            return true;
        }

    多任务

    class MultiTask{
        public function taskA(Job $job,$data){
     
            $isJobDone = $this->_doTaskA($data);
     
            if ($isJobDone) {
                $job->delete();
                print("Info: TaskA of Job MultiTask has been done and deleted"."
    ");
            }else{
                $job->release(3);
                print("任务3s后再次被执行");
            }
            if ($job->attempts() > 3) {
                print("Job has been retried more than 3 times!");
                $job->delete();
            }
        }
     
        public function taskB(Job $job,$data){
     
            $isJobDone = $this->_doTaskB($data);
     
            if ($isJobDone) {
                $job->delete();
                print("Info: TaskB of Job MultiTask has been done and deleted"."
    ");
            }else{
                $job->release(3);
                print("任务3s后再次被执行");
            }
            if ($job->attempts() > 3) {
                print("Job has been retried more than 3 times!");
                $job->delete();
            }
        }
     
        private function _doTaskA($data) {
            print("Info: doing TaskA of Job MultiTask "."
    ");
            return true;
        }
     
        private function _doTaskB($data) {
            print("Info: doing TaskB of Job MultiTask "."
    ");
            return true;
        }
    }

    5.执行

    php think queue:listen --queue createOrderJob   //(队列名)

     --daemon  //可后台运行,具体看手册

    //$job->delete();   删除任务
    //$job->attempts();  查看任务执行次数
    // 注意:执行完任务后必须删除任务

    在database 模式下,2.7.1 和 2.7.2 中的重发逻辑是先删除原来的任务,然后插入一个新的任务。2.7.3 中的重发时机是直接更新原任务。

    而在redis 模式下,3种重发都是先删除再插入。

    不管是哪种重发方式,重发之后,任务的已尝试次数会在原来的基础上 +1 。

    此外,消费者类中需要注意,如果 fire() 方法中可能抛出异常,那么如果不需要自动重发的话, 请在抛出异常之前将任务删除 $job->delete() ,以免产生bug。 如果需要自动重发的话,请直接抛出异常,不要在 fire() 方法中又手动使用 $job->release() , 这样会导致该任务被重发两次,产生两个一样的新任务。

     可配合supervisor使用,保证进程常驻

    转: https://www.freesion.com/article/81081354933/

    https://www.sxxblog.com/index/detail/archive/18.html

    https://blog.csdn.net/chengzheng5879/article/details/100913455

  • 相关阅读:
    python virtualenv
    ICMP
    正则表达式
    tcpdump命令
    vim命令
    IP网际协议
    链路层
    python模块学习 logging
    Angular2+如何去除url中的#
    angular5懒加载之模块划分
  • 原文地址:https://www.cnblogs.com/fps2tao/p/15145470.html
Copyright © 2011-2022 走看看