diff --git a/Basis/ERI.ml b/Basis/ERI.ml index d2828eb..855a3bd 100644 --- a/Basis/ERI.ml +++ b/Basis/ERI.ml @@ -131,7 +131,7 @@ let store_class ?(cutoff=integrals_cutoff) data contracted_shell_pair_couple cls -let of_basis basis = +let of_basis_serial basis = let n = Bs.size basis and shell = Bs.contracted_shells basis @@ -198,10 +198,22 @@ let of_basis basis = -(* + + + + +(* Parallel functions *) + + + let of_basis_parallel basis = - let store_class ?(cutoff=integrals_cutoff) push_socket contracted_shell_pair_couple cls = + let n = Bs.size basis + and shell = Bs.contracted_shells basis + in + + let store_class_parallel + ?(cutoff=integrals_cutoff) contracted_shell_pair_couple cls = let to_powers x = let open Zkey in match to_powers x with @@ -213,7 +225,7 @@ let of_basis_parallel basis = and shell_q = Cspc.shell_pair_q contracted_shell_pair_couple in - let msg = ref [] in + let result = ref [] in Array.iteri (fun i_c powers_i -> let i_c = Cs.index (Csp.shell_a shell_p) + i_c + 1 in let xi = to_powers powers_i in @@ -228,25 +240,15 @@ let of_basis_parallel basis = let xl = to_powers powers_l in let key = Zkey.of_powers_twelve xi xj xk xl in let value = Zmap.find cls key in - msg := (i_c,j_c,k_c,l_c,value) :: !msg; + result := (i_c, j_c, k_c, l_c, value) :: !result ) (Cs.zkey_array (Csp.shell_b shell_q)) ) (Cs.zkey_array (Csp.shell_a shell_q)) ) (Cs.zkey_array (Csp.shell_b shell_p)) ) (Cs.zkey_array (Csp.shell_a shell_p)); - Zmq.Socket.send_all push_socket ["0" ; Bytes.to_string (Marshal.to_bytes !msg []) ] + !result in - let n = Bs.size basis - and shell = Bs.contracted_shells basis - in - - let eri_array = - Fis.create ~size:n `Dense -(* - Fis.create ~size:n `Sparse -*) - in let t0 = Unix.gettimeofday () in @@ -255,127 +257,84 @@ let of_basis_parallel basis = |> filter_contracted_shell_pairs ~cutoff in - Printf.printf "%d significant shell pairs computed in %f seconds\n%!" + Printf.printf "%d significant shell pairs computed in %f seconds\n" (List.length shell_pairs) (Unix.gettimeofday () -. t0); let t0 = Unix.gettimeofday () in let ishell = ref 0 in + let input_stream = Stream.of_list shell_pairs in + let f shell_p = + let () = + if Parallel.rank < 2 && Cs.index (Csp.shell_a shell_p) > !ishell then + (ishell := Cs.index (Csp.shell_a shell_p) ; print_int !ishell ; print_newline ()) + in - let zmq_port = 12345 in - begin - match Unix.fork () with - | 0 -> Printf.printf "pouet\n%!" - | pid -> Printf.printf "coucou\n%!" - end; - begin + let sp = + Csp.shell_pairs shell_p + in - match Unix.fork () with - | 0 -> begin - let zmq_addr = Printf.sprintf "tcp://localhost:%d" zmq_port in - let zmq = ref None in -Printf.printf "PID %d OK\n%!" 0; + let result = ref [] in + try + List.iter (fun shell_q -> + let () = + if Cs.index (Csp.shell_a shell_q) > + Cs.index (Csp.shell_a shell_p) then + raise Exit + in + let sq = Csp.shell_pairs shell_q in + let cspc = + if Array.length sp < Array.length sq then + Cspc.make ~cutoff shell_p shell_q + else + Cspc.make ~cutoff shell_q shell_p + in - Parmap.pariter ~chunksize:1 - ~init:(fun rank -> - let zmq_context = - Zmq.Context.create () - in - let push_socket = - Zmq.Socket.create zmq_context Zmq.Socket.push - in - Printf.printf "Init %d OK\n%!" rank; - Zmq.Socket.connect push_socket zmq_addr; - zmq := Some (zmq_context, push_socket) - ) - - (fun shell_p -> - let push_socket = - match !zmq with - | Some (_, push_socket) -> push_socket - | None -> failwith "ZMQ" - in - let () = - if (Cs.index (Csp.shell_a shell_p) > !ishell) then - (ishell := Cs.index (Csp.shell_a shell_p) ; print_int !ishell ; print_newline ()) - in - - let sp = - Csp.shell_pairs shell_p - in - - try - List.iter (fun shell_q -> - let () = - if Cs.index (Csp.shell_a shell_q) > - Cs.index (Csp.shell_a shell_p) then - raise Exit - in - let sq = Csp.shell_pairs shell_q in - let cspc = - if Array.length sp < Array.length sq then - Cspc.make ~cutoff shell_p shell_q - else - Cspc.make ~cutoff shell_q shell_p - in - - match cspc with - | Some cspc -> let cls = class_of_contracted_shell_pair_couple cspc in - store_class ~cutoff push_socket cspc cls - | None -> () - ) shell_pairs - with Exit -> () - ) (Parmap.L shell_pairs) - ~finalize:(fun _ -> - let zmq_context, push_socket = - match !zmq with - | Some (zmq_context, push_socket) -> zmq_context, push_socket - | None -> failwith "ZMQ" - in - Zmq.Socket.close push_socket; - Zmq.Context.terminate zmq_context - ); - let zmq_context = - Zmq.Context.create () + match cspc with + | Some cspc -> + let cls = + class_of_contracted_shell_pair_couple cspc in - let push_socket = Zmq.Socket.create zmq_context Zmq.Socket.push in - Zmq.Socket.connect push_socket zmq_addr; - Zmq.Socket.send_all push_socket [ "1" ; ""]; - Zmq.Socket.close push_socket; - Zmq.Context.terminate zmq_context; - ignore @@ exit 0 - end + result := (store_class_parallel ~cutoff cspc cls) :: !result; + | None -> () + ) shell_pairs; + List.concat !result + with Exit -> List.concat !result + in - | pid -> begin -Printf.printf "PID %d OK\n%!" pid; - let zmq_addr = Printf.sprintf "tcp://*:%d" zmq_port in - let zmq_context = - Zmq.Context.create () - in - let pull_socket = - Zmq.Socket.create zmq_context Zmq.Socket.pull - in - Zmq.Socket.bind pull_socket zmq_addr; + (* + let stream_map f stream = + let rec next i = + try Some (f (Stream.next stream)) + with Stream.Failure -> None in + Stream.from next + in + stream_map f input_stream + *) + let eri_array = + if Parallel.master then + Fis.create ~size:n `Dense + else + Fis.create ~size:0 `Dense + in + Farm.run f input_stream + |> Stream.iter (fun l -> + List.iter (fun (i_c,j_c,k_c,l_c,value) -> + set_chem eri_array i_c j_c k_c l_c value) l); + if not Parallel.master then + exit 0; - try - while true do - match Zmq.Socket.recv_all pull_socket with - | "0" :: rest :: [] -> - List.iter (fun (i,j,k,l,value) -> - set_chem eri_array i j k l value) (Marshal.from_bytes (Bytes.of_string rest) 0) - | "1" :: _ -> raise Exit - | _ -> invalid_arg "ERI" - done - with Exit -> (); - - Zmq.Socket.close pull_socket; - Zmq.Context.terminate zmq_context; - ignore (Unix.wait ()) - end - end; - Printf.printf "Computed ERIs in %f seconds\n%!" (Unix.gettimeofday () -. t0); + Printf.printf "Computed ERIs in parallel in %f seconds\n%!" (Unix.gettimeofday () -. t0); eri_array -*) + + + +let of_basis = of_basis_parallel + + + + + diff --git a/Parallel/Farm.ml b/Parallel/Farm.ml index 7a5ff94..ee6f684 100644 --- a/Parallel/Farm.ml +++ b/Parallel/Farm.ml @@ -45,11 +45,6 @@ let run_parallel_server ~ordered stream = empty. *) let n_todo = ref (Mpi.comm_size Mpi.comm_world ) in - (* buffer of finished tasks with a task_id greater than the - current result_id. It allows to put back the results in - the correct order. - *) - let rec get_result () : (task_id * 'a ) option = begin match Stream.peek stream with @@ -73,8 +68,15 @@ let run_parallel_server ~ordered stream = in let f = + if ordered then + + (* buffer of finished tasks with a task_id greater than the + current result_id. It allows to put back the results in + the correct order. + *) let buffer = Hashtbl.create 67 in + fun i -> begin match Hashtbl.find_opt buffer i with @@ -92,11 +94,14 @@ let run_parallel_server ~ordered stream = else (Hashtbl.add buffer task_id result; loop () ) in loop () end + else + fun _ -> match get_result () with | Some (_, result) -> Some result | None -> None + in Stream.from f diff --git a/Parallel/Farm.mli b/Parallel/Farm.mli new file mode 100644 index 0000000..9ac9298 --- /dev/null +++ b/Parallel/Farm.mli @@ -0,0 +1,14 @@ +(** The Farm skeleton, similar to SklMl. + +The input is a stream of input data, and the output is a stream of data. +*) + + +val run : ?ordered:bool -> f:('a -> 'b) -> 'a Stream.t -> 'b Stream.t +(** Run the [f] function on every process by popping elements from the + input stream, and putting the results on the output stream. If [ordered] + (the default is [ordered = true], then the order of the output is kept + consistent with the order of the input. +*) + + diff --git a/Parallel/Parallel.ml b/Parallel/Parallel.ml index bd3ba17..7010d2d 100644 --- a/Parallel/Parallel.ml +++ b/Parallel/Parallel.ml @@ -24,7 +24,13 @@ let barrier () = let broadcast x = - Mpi.broadcast x 0 Mpi.comm_world + let x = + if master then Some (Lazy.force x) + else None + in + match Mpi.broadcast x 0 Mpi.comm_world with + | Some x -> x + | None -> assert false let broadcast_int x = diff --git a/Parallel/Parallel.mli b/Parallel/Parallel.mli index 1e48db2..4d76cc0 100644 --- a/Parallel/Parallel.mli +++ b/Parallel/Parallel.mli @@ -12,7 +12,7 @@ val master : bool val barrier : unit -> unit (** Wait for all processes to reach this point. *) -val broadcast : 'a -> 'a +val broadcast : 'a lazy_t -> 'a (** Broadcasts data to all processes. *) val broadcast_int : int -> int diff --git a/Simulation.ml b/Simulation.ml index 3b7bc43..abe8ec5 100644 --- a/Simulation.ml +++ b/Simulation.ml @@ -13,6 +13,7 @@ let make ?cartesian:(cartesian=false) ~nuclei basis = + Printf.eprintf "Evaluating Simulation\n%!"; (* Tune Garbage Collector *) let gc = Gc.get () in @@ -47,5 +48,6 @@ let of_filenames ?cartesian:(cartesian=false) ?multiplicity:(multiplicity=1) ?ch let basis = Basis.of_nuclei_and_basis_filename ~nuclei basis_filename in - make ~cartesian ~charge ~multiplicity ~nuclei basis + lazy (make ~cartesian ~charge ~multiplicity ~nuclei basis) + |> Parallel.broadcast diff --git a/run_hartree_fock.ml b/run_hartree_fock.ml index fba70fe..ec8aee3 100644 --- a/run_hartree_fock.ml +++ b/run_hartree_fock.ml @@ -36,7 +36,7 @@ let run ~out = in let s = - Simulation.of_filenames ~charge ~multiplicity ~nuclei:nuclei_file basis_file + Simulation.of_filenames ~charge ~multiplicity ~nuclei:nuclei_file basis_file in HartreeFock.make s