zoukankan      html  css  js  c++  java
  • php多进程处理大数据

    $count=1000000;/*大数据*/
    $processNum=100;/*100个进程*/
    $chunkSize=1000;/*单个进程处理数目*/

    $allHandlerNum=ceil($count/$chunkSize);
    $childHandleNum=ceil($allHandlerNum/2);/**子进程处理数**/
    $parentHandleNum=$allHandlerNum-$childHandleNum;/*父进程处理数*/
    $childPerHandlePageNum=intdiv($childHandleNum,max(1,$processNum-1));/*每个子进程处理数*/
    $lastChildHandlePageNum=max(0,$childHandleNum - ($processNum-1) * $childPerHandlePageNum);/*最后子进程处理数*/
    $parentPerHandlerPageNum=intdiv($parentHandleNum,max(1,$processNum-1));/*每个父进程处理数*/
    $lastParentHandlePageNum=max(0,$parentHandleNum - ($processNum-1) * $parentPerHandlerPageNum);/*最后父进程处理数*/

    $initData=[
    'all_handle_num'=>$allHandlerNum,
    'callback'=>function($index){/*子进程处理大数据*/
    var_dump('to do work'); /*如果在此connect database建议使用连接池或者释放链接*/
    }
    ];
    $workProcess=function($params=[]) use ($initData){/*work 进程*/
    $allHandleNum=array_key_exists('all_handle_num',$initData)?$initData['all_handle_num']:0;
    $resolve=array_key_exists('callback',$initData)?$initData['callback']:null;
    $initHandleNum=array_key_exists('init_handle_num',$params)?$params['init_handle_num']:0;
    $perHandleNum=array_key_exists('per_handle_num',$params)?$params['per_handle_num']:0;
    $handleNum=array_key_exists('handle_num',$params)?$params['handle_num']:0;
    $processIndex=array_key_exists('process_index',$params)?$params['process_index']:0;

    for ($i=0;$i<$handleNum;$i++){
    $page= $perHandleNum * $processIndex + $i + $initHandleNum;
    if($page > $allHandleNum){
    exit(0);/*这里尽量避免父进程异常退出*/
    }
    $resolve([
    'index'=>$page
    ]);
    }
    };

    for ($i=0;$i<$processNum;$i++){
    $pid = pcntl_fork();
    switch (true){
    case $pid === -1:/*失败*/
    die('fork error');
    break;
    case $pid > 0:/*父进程*/
    echo "i am parent ";
    $parentData=[
    'init_handle_num'=>$childHandleNum,
    'per_handle_num'=>$parentPerHandlerPageNum,
    'handle_num'=>$parentPerHandlerPageNum,
    'process_index'=>$i
    ];
    if($i+1===$processNum && $lastParentHandlePageNum>0){//处理最后一个
    $parentData['handle_num']=$lastParentHandlePageNum;
    $workProcess($parentData);
    }elseif($parentPerHandlerPageNum>0){
    $workProcess($parentData);
    }
    #在父进程处理逻辑 就会只有父子同步进行,如果程序遇到阻塞就不好,父进程是主进程,可以利用父进程直接回收子进程避免阻塞
    #将父进程的代码注释利用这个 ------ pcntl_wait($status, WNOHANG);#WNOHANG跳出阻塞创建进程
    break;
    default:/*子进程*/
    echo "i am child ";
    $childData=[
    'init_handle_num'=>0,
    'per_handle_num'=>$childPerHandlePageNum,
    'handle_num'=>$childPerHandlePageNum,
    'process_index'=>$i
    ];
    if($i+1===$processNum && $lastChildHandlePageNum>0){//处理最后一个
    $childData['handle_num']=$lastChildHandlePageNum;
    $workProcess($childData);
    }elseif($childPerHandlePageNum>0){
    $workProcess($childData);
    }
    exit(0);/*不让子进程创建进程*/
    }
    }

    // 回收子进程避免僵死
    while (pcntl_waitpid(0, $status) != -1) {
    $status = pcntl_wexitstatus($status);
    echo "Child $status completed ";
    }

    return true;
  • 相关阅读:
    Javaweb实现表单数据和多文件上传
    Java一般命名规范
    基于微信公众号的答题投票系统——项目开发心得体会记录
    C语言实现对二叉树的操作
    C语言使用链表实现学生信息管理系统
    C语言实现对队列的基本操作
    C语言使用顺序表实现对学生信息的管理系统
    PHP实现周和月起止时间计算方法
    IOC容器和注入方式
    Spring简介+HelloWorld
  • 原文地址:https://www.cnblogs.com/guokefa/p/14231547.html
Copyright © 2011-2022 走看看