zoukankan      html  css  js  c++  java
  • 数据结构【二】:简单阻塞队列BlockingQueue

    POSIX多线程【一】:简单队列simple queue的基础上使用内部互斥锁条件变量来控制并发以达到线程安全的目的,其主要用于 [生产者-消费者] 队列.

    1.BlockingQueue初始化时会确定队列容量(_capacity),如果队列已满(capacity=0),则会阻塞enqueue操作.

    2.关闭BlockingQueue(调用queue_free)是一个延迟的操作,它会等待所有元素都dequeue,期间,该队列的一切enqueue操作将无效.

    3.此代码未经生产环境检验,仅供学习参考.

    BlockingQueue.h

    #ifndef CUR_BLOCKINGQUEUE_H
    #define CUR_BLOCKINGQUEUE_H
    #include <stdlib.h>
    #include <pthread.h>
    
    
    struct node{
        int value;
        struct node * next;
    };
    
    
    typedef struct BlockingQueue_ST{
        int capacity,remaining,closed;
        struct node * head, *tail;
        pthread_mutex_t queue_mutex;
        pthread_cond_t  cond_not_full;
        pthread_cond_t  cond_not_empty;
        pthread_cond_t  cond_empty;
    }BlockingQueue;
    
    
    extern BlockingQueue* empty_queue(int _capacity);
    extern int queue_free(BlockingQueue *q);
    extern int is_empty(const BlockingQueue *q);
    extern int is_full(const BlockingQueue *q);
    extern int enqueue(struct node *item, BlockingQueue *q);
    extern struct node* dequeue(BlockingQueue *q);
    
    #endif

    BlockingQueue.c

    #include "BlockingQueue.h"
    #include <stdio.h>
    
    BlockingQueue* empty_queue(int _capacity)
    {
         BlockingQueue *q = malloc(sizeof(BlockingQueue));
         q->head  = q->tail = NULL;
         q->capacity = q->remaining = _capacity;
         q->closed = 0;
         pthread_mutex_init(&q->queue_mutex , NULL);
         pthread_cond_init(&q->cond_not_full , NULL);
         pthread_cond_init(&q->cond_not_empty , NULL);
         pthread_cond_init(&q->cond_empty , NULL);
         return q;
    }
     
    int queue_free(BlockingQueue *q)
    {
        pthread_mutex_lock(&q->queue_mutex);
        printf("close queue...
    ");
        q->closed = 1;
        //等待cond_empty
        while(!is_empty(q))
        {
            pthread_cond_wait(&q->cond_empty, &q->queue_mutex);
        }
        free(q);
        pthread_mutex_unlock(&q->queue_mutex);
        printf("closed...
    ");
    }
    
    int is_empty(const BlockingQueue *q)
    {
        return q->capacity == q->remaining;
    }
    
    int is_full(const BlockingQueue *q)
    {
        return q->remaining == 0;
    }
    
    int enqueue(struct node *item, BlockingQueue *q)
    {
    
        if(q->closed) goto err;
        //lock
        pthread_mutex_lock(&q->queue_mutex);
        //等待cond_not_full
        while(is_full(q))
        {
            pthread_cond_wait(&q->cond_not_full, &q->queue_mutex);
        }
        
        if(is_empty(q))
        {
            q->head = q->tail = item;
            //通知所有等待cond_not_empty的线程
            pthread_cond_broadcast(&q->cond_not_empty);
        }
        else
        {
            q->tail->next = item;
            q->tail = item;
        }
        q->remaining--;
        //unlock
        pthread_mutex_unlock(&q->queue_mutex);
    
        return 0;
    err : 
        return -1;
    }
    
    struct node* dequeue(BlockingQueue *q)
    {
        
        //已经关闭的空队列
        if(q->closed && is_empty(q)) goto err;
        //lock 
        pthread_mutex_lock(&q->queue_mutex);
        //空队列,等待cond_not_empty
        while(!q->closed && is_empty(q))
        {
            pthread_cond_wait(&q->cond_not_empty, &q->queue_mutex);
        }
        //take
        struct node * temp = q->head;
        q->head = q->head->next;
    
    
        //在未关闭队列的情况下,唤醒enqueue等待线程
        if(!q->closed && is_full(q))
        {
            pthread_cond_broadcast(&q->cond_not_full); //TODO 1
        }
        q->remaining++; 
        //唤醒关闭队列线程
        if(q->closed && is_empty(q))
        {
            pthread_cond_signal(&q->cond_empty);//TODO 2
        }
    
        //注意:TODO 1和TODO 2其实是互斥的,不可能同时满足条件
        //必须先判断是否激活cond_not_full然后remaining++
        //最后再判断是否激活cond_empty
        //unlock
        pthread_mutex_unlock(&q->queue_mutex);
        return temp;
    err:
        return NULL;
    }

    测试代码 : main.c

    #include<stdio.h>
    #include<stdlib.h>
    #include "BlockingQueue.h"
    extern void* func_put(void* _q);
    
    BlockingQueue *q;
    pthread_t thread1,thread2;
    void main()
    {
        q = empty_queue(5);
        pthread_create(&thread1,NULL,func_put,(void*)q);
        pthread_create(&thread2,NULL,func_put,(void*)q);
        
        int i;
        for(i=1; i<=10; i++)
        {
            struct node * item = (struct node *)malloc(sizeof(struct node));
            item->value = i;
            item->next = NULL;
            enqueue(item,q);
            printf("enqueue -> thread : %d, value : %d, remaining : %d
    ",pthread_self(),i,q->remaining);
            sleep(1);
        }
        queue_free(q);
    }
    
    void* func_put(void* _q)
    {
        BlockingQueue *q = (BlockingQueue*)_q;
        struct node *item;
        while((item = dequeue(q)) != NULL)
        {
            printf("dequeue -> thread : %d, value : %d, remaining : %d
    ",pthread_self(), item->value,q->remaining);
            free(item);
            sleep(3);
        }
    }

    测试结果 :

  • 相关阅读:
    Laravel 5.7 RCE (CVE-2019-9081)
    Laravel 5.8 RCE 分析
    CVE-2018-12613 的一些思考
    2019CISCN华南线下两道web复现
    Intellij idea导入项目时没有目录结构
    [BZOJ4907]柠檬
    [BZOJ3675]序列分割
    aes加解密
    java:基于redis实现分布式定时任务
    PBKDF2加密
  • 原文地址:https://www.cnblogs.com/coveted/p/3499335.html
Copyright © 2011-2022 走看看