zoukankan      html  css  js  c++  java
  • 协程(Coroutine)(二)

    协程(Coroutine)(二)

    1、并发查询

         关于协程的使用,经常会用到它来做并发查询,可以用协程+waitgroup+channel的方式来做,也可以用csp并发来做。

         1)协程+waitgroup+channel

     1 go(function (){
     2        $chan=new Channel(12);//设置通道的容量是12
     3        $wait=new WaitGroup();
     4        for ($i=1;$i<=12;$i++){
     5             $wait->add();
     6             go(function ()use($wait,$chan,$i){
     7                  co::sleep(rand(1,3));
     8                   $chan->push("第{$i}个月的数据!");
     9                   $wait->done();
    10              });
    11        }
    12        $wait->wait();//挂起当前协程,等待所有任务完成后执行下面代码
    13        while(true){
    14             if ($chan->isEmpty()){
    15                  break;
    16              }
    17              $res=$chan->pop();
    18              Logger::getInstance()->info($res.PHP_EOL);
    19        }
    20        Logger::getInstance()->info("------------------".PHP_EOL);
    21  });

        运行结果:

          2)csp并发

     1 public function testCsp1()
     2 {
     3        $time=microtime(true);
     4        $chan=new Channel();
     5        go(function ()use($chan){
     6             $csp = new EasySwooleComponentCsp();
     7             $csp->add('t1',function (){
     8                 co::sleep(4);
     9                 return 't1 result';
    10             });
    11             $csp->add('t2',function (){
    12                 co::sleep(3);
    13                 return 't2 result';
    14             });
    15             $res=$csp->exec();
    16             $chan->push($res);
    17         });
    18         $time1=microtime(true)-$time;
    19         Logger::getInstance()->info("go代码执行后用时:".$time1."s");
    20         $res=$chan->pop();
    21         $time2=microtime(true)-$time;
    22         Logger::getInstance()->info("获取到结果用时:".$time2."s");
    23         Logger::getInstance()->info('res:'.json_encode($res));
    24 }

           运行结果:

    2、csp使用go和不使用go的区别

        上面2)中的csp并发是是在协程容器下执行的。作为比较,下面的代码展示在非协程容器下来执行

     1 public function testCsp2()
     2 {
     3         $time1=microtime(true);
     4         $csp = new EasySwooleComponentCsp();
     5         $csp->add('t1',function (){
     6             co::sleep(4);
     7             return 't1 result';
     8         });
     9         $csp->add('t2',function (){
    10             co::sleep(3);
    11              return 't2 result';
    12         });
    13         $res=$csp->exec();
    14         $time2=microtime(true);
    15         $time=$time2-$time1;
    16         Logger::getInstance()->info("用时:".$time."s");
    17         Logger::getInstance()->info("res:".json_encode($res));
    18 }

        运行结果:

        总结

        1)可以看出在go()里面执行的代码,是不会阻塞下面的代码运行的,最后$csp->exec()获取到结果才返回。没有在go()里面写的代码,就是正常从上往下执行,执行到$csp->exec()这段代码时,因为是相对比较耗时的任务,要获取到结果才会接着往下执行。

        2)协程之间的通信需要使用channel方式,协程中访问外部变量必须使用use关键字,或者传形参方式,不能引用变量。外部要访问协程中的变量需要通过channel的方式。

        3)注意:exec方法提供了一个默认参数:超时时间(默认为5s),当调用$csp->exec()后,最长等待5s左右会返回结果。如果你在t2函数中co::sleep(6),那么5s后,返回的数据中不会包含t2函数的返回数据。

    3、csp类的源码不多,我们来看看

         个人的理解都放到注释里面了,有问题的地方欢迎大家指正。

     1 <?php
     2 
     3 namespace EasySwooleComponent;
     4 use SwooleCoroutineChannel;
     5 use SwooleCoroutine;
     6 class Csp
     7 {
     8     private $chan;       //通道channel的实例
     9     private $count = 0;  //任务总数
    10     private $success = 0;//任务成功执行的总数
    11     private $task = [];  //任务数组
    12 
    13     function __construct(int $size = 8)
    14     {
    15         $this->chan = new Channel($size);
    16     }
    17 
    18     function add($itemName,callable $call):Csp
    19     {
    20         $this->count = 0;
    21         $this->success = 0;
    22        //将所有的请求的别名与回调函数以数组的形式存放在私有变量$task中
    23         $this->task[$itemName] = $call;
    24         return $this;
    25     }
    26 
    27     function successNum():int
    28     {
    29         return $this->success;
    30     }
    31     //最重要的执行方法
    32     function exec(?float $timeout = 5)
    33     {
    34         if($timeout <= 0){
    35             $timeout = PHP_INT_MAX;
    36         }
    37         $this->count = count($this->task);//总的任务数
    38         //将所有的请求的别名与回调函数进行遍历
    39         foreach ($this->task as $key => $call){
    40             //在协程容器中,通过执行回调函数获取结果,调用channel进行通信
    41             Coroutine::create(function ()use($key,$call){
    42                 $data = call_user_func($call);
    43                 $this->chan->push([
    44                     'key'=>$key,
    45                     'result'=>$data
    46                 ]);
    47             });
    48         }
    49         $result = [];
    50         $start = microtime(true);
    51         while($this->count > 0)
    52         {
    53             $temp = $this->chan->pop(1);
    54             if(is_array($temp)){
    55                 $key = $temp['key'];
    56                 $result[$key] = $temp['result'];
    57                 $this->count--;   //任务总数减一
    58                 $this->success++; //任务成功执行的数量加1
    59             }
    60             //考虑超时的情况,如果某个任务超时,那么其返回的数据就不会被纳入并发查询的最终结果中。
    61             if(microtime(true) - $start > $timeout){
    62                 break;
    63             }
    64         }
    65         return $result;
    66     }
    67 }

    参考链接:

    https://www.easyswoole.com/Cn/Swoole/Coroutine/introduction.html

  • 相关阅读:
    已安装 SQL Server 2005 Express 工具。若要继续,请删除 SQL Server 2005 Express 工具
    超时时间已到。超时时间已到,但是尚未从池中获取连接。出现这种情况可能是因为所有池连接均在使用,并且达到了最大池大小。
    C#微信公众号开发 -- (七)自定义菜单事件之VIEW及网页(OAuth2.0)授权
    C#微信公众号开发 -- (六)自定义菜单事件之CLICK
    C#微信公众号开发 -- (五)自定义菜单创建
    C#微信公众号开发 -- (四)获取API调用所需的全局唯一票据access_token
    C#微信公众号开发 -- (三)用户关注之后自动回复
    C#微信公众号开发 -- (二)验证成为开发者
    linux下删除大量文件提示参数过长解决办法
    Linux 远程连接sftp与ftp
  • 原文地址:https://www.cnblogs.com/hld123/p/13331403.html
Copyright © 2011-2022 走看看