zoukankan      html  css  js  c++  java
  • swoole(6)Task异步任务

    一:什么是task进程?

    task进程是独立与worker进程的一组进程  ,他主要处理耗时较长的业务逻辑,并且不影响worker进程处理客户端的请求.worker进程通过task()函数把数据投递到Task进程去处理

    开启task功能(task功能是默认关闭的 ,开启task功能需要满足2个条件) 

    1. 配置task进程的数量(task_worker_num)
    2. 注册task的回掉函数onTask和onfinish

    二:task设计流程

    1. worker进程中,我们调用对应的task()方法发送数据通知到task worker进程
    2. task worker进程会在onTask()回调中接收到这些数据,并进行处理
    3. 处理完会可以通过finish函数或直接return消息给worker进程
    4. worker进程在onFinish()进程收到消息并进行处理

    代码:

    <?php
    /**
     * Created by PhpStorm.
     * User: huahua
     * Date: 2020/3/6
     * Time: 上午10:09
     */
    
    //基于第一个 TCP 服务器,只需要增加 onTask 和 onFinish2 个事件回调函数即可。
    //另外需要设置 task 进程数量,可以根据任务的耗时和任务量配置适量的 task 进程。
    
    $serv = new SwooleServer('127.0.0.1',9800);
    
    $serv->set([
        'worker_num'=>2, //设置进程
        //配置此参数后将会启用 task 功能。所以 Server 务必要注册 onTask、onFinish 2 个事件回调函数。如果没有注册,服务器程序将无法启动。
        'task_worker_num' => 4, //配置 Task 进程的数量。
        'task_ipc_mode'=>2,//设置 Task 进程与 Worker 进程之间通信的方式
    ]);
    //监听连接进入事件
    $serv->on('Connect', function ($serv, $fd) {
        echo "Client: Connect.
    ";
    });
    
    //监听数据接收事件
    $serv->on('Receive', function ($serv, $fd, $from_id, $data) {
        $data=['tid'=>time()];
        $task_id = $serv->task($data);////投递到taskWorker进程组
        echo "task id = {$task_id},from id = $from_id".PHP_EOL;
    });
    
    $serv->on('task',function ($serv, $task_id, $from_id, $data){
        echo "task id = {$task_id},from id = $from_id,进程id =".posix_getpid().PHP_EOL;
        echo '我是task任务';
        $serv->finish("data -> ok");
    });
    
    $serv->on('finish',function ($serv, $task_id, $data){
        echo "task id = {$task_id}".PHP_EOL;
    });
    //监听连接关闭事件
    $serv->on('Close', function ($serv, $fd) {
        echo "Client: Close.
    ";
    });
    
    //启动服务器
    $serv->start();

    cli下运行结果:

    $task_id不等于workerStart中的worker_id,而是swoole维护的任务自增长id
    $from_id表示来自于哪个woker投递而来的任务,id等于worker_id值
    $serv->worker_id等于workerStart中的worker_id, 此处是task_woker的id
    $serv->worker_pid 此处是task_woker的进程pid

     三:task任务切分

    场景:假设有一台服务器专门处理前台投递的数据,利用简单的任务拆分,分配到相应的进程去处理

    思路:

    • 1.将一个大的任务拆分成相应份数(是由$task_worker_num数量来确定)
    • 2.通过foreach循环将数据投递到指定的task进程,范围是(0-(task_worker_num-1))区间之内
    • 3.执行失败的任务,需要保留,重新投递执行(进程间通讯管道方式)
    $server->on('receive',function (swoole_server $server, int $fd, int $reactor_id, string $data){
        for ($i=0;$i<100;$i++){
            $tasks[] =['id'=>$i,'msg'=>time()];
        }
        $count=count($tasks);
        $data=array_chunk($tasks,ceil($count/3));
        foreach ($data as $k=>$v){
            $server->task($v,$k);  //(0-task_woker_num-1)
        }
    
    });
  • 相关阅读:
    云计算设计模式(十九)——运行重构模式
    云计算设计模式(十八)——重试模式
    云计算设计模式(十七)——基于队列的负载均衡模式
    云计算设计模式(十六)——优先级队列模式
    云计算设计模式(十五)——管道和过滤器模式
    云计算设计模式(十四)——实体化视图模式
    云计算设计模式(十三)——领导人选举模式
    云计算设计模式(十二)——索引表模式
    使用WebClient上传文件时的一些问题
    .NET研发人员面试题(一)
  • 原文地址:https://www.cnblogs.com/8013-cmf/p/12427234.html
Copyright © 2011-2022 走看看