zoukankan      html  css  js  c++  java
  • 线程间通信

    线程间通信

      参考博客:https://blog.csdn.net/a987073381/article/details/52029070

      推荐博客:pthread_mutex_t

      

      线程间通信,一般指的是同一进程中的不同线程通信。不同进程中的线程通信其实属于进程间通信。

      线程间通信方式:

      1、全局变量:进程中线程间共享内存。同一进程中的不同线程共享同一全局内存区域,包括初始化数据段,未初始化数据段以及堆内存数据段。因此线程间可以方便、快速的共享信息。只需要将数据复制到共享(全局/堆)变量中即可。

      注:要避免多个线程试图同时修改同一份信息。

          定义全局变量时最好使用volatile定义,以防编译器对此变量优化。

     下图为多线程的进程地址空间:

      2、Message消息机制

      3、CEvent对象

    线程安全

      进程中的多个线程同时运行,当多个线程同时访问一段代码时,若每次运行结果和单线程运行结果一样,其他变量的值也和预期值一样,那么线程就是安全的,即多线程访问同一段代码不会产生不确定结果。如何保证线程安全——线程间同步。

    线程间同步

      同步指的是多个任务按照约定的先后次序互相配合完成一件事情。Dijikstra基于信号量的概念,提出一个同步机制——由信号量来决定线程是否继续运行还是阻塞。

      线程间同步的信号量采用POSIX的信号量接口。

      信号量代表某一类资源,其值代表系统中该资源的数量。信号量是一个受保护的变量,只能通过三种操作来访问:

      1)初始化

      2)P操作(申请资源)

      3)V操作(释放资源)

      pthread库常用信号量操作函数如下:

    1 #include <semaphore.h>
    2 
    3 sem_t sem_event;
    4 int sem_init(sem_t *sem, int pshared, unsigned int value);//初始化一个信号量 
    5 int sem_destroy(sem_t * sem); //销毁信号量
    6 int sem_post(sem_t * sem); //V操作:信号量增加1,
    7 int sem_wait(sem_t * sem); //P操作:信号量减少1,如果信号量计数是0就会发生阻塞。直到成功使信号量减1
    8 int sem_getvalue(sem_t * sem, int * sval); //获取当前信号量的值

      测试:写线程,读线程对buf实现PV操作。

     1 #include <pthread.h>
     2 #include <stdio.h>
     3 #include <stdlib.h>
     4 #include <string.h>
     5 #include <semaphore.h>
     6 
     7 char buffer[100];
     8 sem_t sem; //定义信号量
     9 
    10 void *write(void* arg)
    11 {
    12     //detach改变线程状态为unjoinable,
    13     //在线程结束后,会自动回收资源
    14     pthread_detach(pthread_self());
    15     while(1)
    16     {
    17         fgets(buffer,20,stdin); //通过键盘输入
    18         sem_post(&sem); //写线程,写后释放资源.
    19     }
    20 }
    21  //read和write中的两个while循环,程序运行后会一直在这两个线程的while切换进行写-读
    22 void *read(void* arg)
    23 {
    24     pthread_detach(pthread_self());
    25     while(1)
    26     {
    27         memset(buffer,0,100);
    28         sem_wait(&sem); //读线程等待写线程的资源,就算先调用read,但因value=0,wait函数阻塞,直到value加1,之后再wait之后减1
    29         printf("%s
    ",buffer); //等待写进程释放资源后,退出阻塞,执行至此打印buf内容
    30     }
    31 }
    32 
    33 int main()
    34 {
    35     int re,i=0;
    36     pthread_t tid1,tid2;
    37   //pshared 控制信号量的类型,值为 0 代表该信号量用于多线程间的同步,值如果大于 0 表示可以共享,用于多个相关进程间的同步
    38     sem_init(&sem,0,0); //参数:信号量对象,默认0
    39     
    40     re = pthread_create(&tid1,NULL,read,(void *)i);
    41     if(re!=0)
    42     {
    43         printf("pthread_create:%s
    ",strerror(re));
    44         exit(0);
    45     }
    46 
    47     re = pthread_create(&tid2,NULL,write,(void *)i);
    48     if(re!=0)
    49     {
    50         printf("pthread_create:%s
    ",strerror(re));
    51         exit(0);
    52     }
    53 
    54     while(1)   //一直阻塞在此,否则顺序执行下来,就直接退出了,对应线程也不会执行。可以使用pthread_join等待线程退出
    55         sleep(1);
    56 }

    结果:

     写线程通过键盘输入将内容写入buf,释放sem,探测sem有资源,读线程读buf,打印。

    多个线程实现同步,定义多个信号量

     1 #include<stdio.h>
     2 #include <pthread.h>
     3 #include <semaphore.h>
     4 
     5 
     6 #define N 64
     7 typedef struct message{
     8     char buf[N];
     9     int len;
    10 }msg_t;
    11 
    12 sem_t sem_reverse;  //定义用于逆置的信号量
    13 sem_t sem_printf;   //定义用于打印的信号量
    14 
    15 void* reverse_msgbuf(void* arg)
    16 {
    17     msg_t *msg = (msg_t *)arg;
    18     int i = 0;
    19     char tmp;
    20     while(1){
    21         sem_wait(&sem_reverse);
    22         printf("reverse_msgbuf -------------
    ");
    23             //printf("hello reverse_msgbuf.
    ");
    24 #if 1
    25             for(i = 0; i < msg->len/2; i ++){
    26                 tmp             = msg->buf[i];
    27                 msg->buf[i]  = msg->buf[msg->len - i - 1];
    28                 msg->buf[msg->len - i -1] = tmp;
    29             }
    30 #endif 
    31             sleep(1);
    32             printf("reverse_msgbuf :%s
    ",msg->buf);
    33         sem_post(&sem_printf);
    34     }
    35 }
    36 
    37 void* printf_msgbuf(void* arg)
    38 {
    39     msg_t *msg = (msg_t *)arg;
    40     while(1){
    41         sem_wait(&sem_printf);
    42             printf("printf_msgbuf :***********
    ");
    43             printf("printf_msgbuf :%s
    ",msg->buf);
    44         sem_post(&sem_reverse);
    45     }
    46 }
    47 
    48 int main(int argc, const char *argv[])
    49 {
    50     msg_t msg = {"123456789",9};
    51     pthread_t tid[2];
    52     
    53     //初始化信号量
    54     sem_init(&sem_reverse,0,1);  //value=1,表示有资源可用,即让reverse先执行
    55     sem_init(&sem_printf,0,0);   //实现先逆置在打印,先逆置在打印,以此循环
    56     
    57     pthread_create(&tid[0],NULL,reverse_msgbuf,(void *)&msg);
    58     pthread_create(&tid[1],NULL,printf_msgbuf,(void *)&msg);
    59 
    60     pause();
    61 
    62     return 0;
    63 }
    pthreadsem.c

    线程互斥:

      前面提到了同步,同步是一种直接制约关系,是指多个线程(或进程)为了合作完成任务,必须严格按照先后次序来运行。如上的读写操作,必须先进行写操作,写入内容,才可执行读操作,且写的过程不可以读。

      互斥,是一种间接制约关系,指系统中的一些共享资源(临界资源),当访问了解资源时,其他线程必须等待。

      互斥量本质上说是一把锁,在访问共享资源前对互斥量进行上锁,在访问完成后释放互斥量。对互斥量进行上锁以后,其他试图再次对互斥量加锁的线程都会被阻塞,直到当前线程释放该互斥锁。

     1 #include <pthread.h>
     2 
     3 /********互斥锁初始化********/
     4 int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr);
     5 
     6 //成功返回0,失败返回错误码
     7 //mutex指向要初始化的互斥锁对象
     8 //attr 互斥锁属性,NULL缺省
     9 
    10 eg.pthread_t mutex;
    11 pthread_mutex_init(&mutex, NULL);
    12 
    13 /*******互斥锁上锁**********/
    14 int pthread_mutex_lock(pthread_mutex_t *mutex);
    15 //成功返回0,失败返回错误码
    16 //mutex指向要初始化的互斥锁对象
    17 //如果无法获得锁,任务阻塞
    18 
    19 /********互斥锁解锁********/
    20 int pthread_mutex_unlock(pthread_mutex_t *mutex);
    21 //成功返回0,失败返回错误码
    22 //mutex指向要初始化的互斥锁对象
    23 //执行完临界区要及时释放锁

    测试程序:

    创建两个写线程write1、write2,分别向缓冲区写入字符串:I am pthread_write1与I am pthread_write2。并打印输出。

    分析:可以分别在线程写前后加互斥锁,解互斥锁。

     加互斥锁前:

     1 #include <pthread.h>
     2 #include <stdio.h>
     3 #include <stdlib.h>
     4 #include <string.h>
     5 #include <semaphore.h>
     6 
     7 FILE *fp; //定义文件流指针
     8 
     9 void *write1(void* arg)
    10 {
    11     char *c1 = "I am pthread_write1";
    12     char *c2;
    13     int len,i;
    14     len = strlen(c1);
    15     pthread_detach(pthread_self());
    16     c2 = c1;
    17     while(1)
    18     {
    19         for(i=0;i<len;i++)
    20         {
    21             fputc(*c1,fp); //每次写一个字符
    22             fflush(fp);
    23             c1++; //指针移动
    24             usleep(10000); //挂起10000微秒即10毫秒
    25         }
    26         c1 = c2; //恢复指针到字符串头部
    27         sleep(1);
    28     }
    29 }
    30 
    31 void *write2(void* arg)
    32 {
    33     int len,i;
    34     char *c2;
    35     char *c1 = "I am pthread_write2";
    36     len = strlen(c1);
    37     c2 = c1;
    38     pthread_detach(pthread_self());
    39     while(1)
    40     {
    41         for(i=0;i<len;i++)
    42         {
    43             fputc(*c1,fp);
    44             fflush(fp);
    45             c1++;
    46             usleep(10000);
    47         }
    48         c1 = c2;
    49         sleep(1);
    50     }
    51 }
    52 
    53 int main()
    54 {
    55     int re,i=0;
    56     pthread_t tid1,tid2;
    57 
    58     fp = fopen("1.txt","w");
    59     if(!fp)
    60     {
    61         perror("fopen");
    62         return -1;
    63     }
    64     re = pthread_create(&tid1,NULL,write1,(void *)i);
    65     if(re!=0)
    66     {
    67         printf("pthread_create:%s
    ",strerror(re));
    68         exit(0);
    69     }
    70 
    71     re = pthread_create(&tid2,NULL,write2,(void *)i);
    72     if(re!=0)
    73     {
    74         printf("pthread_create:%s
    ",strerror(re));
    75         exit(0);
    76     }
    77 
    78     while(1)
    79     {
    80         sleep(1);
    81     }
    82 }
    no_mutex

     写入文件中的内容乱序。

    加锁后:

     1 #include <pthread.h>
     2 #include <stdio.h>
     3 #include <stdlib.h>
     4 #include <string.h>
     5 #include <semaphore.h>
     6 
     7 FILE *fp; //定义文件流指针
     8 
     9 pthread_mutex_t mutex; //定义锁
    10 
    11 void *write1(void* arg)
    12 {
    13     char *c1 = "I am pthread_write1
    ";
    14     char *c2;
    15     int len,i;
    16     len = strlen(c1);
    17     pthread_detach(pthread_self());
    18     c2 = c1;
    19     while(1)
    20     {
    21         pthread_mutex_lock(&mutex); //上锁
    22         for(i=0;i<len;i++)
    23         {
    24             fputc(*c1,fp); //每次写一个字符
    25             fflush(fp);
    26             c1++; //指针移动
    27             usleep(10000); //挂起10000微秒即10毫秒
    28         }
    29         pthread_mutex_unlock(&mutex); //解锁
    30         c1 = c2; //恢复指针到字符串头部
    31         sleep(1);
    32     }
    33 }
    34 
    35 void *write2(void* arg)
    36 {
    37     int len,i;
    38     char *c2;
    39     char *c1 = "I am pthread_write2
    ";
    40     len = strlen(c1);
    41     c2 = c1;
    42     pthread_detach(pthread_self());
    43     while(1)
    44     {
    45         pthread_mutex_lock(&mutex);
    46         for(i=0;i<len;i++)
    47         {
    48             fputc(*c1,fp);
    49             fflush(fp);
    50             c1++;
    51             usleep(10000);
    52         }
    53         pthread_mutex_unlock(&mutex);
    54         c1 = c2;
    55         sleep(1);
    56     }
    57 }
    58 
    59 int main()
    60 {
    61     int re,i=0;
    62     pthread_t tid1,tid2;
    63 
    64     pthread_mutex_init(&mutex,NULL); //锁初始化
    65     fp = fopen("1.txt","w");
    66     if(!fp)
    67     {
    68         perror("fopen");
    69         return -1;
    70     }
    71     re = pthread_create(&tid1,NULL,write1,(void *)i);
    72     if(re!=0)
    73     {
    74         printf("pthread_create:%s
    ",strerror(re));
    75         exit(0);
    76     }
    77 
    78     re = pthread_create(&tid2,NULL,write2,(void *)i);
    79     if(re!=0)
    80     {
    81         printf("pthread_create:%s
    ",strerror(re));
    82         exit(0);
    83     }
    84 
    85     while(1)
    86     {
    87         sleep(1);
    88     }
    89 }
    add_mutex

     注:一个互斥量表示的是一个临界资源,

      如果多个线程会对同一个临界资源进行访问,又想让他们互斥执行,就为他们定义同一个互斥量。
      如果各自有各自的临界资源,希望自己在执行的时候,不被其他线程打断,就单独为自己定义一个互斥量。

     1、关于斥锁与条件变量的使用见博客:

    条件变量pthread_cond_t怎么用

    pthread_mutex_t

    2、pthread_join与 pthread_detach详见:

    https://baike.baidu.com/item/pthread_join

    linux中pthread_join()与pthread_detach()详解

    pshared 控制信号量的类型,值为 0 代表该信号量用于多线程间的同步,值如果大于 0 表示可以共享,用于多个相关进程间的同步

  • 相关阅读:
    大数据学习——hive数据类型
    大数据学习——关于hive中的各种join
    大数据学习——hive的sql练习
    大数据学习——hive显示命令
    大数据学习——hive数仓DML和DDL操作
    大数据学习——hive基本操作
    大数据学习——hive使用
    大数据学习——hive安装部署
    大数据学习——日志分析
    大数据学习——mapreduce运营商日志增强
  • 原文地址:https://www.cnblogs.com/y4247464/p/12103136.html
Copyright © 2011-2022 走看看