zoukankan      html  css  js  c++  java
  • php多进程处理大数据

    $count=1000000;/*大数据*/
    $processNum=100;/*100个进程*/
    $chunkSize=1000;/*单个进程处理数目*/

    $allHandlerNum=ceil($count/$chunkSize);
    $childHandleNum=ceil($allHandlerNum/2);/**子进程处理数**/
    $parentHandleNum=$allHandlerNum-$childHandleNum;/*父进程处理数*/
    $childPerHandlePageNum=intdiv($childHandleNum,max(1,$processNum-1));/*每个子进程处理数*/
    $lastChildHandlePageNum=max(0,$childHandleNum - ($processNum-1) * $childPerHandlePageNum);/*最后子进程处理数*/
    $parentPerHandlerPageNum=intdiv($parentHandleNum,max(1,$processNum-1));/*每个父进程处理数*/
    $lastParentHandlePageNum=max(0,$parentHandleNum - ($processNum-1) * $parentPerHandlerPageNum);/*最后父进程处理数*/

    $initData=[
    'all_handle_num'=>$allHandlerNum,
    'callback'=>function($index){/*子进程处理大数据*/
    var_dump('to do work'); /*如果在此connect database建议使用连接池或者释放链接*/
    }
    ];
    $workProcess=function($params=[]) use ($initData){/*work 进程*/
    $allHandleNum=array_key_exists('all_handle_num',$initData)?$initData['all_handle_num']:0;
    $resolve=array_key_exists('callback',$initData)?$initData['callback']:null;
    $initHandleNum=array_key_exists('init_handle_num',$params)?$params['init_handle_num']:0;
    $perHandleNum=array_key_exists('per_handle_num',$params)?$params['per_handle_num']:0;
    $handleNum=array_key_exists('handle_num',$params)?$params['handle_num']:0;
    $processIndex=array_key_exists('process_index',$params)?$params['process_index']:0;

    for ($i=0;$i<$handleNum;$i++){
    $page= $perHandleNum * $processIndex + $i + $initHandleNum;
    if($page > $allHandleNum){
    exit(0);/*这里尽量避免父进程异常退出*/
    }
    $resolve([
    'index'=>$page
    ]);
    }
    };

    for ($i=0;$i<$processNum;$i++){
    $pid = pcntl_fork();
    switch (true){
    case $pid === -1:/*失败*/
    die('fork error');
    break;
    case $pid > 0:/*父进程*/
    echo "i am parent ";
    $parentData=[
    'init_handle_num'=>$childHandleNum,
    'per_handle_num'=>$parentPerHandlerPageNum,
    'handle_num'=>$parentPerHandlerPageNum,
    'process_index'=>$i
    ];
    if($i+1===$processNum && $lastParentHandlePageNum>0){//处理最后一个
    $parentData['handle_num']=$lastParentHandlePageNum;
    $workProcess($parentData);
    }elseif($parentPerHandlerPageNum>0){
    $workProcess($parentData);
    }
    #在父进程处理逻辑 就会只有父子同步进行,如果程序遇到阻塞就不好,父进程是主进程,可以利用父进程直接回收子进程避免阻塞
    #将父进程的代码注释利用这个 ------ pcntl_wait($status, WNOHANG);#WNOHANG跳出阻塞创建进程
    break;
    default:/*子进程*/
    echo "i am child ";
    $childData=[
    'init_handle_num'=>0,
    'per_handle_num'=>$childPerHandlePageNum,
    'handle_num'=>$childPerHandlePageNum,
    'process_index'=>$i
    ];
    if($i+1===$processNum && $lastChildHandlePageNum>0){//处理最后一个
    $childData['handle_num']=$lastChildHandlePageNum;
    $workProcess($childData);
    }elseif($childPerHandlePageNum>0){
    $workProcess($childData);
    }
    exit(0);/*不让子进程创建进程*/
    }
    }

    // 回收子进程避免僵死
    while (pcntl_waitpid(0, $status) != -1) {
    $status = pcntl_wexitstatus($status);
    echo "Child $status completed ";
    }

    return true;
  • 相关阅读:
    where T: class的解释
    调用钉钉的WebAPI接口实现与ERP数据的同步
    Json序列化和反序列化的方式
    Log4Net日志处理
    MVC项目中异常处理
    FindBI商业智能报表工具
    权限列表实现
    委托,匿名,lambda
    [经典贪心算法]贪心算法概述
    [zt]手把手教你写对拍程序(PASCAL)
  • 原文地址:https://www.cnblogs.com/guokefa/p/14231547.html
Copyright © 2011-2022 走看看