zoukankan      html  css  js  c++  java
  • php Pthread 多线程

    原文链接:https://www.cnblogs.com/jkko123/tag/php%E7%9A%84%E5%A4%9A%E7%BA%BF%E7%A8%8BPthread/

    线程,有时称为轻量级进程,是程序执行的最小单元。线程是进程中的一个实体,是被系统独立调度和分派的基本单位,线程自己不拥有系统资源,它与同属一个进程的其它线程共享进程所拥有的全部资源。一个线程可以创建和撤消另一个线程,同一进程中的多个线程之间可以并发执行。每一个程序都至少有一个线程,那就是程序本身,通常称为主线程。线程是程序中一个单一的顺序控制流程。 在单个程序中同时运行多个线程完成不同的工作,称为多线程。

    一、基本试用

    <?php
     
    //实现多线程必须继承Thread类
    class test extends Thread {
        public function __construct($arg){
            $this->arg = $arg;
        }
     
        //当调用start方法时,该对象的run方法中的代码将在独立线程中异步执行。
        public function run(){
            if($this->arg){
                printf("Hello %s
    ", $this->arg);
            }
        }
    }
    $thread = new test("World");
     
    if($thread->start()) {
        //join方法的作用是让当前主线程等待该线程执行完毕
        //确认被join的线程执行结束,和线程执行顺序没关系。
        //也就是当主线程需要子线程的处理结果,主线程需要等待子线程执行完毕
        //拿到子线程的结果,然后处理后续代码。
        $thread->join();
    }
    ?>

    二、Worker和Threaded

    Worker对象与Threaded对象的关系有点像是,船在河中运行,一条河里有很多条船,而河也不止一条。不同的船运行在特定的环境下,比如大吨位的船是无法运行在河床浅的河中。船是可以随时变化的,而河的环境确相对持久。我们通过stack方法把对象加入到Worker中,在对象的run方法中通过对worker的访问来获取信息。

    <?php
    //Worker是具有持久化上下文(执行环境)的线程对象
    //Worker对象start()后,会执行run()方法,run()方法执行完毕,线程也不会消亡
    class MySqlWorker extends Worker {
        private $name = '';
        private $db = null;
     
        public function __construct($name) {
            $this->name = $name;
        }
     
        public function run() {
            $this->db = mysql_connect('127.0.0.1', 'root', '');
            mysql_select_db('test', $this->db);
        }
     
        public function getDb() {
            return $this->db;
        }
    }
     
    //Stackable是Threaded的一个别称,直到pthreads v.2.0.0
    class Query extends Threaded {
        private $sql = '';
        private $data = array();
     
        public function __construct($sql) {
            $this->sql = $sql;
        }
     
        public function run() {
            //访问线程工作对象
            $db = $this->worker->getDb();
            $res = mysql_query($this->sql, $db);
            $tmp = array();
            while($row = mysql_fetch_assoc($res)) {
                //这里不能使用$this->data[] = $row;这种方式。
                $tmp[] = $row;
            }
            $this->data = $tmp;
        }
     
        public function getData() {
            return $this->data;
        }
    }
     
    $mysqlWork = new MySqlWorker('mysqlWork');
    $query1 = new Query('select * from test order by id limit 0,2');
    $query2 = new Query('select * from test order by id limit 2,2');
     
    //通过Worker的stack方法,我们把对象加入到Worker中
    //会激活Worker执行对象的run()方法。
    //说白了就是会执行$query1,$query2的run()方法。
    $mysqlWork->stack($query1);
    $mysqlWork->stack($query2);
     
    $mysqlWork->start();
    //执行完Worker中的对象后,关闭Worker。
    //如果把这段代码放到$query1->getData()和$query2->getData()之后
    //则会输出两个空数组,那该方法的作用有可能是等待Worker中对象执行完毕,类似join方法()。
    $mysqlWork->shutdown();
     
    var_dump($query1->getData());
    var_dump($query2->getData());

    三、Mutex 互斥量

    当我们用多线程操作同一个资源时,在同一时间内只能有一个线程能够对资源进行操作,这时就需要用到互斥量了。比如我们对同一个文件进行读写操作时。当第一个线程给互斥量加锁后,如果在操作期间,其他线程再次给互斥量加锁,会导致线程进入阻塞状态,直到互斥量被解锁。这就很好的保护了文件在同一时间内只能被一个线程操作。如果不加锁,那么对文件的操作结果是不可预知的,因为同一时间内有很多线程同时操作文件,无法判断先后顺序。

    <?php
    class Add extends Thread {
        private $name = '';
        private $res = null;
        private $mutex = null;
        
        public function __construct($name, $res, $mutex = null) {
            $this->name = $name;
            $this->res = $res;
            $this->mutex = $mutex;
        }
     
        public function run() {
            if($this->mutex) {
                //给互斥量加锁
                Mutex::lock($this->mutex);
            }
            //从文件中获取数据
            $data = trim(fgets($this->res));
            $data = intval($data);
            ++$data;
            //重置文件指针到开始处
            fseek($this->res, 0);
            //写入数据
            fwrite($this->res, $data);
            //重置文件指针
            fseek($this->res, 0);
            echo "Thread {$this->name} add {$data} 
    ";
            if($this->mutex) {
                //给互斥量解锁
                Mutex::unlock($this->mutex);
            }
        }
    }
     
    $fp = fopen('./add.txt', 'r+');
     
    //创建互斥量,立即加锁
    $mutex = Mutex::create(true);
     
    $threads = array();
    for($ix = 0; $ix < 20; ++$ix) {
        $thread = new Add($ix, $fp, $mutex);
        $thread->start();
        $threads[] = $thread;
    }
     
    Mutex::unlock($mutex);
     
    foreach($threads as $thread) {
        $thread->join();
    }
     
    //销毁互斥量
    Mutex::destroy($mutex);

    四、内存共享

    有些时候我们希望在多个线程中共享一些需要的数据,我们可以使用shmop扩展。

    <?php
     
    class Count extends Thread {
        private $name = '';
     
        public function __construct($name) {
            $this->name = $name;
        }
     
        public function run() {
            //在Linux下可以使用sysvshm的扩展, shm_等函数
            //共享内存段的key
            $shmKey = 123;
            //创建共享内存段
            $shmId = shmop_open($shmKey, 'c', 0777, 64);
            //读取共享内存数据
            $data = trim(shmop_read($shmId, 0, 64));
            $data = intval($data);
            ++$data;
            shmop_write($shmId, $data, 0);
            echo "thread {$this->name} data {$data} 
    ";
     
            //删除关闭共享内存段
            shmop_delete($shmId);
            shmop_close($shmId);
        }
    }
     
    $threads = array();
    for($ix = 0; $ix < 10; ++$ix) {
        $thread = new Count($ix);
        $thread->start();
        $threads[] = $thread;
    }
     
    foreach($threads as $thread) {
        $thread->join();
    }

    五、线程同步

    有些时候我们不希望线程调用start()后就立刻执行,在处理完我们的业务逻辑后在需要的时候让线程执行。

    <?php
    class Sync extends Thread {
        private $name = '';
     
        public function __construct($name) {
            $this->name = $name;
        }
     
        public function run() {
            //让线程进入等待状态
            $this->synchronized(function($self){
                $self->wait();
            }, $this);
     
            echo "thread {$this->name} run... 
    ";
        }
    }
     
    //我们创建10个线程
    $threads = array();
    for($ix = 0; $ix < 10; ++$ix) {
        $thread = new Sync($ix);
        $thread->start();
        $threads[] = $thread;
    }
     
    $num = 1;
    while(true) {
        if($num > 5) {
            //当$num大于5时,我们才唤醒线程让它们执行
            foreach($threads as $thread) {
                $thread->synchronized(function($self){
                    $self->notify();
                }, $thread);
            }
            break;
        }
     
        //这里我们处理我们需要的代码
        //这时候线程是处在等待状态的
        echo "wait... 
    ";
        sleep(3);
        ++$num;
    }
     
    foreach($threads as $thread) {
        $thread->join();
    }
    echo "end... 
    ";

    六、线程池

    Pool对象是多个Worker对象的容器,同时也是它们的控制器,对Worker功能更高抽象。

    比如Worker是河,而线程是运行在河里的船。Pool则是管理着多条河。
    <?php
    //继承Threaded类,Threaded提供了隐式的线程安全机制
    //这个对象中的所有操作都是线程安全的
    class MyWork extends Threaded {
        private $name = '';
        private $do = false;
        private $data = '';
     
        public function __construct($name) {
            $this->name = $name;
        }
     
        public function run() {
            $this->data = "{$this->name} run... in thread [" . $this->worker->getName() . "] 
    ";
            //通过do来判断是否完成
            //如果为true,则让Pool::collect回收
            $this->do = true;
        }
     
        public function isDo() {
            return  $this->do;
        }
     
        public function getData() {
            return $this->data;
        }
    }
     
    class MyWorker extends Worker {
        public static $name = 0;
     
        public function __construct() {
            self::$name++;
        }
     
        public function run() {
             
        }
     
        public function getName() {
            return self::$name;
        }
    }
     
    $pool = new Pool(5, 'MyWorker');
     
    $works = array();
    for($ix = 0; $ix < 20; ++$ix) {
        $work = new MyWork($ix);
        $works[] = $work;
    }
     
    foreach($works as $work) {
        $pool->submit($work);
    }
    $pool->shutdown();
     
    foreach($works as $work) {
        echo $work->getData();
    }
     
    //回收已完成的对象
    $pool->collect(function($work){
        //我们通过自定义函数isDo来判断对象是否执行完毕
        return $work->isDo();
    });
  • 相关阅读:
    python-使用pyecharts绘制各省份985学校数量图
    python-将多个表格的信息合并到一个表格中
    python-使用百度AipOcr实现表格文字图片识别
    python安装OCR识别库
    python-一种去掉前后缀获取子串的方法
    python-一种字符串排序方式
    How to write educational schema.
    RabbitMq related
    OPENId是什么, OAUTH 是什么
    使用abp的 redis cache
  • 原文地址:https://www.cnblogs.com/syhx/p/9719675.html
Copyright © 2011-2022 走看看