Parallel::ForkManage: 一个简单的并行进程用于fork管理:
use Parallel::ForkManager;
my $pm = Parallel::ForkManager->new($MAX_PROCESSES);
DATA_LOOP:
foreach my $data (@all_data) {
# Forks and returns the pid for the child:
my $pid = $pm->start and next DATA_LOOP;
... do some work with $data in the child process ...
$pm->finish; # Terminates the child process
}
描述:
这个模块是用于使用在并行操作中进行操作, 被fork的进程数量应该被限制。
典型的使用是下载器从数百 数千个文件中检索
代码用于下载器应该是这样的:
use LWP::Simple;
use Parallel::ForkManager;
...
my @links=(
["http://www.foo.bar/rulez.data","rulez_data.txt"],
["http://new.host/more_data.doc","more_data.doc"],
...
);
...
# Max 30 processes for parallel download
my $pm = Parallel::ForkManager->new(30);
LINKS:
foreach my $linkarray (@links) {
$pm->start and next LINKS; # do the fork
my ($link, $fn) = @$linkarray;
warn "Cannot get $fn from $link"
if getstore($link, $fn) != RC_OK;
$pm->finish; # do the exit in the child process
}
$pm->wait_all_children;
DESCRIPTION :
这个模块用于在并行操作中进程操作,fork的进程数应该被限制。
典型的使用一个下载器来检索成百/上千个文件。
use LWP::Simple;
use Parallel::ForkManager;
...
my @links=(
["http://www.foo.bar/rulez.data","rulez_data.txt"],
["http://new.host/more_data.doc","more_data.doc"],
...
);
...
# Max 30 processes for parallel download
my $pm = Parallel::ForkManager->new(30);
LINKS:
foreach my $linkarray (@links) {
$pm->start and next LINKS; # do the fork
my ($link, $fn) = @$linkarray;
warn "Cannot get $fn from $link"
if getstore($link, $fn) != RC_OK;
$pm->finish; # do the exit in the child process
}
$pm->wait_all_children;
首先你需要实例化ForkManager 使用"new"构造器。
你必须指定创建的最大进程数。 如果你指定为0,然后不进行fork,
这对调试是有好处
下一步,使用$pm->start 来fork.
$pm 返回0对于child process, 和child pid 对于parent process
"and next" 跳过内部循环在parent process.
NOTE: $pm->start dies if the fork fails.
$pm->finish 中断child process
注意:你不能使用$pm->start 如果你已经在child process里。
如果你需要管理另外的子进程集在child process,
你必须实例化另外一个 Parallel::ForkManager对象。
METHODS :
注释的字母表明方法允许在哪里 P对于parent C对child
new $processes:
实例化一个新的 Parallel::ForkManager object.
你必须指定0(zero), 然后no children 会被forked.
这是用于调试目的。
第2个参数,$tempdir, 是只能用于如果你要children 发回一个引用到一些数据。
如果没有提供,它是设置通过调用 File::Temp::tempdir().
new方法会die如果 temporary directory 不存在或者它不是一个目录
start [ $process_identifier ]
这个方法开始fork, 它返回child process的pid 对于父进程
0对于子进程。
如果$processes 参数对于构造器是0,那么假设你是在child process,
$pm->start simply returns 0.
一个可选的$process_identifier 可以被提供用于这个方法,它是通过run_on_finish使用
finish [ $exit_code [, $data_structure_reference] ]
关闭 child 进程通过退出和接收一个可选的exit code(默认code是0)
可以通过回调在父进程中检索
如果第2个参数提供,child 尝试发送它的内容到parent.
set_max_procs $processes
允许你设置一个新的最大的children数量
wait_all_children:
你可以调用这个方法来等待所有的被fork的进程,这是一个堵塞等待:
reap_finished_children:
这是一个非堵塞请求来收割children,执行回调独立于调用"start"或者"wait_all_children".
is_parent
返回真如果在parent里 返回false在child里:
$pm->wait_all_children;
print "22222222222222222222222222
";
print $pm->is_parent;
print "
";
print "22222222222222222222222222
";
22222222222222222222222222
1
22222222222222222222222222
[oracle@node01 bb]$
max_procs:
返回奖fork的最大进程数
$pm->wait_all_children;
print "22222222222222222222222222
";
print $pm->is_parent;
print "
";
print $pm->max_procs;
print "
";
print "22222222222222222222222222
";
22222222222222222222222222
1
30
22222222222222222222222222
running_procs
返回 forked 进程pids 当前被Parallel::ForkManager监控的。
注意 children 是仍旧被汇报为运行的直到 fork manager收割它们,
通过next call 来start或者wait_all_children。
[oracle@node01 bb]$ ps -ef | grep t1.pl
oracle 11066 1970 3 06:42 pts/0 00:00:00 perl t1.pl
oracle 11068 11066 0 06:42 pts/0 00:00:00 perl t1.pl
oracle 11069 11066 0 06:42 pts/0 00:00:00 perl t1.pl
oracle 11070 11066 0 06:42 pts/0 00:00:00 perl t1.pl
oracle 11071 11066 0 06:42 pts/0 00:00:00 perl t1.pl
oracle 11072 11066 0 06:42 pts/0 00:00:00 perl t1.pl
oracle 11074 2278 0 06:42 pts/1 00:00:00 grep t1.pl
11067
11068
11071
11070
11069$a====8
wait_for_available_procs( $n ):
等待直到$n 可用进程sots 是可用的,如果$n 没有给定 默认是1
CALLBACKS 回调:
你可以定义回调在代码里, 被调用在事件像 开始一个进程或者当finish的时候。
调用可以定义下面的方法:
run_on_finish $code [, $pid ]
你可以定义一个子例程, 当一个child 是被终止时被调用 它是在父进程中调用:
$code 的参数如下:
- pid of the process, which is terminated
- exit code of the program
- identification of the process (if provided in the "start" method)
- exit signal (0-127: signal name)
- core dump (1 if there was core dump at exit)
- datastructure reference or undef (see RETRIEVING DATASTRUCTURES)
run_on_start $code
你可以定义一个子例程当一个child 启动时调用,它调用在一个child 的成功启动在parent 进程
$code的参数如下:
这个$code的参数如下:
- pid of the process which has been started
- identification of the process (if provided in the "start" method)
你可以定义一个子例程,当child process 需要等待startup时被调用。
如果 $period没有被定义,然后一个调用时完成对于每个child.
如果 $period是被定义,然后$code 是被周期性的调用。
module 等待$period 数秒在两个请求之间,
BLOCKING CALLS
当它等待child processes终止,Parallel::ForkManager 是在一个fork和一个hard place(如果你excuse the terrible pun)
底层Perl waitpid 函数 模块依赖可以堵塞直到任何一个指定的或者任何child process 终止
这意味着module 可以做两件事情中的一个 当它等待其中一个child processes终止:
只等待它自己的child processes:
并行get:
这个小的例子可以用于get URLs并行的:
[oracle@node01 cc]$ time perl a1.pl
$c====Parallel::ForkManager
$linkarray=== http://blog.csdn.net/zhaoyangjian724/article/category/1756685/1 5.txt
$linkarray=== http://blog.csdn.net/zhaoyangjian724/article/category/1756685/5 9.txt
$linkarray=== http://blog.csdn.net/zhaoyangjian724/article/category/1756685/4 8.txt
$linkarray=== http://blog.csdn.net/zhaoyangjian724/article/category/1756685/3 7.txt
$linkarray=== http://blog.csdn.net/zhaoyangjian724/article/category/2757485/3 3.txt
$linkarray=== http://blog.csdn.net/zhaoyangjian724/article/category/2757485/1 1.txt
$linkarray=== http://blog.csdn.net/zhaoyangjian724/article/category/1756685/7 11.txt
$linkarray=== http://blog.csdn.net/zhaoyangjian724/article/category/1756685/8 12.txt
$linkarray=== http://blog.csdn.net/zhaoyangjian724/article/category/1756685/9 13.txt
$linkarray=== http://blog.csdn.net/zhaoyangjian724/article/category/1756685/6 10.txt
$linkarray=== http://blog.csdn.net/zhaoyangjian724/article/category/1756685/2 6.txt
$linkarray=== http://blog.csdn.net/zhaoyangjian724/article/category/2757485/2 2.txt
real 0m8.121s
user 0m0.939s
sys 0m0.452s
use LWP::Simple;
use Parallel::ForkManager;
my @links=(
["http://blog.csdn.net/zhaoyangjian724/article/category/2757485/1","1.txt"],
["http://blog.csdn.net/zhaoyangjian724/article/category/2757485/2","2.txt"],
["http://blog.csdn.net/zhaoyangjian724/article/category/2757485/3","3.txt"],
["http://blog.csdn.net/zhaoyangjian724/article/category/1756685/1","5.txt"],
["http://blog.csdn.net/zhaoyangjian724/article/category/1756685/2","6.txt"],
["http://blog.csdn.net/zhaoyangjian724/article/category/1756685/3","7.txt"],
["http://blog.csdn.net/zhaoyangjian724/article/category/1756685/4","8.txt"],
["http://blog.csdn.net/zhaoyangjian724/article/category/1756685/5","9.txt"],
["http://blog.csdn.net/zhaoyangjian724/article/category/1756685/6","10.txt"],
["http://blog.csdn.net/zhaoyangjian724/article/category/1756685/7","11.txt"],
["http://blog.csdn.net/zhaoyangjian724/article/category/1756685/8","12.txt"],
["http://blog.csdn.net/zhaoyangjian724/article/category/1756685/9","13.txt"]
);
sub getsrore{
my $link=shift;
my $fn=shift;
open fh1,">$fn" || die "open csdn file failed:$!";
my $ua = LWP::UserAgent->new;
$ua->timeout(10);
$ua->env_proxy;
$ua->agent("Mozilla/8.0");
my $response = $ua->get("$link");
print fh1 $response->content;
close fh1;
};
# Max 30 processes for parallel download
my $pm = Parallel::ForkManager->new(30);
LINKS:
foreach my $linkarray (@links) {
$pm->start and next LINKS; # do the fork
print "$linkarray=== @$linkarray
";
my ($link, $fn) = @$linkarray;
&getstore($link, $fn);
$pm->finish; # do the exit in the child process
}
$pm->wait_all_children;
Callbacks:
使用回调程序的例子得到child exit codes:
use strict;
use Parallel::ForkManager;
my $max_procs = 5;
my @names = qw( Fred Jim Lily Steve Jessica Bob Dave Christine Rico Sara );
# hash to resolve PID's back to child specific information
my $pm = Parallel::ForkManager->new($max_procs);
# Setup a callback for when a child finishes up so we can
# get it's exit code
$pm->run_on_finish( sub {
my ($pid, $exit_code, $ident) = @_;
print "** $ident just got out of the pool ".
"with PID $pid and exit code: $exit_code
";
});
$pm->run_on_start( sub {
my ($pid, $ident)=@_;
print "** $ident started, pid: $pid
";
});
$pm->run_on_wait( sub {
print "** Have to wait for one children ...
"
},
0.5
);
NAMES:
foreach my $child ( 0 .. $#names ) {
my $pid = $pm->start($names[$child]) and next NAMES;
# This code is the child process
print "This is $names[$child], Child number $child
";
sleep ( 2 * $child );
print "$names[$child], Child $child is about to get out...
";
sleep 1;
$pm->finish($child); # pass an exit code to finish
}
print "Waiting for Children...
";
$pm->wait_all_children;
print "Everybody is out of the pool!
";