Granjas Simples



Subsecciones
Casiano Rodríguez León
2010-03-22
\right)}$">   $\displaystyle \mbox{ con $k = 0 \ldots NT-1$}$ (3.2)

Para realizar la tarea usamos un programa escrito en C que calcula la suma parcial explicitada en la ecuación anterior:
lhp@nereida:~/Lperl/src/perl_networking/ch2$ cat -n pi0.c
 1  #include <stdio.h>
 2  #include <stdlib.h>
 3
 4  main(int argc, char **argv) {
 5    int id, N, np, i;
 6    double sum, left;
 7
 8    if (argc != 4) {
 9      printf("Uso:\n%s id N np\n",argv[0]);
10      exit(1);
11    }
12    id = atoi(argv[1]);
13    N = atoi(argv[2]);
14    np = atoi(argv[3]);
15    for(i=id, sum = 0; i<N; i+=np) {
16      double x = (i + 0.5)/N;
17      sum += 4 / (1 + x*x);
18    }
19    sum /= N;
20    printf("%lf\n", sum);
21  }
El programa espera tres argumentos:
lhp@nereida:~/Lperl/src/perl_networking/ch2$ pi0
Uso:
pi0 id N np
lhp@nereida:~/Lperl/src/perl_networking/ch2$ pi0 2 1000 4
0.785148
Estos argumentos son: el identificador lógico del procesador, el número de subintervalos en que se particiona el intervalo [0,1] y el número de procesos.

Inicialización

El ejecutable -originalmente en la máquina capataz o farmer - se copia en cada máquina obrero o worker (se asume compatibilidad del ejecutable). Esta tarea (línea 13, subrutina initialize) es uno de los parámetros/manejadores que recibe la subrutina farm. Será disparada por farm al comienzo de la computación:

13  sub initialize {
14    my ($tasks, $procs) = @_;
15
16    # Assume all machines have the same architecture
17    my %Machines;
18    @Machines{@$procs} = ();
19    foreach my $machine (keys %Machines) {
20      die "couldn't copy $executable to $machine:$rdir: $?\n"
21         if system($rcp, $executable, "$machine:$rdir");
22    }
23    return 0; # initial value for the accumulator
24  }

La función initialize retorna el valor inicial para el ''acumulador'' del granjero (en este caso 0, línea 23). Cada vez que termina una tarea se ejecuta un manejador de combinación. El acumulador es una variable que será pasada a dicho manejador de combinación junto con el resultado de la ejecución de la tarea que acaba de finalizar. El resultado retornado por el manejador se guarda en el ''acumulador''.

El Granjero

Después de iniciar el acumulador en las líneas 53-54 el granjero (subrutina farm en la línea 37) divide su tiempo dentro del bucle (líneas 56-86) en enviar tareas a los procesadores ociosos (líneas 57-72) y recolectar resultados desde los que han acabado una tarea (líneas 74-85).

La salida del bucle principal indica que todas las tareas han sido enviadas. No significa sin embargo que todos los resultados hayan sido recibidos, pues puede suceder que los últimos procesos envíados no hayan terminado.

En las líneas 89-95 se espera por la resolución de las últimas tareas.

37  sub farm {
38    my %args = @_;
39    my @tasks = @{$args{tasks} || die "farm Error! Supply tasks argument\n"};
40    my @idles = @{$args{processors} || die "farm Error! Supply processors argument\n"};
41    my $rsh = ($args{rsh} || "/usr/bin/ssh" || "/usr/bin/rsh"); chomp($rsh);
42    my $command = ($args{command} || die "farm Error! Supply a command argument\n");
43    my $combine = ($args{combine} || sub { $_[0] .= "$[1]\n"; });
44    my $debug = ($args{debug} || 0);
45
46    my %FROMCHILD; # Key = PID Value = IO handler for that process
47    my %Task;      # Key = PID Value = [ lista de parámetros de la tarea ]
48    my %Worker;    # Key = PID Value = machine name or address
49
50    my $handle;
51
52    # Initialize
53    my $accum = defined($args{initialize})?
54          $args{initialize}->(\@tasks, \@idles, $rsh,  $command, $debug):undef;
55
56    while (@tasks) {
57      if (@idles) {
58        my $t = shift @tasks;
59        my $w = shift @idles;
60        $handle = IO::Handle->new();
61
62        my $rcmd = "$rsh $w '".
63                  $command->($t, $w, $debug).
64                  "'";
65        warn "$rcmd\n" if $debug;
66
67        my $pid = open($handle, "$rcmd |");
68
69        $FROMCHILD{$pid} = $handle;
70        $Task{$pid} = $t;
71        $Worker{$pid} = $w;
72      }
73
74      my $child = waitpid(-1, WNOHANG);
75      if ($child > 0) { # Hijo cosechado
76        my @t = @{$Task{$child}}; delete($Task{$child});
77        my $w = $Worker{$child};  delete($Worker{$child});
78
79        $handle = $FROMCHILD{$child}; # Recuperamos el canal con ese hijo
80        my @result = <$handle>;
81        push @idles, $w; # Now $w is idle again
82        $combine->($accum, \@result, \@t, $w);
83        warn "From $w ($child) received result:\n@result[0..$#result>1?1:$#result]".
84             "for task (@t)\n" if $debug;
85      }
86    }
87
88    warn "Last tasks\n" if $debug;
89    while (($_ = wait) > 0) {
90      my @task = @{$Task{$_}};
91      $handle = $FROMCHILD{$_};
92      my @result = <$handle>;
93      $combine->($accum, \@result);
94      warn "$Worker{$_} ($_) task (@task), Combined = $accum\n" if $debug;
95    }
96    return $accum;
97  }

El manejador command_handler recibe la tarea (la lista anónima de parámetros) y retorna la cadena con el comando a ejecutar.

25  sub command_handler {
26    my $t = shift;
27    my @parameters = @$t;
28
29    return "/tmp/pi0 @parameters";
30  }
El manejador de combinación suma los dos parámetros que recibe:
32  sub combine {
33    $_[0] += ${$_[1]}[0];
34  }

Controlando el Login de Usuario

El guión asume que se ha instalado un sistema de autentificación automática usando parejas clave pública-clave privada y agentes. El guión no considera el caso en que el login de usuario cambia de máquina a máquina. Una solución a este problema es incluir un fichero de configuración ssh:

hp@nereida:~/Lperl/src/perl_networking/ch2$ cat -n ~/.ssh/config
     1  # man  ssh_config
     2  Host machine1.domain
     3  user casiano
     4
     5  Host machine2.domain
     6  user pp2



Subsecciones
Casiano Rodríguez León
2010-05-05