zoukankan      html  css  js  c++  java
  • 简单多进程任务处理程序


    简单多进程任务处理程序

    
    <?php
    
    /**
     * 多进程任务处理辅助类
     */
    class TaskHelper{
      /**
    	 * worker进程最大数量, 至少两个
    	 */
    	protected $maxProcess;
    	
    	/**
    	 * 动态参数,设置为 0 表示不使用自适应进程方式
    	 */
    	protected $dynamicParam;
    	
    	/**
    	 * 任务的实际处理者,对象, 必须有 runWorker 方法
    	 */
    	protected $worker;
    	
    	public function __construct($worker, $maxProcess = 4, $dynamicParam = 0) {
    		$this->worker = $worker;
    		$this->maxProcess = max(2, (int)$maxProcess);
    		$this->dynamicParam = max(0, (int)$dynamicParam);
    	}
    	
    	/**
    	 * fork子进程处理数据
    	 * @param Array $data 需要处理的数据,必须是数组
    	 */
    	public function run(&$data) {
    		$count = count($data);
    		
    		// 需要开启的子进程数
    		$num = $this->dynamicParam ? min( $this->maxProcess, ceil($count / $this->dynamicParam) ) : $this->maxProcess;
    		
    		// 每个进程处理的数据量
    		$n = ceil($count / $num); 
    		
    		$childs = array();
    		
    		for($i = 0; $i < $count; $i += $n) {
    			$pid = pcntl_fork();
    			if($pid == -1) {
    				echo "Fork worker failed!";
    				return false;
    			}
    			
    			if($pid) {
    				echo "Fork worker success! pid:",  $pid, "
    ";
    				$childs[] = $pid;
    			} else {
    				$sliceData = array_slice($data, $i, $n);
    	
    				$this->worker->runWorker($sliceData);
    				exit();
    			}
    		}
    		
    		$this->check($childs);
    	}
    	
    	/**
    	 * 检测子进程状态,监控子进程是否退出,并防止僵尸进程
    	 */
    	protected function check($childs) {
    		while(true) {
    			foreach($childs as $index => $pid) {
    				$pid && $res = pcntl_waitpid($pid, $status, WNOHANG);
    				if(!$pid || $res == -1) {
    					echo "End worker: $pid 
    ";
    					unset($childs[$index]);
    				}
    			}
    			
    			if(empty($childs)) break;
    			sleep(1);
    		}
    	}
    }
    
    /**
     * 使用示例
     */
    class Test {
    	public function run() {
    		$data = array_fill(0, 800, 1);
    		
    		// 开8个进程将 $data 分成8份,交由下面的 runWorker 方法处理
    		$task = new TaskHelper($this, 8);
    		
    		$task->run($data); // 如果前面连接了数据库、redis等,最好在这之前关闭掉
    	}
    	
    	/**
    	 * 这里编写代相应码来处理数据
    	 */
    	public function runWorker($data) {
    		// do something...
    	}
    }
    
    $obj = new Test;
    $obj->run();
    
    
  • 相关阅读:
    AutoMapper,对象映射的简单使用
    Angular 4.0从入门到实战
    IE报错:The given path's format is not supported
    原生js中slice()方法和splice()区别
    使用jquery插件ajaxfileupload一次上传多个文件,示例
    C#路径中获取文件全路径、目录、扩展名、文件名称
    NET二进制图片存储与读取的常见方法,iTextSharp添加图片生成PDF文件
    Type.GetType()反射另外项目中的类时返回null的解决方法
    C#中对于Enum类型的遍历
    读取word到二进制,再转成word
  • 原文地址:https://www.cnblogs.com/qixidi/p/10199678.html
Copyright © 2011-2022 走看看