zoukankan      html  css  js  c++  java
  • pthreads v3下的worker和pool的使用

    有些人会想,明明用thread已经可以很好的工作了,为什么还要搞个worker和pool?

    之所以要用到worker和pool还是因为效率,因为系统创建一个新线程代价是比较昂贵,每个创建的线程会复制当前执行的整个上下文。

    尽可能的重用线程可以让我们的程序更高效。

    一个简单的worker例子:

    <?php
    
    //创建自定义work类,给work取个名字,方便查看
    class Work extends Worker
    {
        private $name;
    
        public function __construct($name)
        {
            $this->name = $name;
        }
    
        public function getName()
        {
            return $this->name;
        }
    }
    
    class Task extends Thread
    {
        private $num;
    
        public function __construct($num)
        {
            $this->num = $num;
        }
    
        public function run()
        {
            //计算累加和
            $total = 0;
            for ($i = 0; $i < $this->num; $i++) {
                $total += $i;
            }
            echo "work : {$this->worker->getName()} task : {$total} 
    ";
            sleep(1);
        }
    }
    
    //创建一个worker线程
    $work = new Work('a');
    
    $work->start();
    
    for ($i = 1; $i <= 10; $i++) {
        //将Task对象压栈到worker线程中
        //这个时候Task对象就可以使用worker线程上下文(变量,函数等)
        $work->stack(new Task($i));
    }
    
    //循环的清理任务,会阻塞主线程,直到栈中任务都执行完毕
    while ($work->collect()) ;
    
    //关闭worker
    $work->shutdown();

    上面代码在运行的时候,计算结果会每隔一秒出来一条,也就是10个task对象是运行在1个worker线程上的。

    如果10个task对象是分别在独立空间运行的,sleep()函数就不会起作用,他们各自sleep并不会影响其他线程。

    把上面的代码修改一下:

    <?php
    
    //创建自定义work类,给work取个名字,方便查看
    class Work extends Worker
    {
        private $name;
    
        public function __construct($name)
        {
            $this->name = $name;
        }
    
        public function getName()
        {
            return $this->name;
        }
    }
    
    class Task extends Thread
    {
        private $num;
    
        public function __construct($num)
        {
            $this->num = $num;
        }
    
        public function run()
        {
            //计算累加和
            $total = 0;
            for ($i = 0; $i < $this->num; $i++) {
                $total += $i;
            }
            echo "work : {$this->worker->getName()} task : {$total} 
    ";
            sleep(1);
        }
    }
    
    //创建二个worker线程
    $work1 = new Work('a');
    $work2 = new Work('b');
    
    $work1->start();
    $work2->start();
    
    for ($i = 1; $i <= 10; $i++) {
        if ($i <= 5) {
            $work1->stack(new Task($i));
        } else {
            $work2->stack(new Task($i));
        }
    }
    
    //循环的清理任务,会阻塞主线程,直到栈中任务都执行完毕
    while ($work1->collect() || $work2->collect()) ;
    
    //关闭worker
    $work1->shutdown();
    $work2->shutdown();
    

    这里我们创建2个worker线程,让10个task对象分别压栈到2个worker中。

    这时可以看到,计算结果是一对一对的出来,说明10个task对象跑在了2个worker线程上。

    至于需要创建多少个worker线程,和多少个task对象,就看自已的需求了。

    worker还有一个好处就是可以重用worker中的对象和方法。我们可以在worker中创建一个连接数据库对象,方便各task调用。

    <?php
    
    class DB extends Worker
    {
        //注意这里设置为静态成员,pdo连接本身是不能在上下文中共享的
        //声明为静态成员,让每个worker有自已的pdo连接
        private static $db = null;
        public $msg = 'i from db';
    
        public function run()
        {
            self::$db = new PDO('mysql:host=192.168.33.226;port=3306;dbname=test;charset=utf8', 'root', '');
        }
    
        public function getDb()
        {
            return self::$db;
        }
    }
    
    class Task extends Thread
    {
        private $id;
        //注意,这里不要给成员设置默认值,$result成员是线程对象是不可变的,不能被改写
        private $result;
    
        public function __construct($id)
        {
            $this->id = $id;
        }
    
        public function run()
        {
            //获取worker中的数据库连接
            $db = $this->worker->getDb();
            $ret = $db->query("select * from tb_user where id = {$this->id}");
            $this->result = $ret->fetch(PDO::FETCH_ASSOC);
            //访问worker中的成员变量msg
            echo "data : {$this->result['id']} {$this->result['name']} 	 worker data : {$this->worker->msg} 
    ";
        }
    }
    
    //创建一个worker线程
    $work = new DB();
    
    $work->start();
    
    for ($i = 1; $i <= 5; $i++) {
        $work->stack(new Task($i));
    }
    
    //循环的清理任务,会阻塞主线程,直到栈中任务都执行完毕
    while ($work->collect()) ;
    
    //关闭worker
    $work->shutdown();
    

    tb_user表大家可以随意创建,我这里为了演示只创建了id和name字段

    运行结果如下:

    如果说worker是对线程的重用,那么pool就是对worker更高的抽象了,可以同时管理多个worker。

    <?php
    
    //之所以要创建一个Id线程类,主要是为了给work取个不同的ID,方便查看,哪些task线程属于哪个work中
    class Id extends Thread
    {
        private $id;
    
        public function getId()
        {
            //防止出现id混乱,这里使用同步操作
            $this->synchronized(function () {
                ++$this->id;
            });
            return $this->id;
        }
    }
    
    class Work extends Worker
    {
        private $id;
    
        public function __construct(Id $obj)
        {
            $this->id = $obj->getId();
        }
    
        public function getId()
        {
            return $this->id;
        }
    }
    
    class Task extends Thread
    {
        private $num = 0;
    
        public function __construct($num)
        {
            $this->num = $num;
        }
    
        //计算累加和
        public function run()
        {
            $total = 0;
            for ($i = 0; $i < $this->num; $i++) {
                $total += $i;
            }
            echo "work id : {$this->worker->getId()} task : {$total} 
    ";
        }
    }
    
    //创建pool,可容纳3个work对象
    $pool = new Pool(3, 'Work', [new Id()]);
    
    //循环的把20个task线程提交到pool中的work对象上运行
    for ($i = 1; $i <= 20; $i++) {
        $pool->submit(new Task($i));
    }
    
    //循环的清理任务,会阻塞主线程,直到任务都执行完毕
    while ($pool->collect()) ;
    
    //关闭pool
    $pool->shutdown();
    

    运行结果如下:

  • 相关阅读:
    原生AJAX基础讲解及兼容处理
    JS子元素oumouseover触发父元素onmouseout
    IE6常见bug
    让IE6支持position:fixed的方法,CSS expression与JavaScript eval讲解
    Alpha通道
    网络游戏开发前的pixel像素画习作
    网络游戏开发其一(游戏美工)
    周内琐记
    地图重置与各项绘图优化
    四足机器人搭建尝试
  • 原文地址:https://www.cnblogs.com/jkko123/p/8876783.html
Copyright © 2011-2022 走看看