zoukankan      html  css  js  c++  java
  • 【说解】在shell中通过mkfifo创建命名管道来控制多个进程并发执行

    背景:

    工作中有两个异地机房需要传数据,数据全名很规范,在某个目录下命名为统一的前缀加上编号。如/path/from/file.{1..100}。而机房间的专线对单个scp进程的传输速度是有限制的,比如最大在100Mb/s,如果直接启动100个scp,则又会遇到ssh的并发连接数限制。

    所以需要控制并发数,即不超过ssh的并发限制,又要让单网卡上的带宽接近饱和,尽快完成传输(假设专线带宽远大于单机网卡带宽)

    实现

    之前知道通过mkfifo创建一个命名管道,可以实现对并发的控制。现在来实现一个。

    在此之前,如果对mkfifo不了解,可以参考这个连接,作者写得很详细,我就不造轮子了。

    这里直接给出代码,并做一些解释。因为单进程的带宽如上所述,所以考虑9个并发。代码如下:

     1 #!/bin/bash
     2 
     3 your_func()
     4 {   # use your cmd or func instead of sleep here. don't end with background(&)
     5     date +%s
     6     echo "scp HOSTNAME:/home/USER/path/from/file.$1 REMOTE_HOST:/home/USER/path/to/"
     7     sleep 2
     8 }
     9 
    10 concurrent()
    11 {   # from $1 to $2, (included $1,$2 itself), con-current $3 cmd
    12     start=$1 && end=$2 && cur_num=$3
    13 
    14     # ff_file which is opened by fd 4 will be really removed after script stopped
    15     mkfifo   ./fifo.$$ &&  exec 4<> ./fifo.$$ && rm -f ./fifo.$$
    16 
    17     # initial fifo: write $cur_num line to $ff_file
    18     for ((i=$start; i<$cur_num+$start; i++)); do
    19         echo "init time add $i" >&4
    20     done
    21 
    22     for((i=$start; i<=$end; i++)); do
    23         read -u 4   # read from mkfifo file
    24         {   # REPLY is var for read
    25             echo -e "-- current loop: [cmd id: $i ; fifo id: $REPLY ]"
    26 
    27             your_func $i
    28             echo "real time add $(($i+$cur_num))"  1>&4 # write to $ff_file
    29         } & # & to backgroud each process in {}
    30     done
    31     wait    # wait all con-current cmd in { } been running over
    32 }
    33 
    34 concurrent 0 8 3

    上面以3为并发数,执行0到8号共9次,以便显示如下执行结果。

     1 bash concurrent.sh
     2 -- current loop: [cmd id: 0 ; fifo id: init time add 0 ]
     3 -- current loop: [cmd id: 1 ; fifo id: init time add 1 ]
     4 -- current loop: [cmd id: 2 ; fifo id: init time add 2 ]
     5 1453518400
     6 1453518400
     7 scp HOSTNAME:/home/USER/path/from/file.0 REMOTE_HOST:/home/USER/path/to/
     8 scp HOSTNAME:/home/USER/path/from/file.2 REMOTE_HOST:/home/USER/path/to/
     9 1453518400
    10 scp HOSTNAME:/home/USER/path/from/file.1 REMOTE_HOST:/home/USER/path/to/
    11 -- current loop: [cmd id: 3 ; fifo id: real time add 3 ]
    12 -- current loop: [cmd id: 4 ; fifo id: real time add 5 ]
    13 -- current loop: [cmd id: 5 ; fifo id: real time add 4 ]
    14 1453518402
    15 scp HOSTNAME:/home/USER/path/from/file.3 REMOTE_HOST:/home/USER/path/to/
    16 1453518402
    17 1453518402
    18 scp HOSTNAME:/home/USER/path/from/file.5 REMOTE_HOST:/home/USER/path/to/
    19 scp HOSTNAME:/home/USER/path/from/file.4 REMOTE_HOST:/home/USER/path/to/
    20 -- current loop: [cmd id: 6 ; fifo id: real time add 6 ]
    21 -- current loop: [cmd id: 7 ; fifo id: real time add 7 ]
    22 -- current loop: [cmd id: 8 ; fifo id: real time add 8 ]
    23 1453518404
    24 scp HOSTNAME:/home/USER/path/from/file.6 REMOTE_HOST:/home/USER/path/to/
    25 1453518404
    26 1453518404
    27 scp HOSTNAME:/home/USER/path/from/file.7 REMOTE_HOST:/home/USER/path/to/
    28 scp HOSTNAME:/home/USER/path/from/file.8 REMOTE_HOST:/home/USER/path/to/

    从date输出的时间上,可以看出,每2秒会执行3个并发。

    说明

    整体过程

    设N的值为并发数。通过在fifo中初始化N行内容(可以为空值),再利用fifo的特性,从fifo中每读一行,启动一次your_func调用,当fifo读完N次时,fifo为空。再读时就会阻塞。这样开始执行时就是N个并发(1-N)。

    当并发执行的进程your_func,任意一个完成操作时,下一步会招待如下语句:

    echo "real time add $(($i+$cur_num))"  1>&4

    这样就对fifo新写入了一行,前面被阻塞的第N+1号待执行的进程read成功,开始进入{}语句块执行。这样通过read fifo的阻塞功能,实现了并发数的控制。

    需要注意的是,当并发数较大时,多个并发进程即使在使用sleep相同秒数模拟时,也会存在进程调度的顺序问题,因而并不是按启动顺序结束的,可能会后启动的进程先结束。

    从而导致如下语句所示的输出中,两个数字并不一定是相等的。并发数越大,这种差异性越大。

    -- current loop: [cmd id: 8 ; fifo id: real time add 9 ]

    自定义函数

    修改自定义函数your_func,这个函数实际只需要一行就完成了。

    your_func()
    {   # use your cmd or func instead of sleep here. don't end with background(&)
        date +%s
        scp HOSTNAME:/home/USER/path/from/file.$1 REMOTE_HOST:/home/USER/path/to/
    }

    需要注意的是,scp命令最后不需要添加压后台的&符号。因为在上一级就已经压后台并发了。

    再来说明concurrent函数的第14行。

    exec digit<>  filename

    这是一个平常很少使用到的命令。特别是‘<>’这个符号。既然不明白我们来查一下系统帮助。

    man bash
    # search 'exec '
    
    Opening File Descriptors for Reading and Writing
           The redirection operator
    
                  [n]<>word
    
           causes  the  file  whose  name  is  the  expansion  of word to be opened for both reading and writing on file
           descriptor n, or on file descriptor 0 if n is not specified.  If the file does not exist, it is created.

    通过man bash来搜索exec加空格,会找到对exec的说明。注意如果直接man exec,会搜索到linux programer's manual,是对execl, execlp, execle, execv, execvp, execvpe - execute a file这一堆系统函数的调用说明。

    还要注意哦,4<> 这几个字符不要加空格,必然连着写。word前可以加空格。

    rm file

    mkfifo先创建管道文件,再通过exec将该文件绑定到文件描述符4。也许你在疑惑后面的rm操作。其实当该文件绑定到文件描述符后,内核已经通过open系统调用打开了该文件,这个时候执行rm操作,删除的是文件的Inode,但concurrent函数已经连接到文件的block块区。

    如果你遇到过这样的情况,你就明白了:如果线上的nginx日志是没有切分的,access.log会越来越大,这时你直接rm access.log文件后,文件不见了,但df查看系统并没有释放磁盘空间。这就是因为rm只是删除了inode,但这之前nginx早已经通过open打开了这个文件,nginx进程的进程控制块中的文件描述符表中对应的fd,已经有相应的文件指针指向了该文件在内存中的文件表,以及其在内存中的v节点表,并最终指向文件的实际存储块。因此nginx依然可以继续写日志,磁盘还在被写入。只有重启或者reload,让进程重新读一次配置,重新打开一遍相应的文件时,才会发现该文件不存在的,并新建该文件。而这时因为Inode节点已经释放,再用df查看时就能看到可用空间增大了。

    不懂可以参考APUE的图3.1及想着说明。

    因此14行的rm并不影响后继脚本执行,直到脚本结束,系统收回所有文件描述符。

    初始化

    18-20行在做初始化管道的工作。其中读取管道有两类写法:

    1 # style 1
    2     for ((i=$start; i<$cur_num+$start; i++)); do
    3         echo "init time add $i" >&4
    4     done
    5 
    6 # style 2
    7     for ((i=$start; i<$cur_num+$start; i++)); do
    8         echo "init time add $i"
    9     done >&4

    差别就是‘>&4’ 这几个字符放在echo语句后面,还是放在done后面,两者都可以,前者针对echo语句,后者针对整个for循环。

    同理,在下一个for循环中,read命令也有两种方式:

    # style 1
    for((i=$start; i<=$end; i++)); do
            read -u 4   
            {   
                your_func $i
                echo "real time add $(($i+$cur_num))"  1>&4 # write to $ff_file
            } & 
    done
    
    # style 2
    for((i=$start; i<=$end; i++)); do
            read   
            {   
                your_func $i
                echo "real time add $(($i+$cur_num))"  1>&4 # write to $ff_file
            } & 
    done <&4

    关于REPLY

    再解释一下REPLY变量。这是上述循环中,用来存放read命令从fifo中读到的内容。其实在整个脚本中,是不需要关注这个点的。不过这里随带也解释一下。

    通过能fifo的不断读写,才实现了echo如下语句:

    -- current loop: [cmd id: 7 ; fifo id: real time add 7 ]

    如何了解到REPLY呢?我们又得man一下了。为了找到read的参数。先man read发现不对。再如下查找,因为read是bash自建命令。

    1 man  bash 
    2 # search  'Shell Variables'
    3 
    4        REPLY  Set to the line of input read by the read builtin command when no arguments are supplied.
  • 相关阅读:
    libevent
    STL中mem_fun和mem_fun_ref的用法
    java的awt和swing的区别于联系
    数据库流程控制的使用IF CASE LOOP LEAVE ITERETA REPEAT WHILE
    mysql的常用函数
    数据库的基本知识点
    使用myeclipse 打包并运行普通java项目
    getClass()与getName()方法
    Java中的final关键字
    基本类型包装类的常量池技术
  • 原文地址:https://www.cnblogs.com/qinqiao/p/concurrent_by_shell_mkfifo.html
Copyright © 2011-2022 走看看