zoukankan      html  css  js  c++  java
  • 线程同步

    线程同步

    多个线程共享相同的内存时, 需要确保每个线程看到的数据视图一致.   如果每个线程的数据在其他线程不会用到,  或者变量是只读的, 这样没有一致性问题. 但是如果多个线程需要共享变量时, 就需要进行线程同步了.

    举个例子,  有线程A读取变量并加1, 可分为以下三步:

    1.从内存读入变量到寄存器

    2.寄存器中进行变量值增加

    3.把新值写回内存单元.

    这时如果有线程B在 2 和 3 期间读取变量值, 就可能会得到不一致的值. 下面介绍几种常用同步机制.

    互斥量

    互斥量确保同一时间只有一个线程访问数据, 从本质上来说是一把锁.  在访问数据前要加锁, 访问数据后释放锁.  

    如果有多个线程同时试图锁住数据, 则只有一个线程能获取锁, 变成运行状态( 谁能第一个获取锁与具体实现有关).  其它线程将被阻塞直到锁释放. 在这种方式下, 只有一个线程可以向前执行.

    互斥变量使用 pthread_mutex_t 数据类型表示. 在使用前需要用 pthread_mutex_init( ) 初始化, 释放内存前需要调用 pthread_mutex_destroy( );

    如果线程不希望被阻塞, 它可以使用 pthread_mutex_trylock ( ) 尝试对互斥量加锁,  如果互斥量可用, 则成功锁住; 否则锁住失败, 返回EBUSY, 线程不阻塞.

    #include <pthread.h>
    int pthread_mutex_lock(pthread_mutex_t *mutex);
    int pthread_mutex_trylock(pthread_mutex_t *mutex);
    int pthread_mutex_timedlock(pthread_mutex_t *restrict mutex,const struct timespec *restrict tsptr);
    int pthread_mutex_unlock(pthread_mutex_t *mutex);
                                                All return: 0 if OK, error number on failure

    下面是是APUE上面一个简单的程序.   由于pthread不是系统默认库, 所以在编译是注意加入-lpthread.

     gcc mutex.c -o mutex -lpthread

    #include "apue.h"
    #include <pthread.h>
    
    /*
     *
     *    加入引用计数, 确保在所有使用该对象的线程完成数据访问之前, 该对象内存空间不会被释放.
     *    在加1, 减1 以及判断引用计数是否为0之前必须锁住互斥量.
     *
     *
     * */
    
    struct foo{
        int    f_count;
        pthread_mutex_t f_lock;    //互斥变量
    };
    
    
    //调用malloc() 动态分配互斥变量
    struct foo *foo_alloc(void){
        
        struct foo *fp;
        
        if((fp = malloc(sizeof(struct foo))) != NULL){
            fp->f_count = 1;
            //初始化互斥变量
            if(pthread_mutex_init(&fp->f_lock, NULL) != 0){
                free(fp);
                return (NULL);
            }
        }
        return (fp);
    }
    
    //加1操作
    void foo_hold(struct foo *fp){
        pthread_mutex_lock(&fp->f_lock);
        fp->f_count++;    
        printf("f_count = %d
    ", fp->f_count);
        pthread_mutex_unlock(&fp->f_lock);
    }
    
    //减1操作,  如果最后一个引用被释放, 释放对象内存空间.
    void foo_rele(struct foo *fp){
        pthread_mutex_lock(&fp->f_lock);
        fp->f_count--;
        printf("f_count = %d
    ", fp->f_count);
        if(0 == fp->f_count){    
            printf("free fp
    ");
            pthread_mutex_unlock(&fp->f_lock);
            pthread_mutex_destroy(&fp->f_lock);
            free(fp);
        }else{    
            pthread_mutex_unlock(&fp->f_lock);
        }
    }
    
    
    //线程 1 执行函数
    void *thr_fn1(void *arg){
    
        struct foo *fp = (struct foo*)arg;
        printf("thread 1 starting
    ");
        foo_rele(fp);
        printf("thread 1 exit
    ");
        pthread_exit((void *)1);
    }
    
    
    //线程 2 执行函数
    void *thr_fn2(void *arg){
    
        struct foo *fp = (struct foo *)arg;
        printf("thread 2 starting
    ");
        foo_hold(fp);
        foo_hold(fp);
        printf("thread 2 exit
    ");
        pthread_exit((void *)2);
    }
    
    int main(void){
    
        
        pthread_t tid1, tid2;
        int err;
        void *pret;
        struct foo *fp;
        
        fp = foo_alloc();
    
        //创建线程
        if((err = pthread_create(&tid1, NULL, thr_fn1, (void *)fp)) != 0){
            err_quit("create thread 1 error
    ", strerror(err));
        }
        
        if((err = pthread_create(&tid2, NULL, thr_fn2, (void *)fp)) != 0){
            err_quit("create thread 2 error
    ", strerror(err));
        }
        //获取线程退出状态
        pthread_join(tid1, &pret);
        printf("thread 1 exit code is : %ld
    ", (long)pret);
        pthread_join(tid2, &pret);
        printf("thread 2 exit code is : %ld
    ", (long)pret);
        exit(0);
    }
    View Code

    程序运行结果如下,  这里有个问题,  那就是线程1在线程2运行之前就已经结束.  既然 fp 指针已经被释放掉了.  为何仍然在线程2中得以引用呢 ?    答案是野指针 !   之前 fp 指向的内存没有被占用, 但是也可能被占用, 引起内存泄露. 

    thread 1 starting
    thread 2 starting
    f_count = 0
    free fp
    thread 1 exit
    f_count = 1
    f_count = 2
    thread 2 exit
    thread 1 exit code is : 1
    thread 2 exit code is : 2

    避免死锁

    产生死锁的几种情况

    1.线程对同一互斥量多次加锁, 类似下面这种

    pthread_mutex_lock(mutex);
    pthread_mutex_lock(mutex);
    pthread_mutex_unlock(mutex);
    pthread_mutex_unlock(mutex);

    第二行阻塞, 发生死锁.  这种情况比较容易避免,  取消多次加锁即可.

    2.多个互斥量,  且不同线程各锁住一个互斥量, 并都在请求另一个互斥量阻塞.

    //thread A
    pthread_mutex_lock(mutex1);
    pthread_mutex_lock(mutex2);
    pthread_mutex_unlock(mutex2);
    pthread_mutex_unlock(mutex1);
    //thread B
    pthread_mutex_lock(mutex2);
    pthread_mutex_lock(mutex1);
    pthread_mutex_unlock(mutex1);
    pthread_mutex_unlock(mutex2);

    互相请求对方所占资源, 导致死锁.  避免办法是各个进程对互斥量加锁的顺序要一致.  可以是使用一个hash表锁实现.  

    下面程序来自APUE, 只是一种解决办法, 没有具体测试代码.   这里有两个参考文章.

    http://www.cnblogs.com/xbf9xbf/p/4764747.html

    https://segmentfault.com/q/1010000007227951

    #include "apue.h"
    #include <pthread.h>
    
    #define NHASH 29                                    //哈希大值
    #define HASH(fp) (((unsigned long)fp) % NHASH)        //简单hash取模
    struct foo *fh[NHASH];                                //哈希表, 链表解决散列冲突
    
    //初始化  只对静态变量分配的互斥量
    pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;
    
    struct foo{
        int    f_count;
        pthread_mutex_t f_lock;    //互斥变量
        struct foo *f_next;
        int f_id;
    };
    
    
    //调用malloc() 动态分配互斥变量
    struct foo *foo_alloc(void){
        
        int idx;
        struct foo *fp;
        
        if((fp = malloc(sizeof(struct foo))) != NULL){
            fp->f_count = 1;
            //初始化互斥变量
            if(pthread_mutex_init(&fp->f_lock, NULL) != 0){
                free(fp);
                return (NULL);
            }
    
            idx = HASH(fp);
            // 先对散列表加锁
            pthread_mutex_lock(&hashlock);
            //头插
            fp->f_next = fh[idx];
            fh[idx] =fp->f_next;
            // 这里要加锁,  对于其它线程是全局可见的, 所以要先阻塞其它请求新结构的线程
            pthread_lock(&fp->f_lock);
            pthread_unlock(&hashlock);
            
            /* 其它初始化操作 */
    
            pthread_mutex_unlock(&fp->f_lock);
        }
        return (fp);
    }
    
    //加1操作
    void foo_hold(struct foo *fp){
        pthread_mutex_lock(&fp->f_lock);
        fp->f_count++;    
        pthread_mutex_unlock(&fp->f_lock);
    }
    
    
    /* 第二版这个函数有错误, 这里是第三版的 */
    struct foo *foo_find(int id){
    
        struct foo  *fp;
        /* 锁住hash表 */
        pthread_mutex_lock(&hashlock);
        for(fp = fh[HASH(id)]; fp != NULL, fp = fp->f_next){
            //命中 增加其引用计数
            if(fp->f_id == id){
                foo_hold(fp);
                break;
            }
        }
    
        pthread_mutex_unlock(&hashlock);
        return (fp);
    }
    
    //减1操作.
    void foo_rele(struct foo *fp){
    
        struct foo *tfp;
        int idx;
        //锁住此节点
        pthread_mutex_lock(&fp->f_lock);
        
        //是最后一个结构
        if(1 == fp->f_count){    
            
            // 这里需要先解锁然后重新获取
            // 为什么要重新获取呢 ?    因为释放节点必须先对其解锁, 才能进行操作!
            // 为什么是先获取hashlock 再获取 fp节点的锁 ?  考虑一种情况
            // foo_rele获取fp的锁, 请求hashlock锁,  而foo_find获取了此hashlock的锁, 请求此fp, 造成死锁
            pthread_mutex_unlock(&fp->f_lock);
            pthread_mutex_unlock(&hashlock);
            pthread_mutex_lock(&fp->f_lock);
            
    
            //重新判断, 防止前面调整锁的顺序, 而其它线程又对此引用计数加1.
            if(1 != fp_f_count){
                fp->f_count--;
                pthread_mutex_unlock(&fp->f_lock);
                pthread_mutex_unlock(&hashlock);
                return;
            }
    
            idx = HASH(fp);
            tfp = fh[idx];
    
            //链表查找
            if(tfp == tp){
                fh[idx] = fp->f_next;
            }else{
                while(tfp->f_index != fp) tfp = tfp->f_next;
                tfp->f_next = fp->f_next;
            }
            pthread_mutex_unlock(&fp->f_lock);
            pthread_mutex_unlock(&hashlock);
            pthread_mutex_destroy(&fp->f_lock);
            free(fp);
        }else{    
            //不是最后一个结构
            fp->f_count--;
            pthread_mutex_unlock(&fp->f_lock);
        }
    }
    View Code

    这里可以对锁进行简化, 用hash锁赖保护结构引用计数,  结构互斥量保护结构体内的其它任何数据. 这种锁的粒度较粗, 但是围绕hash锁和结构互斥量锁的排序问题就没有了

    对上面程序稍作修改

    #include "apue.h"
    #include <pthread.h>
    
    #define NHASH 29                                    //哈希大值
    #define HASH(fp) (((unsigned long)fp) % NHASH)        //简单hash取模
    struct foo *fh[NHASH];                                //哈希表, 链表解决散列冲突
    
    //初始化  只对静态变量分配的互斥量
    pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;
    
    struct foo{
        int    f_count;
        pthread_mutex_t f_lock;    //互斥变量
        struct foo *f_next;
        int f_id;
    };
    
    
    //调用malloc() 动态分配互斥变量
    struct foo *foo_alloc(void){
        
        int idx;
        struct foo *fp;
        
        if((fp = malloc(sizeof(struct foo))) != NULL){
            fp->f_count = 1;
            //初始化互斥变量
            if(pthread_mutex_init(&fp->f_lock, NULL) != 0){
                free(fp);
                return (NULL);
            }
    
            idx = HASH(fp);
            // 先对散列表加锁
            pthread_mutex_lock(&hashlock);
            //头插
            fp->f_next = fh[idx];
            fh[idx] =fp->f_next;
            // 这里要加锁,  对于其它线程是全局可见的, 所以要先阻塞其它请求新结构的线程
            pthread_lock(&fp->f_lock);
            pthread_unlock(&hashlock);
            
            /* 其它初始化操作 */
    
            pthread_mutex_unlock(&fp->f_lock);    //第二版中此处应该有错误, 少了这句话.
        }
        return (fp);
    }
    
    //加1操作
    void foo_hold(struct foo *fp){
        pthread_mutex_lock(&hashlock);
        fp->f_count++;    
        pthread_mutex_unlock(&hashlock);
    }
    
    
    /* 第二版这个函数有错误, 这里是第三版的 */
    struct foo *foo_find(int id){
    
        struct foo  *fp;
        /*hash锁保护对象引用计数 */
        pthread_mutex_lock(&hashlock);
        for(fp = fh[HASH(id)]; fp != NULL, fp = fp->f_next){
            //命中 增加其引用计数
            if(fp->f_id == id){
                fp->f_count++;
                break;
            }
        }
    
        pthread_mutex_unlock(&hashlock);
        return (fp);
    }
    
    //减1操作.
    void foo_rele(struct foo *fp){
    
        struct foo *tfp;
        int idx;
        //锁住hash表
        pthread_mutex_lock(&hashlock);
        
        //是最后一个结构
        if(--fp->f_count == 0){    
    
            idx = HASH(fp);
            tfp = fh[idx];
    
            //链表查找
            if(tfp == tp){
                fh[idx] = fp->f_next;
            }else{
                while(tfp->f_index != fp) tfp = tfp->f_next;
                tfp->f_next = fp->f_next;
            }
            pthread_mutex_unlock(&hashlock);
            pthread_mutex_destroy(&fp->f_lock);
            free(fp);
        }else{    
            //不是最后一个结构
            pthread_mutex_unlock(&fp->f_lock);
        }
    }
    View Code

    读写锁

    读写锁可以使得读操作比互斥量具有更高的并行性, 它有三种状态, 读加锁, 写加锁, 不加锁.  非常适合读操作远多余写操作的数据结构.  可以在读数据前加锁, 读完后释放, 写操作同样如此.

    关键点

    1.  读写锁在读加锁状态时, 其它任何读操作都可以加锁.
    2.  读写锁在写加锁状态时, 其它任何操作都被阻塞(不管是加读锁还是写锁).
    3.  读写锁只有在无所状态下才能加写锁.

    读写锁实现方式各不相同, 但是当读写锁处于读模式锁定状态时, 如果有线程试图加写锁操作, 那么其后的读锁请求将会被阻塞,  避免等待的写锁一直得不到满足.

    其中一些API如下

    初始与销毁

    #include <pthread.h>
    int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t*restrict attr);
    int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
                                                   Both return: 0 if OK, error number on failure

    加锁与解锁

    #include <pthread.h>
    int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
    int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
    int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
                                             All return: 0 if OK, error number on failure
    #include <pthread.h>
    int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
    int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
                                            Both return: 0 if OK, error number on failure
    #include <pthread.h>
    #include <time.h>
    int pthread_rwlock_timedrdlock(pthread_rwlock_t *restrictrwlock,const struct timespec*restrict tsptr);
    int pthread_rwlock_timedwrlock(pthread_rwlock_t *restrictrwlock,const struct timespec *restric ttsptr);

    下面程序说明了读写锁的使用, 四个线程, 两个读线程, 两个写线程.

    #include "apue.h"
    #include <pthread.h>
    
    /* 初始化读写锁 */
    pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
    
    /* 全局资源 */
    int global_num = 10;
    
    /* 读锁线程 */
    void *thread_read_lock(void *arg){
        
        char *pthread_name = (char *)arg;
        while(1){    
            /* 读加锁 */
            pthread_rwlock_rdlock(&rwlock);
            printf("读线程 %s 进入临界区, global_num = %d
    ", pthread_name, global_num);
            sleep(1);
            printf("读线程 %s 退出临界区
    ", pthread_name);
        
            pthread_rwlock_unlock(&rwlock);
            sleep(1);
        }
        return NULL;
    }
    
    /* 写锁线程 */
    void *thread_write_lock(void *arg){
            
        char *pthread_name = (char *)arg;
    
        while(1){    
            /* 写加锁 */
            pthread_rwlock_wrlock(&rwlock);
    
            global_num++;
            printf("写线程 %s 进入临界区, global_num = %d
    ",pthread_name, global_num);
            sleep(1);
            printf("写线程 %s 退出临界区
    ", pthread_name);
            
            pthread_rwlock_unlock(&rwlock);
    
            /* 这里多一秒 是因为两个写锁一起请求会导致后面的读锁饥饿. */
            sleep(2);
        }
    }
    
    
    int main(void){
    
        pthread_t tid_r1, tid_r2, tid_w1, tid_w2;
    
        int err;
        //创建四个线程, 两个读, 两个写.
        if((err = pthread_create(&tid_r1, NULL, thread_read_lock, "read1")) != 0)  err_quit("create thread r1 error
    ", strerror(err));
        if((err = pthread_create(&tid_r2, NULL, thread_read_lock, "read2")) != 0)  err_quit("create thread r1 error
    ", strerror(err));
        if((err = pthread_create(&tid_w1, NULL, thread_write_lock, "write1")) != 0)  err_quit("create thread w1 error
    ", strerror(err));
        if((err = pthread_create(&tid_w2, NULL, thread_write_lock, "write2")) != 0)  err_quit("create thread w2 error
    ", strerror(err));
        
        /* 防止主线程提前退出 */
        if((err = pthread_join(tid_r1, NULL)) != 0) err_quit("cant't join with thread 1
    ", strerror(err));
    
        exit(0);
    }
    View Code

    运行结果 :

    读线程 read2 进入临界区, global_num = 10
    读线程 read1 进入临界区, global_num = 10
    读线程 read2 退出临界区
    读线程 read1 退出临界区
    写线程 write1 进入临界区, global_num = 11
    写线程 write1 退出临界区
    写线程 write2 进入临界区, global_num = 12
    写线程 write2 退出临界区
    读线程 read2 进入临界区, global_num = 12
    读线程 read1 进入临界区, global_num = 12
    读线程 read2 退出临界区
    读线程 read1 退出临界区
    写线程 write1 进入临界区, global_num = 13
    写线程 write1 退出临界区
    写线程 write2 进入临界区, global_num = 14
    写线程 write2 退出临界区
    读线程 read2 进入临界区, global_num = 14
    读线程 read1 进入临界区, global_num = 14

    可以看出, 读锁可以同时进入临界区, 而写锁中间等待了一秒都没有其它线程能进来.

    条件变量

    配合一个互斥量来使用, 允许线程以无竞争的方式来等待条件. 

    #include <stdio.h>
    #include <stdlib.h>
    #include <pthread.h>
    
    
    struct foo{
        int f_count;
        pthread_mutex_t f_lock;
        pthread_cond_t f_cond;
    };
    
    /* 初始化互斥量与条件变量 */
    struct foo * foo_alloc(){
        struct foo *fp;
        if((fp = malloc(sizeof(struct foo))) != NULL){
            fp->f_count = 0;
            pthread_mutex_init(&fp->f_lock, NULL);
            pthread_cond_init(&fp->f_cond, NULL);
        }
        return fp;
    }
    
    /* 加法 */
    void *foo_increase(void *arg){
        
        struct foo *fp;
    
        fp = (struct foo*)arg;
    
        while(1){
            pthread_mutex_lock(&fp->f_lock);
        
            fp->f_count++;
            /* 大于等于100时发送条件 */
            if(fp->f_count >= 100){
                pthread_cond_signal(&fp->f_cond);
                
                pthread_cond_wait(&fp->f_cond, &fp->f_lock);
            }
    
            pthread_mutex_unlock(&fp->f_lock);
        }
    }
    
    /* 重新置0 */
    void *foo_print(void *arg){
        struct foo *fp;
        fp = (struct foo*)arg;
    
        while(1){
            pthread_mutex_lock(&fp->f_lock);
            
            while(fp->f_count < 100){
                //释放掉锁, 等待条件为真返回, 再次锁住.
                pthread_cond_wait(&fp->f_cond, &fp->f_lock);
            }
            printf("重置 : %d
    ", fp->f_count);    
            /* 重新置0 */    
            fp->f_count = 0;
            
            pthread_cond_signal(&fp->f_cond);
            pthread_mutex_unlock(&fp->f_lock);
        }
    }
    
    
    int main(void){
    
        struct foo *fp;
        pthread_t tid_increase1, tid_print;
        //初始化
        fp = foo_alloc();
    
        //加法线程
        pthread_create(&tid_increase1, NULL, foo_increase, fp);
        
        //重置线程
        pthread_create(&tid_print, NULL, foo_print, fp);
    
        //防止主线程提前退出
        sleep(20);
        exit(0);
    }
    View Code
  • 相关阅读:
    C#生成CHM文件(外篇使用hha.dll)
    prototype原理详解
    避免在代码中直接任意使用ConfigurationManager.AppSettings
    页面生命周期的来龙去脉(详细)
    nRF52832 SAADC sampling
    nRF52832开发日志--SAADC调试
    boost学习笔记(七)---date_time库
    Boost学习笔记(六) progress_display注意事项
    Boost学习笔记(五) progress_display
    扩展progress_timer的计时精度
  • 原文地址:https://www.cnblogs.com/tanxing/p/6103862.html
Copyright © 2011-2022 走看看