Parallel::ForkManage: 一个简单的并行进程用于fork管理:
2017-09-12 16:54
246 查看
Parallel::ForkManage: 一个简单的并行进程用于fork管理: use Parallel::ForkManager; my $pm = Parallel::ForkManager->new($MAX_PROCESSES); DATA_LOOP: foreach my $data (@all_data) { # Forks and returns the pid for the child: my $pid = $pm->start and next DATA_LOOP; ... do some work with $data in the child process ... $pm->finish; # Terminates the child process } 描述: 这个模块是用于使用在并行操作中进行操作, 被fork的进程数量应该被限制。 典型的使用是下载器从数百 数千个文件中检索 代码用于下载器应该是这样的: use LWP::Simple; use Parallel::ForkManager; ... my @links=( ["http://www.foo.bar/rulez.data","rulez_data.txt"], ["http://new.host/more_data.doc","more_data.doc"], ... ); ... # Max 30 processes for parallel download my $pm = Parallel::ForkManager->new(30); LINKS: foreach my $linkarray (@links) { $pm->start and next LINKS; # do the fork my ($link, $fn) = @$linkarray; warn "Cannot get $fn from $link" if getstore($link, $fn) != RC_OK; $pm->finish; # do the exit in the child process } $pm->wait_all_children; DESCRIPTION : 这个模块用于在并行操作中进程操作,fork的进程数应该被限制。 典型的使用一个下载器来检索成百/上千个文件。 use LWP::Simple; use Parallel::ForkManager; ... my @links=( ["http://www.foo.bar/rulez.data","rulez_data.txt"], ["http://new.host/more_data.doc","more_data.doc"], ... ); ... # Max 30 processes for parallel download my $pm = Parallel::ForkManager->new(30); LINKS: foreach my $linkarray (@links) { $pm->start and next LINKS; # do the fork my ($link, $fn) = @$linkarray; warn "Cannot get $fn from $link" if getstore($link, $fn) != RC_OK; $pm->finish; # do the exit in the child process } $pm->wait_all_children; 首先你需要实例化ForkManager 使用"new"构造器。 你必须指定创建的最大进程数。 如果你指定为0,然后不进行fork, 这对调试是有好处 下一步,使用$pm->start 来fork. $pm 返回0对于child process, 和child pid 对于parent process "and next" 跳过内部循环在parent process. NOTE: $pm->start dies if the fork fails. $pm->finish 中断child process 注意:你不能使用$pm->start 如果你已经在child process里。 如果你需要管理另外的子进程集在child process, 你必须实例化另外一个 Parallel::ForkManager对象。 METHODS : 注释的字母表明方法允许在哪里 P对于parent C对child new $processes: 实例化一个新的 Parallel::ForkManager object. 你必须指定0(zero), 然后no children 会被forked. 这是用于调试目的。 第2个参数,$tempdir, 是只能用于如果你要children 发回一个引用到一些数据。 如果没有提供,它是设置通过调用 File::Temp::tempdir(). new方法会die如果 temporary directory 不存在或者它不是一个目录 start [ $process_identifier ] 这个方法开始fork, 它返回child process的pid 对于父进程 0对于子进程。 如果$processes 参数对于构造器是0,那么假设你是在child process, $pm->start simply returns 0. 一个可选的$process_identifier 可以被提供用于这个方法,它是通过run_on_finish使用 finish [ $exit_code [, $data_structure_reference] ] 关闭 child 进程通过退出和接收一个可选的exit code(默认code是0) 可以通过回调在父进程中检索 如果第2个参数提供,child 尝试发送它的内容到parent. set_max_procs $processes 允许你设置一个新的最大的children数量 wait_all_children: 你可以调用这个方法来等待所有的被fork的进程,这是一个堵塞等待: reap_finished_children: 这是一个非堵塞请求来收割children,执行回调独立于调用"start"或者"wait_all_children". is_parent 返回真如果在parent里 返回false在child里: $pm->wait_all_children; print "22222222222222222222222222\n"; print $pm->is_parent; print "\n"; print "22222222222222222222222222\n"; 22222222222222222222222222 1 22222222222222222222222222 [oracle@node01 bb]$ max_procs: 返回奖fork的最大进程数 $pm->wait_all_children; print "22222222222222222222222222\n"; print $pm->is_parent; print "\n"; print $pm->max_procs; print "\n"; print "22222222222222222222222222\n"; 22222222222222222222222222 1 30 22222222222222222222222222 running_procs 返回 forked 进程pids 当前被Parallel::ForkManager监控的。 注意 children 是仍旧被汇报为运行的直到 fork manager收割它们, 通过next call 来start或者wait_all_children。 [oracle@node01 bb]$ ps -ef | grep t1.pl oracle 11066 1970 3 06:42 pts/0 00:00:00 perl t1.pl oracle 11068 11066 0 06:42 pts/0 00:00:00 perl t1.pl oracle 11069 11066 0 06:42 pts/0 00:00:00 perl t1.pl oracle 11070 11066 0 06:42 pts/0 00:00:00 perl t1.pl oracle 11071 11066 0 06:42 pts/0 00:00:00 perl t1.pl oracle 11072 11066 0 06:42 pts/0 00:00:00 perl t1.pl oracle 11074 2278 0 06:42 pts/1 00:00:00 grep t1.pl 11067 11068 11071 11070 11069$a====8 wait_for_available_procs( $n ): 等待直到$n 可用进程sots 是可用的,如果$n 没有给定 默认是1 CALLBACKS 回调: 你可以定义回调在代码里, 被调用在事件像 开始一个进程或者当finish的时候。 调用可以定义下面的方法: run_on_finish $code [, $pid ] 你可以定义一个子例程, 当一个child 是被终止时被调用 它是在父进程中调用: $code 的参数如下: - pid of the process, which is terminated - exit code of the program - identification of the process (if provided in the "start" method) - exit signal (0-127: signal name) - core dump (1 if there was core dump at exit) - datastructure reference or undef (see RETRIEVING DATASTRUCTURES) run_on_start $code 你可以定义一个子例程当一个child 启动时调用,它调用在一个child 的成功启动在parent 进程 $code的参数如下: 这个$code的参数如下: - pid of the process which has been started - identification of the process (if provided in the "start" method) 你可以定义一个子例程,当child process 需要等待startup时被调用。 如果 $period没有被定义,然后一个调用时完成对于每个child. 如果 $period是被定义,然后$code 是被周期性的调用。 module 等待$period 数秒在两个请求之间, BLOCKING CALLS 当它等待child processes终止,Parallel::ForkManager 是在一个fork和一个hard place(如果你excuse the terrible pun) 底层Perl waitpid 函数 模块依赖可以堵塞直到任何一个指定的或者任何child process 终止 这意味着module 可以做两件事情中的一个 当它等待其中一个child processes终止: 只等待它自己的child processes: 并行get: 这个小的例子可以用于get URLs并行的: [oracle@node01 cc]$ time perl a1.pl $c====Parallel::ForkManager $linkarray=== http://blog.csdn.net/zhaoyangjian724/article/category/1756685/1 5.txt $linkarray=== http://blog.csdn.net/zhaoyangjian724/article/category/1756685/5 9.txt $linkarray=== http://blog.csdn.net/zhaoyangjian724/article/category/1756685/4 8.txt $linkarray=== http://blog.csdn.net/zhaoyangjian724/article/category/1756685/3 7.txt $linkarray=== http://blog.csdn.net/zhaoyangjian724/article/category/2757485/3 3.txt $linkarray=== http://blog.csdn.net/zhaoyangjian724/article/category/2757485/1 1.txt $linkarray=== http://blog.csdn.net/zhaoyangjian724/article/category/1756685/7 11.txt $linkarray=== http://blog.csdn.net/zhaoyangjian724/article/category/1756685/8 12.txt $linkarray=== http://blog.csdn.net/zhaoyangjian724/article/category/1756685/9 13.txt $linkarray=== http://blog.csdn.net/zhaoyangjian724/article/category/1756685/6 10.txt $linkarray=== http://blog.csdn.net/zhaoyangjian724/article/category/1756685/2 6.txt $linkarray=== http://blog.csdn.net/zhaoyangjian724/article/category/2757485/2 2.txt real 0m8.121s user 0m0.939s sys 0m0.452s use LWP::Simple; use Parallel::ForkManager; my @links=( ["http://blog.csdn.net/zhaoyangjian724/article/category/2757485/1","1.txt"], ["http://blog.csdn.net/zhaoyangjian724/article/category/2757485/2","2.txt"], ["http://blog.csdn.net/zhaoyangjian724/article/category/2757485/3","3.txt"], ["http://blog.csdn.net/zhaoyangjian724/article/category/1756685/1","5.txt"], ["http://blog.csdn.net/zhaoyangjian724/article/category/1756685/2","6.txt"], ["http://blog.csdn.net/zhaoyangjian724/article/category/1756685/3","7.txt"], ["http://blog.csdn.net/zhaoyangjian724/article/category/1756685/4","8.txt"], ["http://blog.csdn.net/zhaoyangjian724/article/category/1756685/5","9.txt"], ["http://blog.csdn.net/zhaoyangjian724/article/category/1756685/6","10.txt"], ["http://blog.csdn.net/zhaoyangjian724/article/category/1756685/7","11.txt"], ["http://blog.csdn.net/zhaoyangjian724/article/category/1756685/8","12.txt"], ["http://blog.csdn.net/zhaoyangjian724/article/category/1756685/9","13.txt"] ); sub getsrore{ my $link=shift; my $fn=shift; open fh1,">$fn" || die "open csdn file failed:$!"; my $ua = LWP::UserAgent->new; $ua->timeout(10); $ua->env_proxy; $ua->agent("Mozilla/8.0"); my $response = $ua->get("$link"); print fh1 $response->content; close fh1; }; # Max 30 processes for parallel download my $pm = Parallel::ForkManager->new(30); LINKS: foreach my $linkarray (@links) { $pm->start and next LINKS; # do the fork print "\$linkarray=== @$linkarray\n"; my ($link, $fn) = @$linkarray; &getstore($link, $fn); $pm->finish; # do the exit in the child process } $pm->wait_all_children; Callbacks: 使用回调程序的例子得到child exit codes: use strict; use Parallel::ForkManager; my $max_procs = 5; my @names = qw( Fred Jim Lily Steve Jessica Bob Dave Christine Rico Sara ); # hash to resolve PID's back to child specific information my $pm = Parallel::ForkManager->new($max_procs); # Setup a callback for when a child finishes up so we can # get it's exit code $pm->run_on_finish( sub { my ($pid, $exit_code, $ident) = @_; print "** $ident just got out of the pool ". "with PID $pid and exit code: $exit_code\n"; }); $pm->run_on_start( sub { my ($pid, $ident)=@_; print "** $ident started, pid: $pid\n"; }); $pm->run_on_wait( sub { print "** Have to wait for one children ...\n" }, 0.5 ); NAMES: foreach my $child ( 0 .. $#names ) { my $pid = $pm->start($names[$child]) and next NAMES; # This code is the child process print "This is $names[$child], Child number $child\n"; sleep ( 2 * $child ); print "$names[$child], Child $child is about to get out...\n"; sleep 1; $pm->finish($child); # pass an exit code to finish } print "Waiting for Children...\n"; $pm->wait_all_children; print "Everybody is out of the pool!\n";
相关文章推荐
- 一个Linux内核利用init_task进行进程管理的简单例子
- 一个Linux内核利用init_task进行进程管理的简单例子
- fork--用于进程管理的系统调用
- uc笔记07---进程管理,PID,#ps,getxxxid,fork,vfork,system
- MapReduce是Google提出的一个软件架构,用于大规模数据集(大于1TB)的并行运算。
- 【转载】【图文并茂】进程与线程的一个简单解释
- 进程与线程的一个简单解释
- 进程与线程的一个简单解释
- 进程与线程的一个简单解释---阮一峰的网络日志
- 进程与线程的一个简单解释
- 进程与线程的一个简单解释
- 一个简单的类,来管理mysql的结果集的指针。
- kill 进程一个简单函数接口
- 进程与线程的一个简单解释
- pyqt一个简单的计划管理程序
- C++ ostream类包含的一个简单的用于控制格式的成员函数setf precision
- 一个简单的管理Web站点文件的页面程序(修改版)
- 如何写一个简单的守护(精灵)进程原型
- 一个用于图像标注的简单Web程序
- fork函数的返回值实际只有一个值,看似两个值是因为在不同的进程中返回