swoole library提供的WaitGroup
开始之前请各位查看swoole文档,先熟悉下channel特性
使用非常简单,官方例子一步到位。直接上代码了
# WaitGroup
<?php
declare(strict_types=1);
namespace SwooleCoroutine;
use BadMethodCallException;
use InvalidArgumentException;
class WaitGroup
{
protected $chan;
protected $count = 0;
protected $waiting = false;
public function __construct(int $delta = 0)
{
// 创建容量为1的通道 因为此通道只是用来唤醒wait方法中的消费者 没必要增加容量
$this->chan = new Channel(1);
if ($delta > 0) {
$this->add($delta);
}
}
public function add(int $delta = 1): void
{
// 保证wait后不能够继续add
// 为尽量规避这个问题 应该在子任务协程外面使用add方法 保证add和wait的串行执行
if ($this->waiting) {
throw new BadMethodCallException('WaitGroup misuse: add called concurrently with wait');
}
// 记录任务数
$count = $this->count + $delta;
if ($count < 0) {
throw new InvalidArgumentException('WaitGroup misuse: negative counter');
}
$this->count = $count;
}
public function done(): void
{
// 子任务调用一次done count减1
$count = $this->count - 1;
if ($count < 0) {
throw new BadMethodCallException('WaitGroup misuse: negative counter');
}
$this->count = $count;
// 当全部任务执行完成的时候 唤醒消费者 主协程继续向下执行
if ($count === 0 && $this->waiting) {
$this->chan->push(true);
}
}
public function wait(float $timeout = -1): bool
{
// wait未返回时 不能再次wait
if ($this->waiting) {
throw new BadMethodCallException('WaitGroup misuse: reused before previous wait has returned');
}
// 如果没add子任务 那么直接返回true
// 如果有任务成功进入wait并且将waiting标志置为true
if ($this->count > 0) {
$this->waiting = true;
// 无超时pop 一旦有生产者push数据 就立即返回 结束阻塞
// 这里的生产者就是最后一个调用done方法的子任务
$done = $this->chan->pop($timeout);
$this->waiting = false;
return $done;
}
return true;
}
}
发现错误欢迎指正,感谢!!!