zoukankan      html  css  js  c++  java
  • linux下C语言实现多线程通信—环形缓冲区,可用于生产者(producer)/消费者(consumer)【转】

    转自:http://blog.chinaunix.net/uid-28458801-id-4262445.html

    操作系统:ubuntu10.04

    前言:
        在嵌入式开发中,只要是带操作系统的,在其上开发产品应用,基本都需要用到多线程。
        为了提高效率,尽可能的提高并发率。因此,线程之间的通信就是问题的核心。
        根据当前产品需要,使用 环形缓冲区 解决。

    一,环形缓冲区的实现
        1,cbuf.h

    点击(此处)折叠或打开

    1. #ifndef __CBUF_H__
    2. #define __CBUF_H__

    3. #ifdef __cplusplus
    4. extern "C" {
    5. #endif

    6. /* Define to prevent recursive inclusion
    7. -------------------------------------*/
    8. #include "types.h"
    9. #include "thread.h"


    10. typedef    struct _cbuf
    11. {
    12.     int32_t        size;            /* 当前缓冲区中存放的数据的个数 */
    13.     int32_t        next_in;        /* 缓冲区中下一个保存数据的位置 */
    14.     int32_t        next_out;        /* 从缓冲区中取出下一个数据的位置 */
    15.     int32_t        capacity;        /* 这个缓冲区的可保存的数据的总个数 */
    16.     mutex_t        mutex;            /* Lock the structure */
    17.     cond_t        not_full;        /* Full -> not full condition */
    18.     cond_t        not_empty;        /* Empty -> not empty condition */
    19.     void        *data[CBUF_MAX];/* 缓冲区中保存的数据指针 */
    20. }cbuf_t;


    21. /* 初始化环形缓冲区 */
    22. extern    int32_t        cbuf_init(cbuf_t *c);

    23. /* 销毁环形缓冲区 */
    24. extern    void        cbuf_destroy(cbuf_t    *c);

    25. /* 压入数据 */
    26. extern    int32_t        cbuf_enqueue(cbuf_t *c,void *data);

    27. /* 取出数据 */
    28. extern    void*        cbuf_dequeue(cbuf_t *c);


    29. /* 判断缓冲区是否为满 */
    30. extern    bool        cbuf_full(cbuf_t    *c);

    31. /* 判断缓冲区是否为空 */
    32. extern    bool        cbuf_empty(cbuf_t *c);

    33. /* 获取缓冲区可存放的元素的总个数 */
    34. extern    int32_t        cbuf_capacity(cbuf_t *c);


    35. #ifdef __cplusplus
    36. }
    37. #endif

    38. #endif
    39. /* END OF FILE
    40. ---------------------------------------------------------------*/


        2,cbuf.c

    点击(此处)折叠或打开

    1. #include "cbuf.h"



    2. /* 初始化环形缓冲区 */
    3. int32_t        cbuf_init(cbuf_t *c)
    4. {
    5.     int32_t    ret = OPER_OK;

    6.     if((ret = mutex_init(&c->mutex)) != OPER_OK)    
    7.     {
    8. #ifdef DEBUG_CBUF
    9.     debug("cbuf init fail ! mutex init fail ! ");
    10. #endif
    11.         return ret;
    12.     }

    13.     if((ret = cond_init(&c->not_full)) != OPER_OK)    
    14.     {
    15. #ifdef DEBUG_CBUF
    16.     debug("cbuf init fail ! cond not full init fail ! ");
    17. #endif
    18.         mutex_destroy(&c->mutex);
    19.         return ret;
    20.     }

    21.     if((ret = cond_init(&c->not_empty)) != OPER_OK)
    22.     {
    23. #ifdef DEBUG_CBUF
    24.     debug("cbuf init fail ! cond not empty init fail ! ");
    25. #endif
    26.         cond_destroy(&c->not_full);
    27.         mutex_destroy(&c->mutex);
    28.         return ret;
    29.     }

    30.     c->size     = 0;
    31.     c->next_in    = 0;
    32.     c->next_out = 0;
    33.     c->capacity    = CBUF_MAX;

    34. #ifdef DEBUG_CBUF
    35.     debug("cbuf init success ! ");
    36. #endif

    37.     return ret;
    38. }


    39. /* 销毁环形缓冲区 */
    40. void        cbuf_destroy(cbuf_t    *c)
    41. {
    42.     cond_destroy(&c->not_empty);
    43.     cond_destroy(&c->not_full);
    44.     mutex_destroy(&c->mutex);

    45. #ifdef DEBUG_CBUF
    46.     debug("cbuf destroy success ");
    47. #endif
    48. }



    49. /* 压入数据 */
    50. int32_t        cbuf_enqueue(cbuf_t *c,void *data)
    51. {
    52.     int32_t    ret = OPER_OK;

    53.     if((ret = mutex_lock(&c->mutex)) != OPER_OK)    return ret;

    54.     /*
    55.      * Wait while the buffer is full.
    56.      */
    57.     while(cbuf_full(c))
    58.     {
    59. #ifdef DEBUG_CBUF
    60.     debug("cbuf is full !!! ");
    61. #endif
    62.         cond_wait(&c->not_full,&c->mutex);
    63.     }

    64.     c->data[c->next_in++] = data;
    65.     c->size++;
    66.     c->next_in %= c->capacity;

    67.     mutex_unlock(&c->mutex);

    68.     /*
    69.      * Let a waiting consumer know there is data.
    70.      */
    71.     cond_signal(&c->not_empty);

    72. #ifdef DEBUG_CBUF
    73. //    debug("cbuf enqueue success ,data : %p ",data);
    74.     debug("enqueue ");
    75. #endif

    76.     return ret;
    77. }



    78. /* 取出数据 */
    79. void*        cbuf_dequeue(cbuf_t *c)
    80. {
    81.     void     *data     = NULL;
    82.     int32_t    ret     = OPER_OK;

    83.     if((ret = mutex_lock(&c->mutex)) != OPER_OK)    return NULL;

    84.        /*
    85.      * Wait while there is nothing in the buffer
    86.      */
    87.     while(cbuf_empty(c))
    88.     {
    89. #ifdef DEBUG_CBUF
    90.     debug("cbuf is empty!!! ");
    91. #endif
    92.         cond_wait(&c->not_empty,&c->mutex);
    93.     }

    94.     data = c->data[c->next_out++];
    95.     c->size--;
    96.     c->next_out %= c->capacity;

    97.     mutex_unlock(&c->mutex);


    98.     /*
    99.      * Let a waiting producer know there is room.
    100.      * 取出了一个元素,又有空间来保存接下来需要存储的元素
    101.      */
    102.     cond_signal(&c->not_full);

    103. #ifdef DEBUG_CBUF
    104. //    debug("cbuf dequeue success ,data : %p ",data);
    105.     debug("dequeue ");
    106. #endif

    107.     return data;
    108. }


    109. /* 判断缓冲区是否为满 */
    110. bool        cbuf_full(cbuf_t    *c)
    111. {
    112.     return (c->size == c->capacity);
    113. }

    114. /* 判断缓冲区是否为空 */
    115. bool        cbuf_empty(cbuf_t *c)
    116. {
    117.     return (c->size == 0);
    118. }

    119. /* 获取缓冲区可存放的元素的总个数 */
    120. int32_t        cbuf_capacity(cbuf_t *c)
    121. {
    122.     return c->capacity;
    123. }



    二,辅助文件
        为了提高程序的移植性,对线程相关进行封装。
        1,thread.h

    点击(此处)折叠或打开

    1. #ifndef __THREAD_H__
    2. #define __THREAD_H__

    3. #ifdef __cplusplus
    4. extern "C" {
    5. #endif

    6. /* Define to prevent recursive inclusion
    7. -------------------------------------*/
    8. #include "types.h"





    9. typedef    struct _mutex
    10. {
    11.     pthread_mutex_t        mutex;
    12. }mutex_t;


    13. typedef    struct _cond
    14. {
    15.     pthread_cond_t        cond;
    16. }cond_t;


    17. typedef    pthread_t        tid_t;
    18. typedef    pthread_attr_t    attr_t;
    19. typedef    void*    (* thread_fun_t)(void*);


    20. typedef    struct _thread
    21. {
    22.     tid_t            tid;
    23.     cond_t            *cv;
    24.     int32_t            state;
    25.     int32_t            stack_size;
    26.     attr_t         attr;
    27.     thread_fun_t    fun;
    28. }thread_t;



    29. /* mutex */
    30. extern    int32_t        mutex_init(mutex_t    *m);
    31. extern    int32_t        mutex_destroy(mutex_t    *m);
    32. extern    int32_t        mutex_lock(mutex_t    *m);
    33. extern    int32_t        mutex_unlock(mutex_t    *m);


    34. /* cond */
    35. extern    int32_t        cond_init(cond_t    *c);
    36. extern    int32_t        cond_destroy(cond_t    *c);
    37. extern    int32_t        cond_signal(cond_t *c);
    38. extern    int32_t        cond_wait(cond_t    *c,mutex_t *m);



    39. /* thread */
    40. /* 线程的创建,其属性的设置等都封装在里面 */
    41. extern    int32_t        thread_create(thread_t *t);
    42. //extern    int32_t        thread_init(thread_t    *t);

    43. #define    thread_join(t, p)     pthread_join(t, p)
    44. #define    thread_self()        pthread_self()
    45. #define    thread_sigmask        pthread_sigmask


    46. #ifdef __cplusplus
    47. }
    48. #endif

    49. #endif
    50. /* END OF FILE
    51. ---------------------------------------------------------------*/


        2,thread.c

    点击(此处)折叠或打开

    1. #include "thread.h"




    2. /* mutex */
    3. int32_t        mutex_init(mutex_t    *m)
    4. {
    5.     int32_t        ret = OPER_OK;

    6.     if((ret = pthread_mutex_init(&m->mutex, NULL)) != 0)
    7.         ret = -THREAD_MUTEX_INIT_ERROR;

    8.     return ret;
    9. }


    10. int32_t        mutex_destroy(mutex_t    *m)
    11. {
    12.     int32_t        ret = OPER_OK;

    13.     if((ret = pthread_mutex_destroy(&m->mutex)) != 0)
    14.         ret = -MUTEX_DESTROY_ERROR;

    15.     return ret;
    16. }



    17. int32_t        mutex_lock(mutex_t    *m)
    18. {
    19.     int32_t        ret = OPER_OK;

    20.     if((ret = pthread_mutex_lock(&m->mutex)) != 0)
    21.         ret = -THREAD_MUTEX_LOCK_ERROR;

    22.     return ret;
    23. }



    24. int32_t        mutex_unlock(mutex_t    *m)
    25. {
    26.     int32_t        ret = OPER_OK;

    27.     if((ret = pthread_mutex_unlock(&m->mutex)) != 0)
    28.         ret = -THREAD_MUTEX_UNLOCK_ERROR;
    29.     
    30.     return ret;
    31. }






    32. /* cond */
    33. int32_t        cond_init(cond_t    *c)
    34. {
    35.     int32_t        ret = OPER_OK;

    36.     if((ret = pthread_cond_init(&c->cond, NULL)) != 0)
    37.         ret = -THREAD_COND_INIT_ERROR;

    38.     return ret;
    39. }



    40. int32_t        cond_destroy(cond_t    *c)
    41. {
    42.     int32_t        ret = OPER_OK;

    43.     if((ret = pthread_cond_destroy(&c->cond)) != 0)
    44.         ret = -COND_DESTROY_ERROR;
    45.     
    46.     return ret;
    47. }



    48. int32_t        cond_signal(cond_t *c)
    49. {
    50.     int32_t        ret = OPER_OK;


    51.     if((ret = pthread_cond_signal(&c->cond)) != 0)
    52.         ret = -COND_SIGNAL_ERROR;
    53.     
    54.     return ret;
    55. }




    56. int32_t        cond_wait(cond_t    *c,mutex_t *m)
    57. {
    58.     int32_t        ret = OPER_OK;

    59.     if((ret = pthread_cond_wait(&c->cond, &m->mutex)) != 0)
    60.         ret = -COND_WAIT_ERROR;    
    61.     
    62.     return ret;
    63. }



    三,测试
        1,测试代码

    点击(此处)折叠或打开

    1. /*
    2.  * cbuf begin
    3.  */
    4. #define        OVER    (-1)

    5. static        cbuf_t    cmd;
    6. static        int        line_1[200];
    7. static        int        line_2[200];
    8. //static        int        temp = 0;

    9. static        bool    line1_finish = false;
    10. static        bool    line2_finish = false;

    11. void*    producer_1(void *data)
    12. {
    13.     int32_t    i = 0;

    14.     for(i = 0; i < 200; i++)
    15.     {
    16.         line_1[i] = i+1000;
    17.         cbuf_enqueue(&cmd, &line_1[i]);

    18.         if(0 == (i % 9)) sleep(1);
    19.     }

    20.     line1_finish = true;

    21.     return NULL;
    22. }

    23. void*    producer_2(void *data)
    24. {
    25.     int32_t    i = 0;

    26.     for(i = 0; i < 200; i++)
    27.     {
    28.         line_2[i] = i+20000;
    29.         cbuf_enqueue(&cmd, &line_2[i]);

    30.         if(0 == (i % 9)) sleep(1);
    31.     }

    32.     line2_finish = true;

    33.     return NULL;
    34. }


    35. void*    consumer(void *data)
    36. {
    37.     int32_t        *ptr = NULL;

    38.     while(1)
    39.     {
    40.         ptr = cbuf_dequeue(&cmd);
    41.         printf("%d ",*ptr);

    42.         if(cbuf_empty(&cmd) && line2_finish && line1_finish)
    43.         {
    44.             printf("quit ");
    45.             break;
    46.         }
    47.     }

    48.     return NULL;
    49. }


    50. void    test_cbuf_oper(void)
    51. {
    52.     pthread_t    l_1;
    53.     pthread_t    l_2;
    54.     pthread_t    c;
    55.     
    56.     cbuf_init(&cmd);

    57.     pthread_create(&l_1,NULL,producer_1,0);
    58.     pthread_create(&l_2,NULL,producer_2,0);
    59.     pthread_create(&c,NULL,consumer,0);

    60.     pthread_join(l_1,NULL);
    61.     pthread_join(l_2,NULL);
    62.     pthread_join(c,NULL);

    63.     cbuf_destroy(&cmd);
    64. }


    65. void    test_cbuf(void)
    66. {
    67.     test_cbuf_oper();
    68. }


    69. /*
    70.  * cbuf end
    71.  */


        2,测试结果



    四,参考文件
    1,《bareos-master》源码
    2,《nginx》源码

  • 相关阅读:
    2级搭建类203-Oracle 19c SI ASM 静默搭建(OEL7.7)
    2级搭建类EM-Oracle EMCC 13c Release 3 在 OEL 7.7 上的搭建
    1级搭建类112-Oracle 19c SI FS(CentOS 8)
    0级搭建类013-CentOS 8.x 安装
    List添加map,后添加的map覆盖前面的问题
    mysql插入数据报错1366
    oracle ora-12514解决办法
    easyUI 创建详情页dialog
    Server Tomcat v7.0 Server at localhost failed to start.
    maven项目启动报错;class path resource [com/ssm/mapping/] cannot be resolved to URL because it does not exist
  • 原文地址:https://www.cnblogs.com/sky-heaven/p/8267211.html
Copyright © 2011-2022 走看看