Hartree-Fock is parallel

This commit is contained in:
Anthony Scemama 2018-10-23 13:39:06 +02:00
parent 60b4bc6713
commit 1d4560418e
7 changed files with 120 additions and 134 deletions

View File

@ -131,7 +131,7 @@ let store_class ?(cutoff=integrals_cutoff) data contracted_shell_pair_couple cls
let of_basis basis =
let of_basis_serial basis =
let n = Bs.size basis
and shell = Bs.contracted_shells basis
@ -198,10 +198,22 @@ let of_basis basis =
(*
(* Parallel functions *)
let of_basis_parallel basis =
let store_class ?(cutoff=integrals_cutoff) push_socket contracted_shell_pair_couple cls =
let n = Bs.size basis
and shell = Bs.contracted_shells basis
in
let store_class_parallel
?(cutoff=integrals_cutoff) contracted_shell_pair_couple cls =
let to_powers x =
let open Zkey in
match to_powers x with
@ -213,7 +225,7 @@ let of_basis_parallel basis =
and shell_q = Cspc.shell_pair_q contracted_shell_pair_couple
in
let msg = ref [] in
let result = ref [] in
Array.iteri (fun i_c powers_i ->
let i_c = Cs.index (Csp.shell_a shell_p) + i_c + 1 in
let xi = to_powers powers_i in
@ -228,25 +240,15 @@ let of_basis_parallel basis =
let xl = to_powers powers_l in
let key = Zkey.of_powers_twelve xi xj xk xl in
let value = Zmap.find cls key in
msg := (i_c,j_c,k_c,l_c,value) :: !msg;
result := (i_c, j_c, k_c, l_c, value) :: !result
) (Cs.zkey_array (Csp.shell_b shell_q))
) (Cs.zkey_array (Csp.shell_a shell_q))
) (Cs.zkey_array (Csp.shell_b shell_p))
) (Cs.zkey_array (Csp.shell_a shell_p));
Zmq.Socket.send_all push_socket ["0" ; Bytes.to_string (Marshal.to_bytes !msg []) ]
!result
in
let n = Bs.size basis
and shell = Bs.contracted_shells basis
in
let eri_array =
Fis.create ~size:n `Dense
(*
Fis.create ~size:n `Sparse
*)
in
let t0 = Unix.gettimeofday () in
@ -255,127 +257,84 @@ let of_basis_parallel basis =
|> filter_contracted_shell_pairs ~cutoff
in
Printf.printf "%d significant shell pairs computed in %f seconds\n%!"
Printf.printf "%d significant shell pairs computed in %f seconds\n"
(List.length shell_pairs) (Unix.gettimeofday () -. t0);
let t0 = Unix.gettimeofday () in
let ishell = ref 0 in
let input_stream = Stream.of_list shell_pairs in
let f shell_p =
let () =
if Parallel.rank < 2 && Cs.index (Csp.shell_a shell_p) > !ishell then
(ishell := Cs.index (Csp.shell_a shell_p) ; print_int !ishell ; print_newline ())
in
let zmq_port = 12345 in
begin
match Unix.fork () with
| 0 -> Printf.printf "pouet\n%!"
| pid -> Printf.printf "coucou\n%!"
end;
begin
let sp =
Csp.shell_pairs shell_p
in
match Unix.fork () with
| 0 -> begin
let zmq_addr = Printf.sprintf "tcp://localhost:%d" zmq_port in
let zmq = ref None in
Printf.printf "PID %d OK\n%!" 0;
let result = ref [] in
try
List.iter (fun shell_q ->
let () =
if Cs.index (Csp.shell_a shell_q) >
Cs.index (Csp.shell_a shell_p) then
raise Exit
in
let sq = Csp.shell_pairs shell_q in
let cspc =
if Array.length sp < Array.length sq then
Cspc.make ~cutoff shell_p shell_q
else
Cspc.make ~cutoff shell_q shell_p
in
Parmap.pariter ~chunksize:1
~init:(fun rank ->
let zmq_context =
Zmq.Context.create ()
in
let push_socket =
Zmq.Socket.create zmq_context Zmq.Socket.push
in
Printf.printf "Init %d OK\n%!" rank;
Zmq.Socket.connect push_socket zmq_addr;
zmq := Some (zmq_context, push_socket)
)
(fun shell_p ->
let push_socket =
match !zmq with
| Some (_, push_socket) -> push_socket
| None -> failwith "ZMQ"
in
let () =
if (Cs.index (Csp.shell_a shell_p) > !ishell) then
(ishell := Cs.index (Csp.shell_a shell_p) ; print_int !ishell ; print_newline ())
in
let sp =
Csp.shell_pairs shell_p
in
try
List.iter (fun shell_q ->
let () =
if Cs.index (Csp.shell_a shell_q) >
Cs.index (Csp.shell_a shell_p) then
raise Exit
in
let sq = Csp.shell_pairs shell_q in
let cspc =
if Array.length sp < Array.length sq then
Cspc.make ~cutoff shell_p shell_q
else
Cspc.make ~cutoff shell_q shell_p
in
match cspc with
| Some cspc -> let cls = class_of_contracted_shell_pair_couple cspc in
store_class ~cutoff push_socket cspc cls
| None -> ()
) shell_pairs
with Exit -> ()
) (Parmap.L shell_pairs)
~finalize:(fun _ ->
let zmq_context, push_socket =
match !zmq with
| Some (zmq_context, push_socket) -> zmq_context, push_socket
| None -> failwith "ZMQ"
in
Zmq.Socket.close push_socket;
Zmq.Context.terminate zmq_context
);
let zmq_context =
Zmq.Context.create ()
match cspc with
| Some cspc ->
let cls =
class_of_contracted_shell_pair_couple cspc
in
let push_socket = Zmq.Socket.create zmq_context Zmq.Socket.push in
Zmq.Socket.connect push_socket zmq_addr;
Zmq.Socket.send_all push_socket [ "1" ; ""];
Zmq.Socket.close push_socket;
Zmq.Context.terminate zmq_context;
ignore @@ exit 0
end
result := (store_class_parallel ~cutoff cspc cls) :: !result;
| None -> ()
) shell_pairs;
List.concat !result
with Exit -> List.concat !result
in
| pid -> begin
Printf.printf "PID %d OK\n%!" pid;
let zmq_addr = Printf.sprintf "tcp://*:%d" zmq_port in
let zmq_context =
Zmq.Context.create ()
in
let pull_socket =
Zmq.Socket.create zmq_context Zmq.Socket.pull
in
Zmq.Socket.bind pull_socket zmq_addr;
(*
let stream_map f stream =
let rec next i =
try Some (f (Stream.next stream))
with Stream.Failure -> None in
Stream.from next
in
stream_map f input_stream
*)
let eri_array =
if Parallel.master then
Fis.create ~size:n `Dense
else
Fis.create ~size:0 `Dense
in
Farm.run f input_stream
|> Stream.iter (fun l ->
List.iter (fun (i_c,j_c,k_c,l_c,value) ->
set_chem eri_array i_c j_c k_c l_c value) l);
if not Parallel.master then
exit 0;
try
while true do
match Zmq.Socket.recv_all pull_socket with
| "0" :: rest :: [] ->
List.iter (fun (i,j,k,l,value) ->
set_chem eri_array i j k l value) (Marshal.from_bytes (Bytes.of_string rest) 0)
| "1" :: _ -> raise Exit
| _ -> invalid_arg "ERI"
done
with Exit -> ();
Zmq.Socket.close pull_socket;
Zmq.Context.terminate zmq_context;
ignore (Unix.wait ())
end
end;
Printf.printf "Computed ERIs in %f seconds\n%!" (Unix.gettimeofday () -. t0);
Printf.printf "Computed ERIs in parallel in %f seconds\n%!" (Unix.gettimeofday () -. t0);
eri_array
*)
let of_basis = of_basis_parallel

View File

@ -45,11 +45,6 @@ let run_parallel_server ~ordered stream =
empty. *)
let n_todo = ref (Mpi.comm_size Mpi.comm_world ) in
(* buffer of finished tasks with a task_id greater than the
current result_id. It allows to put back the results in
the correct order.
*)
let rec get_result () : (task_id * 'a ) option =
begin
match Stream.peek stream with
@ -73,8 +68,15 @@ let run_parallel_server ~ordered stream =
in
let f =
if ordered then
(* buffer of finished tasks with a task_id greater than the
current result_id. It allows to put back the results in
the correct order.
*)
let buffer = Hashtbl.create 67 in
fun i ->
begin
match Hashtbl.find_opt buffer i with
@ -92,11 +94,14 @@ let run_parallel_server ~ordered stream =
else (Hashtbl.add buffer task_id result; loop () )
in loop ()
end
else
fun _ ->
match get_result () with
| Some (_, result) -> Some result
| None -> None
in
Stream.from f

14
Parallel/Farm.mli Normal file
View File

@ -0,0 +1,14 @@
(** The Farm skeleton, similar to SklMl.
The input is a stream of input data, and the output is a stream of data.
*)
val run : ?ordered:bool -> f:('a -> 'b) -> 'a Stream.t -> 'b Stream.t
(** Run the [f] function on every process by popping elements from the
input stream, and putting the results on the output stream. If [ordered]
(the default is [ordered = true], then the order of the output is kept
consistent with the order of the input.
*)

View File

@ -24,7 +24,13 @@ let barrier () =
let broadcast x =
Mpi.broadcast x 0 Mpi.comm_world
let x =
if master then Some (Lazy.force x)
else None
in
match Mpi.broadcast x 0 Mpi.comm_world with
| Some x -> x
| None -> assert false
let broadcast_int x =

View File

@ -12,7 +12,7 @@ val master : bool
val barrier : unit -> unit
(** Wait for all processes to reach this point. *)
val broadcast : 'a -> 'a
val broadcast : 'a lazy_t -> 'a
(** Broadcasts data to all processes. *)
val broadcast_int : int -> int

View File

@ -13,6 +13,7 @@ let make ?cartesian:(cartesian=false)
~nuclei
basis
=
Printf.eprintf "Evaluating Simulation\n%!";
(* Tune Garbage Collector *)
let gc = Gc.get () in
@ -47,5 +48,6 @@ let of_filenames ?cartesian:(cartesian=false) ?multiplicity:(multiplicity=1) ?ch
let basis =
Basis.of_nuclei_and_basis_filename ~nuclei basis_filename
in
make ~cartesian ~charge ~multiplicity ~nuclei basis
lazy (make ~cartesian ~charge ~multiplicity ~nuclei basis)
|> Parallel.broadcast

View File

@ -36,7 +36,7 @@ let run ~out =
in
let s =
Simulation.of_filenames ~charge ~multiplicity ~nuclei:nuclei_file basis_file
Simulation.of_filenames ~charge ~multiplicity ~nuclei:nuclei_file basis_file
in
HartreeFock.make s