关于一个消费者模式,,,引起的问题..
我在io线程里不断的把一个函数调用放到队列里
然后ruby线程就不断的从这个队列里取出函数之争并运行.
典型的 消费者模式.
我曾经以为是这样...
这是work线程
pthread_mutex_lock(&mutex2) while(( invoke = get_invoke() ) != NULL){ do_invoke(invoke); } pthread_mutex_unlock(&mutex2) pthread_cond_wait(&cond, &mutex);
这里是io线程
pthread_mutex_lock(&mutex2); push_invoke(invoke); pthread_mutex_unlock(&mutex2); pthread_cond_signal(&cond);// 本意是打算在这里唤醒线程
可是会出现一个情况..就是在
pthread_mutex_unlock(&mutex2); // 这里解锁,也就是说 上面的线程就会開始运行 pthread_cond_signal(&cond);// 本意是打算在这里唤醒线程这里..在unlock的时候..非常有可能work线程已经解锁了...一旦 work 比較耗时..那么就会出现..
先 运行 io线程的 pthread_cond_signal 然后再运行 pthread_cond_wait
由于先pthread_cond_signal后pthread_cond_wait ..那么 pthread_cond_signal 就变得无效了
肯定会问.为什么 不先wait 再 unlock ,..可是这样会出现 io 线程一直处于无法进入的状态...
那么解决方法例如以下:
就是 mutex_lock 直接使用跟cond_wait 配套的相互排斥锁..
work thread
pthread_mutex_lock(&mutex) while(( invoke = get_invoke() ) != NULL){ do_invoke(invoke); } pthread_cond_wait(&cond, &mutex); pthread_mutex_unlock(&mutex)io thread
pthread_mutex_lock(&mutex); push_invoke(invoke); pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex);
注意.顺序换了.然后相互排斥锁换了同一个.
肯定会问..那假设上面的開始等待后..那以下岂不是一直进去..无法解锁?.
但事实上是这种..
man 了一下 pthread_cond_wait
事实上是会先把mutex 给解锁..然后在堵塞..!!这里是重点,,
他会把mutex给unlock..那么.就可想而知了.
io线程解锁.再通过 signal 唤醒上面的线程 :)
事实就是这样...多看文档...经验不代表一切...
以下是项目代码
pthread_mutex_t pthread_ruby_invoke_call_invoke_mutex; pthread_cond_t pthread_ruby_invoke_call_invoke_cond; struct fs_loop_queue* ruby_invoke_loop_que; void fs_rb_loop(const char* main_file, int pathc, const char** pathv){ for(int i = 0 ; i < pathc ; i++){ char path[128]; snprintf(path, 128, " $: << '%s' ", pathv[i]); rb_eval_string(path); } ruby_invoke_loop_que = fs_create_loop_queue(INVOKE_LEN); pthread_mutex_init(&pthread_ruby_invoke_call_invoke_mutex, NULL); pthread_cond_init(&pthread_ruby_invoke_call_invoke_cond, NULL); int ret = 0; rb_load_protect(rb_str_buf_new_cstr(main_file), 0, &ret); if(ret != 0){ rb_p(rb_errinfo()); } struct fs_invoke_call_function* invoke = NULL; do{ retry: while((invoke = fs_ruby_pop_call_invoke()) != NULL){ invoke->func((VALUE)invoke->argv); fs_free(invoke->argv); fs_free(invoke); } pthread_mutex_lock(&pthread_ruby_invoke_call_invoke_mutex); // 过了临界点.发现有数据,就回去/// 效率 * 2 .....XDDD if(!fs_loop_queue_empty(ruby_invoke_loop_que)){ pthread_mutex_unlock(&pthread_ruby_invoke_call_invoke_mutex); goto retry; } pthread_cond_wait(&pthread_ruby_invoke_call_invoke_cond, &pthread_ruby_invoke_call_invoke_mutex); pthread_mutex_unlock(&pthread_ruby_invoke_call_invoke_mutex); }while (fs_true); } fs_bool fs_ruby_invoke(struct fs_invoke_call_function* invoke){ pthread_mutex_lock(&pthread_ruby_invoke_call_invoke_mutex); fs_bool ret = fs_false; ret = fs_loop_queue_push(ruby_invoke_loop_que, invoke); if(ret){ pthread_cond_signal(&pthread_ruby_invoke_call_invoke_cond); }else{ do{ ret = fs_loop_queue_push(ruby_invoke_loop_que, invoke); pthread_cond_signal(&pthread_ruby_invoke_call_invoke_cond); }while (!ret); } pthread_mutex_unlock(&pthread_ruby_invoke_call_invoke_mutex); return fs_true; } struct fs_invoke_call_function* fs_ruby_pop_call_invoke(){ struct fs_invoke_call_function* func = (struct fs_invoke_call_function*)fs_loop_queue_pop(ruby_invoke_loop_que); return func; }
pthread_mutex_unlock(&mutex2)