zoukankan      html  css  js  c++  java
  • gpfdist原理解析

    gpfdist原理解析

     

    前言gpfdist作为批量向postgresql写入数据的工具,了解其内部原理有助于正确使用以及提供更合适的数据同步方案。文章先简要介绍gpfdist的整体流程,然后针对重要步骤详细展开。文章有的地方可能探索不够深入,感兴趣的可以继续深入。如有错误请指出。

    1 整体流程

    Gpfdist的整体流程可简单分为4步。

    (1) 解析参数;

    (2) 从指定的端口列表中搜寻可用端口;

    (3) 监听第一个可用端口;

    (4) 注册该端口的可读事件,等待连接请求;

    (5) 响应各类事件。

     

    下面通过源码及注释详细介绍上述过程。

    int main(int argc, const char* const argv[])
    {
        if (gpfdist_init(argc, argv) == -1)
            gfatal(NULL, "Initialization failed");
        return gpfdist_run();
    }

    Main函数很简短,调用了gpfdist_initgpfdist_run,其中gpfdist_run比较简单,源码如下,仅仅调用了libevent的事件分发函数,以回调形式响应各类事件(主要是socket读写事件)。

    int gpfdist_run()
    {
        return event_dispatch();
    }

     gpfdist_init比较复杂,完成了libevent的初始化、事件绑定、http服务启动等功能,源码如下。其中aprApache可移植运行库在该项目中主要用于资源管理,不影响理解gpfdist原理,这里不再介绍,有兴趣的可参考https://apr.apache.org/

    int gpfdist_init(int argc, const char* const argv[])
    {
        /*初始化apr资源池*/
        if (0 != apr_app_initialize(&argc, &argv, 0))
            gfatal(NULL, "apr_app_initialize failed");
        atexit(apr_terminate);
    
        if (0 != apr_pool_create(&gcb.pool, 0))
            gfatal(NULL, "apr_app_initialize failed");
    
        //apr_signal_init(gcb.pool);
        gcb.session.tab = apr_hash_make(gcb.pool);
    
        //解析命令行参数
    parse_command_line(argc, argv, gcb.pool);
    ......
        event_init();
    signal_register();
    //启动http服务
    http_setup();
    .....

     gpfdist_init通过调用http_setup函数完成http服务的启动,http_setup源码如下,主要功能是测试哪些端口可以使用

    http_setup(void)
    {
        SOCKET f;
        int on = 1;
        struct linger linger;
        struct addrinfo hints;
        struct addrinfo *addrs, *rp;
        int  s;
        int  i;
    
        char service[32];
        const char *hostaddr = NULL;
        //绑定gpfdist的文件读写函数,用于从文件或其他方式读写数据
    gpfdist_send    = gpfdist_socket_send;
        gpfdist_receive = gpfdist_socket_receive;
       ......
    /* 下面的内容就是从指定端口列表中测试哪些端口可用*/ for (;;) { //利用第一个端口组成socket使用的网络地址 snprintf(service,32,"%d",opt.p); memset(&hints, 0, sizeof(struct addrinfo)); hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ hints.ai_socktype = SOCK_STREAM; /* tcp socket */ hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */ hints.ai_protocol = 0; /* Any protocol */ s = getaddrinfo(hostaddr, service, &hints, &addrs); ....... /* 测试地址是否可用,这个for循环只会执行一次,因为rp->ai_next=0*/ for (rp = addrs; rp != NULL; rp = rp->ai_next) { gprint(NULL, "Trying to open listening socket: "); print_listening_address(rp); /* * getaddrinfo gives us all the parameters for the socket() call * as well as the parameters for the bind() call. */ f = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); //设置keep_alive linger等属性 ...... if (bind(f, rp->ai_addr, rp->ai_addrlen) != 0) { ...... } /* listen with a big queue */ if (listen(f, opt.z)) { ...... } gcb.listen_socks[gcb.listen_sock_count++] = f; gprint(NULL, "Opening listening socket succeeded "); } ...... } /* * 为上述可用端口绑定可读事件响应函数do_accept,用于接收客户端的连接。 */ for (i = 0; i < gcb.listen_sock_count; i++) { /* when this socket is ready, do accept */ event_set(&gcb.listen_events[i], gcb.listen_socks[i], EV_READ | EV_PERSIST, do_accept, 0); ...... if (event_add(&gcb.listen_events[i], 0)) gfatal(NULL, "cannot set up event on listen socket: %s", strerror(errno)); } }

    自此http服务已经建立起来,并准备好接收postgresql segment的连接。

     

    2 核心数据结构间的联系

      接下来说明一下gpfdist中的几个核心数据结构及其之间的关系,便于对下文代码逻辑关系的理解。

      session_t是一次会话,由成员key唯一标识,key = tid:pathtid = xid.cid.sn,其中xid是事务idcid是查询命令id,每次查询时属于同一个sqlsegment请求的xidcid相同,但由于各segment请求的path可能不同,因此同一个查询的不同segment请求可能属于不同session。另外注意tid长度不能超过1023字节。

      request_t代表一个segment的请求,因此session_t对应多个request_t

      fstream_t代表属于同一session_trequest_t想要请求的数据流,其成员glob_and_copy_t包含多个文件地址,fstream_t会顺序读取这些文件回应给segment

     1 核心数据结构

    3 接受连接

      http服务接收到客户端连接后由do_accept函数响应,该函数首先接收客户端连接,并给该连接设置非阻塞等属性,接着创建request_t对象并初始化其部分属性,最后调用setup_read函数为该接绑定读事件响应函数do_read_request,到此gpfdist已经与客户端建立了连接并开始等待客户端的http请求。

    static void do_accept(int fd, short event, void* arg)
    {
        address_t           a;
        socklen_t           len = sizeof(a);
        SOCKET              sock;
        request_t*          r;
        apr_pool_t*         pool;
        int                 on = 1;
        struct linger       linger;
    
        /* do the accept */
        if ((sock = accept(fd, (struct sockaddr*) &a, &len)) < 0)
        {
            gwarning(NULL, "accept failed");
            goto failure;
        }
    
        /* set to non-blocking, and close-on-exec */
        ......
        /* set keepalive, reuseaddr, and linger */
        ......
        /* create a pool container for this socket */
        ......
        /* 调用setup_read为上述socket设置读事件响应函数do_read_request */
        if (setup_read(r))
        {
            http_error(r, FDIST_INTERNAL_ERROR, "internal error");
            request_end(r, 1, 0);
        }
        return;
    }

     接收请求后的处理

      如图2gpfdist接收到http请求解析出相关参数,包含tid、cid、文件路径等信息,然后绑定到对应session上,根据请求类型分别调用不同函数完成对segment的响应。下面着重讲解路径提取、session绑定两个操作的细节。

     2 接收请求

     

    1)路径提取

      segment请求中路径参数格式如下所示:

    1.csv空格t*.csv

    (注意:该串不能含有相对路径”..”)

    gpfdist会遍历该字符串,以空格为分隔符提取所有文件路径,并在每个路径前拼接gpfdist启动时命令行输入的目录,最终得到如下路径:

    /home/test/data/1.csv 空格/home/test/data/t*.csv

    转换后的路径将用于后面的文件读取写入操作。

    2session与连接绑定

      接收到segmenthttp请求后需要将其与session绑定,流程如图3。首先根据请求的key查找对应的session是否存在,存在则请求与session绑定,否则就新建并初始化fstream_tsession对象。

     

     3 绑定session

     

      新建fstream_t时会重新组织文件路径并检查是否有操作权限。首先把上文转换后的路径以空格分开,然后将每一个路径中包含的通配符解析成具体的文件名,得到如下的路径列表(这里假设目录下存在t1.csv  t2.csv):

    /home/test/data/1.csv

    /home/test/data/t1.csv

    /home/test/data/t2.csv

    后尝试打开上述文件以测试是否有操作权限。

    4 GET请求

      如果segmentGET请求 对应的socket会被设置可写事件响应函数do_write,其流程如4

    4 发送数据

     

      在读取一个数据块时,gpfdist采用整行读取方式,即每次回应的业务数据一定是源文件的完整若干行,目前gpfdist对于csv文件仅支持     三种行分隔符,但可通过修改scan_csv_records_crlf函数支持其他类型的行分隔符,另外csv文件允许数据中含有行分隔符;对于text格式的文件,行分隔只支持

      gpfdist会将本次读取到的数据的元信息填充到回应头部,包含本次回应的业务数据的长度、行数、文件名、在文件中的偏移等信息。

    5 POST请求

      图5gpfdistpost请求(写请求)的处理流程,不再详细展开。

      5 数据写入文件

    6 外表文件个数与segment数量的关系

      在此只针对文件形式的读外表进行分析,读外表的创建语句如下:

    create external table test
    (
      id integer,
      name varchar
    )
    location (‘gpfdist://$IP:$PORT/$file_name[,..])
    format ‘csv’(delimiter’,’)
    ;

      从以上语句可以看出,外表可以配置多个文件,但应注意配置的文件数量与segment存在以下关系:

    (1) 只有一个文件(通配符计为一个文件)

      每个segment都会请求该文件的数据,当数据量小时,有的segment可能获取不到数据,这不会对表的读取造成任何影响。

    (2) 配置两个以上文件

    • 文件数量 < segment数量

        postgresql会给每个segment分配一个文件进行读取。

    • 文件数量 > segment

        gpfdist报错,读表失败。

    参考:

    https://docs.greenplum.org/6-12/common/gpdb-features.html

    https://greenplum.org/readable-external-protocol-gpfdist/

    https://greenplum.org/introduction-writable-gpfdist/

  • 相关阅读:
    Servlet常用类
    Java库使用----xstream1.3.1
    字符串处理---统计每一行字符串当中的字符“u”个数
    读写锁
    求阶乘
    Fibonacci数列
    22.2-按照升序显示不重复的单词
    22.1-在散列集上进行集合操作
    完美世界-2015校园招聘-java服务器工程师-成都站
    运用jQuery写的验证表单
  • 原文地址:https://www.cnblogs.com/candl/p/14513157.html
Copyright © 2011-2022 走看看