zoukankan      html  css  js  c++  java
  • 深入学习keepalived之预备工作--线程

    1. 线程的定义

    1.1 线程定义在scheduler.h文件中,其定义如下所示

    /*
     * Thread itself.
     *
     * One schedulable unit of work. A thread_t lives on exactly one of the
     * lists of its thread_master_t (read, write, timer, child, event, ready
     * or unuse) and is linked into that list through next/prev.
     */
    typedef struct _thread {
        unsigned long id;          /* identifier; assigned from thread_get_id() in thread_call() */
        unsigned char type;        /* thread type, one of the THREAD_* constants */
        struct _thread *next;        /* next pointer of the thread */
        struct _thread *prev;        /* previous pointer of the thread */
        struct _thread_master *master;    /* pointer to the struct thread_master. */
        int (*func) (struct _thread *);    /* event function, invoked by thread_call() with this thread */
        void *arg;            /* event argument */
        timeval_t sands;        /* expiry time; compared against time_now by the scheduler */
        union {
            int val;        /* second argument of the event. */
            int fd;            /* file descriptor in case of read/write. */
            struct {
                pid_t pid;    /* process id a child thread is wanting. */
                int status;    /* return status of the process, as reported by waitpid() */
            } c;
        } u;
    } thread_t;

    1.2. 线程链表定义

    /*
     * Linked list of thread.
     *
     * Elements are chained through thread_t.next / thread_t.prev.
     */
    typedef struct _thread_list {
        thread_t *head;    /* first thread of the list, NULL when the list is empty */
        thread_t *tail;    /* last thread of the list */
        int count;         /* presumably the number of linked threads -- confirm in the list helpers */
    } thread_list_t;

    线程类型的定义如下:

    /* Thread types (values stored in thread_t.type). */
    #define THREAD_READ        0             // read thread: queued on master->read, waiting on u.fd
    #define THREAD_WRITE        1            // write thread: queued on master->write, waiting on u.fd
    #define THREAD_TIMER        2            // timer thread: queued on master->timer
    #define THREAD_EVENT        3            // event thread: queued on master->event
    #define THREAD_CHILD        4            // child thread: queued on master->child, waiting on a child pid
    #define THREAD_READY        5            // ready thread: queued on master->ready (expired timer or reaped child)
    #define THREAD_UNUSED        6           // unused thread: recycled onto master->unuse
    #define THREAD_WRITE_TIMEOUT    7        // write thread whose deadline passed before its fd became writable
    #define THREAD_READ_TIMEOUT    8         // read thread whose deadline passed before its fd became readable
    #define THREAD_CHILD_TIMEOUT    9        // child thread whose deadline passed
    #define THREAD_TERMINATE    10            // terminate event: makes thread_fetch() return NULL
    #define THREAD_READY_FD        11        // read/write thread whose fd was reported ready by select()

    1.3.主线程定义

    /*
     * Master of the threads.
     *
     * Owns one list per thread state plus the fd sets that thread_fetch()
     * copies and hands to select().
     */
    typedef struct _thread_master {
        thread_list_t read;      /* threads waiting for a readable fd */
        thread_list_t write;     /* threads waiting for a writable fd */
        thread_list_t timer;     /* threads waiting for their expiry time */
        thread_list_t child;     /* threads waiting for a child process */
        thread_list_t event;     /* events; drained first by thread_fetch() */
        thread_list_t ready;     /* threads ready to be returned by thread_fetch() */
        thread_list_t unuse;     /* recycled thread_t objects reused by thread_new() */
        fd_set readfd;           /* fds of the threads queued on 'read' */
        fd_set writefd;          /* fds of the threads queued on 'write' */
        fd_set exceptfd;         /* exception fd set passed to select() */
        unsigned long alloc;     /* incremented each time thread_new() allocates a thread_t */
    } thread_master_t;

    2. 线程操作

    2.1 生成主线程

    /* global vars */
    /* Scheduler instance that launch_scheduler() fetches threads from; NULL until assigned. */
    thread_master_t *master = NULL;
    
    /*
     * Make thread master.
     *
     * Allocates a thread_master_t through MALLOC and hands the result back
     * to the caller unchanged.
     */
    thread_master_t *
    thread_make_master(void)
    {
        thread_master_t *m = (thread_master_t *) MALLOC(sizeof (thread_master_t));

        return m;
    }

    2.2 销毁一个主线程

        

    /*
     * Stop thread scheduler.
     *
     * Tears down everything the master holds via thread_cleanup_master()
     * and then releases the master structure itself.
     */
    void
    thread_destroy_master(thread_master_t * m)
    {
        /* Empty the lists and fd sets first ... */
        thread_cleanup_master(m);

        /* ... then give back the master's own storage. */
        FREE(m);
    }
    
    /*
     * Cleanup master.
     *
     * Helper called by thread_destroy_master() to empty the master: the
     * read, write, timer, event and ready lists are handed to
     * thread_destroy_list(), the three fd sets are zeroed, and finally
     * thread_clean_unuse() is run on the master.
     *
     * NOTE(review): m->child is not passed to thread_destroy_list() here --
     * confirm whether that is intentional.
     */
    static void
    thread_cleanup_master(thread_master_t * m)
    {
        thread_list_t *const active[] = {
            &m->read, &m->write, &m->timer, &m->event, &m->ready
        };
        unsigned int i;

        /* Unuse current thread lists, in the order listed above */
        for (i = 0; i < sizeof (active) / sizeof (active[0]); i++)
            thread_destroy_list(m, *active[i]);

        /* Clear all FDs */
        FD_ZERO(&m->readfd);
        FD_ZERO(&m->writefd);
        FD_ZERO(&m->exceptfd);

        /* Clean garbage */
        thread_clean_unuse(m);
    }
    // Release the master's own memory (the FREE(m) call that thread_destroy_master makes after cleanup)
    FREE(m);

    2.3 增加一个简单的事件线程

    /*
     * Add simple event thread.
     *
     * Queues a THREAD_TERMINATE thread on the master's event list. It has
     * no callback and no argument; thread_fetch() returns NULL when it
     * dequeues a thread of this type.
     *
     * m: thread master, must not be NULL (asserted).
     * Returns the queued thread.
     */
    thread_t *
    thread_add_terminate_event(thread_master_t * m)
    {
        thread_t *ev;

        assert(m != NULL);

        ev = thread_new(m);

        /* Identity */
        ev->master = m;
        ev->id = 0;
        ev->type = THREAD_TERMINATE;

        /* Nothing to call, nothing to pass */
        ev->func = NULL;
        ev->arg = NULL;
        ev->u.val = 0;

        thread_list_add(&m->event, ev);
        return ev;
    }

    2.4 创建不同类型的线程,并加入主线程中对应的线程链表,下面以读线程为例介绍

    /*
     * Add new read thread.
     *
     * Registers fd in the master's read fd set and queues a THREAD_READ
     * thread on the read list, ordered by its expiry time.
     *
     * m:     thread master, must not be NULL (asserted).
     * func:  callback stored in the thread.
     * arg:   opaque argument stored in the thread.
     * fd:    descriptor to watch; must lie in [0, FD_SETSIZE).
     * timer: timeout added to the current time with timer_add_long()
     *        (units are whatever timer_add_long() expects).
     *
     * Returns the new thread, or NULL (after logging a warning) when fd is
     * out of range for an fd_set or is already registered for reading.
     */
    thread_t *
    thread_add_read(thread_master_t * m, int (*func) (thread_t *)
            , void *arg, int fd, long timer)
    {
        thread_t *thread;

        assert(m != NULL);

        /*
         * FD_ISSET()/FD_SET() have undefined behaviour for descriptors
         * outside [0, FD_SETSIZE), so reject those before touching the set.
         */
        if (fd < 0 || fd >= FD_SETSIZE) {
            log_message(LOG_WARNING, "Invalid read fd [%d]", fd);
            return NULL;
        }

        if (FD_ISSET(fd, &m->readfd)) {
            log_message(LOG_WARNING, "There is already read fd [%d]", fd);
            return NULL;
        }

        thread = thread_new(m);
        thread->type = THREAD_READ;
        thread->id = 0;
        thread->master = m;
        thread->func = func;
        thread->arg = arg;
        FD_SET(fd, &m->readfd);
        thread->u.fd = fd;

        /* Compute read timeout value */
        set_time_now();
        thread->sands = timer_add_long(time_now, timer);

        /* Sort the thread. */
        thread_list_add_timeval(&m->read, thread);

        return thread;
    }

    2.4.1 创建一个新的线程

    /*
     * Make new thread.
     *
     * Prefers recycling: when the master's unuse list is non-empty its head
     * is detached, zeroed and returned. Otherwise a fresh thread_t is
     * obtained from MALLOC and the master's alloc counter is bumped.
     */
    thread_t *
    thread_new(thread_master_t * m)
    {
        thread_t *t;

        if (!m->unuse.head) {
            /* Nothing to recycle: allocate a brand new one */
            t = (thread_t *) MALLOC(sizeof (thread_t));
            m->alloc++;
            return t;
        }

        /* Reuse a previously released thread, wiped clean */
        t = thread_trim_head(&m->unuse);
        memset(t, 0, sizeof (thread_t));
        return t;
    }

    2.4.2 设置为读线程

        /* Excerpt from thread_add_read(): mark the new thread as a read thread */
        thread->type = THREAD_READ;
        thread->id = 0;
        thread->master = m;
        /* Callback and its argument, as supplied by the caller */
        thread->func = func;
        thread->arg = arg;
        /* Register the descriptor in the master's read set and remember it in the thread */
        FD_SET(fd, &m->readfd);
        thread->u.fd = fd;

    2.4.3 根据超时时间将读线程加入读线程链表中

    /*
     * Add a thread in the list sorted by timeval.
     *
     * Walks the list from the head and inserts 'thread' in front of the
     * first element for which timer_cmp(thread->sands, element->sands) <= 0.
     * When no such element exists the thread is added with thread_list_add().
     */
    void
    thread_list_add_timeval(thread_list_t * list, thread_t * thread)
    {
        thread_t *pos = list->head;

        /* Skip every element that sorts before the new thread */
        while (pos && timer_cmp(thread->sands, pos->sands) > 0)
            pos = pos->next;

        if (!pos) {
            thread_list_add(list, thread);
            return;
        }

        thread_list_add_before(list, pos, thread);
    }

    2.5 取消线程,从对应类型的线程列表中去除该线程,将它设置为unused类型,并加入unused线程链表。

    /*
     * Cancel thread from scheduler.
     *
     * Unlinks the thread from the master list that matches its type
     * (clearing its fd from the read/write fd set for READ/WRITE threads),
     * then marks it THREAD_UNUSED and hands it to thread_add_unuse().
     * Threads of any other type are only marked unused and recycled.
     */
    void
    thread_cancel(thread_t * thread)
    {
        thread_master_t *m = thread->master;
        thread_list_t *owner = NULL;    /* list the thread is currently linked on */

        switch (thread->type) {
        case THREAD_READ:
            assert(FD_ISSET(thread->u.fd, &m->readfd));
            FD_CLR(thread->u.fd, &m->readfd);
            owner = &m->read;
            break;
        case THREAD_WRITE:
            assert(FD_ISSET(thread->u.fd, &m->writefd));
            FD_CLR(thread->u.fd, &m->writefd);
            owner = &m->write;
            break;
        case THREAD_TIMER:
            owner = &m->timer;
            break;
        case THREAD_CHILD:
            /*
             * Open question kept from the original: should the child be
             * killed here, or is that the caller's job? The function is
             * currently unused, so it is left as is.
             */
            owner = &m->child;
            break;
        case THREAD_EVENT:
            owner = &m->event;
            break;
        case THREAD_READY:
        case THREAD_READY_FD:
            owner = &m->ready;
            break;
        default:
            break;
        }

        if (owner)
            thread_list_delete(owner, thread);

        thread->type = THREAD_UNUSED;
        thread_add_unuse(m, thread);
    }

    2.6 获取下一个就绪线程

    /*
     * Fetch next ready thread.
     *
     * Order of service:
     *   1. the event list (a THREAD_TERMINATE event makes the function
     *      return NULL),
     *   2. the ready list,
     *   3. otherwise select() on the master's fd sets (plus the signal fd
     *      and, with _WITH_SNMP_, the SNMP fds), after which expired or
     *      fd-ready threads are moved from the child, read, write and timer
     *      lists onto the ready list and one of them is returned.
     * If nothing became ready the whole sequence is retried.
     *
     * m:     thread master, must not be NULL (asserted).
     * fetch: caller-provided storage; the selected thread is copied into
     *        it, while the queued thread_t itself is marked THREAD_UNUSED
     *        and recycled through thread_add_unuse().
     *
     * Returns fetch, or NULL when a THREAD_TERMINATE event was dequeued.
     */
    thread_t *
    thread_fetch(thread_master_t * m, thread_t * fetch)
    {
        int ret, old_errno;
        thread_t *thread;
        fd_set readfd;
        fd_set writefd;
        fd_set exceptfd;
        timeval_t timer_wait;
        int signal_fd;
    #ifdef _WITH_SNMP_
        timeval_t snmp_timer_wait;
        int snmpblock = 0;
        int fdsetsize;
    #endif

        assert(m != NULL);

        /* Timer initialization */
        memset(&timer_wait, 0, sizeof (timeval_t));

    retry:    /* When thread can't fetch try to find next thread again. */

        /* If there is event process it first. */
        thread = thread_trim_head(&m->event);
        if (thread) {
            *fetch = *thread;

            /* If daemon hanging event is received return NULL pointer */
            if (thread->type == THREAD_TERMINATE) {
                thread->type = THREAD_UNUSED;
                thread_add_unuse(m, thread);
                return NULL;
            }
            thread->type = THREAD_UNUSED;
            thread_add_unuse(m, thread);
            return fetch;
        }

        /* If there is ready threads process them */
        thread = thread_trim_head(&m->ready);
        if (thread) {
            *fetch = *thread;
            thread->type = THREAD_UNUSED;
            thread_add_unuse(m, thread);
            return fetch;
        }

        /*
         * Re-read the current time to get the maximum accuracy.
         * Calculate select wait timer. Take care of timeouted fd.
         */
        set_time_now();
        thread_compute_timer(m, &timer_wait);

        /* Call select function on copies, so the master's sets stay intact. */
        readfd = m->readfd;
        writefd = m->writefd;
        exceptfd = m->exceptfd;

        signal_fd = signal_rfd();
        FD_SET(signal_fd, &readfd);

    #ifdef _WITH_SNMP_
        /* When SNMP is enabled, we may have to select() on additional
         * FD. snmp_select_info() will add them to `readfd'. The trick
         * with this function is its last argument. We need to set it
         * to 0 and we need to use the provided new timer only if it
         * is still set to 0. */
        fdsetsize = FD_SETSIZE;
        snmpblock = 0;
        memcpy(&snmp_timer_wait, &timer_wait, sizeof(timeval_t));
        snmp_select_info(&fdsetsize, &readfd, &snmp_timer_wait, &snmpblock);
        if (snmpblock == 0)
            memcpy(&timer_wait, &snmp_timer_wait, sizeof(timeval_t));
    #endif

        ret = select(FD_SETSIZE, &readfd, &writefd, &exceptfd, &timer_wait);

        /* we have to save errno here because the next syscalls will set it */
        old_errno = errno;

        /* Handle SNMP stuff */
    #ifdef _WITH_SNMP_
        if (ret > 0)
            snmp_read(&readfd);
        else if (ret == 0)
            snmp_timeout();
    #endif

        /* handle signals synchronously, including child reaping */
        if (FD_ISSET(signal_fd, &readfd))
            signal_run_callback();

        /* Update current time */
        set_time_now();

        if (ret < 0) {
            if (old_errno == EINTR)
                goto retry;
            /* Real error. */
            DBG("select error: %s", strerror(old_errno));
            assert(0);
        }

        /* Timeout children */
        thread = m->child.head;
        while (thread) {
            thread_t *t;

            /* Save the successor first: t may be unlinked below. */
            t = thread;
            thread = t->next;

            if (timer_cmp(time_now, t->sands) >= 0) {
                thread_list_delete(&m->child, t);
                thread_list_add(&m->ready, t);
                t->type = THREAD_CHILD_TIMEOUT;
            }
        }

        /* Read thread. */
        thread = m->read.head;
        while (thread) {
            thread_t *t;

            t = thread;
            thread = t->next;

            if (FD_ISSET(t->u.fd, &readfd)) {
                assert(FD_ISSET(t->u.fd, &m->readfd));
                FD_CLR(t->u.fd, &m->readfd);
                thread_list_delete(&m->read, t);
                thread_list_add(&m->ready, t);
                t->type = THREAD_READY_FD;
            } else {
                if (timer_cmp(time_now, t->sands) >= 0) {
                    FD_CLR(t->u.fd, &m->readfd);
                    thread_list_delete(&m->read, t);
                    thread_list_add(&m->ready, t);
                    t->type = THREAD_READ_TIMEOUT;
                }
            }
        }

        /* Write thread. */
        thread = m->write.head;
        while (thread) {
            thread_t *t;

            t = thread;
            thread = t->next;

            if (FD_ISSET(t->u.fd, &writefd)) {
                /*
                 * Check the master's set, as the read branch does;
                 * re-testing the local select() result could never fail.
                 */
                assert(FD_ISSET(t->u.fd, &m->writefd));
                FD_CLR(t->u.fd, &m->writefd);
                thread_list_delete(&m->write, t);
                thread_list_add(&m->ready, t);
                t->type = THREAD_READY_FD;
            } else {
                if (timer_cmp(time_now, t->sands) >= 0) {
                    FD_CLR(t->u.fd, &m->writefd);
                    thread_list_delete(&m->write, t);
                    thread_list_add(&m->ready, t);
                    t->type = THREAD_WRITE_TIMEOUT;
                }
            }
        }
        /* Exception thread. */
        /*... */

        /* Timer update. */
        thread = m->timer.head;
        while (thread) {
            thread_t *t;

            t = thread;
            thread = t->next;

            if (timer_cmp(time_now, t->sands) >= 0) {
                thread_list_delete(&m->timer, t);
                thread_list_add(&m->ready, t);
                t->type = THREAD_READY;
            }
        }

        /* Return one event. */
        thread = thread_trim_head(&m->ready);

    #ifdef _WITH_SNMP_
        run_alarms();
        netsnmp_check_outstanding_agent_requests();
    #endif

        /* There is no ready thread. */
        if (!thread)
            goto retry;

        *fetch = *thread;
        thread->type = THREAD_UNUSED;
        thread_add_unuse(m, thread);

        return fetch;
    }

    2.7 子线程处理,遍历子线程链表取出子线程,并放入就绪线程链表。

    /*
     * Synchronous signal handler to reap child processes.
     *
     * Repeatedly calls waitpid(-1, ..., WNOHANG). For every reaped pid the
     * child list of the master is searched; a matching thread is moved to
     * the ready list with its exit status recorded and its type set to
     * THREAD_READY. The loop ends when waitpid() returns 0, and the
     * function returns when waitpid() fails with ECHILD.
     *
     * v:   the thread_master_t, passed as an opaque pointer.
     * sig: signal number (not used in the body).
     */
    void
    thread_child_handler(void * v, int sig)
    {
        thread_master_t * m = v;
        thread_t *t;
        pid_t pid;
        int status = 77;

        /*
         * This is O(n^2), but there will only be a few entries on
         * this list.
         */
        for (;;) {
            pid = waitpid(-1, &status, WNOHANG);
            if (pid == 0)
                break;

            if (pid == -1) {
                if (errno == ECHILD)
                    return;
                DBG("waitpid error: %s", strerror(errno));
                assert(0);
                continue;
            }

            /* Find the thread waiting on this pid, if any */
            for (t = m->child.head; t; t = t->next) {
                if (t->u.c.pid != pid)
                    continue;

                thread_list_delete(&m->child, t);
                thread_list_add(&m->ready, t);
                t->u.c.status = status;
                t->type = THREAD_READY;
                break;
            }
        }
    }

    2.8 线程调用

    /*
     * Call thread !
     *
     * Stamps the thread with an id obtained from thread_get_id() and then
     * invokes its callback with the thread itself as the only argument.
     * The callback's return value is discarded.
     */
    void
    thread_call(thread_t * thread)
    {
        thread->id = thread_get_id();
        thread->func(thread);
    }

    2.9 启动调度器

    /*
     * Our infinite scheduling loop.
     *
     * Installs thread_child_handler for SIGCHLD (with the global master as
     * its argument), then keeps fetching one thread at a time from the
     * global master and running it. The loop stops as soon as
     * thread_fetch() returns NULL.
     */
    void
    launch_scheduler(void)
    {
        thread_t current;    /* thread_fetch() copies the fetched thread in here */

        signal_set(SIGCHLD, thread_child_handler, master);

        /*
         * Processing the master thread queues,
         * return and execute one ready thread.
         */
        for (;;) {
            if (!thread_fetch(master, &current))
                break;

            /* Run until error, used for debugging only */
    #ifdef _DEBUG_
            if ((debug & 520) == 520) {
                debug &= ~520;
                thread_add_terminate_event(master);
            }
    #endif
            thread_call(&current);
        }
    }
  • 相关阅读:
    简单实现 C# 与 Javascript的兼容
    如何写好CSS系列之表单(form)
    D3、openlayers的一次尝试
    如何写好css系列之button
    mockjs,json-server一起搭建前端通用的数据模拟框架
    AIX中的/etc/inittab文件
    AIX中crontab和at 定时任务
    AIX中的服务管理
    AIX系统的备份和恢复
    AIX中磁带设备的使用
  • 原文地址:https://www.cnblogs.com/davidwang456/p/3543937.html
Copyright © 2011-2022 走看看