zoukankan      html  css  js  c++  java
  • php Pthread 多线程 (六) Pool类 线程池

    Pool对象是多个Worker对象的容器,同时也是它们的控制器,对Worker功能更高抽象。
    比如Worker是河,而线程是运行在河里的船。Pool则是管理着多条河。
    <?php
    //继承Collectable垃圾收集类,好让Pool::collect进行收集 
    class Sql extends Collectable {
        private $sql = '';
        private $data = array();
    
        public function __construct($sql) {
            $this->sql = $sql;
        }
    
        public function run() {
            $db = $this->worker->getDb();
            $res = mysql_query($this->sql, $db);
            $tmp = array();
            while($row = mysql_fetch_assoc($res)) {
                //这里不能使用$this->data[] = $row;这种方式。
                $tmp[] = $row;
            }
            $this->data = $tmp;
            //这里工作完后,设置为垃圾
            //在Pool::collect中isGarbage()判断时则为真
            $this->setGarbage();
        }
    
        public function getData() {
            return $this->data;
        }
    }
    
    class SqlWorker extends Worker {
        protected static $db = null;
    
        public function getDb() {
            if(!self::$db) {
                self::$db = mysql_connect('127.0.0.1', 'root', '');
                mysql_select_db('test', self::$db);
            }
            return self::$db;
        }
    }
    
    //这里创建5个Worker对象的Pool线程池
    $pool = new Pool(5, 'SqlWorker');
    
    //我们创建20个Sql线程对象,并提交到Pool的Worker中
    $sqls = array();
    for($ix = 0; $ix < 20; ++$ix) {
        $sql = new Sql("select * from test order by id limit {$ix},1");
        $sqls[] = $sql;
    
        //$pool->submit($sql);不要在这里submit
    }
    
    //注意,这里循环提交$sql
    //如果把$pool->submit放到前面的for循环内,会出现一个错误
    //第一个Sql对象的sql语句会跟最后一个相同,导致结果出现问题
    foreach($sqls as $sql) {
        //这里的submit方法有问题,它会修改$sqls
        //导致第一个Sql对象与最后一个相同
        //不知是不是BUG
        $pool->submit($sql);
    }
    
    //等待队列中执行完成
    $pool->shutdown();
    
    $ret = array();
    foreach($sqls as $sql) {
        $ret[] = $sql->getData();
    }
    file_put_contents('ret.txt', var_export($ret, true));
    
    //回收已完成的对象
    $pool->collect(function($sql){
        return $sql->isGarbage();
    });
    
    array (
      0 => 
      array (
        0 => 
        array (
          'id' => '20',
          'name' => 'mmm',
        ),
      ),
      1 => 
      array (
        0 => 
        array (
          'id' => '2',
          'name' => '222',
        ),
      ),
      2 => 
      array (
        0 => 
        array (
          'id' => '3',
          'name' => '333',
        ),
      ),
      3 => 
      array (
        0 => 
        array (
          'id' => '4',
          'name' => '444',
        ),
      ),
      4 => 
      array (
        0 => 
        array (
          'id' => '5',
          'name' => '555',
        ),
      ),
      5 => 
      array (
        0 => 
        array (
          'id' => '6',
          'name' => '666',
        ),
      ),
      6 => 
      array (
        0 => 
        array (
          'id' => '7',
          'name' => '777',
        ),
      ),
      7 => 
      array (
        0 => 
        array (
          'id' => '8',
          'name' => '888',
        ),
      ),
      8 => 
      array (
        0 => 
        array (
          'id' => '9',
          'name' => '999',
        ),
      ),
      9 => 
      array (
        0 => 
        array (
          'id' => '10',
          'name' => 'aaa',
        ),
      ),
      10 => 
      array (
        0 => 
        array (
          'id' => '11',
          'name' => 'bbb',
        ),
      ),
      11 => 
      array (
        0 => 
        array (
          'id' => '12',
          'name' => 'ccc',
        ),
      ),
      12 => 
      array (
        0 => 
        array (
          'id' => '13',
          'name' => 'ddd',
        ),
      ),
      13 => 
      array (
        0 => 
        array (
          'id' => '14',
          'name' => 'eee',
        ),
      ),
      14 => 
      array (
        0 => 
        array (
          'id' => '15',
          'name' => 'fff',
        ),
      ),
      15 => 
      array (
        0 => 
        array (
          'id' => '16',
          'name' => 'ggg',
        ),
      ),
      16 => 
      array (
        0 => 
        array (
          'id' => '17',
          'name' => 'vvv',
        ),
      ),
      17 => 
      array (
        0 => 
        array (
          'id' => '18',
          'name' => 'hhh',
        ),
      ),
      18 => 
      array (
        0 => 
        array (
          'id' => '19',
          'name' => 'nnn',
        ),
      ),
      19 => 
      array (
        0 => 
        array (
          'id' => '20',
          'name' => 'mmm',
        ),
      ),
    )
    
    从结果可以看出,第一条记录跟最后一条是相同的,再没有pool->submit之前$sqls数组中的对象都是正确的,submit之后第一个对象的数据就改变了,不知道是不是pthreads的BUG。
     
    上述代码我们通过创建一个包含5个SqlWorker对象的pool,然后创建20个Sql对象加入到pool中。
     
    当然我们的Sql类并不一定非要继承自Collectable类,我们也可自定义判断什么时候可回收。
    <?php
    //继承Threaded类,Threaded提供了隐式的线程安全机制
    //这个对象中的所有操作都是线程安全的
    class MyWork extends Threaded {
        private $name = '';
        private $do = false;
        private $data = '';
    
        public function __construct($name) {
            $this->name = $name;
        }
    
        public function run() {
            $this->data = "{$this->name} run... in thread [" . $this->worker->getName() . "] 
    ";
            //通过do来判断是否完成
            //如果为true,则让Pool::collect回收
            $this->do = true;
        }
    
        public function isDo() { 
            return  $this->do;
        }
    
        public function getData() {
            return $this->data;
        }
    }
    
    class MyWorker extends Worker {
        public static $name = 0;
    
        public function __construct() {
            self::$name++;
        }
    
        public function run() {
            
        }
    
        public function getName() {
            return self::$name;
        }
    }
    
    $pool = new Pool(5, 'MyWorker');
    
    $works = array();
    for($ix = 0; $ix < 20; ++$ix) {
        $work = new MyWork($ix);
        $works[] = $work;
    }
    
    foreach($works as $work) {
        $pool->submit($work);
    }
    $pool->shutdown();
    
    foreach($works as $work) {
        echo $work->getData();
    }
    
    //回收已完成的对象
    $pool->collect(function($work){
        //我们通过自定义函数isDo来判断对象是否执行完毕
        return $work->isDo();
    });
    
    执行结果如下:
    19 run... in thread [5]
    1 run... in thread [2]
    2 run... in thread [3]
    3 run... in thread [4]
    4 run... in thread [5]
    5 run... in thread [1]
    6 run... in thread [2]
    7 run... in thread [3]
    8 run... in thread [4]
    9 run... in thread [5]
    10 run... in thread [1]
    11 run... in thread [2]
    12 run... in thread [3]
    13 run... in thread [4]
    14 run... in thread [5]
    15 run... in thread [1]
    16 run... in thread [2]
    17 run... in thread [3]
    18 run... in thread [4]
    19 run... in thread [5]
    
    第一条记录为什么会有问题,前面我已经说过了。这里我们看20个MyWork对象,它们顺序的加入到5个MyWorker对象中,如果第一条记录没有问题的话,它们分别加入到1,2,3,4,5的MyWorker中,然后远行run方法。
     
     
     
  • 相关阅读:
    【转】VS2010中 C++创建DLL图解
    [转]error: 'retainCount' is unavailable: not available in automatic reference counting mode
    [转]关于NSAutoreleasePool' is unavailable: not available in automatic reference counting mode的解决方法
    【转】 Tomcat v7.0 Server at localhost was unable to start within 45
    【转】Server Tomcat v7.0 Server at localhost was unable to start within 45 seconds. If
    【转】SVN管理多个项目版本库
    【转】eclipse安装SVN插件的两种方法
    【转】MYSQL启用日志,和查看日志
    【转】Repository has not been enabled to accept revision propchanges
    【转】SVN库的迁移
  • 原文地址:https://www.cnblogs.com/jkko123/p/6294592.html
Copyright © 2011-2022 走看看