  • A Study of Yar, a Concurrent RPC Framework

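    A few days ago our department held a study session on the PHP technology summit, and each of us gave a short introduction to one of the slide decks from that conference.

    One of them was the deck "The Evolution of Weibo's LAMP Stack" (《微博LAMP 演变》) by Laruence (鸟哥, Xinchen Hui). If you need it, you can search Google for it or read about it on his blog:

    http://www.laruence.com/2013/08/15/2913.html  so I won't provide a download link here.

    The deck mentions three projects: Yaf, Yac, and Yar. After the meeting we split them up so that each of us could study one.

    I picked Yar. Last May, when the eTao (一淘) home page was being reworked around BigPipe, I wrote a PHP extension in C that issues requests in parallel,

    so I know this area fairly well and can use that work as a point of comparison.

    Now to the main topic.

    Yar stands for "yet another RPC".

    For a basic introduction, see Laruence's blog: http://www.laruence.com/2012/09/15/2779.html

    I'll also briefly show how it is used. The code below comes from Laruence's blog.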

    server.php
    <?php
    class API {
        /**
         * the doc info will be generated automatically into service info page.
         * @params
         * @return
         */
        public function api($parameter, $option = "foo") {
        }
     
        protected function client_can_not_see() {
        }
    }
     
    $service = new Yar_Server(new API());
    $service->handle();
    ?>

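    When instantiating Yar_Server you pass in an object. The method name submitted by the client is then invoked as a method of that object, so in effect this is

    a simple router. Yar also cleverly reuses the doc comments as the API documentation: a plain GET request to server.php returns a page listing the exposed methods together with their doc comments.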

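    Overall, Yar has two parts: the server and the client.

    Server side:

    As mentioned above, the server can be understood as a simple router; it just exposes our object as a service interface.

    Its main flow is as follows:

    When Yar_Server is instantiated, a user-defined API object is passed in and stored in the _executor property. Then handle() is called:

    If the request is not a POST, only the interface description is output (and only when YAR_G(expose_info) is set; otherwise an exception is thrown).

    If it is a POST, the server reads the POST data, extracts the requested API method name, and checks whether the API object has that method.

    If the method exists it is executed with call_user_function_ex; if not, the request is aborted with an error response.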
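
    To make the routing idea concrete, here is a minimal sketch in plain C of the flow above. It is not Yar's code: api_table, the handlers api_double and api_negate, the dispatch function, and the DISPATCH_* codes are names I made up for illustration, and a static table of function pointers stands in for the method table of the executor object.

```c
#include <assert.h>
#include <ctype.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical handler type: one int argument in, one int result out. */
typedef int (*api_fn)(int arg);

typedef struct {
    const char *name;   /* method name, stored in lower case */
    api_fn      fn;
} api_entry;

enum { DISPATCH_OK = 0, DISPATCH_INFO = 1, DISPATCH_UNDEFINED = 2 };

static int api_double(int arg) { return arg * 2; }
static int api_negate(int arg) { return -arg; }

/* Stand-in for the public methods of the "executor" object. */
static const api_entry api_table[] = {
    { "double", api_double },
    { "negate", api_negate },
};

/* Routes one request. Anything other than POST only asks for the info
 * page (exact string match, for simplicity). A POST is looked up by its
 * lower-cased method name and invoked if a handler exists. */
int dispatch(const char *http_method, const char *api_name,
             int arg, int *result)
{
    char lowered[64];
    size_t i, n;

    assert(result != NULL);

    if (http_method == NULL || strcmp(http_method, "POST") != 0) {
        return DISPATCH_INFO;
    }
    if (api_name == NULL) {
        return DISPATCH_UNDEFINED;
    }
    n = strlen(api_name);
    if (n >= sizeof(lowered)) {
        return DISPATCH_UNDEFINED;
    }
    for (i = 0; i < n; i++) {
        lowered[i] = (char)tolower((unsigned char)api_name[i]);
    }
    lowered[n] = '\0';

    for (i = 0; i < sizeof(api_table) / sizeof(api_table[0]); i++) {
        if (strcmp(api_table[i].name, lowered) == 0) {
            *result = api_table[i].fn(arg);
            return DISPATCH_OK;
        }
    }
    return DISPATCH_UNDEFINED;
}
```

    Calling dispatch("POST", "Double", 21, &result) finds the handler despite the different case, while a GET request never reaches a handler and an unknown name is reported as undefined.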

    Here is the code:

    /* {{{ proto Yar_Server::__construct($obj, $protocol = NULL)
       initizing an Yar_Server object */
    PHP_METHOD(yar_server, __construct) {
        zval *obj;
     
        if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "o", &obj) == FAILURE) {
            return;
        }
        // keep the object in the _executor property
        zend_update_property(yar_server_ce, getThis(), "_executor", sizeof("_executor")-1, obj TSRMLS_CC);
    }
    /* }}} */
     
    /* {{{ proto Yar_Server::handle()
       start service */
    PHP_METHOD(yar_server, handle)
    {
        if (SG(headers_sent)) {
            php_error_docref(NULL TSRMLS_CC, E_WARNING, "headers already has been sent");
            RETURN_FALSE;
        } else {
            const char *method;
            zval *executor = NULL;
     
            // fetch the stored object
            executor = zend_read_property(yar_server_ce, getThis(), ZEND_STRL("_executor"), 0 TSRMLS_CC);
            if (!executor || IS_OBJECT != Z_TYPE_P(executor)) {
                php_error_docref(NULL TSRMLS_CC, E_WARNING, "executor is not a valid object");
                RETURN_FALSE;
            }  
     
            method = SG(request_info).request_method;
            // is this a POST request?
            if (!method || strncasecmp(method, "POST", 4)) {
                if (YAR_G(expose_info)) {
                    php_yar_server_info(executor TSRMLS_CC);
                    RETURN_TRUE;
                } else {
                    zend_throw_exception(yar_server_exception_ce, "server info is not allowed to access", YAR_ERR_FORBIDDEN TSRMLS_CC);
                    return;
                }
            }
            // handle the request; call_user_function_ex is invoked in here
            php_yar_server_handle(executor TSRMLS_CC);
        }  
     
        RETURN_TRUE;
    }
    static void php_yar_server_handle(zval *obj TSRMLS_DC) /* {{{ */ {
        char *payload, *err_msg, method[256];
        size_t payload_len;
        zend_bool bailout = 0;
        zval *post_data = NULL, output, func;
        zend_class_entry *ce;
        yar_response_t *response;
        yar_request_t  *request = NULL;
        yar_header_t *header;
     
        response = php_yar_response_instance(TSRMLS_C);
        if (!SG(request_info).raw_post_data) {
            goto response_no_output;
        }
        // read the POST data
        payload = SG(request_info).raw_post_data;
        payload_len = SG(request_info).raw_post_data_length;
        if (!(header = php_yar_protocol_parse(payload TSRMLS_CC))) {
            php_yar_error(response, YAR_ERR_PACKAGER TSRMLS_CC, "malformed request header '%.10s'", payload TSRMLS_CC);
            DEBUG_S("0: malformed request '%s'", payload);
            goto response_no_output;
        }
     
        DEBUG_S("%ld: accpect rpc request form '%s'",
                header->id, header->provider? (char *)header->provider : "Yar PHP " YAR_VERSION);
     
        payload += sizeof(yar_header_t);
        payload_len -= sizeof(yar_header_t);
     
        if (!(post_data = php_yar_packager_unpack(payload, payload_len, &err_msg TSRMLS_CC))) {
            php_yar_error(response, YAR_ERR_PACKAGER TSRMLS_CC, err_msg);
            efree(err_msg);
            goto response_no_output;
        }
     
        request = php_yar_request_unpack(post_data TSRMLS_CC);
        zval_ptr_dtor(&post_data);
        ce = Z_OBJCE_P(obj);
        if (!php_yar_request_valid(request, response, &err_msg TSRMLS_CC)) {
            php_yar_error(response, YAR_ERR_REQUEST TSRMLS_CC, "%s", err_msg);
            efree(err_msg);
            goto response_no_output;
        }
     
    #if ((PHP_MAJOR_VERSION == 5) && (PHP_MINOR_VERSION < 4))
        if (php_start_ob_buffer(NULL, 0, 0 TSRMLS_CC) != SUCCESS) {
    #else
        if (php_output_start_user(NULL, 0, PHP_OUTPUT_HANDLER_STDFLAGS TSRMLS_CC) == FAILURE) {
    #endif
            php_yar_error(response, YAR_ERR_OUTPUT TSRMLS_CC, "start output buffer failed");
            goto response_no_output;
        }
     
        ce = Z_OBJCE_P(obj);
        zend_str_tolower_copy(method, request->method, request->mlen);
        if (!zend_hash_exists(&ce->function_table, method, strlen(method) + 1)) {
            php_yar_error(response, YAR_ERR_REQUEST TSRMLS_CC, "call to undefined api %s::%s()", ce->name, request->method);
            goto response;
        }
         ...................
        ZVAL_STRINGL(&func, request->method, request->mlen, 0);
        // invoke the API method
            if (call_user_function_ex(NULL, &obj, &func, &retval_ptr, count, func_params, 0, NULL TSRMLS_CC) != SUCCESS) {
                if (func_params) {
                    efree(func_params);
                }
                php_yar_error(response, YAR_ERR_REQUEST TSRMLS_CC, "call to api %s::%s() failed", ce->name, request->method);
                goto response;
            }
    }

    The key parts are commented, and the code itself is easy to follow.
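
    One step in php_yar_server_handle that is easy to skim past is the way the fixed-size header is peeled off the front of the POST body before the rest is unpacked (payload += sizeof(yar_header_t)). The sketch below shows that pattern in isolation. It is only an illustration under my own assumptions: demo_header, DEMO_MAGIC, and split_payload are made-up names, the three-field layout is not Yar's real header, and the body length check is something I added for the example.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define DEMO_MAGIC 0x59415221u   /* made-up value for this sketch */

/* Hypothetical fixed-size header that precedes every request body. */
typedef struct {
    uint32_t id;
    uint32_t magic;
    uint32_t body_len;
} demo_header;

/* Splits a raw payload into header and body.
 * Returns 1 on success and 0 when the payload is malformed. */
int split_payload(const unsigned char *payload, size_t payload_len,
                  demo_header *header,
                  const unsigned char **body, size_t *body_len)
{
    assert(payload != NULL && header != NULL);
    assert(body != NULL && body_len != NULL);

    if (payload_len < sizeof(demo_header)) {
        return 0;                      /* too short to hold a header */
    }
    /* Copy instead of casting so unaligned input is safe. */
    memcpy(header, payload, sizeof(demo_header));
    if (header->magic != DEMO_MAGIC) {
        return 0;                      /* not one of our requests */
    }
    if (header->body_len != payload_len - sizeof(demo_header)) {
        return 0;                      /* declared and actual size differ */
    }
    /* Same idea as "payload += sizeof(yar_header_t)". */
    *body = payload + sizeof(demo_header);
    *body_len = payload_len - sizeof(demo_header);
    return 1;
}
```

    Only after this split does it make sense to hand the body to an unpacker, which is the order the Yar code above follows as well.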

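    Now for the client:

    What I built earlier was essentially this client without a server side: it used libcurl plus epoll in an event-driven way to fetch our interfaces, and ran a

    callback function whenever data came back. After reading the Yar code, I found that the approach is very similar.

    Usage: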

    <?php
    function callback($retval, $callinfo) {
         var_dump($retval);
    }
     
    Yar_Concurrent_Client::call("http://host/api/", "api", array("parameters"), "callback");
    Yar_Concurrent_Client::call("http://host/api/", "api", array("parameters"), "callback");
    Yar_Concurrent_Client::call("http://host/api/", "api", array("parameters"), "callback");
    Yar_Concurrent_Client::call("http://host/api/", "api", array("parameters"), "callback");
    Yar_Concurrent_Client::loop(); //send
    ?>

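    Yar_Concurrent_Client::call takes four parameters:

    the first is the URL of the interface to call,

    the second is the method to execute,

    the third is the parameters to pass,

    and the fourth is the callback function.

    There is also a fifth parameter, an array used to set options; it is empty by default.

    All of this information is stored in the _callstack property.

    Yar_Concurrent_Client::loop() then sends everything in one go and waits for epoll events. When a read event fires, it reads all the data and then invokes the callback.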
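
    The "register first, run later" shape of call() and loop() can be sketched in plain C as follows. This is only an illustration: call_stack, queued_call, client_call, client_loop, fake_remote, and sum_results are names I made up, there is no real network I/O, and the results are delivered one after another rather than concurrently. fake_remote simply pretends the remote method returned its parameter plus one.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_CALLS 16

/* Hypothetical callback type: sequence number, result, user context. */
typedef void (*call_cb)(unsigned long sequence, int retval, void *ctx);

typedef struct {
    unsigned long sequence;
    const char   *uri;
    const char   *method;
    int           parameter;
    call_cb       callback;
    void         *ctx;
} queued_call;

/* Plays the role of the _callstack property. */
typedef struct {
    queued_call   calls[MAX_CALLS];
    size_t        count;
    unsigned long next_sequence;
} call_stack;

/* Registers a call without sending anything.
 * Returns its sequence number, or 0 when the queue is full. */
unsigned long client_call(call_stack *cs, const char *uri,
                          const char *method, int parameter,
                          call_cb cb, void *ctx)
{
    queued_call *c;

    assert(cs != NULL);
    if (cs->count >= MAX_CALLS) {
        return 0;
    }
    c = &cs->calls[cs->count++];
    c->sequence  = ++cs->next_sequence;
    c->uri       = uri;
    c->method    = method;
    c->parameter = parameter;
    c->callback  = cb;
    c->ctx       = ctx;
    return c->sequence;
}

/* Stand-in for the network round trip. */
static int fake_remote(const queued_call *c)
{
    return c->parameter + 1;
}

/* Drains the queue and hands each result to its callback.
 * Returns the number of callbacks that were invoked. */
size_t client_loop(call_stack *cs)
{
    size_t i, invoked = 0;

    assert(cs != NULL);
    for (i = 0; i < cs->count; i++) {
        queued_call *c = &cs->calls[i];
        int retval = fake_remote(c);
        if (c->callback != NULL) {
            c->callback(c->sequence, retval, c->ctx);
            invoked++;
        }
    }
    cs->count = 0;   /* the queue is empty again after one loop */
    return invoked;
}

/* Example callback: adds each result to an int passed through ctx. */
void sum_results(unsigned long sequence, int retval, void *ctx)
{
    (void)sequence;
    *(int *)ctx += retval;
}
```

    Nothing happens when client_call is invoked; the callbacks only fire once client_loop runs, which mirrors how the PHP example queues four calls and then calls loop() once.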

    The extension implementation has the following data structure:

    typedef struct _yar_call_data {
        ulong sequence;
        char *uri;          // URL of the interface to call
        uint ulen;
        char *method;       // method to execute
        uint mlen;
        zval *callback;     // callback function
        zval *ecallback;
        zval *parameters;   // parameters to pass
        zval *options;
    } yar_call_data_t;

    It stores all the parameters set through call(); when loop() runs, the information is read back from here.

    /* {{{ proto Yar_Concurrent_Client::loop($callback = NULL, $error_callback) */
    PHP_METHOD(yar_concurrent_client, loop) {
        char *name = NULL;
        zval *callstack;
        zval *callback = NULL, *error_callback = NULL;
        zval *status;
        uint ret = 0;
     
        if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|zz", &callback, &error_callback) == FAILURE) {
            return;
        }
     
        status = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_start"), 0 TSRMLS_CC);
        if (Z_BVAL_P(status)) {
            php_error_docref(NULL TSRMLS_CC, E_WARNING, "concurrent client has already started");
            RETURN_FALSE;
        }
     
        if (callback && !ZVAL_IS_NULL(callback) &&
                !zend_is_callable(callback, 0, &name
    #if ((PHP_MAJOR_VERSION == 5) && (PHP_MINOR_VERSION > 2))
                    TSRMLS_CC
    #endif
                    )) {
            php_error_docref1(NULL TSRMLS_CC, name, E_ERROR, "first argument is expected to be a valid callback");
            efree(name);
            RETURN_FALSE;
        }
        if (name) {
            efree(name);
        }
     
        callstack = zend_read_static_property(yar_concurrent_client_ce, ZEND_STRL("_callstack"), 0 TSRMLS_CC);
        if (ZVAL_IS_NULL(callstack) || zend_hash_num_elements(Z_ARRVAL_P(callstack)) == 0) {
            RETURN_TRUE;
        }
     
        if (callback && !ZVAL_IS_NULL(callback)) {
            zend_update_static_property(yar_concurrent_client_ce, ZEND_STRL("_callback"), callback TSRMLS_CC);
        }
     
        if (error_callback && !ZVAL_IS_NULL(error_callback)) {
            zend_update_static_property(yar_concurrent_client_ce, ZEND_STRL("_error_callback"), error_callback TSRMLS_CC);
        }
     
        ZVAL_BOOL(status, 1);
        // this is the key step
        ret = php_yar_concurrent_client_handle(callstack TSRMLS_CC);
        ZVAL_BOOL(status, 0);
        RETURN_BOOL(ret);
    }

    The key part of the client is the php_yar_concurrent_client_handle function.

    Before looking at it, there are several important data structures:

    typedef struct _yar_curl_multi_data_t {
        CURLM *cm;
        yar_transport_interface_t *chs; // points to a _yar_transport_interface
    } yar_curl_multi_data_t;
     
    typedef struct _yar_transport_interface {
        void *data;
        // open: creates a new curl handle (curl_easy_init)
        int  (*open)(struct _yar_transport_interface *self, char *address, uint len, long options, char **msg TSRMLS_DC);
        int  (*send)(struct _yar_transport_interface *self, struct _yar_request *request, char **msg TSRMLS_DC);
        // exec: sends the curl request and reads the response data
        struct _yar_response * (*exec)(struct _yar_transport_interface *self, struct _yar_request *request TSRMLS_DC);
        int  (*setopt)(struct _yar_transport_interface *self, long type, void *value, void *addition TSRMLS_DC);
        int  (*calldata)(struct _yar_transport_interface *self, yar_call_data_t *calldata TSRMLS_DC);
        void (*close)(struct _yar_transport_interface *self TSRMLS_DC);
    } yar_transport_interface_t;
     
    typedef struct _yar_transport_multi_interface {
        void *data;
        // add: adds a handle to the multi handle (cm)
        int (*add)(struct _yar_transport_multi_interface *self, yar_transport_interface_t *cp TSRMLS_DC);
        // exec: polls for epoll events and runs the exec of each _yar_transport_interface
        int (*exec)(struct _yar_transport_multi_interface *self, yar_concurrent_client_callback *callback TSRMLS_DC);
        void (*close)(struct _yar_transport_multi_interface *self TSRMLS_DC);
    } yar_transport_multi_interface_t;
     
    typedef struct _yar_transport_multi {
        struct _yar_transport_multi_interface * (*init)(TSRMLS_D);
    } yar_transport_multi_t;
     
    typedef struct _yar_transport {
        const char *name;  // transport type: curl or socket
        struct _yar_transport_interface * (*init)(TSRMLS_D); // initializes the transport
        void (*destroy)(yar_transport_interface_t *self TSRMLS_DC);
        yar_transport_multi_t *multi;
    } yar_transport_t;

    The functions behind these pointers are fairly long, so I won't paste them here.
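
    Since the real implementations are not shown, here is a small self-contained sketch of the same pattern: an interface struct made of function pointers, a factory that fills it in, and a lookup by name in the spirit of php_yar_transport_get. Everything in it is hypothetical. The "mem" transport, mem_state, transport_get, and mem_bytes_sent are names I invented, and the back end only counts bytes instead of talking to curl.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Interface: callers only ever go through these function pointers. */
typedef struct transport_iface {
    void *data;   /* private state of the concrete transport */
    int  (*open)(struct transport_iface *self, const char *address);
    int  (*send)(struct transport_iface *self, const char *payload);
    void (*close)(struct transport_iface *self);
} transport_iface;

/* Factory: a name plus constructor and destructor. */
typedef struct {
    const char *name;
    transport_iface *(*init)(void);
    void (*destroy)(transport_iface *self);
} transport_factory;

/* Hypothetical in-memory back end that only counts sent bytes. */
typedef struct {
    int    is_open;
    size_t bytes_sent;
} mem_state;

static int mem_open(transport_iface *self, const char *address)
{
    mem_state *st = self->data;
    st->is_open = (address != NULL && address[0] != '\0');
    return st->is_open;
}

static int mem_send(transport_iface *self, const char *payload)
{
    mem_state *st = self->data;
    if (!st->is_open || payload == NULL) {
        return 0;   /* sending requires a successful open first */
    }
    st->bytes_sent += strlen(payload);
    return 1;
}

static void mem_close(transport_iface *self)
{
    mem_state *st = self->data;
    st->is_open = 0;
}

static transport_iface *mem_init(void)
{
    transport_iface *t = calloc(1, sizeof(*t));
    mem_state *st = calloc(1, sizeof(*st));
    if (t == NULL || st == NULL) {
        free(t);
        free(st);
        return NULL;
    }
    t->data  = st;
    t->open  = mem_open;
    t->send  = mem_send;
    t->close = mem_close;
    return t;
}

static void mem_destroy(transport_iface *self)
{
    if (self != NULL) {
        free(self->data);
        free(self);
    }
}

static const transport_factory factories[] = {
    { "mem", mem_init, mem_destroy },
};

/* Looks a factory up by name; returns NULL if none is registered. */
const transport_factory *transport_get(const char *name)
{
    size_t i;
    assert(name != NULL);
    for (i = 0; i < sizeof(factories) / sizeof(factories[0]); i++) {
        if (strcmp(factories[i].name, name) == 0) {
            return &factories[i];
        }
    }
    return NULL;
}

/* Inspection helper for this sketch only. */
size_t mem_bytes_sent(const transport_iface *t)
{
    return ((const mem_state *)t->data)->bytes_sent;
}
```

    The caller never names mem_open or mem_send directly; it asks transport_get for a factory, calls init, and then works through the pointers. That indirection is what makes the Yar client code feel roundabout on a first read.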

    Next, let's look at the php_yar_concurrent_client_handle function.

    int php_yar_concurrent_client_handle(zval *callstack TSRMLS_DC) /* {{{ */ {
        char *dummy, *msg;
        ulong sequence;
        zval **calldata;
        yar_request_t *request;
        yar_transport_t *factory;
        yar_transport_interface_t *transport;
        yar_transport_multi_interface_t *multi;
        // use the curl transport
        factory = php_yar_transport_get(ZEND_STRL("curl") TSRMLS_CC);
        // calls php_yar_curl_multi_init, which performs curl_multi_init
        multi = factory->multi->init(TSRMLS_C);
        // walk the entries stored in callstack
        for(zend_hash_internal_pointer_reset(Z_ARRVAL_P(callstack));
                zend_hash_has_more_elements(Z_ARRVAL_P(callstack)) == SUCCESS;
                zend_hash_move_forward(Z_ARRVAL_P(callstack))) {
            yar_call_data_t *entry;
            long flags = 0;
     
            if (zend_hash_get_current_data(Z_ARRVAL_P(callstack), (void**)&calldata) == FAILURE) {
                continue;
            }  
     
            ZEND_FETCH_RESOURCE_NO_RETURN(entry, yar_call_data_t *, calldata, -1, "Yar Call Data", le_calldata);
     
            if (!entry) {
                continue;
            }  
     
            zend_hash_get_current_key(Z_ARRVAL_P(callstack), &dummy, &sequence, 0);
     
            if (!entry->parameters) {
                zval *tmp;
                MAKE_STD_ZVAL(tmp);
                array_init(tmp);
                entry->parameters = tmp;
            }
            // calls php_yar_curl_init to set up a yar_transport_interface_t
            transport = factory->init(TSRMLS_C);
     
            if (YAR_G(allow_persistent)) {
                if (entry->options) {
                    zval *flag = php_yar_client_get_opt(entry->options, YAR_OPT_PERSISTENT TSRMLS_CC);
                    if (flag && (Z_TYPE_P(flag) == IS_BOOL || Z_TYPE_P(flag) == IS_LONG) && Z_LVAL_P(flag)) {
                        flags |= YAR_PROTOCOL_PERSISTENT;
                    }
                }
            }

            if (!(request = php_yar_request_instance(entry->method, entry->mlen, entry->parameters, entry->options TSRMLS_CC))) {
                transport->close(transport TSRMLS_CC);
                factory->destroy(transport TSRMLS_CC);
                return 0;
            }
            // calls php_yar_curl_open, which creates the easy curl handle via curl_easy_init
            if (!transport->open(transport, entry->uri, entry->ulen, flags, &msg TSRMLS_CC)) {
                php_yar_client_trigger_error(1 TSRMLS_CC, YAR_ERR_TRANSPORT, msg TSRMLS_CC);
                transport->close(transport TSRMLS_CC);
                factory->destroy(transport TSRMLS_CC);
                efree(msg);
                return 0;
            }
     
            DEBUG_C("%ld: call api '%s' at (%c)'%s' with '%d' parameters",
                    request->id, request->method, (flags & YAR_PROTOCOL_PERSISTENT)? 'p' : 'r', entry->uri,
                    zend_hash_num_elements(Z_ARRVAL_P(request->parameters)));
     
            if (!transport->send(transport, request, &msg TSRMLS_CC)) {
                php_yar_client_trigger_error(1 TSRMLS_CC, YAR_ERR_TRANSPORT, msg TSRMLS_CC);
                transport->close(transport TSRMLS_CC);
                factory->destroy(transport TSRMLS_CC);
                efree(msg);
                return 0;
            }
     
            transport->calldata(transport, entry TSRMLS_CC);
            multi->add(multi, transport TSRMLS_CC);
            php_yar_request_destroy(request TSRMLS_CC);
        }
        // runs the epoll event loop, reads the data, and invokes the user callbacks through php_yar_concurrent_client_callback
        if (!multi->exec(multi, php_yar_concurrent_client_callback TSRMLS_CC)) {
            multi->close(multi TSRMLS_CC);
            return 0;
        }
        multi->close(multi TSRMLS_CC);
        return 1;
     
    }

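    While multi->exec is running, once all the data has been read successfully, php_yar_concurrent_client_callback is called. That is also where call_user_function_ex is invoked, and this is how the callback we registered gets executed.

    The client code is somewhat convoluted to read, mainly because of the structures above:
    yar_transport_t;
    yar_transport_multi_t;
    yar_transport_multi_interface_t;
    yar_transport_interface_t;

    The key ones are yar_transport_multi_t and yar_transport_interface_t.

    For however many interfaces we want to call in parallel, that many yar_transport_interface_t structures are created, each holding a curl easy handle.

    Once a yar_transport_interface_t has been created, php_yar_curl_multi_add_handle is called through the multi interface obtained from yar_transport_multi_t. It adds

    the yar_transport_interface_t pointer to yar_curl_multi_data_t->chs. After all the curl handles have been added,

    php_yar_curl_multi_exec is called through the same multi interface to run the epoll event loop. When all the data has been read,

    yar_concurrent_client_callback runs the callback we passed in. That is all there is to it.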
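
    The add-then-exec sequence can be modeled with a short linked-list sketch in plain C. I am assuming here that the handles are chained into chs as a singly linked list with new entries pushed at the head; I have not confirmed that against the Yar source. The types easy_handle and multi_handle and the functions multi_add, multi_exec, and add_response are made-up names, and the "transfer" is faked by setting the response to the handle's id times ten.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for one yar_transport_interface_t with its easy handle. */
typedef struct easy_handle {
    int id;
    int response;              /* filled in when the transfer "finishes" */
    int done;
    struct easy_handle *next;  /* link used by the multi's chs list */
} easy_handle;

/* Hypothetical completion callback. */
typedef void (*complete_cb)(const easy_handle *h, void *ctx);

/* Stand-in for yar_curl_multi_data_t. */
typedef struct {
    easy_handle *chs;      /* head of the list of registered handles */
    size_t       pending;  /* handles that have not completed yet */
} multi_handle;

/* Registers a handle by pushing it onto the front of the chs list. */
void multi_add(multi_handle *m, easy_handle *h)
{
    assert(m != NULL && h != NULL);
    h->done = 0;
    h->next = m->chs;
    m->chs = h;
    m->pending++;
}

/* Runs until every registered handle has completed, calling cb once per
 * handle. Returns the number of callbacks that were invoked. */
size_t multi_exec(multi_handle *m, complete_cb cb, void *ctx)
{
    size_t delivered = 0;

    assert(m != NULL);
    while (m->pending > 0) {
        easy_handle *h;
        for (h = m->chs; h != NULL; h = h->next) {
            if (h->done) {
                continue;
            }
            h->response = h->id * 10;   /* pretend the transfer finished */
            h->done = 1;
            m->pending--;
            if (cb != NULL) {
                cb(h, ctx);
                delivered++;
            }
        }
    }
    return delivered;
}

/* Example callback: accumulates the responses into an int behind ctx. */
void add_response(const easy_handle *h, void *ctx)
{
    *(int *)ctx += h->response;
}
```

    A real event loop would mark a handle as done only when its socket reports data, so handles would complete in whatever order the servers answer; the fixed order here is purely an artifact of the simulation.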

    http://www.imsiren.com/archives/867

  • Original post: https://www.cnblogs.com/chunguang/p/5630415.html