easyswoole中队列的使用
队列的使用其实在easyswoole官方文档中已经有相关介绍,但是它只给了一个Redis驱动示例,以此来说明队列的使用流程。实际开发中,这个还不能拿来直接用。下面记录一下在实际项目中是如何使用队列。
1、Queue介绍
Easyswoole封装实现了一个轻量级的队列,默认以Redis作为队列驱动器。可以自己实现一个队列驱动来实现用kafka或者启动方式的队列存储。
从上可知,Queue并不是一个单独使用的组件,它更像一个对不同驱动的队列进行统一封装的门面组件。
2、安装
composer require easyswoole/queue
3、使用
队列的使用流程分三个步骤:注册队列驱动器、设置消费进程、生产者投递任务
1)注册队列驱动器
在EasySwooleEvent.php中的mainServeCreate()中注册:
1 public static function mainServerCreate(EventRegister $register) 2 { 3 $confInstance = EasySwooleEasySwooleConfig::getInstance(); 4 5 // 注册热重启 6 $swooleServer = ServerManager::getInstance()->getSwooleServer(); 7 $swooleServer->addProcess((new HotReload('HotReload', ['disableInotify' => false]))->getProcess()); 8 9 static::registerOrmPool($confInstance, 'MYSQL', 80, 160, 'default'); // 注册v1.0的数据库pool 10 static::registerOrmPool($confInstance, 'MYSQL_READ', 80, 160, 'read'); // 注册v1.0的read数据库pool 11 12 static::registerRedisPool($confInstance, 'REDIS', 10, 80, 'redis'); // 注册redis链接池 13 14 //注册队列驱动器 15 $driver = static::registerQueue($confInstance, 'REDIS', 'queue-depart'); 16 DepartmentQueue::getInstance($driver); 17 18 //设置消费进程 19 ServerManager::getInstance()->addProcess(new SyncDepartment()); 20 }
2)设置消费进程
比如这里的:
ServerManager::getInstance()->addProcess(new SyncDepartment());
SyncDepartment类必须继承AbstractProcess类,在重新实现的run()方法中自定义消费的相关业务处理
SyncDepartment.php代码如下:
1 class SyncDepartment extends AbstractProcess 2 { 3 public function run($arg) 4 { 5 // TODO: Implement run() method. 6 go(function () { 7 AppQueueDepartmentQueue::getInstance()->consumer()->listen(function (EasySwooleQueueJob $job) { 8 set_time_limit(0); 9 $data = $job->getJobData(); 10 $access_token = $data['access_token']; 11 $type = $data['type']; 12 DepartmentLogic::getAllDepartmentsFromZw($access_token,$type); 13 }); 14 }); 15 } 16 17 }
3)生产者投递任务
场景举例一:
场景举例二:
关键代码贴上:
1 $job = new Job(); 2 $data = ['access_token' => $access_token,'type'=>1]; 3 $job->setJobData($data); 4 DepartmentQueue::getInstance()->producer()->push($job);
参考链接:https://www.easyswoole.com/Cn/Components/Queue/install.html