zoukankan      html  css  js  c++  java
  • Linux下线程同步的几种方法

    Linux下提供了多种方式来处理线程同步,最常用的是互斥锁、条件变量和信号量。

    一、互斥锁(mutex)
      锁机制是同一时刻只允许一个线程执行一个关键部分的代码。

     1. 初始化锁
      int pthread_mutex_init(pthread_mutex_t *mutex,const pthread_mutex_attr_t *mutexattr);
       其中参数 mutexattr 用于指定锁的属性(见下),如果为NULL则使用缺省属性。
       互斥锁的属性在创建锁的时候指定,在LinuxThreads实现中仅有一个锁类型属性,不同的锁类型在试图对一个已经被锁定的互斥锁加锁时表现不同。当前有四个值可供选择:
       (1)PTHREAD_MUTEX_TIMED_NP,这是缺省值,也就是普通锁。当一个线程加锁以后,其余请求锁的线程将形成一个等待队列,并在解锁后按优先级获得锁。这种锁策略保证了资源分配的公平性。
       (2)PTHREAD_MUTEX_RECURSIVE_NP,嵌套锁,允许同一个线程对同一个锁成功获得多次,并通过多次unlock解锁。如果是不同线程请求,则在加锁线程解锁时重新竞争。
       (3)PTHREAD_MUTEX_ERRORCHECK_NP,检错锁,如果同一个线程请求同一个锁,则返回EDEADLK,否则与PTHREAD_MUTEX_TIMED_NP类型动作相同。这样就保证当不允许多次加锁时不会出现最简单情况下的死锁。
       (4)PTHREAD_MUTEX_ADAPTIVE_NP,适应锁,动作最简单的锁类型,仅等待解锁后重新竞争。

     2. 阻塞加锁
      int pthread_mutex_lock(pthread_mutex *mutex);
     3. 非阻塞加锁
       int pthread_mutex_trylock( pthread_mutex_t *mutex);
       该函数语义与 pthread_mutex_lock() 类似,不同的是在锁已经被占据时返回 EBUSY 而不是挂起等待。
     4. 解锁(要求锁是lock状态,并且由加锁线程解锁)
      int pthread_mutex_unlock(pthread_mutex *mutex);
     5. 销毁锁(此时锁必需unlock状态,否则返回EBUSY)
      int pthread_mutex_destroy(pthread_mutex *mutex);
      示例代码:

    1. #include <stdio.h>
    2. #include <stdlib.h>
    3. #include <unistd.h>
    4. #include <pthread.h>
    5.  
    6. pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    7.  
    8. int gn;
    9.  
    10. void* thread(void *arg)
    11. {
    12. printf("thread's ID is %d ",pthread_self());
    13. pthread_mutex_lock(&mutex);
    14. gn = 12;
    15. printf("Now gn = %d ",gn);
    16. pthread_mutex_unlock(&mutex);
    17. return NULL;
    18. }
    19.  
    20. int main()
    21. {
    22. pthread_t id;
    23. printf("main thread's ID is %d ",pthread_self());
    24. gn = 3;
    25. printf("In main func, gn = %d ",gn);
    26. if (!pthread_create(&id, NULL, thread, NULL))
    27. {
    28. printf("Create thread success! ");
    29. }else
    30. {
    31. printf("Create thread failed! ");
    32. }
    33. pthread_join(id, NULL);
    34. pthread_mutex_destroy(&mutex);
    35.  
    36. return 0;
    37.  
    38. }

    二、条件变量(cond)

      条件变量是利用线程间共享全局变量进行同步的一种机制。条件变量上的基本操作有:触发条件(当条件变为 true 时);等待条件,挂起线程直到其他线程触发条件。
       
       1. 初始化条件变量
         int pthread_cond_init(pthread_cond_t *cond,pthread_condattr_t *cond_attr);
          尽管POSIX标准中为条件变量定义了属性,但在Linux中没有实现,因此cond_attr值通常为NULL,且被忽略。
       2. 有两个等待函数 
          (1)无条件等待

             int pthread_cond_wait(pthread_cond_t *cond,pthread_mutex_t *mutex);
          (2)计时等待
             int pthread_cond_timewait(pthread_cond_t *cond,pthread_mutex *mutex,const timespec *abstime);
              如果在给定时刻前条件没有满足,则返回ETIMEOUT,结束等待,其中abstime以与time()系统调用相同意义的绝对时间形式出现,0表示格林尼治时间1970年1月1日0时0分0秒。
     
          无论哪种等待方式,都必须和一个互斥锁配合,以防止多个线程同时请求(用 pthread_cond_wait() 或 pthread_cond_timedwait() 请求)竞争条件(Race Condition)。mutex互斥锁必须是普通锁(PTHREAD_MUTEX_TIMED_NP)或者适应锁(PTHREAD_MUTEX_ADAPTIVE_NP),且在调用pthread_cond_wait()前必须由本线程加锁(pthread_mutex_lock()),而在更新条件等待队列以前,mutex保持锁定状态,并在线程挂起进入等待前解锁。在条件满足从而离开pthread_cond_wait()之前,mutex将被重新加锁,以与进入pthread_cond_wait()前的加锁动作对应。

       3. 激发条件
         (1)激活一个等待该条件的线程(存在多个等待线程时按入队顺序激活其中一个)
      
             int pthread_cond_signal(pthread_cond_t *cond);
         (2)激活所有等待线程
          int pthread_cond_broadcast(pthread_cond_t *cond); 

       4. 销毁条件变量
         int pthread_cond_destroy(pthread_cond_t *cond);
          只有在没有线程在该条件变量上等待的时候才能销毁这个条件变量,否则返回EBUSY

    说明:
      1. pthread_cond_wait 自动解锁互斥量(如同执行了pthread_unlock_mutex),并等待条件变量触发。这时线程挂起,不占用CPU时间,直到条件变量被触发(变量为ture)。在调用 pthread_cond_wait之前,应用程序必须加锁互斥量。pthread_cond_wait函数返回前,自动重新对互斥量加锁(如同执行了pthread_lock_mutex)。

      2. 互斥量的解锁和在条件变量上挂起都是自动进行的。因此,在条件变量被触发前,如果所有的线程都要对互斥量加锁,这种机制可保证在线程加锁互斥量和进入等待条件变量期间,条件变量不被触发。条件变量要和互斥量相联结,以避免出现条件竞争——个线程预备等待一个条件变量,当它在真正进入等待之前,另一个线程恰好触发了该条件(条件满足信号有可能在测试条件和调用pthread_cond_wait函数(block)之间被发出,从而造成无限制的等待)。

      3. 条件变量函数不是异步信号安全的,不应当在信号处理程序中进行调用。特别要注意,如果在信号处理程序中调用 pthread_cond_signal 或 pthread_cond_boardcast 函数,可能导致调用线程死锁

    示例代码1:

     

    1. #include <stdio.h>
    2. #include <pthread.h>
    3. #include "stdlib.h"
    4. #include "unistd.h"
    5.  
    6. pthread_mutex_t mutex;
    7. pthread_cond_t cond;
    8.  
    9. void hander(void *arg)
    10. {
    11. free(arg);
    12. (void)pthread_mutex_unlock(&mutex);
    13. }
    14.  
    15. void *thread1(void *arg)
    16. {
    17. pthread_cleanup_push(hander, &mutex);
    18. while(1)
    19. {
    20. printf("thread1 is running ");
    21. pthread_mutex_lock(&mutex);
    22. pthread_cond_wait(&cond,&mutex);
    23. printf("thread1 applied the condition ");
    24. pthread_mutex_unlock(&mutex);
    25. sleep(4);
    26. }
    27. pthread_cleanup_pop(0);
    28. }
    29.  
    30. void *thread2(void *arg)
    31. {
    32. while(1)
    33. {
    34. printf("thread2 is running ");
    35. pthread_mutex_lock(&mutex);
    36. pthread_cond_wait(&cond,&mutex);
    37. printf("thread2 applied the condition ");
    38. pthread_mutex_unlock(&mutex);
    39. sleep(1);
    40. }
    41. }
    42.  
    43. int main()
    44. {
    45. pthread_t thid1,thid2;
    46. printf("condition variable study! ");
    47. pthread_mutex_init(&mutex,NULL);
    48. pthread_cond_init(&cond,NULL);
    49. pthread_create(&thid1,NULL,thread1,NULL);
    50. pthread_create(&thid2,NULL,thread2,NULL);
    51.  
    52. sleep(1);
    53.  
    54. do{
    55. pthread_cond_signal(&cond);
    56. }while(1);
    57.  
    58. sleep(20);
    59. pthread_exit(0);
    60.  
    61. return 0;
    62. }

    示例代码2:

     

    1. #include <pthread.h>
    2. #include <unistd.h>
    3. #include "stdio.h"
    4. #include "stdlib.h"
    5.  
    6. static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
    7. static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    8.  
    9. struct node
    10. {
    11. int n_number;
    12. struct node *n_next;
    13. }*head = NULL;
    14.  
    15. static void cleanup_handler(void *arg)
    16. {
    17. printf("Cleanup handler of second thread. ");
    18. free(arg);
    19. (void)pthread_mutex_unlock(&mtx);
    20. }
    21.  
    22. static void *thread_func(void *arg)
    23. {
    24. struct node *p = NULL;
    25. pthread_cleanup_push(cleanup_handler, p);
    26.  
    27. while (1)
    28. {
    29. // 这个mutex主要是用来保证pthread_cond_wait的并发性。
    30. pthread_mutex_lock(&mtx);
    31. while (head == NULL)
    32. {
    33. /* 这个while要特别说明一下,单个pthread_cond_wait功能很完善,为何
    34. * 这里要有一个while (head == NULL)呢?因为pthread_cond_wait里的线
    35. * 程可能会被意外唤醒,如果这个时候head != NULL,则不是我们想要的情况。
    36. * 这个时候,应该让线程继续进入pthread_cond_wait
    37. * pthread_cond_wait会先解除之前的pthread_mutex_lock锁定的mtx,
    38. * 然后阻塞在等待对列里休眠,直到再次被唤醒(大多数情况下是等待的条件成立
    39. * 而被唤醒,唤醒后,该进程会先锁定先pthread_mutex_lock(&mtx);,再读取资源
    40. * 用这个流程是比较清楚的。*/
    41.  
    42. pthread_cond_wait(&cond, &mtx);
    43. p = head;
    44. head = head->n_next;
    45. printf("Got %d from front of queue ", p->n_number);
    46. free(p);
    47. }
    48.  
    49. pthread_mutex_unlock(&mtx); // 临界区数据操作完毕,释放互斥锁。
    50. }
    51.  
    52. pthread_cleanup_pop(0);
    53.  
    54. return 0;
    55.  
    56. }
    57.  
    58. int main(void)
    59. {
    60. pthread_t tid;
    61. int i;
    62. struct node *p;
    63.  
    64. /* 子线程会一直等待资源,类似生产者和消费者,但是这里的消费者可以是多个消费者,
    65. * 而不仅仅支持普通的单个消费者,这个模型虽然简单,但是很强大。*/
    66.  
    67. pthread_create(&tid, NULL, thread_func, NULL);
    68.  
    69. sleep(1);
    70.  
    71. for (i = 0; i < 10; i++)
    72. {
    73. p = (struct node*)malloc(sizeof(struct node));
    74. p->n_number = i;
    75. pthread_mutex_lock(&mtx); // 需要操作head这个临界资源,先加锁。
    76.  
    77. p->n_next = head;
    78. head = p;
    79.  
    80. pthread_cond_signal(&cond);
    81.  
    82. pthread_mutex_unlock(&mtx); //解锁
    83.  
    84. sleep(1);
    85. }
    86.  
    87. printf("thread 1 wanna end the line.So cancel thread 2. ");
    88.  
    89. /* 关于pthread_cancel,有一点额外的说明,它是从外部终止子线程,子线程会在最近的取消点,
    90. * 退出线程,而在我们的代码里,最近的取消点肯定就是pthread_cond_wait()了。*/
    91.  
    92. pthread_cancel(tid);
    93.  
    94. pthread_join(tid, NULL);
    95.  
    96. printf("All done -- exiting ");
    97.  
    98. return 0;
    99. }

    可以看出,等待条件变量信号的用法约定一般是这样的:
    ...
    pthread_mutex_lock(&mutex);
    ...
    pthread_cond_wait (&cond, &mutex);
    ...
    pthread_mutex_unlock (&mutex);
    ...

    相信很多人都会有这个疑问:为什么pthread_cond_wait需要的互斥锁不在函数内部定义,而要使用户定义的呢?现在没有时间研究 pthread_cond_wait 的源代码,带着这个问题对条件变量的用法做如下猜测,希望明白真相看过源代码的朋友不吝指正。

    1. pthread_cond_wait 和 pthread_cond_timewait 函数为什么需要互斥锁?因为:条件变量是线程同步的一种方法,这两个函数又是等待信号的函数,函数内部一定有须要同步保护的数据。
    2. 使用用户定义的互斥锁而不在函数内部定义的原因是:无法确定会有多少用户使用条件变量,所以每个互斥锁都须要动态定义,而且管理大量互斥锁的开销太大,使用用户定义的即灵活又方便,符合UNIX哲学的编程风格(随便推荐阅读《UNIX编程哲学》这本好书!)。
    3. 好了,说完了1和2,我们来自由猜测一下 pthread_cond_wait 函数的内部结构吧:
      int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
       {
          if(没有条件信号)
          {
             (1)pthread_mutex_unlock (mutex); // 因为用户在函数外面已经加锁了(这是使用约定),但是在没有信号的情况下为了让其他线程也能等待cond,必须解锁。
             (2) 阻塞当前线程,等待条件信号(当然应该是类似于中断触发的方式等待,而不是软件轮询的方式等待)... 有信号就继续执行后面。
             (3) pthread_mutex_lock (mutex); // 因为用户在函数外面要解锁(这也是使用约定),所以要与1呼应加锁,保证用户感觉依然是自己加锁、自己解锁。
          }      
          ...
      }

    三、 信号量


     如同进程一样,线程也可以通过信号量来实现通信,虽然是轻量级的。
       线程使用的基本信号量函数有四个:

      #include <semaphore.h>

         1. 初始化信号量
          int sem_init (sem_t *sem , int pshared, unsigned int value);

          参数:
          sem - 指定要初始化的信号量;
          pshared - 信号量 sem 的共享选项,linux只支持0,表示它是当前进程的局部信号量;
          value - 信号量 sem 的初始值。

          2. 信号量值加1
          给参数sem指定的信号量值加1。
         int sem_post(sem_t *sem);

         3. 信号量值减1
          给参数sem指定的信号量值减1。
         int sem_wait(sem_t *sem);
          如果sem所指的信号量的数值为0,函数将会等待直到有其它线程使它不再是0为止。

         4. 销毁信号量
        销毁指定的信号量。
      int sem_destroy(sem_t *sem);

      示例代码:

     

    1. #include <stdlib.h>
    2. #include <stdio.h>
    3. #include <unistd.h>
    4. #include <pthread.h>
    5. #include <semaphore.h>
    6. #include <errno.h>
    7.  
    8. #define return_if_fail(p) if((p) == 0){printf ("[%s]:func error! ", __func__);return;}
    9.  
    10. typedef struct _PrivInfo
    11. {
    12. sem_t s1;
    13. sem_t s2;
    14. time_t end_time;
    15. }PrivInfo;
    16.  
    17. static void info_init (PrivInfo* prifo);
    18. static void info_destroy (PrivInfo* prifo);
    19. static void* pthread_func_1 (PrivInfo* prifo);
    20. static void* pthread_func_2 (PrivInfo* prifo);
    21.  
    22. int main (int argc, char** argv)
    23. {
    24. pthread_t pt_1 = 0;
    25. pthread_t pt_2 = 0;
    26. int ret = 0;
    27. PrivInfo* prifo = NULL;
    28. prifo = (PrivInfo* )malloc (sizeof (PrivInfo));
    29.  
    30. if (prifo == NULL)
    31. {
    32. printf ("[%s]: Failed to malloc priv. ");
    33. return -1;
    34. }
    35.  
    36. info_init (prifo);
    37. ret = pthread_create (&pt_1, NULL, (void*)pthread_func_1, prifo);
    38. if (ret != 0)
    39. {
    40. perror ("pthread_1_create:");
    41. }
    42.  
    43. ret = pthread_create (&pt_2, NULL, (void*)pthread_func_2, prifo);
    44. if (ret != 0)
    45. {
    46. perror ("pthread_2_create:");
    47. }
    48.  
    49. pthread_join (pt_1, NULL);
    50. pthread_join (pt_2, NULL);
    51. info_destroy (prifo);
    52. return 0;
    53. }
    54.  
    55. static void info_init (PrivInfo* prifo)
    56. {
    57. return_if_fail (prifo != NULL);
    58. prifo->end_time = time(NULL) + 10;
    59. sem_init (&prifo->s1, 0, 1);
    60. sem_init (&prifo->s2, 0, 0);
    61. return;
    62. }
    63.  
    64. static void info_destroy (PrivInfo* prifo)
    65. {
    66. return_if_fail (prifo != NULL);
    67. sem_destroy (&prifo->s1);
    68. sem_destroy (&prifo->s2);
    69. free (prifo);
    70. prifo = NULL;
    71. return;
    72. }
    73.  
    74. static void* pthread_func_1 (PrivInfo* prifo)
    75. {
    76. return_if_fail (prifo != NULL);
    77. while (time(NULL) < prifo->end_time)
    78. {
    79. sem_wait (&prifo->s2);
    80. printf ("pthread1: pthread1 get the lock. ");
    81. sem_post (&prifo->s1);
    82. printf ("pthread1: pthread1 unlock ");
    83. sleep (1);
    84. }
    85. return;
    86. }
    87.  
    88. static void* pthread_func_2 (PrivInfo* prifo)
    89. {
    90. return_if_fail (prifo != NULL);
    91. while (time (NULL) < prifo->end_time)
    92. {
    93. sem_wait (&prifo->s1);
    94. printf ("pthread2: pthread2 get the unlock. ");
    95. sem_post (&prifo->s2);
    96. printf ("pthread2: pthread2 unlock. ");
    97. sleep (1);
    98. }
    99. return;
    100. }

     

    四、异步信号 


    由于LinuxThreads是在核外使用核内轻量级进程实现的线程,所以基于内核的异步信号操作对于线程也是有效的。但同时,由于异步信号总是实际发往某个进程,所以无法实现POSIX标准所要求的"信号到达某个进程,然后再由该进程将信号分发到所有没有阻塞该信号的线程中"原语,而是只能影响到其中一个线程。

    POSIX异步信号同时也是一个标准C库提供的功能,主要包括信号集管理(sigemptyset()、sigfillset()、sigaddset()、sigdelset()、sigismember()等)、信号处理函数安装(sigaction())、信号阻塞控制(sigprocmask())、被阻塞信号查询(sigpending())、信号等待(sigsuspend())等,它们与发送信号的kill()等函数配合就能实现进程间异步信号功能。LinuxThreads围绕线程封装了sigaction()何raise(),本节集中讨论LinuxThreads中扩展的异步信号函数,包括pthread_sigmask()、pthread_kill()和sigwait()三个函数。毫无疑问,所有POSIX异步信号函数对于线程都是可用的。

    int pthread_sigmask(int how, const sigset_t *newmask, sigset_t *oldmask) 
    设置线程的信号屏蔽码,语义与sigprocmask()相同,但对不允许屏蔽的Cancel信号和不允许响应的Restart信号进行了保护。被屏蔽的信号保存在信号队列中,可由sigpending()函数取出。

    int pthread_kill(pthread_t thread, int signo) 
    向thread号线程发送signo信号。实现中在通过thread线程号定位到对应进程号以后使用kill()系统调用完成发送。 

    int sigwait(const sigset_t *set, int *sig) 
    挂起线程,等待set中指定的信号之一到达,并将到达的信号存入*sig中。POSIX标准建议在调用sigwait()等待信号以前,进程中所有线程都应屏蔽该信号,以保证仅有sigwait()的调用者获得该信号,因此,对于需要等待同步的异步信号,总是应该在创建任何线程以前调用pthread_sigmask()屏蔽该信号的处理。而且,调用sigwait()期间,原来附接在该信号上的信号处理函数不会被调用。

    如果在等待期间接收到Cancel信号,则立即退出等待,也就是说sigwait()被实现为取消点。 

    五、 其他同步方式 


    除了上述讨论的同步方式以外,其他很多进程间通信手段对于LinuxThreads也是可用的,比如基于文件系统的IPC(管道、Unix域Socket等)、消息队列(Sys.V或者Posix的)、System V的信号灯等。只有一点需要注意,LinuxThreads在核内是作为共享存储区、共享文件系统属性、共享信号处理、共享文件描述符的独立进程看待的。

     

    条件变量与互斥锁、信号量的区别

           1.互斥锁必须总是由给它上锁的线程解锁,信号量的挂出即不必由执行过它的等待操作的同一进程执行。一个线程可以等待某个给定信号灯,而另一个线程可以挂出该信号灯。

           2.互斥锁要么锁住,要么被解开(二值状态,类型二值信号量)。

           3.由于信号量有一个与之关联的状态(它的计数值),信号量挂出操作总是被记住。然而当向一个条件变量发送信号时,如果没有线程等待在该条件变量上,那么该信号将丢失。

           4.互斥锁是为了上锁而设计的,条件变量是为了等待而设计的,信号灯即可用于上锁,也可用于等待,因而可能导致更多的开销和更高的复杂性。

  • 相关阅读:
    Java JMX 监管
    Spring Boot REST(一)核心接口
    JSR 规范目录
    【平衡树】宠物收养所 HNOI 2004
    【树型DP】叶子的颜色 OUROJ 1698
    【匈牙利匹配】无题II HDU2236
    【贪心】Communication System POJ 1018
    【贪心】Moving Tables POJ 1083
    Calling Extraterrestrial Intelligence Again POJ 1411
    【贪心】Allowance POJ 3040
  • 原文地址:https://www.cnblogs.com/wanghuaijun/p/9532644.html
Copyright © 2011-2022 走看看