zoukankan      html  css  js  c++  java
  • pgpool-II在故障切换过程中是如何选举新主节点的

    在pgpool的源代码中有有一个pgpool_main.c文件,在该文件中有一个pgpool的主函数pgpoolmain控制着pgpool的运行及相关操作。

    libpcp_ext.h文件中定义了pgpool在一个集群中所运行的数据库节点个数如下宏定义及128个;

    #define MAX_NUM_BACKENDS 128
    #define MAX_CONNECTION_SLOTS MAX_NUM_BACKENDS
    #define MAX_DB_HOST_NAMELEN     128
    #define MAX_PATH_LENGTH 256

    在pgpool中对后端数据库状态的几种定义如下:

    typedef enum {
        CON_UNUSED,            /* unused slot */
        CON_CONNECT_WAIT,    /* waiting for connection starting */
        CON_UP,                /* up and running */
        CON_DOWN            /* down, disconnected */
    } BACKEND_STATUS;

    /* backend status name strings */
    #define BACKEND_STATUS_CON_UNUSED                 "unused"
    #define BACKEND_STATUS_CON_CONNECT_WAIT    "waiting"
    #define BACKEND_STATUS_CON_UP                             "up"
    #define BACKEND_STATUS_CON_DOWN                        "down"

    PostgreSQL数据库在pgpool集群中的描述信息如下:

    /*
     * PostgreSQL backend descriptor. Placed on shared memory area.
     */
    typedef struct {
        char backend_hostname[MAX_DB_HOST_NAMELEN];    /* backend host name */
        int backend_port;    /* backend port numbers */
        BACKEND_STATUS backend_status;    /* backend status */
        double backend_weight;    /* normalized backend load balance ratio */
        double unnormalized_weight; /* descripted parameter */
        char backend_data_directory[MAX_PATH_LENGTH];
        unsigned short flag;        /* various flags */
        unsigned long long int standby_delay;        /* The replication delay against the primary */
    } BackendInfo;

    typedef struct {
        sig_atomic_t num_backends;        /* Number of used PostgreSQL backends.
                                         * This needs to be a sig_atomic_t type
                                         * since it is replaced by a local
                                         * variable while reloading pgpool.conf.
                                         */

        BackendInfo backend_info[MAX_NUM_BACKENDS];#记录了pgpool中数据库的最大数;
    } BackendDesc;

    /*
     * Calculate next valid master node id.
     * If no valid node found, returns -1.
     */

    这个函数就是控制选举下一个master节点的函数;

    static int get_next_master_node(void)
    {
        int i;

        for (i=0;i<pool_config->backend_desc->num_backends;i++)
        {
            /*
             * Do not use VALID_BACKEND macro in raw mode.
             * VALID_BACKEND return true only if the argument is master
             * node id. In other words, standby nodes are false. So need
             * to check backend status with VALID_BACKEND_RAW.
             */
            if (RAW_MODE)
            {
                if (VALID_BACKEND_RAW(i))
                    break;
            }
            else
            {
                if (VALID_BACKEND(i))
                    break;
            }
        }

        if (i == pool_config->backend_desc->num_backends)
            i = -1;

        return i;
    }

    /*
     * backend connection error, failover/failback request, if possible
     * failover() must be called under protecting signals.
     */
    static void failover(void)
    {
        int i, j, k;
        int node_id;
        int new_master;
        int new_primary;
        int nodes[MAX_NUM_BACKENDS];
        bool need_to_restart_children;
        bool partial_restart;
        int status;
        int sts;
        bool need_to_restart_pcp = false;
        bool all_backend_down = true;

        ereport(DEBUG1,
            (errmsg("failover handler called")));

        memset(nodes, 0, sizeof(int) * MAX_NUM_BACKENDS);

        /*
         * this could happen in a child process if a signal has been sent
         * before resetting signal handler
         */
        if (getpid() != mypid)
        {
            ereport(DEBUG1,
                (errmsg("failover handler called"),
                     errdetail("I am not parent")));
            kill(pcp_pid, SIGUSR2);
            return;
        }
        /*
         * processing SIGTERM, SIGINT or SIGQUIT
         */
        if (exiting)
        {
            ereport(DEBUG1,
                    (errmsg("failover handler called while exiting")));
            kill(pcp_pid, SIGUSR2);
            return;
        }

        /*
         * processing fail over or switch over
         */
        if (switching)
        {
            ereport(DEBUG1,
                    (errmsg("failover handler called while switching")));
            kill(pcp_pid, SIGUSR2);
            return;
        }

        Req_info->switching = true;
        switching = 1;
        for(;;)
        {
            POOL_REQUEST_KIND reqkind;
            int queue_index;
            int node_id_set[MAX_NUM_BACKENDS];
            int node_count;
            unsigned char request_details;
            WDFailoverCMDResults wdInterlockingRes;

            pool_semaphore_lock(REQUEST_INFO_SEM);

            if(Req_info->request_queue_tail == Req_info->request_queue_head) /* request queue is empty*/
            {
                switching = 0;
                Req_info->switching = false;
                pool_semaphore_unlock(REQUEST_INFO_SEM);
                break;
            }

            /* make a local copy of request */
            Req_info->request_queue_head++;
            queue_index = Req_info->request_queue_head % MAX_REQUEST_QUEUE_SIZE;
            memcpy(node_id_set, Req_info->request[queue_index].node_id , (sizeof(int) * Req_info->request[queue_index].count));
            reqkind = Req_info->request[queue_index].kind;
            request_details = Req_info->request[queue_index].request_details;
            node_count = Req_info->request[queue_index].count;

            pool_semaphore_unlock(REQUEST_INFO_SEM);

            ereport(DEBUG1,
                (errmsg("failover handler"),
                 errdetail("kind: %d flags: %x node_count: %d index:%d", reqkind, request_details, node_count, queue_index)));

            if (reqkind == CLOSE_IDLE_REQUEST)
            {
                kill_all_children(SIGUSR1);
                continue;
            }

            /* start watchdog interlocking */
            wdInterlockingRes = wd_start_failover_interlocking();

            /*
             * if not in replication mode/master slave mode, we treat this a restart request.
             * otherwise we need to check if we have already failovered.
             */
            ereport(DEBUG1,
                (errmsg("failover handler"),
                     errdetail("starting to select new master node")));
            node_id = node_id_set[0];

            /* failback request? */
            if (reqkind == NODE_UP_REQUEST)
            {
                if (node_id < 0 || node_id >= MAX_NUM_BACKENDS ||
                    (reqkind == NODE_UP_REQUEST && !(RAW_MODE &&
                    BACKEND_INFO(node_id).backend_status == CON_DOWN) && VALID_BACKEND(node_id)) ||
                    (reqkind == NODE_DOWN_REQUEST && !VALID_BACKEND(node_id)))
                {
                    if (node_id < 0 || node_id >= MAX_NUM_BACKENDS)
                        ereport(LOG,
                            (errmsg("invalid failback request, node id: %d is invalid. node id must be between [0 and %d]",node_id,MAX_NUM_BACKENDS)));
                    else
                        ereport(LOG,
                                (errmsg("invalid failback request, status: [%d] of node id : %d is invalid for failback",BACKEND_INFO(node_id).backend_status,node_id)));

                    if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
                        wd_end_failover_interlocking();

                    continue;
                }

                ereport(LOG,
                    (errmsg("starting fail back. reconnect host %s(%d)",
                         BACKEND_INFO(node_id).backend_hostname,
                         BACKEND_INFO(node_id).backend_port)));

                /* Check to see if all backends are down */
                for (i=0;i<NUM_BACKENDS;i++)
                {
                    if (BACKEND_INFO(i).backend_status != CON_DOWN &&
                        BACKEND_INFO(i).backend_status != CON_UNUSED)
                    {
                        ereport(LOG,
                                (errmsg("Node %d is not down (status: %d)",
                                        i, BACKEND_INFO(i).backend_status)));
                        all_backend_down = false;
                        break;
                    }
                }

                BACKEND_INFO(node_id).backend_status = CON_CONNECT_WAIT;    /* unset down status */
                (void)write_status_file();

                /* Aquire failback start command lock */
                if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
                {
                    trigger_failover_command(node_id, pool_config->failback_command,
                                                MASTER_NODE_ID, get_next_master_node(), PRIMARY_NODE_ID);
                    wd_failover_lock_release(FAILBACK_LOCK);
                }
                else
                {
                    /*
                     * Okay we are not allowed to execute the failover command
                     * so we need to wait till the one who is executing the command
                     * finish with it.
                     */
                    wd_wait_until_command_complete_or_timeout(FAILBACK_LOCK);
                }
            }
            else if (reqkind == PROMOTE_NODE_REQUEST)
            {
                if (node_id != -1 && VALID_BACKEND(node_id))
                {
                    ereport(LOG,
                        (errmsg("starting promotion. promote host %s(%d)",
                             BACKEND_INFO(node_id).backend_hostname,
                             BACKEND_INFO(node_id).backend_port)));
                }
                else
                {
                    ereport(LOG,
                            (errmsg("failover: no backends are promoted")));
                    if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
                        wd_end_failover_interlocking();
                    continue;
                }
            }
            else    /* NODE_DOWN_REQUEST */
            {
                int cnt = 0;

                for (i = 0; i < node_count; i++)
                {
                    if (node_id_set[i] != -1 &&
                        ((RAW_MODE && VALID_BACKEND_RAW(node_id_set[i])) ||
                         VALID_BACKEND(node_id_set[i])))
                    {
                        ereport(LOG,
                                (errmsg("starting degeneration. shutdown host %s(%d)",
                                 BACKEND_INFO(node_id_set[i]).backend_hostname,
                                 BACKEND_INFO(node_id_set[i]).backend_port)));

                        BACKEND_INFO(node_id_set[i]).backend_status = CON_DOWN;    /* set down status */
                        (void)write_status_file();

                        /* save down node */
                        nodes[node_id_set[i]] = 1;
                        cnt++;
                    }
                }

                if (cnt == 0)
                {
                    ereport(LOG,
                            (errmsg("failover: no backends are degenerated")));

                    if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
                        wd_end_failover_interlocking();

                    continue;
                }
            }

            new_master = get_next_master_node();

            if (new_master < 0)
            {
                ereport(LOG,
                        (errmsg("failover: no valid backends node found")));
            }

            ereport(DEBUG1, (errmsg("failover/failback request details: STREAM: %d reqkind: %d detail: %x node_id: %d",
                                    STREAM, reqkind, request_details & REQ_DETAIL_SWITCHOVER,
                                    node_id)));

            /* On 2011/5/2 Tatsuo Ishii says: if mode is streaming replication
            * and request is NODE_UP_REQUEST (failback case) we don't need to
            * restart all children. Existing session will not use newly
            * attached node, but load balanced node is not changed until this
            * session ends, so it's harmless anyway.
            */
            /*
             * On 2015/9/21 Tatsuo Ishii says: this judgment is not sufficient if
             * all backends were down. Child process has local status in which all
             * backends are down. In this case even if new connection arrives from
             * frontend, the child will not accept it because the local status
             * shows all backends are down. For this purpose we refer to
             * "all_backend_down" variable, which was set before updating backend status.
             *
             * See bug 248 for more details.
             */

            if (STREAM && reqkind == NODE_UP_REQUEST && all_backend_down == false)
            {
                ereport(LOG,
                        (errmsg("Do not restart children because we are failbacking node id %d host: %s port: %d and we are in streaming replication mode and not all backends were down", node_id,
                         BACKEND_INFO(node_id).backend_hostname,
                         BACKEND_INFO(node_id).backend_port)));

                need_to_restart_children = false;
                partial_restart = false;
            }

            /*
             * If the mode is streaming replication and the request is
             * NODE_DOWN_REQUEST and it's actually a switch over request, we don't
             * need to restart all children, except the node is primary.
             */
            else if (STREAM && reqkind == NODE_DOWN_REQUEST &&
                     request_details & REQ_DETAIL_SWITCHOVER && node_id != PRIMARY_NODE_ID)
            {
                ereport(LOG,
                        (errmsg("Do not restart children because we are switching over node id %d host: %s port: %d and we are in streaming replication mode", node_id,
                                BACKEND_INFO(node_id).backend_hostname,
                                BACKEND_INFO(node_id).backend_port)));

                need_to_restart_children = true;
                partial_restart = true;

                for (i = 0; i < pool_config->num_init_children; i++)
                {
                    bool restart = false;

                    for (j=0;j<pool_config->max_pool;j++)
                    {
                        for (k=0;k<NUM_BACKENDS;k++)
                        {
                            ConnectionInfo *con = pool_coninfo(i, j, k);

                            if (con->connected && con->load_balancing_node == node_id)
                            {
                                ereport(LOG,
                                        (errmsg("child pid %d needs to restart because pool %d uses backend %d",
                                                process_info[i].pid, j, node_id)));
                                restart = true;
                                break;
                            }
                        }
                    }

                    if (restart)
                    {
                        pid_t pid = process_info[i].pid;
                        if (pid)
                        {
                            kill(pid, SIGQUIT);
                            ereport(DEBUG1,
                                    (errmsg("failover handler"),
                                     errdetail("kill process with PID:%d", pid)));
                        }
                    }
                }
            }
            else
            {
                ereport(LOG,
                        (errmsg("Restart all children")));

                /* kill all children */
                for (i = 0; i < pool_config->num_init_children; i++)
                {
                    pid_t pid = process_info[i].pid;
                    if (pid)
                    {
                        kill(pid, SIGQUIT);
                        ereport(DEBUG1,
                            (errmsg("failover handler"),
                                 errdetail("kill process with PID:%d", pid)));
                    }
                }

                need_to_restart_children = true;
                partial_restart = false;
            }
            if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
            {
                /* Exec failover_command if needed */
                for (i = 0; i < pool_config->backend_desc->num_backends; i++)
                {
                    if (nodes[i])
                        trigger_failover_command(i, pool_config->failover_command,
                                                    MASTER_NODE_ID, new_master, PRIMARY_NODE_ID);
                }
                wd_failover_lock_release(FAILOVER_LOCK);
            }
            else
            {
                wd_wait_until_command_complete_or_timeout(FAILOVER_LOCK);
            }

        /* no need to wait since it will be done in reap_handler */
    #ifdef NOT_USED
            while (wait(NULL) > 0)
                ;

            if (errno != ECHILD)
                ereport(LOG,
                    (errmsg("failover_handler: wait() failed. reason:%s", strerror(errno))));

    #endif

            if (reqkind == PROMOTE_NODE_REQUEST && VALID_BACKEND(node_id))
                new_primary = node_id;

            /*
             * If the down node was a standby node in streaming replication
             * mode, we can avoid calling find_primary_node_repeatedly() and
             * recognize the former primary as the new primary node, which
             * will reduce the time to process standby down.
             */
            else if (MASTER_SLAVE && pool_config->master_slave_sub_mode == STREAM_MODE &&
                     reqkind == NODE_DOWN_REQUEST)
            {
                if (Req_info->primary_node_id != node_id)
                    new_primary = Req_info->primary_node_id;
                else
                    new_primary =  find_primary_node_repeatedly();
            }
            else
                new_primary =  find_primary_node_repeatedly();

            /*
             * If follow_master_command is provided and in master/slave
             * streaming replication mode, we start degenerating all backends
             * as they are not replicated anymore.
             */
            int follow_cnt = 0;
            if (MASTER_SLAVE && pool_config->master_slave_sub_mode == STREAM_MODE)
            {
                if (*pool_config->follow_master_command != '' ||
                    reqkind == PROMOTE_NODE_REQUEST)
                {
                    /* only if the failover is against the current primary */
                    if (((reqkind == NODE_DOWN_REQUEST) &&
                         (nodes[Req_info->primary_node_id])) ||
                        ((reqkind == PROMOTE_NODE_REQUEST) &&
                         (VALID_BACKEND(node_id))))
                    {

                        for (i = 0; i < pool_config->backend_desc->num_backends; i++)
                        {
                            /* do not degenerate the new primary */
                            if ((new_primary >= 0) && (i != new_primary)) {
                                BackendInfo *bkinfo;
                                bkinfo = pool_get_node_info(i);
                                ereport(LOG,
                                        (errmsg("starting follow degeneration. shutdown host %s(%d)",
                                         bkinfo->backend_hostname,
                                         bkinfo->backend_port)));
                                bkinfo->backend_status = CON_DOWN;    /* set down status */
                                (void)write_status_file();

                                follow_cnt++;
                            }
                        }

                        if (follow_cnt == 0)
                        {
                            ereport(LOG,
                                    (errmsg("failover: no follow backends are degenerated")));
                        }
                        else
                        {
                            /* update new master node */
                            new_master = get_next_master_node();
                            ereport(LOG,
                                    (errmsg("failover: %d follow backends have been degenerated", follow_cnt)));
                        }
                    }
                }
            }

            /*
             * follow master command also uses the same locks used by trigring command
             */
            if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
            {
                if ((follow_cnt > 0) && (*pool_config->follow_master_command != ''))
                {
                    follow_pid = fork_follow_child(Req_info->master_node_id, new_primary,
                                                Req_info->primary_node_id);
                }
                wd_failover_lock_release(FOLLOW_MASTER_LOCK);
            }
            else
            {
                wd_wait_until_command_complete_or_timeout(FOLLOW_MASTER_LOCK);

            }

            /* Save primary node id */
            Req_info->primary_node_id = new_primary;
            ereport(LOG,
                    (errmsg("failover: set new primary node: %d", Req_info->primary_node_id)));

            if (new_master >= 0)
            {
                Req_info->master_node_id = new_master;
                ereport(LOG,
                        (errmsg("failover: set new master node: %d", Req_info->master_node_id)));
            }


            /* Kill children and restart them if needed */
            if (need_to_restart_children)
            {
                for (i=0;i<pool_config->num_init_children;i++)
                {
                    /*
                     * Try to kill pgpool child because previous kill signal
                     * may not be received by pgpool child. This could happen
                     * if multiple PostgreSQL are going down (or even starting
                     * pgpool, without starting PostgreSQL can trigger this).
                     * Child calls degenerate_backend() and it tries to aquire
                     * semaphore to write a failover request. In this case the
                     * signal mask is set as well, thus signals are never
                     * received.
                     */

                    bool restart = false;

                    if (partial_restart)
                    {
                        for (j=0;j<pool_config->max_pool;j++)
                        {
                            for (k=0;k<NUM_BACKENDS;k++)
                            {
                                ConnectionInfo *con = pool_coninfo(i, j, k);

                                if (con->connected && con->load_balancing_node == node_id)
                                {

                                    ereport(LOG,
                                            (errmsg("child pid %d needs to restart because pool %d uses backend %d",
                                                    process_info[i].pid, j, node_id)));
                                    restart = true;
                                    break;
                                }
                            }
                        }
                    }
                    else
                        restart = true;

                    if (restart)
                    {
                        if (process_info[i].pid)
                        {
                            kill(process_info[i].pid, SIGQUIT);

                            process_info[i].pid = fork_a_child(fds, i);
                            process_info[i].start_time = time(NULL);
                        }
                    }
                    else
                        process_info[i].need_to_restart = 1;
                }
            }

            else
            {
                /* Set restart request to each child. Children will exit(1)
                 * whenever they are convenient.
                 */
                for (i=0;i<pool_config->num_init_children;i++)
                {
                    process_info[i].need_to_restart = 1;
                }
            }

            /*
             * Send restart request to worker child.
             */
            kill(worker_pid, SIGUSR1);

            if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
                wd_end_failover_interlocking();

            if (reqkind == NODE_UP_REQUEST)
            {
                ereport(LOG,
                        (errmsg("failback done. reconnect host %s(%d)",
                         BACKEND_INFO(node_id).backend_hostname,
                         BACKEND_INFO(node_id).backend_port)));

            }
            else if (reqkind == PROMOTE_NODE_REQUEST)
            {
                ereport(LOG,
                        (errmsg("promotion done. promoted host %s(%d)",
                         BACKEND_INFO(node_id).backend_hostname,
                         BACKEND_INFO(node_id).backend_port)));
            }
            else
            {
                /* Temporary black magic. Without this regression 055 does not finish */
                fprintf(stderr, "failover done. shutdown host %s(%d)",
                         BACKEND_INFO(node_id).backend_hostname,
                        BACKEND_INFO(node_id).backend_port);

                ereport(LOG,
                        (errmsg("failover done. shutdown host %s(%d)",
                         BACKEND_INFO(node_id).backend_hostname,
                         BACKEND_INFO(node_id).backend_port)));
            }
            need_to_restart_pcp = true;
        }
        switching = 0;
        Req_info->switching = false;

        /* kick wakeup_handler in pcp_child to notice that
         * failover/failback done
         */
        kill(pcp_pid, SIGUSR2);

        if(need_to_restart_pcp)
        {
            sleep(1);

            /*
             * Send restart request to pcp child.
             */
            kill(pcp_pid, SIGUSR1);
            for (;;)
            {
                sts = waitpid(pcp_pid, &status, 0);
                if (sts != -1)
                    break;
                if (sts == -1)
                {
                    if (errno == EINTR)
                        continue;
                    else
                    {
                        ereport(WARNING,
                                (errmsg("failover: waitpid failed. reason: %s", strerror(errno))));
                        continue;
                    }
                }
            }
            if (WIFSIGNALED(status))
                ereport(LOG,
                        (errmsg("PCP child %d exits with status %d by signal %d in failover()", pcp_pid, status, WTERMSIG(status))));
            else
                ereport(LOG,
                        (errmsg("PCP child %d exits with status %d in failover()", pcp_pid, status)));

            pcp_pid = pcp_fork_a_child(pcp_unix_fd, pcp_inet_fd, pcp_conf_file);
            ereport(LOG,
                    (errmsg("fork a new PCP child pid %d in failover()", pcp_pid)));
        }
    }

  • 相关阅读:
    (转载)机器学习方法的PPT
    算法的力量(转李开复)
    CNKI免费帐号
    图像增强(二)
    初始化 Microsoft Visual SourceSafe 源代码管理提供程序时失败。您无法使用此提供程序执行源代码管理操作。”
    2012年"浪潮杯"山东省第三届ACM大学生程序设计竞赛 Fruit Ninja I
    hdu 3607 Traversal
    zoj 3686 A Simple Tree Problem
    hdu 3727 Jewel
    hdu 4366 Successor
  • 原文地址:https://www.cnblogs.com/songyuejie/p/7053169.html
Copyright © 2011-2022 走看看