zoukankan      html  css  js  c++  java
  • Easyswoole的WaitGroup和Csp组件的分析和使用

    Easyswoole的WaitGroup和Csp组件的分析和使用

    easyswoole可真是个好名字,只是提供了恰到好处的封装,即使是源码也保持了这样的风格。这种风格不论好坏可能都需要各位适应下,哈哈。下面一起来感受下es中的实现吧。

    -waitgroup在easyswoole中的实现和使用

    -csp在easyswoole中的实现和使用

    waitgroup

    分析
    <?php
    
    namespace EasySwooleComponent;
    
    use SwooleCoroutineChannel;
    
    class WaitGroup
    {
        private $count = 0;
        /** @var Channel  */
        private $channel;
        private $success = 0;
        private $size;
    
        public function __construct(int $size = 128)
        {
            $this->size = $size;
            // 主要的初始化功能在reset中 提供了一个传递任务执行情况的通道
            // reset方法 还允许在当前协程多次使用同一个waitgroup变量
            $this->reset();
        }
    
        public function add()
        {   
            // wg中子任务的计数器
            $this->count++;
        }
    
        function successNum(): int
        {
            return $this->success;
        }
    
        public function done()
        {   
            // 当子协程任务执行完毕时 向管道中push了一个int 1
            $this->channel->push(1);
        }
    
        // 最主要的逻辑在这里
        // 其中的while循环 相当于在主协程中阻塞 直到全部的任务执行完成或者超时
        public function wait(?float $timeout = 15)
        {   
            if ($timeout <= 0) {
                $timeout = PHP_INT_MAX;
            }
            $this->success = 0;
            $left = $timeout;
            // $this->count是还要等待完成的子协程数量
            // $left是超时时间 即主协程继续向下执行还需要等待的时间(其实并不存在主子协程的概念 这里只是为了好区分)
            // 当add的任务全部执行完成 或者 超过了设置的timeout 此wait函数执行完毕 程序继续向下执行
            while (($this->count > 0) && ($left > 0)) {
                $start = round(microtime(true), 3);
                if ($this->channel->pop($left) === 1) {
                    // 每从通道pop一次就给还要执行的任务数量-1
                    $this->count--;
                    // 每从通道pop一次就给成功数量+1
                    $this->success++;
                }
                // 每循环一次更新一次还要wait的时间
                $left = $left - (round(microtime(true), 3) - $start);
            }
        }
    
        function reset()
        {
            $this->close();
            $this->count = 0;
            $this->success = 0;
            $this->channel = new Channel($this->size);
        }
    
        function close()
        {
            if ($this->channel) {
                $this->channel->close();
                $this->channel = null;
            }
        }
    
        function __destruct()
        {
            $this->close();
        }
    }
    
    
    使用
    # 确保开起了swoole的协程hook
    $wg = new WaitGroup();
    $wg->add();
    go(function () use ($wg) {
        defer(function () use ($wg) {
            $wg->done();
        });
        sleep(1); # 子协程使用sleep模拟阻塞
        var_dump("task1 done");
    });
    $wg->add();
    go(function () use ($wg) {
        defer(function () use ($wg) {
            $wg->done();
        });
        sleep(5); # 子协程使用sleep模拟阻塞
        var_dump('task2 done');
    });
    # 调用了wait方法 主协程就阻塞在wait方法中的while循环上 直到全部的子协程执行完毕或者超时才执行后面的代码
    $wg->wait(); # 可以尝试在wait中传递较小的超时时间 检查效果
    var_dump('main task done');
    
    # 给出swoole协程切换的条件
    1 协程中的代码遇到阻塞 如上的sleep
    2 协程主动yield让出执行权
    3 cpu计算超时 主动让出执行权(详细配置请查看swoole官方文档)
    其他的切换条件请大家补充
    

    csp

    分析
    <?php
    
    namespace EasySwooleComponent;
    
    use SwooleCoroutineChannel;
    use SwooleCoroutine;
    
    class Csp
    {
        private $chan;
        private $count = 0;
        private $success = 0;
        private $task = [];
    
        // 初始化channel
        function __construct(int $size = 8)
        {
            $this->chan = new Channel($size);
        }
    
        // 添加要并发执行的任务
        function add($itemName, callable $call): Csp
        {
            $this->count = 0;
            $this->success = 0;
            $this->task[$itemName] = $call;
            return $this;
        }
    
        function successNum(): int
        {
            return $this->success;
        }
    
        // 阻塞执行添加的任务 直到全部执行完毕 或者 超时
        function exec(?float $timeout = 5)
        {
            if ($timeout <= 0) {
                $timeout = PHP_INT_MAX;
            }
            $this->count = count($this->task);
            // 创建协程并发执行add进来的任务
            foreach ($this->task as $key => $call) {
                Coroutine::create(function () use ($key, $call) {
                    // 调用add进来的闭包
                    $data = call_user_func($call);
                    // 将执行结果送入通道
                    $this->chan->push([
                        'key' => $key,
                        'result' => $data
                    ]);
                });
            }
            $result = [];
            $start = microtime(true);
            // 同样是通过while循环 将代码阻塞在exec方法上
            while ($this->count > 0) {
                // 超时返回false
                $temp = $this->chan->pop(1);
                if (is_array($temp)) {
                    $key = $temp['key'];
                    $result[$key] = $temp['result'];
                    $this->count--;
                    $this->success++;
                }
                // 超时就结束循环 代码向下执行
                if (microtime(true) - $start > $timeout) {
                    break;
                }
            }
            return $result;
        }
    }
    
    使用
    # 执行发现并没有先打印主协程中的 main task done,这是因为Csp的exec方法会阻塞当前协程
    
    $csp = new Csp();
    $csp->add('task1', function () {
        return file_get_contents("https://www.easyswoole.com/Cn/Components/Component/csp.html");
    });
    $csp->add('task2', function () {
        return file_get_contents("https://www.easyswoole.com/Cn/Components/Component/waitGroup.html");
    });
    $res = $csp->exec();
    var_dump($res);
    go(function () {
        var_dump("main task done");
    });
    
    
    # 如此使用触发了协程调度 导致先输出了主协程的main task done
        
    go(function () {
        $csp = new Csp();
        $csp->add('task1', function () {
            return file_get_contents("https://www.easyswoole.com/Cn/Components/Component/csp.html");
        });
        $csp->add('task2', function () {
            return file_get_contents("https://wiki.swoole.com/#/start/start_tcp_servers");
        });
        $res = $csp->exec();
        var_dump($res);
    });
    var_dump('main task done');
    
    
    # 有时候可能需要将csp放到自行创建的协程容器中执行 可以尝试下面配合WaitGroup的方式
    $wg = new WaitGroup();
    $wg->add();
    go(function () use ($wg) {
        defer(function () use ($wg) {
            $wg->done();
        });
        $csp = new Csp();
        $csp->add('task1', function () {
            return file_get_contents("https://www.easyswoole.com/Cn/Components/Component/csp.html");
        });
        $csp->add('task2', function () {
            return file_get_contents("https://www.easyswoole.com/Cn/Components/Component/waitGroup.html");
        });
        $res = $csp->exec();
        var_dump($res);
    });
    $wg->wait();
    // go(function(){
    //     var_dump('main task done');
    // });
    var_dump('main task done');   
    
    请勿将以上的代码搬运到go中,因为golang在语言层面完全支持了协程调度,而swoole的协程不仅需要在协程容器中使用,并且实现协程的切换也需要相应的条件。

    今天就说到这吧,发现错误欢迎指正,感谢!!!

  • 相关阅读:
    css切图Sprites
    javascript改变position值实现菜单滚动至顶部后固定
    Cannot create type class java.io.File from value
    关于如何拍摄瓷器(转)
    Struts2的jar问题
    vim的基本操作
    Flask基础
    Flask入门
    MongoDB 之 数据类型
    基于DBUtils实现数据库连接池
  • 原文地址:https://www.cnblogs.com/alwayslinger/p/13898935.html
Copyright © 2011-2022 走看看