zoukankan      html  css  js  c++  java
  • 利用生产者消费者模型实现大文件的拷贝

    代码如下。一般用 10 个生产者、10 个消费者拷贝 1 GB 的文件大约需要 6 秒,速度还是不错的。运行方式:程序名 <生产者数> <消费者数> <源文件> <目标文件>。

      1 #include <semaphore.h>
      2 #include <stdio.h>
      3 #include <stdlib.h>
      4 #include <fcntl.h>
      5 #include <pthread.h>
      6 #include <sys/stat.h>
      7 #include <unistd.h>
      8 #include <time.h>
      9 
     /* Number of slots in the ring of buffers shared by producers and consumers. */
     #define NBUFF 10
     /* Size of the per-thread bookkeeping arrays that main() allocates. */
     #define MAXTHREAD 1000

     int nitems;     /* not referenced by the code in this listing */
     int nproducers; /* number of producer threads (set from argv[1] in main) */
     int nconsumers; /* number of consumer threads (set from argv[2] in main) */
     int fin;        /* descriptor of the file being read */
     int fout;       /* descriptor of the file being written */

     /*
      * State shared by every producer and consumer thread.
      * The object has static storage duration, so nput, nget and every
      * buff[i].n start out as zero without explicit initialisation.
      */
     struct{
         struct{
             char data[BUFSIZ];  /* bytes of one chunk read from fin */
             ssize_t n;          /* number of valid bytes in data; 0 marks an empty slot */
         }buff[NBUFF];           /* ring of buffers */
         int nput;               /* chunks stored so far; nput % NBUFF is the next slot to fill */
         int nget;               /* chunks written so far; nget % NBUFF is the next slot to drain */
         sem_t mutex;            /* binary semaphore guarding the fields above */
         sem_t nempty;           /* counts free slots */
         sem_t nstored;          /* counts filled slots */
     } shared;

     /* Thread entry points; arg points at the calling thread's int counter. */
     void *produce(void *arg);
     void *consume(void *arg);
     28 
     29 int
     30 main(int argc, char ** argv)
     31 {
     32     time_t startTime, endTime;
     33     int i, prodcount[MAXTHREAD], conscount[MAXTHREAD];
     34     pthread_t tid_produce[MAXTHREAD], tid_consume[MAXTHREAD];
     35     if(argc != 5){
     36         perror("Invalid argument!
    ");
     37         exit(0);
     38     }
     39     nproducers = atoi(argv[1]);
     40     nconsumers = atoi(argv[2]);
     41 
     42 
     43     fin = open(argv[3], O_RDONLY);
     44     fout = open(argv[4], O_CREAT | O_RDWR | O_TRUNC);
     45 
     46     sem_init(&shared.mutex, 0, 1);
     47     sem_init(&shared.nempty, 0, NBUFF);
     48     sem_init(&shared.nstored, 0, 0);
     49 
     50     //shared.nget = 0;
     51     //shared.nput = 0;
     52 
     53     pthread_setconcurrency(nproducers + nconsumers);
     54 
     55     time(&startTime);
     56     for(i = 0; i < nproducers; ++i){
     57         prodcount[i] = 0;
     58         pthread_create(&tid_produce[i], NULL, produce, &prodcount[i]);
     59     }
     60 
     61     for(i = 0; i < nconsumers; ++i){
     62         conscount[i] = 0;
     63         pthread_create(&tid_consume[i], NULL, consume, &conscount[i]);
     64     }
     65 
     66     for(i = 0; i < nproducers; i++){
     67         pthread_join(tid_produce[i], NULL);
     68         printf("producer count[%d] = %d
    ", i, prodcount[i]);
     69     }
     70 
     71     for(i = 0; i < nconsumers; i++){
     72         pthread_join(tid_consume[i], NULL);
     73         printf("consumer connt[%d] = %d
    ", i, conscount[i]);
     74     }
     75     close(fin);
     76     close(fout);
     77 
     78     time(&endTime);
     79     printf("The total time cost is : %.3f seconds
    ", difftime(endTime, startTime));
     80 
     81     sem_destroy(&shared.mutex);
     82     sem_destroy(&shared.nempty);
     83     sem_destroy(&shared.nstored);
     84     exit(0);
     85 }
     86 
     87 void * produce(void * arg)
     88 {
     89     for(;;){
     90         sem_wait(&shared.nempty);
     91         sem_wait(&shared.mutex);
     92 
     93         shared.buff[shared.nput%NBUFF].n =
     94                 read(fin, shared.buff[shared.nput%NBUFF].data, BUFSIZ);
     95         if(shared.buff[shared.nput%NBUFF].n == 0){
     96             printf("asdasd
    ");
     97             sem_post(&shared.nstored); //目的是让消费者得以结束
     98             sem_post(&shared.nempty);
     99             sem_post(&shared.mutex);
    100             return NULL;
    101         }
    102         shared.nput++;
    103 
    104         sem_post(&shared.mutex);
    105         sem_post(&shared.nstored);
    106         ++*((int*)arg);
    107     }
    108 }
    109 
    110 void * consume(void * arg)
    111 {
    112     for(;;){
    113         sem_wait(&shared.nstored);
    114         sem_wait(&shared.mutex);
    115 
    116         if(shared.buff[shared.nget%NBUFF].n == 0){
    117             sem_post(&shared.nstored);
    118             sem_post(&shared.mutex);
    119             return (NULL);
    120         }
    121         write(fout, shared.buff[shared.nget%NBUFF].data, shared.buff[shared.nget%NBUFF].n);
    122         if(!(shared.nget%10000))
    123             printf("Process is going on, please wait...
    ");
    124         shared.buff[shared.nget%NBUFF].n = 0;
    125         shared.nget++;
    126         sem_post(&shared.mutex);
    127         sem_post(&shared.nempty);
    128         ++*((int*)arg);
    129     }
    130 }
  • 相关阅读:
    XStream教程
    Log4j教程
    Java.io包
    Java输入/输出教程
    Java.math.BigDecimal.abs()方法
    数据类型转换
    JUnit教程
    java.lang
    标识符
    PHP面向对象笔记解析
  • 原文地址:https://www.cnblogs.com/-wang-cheng/p/4928131.html
Copyright © 2011-2022 走看看