代码如下,一般10个生产者10个消费者拷贝1个g的文件大概在6s左右,速度还是不错的。
1 #include <semaphore.h> 2 #include <stdio.h> 3 #include <stdlib.h> 4 #include <fcntl.h> 5 #include <pthread.h> 6 #include <sys/stat.h> 7 #include <unistd.h> 8 #include <time.h> 9 10 #define NBUFF 10 11 #define MAXTHREAD 1000 12 int nitems, nproducers, nconsumers; 13 int fin; 14 int fout; 15 16 struct{ 17 struct{ 18 char data[BUFSIZ]; 19 ssize_t n ; 20 }buff[NBUFF];//多个缓冲区 21 int nput; 22 int nget; 23 sem_t mutex, nempty, nstored; 24 } shared; 25 26 27 void * produce(void *), *consume(void *); 28 29 int 30 main(int argc, char ** argv) 31 { 32 time_t startTime, endTime; 33 int i, prodcount[MAXTHREAD], conscount[MAXTHREAD]; 34 pthread_t tid_produce[MAXTHREAD], tid_consume[MAXTHREAD]; 35 if(argc != 5){ 36 perror("Invalid argument! "); 37 exit(0); 38 } 39 nproducers = atoi(argv[1]); 40 nconsumers = atoi(argv[2]); 41 42 43 fin = open(argv[3], O_RDONLY); 44 fout = open(argv[4], O_CREAT | O_RDWR | O_TRUNC); 45 46 sem_init(&shared.mutex, 0, 1); 47 sem_init(&shared.nempty, 0, NBUFF); 48 sem_init(&shared.nstored, 0, 0); 49 50 //shared.nget = 0; 51 //shared.nput = 0; 52 53 pthread_setconcurrency(nproducers + nconsumers); 54 55 time(&startTime); 56 for(i = 0; i < nproducers; ++i){ 57 prodcount[i] = 0; 58 pthread_create(&tid_produce[i], NULL, produce, &prodcount[i]); 59 } 60 61 for(i = 0; i < nconsumers; ++i){ 62 conscount[i] = 0; 63 pthread_create(&tid_consume[i], NULL, consume, &conscount[i]); 64 } 65 66 for(i = 0; i < nproducers; i++){ 67 pthread_join(tid_produce[i], NULL); 68 printf("producer count[%d] = %d ", i, prodcount[i]); 69 } 70 71 for(i = 0; i < nconsumers; i++){ 72 pthread_join(tid_consume[i], NULL); 73 printf("consumer connt[%d] = %d ", i, conscount[i]); 74 } 75 close(fin); 76 close(fout); 77 78 time(&endTime); 79 printf("The total time cost is : %.3f seconds ", difftime(endTime, startTime)); 80 81 sem_destroy(&shared.mutex); 82 sem_destroy(&shared.nempty); 83 sem_destroy(&shared.nstored); 84 exit(0); 85 } 86 87 void * produce(void * arg) 88 { 89 for(;;){ 90 sem_wait(&shared.nempty); 91 sem_wait(&shared.mutex); 92 93 shared.buff[shared.nput%NBUFF].n = 94 read(fin, shared.buff[shared.nput%NBUFF].data, BUFSIZ); 95 if(shared.buff[shared.nput%NBUFF].n == 0){ 96 printf("asdasd "); 97 sem_post(&shared.nstored); //目的是让消费者得以结束 98 sem_post(&shared.nempty); 99 sem_post(&shared.mutex); 100 return NULL; 101 } 102 shared.nput++; 103 104 sem_post(&shared.mutex); 105 sem_post(&shared.nstored); 106 ++*((int*)arg); 107 } 108 } 109 110 void * consume(void * arg) 111 { 112 for(;;){ 113 sem_wait(&shared.nstored); 114 sem_wait(&shared.mutex); 115 116 if(shared.buff[shared.nget%NBUFF].n == 0){ 117 sem_post(&shared.nstored); 118 sem_post(&shared.mutex); 119 return (NULL); 120 } 121 write(fout, shared.buff[shared.nget%NBUFF].data, shared.buff[shared.nget%NBUFF].n); 122 if(!(shared.nget%10000)) 123 printf("Process is going on, please wait... "); 124 shared.buff[shared.nget%NBUFF].n = 0; 125 shared.nget++; 126 sem_post(&shared.mutex); 127 sem_post(&shared.nempty); 128 ++*((int*)arg); 129 } 130 }