zoukankan      html  css  js  c++  java
  • swoole_event_add实现异步

    swoole提供了swoole_event_add函数,可以实现异步。此函数可以用在Server或Client模式下。

    swoole_event_add属于AsyncIO,必须运行在CLI 模式。

    异步tcp客户端

    stream_socket_client实现tcp同步客户端

    示例:

    <?php
    
    $start_time = microtime(TRUE);
    
    $fp = stream_socket_client("tcp://www.52fhy.com:80", $errno, $errstr, 30);
    fwrite($fp,"GET /test.json HTTP/1.1
    Host: www.52fhy.com
    
    ");
    
    echo $resp = fread($fp, 8192);
    fclose($fp);
    echo "Finish
    ";  
    
    $end_time = microtime(TRUE);
    echo sprintf("use time:%.3f s
    ", $end_time - $start_time);
    
    

    上述代码是同步执行的。如何变成异步呢?

    stream_socket_client实现tcp异步客户端

    由于fread读取响应数据是同步堵塞的,我们将$fp加入到事件监听后,底层会自动将该socket设置为非阻塞模式。修改fread那一行:

    swoole_event_add($fp, function($fp) {
        echo $resp = fread($fp, 8192);
        swoole_event_del($fp);//socket处理完成后,从epoll事件中移除socket
        fclose($fp);
    });
    echo "Finish
    ";  //swoole_event_add不会阻塞进程,这行代码会顺序执行
    

    执行后输出:

    Finish
    use time:0.087 s
    HTTP/1.1 200 OK
    Server: AliyunOSS
    Date: Sat, 21 Apr 2018 08:36:40 GMT
    Content-Type: application/json
    Content-Length: 26
    Connection: keep-alive
    x-oss-request-id: 5ADAF81884D23C965A5D2614
    Accept-Ranges: bytes
    ETag: "3B3B50D9C802324BB72A74FCD9060004"
    Last-Modified: Sat, 21 Apr 2018 04:43:33 GMT
    x-oss-object-type: Normal
    x-oss-hash-crc64ecma: 9917578698767912878
    x-oss-storage-class: Standard
    Content-MD5: OztQ2cgCMku3KnT82QYABA==
    x-oss-server-time: 5
    
    {"url":"http://52fhy.com"}
    

    swoole_event_add函数原型

    bool swoole_event_add(mixed $sock, mixed $read_callback, mixed $write_callback = null,
        int $flags = null);
    

    $sock可以为以下四种类型:

    • int,就是文件描述符,包括swoole_client->$sockswoole_process->$pipe或者其他fd
    • stream资源,就是stream_socket_client/fsockopen创建的资源
    • sockets资源,就是sockets扩展中socket_create创建的资源,需要在编译时加入 ./configure --enable-sockets
    • object,swoole_process或swoole_client,底层自动转换为管道或客户端连接的socket

    多个tcp客户端实时交互

    上面的例子,已经实现了异步tcp客户端。接下来的例子会复杂些:可以在客户端A输入,客户端B能实时收到,反之也可以。

    首先,我们得创建个tcp_server:

    swoole_tcp_server.php

    <?php 
    
    $serv = new swoole_server('0.0.0.0', 9001);
    
    $serv->on('Start', function(){
        echo "Tcp server start. Waiting client... 
    ";
    });
    
    $serv->on('Connect', function($serv, $fd){
        echo "New client fd:{$fd}. 
    ";
    });
    
    $serv->on('Receive', function($serv, $fd, $from_id, $data){
        echo "Recv msg from fd:{$fd}:{$data}
    ";
        foreach ($serv->connections as $client) {
            if($fd != $client){
                $serv->send($client, $data);
            }
        }
    });
    
    $serv->on('Close', function($serv, $fd){
        echo "Client fd:{$fd} closed. 
    ";
    });
    
    $serv->start();
    

    然后实现客户端:

    event_add_tcp_client.php

    <?php
    
    $socket = @stream_socket_client("tcp://127.0.0.1:9001", $errno, $errstr, 30);
    if(!$socket) exit("connect server err!");
    
    function onRead($socket){
        $msg = stream_socket_recvfrom($socket, 1024);
        if(!$msg){
            swoole_event_del($socket);
        }
        echo "Recv: {$msg}
    ";
        fwrite(STDOUT, "ENTER MSG:");
    }
    
    function onWrite($socket){
        echo "onWrite
    ";
    }
    
    function onStdin(){
        global $socket;
        $msg = trim(fgets(STDIN));
        if($msg == 'exit'){ //必须trim此处才会相等
            swoole_event_exit();
            // exit();
        }
        fwrite($socket, $msg);//数据量大的时候用swoole_event_write
        fwrite(STDOUT, "ENTER MSG:");
    }
    
    swoole_event_add($socket, 'onRead', 'onWrite');
    swoole_event_add(STDIN, 'onStdin');
    
    fwrite(STDOUT, "ENTER MSG:");  //swoole_event_add不会阻塞进程,这行代码会顺序执行
    

    先运行服务端:

    $ php swoole_tcp_server.php
    Tcp server start. Waiting client...
    

    打开两个终端,运行客户端:

    $ php event_add_tcp_client.php
    ENTER MSG:
    

    效果图:

    swoole_client

    其实swoole已经提供了异步的swoole_client,无需使用stream_socket_*系列函数:

    <?php 
    
    $client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
    $client->on("connect", function(swoole_client $cli) {
        
    });
    $client->on("receive", function(swoole_client $cli, $data){
        echo "Receive: $data";
        $cli->send(str_repeat('A', 100)."
    ");
        sleep(1);
    });
    $client->on("error", function(swoole_client $cli){
        echo "error
    ";
    });
    $client->on("close", function(swoole_client $cli){
        echo "Connection close
    ";
    });
    $client->connect('127.0.0.1', 9001);
    

    还有swoole实现的tcp/udp同步阻塞客户端:

    $client = new swoole_client(SWOOLE_SOCK_TCP);
    if (!$client->connect('127.0.0.1', 9001, -1))
    {
        exit("connect failed. Error: {$client->errCode}
    ");
    }
    $client->send("hello world
    ");
    echo $client->recv();
    $client->close();
    

    swoole_client 函数原型:

    swoole_client->__construct(int $sock_type, int $is_sync = SWOOLE_SOCK_SYNC, string $key);
    

    可以使用swoole提供的宏来之指定类型,请参考 swoole常量定义

    • $sock_type表示socket的类型,如TCP/UDP
    • 使用$sock_type | SWOOLE_SSL可以启用SSL加密
      $is_sync表示同步阻塞还是异步非阻塞,默认为同步阻塞
    • $key用于长连接的Key,默认使用IP:PORT作为key。相同key的连接会被复用

    php-fpm/apache环境下只能使用同步客户端。异步客户端只能使用在cli命令行环境。

    异步http客户端

    curl或者file_get_contents发送http请求是同步阻塞的。基于swoole_event_add封装可以实现异步。

    swoole_event_add实现异步http客户端

    event_add_http_client.php

    <?php
    
    class HttpClient{
    
        private $callback = null;
    
        public function __construct($url, $method = 'GET', $postfields = NULL, $headers = array()){
            
            //子进程发起请求
            $process = new swoole_process(function(swoole_process $worker) use($url, $method, $postfields, $headers){
                $response = $this->http($url, $method, $postfields, $headers);
                $worker->write($response);
            }, true);
            $process->start();
    
            //异步读取
            swoole_event_add($process->pipe, function($pipe) use ($process){
                $response = $process->read();
                // print_r($response);
                if(is_callable($this->callback)){
                    call_user_func($this->callback, $response); //回调
                }
                swoole_event_del($pipe);
            });
        }
    
        public function setCallback($callback){
            $this->callback = $callback;
        }
    
        /**
         * http请求
         */
        private function http($url, $method, $postfields = NULL, $headers = array()) {
            try{
                $ssl = stripos($url,'https://') === 0 ? true : false;
                $ci = curl_init();
                /* Curl settings */
                curl_setopt($ci, CURLOPT_USERAGENT, @$_SERVER['HTTP_USER_AGENT']); //在HTTP请求中包含一个"User-Agent: "头的字符串。    
                curl_setopt($ci, CURLOPT_CONNECTTIMEOUT, 30);
                curl_setopt($ci, CURLOPT_TIMEOUT, 30);
                curl_setopt($ci, CURLOPT_RETURNTRANSFER, TRUE);
                curl_setopt($ci, CURLOPT_ENCODING, "");
                if ($ssl) {
                    curl_setopt($ci, CURLOPT_SSL_VERIFYPEER, 0); // 对认证证书来源的检查 
                    curl_setopt($ci, CURLOPT_SSL_VERIFYHOST, 2); // 从证书中检查SSL加密算法是否存在
                }
                curl_setopt($ci, CURLOPT_HEADER, FALSE);
        
                switch ($method) {
                    case 'POST':
                        curl_setopt($ci, CURLOPT_POST, TRUE);
                        if (!empty($postfields)) {
                            curl_setopt($ci, CURLOPT_POSTFIELDS, $postfields);
                        }
                        break;
                }
        
                curl_setopt($ci, CURLOPT_URL, $url );
                curl_setopt($ci, CURLOPT_HTTPHEADER, $headers );
                curl_setopt($ci, CURLINFO_HEADER_OUT, TRUE );
        
                $response = curl_exec($ci);
                $httpCode = curl_getinfo($ci, CURLINFO_HTTP_CODE);
                $httpInfo = curl_getinfo($ci);
                
                if (FALSE === $response)
                    throw new Exception(curl_error($ci), curl_errno($ci));
            
            } catch(Exception $e) {
                throw $e;
            }
            
            //echo '<pre>';
            //var_dump($response);
            //var_dump($httpInfo);
    
            curl_close ($ci);
            return $response;
        }
    }
    
    $client = new HttpClient('http://www.52fhy.com/test.json');
    $client->setCallback(function($response){
        print_r($response);
    });
    echo "OK
    ";
    
    
    

    运行:

    $ php event_add_http_client.php
    OK
    {"url":"http://52fhy.com"}[
    

    由返回结果可以看出,客户端请求是异步执行的。

    swoole_http_client

    Swoole也内置了http异步客户端(swoole>=1.8.0)。

    相比curl和file_get_contents这样PHP提供的Http客户端,swoole_http_client最大的优势是支持大量并发。
    file_get_contents只能同时请求一个URL,并发只能通过开启多进程实现。curl提供了curl_multi功能实现并发基于select和多线程。并发能力都很差。而swoole_http_client是基于epoll实现的异步客户端,没有并发限制,可在一个进程内同时并发上万请求。更多介绍详见swoole文档

    示例:

    get:

    $cli = new swoole_http_client('www.52fhy.com', 80);
    $cli->setHeaders(['User-Agent' => "swoole"]);
    
    $cli->get('/test.json', function ($cli) {
        echo $cli->body;
    });
    
    echo "ok
    ";
    

    输出:

    ok
    {"url":"http://52fhy.com"}
    

    post:

    $cli = new swoole_http_client('127.0.0.1', 81); 
    $cli->post('/post_demo.php', array("a" => '1234', 'b' => '456'), function ($cli) {
        echo "Length: " . strlen($cli->body) . "
    ";
        echo $cli->body;
    });
    
    echo "ok
    ";
    

    websocket:

    <?php 
    $cli = new swoole_http_client('118.25.40.163', 8088);
    
    $cli->on('message', function ($_cli, $frame) {
        // var_dump($frame);
        echo $frame->data;
    });
    
    $cli->upgrade('/', function ($cli) {
        $cli->push("hello world");
    });
    
    echo "ok
    ";
    

    发送完客户端会立即close。

    参考

    1、swoole_event_add
    https://wiki.swoole.com/wiki/page/119.html
    2、Client
    https://wiki.swoole.com/wiki/page/p-client.html
    3、利用swoole_process和eventloop实现php异步编程
    https://segmentfault.com/a/1190000008034626
    4、关于swoole_process的一些使用疑惑
    https://group.swoole.com/question/106198
    5、swoole多进程操作
    https://blog.csdn.net/koastal/article/details/52871316
    6、swoole教程第一节:进程管理模块(Process)-中(消息队列)
    https://segmentfault.com/a/1190000003073389
    7、PHP编程中尝试程序并发的几种方式总结
    http://www.jb51.net/article/81245.htm
    8、1.8.0 使用内置Http异步客户端
    https://wiki.swoole.com/wiki/page/678.html
    9、异步Http/WebSocket客户端
    https://wiki.swoole.com/wiki/page/p-http_client.html

  • 相关阅读:
    VFIO PF SRIOV IOMMU UIO概念解释、关联
    集群节点间网络通信TIPC
    1. C语言中的数据结构.md
    第三讲. COTS包交换介绍
    SYSTick 定时器
    热电偶基础知识介绍-04
    附录1· 初识Linux操作系统
    热电偶冷端补偿
    珍惜是最宝贵的财富。
    CSS 设置标题文字只显示一行,多余显示省略号
  • 原文地址:https://www.cnblogs.com/52fhy/p/8904216.html
Copyright © 2011-2022 走看看