zoukankan      html  css  js  c++  java
  • [apue] 使用文件记录锁无法实现父子进程交互执行同步

    父子进程间交互执行是指用一种同步原语,实现父进程和子进程在某一时刻只有一个进程执行,之后由另外一个进程执行,用一段代码举例如下:

        SYNC_INIT(); 
    
        int i=0, counter=0; 
        pid_t pid = fork (); 
        if (pid < 0)
            err_sys ("fork error"); 
        else if (pid > 0)
        {
            // parent
            for (i=0; i<NLOOPS; i+=2)
            {
                counter = update ((long *)area); 
                if (counter != i)
                    err_quit ("parent: expected %d, got %d", i, counter); 
                else 
                    printf ("parent increase to %d based %d
    ", i+1, counter); 
    
                SYNC_TELL(pid, 1); 
                SYNC_WAIT(0); 
            }
    
            printf ("parent exit
    "); 
        }
        else 
        {
            for (i=1; i<NLOOPS+1; i+=2)
            {
                SYNC_WAIT(1); 
                counter = update ((long *)area); 
                if (counter != i)
                    err_quit ("child: expected %d, got %d", i, counter); 
                else 
                    printf ("child increase to %d based %d
    ", i+1, counter); 
    
                SYNC_TELL(getppid (), 0);
            }
    
            printf ("child exit
    "); 
        }

    其中area是指向共享内存的一个地址,update用来增加area指向的内容(为long),在fork之后,父子进程交替更新此值。

    它们使用了一些抽象的同步原语,例如SYNC_INIT用于初始化同步设施、SYNC_WAIT等待另外进程的信号、SYNC_TELL向另外进程发送信号。

    下面是成功同步后的输出(假设NLOOPS为100):

    create shared-memory 3801126 with size 4 ok
    attach shared-memory at 0xb7733000
    parent increase to 1 based 0
    child increase to 2 based 1
    parent increase to 3 based 2
    child increase to 4 based 3
    parent increase to 5 based 4
    child increase to 6 based 5
    parent increase to 7 based 6
    child increase to 8 based 7
    parent increase to 9 based 8
    child increase to 10 based 9
    parent increase to 11 based 10
    child increase to 12 based 11
    parent increase to 13 based 12
    child increase to 14 based 13
    parent increase to 15 based 14
    child increase to 16 based 15
    parent increase to 17 based 16
    child increase to 18 based 17
    parent increase to 19 based 18
    child increase to 20 based 19
    parent increase to 21 based 20
    child increase to 22 based 21
    parent increase to 23 based 22
    child increase to 24 based 23
    parent increase to 25 based 24
    child increase to 26 based 25
    parent increase to 27 based 26
    child increase to 28 based 27
    parent increase to 29 based 28
    child increase to 30 based 29
    parent increase to 31 based 30
    child increase to 32 based 31
    parent increase to 33 based 32
    child increase to 34 based 33
    parent increase to 35 based 34
    child increase to 36 based 35
    parent increase to 37 based 36
    child increase to 38 based 37
    parent increase to 39 based 38
    child increase to 40 based 39
    parent increase to 41 based 40
    child increase to 42 based 41
    parent increase to 43 based 42
    child increase to 44 based 43
    parent increase to 45 based 44
    child increase to 46 based 45
    parent increase to 47 based 46
    child increase to 48 based 47
    parent increase to 49 based 48
    child increase to 50 based 49
    parent increase to 51 based 50
    child increase to 52 based 51
    parent increase to 53 based 52
    child increase to 54 based 53
    parent increase to 55 based 54
    child increase to 56 based 55
    parent increase to 57 based 56
    child increase to 58 based 57
    parent increase to 59 based 58
    child increase to 60 based 59
    parent increase to 61 based 60
    child increase to 62 based 61
    parent increase to 63 based 62
    child increase to 64 based 63
    parent increase to 65 based 64
    child increase to 66 based 65
    parent increase to 67 based 66
    child increase to 68 based 67
    parent increase to 69 based 68
    child increase to 70 based 69
    parent increase to 71 based 70
    child increase to 72 based 71
    parent increase to 73 based 72
    child increase to 74 based 73
    parent increase to 75 based 74
    child increase to 76 based 75
    parent increase to 77 based 76
    child increase to 78 based 77
    parent increase to 79 based 78
    child increase to 80 based 79
    parent increase to 81 based 80
    child increase to 82 based 81
    parent increase to 83 based 82
    child increase to 84 based 83
    parent increase to 85 based 84
    child increase to 86 based 85
    parent increase to 87 based 86
    child increase to 88 based 87
    parent increase to 89 based 88
    child increase to 90 based 89
    parent increase to 91 based 90
    child increase to 92 based 91
    parent increase to 93 based 92
    child increase to 94 based 93
    parent increase to 95 based 94
    child increase to 96 based 95
    parent increase to 97 based 96
    child increase to 98 based 97
    parent increase to 99 based 98
    child increase to 100 based 99
    child exit
    parent exit
    remove that shared-memory
    

    这套同步原语可以有多种实现方案,简单如管道、xsi信号量,甚至直接使用信号。下面是一些例子:

    1. 使用管道

    #ifdef USE_PIPE_SYNC
    
    // pp is the pipe that parent notify(write) child wait(read)
    // pc is the pipe that child notify(write) parent wait(read)
    static int pp[2], pc[2]; 
    
    void SYNC_INIT (void)
    {
        if (pipe (pp) < 0 || pipe(pc) < 0)
            err_sys ("pipe error"); 
    }
    
    void SYNC_TELL (pid_t pid, int child)
    {
        // close unused read end to avoid poll receive events
        // note, we can NOT do it in SYNC_INIT, 
        // as at that moment, we have not fork yet !
        if (child) { 
            close (pp[0]); 
            close (pc[1]); 
            pp[0] = pc[1] = -1; 
        } else { 
            close (pc[0]); 
            close (pp[1]); 
            pc[0] = pp[1] = -1; 
        }
    
        if (write (child ? pp[1] : pc[1], child ? "p" : "c", 1) != 1)
            err_sys ("write error"); 
    }
    
    void SYNC_WAIT (int child /* unused */)
    {
        int n = 0, m = 0; 
        struct pollfd fds[2] = {{ 0 }}; 
        // if fd==-1, just be a place taker !
        //if (pp[0] != -1) 
        {
            fds[n].fd = pp[0]; 
            fds[n].events = POLLIN; 
            n++; 
        }
    
        //if (pc[0] != -1) 
        { 
            fds[n].fd = pc[0]; 
            fds[n].events = POLLIN; 
            n++; 
        }
    
        int ret = poll (fds, n, -1); 
        if (ret == -1)
            err_sys ("poll error"); 
        else if (ret > 0) { 
            char c = 0; 
            //printf ("poll %d from %d
    ", ret, n); 
            for (m=0; m<n; ++m) {
                //printf ("poll fd %d event 0x%08x
    ", fds[m].fd, fds[m].revents); 
                if (fds[m].revents & POLLIN) { 
                    if (fds[m].fd == pp[0]) { 
                        if (read (pp[0], &c, 1) != 1)
                            err_sys ("read parent pipe error"); 
                        if (c != 'p')
                            err_quit ("wait parent pipe but got incorrect data %c", c); 
                    }
                    else {
                        if (read (pc[0], &c, 1) != 1) 
                            err_sys ("read child pipe error"); 
                        if (c != 'c') 
                            err_quit ("wait child pipe but got incorrect data %c", c); 
                    }
                }
            }
        }
        else 
            printf ("poll return 0
    "); 
    }
    
    #endif

    管道的话,TELL时就是向管道写一个字节,WAIT的时候就是阻塞在对应的端读取一个字节。

    注意这里WAIT没有直接使用child参数,而是使用poll同时检测两个读端,看哪个有数据就返回哪个。其实直接读对应的端更直接一些。

    2.使用xsi信号量

    #ifdef USE_SEM_SYNC
    
    union semun
    {
      int val;                //<= value for SETVAL
      struct semid_ds *buf; //        <= buffer for IPC_STAT & IPC_SET
      unsigned short int *array;//         <= array for GETALL & SETALL
      struct seminfo *__buf;    //        <= buffer for IPC_INFO
    };
    
    static int semid = -1;  
    
    void SYNC_INIT ()
    {
        int mode = 0666;  // 0; 
        int flag = IPC_CREAT; 
    #ifdef USE_EXCL
        flag |= IPC_EXCL; 
    #endif
    
        semid = semget (IPC_PRIVATE, 2, flag | mode); 
        if (semid < 0)
            err_sys ("semget for SYNC failed"); 
    
        printf ("create semaphore %d for SYNC ok
    ", semid); 
    
        union semun sem; 
        //sem.val = 1;
        //int ret = semctl (semid, 0, SETVAL, sem); 
        //if (ret < 0)
        //    err_sys ("semctl to set val failed"); 
        
        short arr[2] = { 0 }; 
        sem.array = arr; 
        int ret = semctl (semid, 0, SETALL, sem); 
        if (ret < 0)
            err_sys ("semctl to set all val failed"); 
    
        printf ("reset all semaphores ok
    "); 
    }
    
    void SYNC_TELL (pid_t pid, int child)
    {
        struct sembuf sb; 
        sb.sem_op = 1; 
        sb.sem_num = child ? 1 : 0; 
        sb.sem_flg = 0;  // IPC_NOWAIT, SEM_UNDO
        int ret = semop (semid, &sb, 1); 
        if (ret < 0)
            printf ("semop to release resource failed, ret %d, errno %d
    ", ret, errno); 
        else 
            printf ("release %d resource %d
    ", sb.sem_op, ret); 
    }
    
    void SYNC_WAIT (int child)
    {
        struct sembuf sb; 
        sb.sem_op = -1; 
        sb.sem_num = child ? 1 : 0; 
        sb.sem_flg = 0;  // IPC_NOWAIT, SEM_UNDO
        int ret = semop (semid, &sb, 1); 
        if (ret < 0)
            printf ("semop to require resource failed, ret %d, errno %d
    ", ret, errno); 
        else 
            printf ("require %d resource %d
    ", sb.sem_op, ret); 
    }
    
    #endif

    xsi信号量的话,在TELL时是向对应的信号量执行V操作,释放一个资源;在WAIT时是向对应的信号量执行P操作,申请一个资源,如果申请不到,就阻塞在那里。

    3.使用信号

    #ifdef USE_SIGNAL_SYNC
    
    static volatile sig_atomic_t sigflag; 
    static sigset_t newmask, oldmask, zeromask; 
    
    static void sig_usr (int signo)
    {
        sigflag = 1; 
        printf ("SIGUSR1/2 called
    "); 
    }
    
    void SYNC_INIT ()
    {
        if (apue_signal (SIGUSR1, sig_usr) == SIG_ERR)
            err_sys ("signal (SIGUSR1) error"); 
        if (apue_signal (SIGUSR2, sig_usr) == SIG_ERR)
            err_sys ("signal (SIGUSR2) error"); 
    
        sigemptyset (&zeromask); 
        sigemptyset (&newmask); 
        sigaddset (&newmask, SIGUSR1); 
        sigaddset (&newmask, SIGUSR2); 
    
        if (sigprocmask (SIG_BLOCK, &newmask, &oldmask) < 0)
            err_sys ("SIG_BLOCK error"); 
    }
    
    void SYNC_TELL (pid_t pid, int child)
    {
        kill (pid, child ? SIGUSR1 : SIGUSR2); 
    }
    
    void SYNC_WAIT (int child /* unused */)
    {
        while (sigflag == 0)
            sigsuspend (&zeromask); 
    
        sigflag = 0; 
        if (sigprocmask (SIG_SETMASK, &oldmask, NULL) < 0)
            err_sys ("SIG_SETMASK error"); 
    }
    
    #endif

    直接使用signal的话,这里分别使用了SIGUSR1和SIGUSR2表示父子进程,TELL操作就是激发一个信号给对方;WAIT操作就是sigsuspend在某个特定信号上,直到有信号发生才返回。

    注意TELL时需要指定发送信号的进程号,所以多了一个pid参数,这个参数在之前据说的两种方法中并没有使用。这也是signal不好的一点。

    然后,apue 15章最后一道习题中,要求使用文件记录锁来实现上述交互执行时,发现这是不可能完成的任务!

    假设我们以加锁文件或文件中一个字节来实现WAIT,使用解锁来实现TELL,那么会发现文件记录锁有以下缺点,导致它不能胜任这个工作:

    1. 文件记录锁是基于文件+进程的,当fork后产生子进程时,之前加的锁自动释放;

    2. 文件记录锁对于重复施加锁于一个文件或文件中某个特定字节时,它的表现就和之前没有加锁一样,直接成功返回,不会产生阻塞效果;

    对于 问题1,直接的影响就是父进程加好锁之后fork,子进程启动后却没有任何初始锁,导致父子进程同步困难。

    虽然这个可以通过在子进程中重新初始化来部分的解决,但是这种问题因为有进程竞争存在,问题不严密从而不完美的;

    对于 问题2,就直接导致其中一个进程在它的任务循环中,TELL另外一个进程后,再WAIT本进程的同步原语时(内部通过加锁实现),

    另一个进程即使没有解锁相应的文件或字节,WAIT也直接成功返回(因为本进程已经持有该锁),从而造成其中一个进程执行多次,另一个进程没有办法插进去执行的情况(虽然两个进程也不能同时执行)。

    所以结论是,对于交互执行的同步场景,管道、semaphore、signal都适用,而file lock不适用。

    测试程序

    各种实现

  • 相关阅读:
    C#中Bitmap类实现对图像操作的一些方法
    C# GDI+ 文字操作
    C#中使用GDI+实现复杂打印
    【Python基础】json.dumps()和json.loads()、json.dump()和json.load()的区分
    【Python爬虫】selenium基础用法
    【Python爬虫】PyQuery解析库
    【Python爬虫】BeautifulSoup 解析库
    【Python爬虫】正则表达式与re模块
    【Python爬虫】Requests库的基本使用
    【Python基础】*args,**args的详细用法
  • 原文地址:https://www.cnblogs.com/goodcitizen/p/11612620.html
Copyright © 2011-2022 走看看