zoukankan      html  css  js  c++  java
  • tp5.1 + think-queue + supervisor

    项目用的是 TP5.1 框架,消息队列用的think-queue消息队列,结合 supervisor 进程管理使队列进程常驻。在这里记录一下顺便分享给大家,下面逻辑是加入队列、消费队列和写入数据库。

    一、tp5.1的安装方法
      用 composer 安装最新稳定版本

    composer create-project topthink/think 5.1.*

    二、添加think-queue扩展
    1、composer安装

    composer require topthink/think-queue

    2、配置

    配置文件位于项目根目录下的 config/queue.php,添加如下内容:

        return [
            'connector'  => 'Redis',            // 可选驱动类型:sync(默认)、Redis、database、topthink等其他自定义类型
            'expire'     => 60,                    // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
            'default'    => 'default',            // 默认的队列名称
            'host'       => '127.0.0.1',        // redis 主机ip
            'port'       => 6379,                // redis 端口
            'password'   => '',                 // redis 连接密码
            'select'     => 0,                    // 使用哪一个 db,默认为 db0
            'timeout'    => 0,                    // redis连接的超时时间
            'persistent' => false,                // 是否是长连接
        ]; 

    我配置了redis驱动,大家可以根据自己的情况配置参数。

    3、创建数据库表

      创建一张表,用于展示消费队列写入数据库的操作。

    CREATE TABLE `test` (
      `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
      `data` text,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=13 DEFAULT CHARSET=utf8;

      创建消息队列任务

      以index模块为例,创建一个 /app/index/job/Task.php 文件(job目录也是手动创建,一定要注意的是:TP5.1里面链接数据库一定要用model,否则将会不定期的出现MYSQL链接超时,将会导致supervisor的子进程卡死,自动重启也无效;原因是没能释放连接,也没有找到Db手动释放的方法),代码如下:

    namespace appindexjob;
        
    use thinkqueueJob;
        
    class Task{
            
        public function fire(Job $job,$data){
            //任务失败(重复)3次以上,删除该任务
            if($job->attempts() > 3){
                //TODO
                ....
                //删除
                $job->delete();
            }else{
                 //写入数据库
                $res = db('test')->insert(['data'=>json_encode($data)]);
                if(!$res){
                    $delay = 1;//$delay为延迟时间
                    //重新发布该任务
                    $job->release($delay);
                }else{
                    //成功之后
                    $job->delete();
                }
            }
        }
    }

    5、入队列(生产任务)

      1). 有push()later()两种方法,前者是立即执行,后者是延迟$delay秒后执行。

    $job = 'app\index\job\Task';//调用的任务名
    $data = [];//传入的数据
    $queue = 'group1';//队列名,可以理解为组名
    
    //push()方法是立即执行
    Queue::push($job, $data, $queue);
    
    //later()方法是延迟 $delay 秒之后再执行
    $delay = 10;//延迟时间
    Queue::later($delay, $job, $data, $queue);

      2). 调用later()方法,将该任务分配到group1队列里,延迟10秒执行

    use thinkQueue;
    
    class Index{
        public function index(){
            Queue::later(10,'app\index\job\Task',['arg1'=>1,'arg2'=>2],'group1');
        }
    }

      3). 调用之后,我们通过redis的远程管理工具会发现指定db库的队列里有对应的数据,说明完成了入队列

    1) {"job":"app\index\job\Task","data":{"name":"group1","data":[]},"id":"90BWJfkfgIISzK67cWg99mDxB428GV3A","attempts":1}

    6、出队列(消费任务)

      在项目根目录执行命令

    php think queue:work --queue group1

    三、supervisor的安装和配置

    1、yum安装supervisor

    # yum install epel-release
    # yum install supervisor
    
    //设置成开机自动启动
    # systemctl enable supervisord

    2、配置

    1. 在这里我创建了一个命名为supervisor的目录用于存放supervisor和队列的日志文件以及include的配置文件,其目录结构为:
    /var/supervisor/log/    #可以自定义
                   /run/    #可以自定义
                   /conf/   #可以自定义

      2.然后找到/etc/supervisord.conf配置文件,编辑如下信息:

    ; 将supervisor.sock 的路径换成如下
    [unix_http_server]
    file=/var/supervisor/run/supervisor.sock   ; (the path to the socket file)
    
    ; 将supervisord.log 和 supervisord.pid 的路径换成如下
    [supervisord]
    logfile=/var/supervisor/log/supervisord.log  ; (main log file;default $CWD/supervisord.log)
    pidfile=/var/supervisor/run/supervisord.pid ; (supervisord pidfile;default supervisord.pid)
    
    ; 将supervisor.sock 的路径换成如下
    [supervisorctl]
    serverurl=unix:///var/supervisor/run/supervisor.sock ; use a unix:// URL  for a unix socket
    
    ; 将最底部的files路径换成如下
    [include]
    files = /var/supervisor/conf/*.conf

      3.在/var/supervisor/conf目录里创建一个.conf文件,这里命名为queue_work.conf,内容如下:

    [program:queue_worker] ;项目名称
    directory = /opt/www/tp5.1 ; 程序的启动目录,项目根目录的上一级
    command = php think queue:work --queue queueName --daemon ; 启动命令 queueName就是队列名
    process_name=%(program_name)s_%(process_num)02d
    numprocs = 3         ; 开启的进程数量
    autostart = true     ; 在 supervisord 启动的时候也自动启动
    startsecs = 5        ; 启动 5 秒后没有异常退出,就当作已经正常启动了
    autorestart = true   ; 程序异常退出后自动重启
    startretries = 3     ; 启动失败自动重试次数,默认是 3
    user = root          ; 用哪个用户启动
    redirect_stderr = true  ; 把 stderr 重定向到 stdout,默认 false
    stdout_logfile_maxbytes = 50MB  ; stdout 日志文件大小,默认 50MB
    stdout_logfile_backups = 20     ; stdout 日志文件备份数
    ; stdout 日志文件,需要手动创建目录(supervisord 会自动创建日志文件)
    stdout_logfile = /var/supervisor/log/queue_worker.log
    loglevel=info

      对于index这个单模块而言,不同的业务逻辑为了区分可能会存在多个队列名,这种情况将多个队列名用逗号拼接起来:

    command = php think queue:work --queue queueName1,queueName2 --daemon ;

    4、重启

    # systemctl stop supervisord
    # systemctl start supervisord

    或者

    # systemctl restart supervisord

    调用方法,成功写入数据库。

    IT成长中的那些事儿
  • 相关阅读:
    ReactNative typescript路径别名配置,vscode不识别@/youpath配置方法
    代码片段:js数组对象排序
    ScreenToGif SharpDx 下载失败问题
    springboot的@CrossOrigin注解解决细粒度的配置跨域
    java 连接Kafka报错java.nio.channels.ClosedChannelExcep
    zookeeper日常报错总结
    Spring Aop、拦截器、过滤器的区别
    搭建zookeeper单机版以及简单命令的使用
    java后台代码发送邮件
    java后台调用http请求
  • 原文地址:https://www.cnblogs.com/life_lt/p/14849334.html
Copyright © 2011-2022 走看看