Práctica: Granja con Pipes

Adapte el ejemplo de la sección 3.13.1 para realizar una granja que -utilizando el comando sort- ordene numéricamente un conjunto de ficheros numéricos (para ello use la opción -g de sort) produciendo como salida un único fichero ordenado. El granjero mezclará los vectores ordenados (opción -m de sort) devueltos por los trabajadores. El código muestra una posible versión de la función de combinación:
# Merges the sorted output of one worker into the running sorted result.
#
# Arguments (positional, read straight from @_):
#   $_[0]  accumulator. It is overwritten IN PLACE with the merged data
#          (it aliases the caller's variable).
#   $_[1]  text returned by a worker: numeric lines that are already sorted.
#
# The merged data is also saved in /tmp/sorted.dat, which is the second
# input of the next merge. Dies if that file cannot be written.
#
# NOTE: $_[1] is interpolated between single quotes in a shell command, so
# it must not contain single quotes (purely numeric data is fine).
sub combineFunction {
  my $sorted_file = '/tmp/sorted.dat';

  # On the first call there is no previous result yet: naming a missing
  # file would make sort fail and the worker's data would be lost.
  my $previous = -e $sorted_file ? $sorted_file : '';

  $_[0] = `echo -n '$_[1]' | sort -m -g - $previous`;

  open(my $sorted, '>', $sorted_file)
    or die "Can't write $sorted_file: $!\n";
  print {$sorted} $_[0];
  close($sorted)
    or die "Can't close $sorted_file: $!\n";
}
En una primera versión puede asumir - si lo desea - un sistema de archivos compartido. Realice una segunda versión en la que no asuma la existencia de NFS: el fichero o vector a ordenar deberá ser transferido al disco del trabajador. Para ello amplíe farm con la capacidad de transferir los ficheros de un directorio local a directorios remotos. Haga perldoc -f rand para obtener información sobre la función de generación aleatoria de números. El siguiente programa puede ser usado para generar los datos:
#!/usr/bin/perl -w
# Generates files of random integers, one number per line, to be used as
# input data for the sorting farm.
#
# Usage:
#   create_data.pl num_files prefix max_val len1 len2 ... lenN
# Example:
#  create_data.pl 5 data/data 500 5 6 7 8 9
#
# File number $i (counting from 0) is written to "$prefix$i.dat" and holds
# len(i+1) integers in the range [0, max_val). Exactly num_files lengths
# must be supplied.
use strict;

my $numFich = shift || 3;
my $prefix  = shift || 'data';
my $max_val = shift || 10000;

die "Uso: $0 num_fich prefijo Max len1 len2 ... lenN\n" unless (@ARGV == $numFich);

for my $i (0 .. $numFich - 1) {
        # The remaining command-line arguments are the file lengths.
        my $length = shift;
        my @nums = map { int (rand($max_val)) } 1..$length;

        my $file = "$prefix$i.dat";
        open(my $fh, '>', $file) or die "Can't create $file: $!\n";
        # Numbers are separated (not terminated) by newlines.
        print {$fh} join("\n", @nums);
        close($fh) or die "Can't close $file: $!\n";
}

Casiano Rodríguez León
2010-03-22
package Farm::Simple;
use 5.008008;
use strict;
use warnings;

use POSIX;
use IO::Handle;
use IO::File;
use YAML qw(DumpFile LoadFile);
use Time::HiRes qw(time);

use base qw(Class::Accessor);
Farm::Simple->mk_accessors(qw(checkpointing debug tracedir));

# Builds a farm driver.
#
# Named arguments (all optional):
#   checkpointing  when true, results and errors are dumped as YAML files
#                  into 'tracedir' (default: 1)
#   debug          when true, progress messages are sent to STDERR (default: 0)
#   tracedir       directory for the YAML files (default: 'FARMLOG')
sub new {
  my $class = shift;
  my %args = @_;

  $args{checkpointing} = 1 unless defined($args{checkpointing});
  $args{debug} = 0 unless defined($args{debug});
  $args{tracedir} = 'FARMLOG' unless defined($args{tracedir});

  bless \%args, $class;
}

# Runs the tasks of $job on the machines of $cluster and returns the
# accumulator built by the job's 'combine' callback.
#
# Arguments:
#   $cluster  object providing 'processornames' and 'rsh' (see Farm::Cluster)
#   $job      object providing 'tasks', 'combine', 'looks_ok', 'on_start',
#             'initialize' and 'finalize' (see Farm::Job)
#
# Each task is an array reference [ command, arg, ... ]. The command is
# shifted out of that array, so the caller's task arrays are modified.
# Every task is started through a pipe on an idle worker; when the child
# ends, its output is read and, if 'looks_ok' accepts it, a result record is
# queued and later handed to 'combine' as $combine->($accum, $record).
# A worker whose task is rejected is not used again, and the task is not
# retried. Dies with "No machines left" when every worker has failed.
sub run {
  my $self = shift;
  my $cluster = shift;
  my $job = shift;

  my @idles = $cluster->processornames;
  my %rsh = $cluster->rsh;
  my @tasks = @{$job->tasks};
  my $combine = $job->combine;
  my $looks_ok = $job->looks_ok;
  my $checkpointing = $self->checkpointing;
  my $debug = $self->debug;

  my %process; # Key = PID Value = {task=>$t, pid=>$p, worker=>$w, sorder=>$s, begin=>$t}
  my $accum;   # Accumulator
  my @results; # Results.

  my %workers; # Set of workers
  @workers{@idles} = ();
  my %error; # $error{$w} = [ tasks that failed when executed on $w ]

  my $rorder = 0; # reception order
  my $tmpdir = $self->tracedir;
  mkdir $tmpdir if $checkpointing;

  # Handles a finished child: reads its output and either queues a result
  # record or registers the failure of its worker.
  my $process_child = sub {
    my $child = shift;
    my $p = $process{$child};

    # Save the status variables before any other call can change them
    my @errmsg = ($?, 0+$!, "$!");
    my @t = @{$p->task()};
    my $w = $p->worker;

    warn "reaping $child (@t) from worker $w. Errvars: @errmsg\n" if $debug;

    my $handle = $p->fromchild; # Retrieve the channel connected to that child
    my @result = <$handle>;
    my $end = time();
    if ($looks_ok->(\@result, $p, @errmsg)) {
      push @idles, $w; # Now $w is idle again
      my $r = {
        task   => \@t,
        result => \@result,
        worker => $w,
        begin  => $p->begin(),
        end    => $end,
        PID    => $child,
        sorder => $p->sorder(),
        rorder => $rorder++,
      };
      push @results, $r;

      my @segtoshow = @result>2? @result[0,1]:@result;
      warn "From $w ($child) received result:\n@segtoshow".
           "for task (@t)\n" if $debug;
    }
    else {
      warn "Error processing task @t on $w. Errmsg: @errmsg\n";
      push @{$error{$w}}, \@t;
      DumpFile("$tmpdir/Error".$p->sorder().".yml",
               \$w, \@t, $p->begin(), $end, \@errmsg)
        if $checkpointing;
      die "No machines left\n" if (keys %workers == keys %error);
    }

    delete $process{$child};

  };

  # SIGCHLD handler: reaps every child that has already finished
  my $reaper = sub {
    my $child;

    $process_child->($child) while (($child = waitpid(-1, WNOHANG)) > 0);
  }; # end reaper

  # Combines (and checkpoints) every result queued so far
  my $combine_pending = sub {
    while (my $r = shift @results) {
      $combine->($accum, $r);
      DumpFile("$tmpdir/Result".$r->{sorder}.".yml", $r, \$accum) if $checkpointing;
    }
  };

  # Initialize
  $accum = $job->initialize->($job, $cluster, $self);

  my $sorder = 0; # Current task position
  {
    local $SIG{CHLD} = $reaper;
    # NOTE(review): this loop spins while tasks remain and no worker is idle.
    while (@tasks) {
      while (@idles and @tasks) {
        my $t = shift @tasks;
        my $w = shift @idles;
        my $handle = IO::Handle->new();

        my $c = shift @$t;
        my $rcmd = $rsh{$w}? "$rsh{$w} $w $c @$t" : "$c @$t";
        warn "$rcmd\n" if $debug;

        my $p = Farm::Process->new(
          fromchild => $handle, task => $t, worker => $w, sorder => $sorder,
        );

        # 'on_start' is an accessor: fetch the callback and then call it.
        # Passing $p to the accessor itself would replace the callback.
        $job->on_start->($p);
        my $pid = open($handle, "$rcmd |") || die "Error: can't fork child to $rcmd\n";

        # NOTE(review): if the child ends before it is registered below,
        # the reaper will not find it in %process -- confirm whether
        # SIGCHLD should be blocked around these statements.
        $p->pid($pid);
        $process{$pid} = $p;

        $sorder++;
      } # end while @idles and @tasks

      $combine_pending->();
    } # end while (@tasks)
  } # end scope of reaper

  warn "Last tasks\n" if $debug;
  # A reaped child may have produced no result (failed task), and results
  # queued by the reaper just before its scope ended may still be pending:
  # combine whatever is queued instead of exactly one result per child.
  while ((my $child = wait) > 0) {
    $process_child->($child);
    $combine_pending->();
  }
  $combine_pending->();

  # Finalize
  $job->finalize->($job, $cluster, $self);

  return $accum;
}


1;

El Paquete Farm::Job

pp2@nereida:~/LFARM/script$ sed -ne '155,194p' Simple.pm | cat -n
 1  package Farm::Job;
 2  use base qw(Class::Accessor);
 3  Farm::Job->mk_accessors(qw(tasks combine on_start looks_ok initialize finalize));
 4
 5
 6  my $default_looks_ok = sub { # something written to STDOUT
 7    my $res = shift;
 8    return (ref($res) eq 'ARRAY') && (@{$res} > 0)
 9  };
10
11  sub new {
12    my $class = shift || die "Error building Farm::Job\n";
13    my %args = @_;
14
15    die "farm Error! Supply tasks argument\n" unless defined($args{tasks})
16                                  and UNIVERSAL::isa($args{tasks}, 'ARRAY')
17                                  and @{$args{tasks}};
18
19    $args{combine} = sub { $_[0] .= "$[1]\n"; } unless defined($args{combine});
20
21    $args{on_start} = sub { } unless defined($args{on_start});
22
23    $args{looks_ok} =  $default_looks_ok unless defined($args{looks_ok});
24
25    # Initialize
26    $args{initialize} = sub { } unless defined($args{initialize});
27
28      die "Error creating job: 'initialize' is not a CODE ref\n"
29    unless  UNIVERSAL::isa($args{initialize}, 'CODE');
30
31    # Finalize
32    $args{finalize} = sub { } unless defined($args{finalize});
33
34      die "Error creating job: 'finalize' is not a CODE ref\n"
35    unless  UNIVERSAL::isa($args{finalize}, 'CODE');
36
37    bless \%args, $class;
38  }
39
40  1;

El Paquete Farm::Process

pp2@nereida:~/LFARM/script$ sed -ne '196,216p' Simple.pm | cat -n
 1  package Farm::Process;
 2  use base qw(Class::Accessor);
 3
 4  Farm::Process->mk_accessors(qw(task worker fromchild pid sorder begin));
 5
 6  # Mapping of a task onto a machine
 7  # $process{$pid} = Farm::Process->new(
 8  #    fromchild => $handle,
 9  #    task => $t,
10  #    pid => $pid,
11  #    worker => $w,
12  #    sorder => $sorder);
13  #
14
15  sub new {
16    my $class = shift || die "Error building Farm::Process\n";
17    my %args = @_;
18
19    $args{begin} = time();
20    bless \%args, $class;
21  }

El Paquete Farm::Cluster

pp2@nereida:~/LFARM/script$ sed -ne '218,295p' Simple.pm | cat -n
 1  package Farm::Cluster;
 2
 3  # Defines the cluster
 4  # Farm::Cluster->new(
 5  #       beowulf => { rsh => '/usr/bin/ssh', ... }
 6  #       orion   => { rsh => $rsh, rcp => $rcp, user=> 'fulano', ... }
 7  #       ...
 8  #       );
 9  #
10  sub new {
11    my $class = shift || die "Error building Farm::Cluster\n";
12    my %args = @_;
13    my %cluster = ();
14
15    for (keys %args) {
16      die "Illegal machine name $_\n" unless defined($_) and m{\w+};
17      my $m = Farm::Machine->new(name => $_, %{$args{$_}});
18      $cluster{$_} = $m;
19    }
20    # user and dir
21    #
22
23    bless \%cluster, $class;
24  }
25
26  sub processornames {
27    my $self = shift;
28
29    return keys(%$self);
30  }
31
32  sub processor {
33    my $self = shift; # cluster
34    my $name = shift; # processor name
35    my $val = shift;
36
37    $self->{$name} = $val if defined($val) and UNIVERSAL::isa($val, 'Farm::Machine');
38    return $self->{$name};
39  }
40
41  sub processors {
42    my $self = shift;
43
44    return values(%$self);
45  }
46
47  # Returns a hash (beowulf=>'/usr/bin/ssh', nereida => '', ... )
48  sub rsh {
49    my $self = shift;
50
51    return map { $_ => $self->{$_}{rsh} } keys(%$self);
52  }
53
54  sub cp {
55    my $cluster = shift;
56    my $src     = shift;
57    my $dest    = shift || '';
58
59    die "Can't copy file $src\n" unless -r $src;
60    foreach my $machine ($cluster->processors()) {
61      if (system($machine->rcp, $src, "$machine->{name}:$dest")) {
62        warn "Couldn't copy $src to $machine->{name}:$dest: $?\n"
63      }
64    }
65  }
66
67  sub rm {
68    my $cluster = shift;
69    my $src     = shift || die "Cluster 'rm' error: provide a filename\n";
70
71    foreach my $machine ($cluster->processors()) {
72      if (system($machine->rsh, "$machine->{name}", "rm $src")) {
73        warn "couldn't rm $src in $machine->{name}: $?\n"
74      }
75    }
76  }
77
78  1;

El Paquete Farm::Machine

pp2@nereida:~/LFARM/script$ sed -ne '297,370p' Simple.pm | cat -n
 1  package Farm::Machine;
 2  use IPC::Run3;
 3  use base qw(Class::Accessor);
 4  Farm::Machine->mk_accessors(qw(name rsh rcp stdout stderr));
 5
 6  use constant NULDEV => \undef;
 7
 8  # Defines machine
 9  # only rsh field now
10
11  sub new {
12    my $class = shift || die "Error building Farm::Machine\n";
13    my %arg = @_;
14
15    die "Provide a name for the machine\n" unless defined($arg{name});
16
17    unless (defined($arg{rsh})) {
18      my $rsh = `which ssh`;
19      die "Error: define ssh\n" unless defined($rsh);
20      chomp($rsh),
21      $arg{rsh} = $rsh;
22    }
23
24    unless (defined($arg{rcp})) {
25      my $rcp = `which scp`;
26      die "Error: define scp\n" unless defined($rcp);
27      chomp($rcp),
28      $arg{rcp} = $rcp;
29    }
30
31    # Add user field
32
33    # Home directory for this machine
34    $arg{home} = $arg{name} unless exists($arg{home});
35
36    # Local directories for this machine
37    open $arg{stdout}, "> $arg{name}.output";
38    open $arg{stderr}, "> .$arg{name}.err";
39
40    # Add environment variables
41    # stdin stdout stderr
42
43    bless \%arg, $class;
44  }
45
46  sub DESTROY {
47    my $self = shift;
48
49    close($self->stdout);
50    close($self->stderr);
51  }
52
53  # True if machine accepts ssh connections
54  {
55    my $self;
56    local $SIG{ALRM} = sub { $self->{operative} = 0 };
57
58    sub isoperative {
59      $self = shift;
60      my $seconds = shift || 1;
61
62      my $machine = ['ssh', $self->name,' ps'];
63      $self->{operative} = 1;
64      alarm($seconds);
65      eval {
66        run3($machine, undef, NULDEV, $self->stderr);
67      };
68      $self->{operative} = 0 if $@;
69      alarm(0);
70      return $self->{operative};
71    }
72  }
73
74  1;

El Ejecutable operative.pl

pp2@nereida:~/LFARM/script$ cat -n operative.pl
 1  #!/usr/bin/perl -w
 2  # Author: Casiano
 3  use strict;
 4  use Farm::Simple;
 5
 6  #### main
 7
 8  # Warning: the format of the printf in pi0.c influences
 9  # the precision
10  #
11  my $m = shift || "beowulf";
12
13  my $default = {
14    rsh => '/usr/bin/ssh',
15    rcp => '/usr/bin/scp',
16    user => 'casiano',
17  };
18
19  my @processors = qw(orion nereida beowulf);
20  my $cluster = Farm::Cluster->new( map { $_ => $default } @processors );
21
22  my $rm = $cluster->processor($m);
23  print $rm->isoperative."\n";



Subsecciones
Casiano Rodríguez León
2010-04-19