10
1
mirror of https://gitlab.com/scemama/QCaml.git synced 2025-01-05 02:48:37 +01:00
QCaml/Parallel_mpi/Farm.ml

247 lines
6.5 KiB
OCaml
Raw Normal View History

2019-02-28 22:21:13 +01:00
(********************************************************************)
(* Single process *)
(********************************************************************)
2018-10-21 23:54:28 +02:00
(** [run_sequential f stream] builds a stream whose elements are obtained by
    applying [f] to the elements of [stream], in order. Elements are pulled
    from [stream] only when the output stream is read. The output ends as
    soon as [Stream.Failure] is raised, either because [stream] is exhausted
    or because [f] itself raised it. *)
let run_sequential f stream =
  let produce _ =
    match f (Stream.next stream) with
    | value -> Some value
    | exception Stream.Failure -> None
  in
  Stream.from produce
2018-10-21 23:54:28 +02:00
2019-02-28 22:21:13 +01:00
(********************************************************************)
(* Server side *)
(********************************************************************)
2018-10-21 23:54:28 +02:00
2018-10-22 19:10:38 +02:00
(** Identifier attached to a task. The server uses the value of
    [Stream.count] on the input stream, read just before the task is popped. *)
type task_id = int
2019-02-28 22:21:13 +01:00
(** [debug _s] is a tracing helper. When enabled it prints, on standard
    error, the rank of the process in [Mpi.comm_world], the current time and
    the message [_s]. The trace is currently hard-wired off, so the call
    returns [()] without printing anything. *)
let debug _s =
  let enabled = false in
  if enabled then
    Printf.eprintf "%d : %s : %s\n%!"
      (Mpi.comm_rank Mpi.comm_world)
      (string_of_float (Unix.gettimeofday ()))
      _s
(** State of an MPI rank as tracked by the server in [run_parallel_server]. *)
type status =
| Initializing (** initial value: the rank has not been sent anything yet *)
| Running (** the last message sent to the rank was a task *)
| Done (** the rank was sent [None] (no more tasks); rank 0 is [Done] from the start *)
2018-10-22 19:10:38 +02:00
2019-04-04 09:14:15 +02:00
(** [run_parallel_server ~comm ~ordered stream] is the server side of the
    task farm. It hands out the elements of [stream] to the other ranks of
    [comm], one at a time, and returns a stream of the results they send
    back. The work is driven lazily: messages are exchanged only when the
    returned stream is read.

    - [comm]: MPI communicator shared with the clients.
    - [ordered]: if [true], results come out in the order of the tasks in
      [stream]; if [false], they come out in the order in which they arrive.
    - [stream]: the input tasks.

    When every rank has been told that no task is left, the server enters
    [Mpi.barrier comm] and the output stream ends. *)
let run_parallel_server ~comm ~ordered stream =

  (* Task ids are read from [Stream.count stream], so the first id equals the
     number of elements already consumed from [stream] when the server is
     created. It is recorded here so that the ordered output, whose index
     starts at 0, can be aligned with the task ids even if [stream] was
     partially consumed beforehand. *)
  let first_task_id = Stream.count stream in

  (* [status.(rank)] is [Initializing] if rank has not yet obtained a task,
     [Running] if rank is running a task and [Done] if [rank] is waiting at
     the barrier.
  *)
  let status = Array.make (Mpi.comm_size comm) Initializing in
  (* The server (rank 0) never runs a task. *)
  status.(0) <- Done;

  (** Fetches a result coming from any client. Returns the result
      as a (task_id * 'a) option and the rank of the client as an int.
  *)
  let fetch_result () : (task_id * 'a) option * int =
    let (message, rank, _tag) : (task_id * 'a) option * int * int =
      debug "Before receive_status";
      (* Avoid busy receive: poll with [iprobe] and sleep between polls. *)
      let rec wait_and_receive () =
        match Mpi.iprobe Mpi.any_source Mpi.any_tag comm with
        | Some _ -> Mpi.receive_status Mpi.any_source Mpi.any_tag comm
        | None -> (Unix.sleepf 0.001 ; (wait_and_receive [@tailcall]) ())
      in
      wait_and_receive ()
    in
    debug @@ Printf.sprintf "After receive_status %d %d" rank _tag;
    message, rank
  in

  (** Pops a task from the stream and sends it to a client.
      If no task is available, sends [None].
      Updates [status.(client_rank)] accordingly: [Running] if a task
      was sent, [Done] otherwise.
  *)
  let send_task (client_rank : int) : unit =
    let task =
      try
        (* The count must be read before [Stream.next] advances it. *)
        let task_id = Stream.count stream in
        let element = Stream.next stream in
        Some (task_id, element)
      with Stream.Failure -> None
    in
    debug @@ Printf.sprintf "Sending to %d\n" client_rank;
    Mpi.send task client_rank 0 comm;
    let label, new_status =
      match task with
      | None -> "None", Done
      | Some _ -> "Some", Running
    in
    debug @@ Printf.sprintf "Sent to %d : %s\n" client_rank label;
    status.(client_rank) <- new_status
  in

  (** [true] when every rank is [Done]. *)
  let all_done () =
    Array.for_all
      (function
        | Done -> true
        | Initializing | Running -> false)
      status
  in

  (** Main loop.
      While some rank is not [Done], fetch a message from a client
      and send it back a new task. If no more tasks are
      available, send [None]. If the message carries no result (it is
      the initial request of a client), loop back into [get_result].
      When all ranks are [Done], go through the barrier and return [None].
  *)
  let rec get_result () : (task_id * 'a ) option =
    if all_done () then
      begin
        debug "Before barrier";
        Mpi.barrier comm;
        debug "After barrier";
        None
      end
    else
      begin
        let message, rank = fetch_result () in
        send_task rank;
        match message with
        | None -> get_result ()
        | Some (task_id, result) -> Some (task_id, result)
      end
  in

  (** Function from which the output stream is built. *)
  let f =
    if not ordered then
      (* If [ordered] is false, the values are popped out whenever they arrive
         from the clients.
      *)
      begin
        fun _ ->
          match get_result () with
          | Some (_, result) -> Some result
          | None -> None
      end
    else
      (* If [ordered] is true, a value is put into the stream only when the
         next expected task has been computed.
      *)
      begin
        let buffer =
          (* buffer of finished tasks with a task_id greater than the
             currently expected id. It allows to put back the results in
             the correct order.
          *)
          Hashtbl.create 67
        in
        fun i ->
          (* [i] is the index in the output stream (starting at 0); shift it
             to obtain the id of the task expected at that position. *)
          let expected_id = first_task_id + i in
          match Hashtbl.find_opt buffer expected_id with
          | Some x ->
              Hashtbl.remove buffer expected_id;
              Some x
          | None ->
              let rec loop () =
                match get_result () with
                | None -> None
                | Some (task_id, result) ->
                    if task_id = expected_id then Some result
                    else (Hashtbl.add buffer task_id result; (loop [@tailcall]) () )
              in
              loop ()
      end
  in
  Stream.from f
2018-10-21 23:54:28 +02:00
2019-02-28 22:21:13 +01:00
(********************************************************************)
(* Client side *)
(********************************************************************)
2019-04-04 09:14:15 +02:00
(** [run_parallel_client ~comm f] is the client side of the task farm.
    It asks rank 0 of [comm] for work, applies [f] to every task it
    receives and sends each result back to rank 0, until it is told that
    no task is left. It then goes through [Mpi.barrier comm].

    The function returns only once all this is finished. Its result is an
    empty stream, so that it has the same type as the server function. *)
let run_parallel_client ~comm f =

  (* A first message containing [None] tells the server that this client
     is ready for a task. *)
  debug "Before send None";
  Mpi.send None 0 0 comm;
  debug "After send None";

  (* Work loop: wait for a message from the server. [None] means that no
     task is left: go to the barrier and stop. Otherwise run the task,
     send back the result tagged with the task id, and wait again. *)
  let finished = ref false in
  while not !finished do
    debug "Before receive";
    let incoming = Mpi.receive 0 0 comm in
    debug "After receive" ;
    match incoming with
    | None ->
        debug "Before barrier";
        Mpi.barrier comm;
        debug "After barrier";
        finished := true
    | Some (task_id, task) ->
        let result = f task in
        debug @@ Printf.sprintf "Before send task_id %d" task_id ;
        Mpi.send (Some (task_id, result)) 0 0 comm;
        debug @@ Printf.sprintf "After send task_id %d" task_id
  done;

  (* Nothing is produced on the client side. *)
  Stream.of_list []
2019-04-04 09:14:15 +02:00
(** [run_parallel ~comm ~ordered f stream] selects the role of the calling
    process from its rank in [comm]: rank 0 runs the server on [stream],
    every other rank runs a client applying [f]. *)
let run_parallel ~comm ~ordered f stream =
  let rank = Mpi.comm_rank comm in
  if rank = 0 then
    run_parallel_server ~comm ~ordered stream
  else
    run_parallel_client ~comm f
2018-10-21 23:54:28 +02:00
(* Guard flag used by [run]: it is [true] while [run] is building its output
   stream, and [run] fails if it is entered while the flag is set. *)
let nested = ref false
2018-10-22 13:12:43 +02:00
2019-04-04 09:14:15 +02:00
(** [run ?ordered ?comm ~f stream] applies [f] to the elements of [stream]
    and returns the stream of results.

    - [ordered] (default [true]): passed to the parallel implementation to
      choose between input order and arrival order for the results.
    - [comm] (default [Mpi.comm_world]): communicator used for the farm.
      If it contains a single process, the sequential implementation is
      used; otherwise the parallel one.

    @raise Failure if called while another call to [run] is still building
    its stream (nested parallel regions are not supported). The message is
    also printed on standard error. *)
let run ?(ordered=true) ?(comm=Mpi.comm_world) ~f stream =
  if !nested then
    begin
      let message =
        "Nested parallel regions are not supported by Farm.ml"
      in
      Printf.eprintf "%s\n%!" message ;
      failwith message
    end;
  nested := true;
  let result =
    try
      (match Mpi.comm_size comm with
       | 1 -> run_sequential f stream
       | _ -> run_parallel ~comm ~ordered f stream)
    with e ->
      (* Release the guard before propagating, otherwise every later call
         to [run] would be rejected as nested. *)
      nested := false;
      raise e
  in
  nested := false;
  result
2018-10-22 13:12:43 +02:00
2019-01-15 15:17:34 +01:00