zoukankan      html  css  js  c++  java
  • 1 producer — n consumers 模型 实现

      1 #include<stdio.h>
      2 #include<string.h>
      3 #include<pthread.h>
      4 #include<stdlib.h>
      5 #include<unistd.h>
      6 #include<queue>
      7 using namespace std;
      8 #define LEN 128
      9 typedef struct task_que
     10 {
     11     queue<char*> string_queue;
     12     int m_flag ;        //结束标志
     13     int m_capacity;
     14     pthread_mutex_t m_lock;
     15     pthread_cond_t m_pro , m_con;
     16 }QUE , *pQUE;
     17 void* thd_func(void* arg);
     18 void put(pQUE pq, char* src);
     19 void get(pQUE pq, char* dest);
     20 int main(int argc, char *argv[])
     21 {
     22     QUE aque;
     23     aque.m_flag = 0;
     24     int nthds = atoi( argv[1] );   //线程的个数
     25     aque.m_capacity = atoi( argv[2] );//字符串队列的大小
     26     pthread_mutex_init( &aque.m_lock , NULL ); //
     27     pthread_cond_init( &aque.m_pro ,NULL );   //生产者条件变量
     28     pthread_cond_init( &aque.m_con , NULL );  //消费者条件变量
     29     //开辟线程空间
     30     pthread_t *thd_arr = ( pthread_t*)calloc(nthds, sizeof(pthread_t));
     31     int* ntasks = (int*)calloc(nthds, sizeof(int));//用来记录 线程工作了几次
     32     //创建线程
     33     int index;
     34     for(index = 0; index < nthds; index ++)
     35     {    
     36         pthread_create( thd_arr + index, NULL, thd_func,(void*)&aque );
     37     }
     38 
     39 
     40     //输入字符串到队列中
     41     char buf[LEN] ;
     42     while( memset(buf, 0, LEN), fgets(buf, LEN, stdin) != NULL)
     43     {
     44         put(&aque, buf);
     45     }
     46 
     47 
     48 
     49     //发出结束字符串
     50     strcpy(buf, "over");
     51     put(&aque, buf);
     52 
     53     for(index = 0; index < nthds; index ++)
     54     {
     55         pthread_join(thd_arr[index], (void**)(ntasks + index ));
     56     }
     57     for(index = 0; index < nthds; index ++)
     58     {
     59         printf("%d ", ntasks[index]);
     60     }
     61     printf("
    ");
     62 
     63     pthread_mutex_destroy(&aque.m_lock);
     64     pthread_cond_destroy(&aque.m_pro);
     65     pthread_cond_destroy(&aque.m_con);
     66     return 0 ;
     67 }
     68 void put(pQUE pq,  char* src)      //把字符串写到队列中
     69 {
     70     pthread_mutex_lock(&pq ->m_lock);   //加锁
     71     while(pq ->string_queue.size() == pq ->m_capacity)  //队列满则阻塞
     72     {
     73         pthread_cond_wait(&pq -> m_pro, &pq ->m_lock);
     74 
     75     }
     76     //插入队列
     77     char* tem =  ( char*)calloc( LEN , sizeof( char ));
     78     strcpy(tem,src);
     79     pq->string_queue.push(tem);
     80     pthread_mutex_unlock(&pq -> m_lock);  //解锁
     81     pthread_cond_broadcast(&pq ->m_con);  //唤醒所有消费者线程
     82 
     83 
     84 }
     85 
     86 void get(pQUE pq, char* dest)
     87 {
     88     pthread_mutex_lock(&pq ->m_lock); //加锁
     89     while(pq ->m_flag == 0 &&  pq ->string_queue.empty()  ) //队列空 并且未结束 则阻塞
     90     {
     91         pthread_cond_wait(&pq ->m_con, &pq ->m_lock);
     92     }
     93     if(pq ->m_flag == 1)                 //判断结束标志
     94     {
     95         pthread_mutex_unlock(&pq ->m_lock); //解锁
     96         return ;                         
     97     }
     98     //出队
     99     strcpy(dest, pq ->string_queue.front());
    100     pq->string_queue.pop();
    101 
    102     pthread_mutex_unlock(&pq ->m_lock);
    103     pthread_cond_signal(&pq ->m_pro);
    104 
    105 }
    106 
    107 void* thd_func(void* arg)
    108 {
    109     pQUE pq = (pQUE)arg ;
    110     char buf[LEN] ;
    111     int ncnt = 0 ;
    112     while(1)
    113     {
    114         memset(buf, 0, LEN) ;
    115         get(pq, buf);
    116         if(pq ->m_flag == 1)            //判断结束标志
    117         {
    118             printf("%u exit!
    ", pthread_self());
    119             pthread_exit((void*)ncnt);             //退出
    120         }
    121         ncnt ++ ;
    122         printf("%u: %s
    ", pthread_self(), buf);  //打印字符串
    123         if(strcmp("over", buf) == 0)              //判断结束字符串        
    124         {
    125             pq ->m_flag = 1 ;                    //把结束符号置为 1
    126             pthread_cond_broadcast(&pq ->m_con); //唤醒所有线程
    127             pthread_exit((void*)ncnt);           //退出
    128         }
    129         if(ncnt & 1 == 1) sleep(1);     //简单的负载平衡
    130     }
    131 }
  • 相关阅读:
    go语言的运行时支持到底是多线程还是单线程
    丑数
    把数组排成最小数
    连续子数组的最大和
    最小的k个数
    数组中出现次数超过一半的数字
    字符串的排序
    二叉搜索树与双向链表
    复杂链表的赋值
    二叉树中和为某一值的路径
  • 原文地址:https://www.cnblogs.com/xiaoyesoso/p/4278811.html
Copyright © 2011-2022 走看看