10
1
mirror of https://gitlab.com/scemama/QCaml.git synced 2024-06-29 16:34:43 +02:00
QCaml/Parallel/Farm.ml

143 lines
3.1 KiB
OCaml
Raw Normal View History

2018-10-22 13:12:43 +02:00
(* Single process function *)
2018-10-21 23:54:28 +02:00
let run_sequential f stream =
2018-10-22 13:12:43 +02:00
let rec next _ =
try
let task = Stream.next stream in
Some (f task)
with Stream.Failure -> None in
Stream.from next
2018-10-21 23:54:28 +02:00
2018-10-22 13:12:43 +02:00
(* Multi-process functions *)
2018-10-21 23:54:28 +02:00
2018-10-22 19:10:38 +02:00
type task_id = int
2018-10-22 13:12:43 +02:00
(* Server side *)
2018-10-22 19:10:38 +02:00
let run_parallel_server ~ordered stream =
2018-10-21 23:54:28 +02:00
2018-10-22 19:10:38 +02:00
let fetch_result () : (task_id * 'a) option * int =
let (message, rank, _tag) : (task_id * 'a) option * int * int =
2018-10-21 23:54:28 +02:00
Mpi.receive_status Mpi.any_source Mpi.any_tag Mpi.comm_world
in
2018-10-22 19:10:38 +02:00
message, rank
2018-10-21 23:54:28 +02:00
in
2018-10-22 17:53:13 +02:00
let send_task (client_rank : int) : unit =
2018-10-22 13:39:02 +02:00
let task =
2018-10-22 19:10:38 +02:00
try
let task_id = Stream.count stream in
let element = Stream.next stream in
Some (task_id, element)
2018-10-22 13:39:02 +02:00
with Stream.Failure -> None
2018-10-21 23:54:28 +02:00
in
2018-10-22 13:39:02 +02:00
Mpi.send task client_rank 0 Mpi.comm_world
2018-10-21 23:54:28 +02:00
in
2018-10-22 19:10:38 +02:00
(* n_todo is required for clean termination. It is the
number of tasks to wait for when the input stream is
empty. *)
2018-10-22 17:53:13 +02:00
let n_todo = ref (Mpi.comm_size Mpi.comm_world ) in
2018-10-22 19:10:38 +02:00
let rec get_result () : (task_id * 'a ) option =
begin
match Stream.peek stream with
| None -> decr n_todo
| _ -> ()
end;
match !n_todo with
| 0 ->
2018-10-21 23:54:28 +02:00
begin
2018-10-22 19:10:38 +02:00
Mpi.barrier Mpi.comm_world;
None
end
| _ ->
begin
let message, rank = fetch_result () in
send_task rank;
match message with
| None -> get_result ()
| Some (task_id, result) -> Some (task_id, result)
end
in
let f =
2018-10-23 13:39:06 +02:00
2018-10-22 19:10:38 +02:00
if ordered then
2018-10-23 13:39:06 +02:00
(* buffer of finished tasks with a task_id greater than the
current result_id. It allows to put back the results in
the correct order.
*)
2018-10-22 19:10:38 +02:00
let buffer = Hashtbl.create 67 in
2018-10-23 13:39:06 +02:00
2018-10-22 19:10:38 +02:00
fun i ->
begin
match Hashtbl.find_opt buffer i with
| Some x ->
begin
Hashtbl.remove buffer i;
Some x
end
| None ->
let rec loop () =
match get_result () with
| None -> None
| Some (task_id, result) ->
if task_id = i then Some result
else (Hashtbl.add buffer task_id result; loop () )
in loop ()
end
2018-10-23 13:39:06 +02:00
2018-10-22 19:10:38 +02:00
else
2018-10-23 13:39:06 +02:00
2018-10-22 19:10:38 +02:00
fun _ ->
match get_result () with
| Some (_, result) -> Some result
| None -> None
2018-10-23 13:39:06 +02:00
2018-10-21 23:54:28 +02:00
in
2018-10-22 17:53:13 +02:00
Stream.from f
2018-10-21 23:54:28 +02:00
(** Client side *)
let run_parallel_client f =
Mpi.send None 0 0 Mpi.comm_world;
let rec run () =
2018-10-22 19:10:38 +02:00
let message =
2018-10-21 23:54:28 +02:00
Mpi.receive 0 Mpi.any_tag Mpi.comm_world
in
2018-10-22 19:10:38 +02:00
match message with
2018-10-22 13:12:43 +02:00
| None -> Mpi.barrier Mpi.comm_world
2018-10-22 19:10:38 +02:00
| Some (task_id, task) ->
2018-10-21 23:54:28 +02:00
let result = f task in
begin
2018-10-22 19:10:38 +02:00
Mpi.send (Some (task_id, result)) 0 0 Mpi.comm_world;
2018-10-21 23:54:28 +02:00
run ()
end
in
run ();
Stream.of_list []
2018-10-22 19:10:38 +02:00
let run_parallel ~ordered f stream =
2018-10-21 23:54:28 +02:00
match Mpi.comm_rank Mpi.comm_world with
2018-10-22 19:10:38 +02:00
| 0 -> run_parallel_server ~ordered stream
2018-10-21 23:54:28 +02:00
| _ -> run_parallel_client f
2018-10-22 13:12:43 +02:00
2018-10-22 19:10:38 +02:00
let run ?(ordered=true) ~f stream =
2018-10-22 13:12:43 +02:00
match Mpi.comm_size Mpi.comm_world with
| 1 -> run_sequential f stream
2018-10-22 19:10:38 +02:00
| _ -> run_parallel ~ordered f stream
2018-10-22 13:12:43 +02:00