zoukankan      html  css  js  c++  java
  • php-resque 轻量级队列

    一:简介

    github地址:https://github.com/chrisboulton/php-resque

    这个轻量级队列是由 Ruby 开发的 Resque 启发而来的。
    注意:
    1. php-resque需要用到pcntl,所以它全部是在linux下操作的。
    2. 它使用的队列是redis

    二:php-resque中后台角色划分

    实际就是对于这个队列中各个要执行的任务的一个抽象,抽象出几个部分
    1)任务 job
    一个job就是一个需要在后台完成的任务。比如发送邮件,发送短等等。
    在php-resque中就可以把一个job抽象为一个Class


    2)队列 queue
    也就是消息队列,我们把需要执行的任务Job加入到队列里。在php-resque中,队列是由redis实现的。php-resque提供了一个队列管理器,可以实现将job插入, 移除队列的功能


    3)执行者 worker
    就是从队列中取出任务来执行,可以以守护进程的方式运行

    创建上面3个角色的步骤:
    1:将一个后台任务编写为独立的class,这个class就是一个job
    2:在需要后台程序地方把job class的名称和所需要的参数加入队列
    3:开启worker,通过参数执行worker需要处理的队列
    4:worker作为守护进程运行,定时检查队列
    说明:在Resque中,还有一个很重要的设计:一个Worker,可以处理一个队列,也可以处理很多个队列,并且可以通过增加Worker的进程/线程数来加快队列的执行速度

    三:安装

    1)可以用composer安装,十分方便。
    composer require chrisboulton/php-resque

    2)另外一种
    先mkdir phpresque, 进入到目录,
    编写一个 composer.json
    {
    "require":{
    "chrisboulton/php-resque": "1.2.x"
    }
    }

    然后: composer update
    就可以安装好了

    四:使用的命令和方法

    可以参照 chrisboulton/php-resque/demo 下面的文件
    1) 启动守护队列
    QUEUE=default VVERBOSE=1 php demo/resque.php

    QUEUE部分是设置环境变量,我们指定当前的Worker只负责处理default队列。也可以使用QUEUE=* php demo/resque.php 来处理所有队列

    * QUEUE: 需要执行的队列的名字
    * INTERVAL:在队列中循环的间隔时间,即完成一个任务后的等待时间,默认是5秒
    * APP_INCLUDE:需要自动载入 PHP 文件路径,Worker 需要知道你的 Job 的位置并载入 Job
    * COUNT:需要创建的 Worker 的数量。所有的 Worker 都具有相同的属性。默认是创建1个Worker
    * REDIS_BACKEND:Redis 服务器的地址,使用 hostname:port 的格式,如 127.0.0.1:6379,或 localhost:6379。默认是 localhost:6379
    * REDIS_BACKEND_DB:使用的 Redis 数据库的名称,默认是 0
    * VERBOSE:啰嗦模式,设置 1 为启用,会输出基本的调试信息
    * VVERBOSE:设置“1”启用更啰嗦模式,会输出详细的调试信息
    * PREFIX:前缀。在 Redis 数据库中为队列的 KEY 添加前缀,以方便多个 Worker 运行在同一个Redis 数据库中方便区分。默认为空
    * PIDFILE:手动指定 PID 文件的位置,适用于单 Worker 运行方式
    * 以上参数中只有QUEUE是必须的。如果让 Worker 监视执行多个队列,可以用逗号隔开多个队列的名称,如:queue1,queue2,queue3,队列执行是有顺序的,如上 queue2 和 queue3 总是会在 queue1 后面被执行。
    * 也可以设置QUEUE为*让 Worker 以字母顺序执行所有的队列

    2) 结束进程
    kill -QUIT YOUR-WORKER-PID

    * QUIT - 等待子进程结束后再结束
    * TERM / INT - 立即结束子进程并退出
    * USR1 - 立即结束子进程,但不退出
    * USR2 - 暂停Worker,不会再执行新任务
    * CONT - 继续运行Worker

    3)示例一个比较完整命令

    nohup QUEUE=notification VVERBOSE=1 INTERVAL=10 COUNT=5 APP_INCLUDE=/usr/local/html/bin/loader.php REDIS_BACKENT=127.0.0.1:6379 php /usb/local/html/bin/daemo_queue.php >> /var/log/phpresque.log 2>&1 &


    4) 一些其他方法
    在源代码中的demo文件夹下有一个 check_status.php 的文件, 这个可以查看job运行的状态
    命令:php demo/check_status.php 4ee7cefcb7df03ff5475fd450e2a5bea
    输出:
    可以看到输出:
    Tracking status of 10de5352387ba7212bc0d4e8975a3523. Press [break] to stop.
    Status of 4ee7cefcb7df03ff5475fd450e2a5bea is: 1
    Status of 4ee7cefcb7df03ff5475fd450e2a5bea is: 1
    Status of 4ee7cefcb7df03ff5475fd450e2a5bea is: 1

    在Resque中,一个Job有以下4中状态:

    Resque_Job_Status::STATUS_WAITING = 1; (等待)
    Resque_Job_Status::STATUS_RUNNING = 2; (正在执行)
    Resque_Job_Status::STATUS_FAILED = 3; (失败)
    Resque_Job_Status::STATUS_COMPLETE = 4; (结束)

    * 移除Jobs

    #Removes multiple jobs
    Resque::dequeue('default');
    Resque::dequeue('default', ['My_Job']);
    Resque::dequeue('default', ['My_Job' => '087df5819a790ac666c9608e2234b21e']);
    Resque::dequeue('default', ['My_Job' => array('foo' => 1, 'bar' => 2)]);
    Resque::dequeue('default', ['My_Job', 'My_Job2']); 移除多个jobs

    * 消费者可以有三个方法 worker

    public function setUp() {} // .. Set up environment for this job
    public function perform() {} // .. Run job
    public function tearDown() {} // ... Remove environment for this job


    * 生产者

    Resque::setBackend('127.0.0.1:6379');
       $args = array(
       'time' => time(),
       'array' => array(
             'test' => 'test',
       ),
    );
    
    
    $jobId = Resque::enqueue('default', $argv[1], $args, true);

    $argv[1]为调用的类, $args 为参数
    第一个参数表示消息队列的名称(可随意标记,比如 email,log等),
    第二个参数表示取出任务后,由My_Job这个类来处理此条任务

    总结:
    1.开启守护队列 QUEUE=* php resque.php >> /var/log/phpresque.log &
    2.调用job,把job增加到队列里,命令: php queue.php class_Job 参数

    然后守护程序就会执行worker PHP_Job->perform()
    在 tail -f /var/log/phpresque.log 可以看到执行的结果

    五:动手写一个demo实例

    1)先安装php-resque
    mkdir phpresque
    进入到目录,
    编写一个 composer.json 的文件,内容如下

    {
        "require":{
            "chrisboulton/php-resque": "1.2.x"
        }
    }


    然后 compose update 安装

    2)文件目录,如下图

    2.1 新建job,queue,worker3个php文件
    a. 先建立一个 job的文件夹,存放要运行的job
    b. 然后建立一个Queue.php文件,把要运行的任务加入到队列中
    c. 最后是建立Resque.php文件,后台一直运行执行任务的文件,daemon方式运行

    2.2 编写程序
    job目录下的DemoJob.php

    <?php
    class DemoJob {
            public function perform()
            {
                    $id = $this->args['id'];
                    $msg = $this->args['msg'];
                    fwrite(STDOUT, 'DemoJob: id: '.$id.' msg: '.$msg);
            }
    }
    ?>

    Queue.php文件

    <?php
    require_once dirname(__FILE__).'/vendor/chrisboulton/php-resque/lib/Resque.php';
    
    class Queue {
            public static $queue_name;
            public static $job_name;
            public static $args;       //参数
    
            public static function addqueue($queue_name = 'default', $job_name='', $args)
            {
                    if (empty($job_name)) return false;
    
                    self::$queue_name = $queue_name;
                    self::$job_name = $job_name;
                    self::$args = $args;
    
                    date_default_timezone_set("Asia/Shanghai");
                    Resque::setBackend('127.0.0.1:6379');
    
                    $job_id = Resque::enqueue($queue_name, $job_name, $args, true);
                    return $job_id;
            }
    }
    
    $id = 'demo_id:1';
    $msg = 'this is test msg';
    //第一个参数:队列名称 第二个参数:job的类名称 第三个参数:传入的参数
    $job_id = Queue::addqueue('default', 'DemoJob', ['id'=>$id, 'msg'=>$msg]);
    echo $job_id;
    ?>

    Resque.php文件:

    <?php
    //Worker 常驻内存程序
    date_default_timezone_set("Asia/Shanghai");
    
    spl_autoload_register(function($class_name){
            require_once dirname(__FILE__).'/job/'.$class_name.'.php';
    });
    
    require_once dirname(__FILE__).'/vendor/chrisboulton/php-resque/lib/Resque.php';
    require_once dirname(__FILE__).'/vendor/chrisboulton/php-resque/lib/Resque/Worker.php';
    
    $QUEUE = getenv('QUEUE');
    if (empty($QUEUE)) {
            die('set QUEUE env');
    }
    
    Resque::setBackend('127.0.0.1:6379');
    require_once dirname(__FILE__).'/vendor/chrisboulton/php-resque/resque.php';
    echo "Done";
    ?>

    2.3 运行脚本
    a. 首先运行 Resque.php 脚本,便于查看效果,就不以daemon的方式运行了

    [root@centos phpresque]# QUEUE=default /usr/local/php/bin/php Resque.php 
    *** Starting worker centos:3996:default
    DemoJob: id: demo_id:1 msg: this is test msg

    b. 把任务加入到队列里

    [root@centos phpresque]# /usr/local/php/bin/php Queue.php 
    8d9c402d3c621d764242dd8f2335f28e # job id

    a上面马上就会出现
    DemoJob: id: demo_id:1 msg: this is test msg
    的信息
    说明job运行成功了

    参考:
    https://blog.csdn.net/maquealone/article/details/75333349
    https://blog.csdn.net/u012129607/article/details/78560482

  • 相关阅读:
    start with connect by prior 递归查询用法(二)
    start with connect by prior 递归查询用法(一)
    oracle之spool详细使用总结
    关于ETL面试相关
    ETL常用的三种工具介绍及对比Datastage,Informatica和Kettle
    Oracle执行计划详解
    随机函数的使用(DBMS_RANDOM)
    oracle中的替换函数replace和translate函数
    ces
    文章11
  • 原文地址:https://www.cnblogs.com/jiujuan/p/9048554.html
Copyright © 2011-2022 走看看