zoukankan      html  css  js  c++  java
  • Linux_线程同步_生产者消费者模型

    #include <iostream>
    #include <pthread.h>
    #include <sys/types.h>
    #include <unistd.h>
    #include <string.h>
    #include <semaphore.h>
    using namespace std;
    
    int g_number = 0;
    pthread_mutex_t g_mutex;
    // 阻塞线程(条件变量类型的变量)
    pthread_cond_t g_cond;
    
    int g_number1 = 0;
    int g_number2 = 0;
    pthread_mutex_t g_mutex1;
    pthread_mutex_t g_mutex2;
    
    pthread_rwlock_t g_rwlock;
    
    sem_t g_producer_sem;
    sem_t g_consumer_sem;
    
    /**** 1、互斥锁案例 ****/
    // g_mutex 应为全局变量,保证多个线程共用一把锁
    // 在访问共享资源前加锁,访问结束后立即解锁。锁的“粒度”应越小越好。
    void* fun1(void* arg)
    {
        for(int i = 0; i < 50000; ++i)
        {
            // 访问全局变量之前加锁(如果 g_mutex 被锁上了,代码阻塞在当前位置)
            pthread_mutex_lock(&g_mutex);
            g_number++;
            cout << "tid = " << pthread_self() << ", g_number = " << g_number << endl;
            // 解锁
            pthread_mutex_unlock(&g_mutex);
            usleep(10);
        }
        return NULL;
    }
    
    void test1()
    {
        // 初始化互斥锁
        pthread_mutex_init(&g_mutex, NULL);
    
        pthread_t tid1, tid2;
        int ret1 = pthread_create(&tid1, NULL, fun1, NULL);
        int ret2 = pthread_create(&tid2, NULL, fun1, NULL);
        if(ret1 != 0)
        {
            // 线程创建失败,打印错误信息
            cout << "pthread_create error: " << strerror(ret1);
        }
        else
        {
            pthread_join(tid1, NULL);
        }
    
        if(ret2 != 0)
        {
            // 线程创建失败,打印错误信息
            cout << "pthread_create error: " << strerror(ret2);
        }
        else
        {
            pthread_join(tid2, NULL);
        }
    
        // 释放互斥锁资源
        pthread_mutex_destroy(&g_mutex);
    }
    
    /**** 2、死锁锁案例 ****/
    // 死锁的两种情况:(1) 线程试图对同一个互斥量A加锁两次; (2) 线程1拥有A锁,请求获得B锁;线程2拥有B锁,请求获得A锁
    void* fun21(void* arg)
    {
        for(int i = 0; i < 50000; ++i)
        {
            // 访问全局变量之前加锁(如果 g_mutex1 被锁上了,代码阻塞在当前位置)
            pthread_mutex_lock(&g_mutex1);
            g_number1++;
            cout << "tid = " << pthread_self() << ", g_number1 = " << g_number1 << endl;
    
            // 访问全局变量之前加锁(如果 g_mutex2 被锁上了,代码阻塞在当前位置)
            pthread_mutex_lock(&g_mutex2);
            g_number2++;
            cout << "tid = " << pthread_self() << ", g_number2 = " << g_number2 << endl;
            // 解锁
            pthread_mutex_unlock(&g_mutex2);
    
            // 解锁
            pthread_mutex_unlock(&g_mutex1);
    
    
            usleep(10);
        }
        return NULL;
    }
    
    void* fun22(void* arg)
    {
        for(int i = 0; i < 50000; ++i)
        {
            // 访问全局变量之前加锁(如果 g_mutex2 被锁上了,代码阻塞在当前位置)
            pthread_mutex_lock(&g_mutex2);
            g_number2++;
            cout << "tid = " << pthread_self() << ", g_number2 = " << g_number2 << endl;
    
            // 访问全局变量之前加锁(如果 g_mutex1 被锁上了,代码阻塞在当前位置)
            pthread_mutex_lock(&g_mutex1);
            g_number1++;
            cout << "tid = " << pthread_self() << ", g_number1 = " << g_number1 << endl;
            // 解锁
            pthread_mutex_unlock(&g_mutex1);
    
            // 解锁
            pthread_mutex_unlock(&g_mutex2);
            usleep(10);
        }
        return NULL;
    }
    
    void test2()
    {
        // 初始化互斥锁
        pthread_mutex_init(&g_mutex1, NULL);
        pthread_mutex_init(&g_mutex2, NULL);
    
        pthread_t tid1, tid2;
        int ret1 = pthread_create(&tid1, NULL, fun21, NULL);
        int ret2 = pthread_create(&tid2, NULL, fun22, NULL);
        if(ret1 != 0)
        {
            // 线程创建失败,打印错误信息
            cout << "pthread_create error: " << strerror(ret1);
        }
        else
        {
            pthread_join(tid1, NULL);
        }
    
        if(ret2 != 0)
        {
            // 线程创建失败,打印错误信息
            cout << "pthread_create error: " << strerror(ret2);
        }
        else
        {
            pthread_join(tid2, NULL);
        }
    
        // 释放互斥锁资源
        pthread_mutex_destroy(&g_mutex1);
        pthread_mutex_destroy(&g_mutex2);
    }
    
    /**** 3、读写锁案例 ****/
    // 与互斥量类似,但读写锁允许更高的并行性。其特性为:写独占,读共享。
    // (1) 读写锁是“写模式加锁”时, 解锁前,所有对该锁加锁的线程都会被阻塞。
    // (2) 读写锁是“读模式加锁”时, 如果线程以读模式对其加锁会成功;如果线程以写模式加锁会阻塞。
    // (3) 读写锁是“读模式加锁”时, 既有试图以写模式加锁的线程,也有试图以读模式加锁的线程。那么读写锁会阻塞随后的读模式锁请求。优先满足写模式锁。读锁、写锁并行阻塞,写锁优先级高
    // 读写锁也叫共享-独占锁。当读写锁以读模式锁住时,它是以共享模式锁住的;当它以写模式锁住时,它是以独占模式锁住的。写独占、读共享。
    // 读写锁非常适合于对数据结构读的次数远大于写的情况。
    void* write_fun(void* arg)
    {
        while(true)
        {
            // 加写锁
            pthread_rwlock_wrlock(&g_rwlock);
            g_number++;
            cout << "write: tid = " << pthread_self() << ", g_number = " << g_number << endl;
            // 解锁
            pthread_rwlock_unlock(&g_rwlock);
            usleep(500);
        }
        return NULL;
    }
    
    void* read_fun(void* arg)
    {
        while(true)
        {
            // 加读锁
            pthread_rwlock_rdlock(&g_rwlock);
            cout << "read: tid = " << pthread_self() << ", g_number = " << g_number << endl;
            // 解锁
            pthread_rwlock_unlock(&g_rwlock);
            usleep(500);
        }
        return NULL;
    }
    
    void test3()
    {
        // 初始化读写锁
        pthread_rwlock_init(&g_rwlock, NULL);
    
        pthread_t tid[8] = { 0 };
        // 创建三个写线程
        for(int i = 0; i < 3; ++i)
        {
            pthread_create(&tid[i], NULL, write_fun, NULL);
        }
    
        // 创建五个读线程
        for(int i = 3; i < 8; ++i)
        {
            pthread_create(&tid[i], NULL, read_fun, NULL);
        }
    
        // 阻塞回收子线程的PCB
        for(int i = 0; i < 8; ++i)
        {
            pthread_join(tid[i], NULL);
        }
    
        // 释放读写锁资源
        pthread_rwlock_destroy(&g_rwlock);
    }
    
    /**** 4、生产者和消费者模型(条件变量) 案例 ****/
    
    // 节点结构
    typedef struct node
    {
        int data;
        struct node* next;
    }Node;
    
    // 永远指向链表头部的指针
    Node* g_head = NULL;
    
    void* producer_fun(void* arg)
    {
        while(true)
        {
            // 创建一个链表的结点
            Node* pnew = (Node*)malloc(sizeof(Node));
            // 结点的初始化
            pnew->data = rand() % 1000 + 1;
    
            // 使用互斥锁保护共享数据
            pthread_mutex_lock(&g_mutex);
            // 指针域
            pnew->next = g_head;
            g_head = pnew;
            cout << "producer tid = " << pthread_self() << ", data = " << pnew->data << endl;
            // 解锁
            pthread_mutex_unlock(&g_mutex);
    
            // 通知阻塞的消费者线程解除阻塞
            pthread_cond_signal(&g_cond);
    
            sleep(rand() % 3 + 1);
        }
        return NULL;
    }
    
    void* consumer_fun(void* arg)
    {
        while(true)
        {
            pthread_mutex_lock(&g_mutex);
            // 判断链表是否为空
            if(g_head == NULL)
            {
                // 线程阻塞
                // 该函数会对互斥锁解锁
                pthread_cond_wait(&g_cond, &g_mutex);
                // 解除阻塞之后,会对互斥锁做加锁操作
            }
            // 链表不为空,删掉头结点
            Node* pdel = g_head;
            g_head = g_head->next;
            cout << "consumer tid = " << pthread_self() << ", data = " << pdel->data << endl;
            free(pdel);
            pthread_mutex_unlock(&g_mutex);
        }
        return NULL;
    }
    
    void test4()
    {
        // 初始化
        pthread_mutex_init(&g_mutex, NULL);
        pthread_cond_init(&g_cond, NULL);
    
        pthread_t tid1 = 0;
        pthread_t tid2 = 0;
        // 创建生产者线程
        pthread_create(&tid1, NULL, producer_fun, NULL);
        // 创建消费者线程
        pthread_create(&tid2, NULL, consumer_fun, NULL);
    
        // 阻塞回收子线程的PCB
        pthread_join(tid1, NULL);
        pthread_join(tid2, NULL);
    
        // 释放资源
        pthread_mutex_destroy(&g_mutex);
        pthread_cond_destroy(&g_cond);
    }
    
    /**** 5、生产者和消费者模型(信号量) 案例 ****/
    void* producer_fun_sem(void* arg)
    {
        while(true)
        {
            // 创建一个链表的结点
            Node* pnew = (Node*)malloc(sizeof(Node));
            // 结点的初始化
            pnew->data = rand() % 1000 + 1;
    
            // 给信号量加锁(相当于 g_producer_sem--, 如果 g_producer_sem = 0, 则阻塞)
            sem_wait(&g_producer_sem);
    
            // 指针域
            pnew->next = g_head;
            g_head = pnew;
            cout << "producer tid = " << pthread_self() << ", data = " << pnew->data << endl;
    
            // 给信号量解锁(相当于 g_consumer_sem++)
            sem_post(&g_consumer_sem);
    
            sleep(rand() % 3 + 1);
        }
        return NULL;
    }
    
    void* consumer_fun_sem(void* arg)
    {
        while(true)
        {
            // 给信号量加锁(相当于 g_consumer_sem--, 如果 g_consumer_sem = 0, 则阻塞)
            sem_wait(&g_consumer_sem);
    
            Node* pdel = g_head;
            g_head = g_head->next;
            cout << "consumer tid = " << pthread_self() << ", data = " << pdel->data << endl;
            free(pdel);
    
            // 给信号量解锁(相当于 g_producer_sem++)
            sem_post(&g_producer_sem);
        }
        return NULL;
    }
    
    void test5()
    {
        // 初始化
        // 参1:sem信号量, 参2:pshared取0用于线程间;取非1用于进程间, 参3:value指定信号量初值
        sem_init(&g_producer_sem, 0, 1);
        sem_init(&g_consumer_sem, 0, 0);
    
        pthread_t tid1 = 0;
        pthread_t tid2 = 0;
        // 创建生产者线程
        pthread_create(&tid1, NULL, producer_fun_sem, NULL);
        // 创建消费者线程
        pthread_create(&tid2, NULL, consumer_fun_sem, NULL);
    
        // 阻塞回收子线程的PCB
        pthread_join(tid1, NULL);
        pthread_join(tid2, NULL);
    
        // 释放资源
        sem_destroy(&g_producer_sem);
        sem_destroy(&g_consumer_sem);
    }
    
    int main()
    {
        test5();
        return 0;
    }
  • 相关阅读:
    python第三周练习
    python第一周作业
    SQLite3—数据库的学习—python
    python实现跳一跳辅助的实验报告
    Python——自己的第一个网页(文件的使用)
    第一次爬虫和测试
    numpy和matplotlib使用
    Python作业———预测球队比赛成绩
    PIL库的学习
    Pytho作业——Jieba库的使用和好玩的词云
  • 原文地址:https://www.cnblogs.com/duxie/p/15086354.html
Copyright © 2011-2022 走看看