zoukankan      html  css  js  c++  java
  • Linux平台上实现队列

    转载: http://my.oschina.net/sundq/blog/203600

    Linux上目前有两种事件通知方式,一种是线程条件变量,一种是利用eventfd实现事件通知,下面介绍一下利用这两种方法实现异步队列的方法。

    线程条件变量

    相关函数介绍

    • pthread_cond_init:初始化一个线程条件变量。
    • pthread_cond_wait:等待条件触发。
    • pthread_cond_signal:通知一个线程,线程条件发生。
    • pthread_cond_timedwait:等待条件触发,可以设置超时时间。
    • pthread_cond_reltimedwait_np:和pthread_cond_timedwait使用基本相同,区别是使用的是相对时间间隔而不是绝对时间间隔。
    • pthread_cond_broadcast:通知所有等待线程,线程条件发生。
    • pthread_cond_destroy:销毁条件变量。

    唤醒丢失问题

    如果线程未持有与条件相关联的互斥锁,则调用 pthread_cond_signal() 或 pthread_cond_broadcast() 会产生唤醒丢失错误。满足以下所有条件时,即会出现唤醒丢失问题:

    • 一个线程调用 pthread_cond_signal() 或 pthread_cond_broadcast()
    • 另一个线程已经测试了该条件,但是尚未调用 pthread_cond_wait()
    • 没有正在等待的线程

    信号不起作用,因此将会丢失,仅当修改所测试的条件但未持有与之相关联的互斥锁时,才会出现此问题。只要仅在持有关联的互斥锁同时修改所测试的条件,即可调用 pthread_cond_signal() 和 pthread_cond_broadcast(),而无论这些函数是否持有关联的互斥锁。

    线程条件变量使用方法

    get_resources(int amount)    
    {    
        pthread_mutex_lock(&rsrc_lock);    
        while (resources < amount) 
       {    
            pthread_cond_wait(&rsrc_add, &rsrc_lock);
       }   
       resources -= amount;
       pthread_mutex_unlock(&rsrc_lock);

    }

    add_resources(int amount) 
    {

    pthread_mutex_lock(&rsrc_lock);
       resources += amount;
       pthread_cond_broadcast(&rsrc_add);
       pthread_mutex_unlock(&rsrc_lock);
    }

    eventfd

    int eventfd(unsigned int initval, int flags);

    eventfd 是Linux提供内核态的事件等待/通知机制,内核维护了一个8字节的整型数,该整型数由 initval 来初始化, flags 参数可以由以下值位或而来:

    • EFD_CLOEXEC:设置该描述符的 O_CLOEXEC 标志。
    • EFD_NONBLOCK:设置描述符为非阻塞模式。
    • EFD_SEMAPHORE:设置描述符为信号量工作模式,在此模式下, read 模式会使整型数减1并返回数值1。

    当内核维护的8字节整型数为0时, read 操作会阻塞,如果为fd设置为非阻塞模式,则返回 EAGAIN 错误。

    简单的唤醒队列

    下面我们实现一个简单的环形队列:

    #define default_size 1024
    
    typedef struct queue
    {
        int header;
        int tail;
        int size;
        int capcity;
        void **_buf;
    } queue_t;
    
    queue_t *queue_create(int size)
    {
        queue_t *q = malloc(sizeof (queue_t));
        if (q != NULL)
        {
            if (size > 0)
            {
                q->_buf = malloc(size);
                q->capcity = size;
            }
            else
            {
                q->_buf = malloc(default_size * sizeof (void *));
                q->capcity = default_size;
            }
            q->header = q->tail = q->size = 0;
        }
    
        return q;
    }
    
    int queue_is_full(queue_t *q)
    {
        return q->size == q->capcity;
    }
    
    int queue_is_empty(queue_t *q)
    {
        return q->size == 0;
    }
    
    void queue_push_tail(queue_t *q, void *data)
    {
        if (!queue_is_full(q))
        {
            q->_buf[q->tail] = data;
            q->tail = (q->tail + 1) % q->capcity;
            q->size++;
        }
    }
    
    void *queue_pop_head(queue_t *q)
    {
        void *data = NULL;
        if (!queue_is_empty(q))
        {
            data = q->_buf[(q->header)];
            q->header = (q->header + 1) % q->capcity;
            q->size--;
        }
        return data;
    }
    
    int *queue_free(queue_t *q)
    {
        free(q->_buf);
        free(q);
    }

    线程变量实现的异步队列

    typedef struct async_queue
    {
        pthread_mutex_t mutex;
        pthread_cond_t cond;
        int waiting_threads;
        queue_t *_queue;
    } async_queue_t;
    async_queue_t *async_queue_create(int size)
    {
        async_queue_t *q = malloc(sizeof (async_queue_t));
        q->_queue = queue_create(size);
        q->waiting_threads = 0;
        pthread_mutex_init(&(q->mutex), NULL);
        pthread_cond_init(&(q->cond), NULL);
    
        return q;
    }
    
    void async_queue_push_tail(async_queue_t *q, void *data)
    {
        if (!queue_is_full(q->_queue))
        {
            pthread_mutex_lock(&(q->mutex));
            queue_push_tail(q->_queue, data);
            if (q->waiting_threads > 0)
            {
                pthread_cond_signal(&(q->cond));
            }
            pthread_mutex_unlock(&(q->mutex));
        }
    
    }
    
    void *async_queue_pop_head(async_queue_t *q, struct timeval *tv)
    {
        void *retval = NULL;
        pthread_mutex_lock(&(q->mutex));
        if (queue_is_empty(q->_queue))
        {
            q->waiting_threads++;
            while (queue_is_empty(q->_queue))
            {
                pthread_cond_wait(&(q->cond), &(q->mutex));
            }
            q->waiting_threads--;
        }
        retval = queue_pop_head(q->_queue);
        pthread_mutex_unlock(&(q->mutex));
        return retval;
    }
    
    void async_queue_free(async_queue_t *q)
    {
        queue_free(q->_queue);
        pthread_cond_destroy(&(q->cond));
        pthread_mutex_destroy(&(q->mutex));
        free(q);
    }

    eventfd实现的异步队列

    typedef struct async_queue
    {
        int efd; //event fd
        fd_set rdfds; //for select
        queue_t *_queue;
    } async_queue_t;
    async_queue_t *async_queue_create(int size)
    {
        async_queue_t *q = malloc(sizeof (async_queue_t));
    
        q->efd = eventfd(0, EFD_SEMAPHORE|EFD_NONBLOCK);
        q->_queue = queue_create(size);
        FD_ZERO(&(q->rdfds));
        FD_SET(q->efd, &(q->rdfds));
    
        return q;
    }
    
    void async_queue_push_tail(async_queue_t *q, void *data)
    {
        unsigned long long i = 1;
        if (!queue_is_full(q->_queue))
        {
            queue_push_tail(q->_queue, data);
            write(q->efd, &i, sizeof (i));
        }
    }
    
    void *async_queue_pop_head(async_queue_t *q, struct timeval *tv)
    {
        unsigned long long i = 0;
        void *data = NULL;
        if (select(q->efd + 1, &(q->rdfds), NULL, NULL, tv) == 0)
        {
            return data;
        }
        else
        {
            read(q->efd, &i, sizeof (i));
            return queue_pop_head(q->_queue);
        }
    }
    
    void async_queue_free(async_queue_t *q)
    {
        queue_free(q->_queue);
        close(q->efd);
        free(q);
    }

    总结

    两种实现方法线程条件变量比较复杂,但是性能略高,而eventfd实现简单,但是性能略低。

  • 相关阅读:
    POJ 3261 Milk Patterns (求可重叠的k次最长重复子串)
    UVaLive 5031 Graph and Queries (Treap)
    Uva 11996 Jewel Magic (Splay)
    HYSBZ
    POJ 3580 SuperMemo (Splay 区间更新、翻转、循环右移,插入,删除,查询)
    HDU 1890 Robotic Sort (Splay 区间翻转)
    【转】ACM中java的使用
    HDU 4267 A Simple Problem with Integers (树状数组)
    POJ 1195 Mobile phones (二维树状数组)
    HDU 4417 Super Mario (树状数组/线段树)
  • 原文地址:https://www.cnblogs.com/yyx1-1/p/6339803.html
Copyright © 2011-2022 走看看