mirror of
https://gitlab.com/scemama/QCaml.git
synced 2025-01-03 01:55:40 +01:00
Fixed Farm
This commit is contained in:
parent
6e68eef645
commit
5540db632c
@ -22,18 +22,25 @@ let run_sequential f stream =
|
|||||||
type task_id = int
|
type task_id = int
|
||||||
|
|
||||||
let debug _s =
|
let debug _s =
|
||||||
()
|
if true then
|
||||||
(*
|
()
|
||||||
Printf.eprintf "%d : %s : %s\n%!" (Mpi.comm_rank Mpi.comm_world) (Unix.gettimeofday () |> string_of_float) _s
|
else
|
||||||
*)
|
Printf.eprintf "%d : %s : %s\n%!" (Mpi.comm_rank Mpi.comm_world) (Unix.gettimeofday () |> string_of_float) _s
|
||||||
|
|
||||||
|
type status =
|
||||||
|
| Initializing
|
||||||
|
| Running
|
||||||
|
| Done
|
||||||
|
|
||||||
let run_parallel_server ~ordered stream =
|
let run_parallel_server ~ordered stream =
|
||||||
|
|
||||||
(* n_running is the number of running tasks, required for clean
|
(* [status.(rank)] is [Initializing] if rank has not yet obtained a task,
|
||||||
termination. It is the number of tasks to wait for when the input stream
|
[Running] if rank is running a task and [Done] if [rank] is waiting at
|
||||||
is empty.
|
the barrier.
|
||||||
*)
|
*)
|
||||||
let n_running = ref 0 in
|
let status = Array.make (Mpi.comm_size Mpi.comm_world) Initializing in
|
||||||
|
status.(0) <- Done;
|
||||||
|
|
||||||
|
|
||||||
(** Fetches a result coming from any client. Returns the result
|
(** Fetches a result coming from any client. Returns the result
|
||||||
as a (task_id * 'a) option and the rank of the client as an int.
|
as a (task_id * 'a) option and the rank of the client as an int.
|
||||||
@ -44,7 +51,6 @@ let run_parallel_server ~ordered stream =
|
|||||||
Mpi.receive_status Mpi.any_source Mpi.any_tag Mpi.comm_world
|
Mpi.receive_status Mpi.any_source Mpi.any_tag Mpi.comm_world
|
||||||
in
|
in
|
||||||
debug @@ Printf.sprintf "After receive_status %d %d" rank _tag;
|
debug @@ Printf.sprintf "After receive_status %d %d" rank _tag;
|
||||||
decr n_running;
|
|
||||||
message, rank
|
message, rank
|
||||||
in
|
in
|
||||||
|
|
||||||
@ -53,7 +59,7 @@ let run_parallel_server ~ordered stream =
|
|||||||
If no task is available, sends [None].
|
If no task is available, sends [None].
|
||||||
The return value is a boolean telling if the stream is empty.
|
The return value is a boolean telling if the stream is empty.
|
||||||
*)
|
*)
|
||||||
let send_task (client_rank : int) : bool =
|
let send_task (client_rank : int) : unit =
|
||||||
let task =
|
let task =
|
||||||
try
|
try
|
||||||
let task_id = Stream.count stream in
|
let task_id = Stream.count stream in
|
||||||
@ -64,14 +70,21 @@ let run_parallel_server ~ordered stream =
|
|||||||
debug @@ Printf.sprintf "Sending to %d\n" client_rank;
|
debug @@ Printf.sprintf "Sending to %d\n" client_rank;
|
||||||
Mpi.send task client_rank 0 Mpi.comm_world;
|
Mpi.send task client_rank 0 Mpi.comm_world;
|
||||||
debug @@ Printf.sprintf "Sent to %d : %s\n" client_rank
|
debug @@ Printf.sprintf "Sent to %d : %s\n" client_rank
|
||||||
(if task = None then "None" else "Some")
|
(if task = None then "None" else "Some");
|
||||||
;
|
if task <> None then
|
||||||
let running = task <> None in
|
status.(client_rank) <- Running
|
||||||
if running then incr n_running;
|
else
|
||||||
running
|
status.(client_rank) <- Done
|
||||||
in
|
in
|
||||||
|
|
||||||
|
|
||||||
|
let all_done () =
|
||||||
|
try
|
||||||
|
Array.iter (fun i -> if i <> Done then raise Exit) status;
|
||||||
|
true
|
||||||
|
with Exit -> false
|
||||||
|
in
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
(** Main loop.
|
(** Main loop.
|
||||||
@ -79,23 +92,23 @@ let run_parallel_server ~ordered stream =
|
|||||||
and send it back a new task. If no more tasks are
|
and send it back a new task. If no more tasks are
|
||||||
available, send [None]. If the result of the task
|
available, send [None]. If the result of the task
|
||||||
is None, loop back into [get_result].
|
is None, loop back into [get_result].
|
||||||
TODO : bug is probably here...
|
|
||||||
*)
|
*)
|
||||||
let rec get_result () : (task_id * 'a ) option =
|
let rec get_result () : (task_id * 'a ) option =
|
||||||
let message, rank = fetch_result () in
|
if all_done () then
|
||||||
let iterate = send_task rank in
|
begin
|
||||||
match iterate, message with
|
debug "Before barrier";
|
||||||
| true , None -> (incr n_running ; get_result ())
|
|
||||||
| true , Some (task_id, result) -> Some (task_id, result)
|
|
||||||
| false, Some (task_id, result) ->
|
|
||||||
if !n_running > 0 then
|
|
||||||
Some (task_id, result)
|
|
||||||
else
|
|
||||||
( debug "Before barrier";
|
|
||||||
Mpi.barrier Mpi.comm_world;
|
Mpi.barrier Mpi.comm_world;
|
||||||
debug "After barrier";
|
debug "After barrier";
|
||||||
None;)
|
None
|
||||||
| false, None -> assert false
|
end
|
||||||
|
else
|
||||||
|
begin
|
||||||
|
let message, rank = fetch_result () in
|
||||||
|
send_task rank;
|
||||||
|
match message with
|
||||||
|
| None -> get_result ()
|
||||||
|
| Some (task_id, result) -> Some (task_id, result)
|
||||||
|
end
|
||||||
in
|
in
|
||||||
|
|
||||||
|
|
||||||
|
@ -1,4 +1,7 @@
|
|||||||
let line ?(c='-') n =
|
let line ?(c='-') n =
|
||||||
String.make n c
|
String.make n c
|
||||||
|
|
||||||
|
let ppf_dev_null =
|
||||||
|
let oc = open_out "/dev/null" in
|
||||||
|
Format.formatter_of_out_channel oc
|
||||||
|
|
||||||
|
@ -47,8 +47,12 @@ let () =
|
|||||||
HartreeFock.make s
|
HartreeFock.make s
|
||||||
in
|
in
|
||||||
|
|
||||||
Format.printf "@[%a@]@." HartreeFock.pp_hf hf;
|
let ppf =
|
||||||
|
if Parallel.master then Format.std_formatter
|
||||||
|
else Printing.ppf_dev_null
|
||||||
|
in
|
||||||
|
Format.fprintf ppf "@[%a@]@." HartreeFock.pp_hf hf;
|
||||||
let mos = MOBasis.of_hartree_fock hf in
|
let mos = MOBasis.of_hartree_fock hf in
|
||||||
Format.printf "@[%a@]@." (fun ppf x -> MOBasis.pp_mo ppf x) mos
|
Format.fprintf ppf "@[%a@]@." (fun ppf x -> MOBasis.pp_mo ppf x) mos
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user