协程(Coroutine)(二)
1、并发查询
关于协程的使用,经常会用到它来做并发查询,可以用协程+waitgroup+channel的方式来做,也可以用csp并发来做。
1)协程+waitgroup+channel
1 go(function (){ 2 $chan=new Channel(12);//设置通道的容量是12 3 $wait=new WaitGroup(); 4 for ($i=1;$i<=12;$i++){ 5 $wait->add(); 6 go(function ()use($wait,$chan,$i){ 7 co::sleep(rand(1,3)); 8 $chan->push("第{$i}个月的数据!"); 9 $wait->done(); 10 }); 11 } 12 $wait->wait();//挂起当前协程,等待所有任务完成后执行下面代码 13 while(true){ 14 if ($chan->isEmpty()){ 15 break; 16 } 17 $res=$chan->pop(); 18 Logger::getInstance()->info($res.PHP_EOL); 19 } 20 Logger::getInstance()->info("------------------".PHP_EOL); 21 });
运行结果:
2)csp并发
1 public function testCsp1() 2 { 3 $time=microtime(true); 4 $chan=new Channel(); 5 go(function ()use($chan){ 6 $csp = new EasySwooleComponentCsp(); 7 $csp->add('t1',function (){ 8 co::sleep(4); 9 return 't1 result'; 10 }); 11 $csp->add('t2',function (){ 12 co::sleep(3); 13 return 't2 result'; 14 }); 15 $res=$csp->exec(); 16 $chan->push($res); 17 }); 18 $time1=microtime(true)-$time; 19 Logger::getInstance()->info("go代码执行后用时:".$time1."s"); 20 $res=$chan->pop(); 21 $time2=microtime(true)-$time; 22 Logger::getInstance()->info("获取到结果用时:".$time2."s"); 23 Logger::getInstance()->info('res:'.json_encode($res)); 24 }
运行结果:
2、csp使用go和不使用go的区别
上面2)中的csp并发是是在协程容器下执行的。作为比较,下面的代码展示在非协程容器下来执行
1 public function testCsp2() 2 { 3 $time1=microtime(true); 4 $csp = new EasySwooleComponentCsp(); 5 $csp->add('t1',function (){ 6 co::sleep(4); 7 return 't1 result'; 8 }); 9 $csp->add('t2',function (){ 10 co::sleep(3); 11 return 't2 result'; 12 }); 13 $res=$csp->exec(); 14 $time2=microtime(true); 15 $time=$time2-$time1; 16 Logger::getInstance()->info("用时:".$time."s"); 17 Logger::getInstance()->info("res:".json_encode($res)); 18 }
运行结果:
总结
1)可以看出在go()里面执行的代码,是不会阻塞下面的代码运行的,最后$csp->exec()获取到结果才返回。没有在go()里面写的代码,就是正常从上往下执行,执行到$csp->exec()这段代码时,因为是相对比较耗时的任务,要获取到结果才会接着往下执行。
2)协程之间的通信需要使用channel方式,协程中访问外部变量必须使用use关键字,或者传形参方式,不能引用变量。外部要访问协程中的变量需要通过channel的方式。
3)注意:exec方法提供了一个默认参数:超时时间(默认为5s),当调用$csp->exec()后,最长等待5s左右会返回结果。如果你在t2函数中co::sleep(6),那么5s后,返回的数据中不会包含t2函数的返回数据。
3、csp类的源码不多,我们来看看
个人的理解都放到注释里面了,有问题的地方欢迎大家指正。
1 <?php 2 3 namespace EasySwooleComponent; 4 use SwooleCoroutineChannel; 5 use SwooleCoroutine; 6 class Csp 7 { 8 private $chan; //通道channel的实例 9 private $count = 0; //任务总数 10 private $success = 0;//任务成功执行的总数 11 private $task = []; //任务数组 12 13 function __construct(int $size = 8) 14 { 15 $this->chan = new Channel($size); 16 } 17 18 function add($itemName,callable $call):Csp 19 { 20 $this->count = 0; 21 $this->success = 0; 22 //将所有的请求的别名与回调函数以数组的形式存放在私有变量$task中 23 $this->task[$itemName] = $call; 24 return $this; 25 } 26 27 function successNum():int 28 { 29 return $this->success; 30 } 31 //最重要的执行方法 32 function exec(?float $timeout = 5) 33 { 34 if($timeout <= 0){ 35 $timeout = PHP_INT_MAX; 36 } 37 $this->count = count($this->task);//总的任务数 38 //将所有的请求的别名与回调函数进行遍历 39 foreach ($this->task as $key => $call){ 40 //在协程容器中,通过执行回调函数获取结果,调用channel进行通信 41 Coroutine::create(function ()use($key,$call){ 42 $data = call_user_func($call); 43 $this->chan->push([ 44 'key'=>$key, 45 'result'=>$data 46 ]); 47 }); 48 } 49 $result = []; 50 $start = microtime(true); 51 while($this->count > 0) 52 { 53 $temp = $this->chan->pop(1); 54 if(is_array($temp)){ 55 $key = $temp['key']; 56 $result[$key] = $temp['result']; 57 $this->count--; //任务总数减一 58 $this->success++; //任务成功执行的数量加1 59 } 60 //考虑超时的情况,如果某个任务超时,那么其返回的数据就不会被纳入并发查询的最终结果中。 61 if(microtime(true) - $start > $timeout){ 62 break; 63 } 64 } 65 return $result; 66 } 67 }
参考链接:
https://www.easyswoole.com/Cn/Swoole/Coroutine/introduction.html