zoukankan      html  css  js  c++  java
  • Linux互斥和同步应用程序(四):posix互斥信号和同步

           【版权声明:尊重原创。转载请保留源:blog.csdn.net/shallnet 要么 .../gentleliu,文章仅供学习交流,请勿用于商业用途】
             在前面讲共享内存的IPC时曾说共享内存本身不具备同步机制,假设要实现同步须要使用信号量等手段来实现之,如今我们就来说说使用posix的信号量来实现posix多进程共享内存的同步。事实上信号量也能够使用在同一进程的不同线程之间。

            信号量是一个包括非负整数的变量,有两个原子操作。wait和post。也能够说是P操作和V操作。P操作将信号量减一。V操作将信号量加一。假设信号量大于0,P操作直接减一,否则假设等于0,P操作将调用进程(线程)堵塞起来,直到另外进程(线程)运行V操作。
            信号量能够理解为一种资源的数量,通过这样的资源的分配来实现同步或者相互排斥。

    当信号量的值为1时。能够实现相互排斥的功能。此时信号量就是二值信号量,假设信号量的值大于一时。能够实现进程(线程)并发运行。

    信号量和相互排斥锁条件变量之间的差别是:相互排斥锁必须由给它上锁的进程(线程)来解锁,而信号灯P操作不必由运行过它V操作的进程(线程)来运行;相互排斥锁类似于二值信号量。要么加锁。要么解锁;当向条件变量发信号时,假设此时没有等待在该条件变量上的线程。信号将丢失。而信号量不会。信号量主要用于之间同步。但也能够用在线程之间。

    相互排斥锁和条件变量主要用于线程同步,但也能够用于进程间的同步。

            POSIX信号量是一个sem_t的变量类型,分有名信号量和无名信号量。无名信号量用在共享信号量内存的情况下,比方同一个进程的不同线程之间,或父子进程之间,有名信号量随内核持续的。所以我们能够跨多个程序操作它们


           1. 函数sem_open创建一个有名信号量或打开一个已存在信号量。函数原型例如以下:
           #include <fcntl.h> /* For O_* constants */
           #include <sys/stat.h> /* For mode constants */
           #include <semaphore.h>
    
           sem_t *sem_open(const char *name, int oflag);
           sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value);
           Link with -pthread
    參数name为posix IPC 名称, 參数oflag能够为0, O_CREAT 或O_CREAT | O_EXCL, 假设指定了O_CREAT,第三、四參数须要指定。mode为指定权限位,value參数指定信号量初始值,二值信号量的值通常为1。计数信号量的值通常大于1。
    函数返回运行sem_t数据类型的指针,出错返回SEM_FAILED。

           2. 函数sem_close关闭有名信号量。
           #include <semaphore.h>
           int sem_close(sem_t *sem);
           Link with -pthread.    
            3.函数sem_unlink删除有名信号量。

           #include <semaphore.h>
           int sem_unlink(const char *name);
           Link with -pthread.    
            4.函数sem_wait和函数sem_trywait測试信号量的值,假设信号量值大于0,函数将信号量的值减一并马上返回,假设信号量的值等于0,sem_wait函数開始睡眠,直到信号量的值大于0,这是再减一并马上返回,而sem_trywait函数不睡眠直接返回,并返回EAGAIN错误。函数原型为:
           #include <semaphore.h>
           int sem_wait(sem_t *sem);
           int sem_trywait(sem_t *sem);
           int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
           Link with -pthread.
            5. 函数sem_post运行和sem_wait相反的操作。当一进程(线程)运行完操作之后,应该调用sem_post,将指定信号量加一,然后唤醒正在等待该信号量变为1的进程(线程)。

    函数原型:

           #include <semaphore.h>
           int sem_post(sem_t *sem);
           Link with -pthread.
            6. 函数sem_getvalue获取指定信号量的当前值,假设当前信号量已上锁,返回0,或者为负数,其绝对值为等待该信号量的解锁的线程数。
           #include <semaphore.h>
           int sem_getvalue(sem_t *sem, int *sval);
           Link with -pthread.
            7. 无名信号量使用sem_init()初始化,函数原型为:
           #include <semaphore.h>
           int sem_init(sem_t *sem, int pshared, unsigned int value);
           Link with -pthread.
        sem为无名信号量地址,value为信号量初始值,pshared指定信号量是在进程的各个线程之间共享还是在进程之间共享。假设该值为0。表示在进程的线程之间共享。假设非0则在进出之间共享,无名信号量主要用在有共同祖先的进程或线程之间。

              这一节使用posix共享内存来实现各个进程之间的通信。并使用posix信号量来达到相互排斥与同步的目的。

    首先我们来看看使用信号量实现对共享内存段的相互排斥訪问。

              之前线程的訪问使用例如以下方式来达到对公共的内存相互排斥訪问:
              pthread_mutex_t     mutex = PTHREAD_MUTEX_INITIALIZER;
     ......
    pthread_mutex_lock(&mutex);//加锁
     ......
    /*share memory handle*/
     ......
    pthread_mutex_unlock(&mutex);//解锁
    ......
    如今我们也使用类似方式来实现:
    sem_t *sem_mutex = NULL;
    ......
    SLN_MUTEX_SHM_LOCK(SEM_MUTEX_FILE, sem_mutex);//加锁
    ......
    /*share memory handle*/
     ......
    SLN_MUTEX_SHM_UNLOCK(sem_mutex);//解锁
    ......
    当中SEM_MUTEX_FILE为sem_open函数须要的有名信号量名称。

    当中两个加锁解锁的实现为:
    #define SLN_MUTEX_SHM_LOCK(shmfile, sem_mutex) do {
        sem_mutex = sem_open(shmfile, O_RDWR | O_CREAT, 0666, 1);
        if (SEM_FAILED == sem_mutex) {
            printf("sem_open(%d): %s
    ", __LINE__, strerror(errno));
        }
        sem_wait(sem_mutex);
    }while(0)
    
    #define SLN_MUTEX_SHM_UNLOCK(sem_mutex) do {sem_post(sem_mutex);} while(0)
    事实上就是初始化一个二值信号量,其初始值为1。并运行wait操作,使信号量的值变为0,此时其他进程想要操作共享内存时也须要运行wait操作,但此时信号量的值为0,所以開始等待信号量的值变为1。当当前进程操作完共享内存后,開始解锁,运行post操作将信号量的值加一,此时其他进程的wait能够返回了。
    以下为一个相互排斥訪问共享内存的演示样例。posix共享内存实现请查看前面IPC的系列文章。

    ser process:
    int nms_shm_get(char *shm_file, void **shm, int mem_len)
    {
        int fd;
    
        fd = shm_open(shm_file, O_RDWR | O_CREAT, 0666);
        if (fd < 0) {
            printf("shm_pen <%s> failed: %s
    ", shm_file, strerror(errno));
            return -1;
        }
    
        ftruncate(fd, mem_len);
    
        *shm = mmap(NULL, mem_len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
        if (MAP_FAILED == *shm) {
            printf("mmap: %s
    ", strerror(errno));
            return -1;
        }
    
        return 0;
    }
    int main(int argc, const char *argv[])
    {
        sem_t *sem_mutex = NULL;
        char *str = NULL;
    
        SLN_MUTEX_SHM_LOCK(SEM_MUTEX_FILE, sem_mutex); //加锁
    
        nms_shm_get(SHM_FILE, (void **)&str, SHM_MAX_LEN); //以下三行相互排斥訪问共享内存
        sleep(6);
        snprintf(str, SHM_MAX_LEN, "posix semphore server!");
    
        SLN_MUTEX_SHM_UNLOCK(sem_mutex); //解锁
    
        sleep(6);
    
        shm_unlink(SHM_FILE);
    
        return 0;
    }
    client process:
    int main(int argc, const char *argv[])
    {
        sem_t *sem_mutex;
        char *str = NULL;
        SLN_MUTEX_SHM_LOCK(SEM_MUTEX_FILE, sem_mutex);
        nms_shm_get(SHM_FILE, (void **)&str, SHM_MAX_LEN);
        printf("client get: %s
    ", str);
        SLN_MUTEX_SHM_UNLOCK(sem_mutex);
        return 0;
    }
    先启动服务进程首先加锁,创建共享内存并操作它。加锁中sleep 6秒,以便測试客户进程是否在服务进程未释放锁时处于等待状态。客户进程在服务进程启动之后立即启动,此时处于等待状态,当服务进程6秒之后解锁,客户进程获得共享内存信息。再过6秒之后,服务进程删除共享内存,客户进程再此获取共享内存失败。
    # ./server & 
    [1] 21690 
    # ./client 
    client get: posix semphore server! 
    # ./client
    shm_open <share_memory_file> failed: No such file or directory
    client get: (null)
    [1]+ Done ./server
    posix有名信号量创建的信号量文件和共享内存文件在/dev/shm/文件夹下:
    # ls /dev/shm/ 
     sem.sem_mutex share_memory_file 
    #
    在两个进程共享数据时,当一个进程向共享内存写入了数据后须要通知另外的进程,这就须要两个进程之间实现同步,这里我们给上面的程序在相互排斥的基础上加上同步操作。同步也是使用posix信号量来实现。
    server process:
    int main(int argc, const char *argv[])
    {
        sem_t *sem_mutex = NULL;
        sem_t *sem_consumer = NULL, *sem_productor = NULL;
        int semval;
        char *sharememory = NULL;
    
        sem_consumer = sem_open(SEM_CONSUMER_FILE, O_CREAT, 0666, 0); //初始化信号量sem_consumer 。并设置初始值为0
        if (SEM_FAILED == sem_consumer) {
            printf("sem_open <%s>: %s
    ", SEM_CONSUMER_FILE, strerror(errno));
            return -1;
        }
    
        sem_productor = sem_open(SEM_PRODUCTOR_FILE, O_CREAT, 0666, 0);//初始化信号量sem_productor ,并设置初始值为0
        if (SEM_FAILED == sem_productor) {
            printf("sem_open <%s>: %s
    ", SEM_PRODUCTOR_FILE, strerror(errno));
            return -1;
        }
    
        for (;;) {//服务进程一直循环处理客户进程请求
            sem_getvalue(sem_consumer, &semval);
            printf("%d waiting...
    ", semval);
            if (sem_wait(sem_consumer) < 0) {//假设sem_consumer为0,则堵塞在此,等待客户进程post操作使sem_consumer大于0。此处和客户进程同步
                printf("sem_wait: %s
    ", strerror(errno));
                return -1;
            }
            printf("Get request...
    ");
    
            SLN_MUTEX_SHM_LOCK(SEM_MUTEX, sem_mutex);//此处開始相互排斥訪问共享内存
            nms_shm_get(SHM_FILE, (void **)&sharememory, SHM_MAX_LEN);
            sleep(6);
            snprintf(sharememory, SHM_MAX_LEN, "Hello, this is server's message!");
            SLN_MUTEX_SHM_UNLOCK(sem_mutex);
    
            sem_post(sem_productor);//使信号量sem_productor加一,使堵塞的客户进程继续运行
            printf("Response request...
    ");
        }
    
        sem_close(sem_consumer);
        sem_close(sem_productor);
        return 0;
    }
    
    
    client process:

    int main(int argc, const char *argv[])
    {
        sem_t *sem_consumer = NULL, *sem_productor = NULL;
        struct timespec timeout;
        int ret;
        char *sharememory = NULL;
        sem_t *sem_mutex;
        sem_consumer = sem_open(SEM_CONSUMER_FILE, O_RDWR);//获取信号量sem_consumer的值
        if (SEM_FAILED == sem_consumer) {
            printf("sem_open <%s>: %s
    ", SEM_CONSUMER_FILE, strerror(errno));
            return -1;
        }
        sem_productor = sem_open(SEM_PRODUCTOR_FILE, O_RDWR);//获取信号量sem_productor 的值
        if (SEM_FAILED == sem_productor) {
            printf("sem_open <%s>: %s
    ", SEM_PRODUCTOR_FILE, strerror(errno));
            return -1;
        }
        //clear_exist_sem(sem_productor);
    
        SLN_MUTEX_SHM_LOCK(SEM_MUTEX, sem_mutex);//相互排斥訪问共享内存
        nms_shm_get(SHM_FILE, (void **)&sharememory, SHM_MAX_LEN);
        printf("sharememory: %s
    ", sharememory);
        SLN_MUTEX_SHM_UNLOCK(sem_mutex);
    
        sem_post(sem_consumer);//信号量sem_consumer加一。唤醒是堵塞在该信号量上的服务进程
        printf("Post...
    ");
       sem_wait(sem_productor);//等待服务进程回应
       /*
        timeout.tv_sec = time(NULL) + SEM_TIMEOUT_SEC;
        timeout.tv_nsec = 0;
        ret = sem_timedwait(sem_productor, &timeout);
        if (ret < 0) {
            printf("sem_timedwait: %s
    ", strerror(errno));
        }
       */
    
        printf("Get response...
    ");
        sem_close(sem_consumer);
        sem_close(sem_productor);
    
        return 0;
    }
    本节演示样例源代码下载:

    版权声明:本文博主原创文章。博客,未经同意不得转载。假设你认为你的实际物品,请点击以下“最佳”。

  • 相关阅读:
    HDU
    hdu-1260 tickets
    hdu-1024 Max Sum Plus Plus
    spfa+链式前向星模板
    kafka-伪集群搭建
    elasticsearch-安装-centos7- es7.5 搭建
    elk-安装 通过docker
    kibana-安装-通过docker
    logstash -grok插件语法介绍
    docker 启动redis/nginx
  • 原文地址:https://www.cnblogs.com/blfshiye/p/4804956.html
Copyright © 2011-2022 走看看