zoukankan      html  css  js  c++  java
  • PHP 使用 Swoole

    在一般的 Server 程序中都会有一些耗时的任务,比如:发送邮件、聊天服务器发送广播等。如果我们采用同步阻塞的防水去执行这些任务,那么这肯定会非常的慢。

    Swoole 的 TaskWorker 进程池可以用来执行一些异步的任务,而且不会影响接下来的任务,很适合处理以上场景。

    那么什么是异步任务呢?可以从下面的图示中来简单了解一下。(来源于网络)

    要实现上述的异步处理,只需要增加两个事件回调即可:onTask 和 onFinish, 这两个回调函数分别用于执行 Task 任务和处理 Task 任务的返回结果。另外还需要在 set 方法中设置 task 进程数量。

    使用示例:

    class Server{    private $serv;    public function __construct() {        $this->serv = new swoole_server("0.0.0.0", 9501);        $this->serv->set(array(            'worker_num' => 4,            'daemonize' => false,            'task_worker_num' => 8
            ));        $this->serv->on('Start', array($this, 'onStart'));        $this->serv->on('Connect', array($this, 'onConnect'));        $this->serv->on('Receive', array($this, 'onReceive'));        $this->serv->on('Close', array($this, 'onClose'));        $this->serv->on('Task', array($this, 'onTask'));        $this->serv->on('Finish', array($this, 'onFinish'));        $this->serv->start();
        }    public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {        echo "Get Message From Client {$fd}:{$data}
    ";        // 发送任务到Task进程
            $param = array(            'fd' => $fd
            );
            $serv->task( json_encode( $param ) );        echo "继续处理之后的逻辑
    ";
        }    public function onTask($serv, $task_id, $from_id, $data) {        echo "This Task {$task_id} from Worker {$from_id}
    ";        echo "Data: {$data}
    ";        for($i = 0 ; $i < 5 ; $i ++ ) {
                sleep(1);            echo "Task {$task_id} Handle {$i} times...
    ";
            }
            $fd = json_decode( $data , true )['fd'];
            $serv->send( $fd , "Data in Task {$task_id}");        return "Task {$task_id}'s result";
        }    public function onFinish($serv,$task_id, $data) {        echo "Task {$task_id} finish
    ";        echo "Result: {$data}
    ";
        }    public function onStart( $serv ) {        echo "Server Start
    ";
        }    public function onConnect( $serv, $fd, $from_id ) {        echo "Client {$fd} connect
    ";
        }    public function onClose( $serv, $fd, $from_id ) {        echo "Client {$fd} close connection
    ";
        }
    }
    $server = new Server();

    通过上述示例可以看到,发起一个异步任务只需要调用 swoole_server 的 task 方法就可以。发送之后会触发 onTask 回调,可以通过 $task_id 和 $from_id 处理不同进程的不同任务。最后可以通过 return 一个字符串来将执行结果返回给 Worker 进程,Worker 进程通过 onFinish 回调来处理结果。

    那么基于上述代码就可以实现异步操作 mysql。异步操作 mysql 较适合以下场景:

    • 并发的读写操作

    • 没有时序上的严格关系

    • 不影响主线程逻辑

    好处:

    • 提高并发

    • 降低 IO 消耗

    数据库的压力主要在于 mysql 维持的连接数,如果存在 1000 个并发,那么 mysql 就需要建立对应数量的连接。而采用长连接的方式,mysql 的连接一直维持在进程中,减少了创建连接的损耗。可以通过 swoole 开启多个 task 进程,每一个进程内维持一个mysql 长连接,那么这样子也可以引申出来 mysql 连接池技术。还需要注意的是,mysql 服务器如果检测到长时间没有没有查询,则会断开连接回收资源,所以要有断线重连的机制。

    以下是一个简单的异步操作 mysql 的示例:

    还是以上的代码,我们只需要修改 onReceive、onTask、onFinish 三个函数。

    class Server{    private $serv;    public function __construct() {        $this->serv = new swoole_server("0.0.0.0", 9501);        $this->serv->set(array(            'worker_num' => 4,            'daemonize' => false,            'task_worker_num' => 8 // task进程数量 即为维持的MySQL连接的数量
            ));        $this->serv->on('Start', array($this, 'onStart'));        $this->serv->on('Connect', array($this, 'onConnect'));        $this->serv->on('Receive', array($this, 'onReceive'));        $this->serv->on('Close', array($this, 'onClose'));        $this->serv->on('Task', array($this, 'onTask'));        $this->serv->on('Finish', array($this, 'onFinish'));        $this->serv->start();
        }    public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {        echo "收到数据". $data . PHP_EOL;        // 发送任务到Task进程
            $param = array(            'sql' => $data, // 接收客户端发送的 sql 
                'fd'  => $fd
            );
            $serv->task( json_encode( $param ) );  // 向 task 投递任务
            echo "继续处理之后的逻辑
    ";
        }    public function onTask($serv, $task_id, $from_id, $data) {        echo "This Task {$task_id} from Worker {$from_id}
    ";        echo "recv SQL: {$data['sql']}
    ";        static $link = null;
            $sql = $data['sql'];
            $fd  = $data['fd'];
            HELL:        if ($link == null) {
                $link = @mysqli_connect("127.0.0.1", "root", "root", "test");
            }
            $result = $link->query($sql);        if (!$result) { //如果查询失败
                if(in_array(mysqli_errno($link), [2013, 2006])){                //错误码为2013,或者2006,则重连数据库,重新执行sql
                        $link = null;                    goto HELL;
                }
            }        if(preg_match("/^select/i", $sql)){//如果是select操作,就返回关联数组
                 $data = array();                while ($fetchResult = mysqli_fetch_assoc($result) ){
                         $data['data'][] = $fetchResult;
                    }                
            }else{//否则直接返回结果
                $data['data'] = $result;
            }
            $data['status'] = "OK";
            $data['fd'] = $fd;
            $serv->finish(json_encode($data));
        }    public function onFinish($serv, $task_id, $data) {        echo "Task {$task_id} finish
    ";
            $result = json_decode($result, true);        if ($result['status'] == 'OK') {            $this->serv->send($result['fd'], json_encode($result['data']) . "
    ");
            } else {            $this->serv->send($result['fd'], $result);
            }
        }    public function onStart( $serv ) {        echo "Server Start
    ";
        }    public function onConnect( $serv, $fd, $from_id ) {        echo "Client {$fd} connect
    ";
        }    public function onClose( $serv, $fd, $from_id ) {        echo "Client {$fd} close connection
    ";
        }
    }
    $server = new Server();

    以上代码在 onReceive 时直接接收一条 sql,之后直接发送到 Task 任务中,这个时候下一步的流程紧接着输出,这里也就体现出了异步。然后 onTask 和 onFinish 分别用来向数据库发送 sql,处理 task 执行结果。欢迎拍砖

  • 相关阅读:
    解决xcode5升级后,Undefined symbols for architecture arm64:问题
    第8章 Foundation Kit介绍
    app 之间发送文件 ios
    iphone怎么检测屏幕是否被点亮 (用UIApplication的Delegate)
    CRM下载对象一直处于Wait状态的原因
    错误消息Customer classification does not exist when downloading
    How to resolve error message Distribution channel is not allowed for sales
    ABAP CCDEF, CCIMP, CCMAC, CCAU, CMXXX这些东东是什么鬼
    有了Debug权限就能干坏事?小心了,你的一举一动尽在系统监控中
    SAP GUI和Windows注册表
  • 原文地址:https://www.cnblogs.com/myJuly/p/12686261.html
Copyright © 2011-2022 走看看