zoukankan      html  css  js  c++  java
  • OpenMPI源码剖析:网络通信原理(二) 如何选择网络协议?

    因为比较常用的是 TCP 协议,所以先在 opal/mca/btl/tcp/btl_tcp.h 头文件中找到对应的 TCP 组件结构体定义:

    /**
     * Component-wide state of the TCP BTL: the array of BTL modules,
     * free-list sizing parameters, the IPv4 (and optionally IPv6) listen
     * sockets and port ranges, socket options, and the fragment free lists.
     */
    struct mca_btl_tcp_component_t {
        mca_btl_base_component_3_0_0_t super;   /**< base BTL component */
        uint32_t tcp_addr_count;                /**< total number of addresses */
        uint32_t tcp_num_btls;                  /**< number of interfaces available to the TCP component */
        unsigned int tcp_num_links;             /**< number of logical links per physical device */
        struct mca_btl_tcp_module_t **tcp_btls; /**< array of available BTL modules */
        int tcp_free_list_num;                  /**< initial size of free lists */
        int tcp_free_list_max;                  /**< maximum size of free lists */
        int tcp_free_list_inc;                  /**< number of elements to alloc when growing free lists */
        int tcp_endpoint_cache;                 /**< amount of cache on each endpoint */
        opal_proc_table_t tcp_procs;            /**< hash table of tcp proc structures */
        opal_mutex_t tcp_lock;                  /**< lock for accessing module state */
        opal_list_t tcp_events;                 /**< NOTE(review): presumably a list of pending events -- confirm */
    
        opal_event_t tcp_recv_event;            /**< recv event for IPv4 listen socket */
        int tcp_listen_sd;                      /**< IPv4 listen socket for incoming connection requests */
        unsigned short tcp_listen_port;         /**< IPv4 listen port */
        int tcp_port_min;                       /**< IPv4 minimum port */
        int tcp_port_range;                     /**< IPv4 port range */
    #if OPAL_ENABLE_IPV6
        opal_event_t tcp6_recv_event;           /**< recv event for IPv6 listen socket */
        int tcp6_listen_sd;                     /**< IPv6 listen socket for incoming connection requests */
        unsigned short tcp6_listen_port;        /**< IPv6 listen port */
        int tcp6_port_min;                      /**< IPv6 minimum port */
        int tcp6_port_range;                    /**< IPv6 port range */
    #endif
        /* Port range restriction */
    
        char*  tcp_if_include;                  /**< comma separated list of interfaces to include */
        char*  tcp_if_exclude;                  /**< comma separated list of interfaces to exclude */
        int    tcp_sndbuf;                      /**< socket sndbuf size */
        int    tcp_rcvbuf;                      /**< socket rcvbuf size */
        int    tcp_disable_family;              /**< disabled AF_family */
    
        /* free lists of fragment descriptors */
        opal_free_list_t tcp_frag_eager;
        opal_free_list_t tcp_frag_max;
        opal_free_list_t tcp_frag_user;
    
        int tcp_enable_progress_thread;         /**< flag: support for the TCP progress thread */
    
        /* NOTE(review): presumably used together with the progress thread
         * enabled by tcp_enable_progress_thread -- confirm */
        opal_event_t tcp_recv_thread_async_event;
        /* NOTE(review): presumably each mutex guards the tcp_frag_* free
         * list of the same name -- confirm */
        opal_mutex_t tcp_frag_eager_mutex;
        opal_mutex_t tcp_frag_max_mutex;
        opal_mutex_t tcp_frag_user_mutex;
        /* Do we want to use TCP_NODELAY? */
        int    tcp_not_use_nodelay;
    
        /* do we want to warn on all excluded interfaces
         * that are not found?
         */
        bool report_all_unfound_interfaces;
    };
    /* Convenience alias so the struct can be named without the `struct` keyword. */
    typedef struct mca_btl_tcp_component_t mca_btl_tcp_component_t;
    
    /* Declaration of the TCP BTL component object; it is defined elsewhere. */
    OPAL_MODULE_DECLSPEC extern mca_btl_tcp_component_t mca_btl_tcp_component;
    

     这里定义了 TCP 组件的结构体(TCP 模块的结构体这里就没有贴出来了),并且导出了 mca_btl_tcp_component 这个全局变量。

    接到上次说的话题,那么刚开始的时候,选择哪个通信协议呢?

    通过在 PowerShell 下面搜索 findstr /SN "pml_recv" *.c, 并且 反复分析:

    终于找到了对 mca_pml 进行赋值的函数体, 在 openmpi-3.0.1\ompi\mca\vprotocol\base\vprotocol_base_parasite.c 文件中:

    /**
     * Overlay the hooks of the global mca_vprotocol module onto the PML
     * module (mca_pml) and the request function table
     * (ompi_request_functions).
     *
     * A hook is copied only when the vprotocol module provides it (non-NULL);
     * otherwise the existing target pointer is left untouched.
     *
     * @return whatever mca_vprotocol_base_request_parasite() returns.
     */
    int mca_vprotocol_base_parasite(void)
    {
        /* PML entry points */
        if (NULL != mca_vprotocol.add_procs) {
            mca_pml.pml_add_procs = mca_vprotocol.add_procs;
        }
        if (NULL != mca_vprotocol.del_procs) {
            mca_pml.pml_del_procs = mca_vprotocol.del_procs;
        }
        if (NULL != mca_vprotocol.progress) {
            mca_pml.pml_progress = mca_vprotocol.progress;
        }
        if (NULL != mca_vprotocol.add_comm) {
            mca_pml.pml_add_comm = mca_vprotocol.add_comm;
        }
        if (NULL != mca_vprotocol.del_comm) {
            mca_pml.pml_del_comm = mca_vprotocol.del_comm;
        }
        if (NULL != mca_vprotocol.irecv_init) {
            mca_pml.pml_irecv_init = mca_vprotocol.irecv_init;
        }
        if (NULL != mca_vprotocol.irecv) {
            mca_pml.pml_irecv = mca_vprotocol.irecv;
        }
        if (NULL != mca_vprotocol.recv) {
            mca_pml.pml_recv = mca_vprotocol.recv;
        }
        if (NULL != mca_vprotocol.isend_init) {
            mca_pml.pml_isend_init = mca_vprotocol.isend_init;
        }
        if (NULL != mca_vprotocol.isend) {
            mca_pml.pml_isend = mca_vprotocol.isend;
        }
        if (NULL != mca_vprotocol.send) {
            mca_pml.pml_send = mca_vprotocol.send;
        }
        if (NULL != mca_vprotocol.iprobe) {
            mca_pml.pml_iprobe = mca_vprotocol.iprobe;
        }
        if (NULL != mca_vprotocol.probe) {
            mca_pml.pml_probe = mca_vprotocol.probe;
        }
        if (NULL != mca_vprotocol.start) {
            mca_pml.pml_start = mca_vprotocol.start;
        }
        if (NULL != mca_vprotocol.dump) {
            mca_pml.pml_dump = mca_vprotocol.dump;
        }

        /* Request wait/test entry points */
        if (NULL != mca_vprotocol.wait) {
            ompi_request_functions.req_wait = mca_vprotocol.wait;
        }
        if (NULL != mca_vprotocol.wait_all) {
            ompi_request_functions.req_wait_all = mca_vprotocol.wait_all;
        }
        if (NULL != mca_vprotocol.wait_any) {
            ompi_request_functions.req_wait_any = mca_vprotocol.wait_any;
        }
        if (NULL != mca_vprotocol.wait_some) {
            ompi_request_functions.req_wait_some = mca_vprotocol.wait_some;
        }
        if (NULL != mca_vprotocol.test) {
            ompi_request_functions.req_test = mca_vprotocol.test;
        }
        if (NULL != mca_vprotocol.test_all) {
            ompi_request_functions.req_test_all = mca_vprotocol.test_all;
        }
        if (NULL != mca_vprotocol.test_any) {
            ompi_request_functions.req_test_any = mca_vprotocol.test_any;
        }
        if (NULL != mca_vprotocol.test_some) {
            ompi_request_functions.req_test_some = mca_vprotocol.test_some;
        }

        /* Finish with the request-level part of the overlay. */
        return mca_vprotocol_base_request_parasite();
    }
    

      这里对 mca_pml 变量(以及 ompi_request_functions)中的函数指针进行了赋值:只要 mca_vprotocol 中对应的函数指针非空,就用它覆盖原来的函数。也就是说,我们选择什么协议,取决于 mca_vprotocol 这个变量。

    该变量在 vprotocol/base/base.h 头文件中声明的:

    /* Global vprotocol module; mca_vprotocol_base_select() copies the winning
     * module into it and mca_vprotocol_base_parasite() reads its hooks. */
    OMPI_DECLSPEC extern mca_vprotocol_base_module_t mca_vprotocol;

    我们在 vprotocol.h 头文件中看到  mca_vprotocol_base_module_t 这个结构体的声明:

    /**
     * Table of hooks exposed by a vprotocol module.
     *
     * The first group uses the PML module function-pointer types, the second
     * group the request wait/test function-pointer types.  A NULL entry means
     * the module does not provide that hook: mca_vprotocol_base_parasite()
     * copies only the non-NULL entries.
     */
    typedef struct mca_vprotocol_base_module_2_0_0_t
    {
        /* PML module stuff */
        mca_pml_base_module_add_procs_fn_t      add_procs;
        mca_pml_base_module_del_procs_fn_t      del_procs;
        mca_pml_base_module_enable_fn_t         enable;
        mca_pml_base_module_progress_fn_t       progress;
        mca_pml_base_module_add_comm_fn_t       add_comm;
        mca_pml_base_module_del_comm_fn_t       del_comm;
        mca_pml_base_module_irecv_init_fn_t     irecv_init;
        mca_pml_base_module_irecv_fn_t          irecv;
        mca_pml_base_module_recv_fn_t           recv;
        mca_pml_base_module_isend_init_fn_t     isend_init;
        mca_pml_base_module_isend_fn_t          isend;
        mca_pml_base_module_send_fn_t           send;
        mca_pml_base_module_iprobe_fn_t         iprobe;
        mca_pml_base_module_probe_fn_t          probe;
        mca_pml_base_module_start_fn_t          start;
        mca_pml_base_module_dump_fn_t           dump;
        /* Request wait/test stuff */
        ompi_request_test_fn_t                  test;
        ompi_request_test_any_fn_t              test_any;
        ompi_request_test_all_fn_t              test_all;
        ompi_request_test_some_fn_t             test_some;
        ompi_request_wait_fn_t                  wait;
        ompi_request_wait_any_fn_t              wait_any;
        ompi_request_wait_all_fn_t              wait_all;
        ompi_request_wait_some_fn_t             wait_some;
    
        /* Custom requests classes to add extra data at end of pml requests */
        opal_class_t *                            req_recv_class;
        opal_class_t *                            req_send_class;
    } mca_vprotocol_base_module_2_0_0_t;
    /* Unversioned alias used by the rest of the code. */
    typedef mca_vprotocol_base_module_2_0_0_t mca_vprotocol_base_module_t;
    

    根据局部性原理,观察到附近有一个函数  mca_vprotocol_base_select  :  

    猜想它很可能是 选择可用协议  的函数, 于是在 linux 的 cscope 下直接跟进去,代码很长,但是很重要:

    /*
     * Function for selecting one component from all those that are
     * available.
     *
     * Call the init function on all available components and get their
     * priorities.  Select the component with the highest priority.  All
     * other components will be closed and unloaded.  The selected component
     * will have all of its function pointers saved and returned to the
     * caller.
     */
    int mca_vprotocol_base_select(bool enable_progress_threads,
                                  bool enable_mpi_threads)
    {
        int priority = 0, best_priority = -1;
        opal_list_item_t *item = NULL;
        mca_base_component_list_item_t *cli = NULL;
        mca_vprotocol_base_component_t *component = NULL, *best_component = NULL;
        mca_vprotocol_base_module_t *module = NULL, *best_module = NULL;
        opal_list_t opened;
        opened_component_t *om = NULL;
    
        /* Traverse the list of available components; call their init
            functions. */
        OBJ_CONSTRUCT(&opened, opal_list_t);
        OPAL_LIST_FOREACH(cli, &ompi_vprotocol_base_framework.framework_components, mca_base_component_list_item_t)
        {
            component = (mca_vprotocol_base_component_t *) cli->cli_component;
    
            if (NULL == mca_vprotocol_base_include_list) {
                continue;
            }
    
            V_OUTPUT_VERBOSE(500, "vprotocol select: initializing %s component %s", component->pmlm_version.mca_type_name, component->pmlm_version.mca_component_name);
            if(strcmp(component->pmlm_version.mca_component_name,
                      mca_vprotocol_base_include_list)) {
                V_OUTPUT_VERBOSE(500, "This component is not in the include list: skipping %s", component->pmlm_version.mca_component_name);
                continue;
            }
            if(NULL == component->pmlm_init) {
                V_OUTPUT_VERBOSE(2, "vprotocol select: no init function; ignoring component %s", component->pmlm_version.mca_component_name);
                continue;
            }
            module = component->pmlm_init(&priority, enable_progress_threads, enable_mpi_threads);
            if (NULL == module) {
                V_OUTPUT_VERBOSE(2, "vprotocol select: init returned failure for component %s", component->pmlm_version.mca_component_name);
                continue;
            }
            V_OUTPUT_VERBOSE(500, "vprotocol select: component %s init returned priority %d", component->pmlm_version.mca_component_name, priority);
            if (priority > best_priority)
            {
                best_priority = priority;
                best_component = component;
                best_module = module;
            }
    
            om = (opened_component_t *) malloc(sizeof(opened_component_t));
            if (NULL == om) return OMPI_ERR_OUT_OF_RESOURCE;
            OBJ_CONSTRUCT(om, opal_list_item_t);
            om->om_component = component;
            opal_list_append(&opened, (opal_list_item_t*) om);
        }
    
        /* Finished querying all components.  Check for the bozo case. */
        if (NULL == best_component) {
            V_OUTPUT_VERBOSE(2, "vprotocol select: no protocol has returned a positive priority, fault tolerance is OFF");
        }
        else
        {
            /* Save the winner */
            mca_vprotocol_component = *best_component;
            mca_vprotocol = *best_module;
        }
    
        /* Finalize all non-selected components */
        for (item = opal_list_remove_first(&opened);
             NULL != item;
             item = opal_list_remove_first(&opened))
        {
            om = (opened_component_t *) item;
            if (om->om_component != best_component) {
                /* Finalize */
                V_OUTPUT_VERBOSE(500, "vprotocol select: component %s not selected / finalized", om->om_component->pmlm_version.mca_component_name);
                if (NULL != om->om_component->pmlm_finalize) {
                    /* Blatently ignore the return code (what would we do to
                    recover, anyway?  This component is going away, so errors
                    don't matter anymore) */
                    om->om_component->pmlm_finalize();
                }
            }
            OBJ_DESTRUCT(om);
            free(om);
        }
    
        mca_base_components_close(mca_pml_v.output,
                                  &ompi_vprotocol_base_framework.framework_components,
                                  (mca_base_component_t *) best_component);
    
        /* All done */
        if(best_component != NULL)
        {
            V_OUTPUT_VERBOSE(500, "vprotocol select: component %s selected", mca_vprotocol_component.pmlm_version.mca_component_name);
            return OMPI_SUCCESS;
        }
        else
            return OMPI_ERR_NOT_FOUND;
    }  

    大概思路就是:遍历所有可用组件的列表,对名字与 mca_vprotocol_base_include_list 匹配的组件调用其初始化函数,得到对应的优先级,然后选择优先级最高的:

            /* Keep the best candidate seen so far; because the comparison is
               strict, a later component with an equal priority does not
               replace an earlier one. */
            if (priority > best_priority)
            {
                best_priority = priority;
                best_component = component;
                best_module = module;
            }
    

    那么随之而来就有一个问题了,这个优先级是怎么确定的呢? 难道是通过不同网络协议的通信质量来决定不同的优先级吗? 

    仔细分析,得到优先级的一行源码在这里:

    /* The component's init hook reports its priority through the first
       argument and returns its module; a NULL return is treated as failure. */
    module = component->pmlm_init(&priority, enable_progress_threads, enable_mpi_threads);
    

     component 这个变量的类型其实就是 mca_vprotocol_base_component_2_0_0_t 结构体(typedef 别名为 mca_vprotocol_base_component_t),在 vprotocol.h 中有定义:

    /**
     * Descriptor of a vprotocol component as seen by the selection code.
     */
    typedef struct mca_vprotocol_base_component_2_0_0_t {
        /* Base component descriptor; selection reads mca_type_name and
           mca_component_name from it. */
        mca_base_component_t pmlm_version;
        /* NOTE(review): generic MCA component data; its use is not visible here. */
        mca_base_component_data_t pmlm_data;
        /* Called during selection with (&priority, enable_progress_threads,
           enable_mpi_threads); returns the module, or NULL on failure.
           Components with a NULL pmlm_init are ignored. */
        mca_vprotocol_base_component_init_fn_t pmlm_init;
        /* Optional (may be NULL); invoked on components that were initialized
           but not selected. */
        mca_vprotocol_base_component_finalize_fn_t pmlm_finalize;
    } mca_vprotocol_base_component_2_0_0_t;
    /* Unversioned alias used by the rest of the code. */
    typedef mca_vprotocol_base_component_2_0_0_t mca_vprotocol_base_component_t;
    

      那么,接下来就该去看看 pmlm_init 这个函数是怎么得到优先级的——见下一篇。

  • 相关阅读:
    deepsort+yolov3实现多类别多目标跟踪
    WAR2020暑期补题集
    【数据结构】浅谈主席树
    Github本地上传命令
    【蓝桥杯】2017年第八届蓝桥杯C/C++B组省赛——C题 承压计算
    【蓝桥杯】2017年第八届蓝桥杯C/C++B组省赛——B题 等差素数列
    【蓝桥杯】2019年第十届蓝桥杯C/C++ B组省赛——I题 后缀表达式
    防御Mimikatz-转载
    SQL注入之判断数据库
    XPATH注入
  • 原文地址:https://www.cnblogs.com/HelloGreen/p/8776981.html
Copyright © 2011-2022 走看看