zoukankan      html  css  js  c++  java
  • 使用互斥锁解决生产者与消费者问题

    互斥锁:上锁与解锁

    互斥锁指代相互排斥,是最基本的同步形式,它可用于保护临界取,以保证任何时刻只有一个线程在执行其中的代码。它可以用来同步一个进程内的各个线程。如果互斥锁存放在多个进程间共享的某个内存区中,它也可以用于这些进程间的同步。它实际上保护的是临界区中被操纵的数据,也就是保护由多个线程或多个进程分享的共享数据


    使用方法

    lock_the_mutex(...)

    临界区

    unlock_the_mutex(,,,)

    在Posix中,互斥锁声明为具有pthread_mutex_t数据类型的变量


    相关函数

    #include <pthread.h>

    int pthread_mutex_init (pthread_mutex_t * mutex, const pthread_mutexattr_t *mutexattr) //动态创建

    int pthread_mutex_destroy (pthread_mutex_t * mutex)   //销毁互斥量

    int pthread_mutex_lock(pthread_mutex_t  *mutex);    //上锁

    int pthread_mutex_trylock(pthread_mutex_t  *mutex);  //非阻塞解锁

    int pthread_mutex_unlock(pthread_mutex_t  *mutex);  //阻塞解锁

    如果尝试给一个已由某个线程锁住的互斥锁上锁,那么pthread_mutex_lock将阻塞到该互斥锁解锁为止。pthread_mutex_trylock是对应的非阻塞函数。如果该互斥锁已锁住。它就返回一个EBUSY错误

    解决生产者与消费者问题

     多个生产者一个消费者,如下图


    整数数组buff含有被生产和消费的条目(也就是共享数据),buff数据从0开始递增

    线程间的共享的全局变量


    设置并发线程级别

     调用pthread_setconcurrency,当我们希望多个生产者线程中每一个都有执行机会时,这个函数是必须的

    命令行参数

     第一个命令行参数指定生产者存放的条目数,下一个参数指定创建生产者线程的数目


    具体实现代码

    #include <pthread.h>
    #include <sys/types.h>
    #include <stdio.h>
    #include <stdlib.h>
    
    #define MAXNITEMS  10000
    #define MAXNTHREADS 20
    
    int nitems;    //read-only by producer and consumer
    struct 
    {
    	pthread_mutex_t  mutex;
    	int buff[MAXNITEMS];
    	int nput;
    	int nval;
    }shared =        //initialized staticly
    		{
    			PTHREAD_MUTEX_INITIALIZER
    		};
    
    void *producer(void *);
    void *consumer(void *);
    int min(int a, int b) { return a<b ? a : b; }
    
    int main(int argc, char *argv[])
    {
    	int i, nthreads, count[MAXNTHREADS];
    	pthread_t tid_produce[MAXNTHREADS], tid_consume;
    	
    	if(argc!=3)
    	{
    		printf("usage: ProCon <#items> <#threads>");
    		exit(1);
    	}
    	nitems = min(atoi(argv[1]), MAXNITEMS);
    	nthreads = min(atoi(argv[2]), MAXNTHREADS);
    	
    	pthread_setconcurrency(nthreads);
    	for(i=0; i<nthreads; i++)
    	{
    		count[i] = 0;
    		pthread_create(&tid_produce[i], NULL, producer, &count[i]);
    	}
    	//wait for all the producer threads
    	for(i=0; i<nthreads; i++)
    	{
    		pthread_join(tid_produce[i], NULL);
    		printf("count[%d] = %d
    ", i, count[i]);
    	}
    	//start, then wait for the consumer thread
    	pthread_create(&tid_consume, NULL, consumer, NULL);
    	pthread_join(tid_consume, NULL);
    	return 0;
    }
    
    void *producer(void * arg)
    {
    	for(; ; )
    	{
    		pthread_mutex_lock(&shared.mutex);
    		if(shared.nput >= nitems)
    		{
    			pthread_mutex_unlock(&shared.mutex);
    			return (NULL);		//array is full, we're done
    		}
    		shared.buff[shared.nput] = shared.nval;
    		shared.nput++;
    		shared.nval++;
    		pthread_mutex_unlock(&shared.mutex);
    		*((int *)arg) += 1;
    	}
    }
    
    void *consumer(void *arg)
    {
    	int i;
    	for(i=0; i<nitems; i++)
    	{
    		if(shared.buff[i]!=i)
    		{	
    			printf("buff[%d] = %d
    ", i, shared.buff[i]);
    		}
    	}
    	return NULL;
    }
    

    代码解释

      39-42行用来创建消费者线程,每个线程执行producer函数,在tid_produce数组中保存每个线程的线程ID。等待所有生产者线程结束后,再启动消费者线程。在生产者线程结束的同时输出每个线程的计数值,即线程的执行次数。

    注意:count元素的增加不属于临界区,因为每个线程有各自的计数器。

    运行结果如下


    如果去掉互斥锁的话,结果就会输出buff的错误值(当items比较大小),

    我在ubuntu上的测试结果 ./ProCon 400 5


     注:该博文为<<unix网络编程 --卷二>>读书笔记


  • 相关阅读:
    Mac 安装FFMpeg 与 FFmpeg 格式转换
    django channels
    python3 coroutine
    python中关于sql 添加参数
    python导包的问题
    python中的列表
    django中用model生成数据库表结构
    docker
    博客大神地址
    Bean复制的几种框架性能比较(Apache BeanUtils、PropertyUtils,Spring BeanUtils,Cglib BeanCopier)
  • 原文地址:https://www.cnblogs.com/bbsno1/p/3278374.html
Copyright © 2011-2022 走看看