10
1
mirror of https://gitlab.com/scemama/QCaml.git synced 2024-12-22 12:23:31 +01:00

Improved parallel

This commit is contained in:
Anthony Scemama 2019-02-28 22:21:13 +01:00
parent ac45b0b0cb
commit 4e65d70a52
2 changed files with 117 additions and 48 deletions

View File

@ -91,7 +91,7 @@ let create_matrix_spin f det_space =
(** Update function when ki and kj are connected *) (** Update function when ki and kj are connected *)
let update i j ki kj = let update i j ki kj =
let x = f ki kj in let x = f ki kj in
if x <> 0. then if abs_float x < Constants.epsilon then
result.(i) <- (j, x) :: result.(i) ; result.(i) <- (j, x) :: result.(i) ;
in in

View File

@ -1,4 +1,7 @@
(* Single process function *) (********************************************************************)
(* Single process *)
(********************************************************************)
let run_sequential f stream = let run_sequential f stream =
let rec next _ = let rec next _ =
@ -10,24 +13,47 @@ let run_sequential f stream =
Stream.from next Stream.from next
(* Multi-process functions *)
(********************************************************************)
(* Server side *)
(********************************************************************)
type task_id = int type task_id = int
let debug _s =
()
(*
Printf.eprintf "%d : %s : %s\n%!" (Mpi.comm_rank Mpi.comm_world) (Unix.gettimeofday () |> string_of_float) _s
*)
(* Server side *)
let run_parallel_server ~ordered stream = let run_parallel_server ~ordered stream =
(* n_running is the number of running tasks, required for clean
termination. It is the number of tasks to wait for when the input stream
is empty.
*)
let n_running = ref 0 in
(** Fetches a result coming from any client. Returns the result
as a (task_id * 'a) option and the rank of the client as an int.
*)
let fetch_result () : (task_id * 'a) option * int = let fetch_result () : (task_id * 'a) option * int =
let (message, rank, _tag) : (task_id * 'a) option * int * int = let (message, rank, _tag) : (task_id * 'a) option * int * int =
debug "Before receive_status";
Mpi.receive_status Mpi.any_source Mpi.any_tag Mpi.comm_world Mpi.receive_status Mpi.any_source Mpi.any_tag Mpi.comm_world
in in
debug @@ Printf.sprintf "After receive_status %d %d" rank _tag;
decr n_running;
message, rank message, rank
in in
let send_task (client_rank : int) : unit = (** Pops a task from the stream and sends it to a client.
If no task is available, sends [None].
The return value is a boolean telling if the stream is empty.
*)
let send_task (client_rank : int) : bool =
let task = let task =
try try
let task_id = Stream.count stream in let task_id = Stream.count stream in
@ -35,46 +61,69 @@ let run_parallel_server ~ordered stream =
Some (task_id, element) Some (task_id, element)
with Stream.Failure -> None with Stream.Failure -> None
in in
Mpi.send task client_rank 0 Mpi.comm_world debug @@ Printf.sprintf "Sending to %d\n" client_rank;
Mpi.send task client_rank 0 Mpi.comm_world;
debug @@ Printf.sprintf "Sent to %d : %s\n" client_rank
(if task = None then "None" else "Some")
;
let running = task <> None in
if running then incr n_running;
running
in in
(* n_todo is required for clean termination. It is the
number of tasks to wait for when the input stream is
empty. *)
let n_todo = ref (Mpi.comm_size Mpi.comm_world ) in
let rec get_result () : (task_id * 'a ) option =
begin (** Main loop.
match Stream.peek stream with While [n_todo > 0], fetch a result from a client
| None -> decr n_todo and send it back a new task. If no more tasks are
| _ -> () available, send [None]. If the result of the task
end; is None, loop back into [get_result].
match !n_todo with TODO : bug is probably here...
| 0 -> *)
begin let rec get_result () : (task_id * 'a ) option =
Mpi.barrier Mpi.comm_world; let message, rank = fetch_result () in
None let iterate = send_task rank in
end match iterate, message with
| _ -> | true , None -> (incr n_running ; get_result ())
begin | true , Some (task_id, result) -> Some (task_id, result)
let message, rank = fetch_result () in | false, Some (task_id, result) ->
send_task rank; if !n_running > 0 then
match message with Some (task_id, result)
| None -> get_result () else
| Some (task_id, result) -> Some (task_id, result) ( debug "Before barrier";
end Mpi.barrier Mpi.comm_world;
debug "After barrier";
None;)
| false, None -> assert false
in in
(** Function from which the output stream is built. *)
let f = let f =
if ordered then if not ordered then
(* If [ordered] is false, the values are popped out whenever they arrive
(* buffer of finished tasks with a task_id greater than the from the clients.
current result_id. It allows to put back the results in
the correct order.
*) *)
let buffer = Hashtbl.create 67 in
fun _ ->
match get_result () with
| Some (_, result) -> Some result
| None -> None
else
(* If [ordered] is true, out into the stream when the next expected task has
been computed.
*)
let buffer =
(* buffer of finished tasks with a task_id greater than the
current result_id. It allows to put back the results in
the correct order.
*)
Hashtbl.create 67
in
fun i -> fun i ->
begin begin
@ -94,36 +143,56 @@ let run_parallel_server ~ordered stream =
in loop () in loop ()
end end
else
fun _ ->
match get_result () with
| Some (_, result) -> Some result
| None -> None
in in
Stream.from f Stream.from f
(** Client side *)
let run_parallel_client f =
Mpi.send None 0 0 Mpi.comm_world;
(********************************************************************)
(* Client side *)
(********************************************************************)
let run_parallel_client f =
(** Send a first message containing [None] to request a task *)
debug "Before send None";
Mpi.send None 0 0 Mpi.comm_world;
debug "After send None";
(** Main loop.
Receive a message. If the message is [None], there are no more
tasks to compute and we can go to the barrier.
If the message is not [None], apply [f] to the task, send the
result back to the server and loop.
*)
let rec run () = let rec run () =
let message = let message =
Mpi.receive 0 Mpi.any_tag Mpi.comm_world debug "Before receive";
Mpi.receive 0 0 Mpi.comm_world
in in
debug "After receive" ;
match message with match message with
| None -> Mpi.barrier Mpi.comm_world | None ->
( debug "Before barrier";
Mpi.barrier Mpi.comm_world;
debug "After barrier";)
| Some (task_id, task) -> | Some (task_id, task) ->
let result = f task in let result = f task in
begin begin
debug @@ Printf.sprintf "Before send task_id %d" task_id ;
Mpi.send (Some (task_id, result)) 0 0 Mpi.comm_world; Mpi.send (Some (task_id, result)) 0 0 Mpi.comm_world;
debug @@ Printf.sprintf "After send task_id %d" task_id ;
run () run ()
end end
in in
run (); run ();
(* The output is an empty stream so that the type of run_parallel_client
is the same as the type of the server function.
*)
Stream.of_list [] Stream.of_list []