zoukankan      html  css  js  c++  java
  • thinkphp5.1+think-queue

    最近有一个需求,A用户充值积分到钱包,但是钱包只能在一分钟之后做出响应,那么就需要异步执行查看钱包是否到账的操作,本来打算用swoole异步,突然想到think-queue,那不妨就用对列来玩玩

    本文参考 CSDN 鼠你有钱 tp5.1 + think-queue + supervisor博文 点此穿越

    第一步 安装 think-queue

    composer require topthink/think-queue

    think-queue包地址 需要注意框架版本问题,现版本默认tp6框架

    第二步 配置

    安装好之后 默认会在 config文件夹下生成 queue.php配置文件

    <?php
    // +----------------------------------------------------------------------
    // | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
    // +----------------------------------------------------------------------
    // | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved.
    // +----------------------------------------------------------------------
    // | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
    // +----------------------------------------------------------------------
    // | Author: yunwuxin <448901948@qq.com>
    // +----------------------------------------------------------------------
    use thinkEnv;
    
    return [
        'connector' => 'redis',
        "expire"=>60,//任务过期时间默认为秒,禁用为null
        "default"=>"default",//默认队列名称
        "host"=>"127.0.0.1",//Redis主机IP地址
        "port"=>6379,//Redis端口
        "password"=>"******",//Redis密码
        "select"=>5,//Redis数据库索引
        "timeout"=>0,//Redis连接超时时间
        "persistent"=>false,//是否长连接
    ];

    这里我使用的是 redis驱动 也可以根据官方文档选择数据库驱动

    第三步 编写代码

    在application/index/controller下创建Jobtest.php

    
    

    <?php

    
    

    namespace appindexcontroller;

    
    

    use thinkQueue;

    class Jobtest{
        public function actionWithHelloJob(){
            $params = request()->param();
            // 1.当前任务将由哪个类来负责处理。
            //   当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法
            $jobHandlerClassName  = 'appindexjobHello';
    
            // 2.当前任务归属的队列名称,如果为新队列,会自动创建
            $jobQueueName        = "send";
    
            // 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串
            $data = $this->add($params);
    
            // 4.将该任务推送到消息队列,等待对应的消费者去执行
    
            // $isPushed = Queue::push( $jobHandlerClassName , $data , $jobQueueName );
    
            $isPushed = Queue::later(60,$jobHandlerClassName,$data,$jobQueueName); //把任务分配到队列中,延迟60s后执行
    
            // database 驱动时,返回值为 1|false  ;   redis 驱动时,返回值为 随机字符串|false
            if( $isPushed !== false ){
                echo '成功';
            }else{
                echo '错误';
            }
        }
        
        public function add($params){
            $data =[
                'order_no'=>$params['orderNo'],
                'msg'=>$params['orderNo'],
                'create_time'=>date('Y-m-d H:i:s'),
            ];
            Db::name('test')->insert($data);
        }
    }

    这里我创建了 test表来检测,你也可以根据自己的需求来创建数据库

    CREATE TABLE `test` (
      `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
      `order_no` varchar(255),
      `msg` varchar(255),
      `create` varchar(255),  
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=13 DEFAULT CHARSET=utf8;

    下来我们需要在 application/index 下在创建一个job目录 在job目录下创建Hello.php文件

    <?php
    namespace appindexjob;
    use thinkqueueJob;
    use thinkDb;
    
    class Hello {
        public function fire(Job $job,$data) {
            // 有些消息在到达消费者时,可能已经不再需要执行了
            $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
            if(!$isJobStillNeedToBeDone){
                $job->delete();
                return;
            }
    
            $isJobDone = $this->doHelloJob($data);
    
            if ($isJobDone) {
                // 如果任务执行成功, 记得删除任务
                $job->delete();
            }else{
                $job->release(3); //$delay为延迟时间
            }
            if ($job->attempts() > 3) {
                //通过这个方法可以检查这个任务已经重试了几次了
                print("<warn>Hello Job has been retried more than 3 times!"."</warn>
    ");
    
                $job->delete();
    
                // 也可以重新发布这个任务
                //print("<info>Hello Job will be availabe again after 2s."."</info>
    ");
                //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行
            }
        }
        
        public function failed($data)
        {
            // ...任务达到最大重试次数后,失败了
        }
    
        /**
        * 有些消息在到达消费者时,可能已经不再需要执行了
        * @param array|mixed    $data     发布任务时自定义的数据
        * @return boolean                 任务执行的结果
        */
        private function checkDatabaseToSeeIfJobNeedToBeDone($data){
            return true;
        }
    
        /**
        * 根据消息中的数据进行实际的业务处理...
        */
        private function doHelloJob($data)
        {
            //根据你的业务需求写逻辑即可 成功返回true失败返回false即可
        }
    }

    到这里代码基本就完成了,我们使用浏览器来访问我们的 Jobtest下actionWithHelloJob方法

    然后在终端执行

    php think queue:work --queue send 

    这里的 send就是你的对列名称

    执行后我们可以在redis里看到具体的数据对列 如果你没有安装redis那就需要在安装think-queue之前安装redis扩展

    第四步 使用supervisor 将queue进程常驻

    以下内容均来自 CSDN 鼠你有钱 tp5.1 + think-queue + supervisor博文 点此穿越  如有侵权 可联系本人第一时间删除
    1.安装supervisor

    # yum install epel-release
    # yum install supervisor
    
    //设置成开机自动启动
    # systemctl enable supervisord

    2.配置

    在这里我创建了一个命名为supervisor的目录用于存放supervisor和队列的日志文件以及include的配置文件,其目录结构为:

    /var/supervisor/log/    #可以自定义
                   /run/    #可以自定义
                   /conf/   #可以自定义

    然后找到/etc/supervisord.conf配置文件,编辑如下信息:

    ; 将supervisor.sock 的路径换成如下
    [unix_http_server]
    file=/var/supervisor/run/supervisor.sock   ; (the path to the socket file)
    
    ; 将supervisord.log 和 supervisord.pid 的路径换成如下
    [supervisord]
    logfile=/var/supervisor/log/supervisord.log  ; (main log file;default $CWD/supervisord.log)
    pidfile=/var/supervisor/run/supervisord.pid ; (supervisord pidfile;default supervisord.pid)
    
    ; 将supervisor.sock 的路径换成如下
    [supervisorctl]
    serverurl=unix:///var/supervisor/run/supervisor.sock ; use a unix:// URL  for a unix socket
    
    ; 将最底部的files路径换成如下
    [include]
    files = /var/supervisor/conf/*.conf

    /var/supervisor/conf目录里创建一个.conf文件,这里命名为queue_work.conf,内容如下:

    [program:queue_worker] ;项目名称
    directory = /opt/www/tp5.1 ; 程序的启动目录,项目根目录的上一级
    command = php think queue:work --queue queueName --daemon ; 启动命令 queueName就是队列名
    process_name=%(program_name)s_%(process_num)02d
    numprocs = 3         ; 开启的进程数量
    autostart = true     ; 在 supervisord 启动的时候也自动启动
    startsecs = 5        ; 启动 5 秒后没有异常退出,就当作已经正常启动了
    autorestart = true   ; 程序异常退出后自动重启
    startretries = 3     ; 启动失败自动重试次数,默认是 3
    user = root          ; 用哪个用户启动
    redirect_stderr = true  ; 把 stderr 重定向到 stdout,默认 false
    stdout_logfile_maxbytes = 50MB  ; stdout 日志文件大小,默认 50MB
    stdout_logfile_backups = 20     ; stdout 日志文件备份数
    ; stdout 日志文件,需要手动创建目录(supervisord 会自动创建日志文件)
    stdout_logfile = /var/supervisor/log/queue_worker.log
    loglevel=info

    对于index这个单模块而言,不同的业务逻辑为了区分可能会存在多个队列名,这种情况将多个队列名用逗号拼接起来:

    command = php think queue:work --queue queueName1,queueName2 --daemon ;

    重启

    # systemctl stop supervisord
    # systemctl start supervisord

    # systemctl restart supervisord

     

  • 相关阅读:
    Paths on a Grid
    Three Kingdoms(优先队列+bfs)
    Factstone Benchmark(数学)
    C. Searching for Graph(cf)
    B. Trees in a Row(cf)
    String Successor(模拟)
    乘积最大的分解(数学)
    Kindergarten Election
    In 7-bit
    Friends
  • 原文地址:https://www.cnblogs.com/we-jack/p/14023333.html
Copyright © 2011-2022 走看看