zoukankan      html  css  js  c++  java
  • LinuxC线程pthread线程同步进程同步-互斥量、信号量、条件变量、读写锁、文件锁

    1. 同步概念
      同步:即按时间先后顺序执行。也叫时间控制流。
      同步机制:多个控制流访问同一个共享资源时,为了保证数据不混乱而引入的一种协调机制。
    2. 线程同步

      互斥量:也叫建议锁。因为线程不加锁也可以访问数据但容易出现混乱,建议加锁。#include<pthread.h>
        pthread_mutex_t:是结构体变量,可看作值为1或者0
        
    int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);//1
        pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;//2
          //初始化一把锁的两种方式。看作初始化变量值为1。mutex为传出参数,attr传入属性,restrict意思是限制为只能通过本指针修改内存空间。
        int pthread_mutex_destroy(pthread_mutex_t *mutex);
          //销毁一把锁。mutex为传入参数。
        int pthread_mutex_lock(pthread_mutex_t *mutex);
          //加锁,看作mutex--。成功加上锁mutex减1,加不上阻塞等待。
        int pthread_mutex_trylock(pthread_mutex_t *mutex);
          //尝试加锁mutex--,不阻塞,会立马返回是否加锁成功。
        int pthread_mutex_unlock(pthread_mutex_t *mutex);
          //加锁,看作mutex++。同时将阻塞在该锁的所有线程唤醒。接下来哪个线程获取锁取决于cpu调度。
      //加锁解锁应该成对出现,加锁粒度应该尽可能的小。

      死锁:对同一互斥量重复加锁、线程1持有互斥锁A请求互斥锁B但线程2持有B请求A。
     
      信号量:互斥量的升级版。看作是初始值为n的互斥量。保证同步的同时提高了并发量。#include<semaphore.h>
        sem_t:结构体,n不能小于0.
        int sem_init(sem_t *sem, int pshared, unsigned int value);
          //初始化。sem传出参数,pshared为0时线程间、非0时进程间。value信号量初始n的值。
        int sem_destroy(sem_t *sem);
          //销魂信号量,sem传入参数
        int sem_wait(sem_t *sem);
          //阻塞加锁,加锁成功sem--
        int sem_trywait(sem_t *sem);
          //尝试加锁,加锁成功sem--
        int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
          //阻塞加锁,有个等待超时时间,加锁成功sem--
        int sem_post(sem_t *sem);
          //解锁。sem++。唤醒阻塞在该锁的所有线程。接下来哪里线程获得锁不确定取决于cpu调度。  
      
      条件变量:不是锁,但可以造成阻塞。与互斥量mutex结合使用。减少不必要的竞争。#include<pthread.h>
        pthread_cond_t:结构体
        int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
        pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
          //初始化一个条件变量的两种方式,cond传出参数,attr传入参数属性
        int pthread_cond_destroy(pthread_cond_t *cond);
          //销毁一个条件变量,cond传入参数
        int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
          //阻塞等待一个条件变量
        int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);
          //阻带等待一个条件变量,有个超时时间。abstime绝对时间,从0开始,所以使用时需要加上当前时间time(NULL)。
        int pthread_cond_signal(pthread_cond_t *cond);
          //至少唤醒一个阻塞在该条件变量的线程
        int pthread_cond_broadcast(pthread_cond_t *cond);
          //唤醒所有阻塞在该条件变量的线程

      读写锁:读时共享,写时独占。写锁优先级高。适用于读远远大于写的情况。
        pthread_rwlock_t:结构体
        pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
        int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
          //初始化读写锁的两种方式
        int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
          //销毁读写锁
        int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
        int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
          //请求读锁,rdlock阻塞,tryrdlock不阻塞
        int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
        int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
          //请求写锁,wrlock阻塞,trywrlock不阻塞
        int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
          //释放读锁或者写锁。

    3. 进程同步

      互斥量:使用方法类似与线程同步,初始化时在pthread_mutexattr_t属性添调用pthread_mutexattr_setpshared()设置为PTHREAD_PROCESS_SHARED即可。
      
      信号量:一般与mmap内存共享映射结合使用。
      
      文件锁:fcntl函数实现,只有进程才有文件锁,线程没有因为通过文件修改描述符实现的。
        int fcntl(int fd, int cmd, struct flock *fl)
          //fd文件描述符,
          //cmd为F_SETLK设置文件(trylock)、为F_SETLKW设置文件锁(wait)、为F_GETLK获取文件锁
          //struct flock:结构体
            l_type:F_RDLCK文件读锁,F_WRLCK文件写锁,F_UNLCK解锁
            l_whence:从哪里计算偏移,SEEK_SET开始位置,SEEK_CUR当前位置,SEEK_END结束位置
            l_start:起始偏移
            l_len:长度。为0表示整个文件加锁
            l_pid:持有该锁的进程id。F_GETLK时使用。

    4. 生产者消费者模型

        #include<stdlib.h>
        #include<stdio.h>
        #include<pthread.h>
        #include<unistd.h>

        typedef struct msg {
          struct msg* next;
          int num;
        }msg_t;

        pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
        pthread_cond_t has_product = PTHREAD_COND_INITIALIZER;
        msg_t* header;

        void* product(void* arg)
        {
          msg_t* mp;
          while (1)
          {
            pthread_mutex_lock(&mutex);
            mp = malloc(sizeof(msg_t));
            mp->num = rand() % 1000;
            printf("--product ---%d ", mp->num);
            mp->next = header;
            header = mp;
            pthread_mutex_unlock(&mutex);

            pthread_cond_signal(&has_product);
            sleep(rand() % 5);
          }
        return NULL;
        }

        void* consume(void* arg)
        {
          msg_t* mp;
          while (1)
          {
            pthread_mutex_lock(&mutex);
            while (header == NULL) {//if null,release the lock and wait to be signalled.
              pthread_cond_wait(&has_product, &mutex);
            }
            if (mp != NULL)
            {
              free(mp);
              mp = NULL;
            }
            mp = header;
            header = mp->next;
            printf("--consume ---%d ", mp->num);
            free(mp);
            mp = NULL;
            pthread_mutex_unlock(&mutex);
            sleep(rand() % 5);
          }
          return NULL;
        }

        int main_pthread_cond_var(int argc, char* argv[])
        {
          pthread_t pid, cid;
          srand(time(NULL));
          pthread_create(&pid, NULL, product, NULL);
          pthread_create(&cid, NULL, consume, NULL);
          pthread_join(pid, NULL);
          pthread_join(cid, NULL);
          return EXIT_SUCCESS;
        }


    5. 哲学家问题

        #include<stdio.h>
        #include<stdlib.h>
        #include<unistd.h>
        #include<pthread.h>
        #include<semaphore.h>
        #include<sys/mman.h>
        #include<sys/wait.h>

        #define NUM 5

        pthread_mutex_t mutex[NUM];

        void* thread_philosopher_run(void* argv)
        {
          srand(time(NULL));
          int i = (int)argv;
          int left, right;
          if (i == NUM - 1)//the last one do in opposite way
            left = 0, right = i;
          else
            left = i, right = i + 1;
          while (1)
          {
            pthread_mutex_lock(&mutex[left]);
            if (pthread_mutex_trylock(&mutex[right]) == 0)
            {
              printf(" %c is eating ", 'A' + i);
              pthread_mutex_unlock(&mutex[right]);
            }
            pthread_mutex_unlock(&mutex[left]);
            sleep(rand() % 5);
          }
          return NULL;
        }

        void test_thread_philosopher()
        {
          pthread_t tid[NUM];
          for (int i = 0; i < NUM; i++)
          {
            pthread_mutex_init(&mutex[NUM], NULL);
          }
          for (int i = 0; i < NUM; i++)
          {
            pthread_create(&tid[i], NULL, thread_philosopher_run, (void*)i);
          }
          for (int i = 0; i < NUM; i++)
          {
            pthread_join(tid[i], NULL);
          }
          for (int i = 0; i < NUM; i++)
          {
            pthread_mutex_destroy(&mutex[i]);
          }
        }

        void test_process_philosopher()
        {
          int i;
          sem_t* sem;
          sem = mmap(NULL, sizeof(sem_t) * NUM, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANON, -1, 0);
          if (sem == MAP_FAILED)
          {
            perror("mmap error");
            exit(EXIT_FAILURE);
          }
          for (i = 0; i < NUM; i++)
            sem_init(sem + i, 0, 1);

          for (i = 0; i < NUM; i++)
            if (fork() == 0)
              break;
          if (i < NUM)
          {
            int left, right;
            srand(time(NULL));
            if (i == NUM - 1)
              left = 0, right = i;
            else
              left = i, right = i + 1;
            while (1)
            {
              sem_wait(sem + left);
              if (sem_trywait(sem + right) == 0)
              {
                printf(" %c is eating ", 'A' + i);
                sem_post(sem + right);
              }
              sem_post(sem + left);
              sleep(rand() % 5);
            }
            printf("fork process id=%d ", getpid());
            exit(0);
          }

          for (i = 0; i < NUM; i++)
            wait(NULL);
          for (int i = 0; i < NUM; i++)
            sem_destroy(sem + i);
          munmap(sem, sizeof(sem_t) * NUM);
        }

        int main(int argc, char* argv[])
        {
          //test_thread_philosopher();
          test_process_philosopher();
          return EXIT_SUCCESS;
        }

    注意:编译链接时需要添加参数 -lpthread 线程库。比如gcc -c src.c -o app.exe -lpthread 。

  • 相关阅读:
    WebRTC学习资料大全
    WebRTC学习与DEMO资源一览
    WebRTC MCU( Multipoint Conferencing Unit)服务器调研
    基于Kurento的WebRTC移动视频群聊技术方案
    使用 nginx 和 rtmp 插件搭建视频直播和点播服务器
    利用nginx搭建RTMP视频点播、直播、HLS服务器
    几个学习流媒体的案例代码网址
    rtmp与hls流媒体服务器搭建:ubuntu下Nginx搭建初探与rtmp-module的添加
    Neo4j模糊查询及分页查询
    自定义中文全文索引
  • 原文地址:https://www.cnblogs.com/yongfengnice/p/12116954.html
Copyright © 2011-2022 走看看