假如要发100封邮件,for循环100遍,用户直接揭竿而起,什么破网站!
但实际上,我们很可能有超过1万的邮件。怎么处理这个延迟的问题?
答案就是用异步。把“发邮件”这个操作封装,然后后台异步地执行1万遍。这样的话,用户提交网页后,他所等待的时间只是“把发邮件任务请求推送进队列里”的时间。而我们的后台服务将在用户看不见的地方跑。
在实现“异步队列”这点上,有人采用MySQL表或者redis来存放待发送的邮件,然后,每分钟定时读取待发送列表,然后处理。这便是定时异步任务队列。但当前提交的任务要一分钟后才能执行,在某些实时性要求应用场景里还是不快。有些场景要求,只有一提交任务,便马上执行,但用户不需要等待返回结果。
本文将探讨用php扩展swoole实现实时异步任务队列的方案。
服务端
在打算放置脚本的目录(你也可以自行新建)新建Server.php,代码如下
1 <?php 2 3 class Server 4 { 5 private $serv; 6 7 public function __construct() 8 { 9 $this->serv = new swoole_server("0.0.0.0", 9501); 10 $this->serv->set(array( 11 'worker_num' => 1, //一般设置为服务器CPU数的1-4倍 12 'daemonize' => 1, //以守护进程执行 13 'max_request' => 10000, 14 'dispatch_mode' => 2, 15 'task_worker_num' => 8, //task进程的数量 16 "task_ipc_mode " => 3, //使用消息队列通信,并设置为争抢模式 17 //"log_file" => "log/taskqueueu.log" ,//日志 18 )); 19 $this->serv->on('Receive', array($this, 'onReceive')); 20 // bind callback 21 $this->serv->on('Task', array($this, 'onTask')); 22 $this->serv->on('Finish', array($this, 'onFinish')); 23 $this->serv->start(); 24 } 25 26 public function onReceive(swoole_server $serv, $fd, $from_id, $data) 27 { 28 //echo "Get Message From Client {$fd}:{$data} "; 29 // send a task to task worker. 30 $serv->task($data); 31 } 32 33 public function onTask($serv, $task_id, $from_id, $data) 34 { 35 $array = json_decode($data, true); 36 if ($array['url']) { 37 return $this->httpGet($array['url'], $array['param']); 38 } 39 } 40 41 public function onFinish($serv, $task_id, $data) 42 { 43 //echo "Task {$task_id} finish "; 44 //echo "Result: {$data} "; 45 } 46 47 protected function httpGet($url, $data) 48 { 49 if ($data) { 50 $url .= '?' . http_build_query($data); 51 } 52 $curlObj = curl_init(); //初始化curl, 53 curl_setopt($curlObj, CURLOPT_URL, $url); //设置网址 54 curl_setopt($curlObj, CURLOPT_RETURNTRANSFER, 1); //将curl_exec的结果返回 55 curl_setopt($curlObj, CURLOPT_SSL_VERIFYPEER, FALSE); 56 curl_setopt($curlObj, CURLOPT_SSL_VERIFYHOST, FALSE); 57 curl_setopt($curlObj, CURLOPT_HEADER, 0); //是否输出返回头信息 58 $response = curl_exec($curlObj); //执行 59 curl_close($curlObj); //关闭会话 60 return $response; 61 } 62 63 } 64 65 $server = new Server();
客户端
启动服务后,让我们看看如何调用服务。新建测试文件Client_test.php
1 <?php 2 3 class Client 4 { 5 private $client; 6 7 public function __construct() 8 { 9 $this->client = new swoole_client(SWOOLE_SOCK_TCP); 10 } 11 12 public function connect() 13 { 14 if (!$this->client->connect("127.0.0.1", 9501, 1)) { 15 throw new Exception(sprintf('Swoole Error: %s', $this->client->errCode)); 16 } 17 } 18 19 public function send($data) 20 { 21 if ($this->client->isConnected()) { 22 if (!is_string($data)) { 23 $data = json_encode($data); 24 } 25 26 return $this->client->send($data); 27 } else { 28 throw new Exception('Swoole Server does not connected.'); 29 } 30 } 31 32 public function close() 33 { 34 $this->client->close(); 35 } 36 } 37 38 $data = array( 39 "url" => "http://192.168.10.19/send_mail", 40 "param" => array( 41 "username" => 'test', 42 "password" => 'test' 43 ) 44 ); 45 $client = new Client(); 46 $client->connect(); 47 if ($client->send($data)) { 48 echo 'success'; 49 } else { 50 echo 'fail'; 51 } 52 $client->close();
在上面代码中,url即为任务所在地址,param为所需传递参数。
保存好代码,在命令行或者浏览器中执行Client_test.php,便实现了异步任务队列。你所填写的URL,将会在每次异步任务被提交后,以HTTP GET的方式异步执行。