4-idx node-only

Anthony Scemama 2019-04-04 09:14:15 +02:00
parent 72422d84b5
commit 1a4677dc19
9 changed files with 39 additions and 28 deletions

View File

@@ -32,13 +32,13 @@ type status =
   | Running
   | Done
-let run_parallel_server ~ordered stream =
+let run_parallel_server ~comm ~ordered stream =
   (* [status.(rank)] is [Initializing] if rank has not yet obtained a task,
      [Running] if rank is running a task and [Done] if [rank] is waiting at
      the barrier.
   *)
-  let status = Array.make (Mpi.comm_size Mpi.comm_world) Initializing in
+  let status = Array.make (Mpi.comm_size comm) Initializing in
   status.(0) <- Done;
@@ -50,8 +50,8 @@ let run_parallel_server ~ordered stream =
     debug "Before receive_status";
     (* Avoid busy receive *)
     let rec wait_and_receive () =
-      match Mpi.iprobe Mpi.any_source Mpi.any_tag Mpi.comm_world with
-      | Some _ -> Mpi.receive_status Mpi.any_source Mpi.any_tag Mpi.comm_world
+      match Mpi.iprobe Mpi.any_source Mpi.any_tag comm with
+      | Some _ -> Mpi.receive_status Mpi.any_source Mpi.any_tag comm
       | None -> (Unix.sleepf 0.001 ; wait_and_receive ())
     in
     wait_and_receive ()
@@ -74,7 +74,7 @@ let run_parallel_server ~ordered stream =
       with Stream.Failure -> None
     in
     debug @@ Printf.sprintf "Sending to %d\n" client_rank;
-    Mpi.send task client_rank 0 Mpi.comm_world;
+    Mpi.send task client_rank 0 comm;
     debug @@ Printf.sprintf "Sent to %d : %s\n" client_rank
       (if task = None then "None" else "Some");
     if task <> None then
@@ -103,7 +103,7 @@ let run_parallel_server ~ordered stream =
     if all_done () then
       begin
        debug "Before barrier";
-       Mpi.barrier Mpi.comm_world;
+       Mpi.barrier comm;
        debug "After barrier";
        None
      end
@@ -172,11 +172,11 @@ let run_parallel_server ~ordered stream =
 (* Client side *)
 (********************************************************************)
-let run_parallel_client f =
+let run_parallel_client ~comm f =
   (** Send a first message containing [None] to request a task *)
   debug "Before send None";
-  Mpi.send None 0 0 Mpi.comm_world;
+  Mpi.send None 0 0 comm;
   debug "After send None";
   (** Main loop.
@@ -189,20 +189,20 @@ let run_parallel_client f =
     let message =
       debug "Before receive";
-      Mpi.receive 0 0 Mpi.comm_world
+      Mpi.receive 0 0 comm
     in
     debug "After receive" ;
     match message with
     | None ->
        ( debug "Before barrier";
-         Mpi.barrier Mpi.comm_world;
+         Mpi.barrier comm;
          debug "After barrier";)
     | Some (task_id, task) ->
        let result = f task in
        begin
          debug @@ Printf.sprintf "Before send task_id %d" task_id ;
-         Mpi.send (Some (task_id, result)) 0 0 Mpi.comm_world;
+         Mpi.send (Some (task_id, result)) 0 0 comm;
          debug @@ Printf.sprintf "After send task_id %d" task_id ;
          run ()
        end
@@ -217,28 +217,28 @@ let run_parallel_client f =
-let run_parallel ~ordered f stream =
-  match Mpi.comm_rank Mpi.comm_world with
-  | 0 -> run_parallel_server ~ordered stream
-  | _ -> run_parallel_client f
+let run_parallel ~comm ~ordered f stream =
+  match Mpi.comm_rank comm with
+  | 0 -> run_parallel_server ~comm ~ordered stream
+  | _ -> run_parallel_client ~comm f
 let nested = ref false
-let run ?(ordered=true) ~f stream =
+let run ?(ordered=true) ?(comm=Mpi.comm_world) ~f stream =
   if !nested then
     begin
       let message =
         "Nested parallel regions are not supported by Farm.ml"
      in
      Printf.eprintf "%s\n%!" message ;
-     exit 1
+     failwith message
    end;
  nested := true;
  let result =
-   match Mpi.comm_size Mpi.comm_world with
+   match Mpi.comm_size comm with
   | 1 -> run_sequential f stream
-  | _ -> run_parallel ~ordered f stream
+  | _ -> run_parallel ~comm ~ordered f stream
  in
  nested := false;
  result
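
The request/reply protocol between the server (rank 0 of [comm]) and the clients is carried entirely by option values sent with tag 0. A rough sketch of the message shapes, with hypothetical variant names introduced only for illustration (the code above sends the bare option values directly):

    (* Client -> server *)
    type 'result client_message =
      | Request                    (* sent as [None]: ask the server for a task *)
      | Answer of int * 'result    (* sent as [Some (task_id, result)] *)

    (* Server -> client *)
    type 'task server_message =
      | Work of int * 'task        (* sent as [Some (task_id, task)] *)
      | Finished                   (* sent as [None]: no more work, meet at [Mpi.barrier comm] *)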

View File

@@ -4,11 +4,13 @@ The input is a stream of input data, and the output is a stream of data.
 *)
-val run : ?ordered:bool -> f:('a -> 'b) -> 'a Stream.t -> 'b Stream.t
+val run : ?ordered:bool -> ?comm:Mpi.communicator ->
+          f:('a -> 'b) -> 'a Stream.t -> 'b Stream.t
 (** Run the [f] function on every process by popping elements from the
     input stream, and putting the results on the output stream. If [ordered]
     (the default is [ordered = true], then the order of the output is kept
     consistent with the order of the input.
+    [comm], within MPI is a communicator. It describes a subgroup of processes.
 *)
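
A minimal call sketch against this signature (assuming an MPI launch; judging from the server code above, the result stream is only populated on rank 0 of [comm], so the other ranks should not consume it):

    let () =
      let squares =
        Stream.of_list [1; 2; 3; 4]
        |> Farm.run ~ordered:true ~comm:Mpi.comm_world ~f:(fun x -> x * x)
      in
      (* Only the master rank of the communicator prints the results. *)
      if Mpi.comm_rank Mpi.comm_world = 0 then
        Stream.iter (fun x -> Printf.printf "%d\n" x) squares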

View File

@@ -59,7 +59,7 @@ module Node = struct
   let name = Unix.gethostname ()
-  let comm = lazy (
+  let comm =
     Mpi.allgather (name, rank) Mpi.comm_world
     |> Array.to_list
     |> List.filter (fun (n, r) -> name = n)
@@ -67,10 +67,9 @@ module Node = struct
     |> Array.of_list
     |> Mpi.(group_incl (comm_group comm_world))
     |> Mpi.(comm_create comm_world)
-  )
   let rank =
-    Mpi.comm_rank (Lazy.force comm)
+    Mpi.comm_rank comm
   let master = rank = 0
@@ -79,13 +78,13 @@ module Node = struct
       if master then Some (Lazy.force x)
       else None
     in
-    match broadcast x 0 (Lazy.force comm) with
+    match broadcast x 0 comm with
     | Some x -> x
     | None -> assert false
   let broadcast x = broadcast_generic Mpi.broadcast x
-  let barrier () = Mpi.barrier (Lazy.force comm )
+  let barrier () = Mpi.barrier comm
 end

View File

@@ -36,6 +36,9 @@ module Node : sig
   val name : string
   (** Name of the current host *)
+  val comm : Mpi.communicator
+  (** MPI Communicator containing the processes of the current node *)
   val rank : Mpi.rank
   (** Rank of the current process in the node *)
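
A hypothetical usage sketch for the node-local communicator, assuming [Node.broadcast] is exposed with the lazy interface shown in the implementation above ([load_table] is a placeholder for some per-node work):

    let load_table () = Array.init 1000 float_of_int   (* placeholder workload *)

    let table =
      (* Only rank 0 of [Parallel.Node.comm] forces the lazy value; the other
         processes running on the same node receive the result over the node
         communicator. *)
      Parallel.Node.broadcast (lazy (load_table ()))

    let () = Parallel.Node.barrier ()   (* synchronize the processes of this node *)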

View File

@@ -8,6 +8,6 @@ let run_sequential f stream =
     with Stream.Failure -> None in
   Stream.from next
-let run ?(ordered=true) ~f stream =
+let run ?(ordered=true) ?(comm) ~f stream =
   run_sequential f stream

View File

@@ -4,11 +4,13 @@ The input is a stream of input data, and the output is a stream of data.
 *)
-val run : ?ordered:bool -> f:('a -> 'b) -> 'a Stream.t -> 'b Stream.t
+val run : ?ordered:bool -> ?comm:'c ->
+          f:('a -> 'b) -> 'a Stream.t -> 'b Stream.t
 (** Run the [f] function on every process by popping elements from the
     input stream, and putting the results on the output stream. If [ordered]
     (the default is [ordered = true], then the order of the output is kept
     consistent with the order of the input.
+    In the non-parallel mode, the [comm] argument is unused.
 *)
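
The loose [?comm:'c] type presumably exists so that the same call site compiles against both the MPI and the sequential builds, e.g. passing [Parallel.Node.comm] as in the four-index transform below. A minimal single-process sketch:

    let doubled =
      Stream.of_list [1; 2; 3]
      |> Farm.run ~ordered:true ~comm:Parallel.Node.comm ~f:(fun x -> 2 * x)
      (* [comm] is accepted but ignored here; run_sequential does the work. *)

    let () = Stream.iter (fun x -> Printf.printf "%d\n" x) doubled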

View File

@@ -25,6 +25,8 @@ module Node = struct
   let name = Unix.gethostname ()
+  let comm = None
   let rank = 0
   let master = true

View File

@@ -36,6 +36,9 @@ module Node : sig
   val name : string
   (** Name of the current host *)
+  val comm : 'a option
+  (** Always [None] *)
   val rank : int
   (** Rank of the current process in the node *)

View File

@@ -386,7 +386,7 @@ let four_index_transform coef source =
   let n = ref 0 in
   Stream.of_list range_mo
-  |> Farm.run ~f:task ~ordered:false
+  |> Farm.run ~f:task ~ordered:false ~comm:Parallel.Node.comm
   |> Stream.iter (fun l ->
     if Parallel.master then (incr n ; Printf.eprintf "\r%d / %d%!" !n mo_num);
     Array.iter (fun (alpha, beta, gamma, delta, x) ->