10
1
mirror of https://gitlab.com/scemama/QCaml.git synced 2025-01-03 01:55:40 +01:00

Farm works

This commit is contained in:
Anthony Scemama 2018-10-22 13:12:43 +02:00
parent 45882e28b1
commit 758c65e31c

View File

@ -1,29 +1,26 @@
(* Single process function *)
let run_sequential f stream = let run_sequential f stream =
(* Stream -> f -> Stream *)
let rec next _ = let rec next _ =
try try
let task = Stream.next stream in let task = Stream.next stream in
Some (f task) Some (f task)
with Stream.Failure -> None in with Stream.Failure -> None in
Stream.from next Stream.from next
(* Multi-process functions *)
let debug msg = Printf.eprintf "%d %s\n%!" Parallel.rank msg (* Server side *)
(** Server side *)
let run_parallel_server stream = let run_parallel_server stream =
let fetch_result () = let fetch_result () =
let result, rank, _ = let result, rank, _tag =
Mpi.receive_status Mpi.any_source Mpi.any_tag Mpi.comm_world Mpi.receive_status Mpi.any_source Mpi.any_tag Mpi.comm_world
in in
let a, b = match result with | Some t -> t | None -> (Parallel.rank, (-1)) in
Printf.eprintf "Master: Fetch result %d,%d from %d\n%!" a b rank;
result, rank result, rank
in in
@ -34,8 +31,6 @@ Printf.eprintf "Master: Fetch result %d,%d from %d\n%!" a b rank;
| [] -> None , [] | [] -> None , []
| task :: rest -> Some task, rest | task :: rest -> Some task, rest
in in
let a, b = match task with | Some t -> t | None -> 0, 0 in
Printf.eprintf "Master: Send task %d,%d to %d \n%!" a b client_rank;
Mpi.send task client_rank 0 Mpi.comm_world; Mpi.send task client_rank 0 Mpi.comm_world;
rest rest
in in
@ -45,9 +40,12 @@ Printf.eprintf "Master: Send task %d,%d to %d \n%!" a b client_rank;
| 0 -> result_list | 0 -> result_list
| n_todo -> | n_todo ->
begin begin
Printf.eprintf "n_todo = %d\n%!" n_todo;
let result, client = fetch_result () in let result, client = fetch_result () in
let new_result_list = result :: result_list in let new_result_list =
match result with
| None -> result_list
| Some result -> result :: result_list
in
let new_tasklist = send_task client task_list in let new_tasklist = send_task client task_list in
run new_result_list new_tasklist (n_todo-1) run new_result_list new_tasklist (n_todo-1)
end end
@ -55,56 +53,38 @@ Printf.eprintf "n_todo = %d\n%!" n_todo;
let result = let result =
let task_list = let task_list =
(*
let rec to_list accu = let rec to_list accu =
try to_list (Stream.next stream :: accu) try to_list (Stream.next stream :: accu)
with Stream.Failure -> List.rev accu with Stream.Failure -> List.rev accu
in to_list [] in to_list []
*)
[]
in in
let n_todo = List.length task_list in let n_todo = List.length task_list in
run [] task_list (n_todo + Parallel.size - 1) run [] task_list (n_todo + Parallel.size - 1)
|> Stream.of_list |> Stream.of_list
in in
debug "Master: barrier";
Mpi.barrier Mpi.comm_world; Mpi.barrier Mpi.comm_world;
debug "Master: barrier ok";
result result
(** Client side *) (** Client side *)
let run_parallel_client f = let run_parallel_client f =
Unix.sleep (Parallel.rank);
Mpi.send None 0 0 Mpi.comm_world; Mpi.send None 0 0 Mpi.comm_world;
let rec run () = let rec run () =
let task = let task =
Mpi.receive 0 Mpi.any_tag Mpi.comm_world Mpi.receive 0 Mpi.any_tag Mpi.comm_world
in in
let a, b = match task with | Some t -> t | None -> 0, 0 in
Printf.eprintf "Client %d: Received task %d,%d\n%!" Parallel.rank a b;
debug "task received";
match task with match task with
| None -> | None -> Mpi.barrier Mpi.comm_world
begin
Printf.eprintf "Client %d: barrier\n%!" Parallel.rank;
Mpi.barrier Mpi.comm_world;
Printf.eprintf "Client %d: barrier ok\n%!" Parallel.rank;
end
| Some task -> | Some task ->
let result = f task in let result = f task in
begin begin
Unix.sleep 1;
let a, b = result in
Printf.eprintf "Client %d: Send result %d %d\n%!" Parallel.rank a b;
Mpi.send (Some result) 0 0 Mpi.comm_world; Mpi.send (Some result) 0 0 Mpi.comm_world;
run () run ()
end end
in in
run (); run ();
Printf.eprintf "Client %d: barrier ok" Parallel.rank;
Stream.of_list [] Stream.of_list []
@ -114,3 +94,9 @@ let run_parallel f stream =
| 0 -> run_parallel_server stream | 0 -> run_parallel_server stream
| _ -> run_parallel_client f | _ -> run_parallel_client f
let run f stream =
match Mpi.comm_size Mpi.comm_world with
| 1 -> run_sequential f stream
| _ -> run_parallel f stream