zoukankan      html  css  js  c++  java
  • Linux平台上实现队列

    转载: http://my.oschina.net/sundq/blog/203600

    Linux上目前有两种事件通知方式,一种是线程条件变量,一种是利用eventfd实现事件通知,下面介绍一下利用这两种方法实现异步队列的方法。

    线程条件变量

    相关函数介绍

    • pthread_cond_init:初始化一个线程条件变量。
    • pthread_cond_wait:等待条件触发。
    • pthread_cond_signal:通知一个线程,线程条件发生。
    • pthread_cond_timedwait:等待条件触发,可以设置超时时间。
    • pthread_cond_reltimedwait_np:和pthread_cond_timedwait使用基本相同,区别是使用的是相对时间间隔而不是绝对时间间隔。
    • pthread_cond_broadcast:通知所有等待线程,线程条件发生。
    • pthread_cond_destroy:销毁条件变量。

    唤醒丢失问题

    如果线程未持有与条件相关联的互斥锁,则调用 pthread_cond_signal() 或 pthread_cond_broadcast() 会产生唤醒丢失错误。满足以下所有条件时,即会出现唤醒丢失问题:

    • 一个线程调用 pthread_cond_signal() 或 pthread_cond_broadcast()
    • 另一个线程已经测试了该条件,但是尚未调用 pthread_cond_wait()
    • 没有正在等待的线程

    信号不起作用,因此将会丢失,仅当修改所测试的条件但未持有与之相关联的互斥锁时,才会出现此问题。只要仅在持有关联的互斥锁同时修改所测试的条件,即可调用 pthread_cond_signal() 和 pthread_cond_broadcast(),而无论这些函数是否持有关联的互斥锁。

    线程条件变量使用方法

    get_resources(int amount)    
    {    
        pthread_mutex_lock(&rsrc_lock);    
        while (resources < amount) 
       {    
            pthread_cond_wait(&rsrc_add, &rsrc_lock);
       }   
       resources -= amount;
       pthread_mutex_unlock(&rsrc_lock);

    }

    add_resources(int amount) 
    {

    pthread_mutex_lock(&rsrc_lock);
       resources += amount;
       pthread_cond_broadcast(&rsrc_add);
       pthread_mutex_unlock(&rsrc_lock);
    }

    eventfd

    int eventfd(unsigned int initval, int flags);

    eventfd 是Linux提供内核态的事件等待/通知机制,内核维护了一个8字节的整型数,该整型数由 initval 来初始化, flags 参数可以由以下值位或而来:

    • EFD_CLOEXEC:设置该描述符的 O_CLOEXEC 标志。
    • EFD_NONBLOCK:设置描述符为非阻塞模式。
    • EFD_SEMAPHORE:设置描述符为信号量工作模式,在此模式下, read 模式会使整型数减1并返回数值1。

    当内核维护的8字节整型数为0时, read 操作会阻塞,如果为fd设置为非阻塞模式,则返回 EAGAIN 错误。

    简单的唤醒队列

    下面我们实现一个简单的环形队列:

    #define default_size 1024
    
    typedef struct queue
    {
        int header;
        int tail;
        int size;
        int capcity;
        void **_buf;
    } queue_t;
    
    queue_t *queue_create(int size)
    {
        queue_t *q = malloc(sizeof (queue_t));
        if (q != NULL)
        {
            if (size > 0)
            {
                q->_buf = malloc(size);
                q->capcity = size;
            }
            else
            {
                q->_buf = malloc(default_size * sizeof (void *));
                q->capcity = default_size;
            }
            q->header = q->tail = q->size = 0;
        }
    
        return q;
    }
    
    int queue_is_full(queue_t *q)
    {
        return q->size == q->capcity;
    }
    
    int queue_is_empty(queue_t *q)
    {
        return q->size == 0;
    }
    
    void queue_push_tail(queue_t *q, void *data)
    {
        if (!queue_is_full(q))
        {
            q->_buf[q->tail] = data;
            q->tail = (q->tail + 1) % q->capcity;
            q->size++;
        }
    }
    
    void *queue_pop_head(queue_t *q)
    {
        void *data = NULL;
        if (!queue_is_empty(q))
        {
            data = q->_buf[(q->header)];
            q->header = (q->header + 1) % q->capcity;
            q->size--;
        }
        return data;
    }
    
    int *queue_free(queue_t *q)
    {
        free(q->_buf);
        free(q);
    }

    线程变量实现的异步队列

    typedef struct async_queue
    {
        pthread_mutex_t mutex;
        pthread_cond_t cond;
        int waiting_threads;
        queue_t *_queue;
    } async_queue_t;
    async_queue_t *async_queue_create(int size)
    {
        async_queue_t *q = malloc(sizeof (async_queue_t));
        q->_queue = queue_create(size);
        q->waiting_threads = 0;
        pthread_mutex_init(&(q->mutex), NULL);
        pthread_cond_init(&(q->cond), NULL);
    
        return q;
    }
    
    void async_queue_push_tail(async_queue_t *q, void *data)
    {
        if (!queue_is_full(q->_queue))
        {
            pthread_mutex_lock(&(q->mutex));
            queue_push_tail(q->_queue, data);
            if (q->waiting_threads > 0)
            {
                pthread_cond_signal(&(q->cond));
            }
            pthread_mutex_unlock(&(q->mutex));
        }
    
    }
    
    void *async_queue_pop_head(async_queue_t *q, struct timeval *tv)
    {
        void *retval = NULL;
        pthread_mutex_lock(&(q->mutex));
        if (queue_is_empty(q->_queue))
        {
            q->waiting_threads++;
            while (queue_is_empty(q->_queue))
            {
                pthread_cond_wait(&(q->cond), &(q->mutex));
            }
            q->waiting_threads--;
        }
        retval = queue_pop_head(q->_queue);
        pthread_mutex_unlock(&(q->mutex));
        return retval;
    }
    
    void async_queue_free(async_queue_t *q)
    {
        queue_free(q->_queue);
        pthread_cond_destroy(&(q->cond));
        pthread_mutex_destroy(&(q->mutex));
        free(q);
    }

    eventfd实现的异步队列

    typedef struct async_queue
    {
        int efd; //event fd
        fd_set rdfds; //for select
        queue_t *_queue;
    } async_queue_t;
    async_queue_t *async_queue_create(int size)
    {
        async_queue_t *q = malloc(sizeof (async_queue_t));
    
        q->efd = eventfd(0, EFD_SEMAPHORE|EFD_NONBLOCK);
        q->_queue = queue_create(size);
        FD_ZERO(&(q->rdfds));
        FD_SET(q->efd, &(q->rdfds));
    
        return q;
    }
    
    void async_queue_push_tail(async_queue_t *q, void *data)
    {
        unsigned long long i = 1;
        if (!queue_is_full(q->_queue))
        {
            queue_push_tail(q->_queue, data);
            write(q->efd, &i, sizeof (i));
        }
    }
    
    void *async_queue_pop_head(async_queue_t *q, struct timeval *tv)
    {
        unsigned long long i = 0;
        void *data = NULL;
        if (select(q->efd + 1, &(q->rdfds), NULL, NULL, tv) == 0)
        {
            return data;
        }
        else
        {
            read(q->efd, &i, sizeof (i));
            return queue_pop_head(q->_queue);
        }
    }
    
    void async_queue_free(async_queue_t *q)
    {
        queue_free(q->_queue);
        close(q->efd);
        free(q);
    }

    总结

    两种实现方法线程条件变量比较复杂,但是性能略高,而eventfd实现简单,但是性能略低。

  • 相关阅读:
    JVM调试常用命令——jmap、jstat(2)
    JVM调试常用命令——jps、(1)
    线程基础:多任务处理(18)——MESI协议以及带来的问题:volatile关键字
    线程基础:多任务处理(18)——MESI协议以及带来的问题:伪共享
    网络穿透与音视频技术(5)——NAT映射检测和常见网络穿越方法论(NAT检测实践2)
    网络穿透与音视频技术(4)——NAT映射检测和常见网络穿越方法论(NAT检测实践1)
    网络穿透与音视频技术(3)——NAT映射检测和常见网络穿越方法论(NAT检测)
    网络穿透与音视频技术(2)——NAT的概念及工作模式(下)
    成功解决JSP和Servlet的中文乱码问题
    bootstrap心得
  • 原文地址:https://www.cnblogs.com/yyx1-1/p/6339803.html
Copyright © 2011-2022 走看看