1.最近项目不是很忙,结合之前看的一些开源代码(skynet及其他github代码)及项目代码,抽空写了一个简单的任务队列当做练习。
2.介绍:
1)全局队列中锁的使用:多线程下,全局队列需要加锁,本例中封装了MutexGuard。操作全局队列之前,先在栈上创建一个临时锁对象,调用构造函数时加锁,对象销毁时调用析构函数从而解锁,减少了我们手动加锁,解锁的过程。
2)信号的使用:本例可以说是为了使用信号而使用信号,仅仅是为了熟悉信号机一些特性。 当程序以后台模式 跑起来以后,输入kill -USR1 %1 向程序发送SIGUSR1信号,从而使生产者生产一定数量的job,供消费者使用;消费者线程,在处理完全局队列以后sleep,等待生产者产生新任务; 输入 kill -USR2 %1, 改变变量状态,向信号监听线程发送结束通知,结束线程。
3)简单的线程池模型。
4)简单的线程间通信和同步方式示例。
5)简单的类模板的使用。
3.编译: 文件不多,偷懒没有写makefile文件,可自行加上。编译指令 : g++ -g -Wall -o test main.cpp mutex.cpp List.h mutex.h -lpthread
4:执行流程:
1)编译成功后,输入 ./mytest &。 以后台模式运行程序
2)此时所有consumer线程阻塞,等待生产者生产job; 一个producer线程阻塞在select处,等待读管道内的消息;一个signal_handler线程调用 pthread_sigwait( ... ) 等待 SIGUSR1 和 SIGUSR2 信号的到来。
可通过在控制台输入: kill -USR1 %1(ps: kill 指令用来产生信号 当以后台模式运行该进程时, %1用来获得该进程 id,因此该命令表示向 该进程发送 SIGUSR1 信号)进程发送SIGUSR1信号,被signal_handler捕捉到以后,生产job,唤醒consumer线程处理job,此流程可重复执行;当在控制台输入 kill -USR2 %1 时, 改变quit变量值,从而使得各个线程退出,进程结束。还有一个 spoliling 轮询线程,在全局队列不为空的情况下,及时唤醒consumer线程处理任务。可通过调整wakeup中的参数,调整唤醒consumer的频率。
5.参考:
1)UNIX环境高级编程。
2)https://github.com/idispatch/signaltest
3)https:github.com/cloudwu/skynet/skynet-src/skynet_start.c
水平有限,仅供参考,希望能对读者有所帮助。以上描述及以下源码有任何漏洞与不足,欢迎及时指正与交流。
6:源码:
main.cpp:
#include <iostream> #include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <unistd.h> #include <sys/select.h> #include <sys/types.h> #include <string.h> #include <errno.h> #include <signal.h> #include "List.h" #include "mutex.h" #define THREAD_NUM 4 #define JOB_NUM 100 #define handle_error_en(en, msg) do{ errno = en; perror(msg); exit(EXIT_FAILURE); } while(0) using std::cout; using std::endl; using std::string; using std::cin; struct monitor { int count; pthread_cond_t cond; pthread_mutex_t mutex; int sleep; int quit; int pfds[2]; }; struct sig { sigset_t set; struct monitor *m; }; typedef void (*thread_func)(void *arg, int value); //job call back struct job { void *arg; thread_func cb; }; List<job *> g_list; const int allowed_signals[] = {SIGUSR1, SIGUSR2, SIGQUIT}; static void print_v(void *value, int pid) { printf("pid: %d, value: %d ", pid, *(int*)value); } static void free_job(struct job *j) { if(j == NULL) { return; } free(j->arg); j->arg = NULL; free(j); j = NULL; } static int dispatch(int pid) { struct job *j = g_list.Pop(); if (j != NULL) { j->cb(j->arg, pid); free_job(j); return 0; } return -1; } static void * consumer(void *arg) { struct monitor *m = (struct monitor *)arg; int r = 0; usleep(50000); int pid = pthread_self(); while(!m->quit) { r = dispatch(pid); if (r < 0) { if(pthread_mutex_lock(&m->mutex) == 0) { ++m->sleep; cout << "thread : " << pid << " sleep" << endl; if(!m->quit) { pthread_cond_wait(&m->cond, &m->mutex); } -- m->sleep; cout << "thread : " << pid << " wakeup" << endl; if(pthread_mutex_unlock(&m->mutex)) { fprintf(stderr, "unlock mutex error"); exit(1); } } } } cout << "thread consumer quit " << endl; return NULL; } static void free_monitor(struct monitor *m) { if(m == NULL) { return; } cout << "free monitor called" << endl; close(m->pfds[0]); close(m->pfds[1]); free(m); cout << "free monitor over" << endl; } static void wakeup(struct monitor *m, int busy) { if (m->sleep >= m->count - busy) { // signal sleep worker, "spurious wakeup" is harmless pthread_cond_signal(&m->cond); } } static struct job* create_job() { struct job * j = (struct job *)calloc(1, sizeof(*j)); if (j == NULL) { fprintf(stderr, "create_job failed"); return NULL; } int v = rand(); j->arg = malloc(sizeof (int)); if (j->arg == NULL) { fprintf(stderr, "get arg failed"); return NULL; } memcpy(j->arg, &v, sizeof (int) ); j->cb = print_v; return j; } static void * producer(void *arg) { struct monitor *m = (struct monitor *)arg; cout << "producer called" << endl; int pid = pthread_self(); int state; while(!m->quit) { fd_set fds; FD_ZERO(&fds); FD_SET(m->pfds[0], &fds); state = select(m->pfds[0] + 1, &fds, NULL, NULL, NULL); if(state < 0) { if(errno == EINTR) { cout << "errno == EINTR" << endl; continue; } break; } else if (state == 0) { } else { char msg[200]; memset(msg, 0, sizeof(msg)); read(m->pfds[0], msg, sizeof(msg)); //only to clear up pipe. msg[strlen(msg)] = '