10
1
mirror of https://gitlab.com/scemama/QCaml.git synced 2025-01-03 10:05:40 +01:00

Farm works

This commit is contained in:
Anthony Scemama 2018-10-22 17:53:13 +02:00
parent 4bcab0647a
commit b95b80000c
3 changed files with 31 additions and 42 deletions

View File

@ -17,7 +17,7 @@ let run_sequential f stream =
let run_parallel_server stream = let run_parallel_server stream =
let fetch_result () = let fetch_result () : 'a option * int =
let result, rank, _tag = let result, rank, _tag =
Mpi.receive_status Mpi.any_source Mpi.any_tag Mpi.comm_world Mpi.receive_status Mpi.any_source Mpi.any_tag Mpi.comm_world
in in
@ -25,7 +25,7 @@ let run_parallel_server stream =
in in
let send_task client_rank = let send_task (client_rank : int) : unit =
let task = let task =
try Some (Stream.next stream) try Some (Stream.next stream)
with Stream.Failure -> None with Stream.Failure -> None
@ -34,34 +34,33 @@ let run_parallel_server stream =
in in
let rec run result_list n_todo = let n_todo = ref (Mpi.comm_size Mpi.comm_world ) in
let n_todo =
let f i =
let rec get_result () =
begin
match Stream.peek stream with match Stream.peek stream with
| None -> n_todo-1 | None -> decr n_todo
| _ -> n_todo | _ -> ()
in end;
match n_todo with match !n_todo with
| 0 -> result_list | 0 ->
begin
Mpi.barrier Mpi.comm_world;
None
end
| _ -> | _ ->
begin begin
let result, client = fetch_result () in let result, client = fetch_result () in
let new_result_list =
match result with
| None -> result_list
| Some result -> result :: result_list
in
send_task client; send_task client;
run new_result_list n_todo match result with
| None -> get_result ()
| _ -> result
end end
in in
get_result ()
let result =
let n_todo = Mpi.comm_size Mpi.comm_world in
run [] n_todo
|> Stream.of_list
in in
Mpi.barrier Mpi.comm_world; Stream.from f
result
(** Client side *) (** Client side *)

View File

@ -1,12 +0,0 @@
(** The Farm skeleton, similar to SklMl.
The input is a stream of input data, and the output is a stream of data.
*)
val run : ('a -> 'b) -> 'a Stream.t -> 'b Stream.t
(** Run the [worker_function] on every process by popping elements from the
input stream, and put the results on the output stream. The order of the
output is consistent with the order of the input. *)

View File

@ -26,6 +26,8 @@ let () =
let input = Stream.of_list let input = Stream.of_list
[ (1,2) ; (3,4) ; (5,6) ; (7,8) ; (9,10) ] [ (1,2) ; (3,4) ; (5,6) ; (7,8) ; (9,10) ]
in in
let stream =
Farm.run f input Farm.run f input
|> Stream.iter (fun (x,y) -> Printf.printf "%d %d\n" x y) in
Stream.iter (fun (x,y) -> Printf.printf "%d %d\n%!" x y) stream