队列适用与多个用户同时执行一个操作,锁适用与单个用户多次执行同一个操作
缺点:
一旦需处理数据加入到任务内就不能删除,如果删除可以使用redis
队列配置:
Tp5.1 config/queue.php 配置文件 ,启动reids 服务
/** * 消息队列配置 * 内置驱动:redis、database、topthink、sync */ return [ //sync驱动表示取消消息队列还原为同步执行 //'connector' => 'Sync', //Redis驱动 'connector' => 'redis', "expire"=>60,//任务过期时间默认为秒,禁用为null "default"=>"default",//默认队列名称 "host"=>Env::get("redis.host", "127.0.0.1"),//Redis主机IP地址 "port"=>Env::get("redis.port", 6379),//Redis端口 "password"=>Env::get("redis.password", "123456"),//Redis密码 "select"=>5,//Redis数据库索引 "timeout"=>0,//Redis连接超时时间 "persistent"=>false,//是否长连接 //Database驱动 //"connector"=>"Database",//数据库驱动 //"expire"=>60,//任务过期时间,单位为秒,禁用为null //"default"=>"default",//默认队列名称 //"table"=>"jobs",//存储消息的表明,不带前缀 //"dsn"=>[], //Topthink驱动 ThinkPHP内部的队列通知服务平台 //"connector"=>"Topthink", //"token"=>"", //"project_id"=>"", //"protocol"=>"https", //"host"=>"qns.topthink.com", //"port"=>443, //"api_version"=>1, //"max_retries"=>3, //"default"=>"default" ]; 代码示例: /*1.先在控制器里添加下面两个方法*/ namespace appindexcontroller; use thinkController; use thinkQueue; use thinkDb; class Index extends Controller { public function queueTest(){ $data = [ 'order_no' =>rand(100000,999999), ]; $this->add($data['order_no']); // 一次执行一个任务 // 类的命名空间类名 // 类的命名空间类名@方法 $res =Queue::push('appqueuecontrollerExecjob', $data, $queue = null); // 创建发布任务 var_dump($res); // database 驱动时,返回值为 1|false ; redis 驱动时,返回值为 随机字符串|false } public function add($orderNo){ $data =[ 'order_no'=>$orderNo, 'msg'=>$orderNo, 'create_time'=>date('Y-m-d H:i:s'), ]; Db::name('tp5_test')->insert($data); } } /*2.在 app/queue/controller/Execjob 创建文件,要执行的任务内容*/ namespace appqueuecontroller; use thinkController; use thinkfacadeCache; use thinkqueueJob; class Execjob extends Controller { public function fire(Job $job, $data) { //....这里执行具体的任务 if($this->jobDone($data)) { $job->delete(); print("<info>Hello Job has been done and deleted"."</info> "); }else{ $job->release(3); //$delay为延迟时间 } if ($job->attempts() > 3) { //通过这个方法可以检查这个任务已经重试了几次了 } //如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法 // $job->delete(); // 也可以重新发布这个任务 // $job->release($delay); //$delay为延迟时间 } public function failed($data) { // ...任务达到最大重试次数后,失败了 } public function jobDone($data) { print("<info>Job is Done status!"."</info> "); return Db::name('tp5_test')->where('order_no',$data['order_no'])->update(['status'=>2]); } }
监听队列
写完之后再控制器切换到当前目录下执行,任务的消费与删除
// --queue 队列名称,不指定监听所有队列 php think queue:listen --queue send_mail
测试访问页面
http://tp.com/pub/test/index,任务的推送
<?php namespace apppubcontroller; use appqueuecontrollerJobqueue; use thinkController; class Test extends Controller { public function index(){ $job_queue=new Jobqueue(); $job_queue->index('send_mail',['1938450598@qq.com','sa','测试']); } }
队列执行完成后,处于等待下次调用
队列流程:创建 ->监听-> 推送 -> 消费 -> 删除
相关文章:tp5.1_队列queue学习