zoukankan      html  css  js  c++  java
  • linux线程基础篇----线程同步与互斥

    linux线程基础----线程同步与互斥

    一、同步的概念

      1.同步概念

        所谓同步,即同时起步,协调一致。不同的对象,对“同步”的理解方式略有不同。如,设备同步,是指在两个设备

        之间规定一个共同的时间参考;数据库同步,是指让两个或多个数据库内容保持一致,或者按需要部分保持一致;

        文件同步,是指让两个或多个文件夹里的文件保持一致等等。而编程中、通信中所说的同步与生活中大家印象中的

        同步概念略有差异。“同”字应是指协同、协助、互相配合。主旨在协同步调,按预定的先后次序运行。

      2.数据混乱的原因

       1. 资源共享(独享资源则不会)       

        2. 调度随机(意味着数据访问会出现竞争)  

        3. 线程间缺乏必要的同步机制。

             以上3点中,前两点不能改变,欲提高效率,传递数据,资源必须共享。只要共享资源,就一定会出现竞争。只要存在竞争关系,

        数据就很容易出现混乱。所以只能从第三点着手解决。使多个线程在访问共享资源的时候,出现互斥。

       3.线程同步

      同步即协同步调,按预定的先后次序运行。

            线程同步,指一个线程发出某一功能调用时,在没有得到结果之前,该调用不返回。同时其它线程为保证数据一致性,不能调用

       该功能。同步”的目的,是为了避免数据混乱,解决与时间有关的错误。实际上,不仅线程间需要同步,进程间、信号间等等都

       需要同步机制。因此,所有“多个控制流,共同操作一个共享资源”的情况,都需要同步。

    二、线程同步

        线程同步主要有互斥锁,条件变量,读写锁和信号量(还有自旋锁但在用户层不常用,具体参考APUE11.6.7自旋锁)

       1.互斥锁

      Linux中提供一把互斥锁mutex(也称之为互斥量)。

       每个线程在对资源操作前都尝试先加锁,成功加锁才能操作,操作结束解锁。

            资源还是共享的,线程间也还是竞争的,                                                                

           但通过“锁”就将资源的访问变成互斥操作,而后与时间有关的错误也不会再产生了

      

      但,应注意:同一时刻,只能有一个线程持有该锁。

           当A线程对某个全局变量加锁访问,B在访问前尝试加锁,拿不到锁,B阻塞。

      C线程不去加锁,而直接访问该全局变量,依然能够访问,但会出现数据混乱。

           所以,互斥锁实质上是操作系统提供的一把“建议锁”(又称“协同锁”)

      建议程序中有多线程访问共享资源的时候使用该机制。但并没有强制限定。

      因此,即使有了mutex,如果有线程不按规则来访问数据,依然会造成数据混乱。

      主要应用函数:

      pthread_mutex_init函数

          pthread_mutex_destroy函数

          pthread_mutex_lock函数

          pthread_mutex_trylock函数

           pthread_mutex_unlock函数

       以上5个函数的返回值都是:成功返回0, 失败返回错误号。   

      pthread_mutex_t 类型,其本质是一个结构体。为简化理解,应用时可忽略其实现细节,简单当成整数看待。

      pthread_mutex_t mutex; 变量mutex只有两种取值1、0。

      pthread_mutex_init函数

      初始化一个互斥锁(互斥量) ---> 初值可看作1

           int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);

           参1:传出参数,调用时应传 &mutex      

           restrict关键字:只用于限制指针,告诉编译器,所有修改该指针指向内存中内容的操作,只能通过本指针完成。

      不能通过除本指针以外的其他变量或指针修改

           参2:互斥量属性。是一个传入参数,通常传NULL,选用默认属性(线程间共享)。 参APUE.12.4同步属性

    1. 静态初始化:如果互斥锁 mutex 是静态分配的(定义在全局,或加了static关键字修饰),可以直接使用宏进行初始化。
    2. e.g.  pthead_mutex_t muetx = PTHREAD_MUTEX_INITIALIZER;
    3. 动态初始化:局部变量应采用动态初始化。e.g.  pthread_mutex_init(&mutex, NULL)

        pthread_mutex_destroy函数

      销毁一个互斥锁

           int pthread_mutex_destroy(pthread_mutex_t *mutex);

        pthread_mutex_lock函数

      加锁。可理解为将mutex--(或-1)

           int pthread_mutex_lock(pthread_mutex_t *mutex);

      pthread_mutex_unlock函数

      解锁。可理解为将mutex ++(或+1)

           int pthread_mutex_unlock(pthread_mutex_t *mutex);

      pthread_mutex_trylock函数

      尝试加锁

          int pthread_mutex_trylock(pthread_mutex_t *mutex);

      

        加锁与解锁

      lock与unlock:

            lock尝试加锁,如果加锁不成功,线程阻塞,阻塞到持有该互斥量的其他线程解锁为止。

            unlock主动解锁函数,同时将阻塞在该锁上的所有线程全部唤醒,至于哪个线程先被唤醒,取决于优先级、调度。默认:先阻塞、先唤醒。

            例如:T1 T2 T3 T4 使用一把mutex锁。T1加锁成功,其他线程均阻塞,直至T1解锁。T1解锁后,T2 T3 T4均被唤醒,并自动再次尝试加锁。

            可假想mutex锁 init成功初值为1。 lock 功能是将mutex--, unlock将mutex++

         lock与trylock:

            lock加锁失败会阻塞,等待锁释放。

            trylock加锁失败直接返回错误号(如:EBUSY),不阻塞。

      示例代码:生产者与消费者,头文件参考UNPV22E

    /* include main */
    #include    "unpipc.h"
    
    #define    MAXNITEMS         1000000
    #define    MAXNTHREADS            100
    
    int        nitems;            /* read-only by producer and consumer */
    struct {
      pthread_mutex_t    mutex;
      int    buff[MAXNITEMS];
      int    nput;
      int    nval;
    } shared = { PTHREAD_MUTEX_INITIALIZER };
    
    void    *produce(void *), *consume(void *);
    
    int
    main(int argc, char **argv)
    {
        int            i, nthreads, count[MAXNTHREADS];
        pthread_t    tid_produce[MAXNTHREADS], tid_consume;
    
        if (argc != 3)
            err_quit("usage: prodcons2 <#items> <#threads>");
        nitems = min(atoi(argv[1]), MAXNITEMS);
        nthreads = min(atoi(argv[2]), MAXNTHREADS);
    
        Set_concurrency(nthreads);
            /* 4start all the producer threads */
        for (i = 0; i < nthreads; i++) {
            count[i] = 0;
            Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
        }
    
            /* 4wait for all the producer threads */
        for (i = 0; i < nthreads; i++) {
            Pthread_join(tid_produce[i], NULL);
            printf("count[%d] = %d
    ", i, count[i]);    
        }
    
            /* 4start, then wait for the consumer thread */
        Pthread_create(&tid_consume, NULL, consume, NULL);
        Pthread_join(tid_consume, NULL);
    
        exit(0);
    }
    /* end main */
    
    /* include producer */
    void *
    produce(void *arg)
    {
        for ( ; ; ) {
            Pthread_mutex_lock(&shared.mutex);
            if (shared.nput >= nitems) {
                Pthread_mutex_unlock(&shared.mutex);
                return(NULL);        /* array is full, we're done */
            }
            shared.buff[shared.nput] = shared.nval;
            shared.nput++;
            shared.nval++;
            Pthread_mutex_unlock(&shared.mutex);
            *((int *) arg) += 1;
        }
    }
    
    void *
    consume(void *arg)
    {
        int        i;
    
        for (i = 0; i < nitems; i++) {
            if (shared.buff[i] != i)
                printf("buff[%d] = %d
    ", i, shared.buff[i]);
        }
        return(NULL);
    }
    /* end producer */
    mutex_prodcons2.c

      

      2.条件变量

       条件本身不是锁!但它也可以造成线程阻塞。通常与互斥锁配合使用。给多线程提供一个会合的场所。

       互斥锁用于上锁,条件变量用于等待。

      主要应用函数:

             pthread_cond_init函数

             pthread_cond_destroy函数

             pthread_cond_wait函数

             pthread_cond_timedwait函数

             pthread_cond_signal函数

             pthread_cond_broadcast函数

        以上6 个函数的返回值都是:成功返回0, 失败直接返回错误号。

             pthread_cond_t类型      用于定义条件变量

             pthread_cond_t cond;

       pthread_cond_init函数

       初始化一个条件变量,定义在全局,因为要在子线程中使用。

       int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);                

       参2:attr表条件变量属性,通常为默认值,传NULL即可

       也可以使用静态初始化的方法,初始化条件变量,定义在全局:

       pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

       pthread_cond_destroy函数

      销毁一个条件变量

      int pthread_cond_destroy(pthread_cond_t *cond);

      pthread_cond_wait函数

      阻塞等待一个条件变量

           int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);

      函数作用:

          1.阻塞等待条件变量cond(参1)满足 

          2.释放已掌握的互斥锁(解锁互斥量)相当于pthread_mutex_unlock(&mutex);

       1.2.两步为一个原子操作,不可分割。

          3.当被唤醒,pthread_cond_wait函数返回时,解除阻塞并重新申请获取互斥锁pthread_mutex_lock(&mutex);

      pthread_cond_timedwait函数

      限时等待一个条件变量,使用相对时间,所以要先使用time()函数获取当前时间。

      int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);

             参3:

                       struct timespec {

                                time_t tv_sec;          /* seconds */ 秒

                                long   tv_nsec;      /* nanosecondes*/ 纳秒

                       }                                                                        

      形参abstime:绝对时间。                                                                                     

      如:time(NULL)返回的就是绝对时间。而alarm(1)是相对时间,相对当前时间定时1秒钟。   

                                struct timespec t = {1, 0};

                                sem_timedwait(&sem, &t); 这样只能定时到 1970年1月1日  00:00:01秒(早已经过去)

     

          正确用法:

                                time_t cur = time(NULL); 获取当前时间。

            struct timespec t;    定义timespec 结构体变量t

                                t.tv_sec = cur+1; 定时1秒

            pthread_cond_timedwait (&cond, &t); 传参                                              参APUE.11.6线程同步

         在讲解setitimer函数时我们还提到另外一种时间类型:

                  struct timeval {

                      time_t      tv_sec;  /* seconds */ 秒

                      suseconds_t tv_usec; /* microseconds */ 微秒

                  };

      

      pthread_cond_signal函数

      唤醒至少一个阻塞在条件变量上的线程

      int pthread_cond_signal(pthread_cond_t *cond);

      pthread_cond_broadcast函数

      唤醒全部阻塞在条件变量上的线程

          int pthread_cond_broadcast(pthread_cond_t *cond);

      示例代码:生产者消费者模型

    /*借助条件变量模拟 生产者-消费者 问题*/
    #include <stdlib.h>
    #include <unistd.h>
    #include <pthread.h>
    #include <stdio.h>
    
    /*链表作为公享数据,需被互斥量保护*/
    struct msg {
        struct msg *next;
        int num;
    };
    struct msg *head;
    
    /* 静态初始化 一个条件变量 和 一个互斥量*/
    pthread_cond_t has_product = PTHREAD_COND_INITIALIZER;
    pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    
    void *consumer(void *p)
    {
        struct msg *mp;
    
        for (;;) {
            pthread_mutex_lock(&lock);
            while (head == NULL) {           //头指针为空,说明没有节点    可以为if吗
                pthread_cond_wait(&has_product, &lock);
            }
    
            mp = head;      
            head = mp->next;                //模拟消费掉一个产品
            pthread_mutex_unlock(&lock);
    
            printf("-Consume %lu---%d
    ", pthread_self(), mp->num);
            free(mp);
            sleep(rand() % 4);
        }
    }
    
    void *producer(void *p)
    {
        struct msg *mp;
    
        for (;;) {
            mp = malloc(sizeof(struct msg));
            mp->num = rand() % 1000 + 1;        //模拟生产一个产品
            printf("-Produce -------------%d
    ", mp->num);
    
            pthread_mutex_lock(&lock);
            mp->next = head;
            head = mp;
            pthread_mutex_unlock(&lock);
    
            pthread_cond_signal(&has_product);  //将等待在该条件变量上的一个线程唤醒
    
            sleep(rand() % 4);
        }
    }
    
    int main(int argc, char *argv[])
    {
        pthread_t pid, cid;
        srand(time(NULL));
    
        pthread_create(&pid, NULL, producer, NULL);
    
        pthread_create(&cid, NULL, consumer, NULL);
        pthread_create(&cid, NULL, consumer, NULL);
        pthread_create(&cid, NULL, consumer, NULL);
        pthread_create(&cid, NULL, consumer, NULL);
    
        pthread_join(pid, NULL);
        pthread_join(cid, NULL);
    
        return 0;
    }

      条件变量的优点:

           相较于mutex而言,条件变量可以减少竞争。如直接使用mutex,除了生产者、消费者之间要竞争互斥量以外,

           消费者之间也需要竞争互斥量,但如果汇聚(链表)中没有数据,消费者之间竞争互斥锁是无意义的。

      有了条件变量机制以后,只有生产者完成生产,才会引起消费者之间的竞争。提高了程序效率。

      3.读写锁

      与互斥量类似,但读写锁允许更高的并行性。其特性为:写独占,读共享。

      读写锁状态:

      一把读写锁具备三种状态:

             1. 读模式下加锁状态 (读锁)

             2. 写模式下加锁状态 (写锁)

             3. 不加锁状态

      读写锁特性: 

      1.读写锁是“写模式加锁”时, 解锁前,所有对该锁加锁的线程都会被阻塞。

      2.读写锁是“读模式加锁”时, 如果线程以读模式对其加锁会成功;如果线程以写模式加锁会阻塞。

      3.读写锁是“读模式加锁”时, 既有试图以写模式加锁的线程,也有试图以读模式加锁的线程。

      那么读写锁会阻塞随后的读模式锁请求。优先满足写模式锁。读锁、写锁并行阻塞,写锁优先级高

           读写锁也叫共享-独占锁。当读写锁以读模式锁住时,它是以共享模式锁住的;当它以写模式锁住时,它是以独占模式锁住的。写独占、读共享。

           读写锁非常适合于对数据结构读的次数远大于写的情况。

      主要应用函数:

           pthread_rwlock_init函数

           pthread_rwlock_destroy函数

           pthread_rwlock_rdlock函数 

           pthread_rwlock_wrlock函数

           pthread_rwlock_tryrdlock函数

           pthread_rwlock_trywrlock函数

           pthread_rwlock_unlock函数

      以上7 个函数的返回值都是:成功返回0, 失败直接返回错误号。  

          pthread_rwlock_t类型   用于定义一个读写锁变量。

          pthread_rwlock_t rwlock;

      pthread_rwlock_init函数

      初始化一把读写锁

           int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);

           参2:attr表读写锁属性,通常使用默认属性,传NULL即可。

      pthread_rwlock_destroy函数

      销毁一把读写锁

           int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

      pthread_rwlock_rdlock函数

      以读方式请求读写锁。(常简称为:请求读锁)

           int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);

      pthread_rwlock_wrlock函数

      以写方式请求读写锁。(常简称为:请求写锁)

         int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);

      pthread_rwlock_unlock函数

      解锁

           int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

      pthread_rwlock_tryrdlock函数

      非阻塞以读方式请求读写锁(非阻塞请求读锁)

      int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);

         pthread_rwlock_trywrlock函数

      非阻塞以写方式请求读写锁(非阻塞请求写锁)

           int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);

      示例代码:同时有多个线程对同一全局数据读、写操作。 

    #include <stdio.h>
    #include <unistd.h>
    #include <pthread.h>
    
    int counter;
    pthread_rwlock_t rwlock;
    
    /* 3个线程不定时写同一全局资源,5个线程不定时读同一全局资源 */
    void *th_write(void *arg)
    {
        int t;
        int i = (int)arg;
        while (1) {
            pthread_rwlock_wrlock(&rwlock);
            t = counter;   
            usleep(1000);
            printf("=======write %d: %lu: counter=%d ++counter=%d
    ", i, pthread_self(), t, ++counter);
            pthread_rwlock_unlock(&rwlock);
            usleep(10000);
        }
        return NULL;
    }
    void *th_read(void *arg)
    {
        int i = (int)arg;
    
        while (1) {
            pthread_rwlock_rdlock(&rwlock);
            printf("----------------------------read %d: %lu: %d
    ", i, pthread_self(), counter);
            pthread_rwlock_unlock(&rwlock);
            usleep(2000);
        }
        return NULL;
    }
    
    int main(void)
    {
        int i;
        pthread_t tid[8];
    
        pthread_rwlock_init(&rwlock, NULL);
    
        for (i = 0; i < 3; i++)
            pthread_create(&tid[i], NULL, th_write, (void *)i);
    
        for (i = 0; i < 5; i++)
            pthread_create(&tid[i+3], NULL, th_read, (void *)i);
    
        for (i = 0; i < 8; i++)
            pthread_join(tid[i], NULL);
    
        pthread_rwlock_destroy(&rwlock);
    
        return 0;
    }

      

      4.信号量

      信号量有posix有名信号量和无名信号量,还有system V信号量,在这里主要介绍posix无名信号量用于线程同步。

      进化版的互斥锁(1 --> N)

            由于互斥锁的粒度比较大,如果我们希望在多个线程间对某一对象的部分数据进行共享,使用互斥锁是没有办法实现的,只能将整个数据对象锁住。

       这样虽然达到了多线程操作共享数据时保证数据正确性的目的,却无形中导致线程的并发性下降。线程从并行执行,变成了串行执行。与直接使用单进程无异。

          信号量,是相对折中的一种处理方式,既能保证同步,数据不混乱,又能提高线程并发

      主要应用函数:

             sem_init函数

             sem_destroy函数

             sem_wait函数

             sem_trywait函数  

             sem_timedwait函数      

             sem_post函数

        以上6 个函数的返回值都是:成功返回0, 失败返回-1,同时设置errno。(注意,它们没有pthread前缀)

       可以使用perror函数打印出错信息。

            sem_t类型,本质仍是结构体。但应用期间可简单看作为整数,忽略实现细节(类似于使用文件描述符)。

            sem_t sem; 规定信号量sem不能 < 0。头文件 <semaphore.h>

      信号量基本操作:

      sem_wait:        1. 信号量大于0,则信号量--                (类比pthread_mutex_lock)

               |                   2. 信号量等于0,造成线程阻塞

             对应

               |

           sem_post:     将信号量++,同时唤醒阻塞在信号量上的线程         (类比pthread_mutex_unlock)

      但,由于sem_t的实现对用户隐藏,所以所谓的++、--操作只能通过函数来实现,而不能直接++、--符号。

      信号量的初值,决定了占用信号量的线程的个数。

      sem_init函数

      初始化一个信号量

           int sem_init(sem_t *sem, int pshared, unsigned int value);

           参1:sem信号量 

        参2:pshared取0用于线程间;取非0用于进程间        

      参3:value指定信号量初值

      sem_destroy函数

      销毁一个信号量

            int sem_destroy(sem_t *sem);

       sem_wait函数

      给信号量加锁 --

           int sem_wait(sem_t *sem);

      sem_post函数

      给信号量解锁 ++

           int sem_post(sem_t *sem); 

      sem_trywait函数

      尝试对信号量加锁 --    (与sem_wait的区别类比lock和trylock)

           int sem_trywait(sem_t *sem);     

      sem_timedwait函数

      限时尝试对信号量加锁 --

           int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);

           参2:abs_timeout采用的是绝对时间。                      

          定时1秒:

                       time_t cur = time(NULL); 获取当前时间。

           struct timespec t;    定义timespec 结构体变量t

                       t.tv_sec = cur+1; 定时1秒

          sem_timedwait(&sem, &t); 传参

      示例代码:生成者消费者模型,一个生产者多个消费者  

    /*信号量实现 生产者 消费者问题*/
    #include <stdlib.h>
    #include <unistd.h>
    #include <pthread.h>
    #include <stdio.h>
    #include <semaphore.h>
    
    #define NUM 5       
    
    int idex = 0;    
    pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;    //解决多个消费者之间的竞争   
    int queue[NUM];                                     //全局数组实现环形队列
    sem_t blank_number, product_number;                 //空格子信号量, 产品信号量
    void *producer(void *arg)
    {
        int i = 0;
        while (1) {
            sem_wait(&blank_number);                    //生产者将空格子数--,为0则阻塞等待
            queue[i] = rand() % 1000 + 1;               //生产一个产品
            printf("----Produce---%d
    ", queue[i]);        
            sem_post(&product_number);                  //将产品数++
    
            i = (i+1) % NUM;                            //借助下标实现环形
            sleep(rand()%1);
        }
    }
    
    void *consumer(void *arg)
    {
        while (1) {
            sem_wait(&product_number);                  //消费者将产品数--,为0则阻塞等待
            printf("-Consume---%d      %lu
    ", queue[idex], pthread_self());
            queue[idex] = 0;                               //消费一个产品 
            sem_post(&blank_number);                    //消费掉以后,将空格子数++
    
            pthread_mutex_lock(&lock);
            idex = (idex+1) % NUM;
            pthread_mutex_unlock(&lock);
            sleep(rand()%1);
        }
    }
    
    int main(int argc, char *argv[])
    {
        pthread_t pid, cid;
    
        sem_init(&blank_number, 0, NUM);                //初始化空格子信号量为5
        sem_init(&product_number, 0, 0);                //产品数为0
    
        pthread_create(&pid, NULL, producer, NULL);
        
        pthread_create(&cid, NULL, consumer, NULL);
        pthread_create(&cid, NULL, consumer, NULL);
        pthread_create(&cid, NULL, consumer, NULL);
        pthread_create(&cid, NULL, consumer, NULL);
        pthread_create(&cid, NULL, consumer, NULL);
        
        pthread_join(pid, NULL);
        pthread_join(cid, NULL);
    
        sem_destroy(&blank_number);
        sem_destroy(&product_number);
    
        return 0;
    }

      

  • 相关阅读:
    mysql索引
    springboot mybatis 后台框架平台 shiro 权限 集成代码生成器
    java 企业网站源码模版 有前后台 springmvc SSM 生成静态化
    java springMVC SSM 操作日志 4级别联动 文件管理 头像编辑 shiro redis
    activiti工作流的web流程设计器整合视频教程 SSM和独立部署
    .Net Core中的ObjectPool
    文件操作、流相关类梳理
    .Net Core中的配置文件源码解析
    .Net Core中依赖注入服务使用总结
    消息中间件RabbitMQ(一)
  • 原文地址:https://www.cnblogs.com/FREMONT/p/9482524.html
Copyright © 2011-2022 走看看