1. 线程的定义
1.1 线程定义在scheduler.h文件中,其定义如下所示
/*
 * One schedulable unit of work ("thread") managed by a thread master.
 * Threads are chained into doubly linked lists through next/prev.
 */
struct _thread {
	unsigned long id;		/* identifier */
	unsigned char type;		/* one of the THREAD_* type codes */
	struct _thread *next;		/* next thread in the list */
	struct _thread *prev;		/* previous thread in the list */
	struct _thread_master *master;	/* thread master this thread belongs to */
	int (*func) (struct _thread *);	/* callback invoked for this thread */
	void *arg;			/* argument made available to the callback */
	timeval_t sands;		/* time value compared with the current time to detect expiry */
	union {
		int val;		/* second argument of an event */
		int fd;			/* file descriptor for read/write threads */
		struct {
			pid_t pid;	/* pid of the child the thread is waiting for */
			int status;	/* status reported for that child */
		} c;
	} u;
};
typedef struct _thread thread_t;
1.2 线程链表定义
/*
 * List of threads: keeps both ends of the chain plus an element counter.
 */
struct _thread_list {
	thread_t *head;		/* first thread, NULL when the list is empty */
	thread_t *tail;		/* last thread */
	int count;		/* number of threads currently linked */
};
typedef struct _thread_list thread_list_t;
线程类型的定义如下:
/*
 * Thread types.
 *
 * Each definition sits on its own line and uses block comments: with
 * trailing line comments on a single physical line, the first comment
 * would swallow every following #define.
 */
#define THREAD_READ		0	/* waiting for a fd to become readable */
#define THREAD_WRITE		1	/* waiting for a fd to become writable */
#define THREAD_TIMER		2	/* waiting for a timer to expire */
#define THREAD_EVENT		3	/* event thread */
#define THREAD_CHILD		4	/* waiting for a child process */
#define THREAD_READY		5	/* ready to run */
#define THREAD_UNUSED		6	/* recycled, parked on the unuse list */
#define THREAD_WRITE_TIMEOUT	7	/* write thread whose timeout expired */
#define THREAD_READ_TIMEOUT	8	/* read thread whose timeout expired */
#define THREAD_CHILD_TIMEOUT	9	/* child thread whose timeout expired */
#define THREAD_TERMINATE	10	/* asks the scheduler to stop */
#define THREAD_READY_FD		11	/* read/write thread whose fd became ready */
1.3 主线程定义
/*
 * Master of the threads: one list per thread state plus the fd sets
 * that are handed to select().
 */
struct _thread_master {
	thread_list_t read;	/* threads waiting for a readable fd */
	thread_list_t write;	/* threads waiting for a writable fd */
	thread_list_t timer;	/* timer threads */
	thread_list_t child;	/* threads waiting for a child process */
	thread_list_t event;	/* event threads, served before anything else */
	thread_list_t ready;	/* threads ready to be returned by the scheduler */
	thread_list_t unuse;	/* recycled thread structures */
	fd_set readfd;		/* fds watched for reading */
	fd_set writefd;		/* fds watched for writing */
	fd_set exceptfd;	/* fds watched for exceptional conditions */
	unsigned long alloc;	/* count of thread structures allocated so far */
};
typedef struct _thread_master thread_master_t;
2. 线程操作
2.1 生成主线程
/* global vars */
thread_master_t *master = NULL;

/*
 * Create a thread master.
 *
 * Returns the block obtained from MALLOC for one thread_master_t.
 * NOTE(review): MALLOC is a project allocation macro; whether it
 * zero-fills the block or can return NULL is not visible here.
 */
thread_master_t *
thread_make_master(void)
{
	return (thread_master_t *) MALLOC(sizeof (thread_master_t));
}
2.2 销毁一个主线程
/*
 * Cleanup master: empty the contents of a thread master.
 *
 * Defined ahead of thread_destroy_master() so the static helper is
 * declared before its first use.
 *
 * Every per-state list is passed to thread_destroy_list(), including the
 * child list, so no list is left holding threads when the master goes
 * away. NOTE(review): thread_destroy_list()/thread_clean_unuse() are
 * defined elsewhere; presumably the former parks threads on the unuse
 * list and the latter frees that list -- confirm.
 */
static void
thread_cleanup_master(thread_master_t * m)
{
	/* Unuse current thread lists */
	thread_destroy_list(m, m->read);
	thread_destroy_list(m, m->write);
	thread_destroy_list(m, m->timer);
	thread_destroy_list(m, m->child);
	thread_destroy_list(m, m->event);
	thread_destroy_list(m, m->ready);

	/* Clear all FDs */
	FD_ZERO(&m->readfd);
	FD_ZERO(&m->writefd);
	FD_ZERO(&m->exceptfd);

	/* Clean garbage */
	thread_clean_unuse(m);
}

/*
 * Stop thread scheduler.
 *
 * Calls the helper above to empty the master's contents, then releases
 * the master structure itself. `m` must not be used afterwards.
 */
void
thread_destroy_master(thread_master_t * m)
{
	thread_cleanup_master(m);

	/* Reclaim the memory of the master itself */
	FREE(m);
}
2.3 增加一个简单的事件线程
/*
 * Queue a terminate event on master `m`.
 *
 * The thread carries no callback and no argument; only its type
 * (THREAD_TERMINATE) matters. It is appended to the event list.
 * Returns the queued thread.
 */
thread_t *
thread_add_terminate_event(thread_master_t * m)
{
	thread_t *t;

	assert(m != NULL);

	t = thread_new(m);

	t->type = THREAD_TERMINATE;
	t->id = 0;
	t->master = m;
	t->func = NULL;
	t->arg = NULL;
	t->u.val = 0;

	thread_list_add(&m->event, t);
	return t;
}
2.4 创建不同类型的线程,并加入主线程中对应的线程链表,下面以读线程为例进行介绍
/*
 * Add new read thread.
 *
 * m:     thread master that will own the thread (must not be NULL)
 * func:  callback to run for the thread
 * arg:   argument stored in the thread for the callback
 * fd:    file descriptor to watch for reading
 * timer: timeout, added to the current time to obtain the expiry value
 *
 * Returns the new thread, or NULL (after logging a warning) when `fd`
 * cannot be stored in an fd_set or is already watched for reading.
 */
thread_t *
thread_add_read(thread_master_t * m, int (*func) (thread_t *)
		, void *arg, int fd, long timer)
{
	thread_t *thread;

	assert(m != NULL);

	/*
	 * FD_ISSET/FD_SET have undefined behaviour for descriptors outside
	 * [0, FD_SETSIZE), so reject them before touching the fd set.
	 */
	if (fd < 0 || fd >= FD_SETSIZE) {
		log_message(LOG_WARNING, "Invalid read fd [%d]", fd);
		return NULL;
	}

	if (FD_ISSET(fd, &m->readfd)) {
		log_message(LOG_WARNING, "There is already read fd [%d]", fd);
		return NULL;
	}

	thread = thread_new(m);
	thread->type = THREAD_READ;
	thread->id = 0;
	thread->master = m;
	thread->func = func;
	thread->arg = arg;
	FD_SET(fd, &m->readfd);
	thread->u.fd = fd;

	/* Compute read timeout value */
	set_time_now();
	thread->sands = timer_add_long(time_now, timer);

	/* Sort the thread. */
	thread_list_add_timeval(&m->read, thread);

	return thread;
}
2.4.1 创建一个新的线程
/*
 * Obtain a thread structure for master `m`.
 *
 * A structure parked on the unuse list is recycled (and zeroed) when one
 * is available; otherwise a new one is allocated and the master's
 * allocation counter is incremented.
 */
thread_t *
thread_new(thread_master_t * m)
{
	thread_t *t;

	if (!m->unuse.head) {
		/* Nothing to recycle: allocate a fresh structure */
		t = (thread_t *) MALLOC(sizeof (thread_t));
		m->alloc++;
		return t;
	}

	/* Reuse the first parked structure, wiped clean */
	t = thread_trim_head(&m->unuse);
	memset(t, 0, sizeof (thread_t));
	return t;
}
2.4.2 设置为读线程
/* Excerpt: fill in the new thread as a read thread. */
thread->type = THREAD_READ;	/* mark it as a read thread */
thread->id = 0;
thread->master = m;		/* remember the owning master */
thread->func = func;		/* callback to run */
thread->arg = arg;		/* argument kept for the callback */
FD_SET(fd, &m->readfd);		/* start watching fd for reading */
thread->u.fd = fd;
2.4.3 根据超时时间将读线程加入读线程链表中
/*
 * Insert `thread` into `list`, keeping the list ordered by `sands`.
 *
 * The thread is placed before the first element whose time value is not
 * earlier than its own; when no such element exists it goes through
 * thread_list_add() instead.
 */
void
thread_list_add_timeval(thread_list_t * list, thread_t * thread)
{
	thread_t *pos = list->head;

	/* Skip every element that is strictly earlier than the new thread */
	while (pos && timer_cmp(thread->sands, pos->sands) > 0)
		pos = pos->next;

	if (!pos) {
		thread_list_add(list, thread);
		return;
	}

	thread_list_add_before(list, pos, thread);
}
2.5 取消线程,从对应类型的线程列表中去除该线程,将它设置为unused类型,并加入unused线程链表。
/*
 * Cancel thread from scheduler.
 *
 * Unlinks `thread` from the list matching its current type, clears its
 * fd from the master's fd set where one is registered, marks it
 * THREAD_UNUSED and hands it to thread_add_unuse().
 *
 * A NULL `thread` is ignored. Timed-out threads and fd-ready threads
 * live on the ready list, and terminate threads live on the event list,
 * so they are unlinked from those lists before being recycled.
 */
void
thread_cancel(thread_t * thread)
{
	thread_master_t *m;

	if (!thread)
		return;

	m = thread->master;

	switch (thread->type) {
	case THREAD_READ:
		assert(FD_ISSET(thread->u.fd, &m->readfd));
		FD_CLR(thread->u.fd, &m->readfd);
		thread_list_delete(&m->read, thread);
		break;
	case THREAD_WRITE:
		assert(FD_ISSET(thread->u.fd, &m->writefd));
		FD_CLR(thread->u.fd, &m->writefd);
		thread_list_delete(&m->write, thread);
		break;
	case THREAD_TIMER:
		thread_list_delete(&m->timer, thread);
		break;
	case THREAD_CHILD:
		/* Does this need to kill the child, or is that the
		 * caller's job?
		 * This function is currently unused, so leave it for now.
		 */
		thread_list_delete(&m->child, thread);
		break;
	case THREAD_EVENT:
	case THREAD_TERMINATE:
		thread_list_delete(&m->event, thread);
		break;
	case THREAD_READY:
	case THREAD_READY_FD:
	case THREAD_READ_TIMEOUT:
	case THREAD_WRITE_TIMEOUT:
	case THREAD_CHILD_TIMEOUT:
		/* The fd, if any, was already cleared when the thread was readied */
		thread_list_delete(&m->ready, thread);
		break;
	default:
		break;
	}

	thread->type = THREAD_UNUSED;
	thread_add_unuse(m, thread);
}
2.6 获取下一个就绪线程
/*
 * Fetch next ready thread.
 *
 * m:     thread master to schedule from (must not be NULL)
 * fetch: caller-provided storage that receives a copy of the selected
 *        thread; the original structure is marked THREAD_UNUSED and
 *        handed to thread_add_unuse().
 *
 * Order of service: pending events first, then already-ready threads;
 * otherwise the function waits in select() and afterwards moves expired
 * or fd-ready threads onto the ready list. It loops until a thread is
 * available.
 *
 * Returns `fetch`, or NULL when a THREAD_TERMINATE event is picked up.
 */
thread_t *
thread_fetch(thread_master_t * m, thread_t * fetch)
{
	int ret, old_errno;
	thread_t *thread;
	fd_set readfd;
	fd_set writefd;
	fd_set exceptfd;
	timeval_t timer_wait;
	int signal_fd;
#ifdef _WITH_SNMP_
	timeval_t snmp_timer_wait;
	int snmpblock = 0;
	int fdsetsize;
#endif

	assert(m != NULL);

	/* Timer initialization */
	memset(&timer_wait, 0, sizeof (timeval_t));

retry:	/* When thread can't fetch try to find next thread again. */

	/* If there is event process it first. */
	if ((thread = thread_trim_head(&m->event))) {
		*fetch = *thread;

		/* If daemon hanging event is received return NULL pointer */
		if (thread->type == THREAD_TERMINATE) {
			thread->type = THREAD_UNUSED;
			thread_add_unuse(m, thread);
			return NULL;
		}
		thread->type = THREAD_UNUSED;
		thread_add_unuse(m, thread);
		return fetch;
	}

	/* If there is ready threads process them */
	if ((thread = thread_trim_head(&m->ready))) {
		*fetch = *thread;
		thread->type = THREAD_UNUSED;
		thread_add_unuse(m, thread);
		return fetch;
	}

	/*
	 * Re-read the current time to get the maximum accuracy.
	 * Calculate select wait timer. Take care of timeouted fd.
	 */
	set_time_now();
	thread_compute_timer(m, &timer_wait);

	/* Call select function (on copies, the master's sets stay intact). */
	readfd = m->readfd;
	writefd = m->writefd;
	exceptfd = m->exceptfd;

	signal_fd = signal_rfd();
	FD_SET(signal_fd, &readfd);

#ifdef _WITH_SNMP_
	/* When SNMP is enabled, we may have to select() on additional
	 * FD. snmp_select_info() will add them to `readfd'. The trick
	 * with this function is its last argument. We need to set it
	 * to 0 and we need to use the provided new timer only if it
	 * is still set to 0.
	 */
	fdsetsize = FD_SETSIZE;
	snmpblock = 0;
	memcpy(&snmp_timer_wait, &timer_wait, sizeof(timeval_t));
	snmp_select_info(&fdsetsize, &readfd, &snmp_timer_wait, &snmpblock);
	if (snmpblock == 0)
		memcpy(&timer_wait, &snmp_timer_wait, sizeof(timeval_t));
#endif

	ret = select(FD_SETSIZE, &readfd, &writefd, &exceptfd, &timer_wait);

	/* we have to save errno here because the next syscalls will set it */
	old_errno = errno;

	/* Handle SNMP stuff */
#ifdef _WITH_SNMP_
	if (ret > 0)
		snmp_read(&readfd);
	else if (ret == 0)
		snmp_timeout();
#endif

	/* handle signals synchronously, including child reaping */
	if (FD_ISSET(signal_fd, &readfd))
		signal_run_callback();

	/* Update current time */
	set_time_now();

	if (ret < 0) {
		if (old_errno == EINTR)
			goto retry;
		/* Real error. */
		DBG("select error: %s", strerror(old_errno));
		assert(0);
	}

	/* Timeout children */
	thread = m->child.head;
	while (thread) {
		thread_t *t;

		t = thread;
		thread = t->next;	/* saved first: t may be moved to another list */

		if (timer_cmp(time_now, t->sands) >= 0) {
			thread_list_delete(&m->child, t);
			thread_list_add(&m->ready, t);
			t->type = THREAD_CHILD_TIMEOUT;
		}
	}

	/* Read thread. */
	thread = m->read.head;
	while (thread) {
		thread_t *t;

		t = thread;
		thread = t->next;

		if (FD_ISSET(t->u.fd, &readfd)) {
			assert(FD_ISSET(t->u.fd, &m->readfd));
			FD_CLR(t->u.fd, &m->readfd);
			thread_list_delete(&m->read, t);
			thread_list_add(&m->ready, t);
			t->type = THREAD_READY_FD;
		} else {
			if (timer_cmp(time_now, t->sands) >= 0) {
				FD_CLR(t->u.fd, &m->readfd);
				thread_list_delete(&m->read, t);
				thread_list_add(&m->ready, t);
				t->type = THREAD_READ_TIMEOUT;
			}
		}
	}

	/* Write thread. */
	thread = m->write.head;
	while (thread) {
		thread_t *t;

		t = thread;
		thread = t->next;

		if (FD_ISSET(t->u.fd, &writefd)) {
			/*
			 * Check the master's set, as the read branch does:
			 * asserting on the select() result would merely
			 * repeat the condition just tested.
			 */
			assert(FD_ISSET(t->u.fd, &m->writefd));
			FD_CLR(t->u.fd, &m->writefd);
			thread_list_delete(&m->write, t);
			thread_list_add(&m->ready, t);
			t->type = THREAD_READY_FD;
		} else {
			if (timer_cmp(time_now, t->sands) >= 0) {
				FD_CLR(t->u.fd, &m->writefd);
				thread_list_delete(&m->write, t);
				thread_list_add(&m->ready, t);
				t->type = THREAD_WRITE_TIMEOUT;
			}
		}
	}
	/* Exception thread. */
	/*... */

	/* Timer update. */
	thread = m->timer.head;
	while (thread) {
		thread_t *t;

		t = thread;
		thread = t->next;

		if (timer_cmp(time_now, t->sands) >= 0) {
			thread_list_delete(&m->timer, t);
			thread_list_add(&m->ready, t);
			t->type = THREAD_READY;
		}
	}

	/* Return one event. */
	thread = thread_trim_head(&m->ready);

#ifdef _WITH_SNMP_
	run_alarms();
	netsnmp_check_outstanding_agent_requests();
#endif

	/* There is no ready thread. */
	if (!thread)
		goto retry;

	*fetch = *thread;
	thread->type = THREAD_UNUSED;
	thread_add_unuse(m, thread);

	return fetch;
}
2.7 子线程处理:通过waitpid回收已退出的子进程,遍历子线程链表找到与其pid对应的子线程,并将该线程放入就绪线程链表。
/* Synchronous signal handler to reap child processes */ void thread_child_handler(void * v, int sig) { thread_master_t * m = v; /* * This is O(n^2), but there will only be a few entries on * this list. */ thread_t *thread; pid_t pid; int status = 77; while ((pid = waitpid(-1, &status, WNOHANG))) { if (pid == -1) { if (errno == ECHILD) return; DBG("waitpid error: %s", strerror(errno)); assert(0); } else { thread = m->child.head; while (thread) { thread_t *t; t = thread; thread = t->next; if (pid == t->u.c.pid) { thread_list_delete(&m->child, t); thread_list_add(&m->ready, t); t->u.c.status = status; t->type = THREAD_READY; break; } } } } }
2.8 线程调用
/*
 * Run a thread: stamp it with the value returned by thread_get_id(),
 * then invoke its callback with the thread itself as argument. The
 * callback's return value is discarded.
 */
void
thread_call(thread_t * thread)
{
	int (*callback) (thread_t *);

	thread->id = thread_get_id();

	callback = thread->func;
	callback(thread);
}
2.9 启动调度器
/*
 * Scheduler main loop.
 *
 * Registers thread_child_handler for SIGCHLD on the global master, then
 * keeps fetching one thread at a time and running it until
 * thread_fetch() returns NULL.
 */
void
launch_scheduler(void)
{
	thread_t current;

	signal_set(SIGCHLD, thread_child_handler, master);

	/*
	 * Processing the master thread queues,
	 * return and execute one ready thread.
	 */
	for (;;) {
		if (!thread_fetch(master, &current))
			break;

		/* Run until error, used for debugging only */
#ifdef _DEBUG_
		if ((debug & 520) == 520) {
			debug &= ~520;
			thread_add_terminate_event(master);
		}
#endif
		thread_call(&current);
	}
}