zoukankan      html  css  js  c++  java
  • thinkphp5.1+think-queue

    最近有一个需求,A用户充值积分到钱包,但是钱包只能在一分钟之后做出响应,那么就需要异步执行查看钱包是否到账的操作,本来打算用swoole异步,突然想到think-queue,那不妨就用对列来玩玩

    本文参考 CSDN 鼠你有钱 tp5.1 + think-queue + supervisor博文 点此穿越

    第一步 安装 think-queue

    composer require topthink/think-queue

    think-queue包地址 需要注意框架版本问题,现版本默认tp6框架

    第二步 配置

    安装好之后 默认会在 config文件夹下生成 queue.php配置文件

    <?php
    // +----------------------------------------------------------------------
    // | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
    // +----------------------------------------------------------------------
    // | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved.
    // +----------------------------------------------------------------------
    // | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
    // +----------------------------------------------------------------------
    // | Author: yunwuxin <448901948@qq.com>
    // +----------------------------------------------------------------------
    use thinkEnv;
    
    return [
        'connector' => 'redis',
        "expire"=>60,//任务过期时间默认为秒,禁用为null
        "default"=>"default",//默认队列名称
        "host"=>"127.0.0.1",//Redis主机IP地址
        "port"=>6379,//Redis端口
        "password"=>"******",//Redis密码
        "select"=>5,//Redis数据库索引
        "timeout"=>0,//Redis连接超时时间
        "persistent"=>false,//是否长连接
    ];

    这里我使用的是 redis驱动 也可以根据官方文档选择数据库驱动

    第三步 编写代码

    在application/index/controller下创建Jobtest.php

    
    

    <?php

    
    

    namespace appindexcontroller;

    
    

    use thinkQueue;

    class Jobtest{
        public function actionWithHelloJob(){
            $params = request()->param();
            // 1.当前任务将由哪个类来负责处理。
            //   当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法
            $jobHandlerClassName  = 'appindexjobHello';
    
            // 2.当前任务归属的队列名称,如果为新队列,会自动创建
            $jobQueueName        = "send";
    
            // 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串
            $data = $this->add($params);
    
            // 4.将该任务推送到消息队列,等待对应的消费者去执行
    
            // $isPushed = Queue::push( $jobHandlerClassName , $data , $jobQueueName );
    
            $isPushed = Queue::later(60,$jobHandlerClassName,$data,$jobQueueName); //把任务分配到队列中,延迟60s后执行
    
            // database 驱动时,返回值为 1|false  ;   redis 驱动时,返回值为 随机字符串|false
            if( $isPushed !== false ){
                echo '成功';
            }else{
                echo '错误';
            }
        }
        
        public function add($params){
            $data =[
                'order_no'=>$params['orderNo'],
                'msg'=>$params['orderNo'],
                'create_time'=>date('Y-m-d H:i:s'),
            ];
            Db::name('test')->insert($data);
        }
    }

    这里我创建了 test表来检测,你也可以根据自己的需求来创建数据库

    CREATE TABLE `test` (
      `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
      `order_no` varchar(255),
      `msg` varchar(255),
      `create` varchar(255),  
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=13 DEFAULT CHARSET=utf8;

    下来我们需要在 application/index 下在创建一个job目录 在job目录下创建Hello.php文件

    <?php
    namespace appindexjob;
    use thinkqueueJob;
    use thinkDb;
    
    class Hello {
        public function fire(Job $job,$data) {
            // 有些消息在到达消费者时,可能已经不再需要执行了
            $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
            if(!$isJobStillNeedToBeDone){
                $job->delete();
                return;
            }
    
            $isJobDone = $this->doHelloJob($data);
    
            if ($isJobDone) {
                // 如果任务执行成功, 记得删除任务
                $job->delete();
            }else{
                $job->release(3); //$delay为延迟时间
            }
            if ($job->attempts() > 3) {
                //通过这个方法可以检查这个任务已经重试了几次了
                print("<warn>Hello Job has been retried more than 3 times!"."</warn>
    ");
    
                $job->delete();
    
                // 也可以重新发布这个任务
                //print("<info>Hello Job will be availabe again after 2s."."</info>
    ");
                //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行
            }
        }
        
        public function failed($data)
        {
            // ...任务达到最大重试次数后,失败了
        }
    
        /**
        * 有些消息在到达消费者时,可能已经不再需要执行了
        * @param array|mixed    $data     发布任务时自定义的数据
        * @return boolean                 任务执行的结果
        */
        private function checkDatabaseToSeeIfJobNeedToBeDone($data){
            return true;
        }
    
        /**
        * 根据消息中的数据进行实际的业务处理...
        */
        private function doHelloJob($data)
        {
            //根据你的业务需求写逻辑即可 成功返回true失败返回false即可
        }
    }

    到这里代码基本就完成了,我们使用浏览器来访问我们的 Jobtest下actionWithHelloJob方法

    然后在终端执行

    php think queue:work --queue send 

    这里的 send就是你的对列名称

    执行后我们可以在redis里看到具体的数据对列 如果你没有安装redis那就需要在安装think-queue之前安装redis扩展

    第四步 使用supervisor 将queue进程常驻

    以下内容均来自 CSDN 鼠你有钱 tp5.1 + think-queue + supervisor博文 点此穿越  如有侵权 可联系本人第一时间删除
    1.安装supervisor

    # yum install epel-release
    # yum install supervisor
    
    //设置成开机自动启动
    # systemctl enable supervisord

    2.配置

    在这里我创建了一个命名为supervisor的目录用于存放supervisor和队列的日志文件以及include的配置文件,其目录结构为:

    /var/supervisor/log/    #可以自定义
                   /run/    #可以自定义
                   /conf/   #可以自定义

    然后找到/etc/supervisord.conf配置文件,编辑如下信息:

    ; 将supervisor.sock 的路径换成如下
    [unix_http_server]
    file=/var/supervisor/run/supervisor.sock   ; (the path to the socket file)
    
    ; 将supervisord.log 和 supervisord.pid 的路径换成如下
    [supervisord]
    logfile=/var/supervisor/log/supervisord.log  ; (main log file;default $CWD/supervisord.log)
    pidfile=/var/supervisor/run/supervisord.pid ; (supervisord pidfile;default supervisord.pid)
    
    ; 将supervisor.sock 的路径换成如下
    [supervisorctl]
    serverurl=unix:///var/supervisor/run/supervisor.sock ; use a unix:// URL  for a unix socket
    
    ; 将最底部的files路径换成如下
    [include]
    files = /var/supervisor/conf/*.conf

    /var/supervisor/conf目录里创建一个.conf文件,这里命名为queue_work.conf,内容如下:

    [program:queue_worker] ;项目名称
    directory = /opt/www/tp5.1 ; 程序的启动目录,项目根目录的上一级
    command = php think queue:work --queue queueName --daemon ; 启动命令 queueName就是队列名
    process_name=%(program_name)s_%(process_num)02d
    numprocs = 3         ; 开启的进程数量
    autostart = true     ; 在 supervisord 启动的时候也自动启动
    startsecs = 5        ; 启动 5 秒后没有异常退出,就当作已经正常启动了
    autorestart = true   ; 程序异常退出后自动重启
    startretries = 3     ; 启动失败自动重试次数,默认是 3
    user = root          ; 用哪个用户启动
    redirect_stderr = true  ; 把 stderr 重定向到 stdout,默认 false
    stdout_logfile_maxbytes = 50MB  ; stdout 日志文件大小,默认 50MB
    stdout_logfile_backups = 20     ; stdout 日志文件备份数
    ; stdout 日志文件,需要手动创建目录(supervisord 会自动创建日志文件)
    stdout_logfile = /var/supervisor/log/queue_worker.log
    loglevel=info

    对于index这个单模块而言,不同的业务逻辑为了区分可能会存在多个队列名,这种情况将多个队列名用逗号拼接起来:

    command = php think queue:work --queue queueName1,queueName2 --daemon ;

    重启

    # systemctl stop supervisord
    # systemctl start supervisord

    # systemctl restart supervisord

     

  • 相关阅读:
    mysql报错:java.sql.SQLException: The server time zone value 'Öйú±ê׼ʱ¼ä' is unrecognized or represents more than one time zone.
    MD5登陆密码的生成
    15. 3Sum、16. 3Sum Closest和18. 4Sum
    11. Container With Most Water
    8. String to Integer (atoi)
    6. ZigZag Conversion
    5. Longest Palindromic Substring
    几种非线性激活函数介绍
    AI初探1
    AI初探
  • 原文地址:https://www.cnblogs.com/we-jack/p/14023333.html
Copyright © 2011-2022 走看看