zoukankan      html  css  js  c++  java
  • php-resque 队列简单使用

    一、安装 php-resque

    进入项目根目录,composer 安装 php-resque

    composer require chrisboulton/php-resque

    二、常用方法

    1、连接 redis

    // setBackend($server, $database = 0)
    Resque::setBackend('127.0.0.1:6379');

    2、向队列中添加工作

    // enqueue($queue, $class, $args = null, $trackStatus = false)
    $token = Resque::enqueue('default', 'My_Job', ['name'=>'test'], true);

    3、查看工作状态

    $status = (new Resque_Job_Status($token))->get();

    4、停止(移除)工作

    (new Resque_Job_Status($token))->stop();

    三、常驻任务处理队列(示例:worker.php)

    // 处理 default 队列;也可以填 *,代表所有队列
    $worker = new Resque_Worker('default');
    // LOG_NONE 不写日志, LOG_NORMAL 普通,LOG_VERBOSE 详细
    $worker->logLevel = Resque_Worker::LOG_VERBOSE;
    // 队列处理时间间隔,单位:秒
    $worker->work(5);

    注:worker.php 要以命令行的方法执行,并长驻后台,/usr/local/php/bin/php /xxx/xxx/worker.php

    四、处理工作的类

    class My_Job
    {
        /**
         * 前置操作
         * @return void
         */
        public function setUp()
        {
            // ... Set up environment for this job
        }
    
        /**
         * 消费队列
         * @return void
         */
        public function perform()
        {
            // execute a job
        }
    
        /**
         * 后置操作
         * @return void
         */
        public function tearDown()
        {
            // ... Remove environment for this job
        }
    }

    五、完整例子

    test.php

    <?php
    require 'vendor/autoload.php';
    
    /**
     * php-resque 队列服务
     */
    class Queue
    {
        private $jobStatusObject = [];
    
        function __construct($server, $database = 0)
        {
            // 连接 redis
            Resque::setBackend($server);
        }
    
        private function getJobStatusObject($token)
        {
            if (!isset($this->jobStatusObject[$token])) {
                $this->jobStatusObject[$token] = new Resque_Job_Status($token);
            }
            
            return $this->jobStatusObject[$token];
        }
    
        /**
         * 入队并返回 token
         * @param  array  $args 参数
         * @return string
         */
        public function enqueue($args = [])
        {
            // 队列名称、指定消费类、参数
            return Resque::enqueue('default', 'My_Job', $args, true);
        }
    
        /**
         * 查询 job 的状态
         * @param  string $token 任务token
         * @return array         状态信息
         */
        public function getJobStatus($token)
        {
            $status = $this->getJobStatusObject($token)->get();
            $statusOptions = [
                1 => 'STATUS_WAITING',
                2 => 'STATUS_RUNNING',
                3 => 'STATUS_FAILED',
                4 => 'STATUS_COMPLETE'
            ];
            return isset($statusOptions[$status])? $statusOptions[$status]:$status;
        }
    
        /**
         * 清除 job
         * @param  string $token 任务token
         * @return boolean         状态信息
         */
        public function clearJob($token)
        {
            if ($this->getJobStatusObject($token) == 'STATUS_COMPLETE')
            {
                return $this->getJobStatusObject($token)->stop();
            }
            else
            {
                error_log("非已完成任务,不可清除。token: {$token}
    ", 3, 'logs.txt');
                return false;
            }
        }
    }
    
    empty($_GET) && die('url参数或数据不能为空!!');
    $token = isset($_GET['token'])? trim($_GET['token']):'';
    $act   = isset($_GET['act'])? trim($_GET['act']):'';
    
    $queue = new Queue('127.0.0.1:6379');
    if (empty($act))
    {
        // 任务入队
        $token = $queue->enqueue($_GET);
        error_log("token: {$token}
    ", 3, 'logs.txt');
        echo 'token: ', $token, '<hr>';
        exit;
    }
    elseif ($act == 'status' && !empty($token))
    {
        // 查看任务状态
        var_dump($queue->getJobStatus($token));
        exit;
    }
    elseif ($act == 'del' && !empty($token))
    {
        // 移除任务
        var_dump($queue->clearJob($token));
        exit;
    }
    ?>

    测试url:

    http://localhost/redis/queue/test.php?name=test
    // token: 37efa5889472ef04108a55ba8cc448e4
    
    http://localhost/redis/queue/test.php?act=status&token=37efa5889472ef04108a55ba8cc448e4
    // string(14) "STATUS_WAITING"
    
    http://localhost/redis/queue/test.php?act=del&token=37efa5889472ef04108a55ba8cc448e4
    // bool(false)

    worker.php

    <?php
    // 消费(处理)队列
    
    require 'vendor/autoload.php';
    
    /**
     * 任务
     */
    class My_Job
    {
        /**
         * 前置操作
         * @return void
         */
        public function setUp()
        {
            // ... Set up environment for this job
        }
    
        /**
         * 消费队列
         * @return void
         */
        public function perform()
        {
            // 按自己的业务处理 job
            error_log(var_export($this->args, true)."
    
    ", 3, 'logs.txt');
            
            // 抛出错误则表示工作处理失败,否则工作状态将更新为完成(STATUS_COMPLETE)
            // 注:return false 不管用,一定要 throw Exception 才行
            // throw new Exception('Unable to run this job!');
        }
    
        /**
         * 后置操作
         * @return void
         */
        public function tearDown()
        {
            // ... Remove environment for this job
        }
    }
    
    // 处理 default 队列;也可以填 *,代表所有队列
    $worker = new Resque_Worker('default');
    // LOG_NONE 不写日志, LOG_NORMAL 普通,LOG_VERBOSE 详细
    $worker->logLevel = Resque_Worker::LOG_VERBOSE;
    // 队列处理时间间隔,单位:秒
    $worker->work(5);
    ?>
  • 相关阅读:
    【转】 cin、cin.get()、cin.getline()、getline()、gets()等函数的用法
    HDU How many prime numbers
    《大学ACM的总结 》(转载)
    POJ 最小公倍数
    HDU 开门人和关门人
    HDU shǎ崽 OrOrOrOrz
    HDU Saving HDU 2111
    HDU 1106 排序
    strtok函数()
    HDU 2187汶川地震
  • 原文地址:https://www.cnblogs.com/tujia/p/10932199.html
Copyright © 2011-2022 走看看