Pipes



Subsecciones
Casiano Rodríguez León
2010-03-22
="tex2html2960" HREF="node96.html#SECTION035136030000000000000">La Distribución de GRID::Machine
  • La Distribución de Farm::Simple
    Casiano Rodríguez León
    2010-04-19
    pFile("$tmpdir/Error".$p->sorder().".yml", 86 \$w, \@t, $p->begin(), $end, \@errmsg) 87 if $checkpointing; 88 die "No machines left\n" if (keys %workers == keys %error); 89 } 90 91 delete $process{$child}; 92 93 }; 94 95 my $reaper = sub { 96 my $child; 97 98 $process_child->($child) while (($child = waitpid(-1, WNOHANG)) > 0); 99 }; # end reaper 100 101 # Initialize 102 $accum = $job->initialize->($job, $cluster, $self); 103 104 my $sorder = 0; # Current task position 105 { 106 local $SIG{CHLD} = $reaper; 107 while (@tasks) { 108 while (@idles and @tasks) { 109 my $t = shift @tasks; 110 my $w = shift @idles; 111 my $handle = IO::Handle->new(); 112 113 my $c = shift @$t; 114 my $rcmd = $rsh{$w}? "$rsh{$w} $w $c @$t" : "$c @$t"; 115 warn "$rcmd\n" if $debug; 116 117 my $p = Farm::Process->new( 118 fromchild => $handle, task => $t, worker => $w, sorder => $sorder, 119 ); 120 121 $job->on_start($p); 122 my $pid = open($handle, "$rcmd |") || die "Error: can't fork child to $rcmd\n"; 123 124 $p->pid($pid); 125 $process{$pid} = $p; 126 127 $sorder++; 128 } # end while @idles and @tasks 129 130 my $r; 131 while ($r = shift @results) { 132 $combine->($accum, $r); 133 DumpFile "$tmpdir/Result".$r->{sorder}.".yml", $r, \$accum if $checkpointing; 134 } 135 } # end while (@tasks) 136 } # end scope of reaper 137 138 warn "Last tasks\n" if $debug; 139 while (($_ = wait) > 0) { 140 $process_child->($_); 141 my $r = shift @results; 142 $combine->($accum, $r); 143 DumpFile "$tmpdir/Result".$r->{sorder}.".yml", $r, \$accum if $checkpointing; 144 } 145 146 # Finalize 147 $job->finalize->($job, $cluster, $self); 148 149 return $accum; 150 } 151 152 153 1;

    El Paquete Farm::Job

    pp2@nereida:~/LFARM/script$ sed -ne '155,194p' Simple.pm | cat -n
     1  package Farm::Job;
     2  use base qw(Class::Accessor);
     3  Farm::Job->mk_accessors(qw(tasks combine on_start looks_ok initialize finalize));
     4
     5
     6  my $default_looks_ok = sub { # something written to STDOUT
     7    my $res = shift;
     8    return (ref($res) eq 'ARRAY') && (@{$res} > 0)
     9  };
    10
    11  sub new {
    12    my $class = shift || die "Error building Farm::Job\n";
    13    my %args = @_;
    14
    15    die "farm Error! Supply tasks argument\n" unless defined($args{tasks})
    16                                  and UNIVERSAL::isa($args{tasks}, 'ARRAY')
    17                                  and @{$args{tasks}};
    18
    19    $args{combine} = sub { $_[0] .= "$[1]\n"; } unless defined($args{combine});
    20
    21    $args{on_start} = sub { } unless defined($args{on_start});
    22
    23    $args{looks_ok} =  $default_looks_ok unless defined($args{looks_ok});
    24
    25    # Initialize
    26    $args{initialize} = sub { } unless defined($args{initialize});
    27
    28      die "Error creating job: 'initialize' is not a CODE ref\n"
    29    unless  UNIVERSAL::isa($args{initialize}, 'CODE');
    30
    31    # Finalize
    32    $args{finalize} = sub { } unless defined($args{finalize});
    33
    34      die "Error creating job: 'finalize' is not a CODE ref\n"
    35    unless  UNIVERSAL::isa($args{finalize}, 'CODE');
    36
    37    bless \%args, $class;
    38  }
    39
    40  1;
    

    El Paquete Farm::Process

    pp2@nereida:~/LFARM/script$ sed -ne '196,216p' Simple.pm | cat -n
     1  package Farm::Process;
     2  use base qw(Class::Accessor);
     3
     4  Farm::Process->mk_accessors(qw(task worker fromchild pid sorder begin));
     5
     6  # Mapping of a task onto a machine
     7  # $process{$pid} = Farm::Process->new(
     8  #    fromchild => $handle,
     9  #    task => $t,
    10  #    pid => $pid,
    11  #    worker => $w,
    12  #    sorder => $sorder);
    13  #
    14
    15  sub new {
    16    my $class = shift || die "Error building Farm::Process\n";
    17    my %args = @_;
    18
    19    $args{begin} = time();
    20    bless \%args, $class;
    21  }
    

    El Paquete Farm::Cluster

    pp2@nereida:~/LFARM/script$ sed -ne '218,295p' Simple.pm | cat -n
     1  package Farm::Cluster;
     2
     3  # Defines the cluster
     4  # Farm::Cluster->new(
     5  #       beowulf => { rsh => '/usr/bin/ssh', ... }
     6  #       orion   => { rsh => $rsh, rcp => $rcp, user=> 'fulano', ... }
     7  #       ...
     8  #       );
     9  #
    10  sub new {
    11    my $class = shift || die "Error building Farm::Cluster\n";
    12    my %args = @_;
    13    my %cluster = ();
    14
    15    for (keys %args) {
    16      die "Illegal machine name $_\n" unless defined($_) and m{\w+};
    17      my $m = Farm::Machine->new(name => $_, %{$args{$_}});
    18      $cluster{$_} = $m;
    19    }
    20    # user and dir
    21    #
    22
    23    bless \%cluster, $class;
    24  }
    25
    26  sub processornames {
    27    my $self = shift;
    28
    29    return keys(%$self);
    30  }
    31
    32  sub processor {
    33    my $self = shift; # cluster
    34    my $name = shift; # processor name
    35    my $val = shift;
    36
    37    $self->{$name} = $val if defined($val) and UNIVERSAL::isa($val, 'Farm::Machine');
    38    return $self->{$name};
    39  }
    40
    41  sub processors {
    42    my $self = shift;
    43
    44    return values(%$self);
    45  }
    46
    47  # Returns a hash (beowulf=>'/usr/bin/ssh', nereida => '', ... )
    48  sub rsh {
    49    my $self = shift;
    50
    51    return map { $_ => $self->{$_}{rsh} } keys(%$self);
    52  }
    53
    54  sub cp {
    55    my $cluster = shift;
    56    my $src     = shift;
    57    my $dest    = shift || '';
    58
    59    die "Can't copy file $src\n" unless -r $src;
    60    foreach my $machine ($cluster->processors()) {
    61      if (system($machine->rcp, $src, "$machine->{name}:$dest")) {
    62        warn "Couldn't copy $src to $machine->{name}:$dest: $?\n"
    63      }
    64    }
    65  }
    66
    67  sub rm {
    68    my $cluster = shift;
    69    my $src     = shift || die "Cluster 'rm' error: provide a filename\n";
    70
    71    foreach my $machine ($cluster->processors()) {
    72      if (system($machine->rsh, "$machine->{name}", "rm $src")) {
    73        warn "couldn't rm $src in $machine->{name}: $?\n"
    74      }
    75    }
    76  }
    77
    78  1;
    

    El Paquete Farm::Machine

    pp2@nereida:~/LFARM/script$ sed -ne '297,370p' Simple.pm | cat -n
     1  package Farm::Machine;
     2  use IPC::Run3;
     3  use base qw(Class::Accessor);
     4  Farm::Machine->mk_accessors(qw(name rsh rcp stdout stderr));
     5
     6  use constant NULDEV => \undef;
     7
     8  # Defines machine
     9  # only rsh field now
    10
    11  sub new {
    12    my $class = shift || die "Error building Farm::Machine\n";
    13    my %arg = @_;
    14
    15    die "Provide a name for the machine\n" unless defined($arg{name});
    16
    17    unless (defined($arg{rsh})) {
    18      my $rsh = `which ssh`;
    19      die "Error: define ssh\n" unless defined($rsh);
    20      chomp($rsh),
    21      $arg{rsh} = $rsh;
    22    }
    23
    24    unless (defined($arg{rcp})) {
    25      my $rcp = `which scp`;
    26      die "Error: define scp\n" unless defined($rcp);
    27      chomp($rcp),
    28      $arg{rcp} = $rcp;
    29    }
    30
    31    # Add user field
    32
    33    # Home directory for this machine
    34    $arg{home} = $arg{name} unless exists($arg{home});
    35
    36    # Local directories for this machine
    37    open $arg{stdout}, "> $arg{name}.output";
    38    open $arg{stderr}, "> .$arg{name}.err";
    39
    40    # Add environment variables
    41    # stdin stdout stderr
    42
    43    bless \%arg, $class;
    44  }
    45
    46  sub DESTROY {
    47    my $self = shift;
    48
    49    close($self->stdout);
    50    close($self->stderr);
    51  }
    52
    53  # True if machine accepts ssh connections
    54  {
    55    my $self;
    56    local $SIG{ALRM} = sub { $self->{operative} = 0 };
    57
    58    sub isoperative {
    59      $self = shift;
    60      my $seconds = shift || 1;
    61
    62      my $machine = ['ssh', $self->name,' ps'];
    63      $self->{operative} = 1;
    64      alarm($seconds);
    65      eval {
    66        run3($machine, undef, NULDEV, $self->stderr);
    67      };
    68      $self->{operative} = 0 if $@;
    69      alarm(0);
    70      return $self->{operative};
    71    }
    72  }
    73
    74  1;
    

    El Ejecutable operative.pl

    pp2@nereida:~/LFARM/script$ cat -n operative.pl
     1  #!/usr/bin/perl -w
     2  # Author: Casiano
     3  use strict;
     4  use Farm::Simple;
     5
     6  #### main
     7
     8  # Warning: the format of the printf in pi0.c influences
     9  # the precision
    10  #
    11  my $m = shift || "beowulf";
    12
    13  my $default = {
    14    rsh => '/usr/bin/ssh',
    15    rcp => '/usr/bin/scp',
    16    user => 'casiano',
    17  };
    18
    19  my @processors = qw(orion nereida beowulf);
    20  my $cluster = Farm::Cluster->new( map { $_ => $default } @processors );
    21
    22  my $rm = $cluster->processor($m);
    23  print $rm->isoperative."\n";
    



    Subsecciones
    Casiano Rodríguez León
    2011-02-07
    l8" HREF="http://www.google.es/">googleetsiiullpcgullLHPLHP moodleperlcriticpbpblogsgoogle code project hosting
    Sig: Práctica: Granja con Pipes Sup: Granjas Simples Ant: Ejercicio: Hashes de Manejadores
    Casiano Rodríguez León
    2011-02-21
    dríguez León
    2011-02-21 HTML>