Working on binary input

This commit is contained in:
Anthony Scemama 2022-01-11 13:41:13 +01:00
parent a0323922a8
commit dfbbf8b329
7 changed files with 467 additions and 273 deletions

View File

@ -57,24 +57,65 @@ let zero =
let to_bytes b = let to_bytes b =
(* [ Length of b (* [ Length of b
[ Length of value ; [ Length of value ;
Value ; Value ;
Length of weight ; Length of weight ;
Weight ; Weight ;
... ] ] *) ... ] ] *)
let l = let l =
[ Property.to_bytes b.property ; [ Property.to_bytes b.property ;
Sample.to_bytes b.value ; Sample.to_bytes b.value ;
Weight.to_bytes b.weight ; Weight.to_bytes b.weight ;
bytes_of_int b.pid ; bytes_of_int b.pid ;
Block_id.to_bytes b.block_id ; Block_id.to_bytes b.block_id ;
Compute_node.to_bytes b.compute_node ] Compute_node.to_bytes b.compute_node ]
|> List.map (fun x -> [ bytes_of_int (Bytes.length x) ; x ] ) |> List.map (fun x -> [ bytes_of_int (Bytes.length x) ; x ] )
|> List.concat |> List.concat
in in
let result = let result =
Bytes.concat Bytes.empty (zero :: l) Bytes.concat Bytes.empty (zero :: l)
in in
Bytes.set_int64_le result 8 (Int64.of_int (Bytes.length result)); Bytes.set_int64_le result 0 (Int64.of_int ((Bytes.length result) - 8));
result
let read_bytes b =
(* Reads m, the first 8 bytes as an int64 containing the number of bytes to read.
Then, read the next m bytes and return a tuple containing the decoded data and the rest.
*)
let l = Bytes.length b in
let m =
Bytes.get_int64_le b 0
|> Int64.to_int
in
let nl = l-m-8 in
if nl > 0 then
(Bytes.sub b 8 m, Some (Bytes.sub b (8+m) nl))
else
(Bytes.sub b 8 m, None)
let of_bytes b =
let b, _rest =
read_bytes b
in
let rec loop accu s =
match read_bytes s with
| data, None -> List.rev (data :: accu)
| data, (Some rest) -> loop (data :: accu) rest
in
let result =
match loop [] b with
| value :: weight :: property :: compute_node :: pid :: block_id :: [] ->
{ property = Property.of_bytes property;
value = Sample.of_bytes value;
weight = Weight.of_bytes weight;
pid = int_of_bytes pid;
block_id = Block_id.of_bytes block_id;
compute_node = Compute_node.of_bytes compute_node;
}
| _ -> assert false
in
result result
@ -87,11 +128,6 @@ let to_string b =
(string_of_int b.pid) (string_of_int b.pid)
(Block_id.to_int b.block_id) (Block_id.to_int b.block_id)
(*
let of_string s =
Bytes.of_string s
|> of_bytes
*)
let dir_name = lazy( let dir_name = lazy(
let ezfio_filename = let ezfio_filename =

View File

@ -17,9 +17,9 @@ let initialization_timeout = 600.
let bind_socket ~socket_type ~socket ~address = let bind_socket ~socket_type ~socket ~address =
try try
Zmq.Socket.bind socket address Zmq.Socket.bind socket address
with with
| Unix.Unix_error (_, message, f) -> | Unix.Unix_error (_, message, f) ->
failwith @@ Printf.sprintf failwith @@ Printf.sprintf
"\n%s\nUnable to bind the dataserver's %s socket :\n %s\n%s" "\n%s\nUnable to bind the dataserver's %s socket :\n %s\n%s"
f socket_type address message f socket_type address message
| other_exception -> raise other_exception | other_exception -> raise other_exception
@ -30,7 +30,7 @@ let run ?(daemon=true) ezfio_filename =
Qputils.set_ezfio_filename ezfio_filename; Qputils.set_ezfio_filename ezfio_filename;
(** Check if walkers need to be created. *) (** Check if walkers need to be created. *)
let () = let () =
if ( not(Ezfio.has_electrons_elec_coord_pool ()) ) then if ( not(Ezfio.has_electrons_elec_coord_pool ()) ) then
begin begin
Printf.printf "Generating initial walkers...\n%!"; Printf.printf "Generating initial walkers...\n%!";
@ -39,12 +39,12 @@ let run ?(daemon=true) ezfio_filename =
Unix.execvp Unix.execvp
(Lazy.force Qmcchem_config.qmc_create_walkers) (Lazy.force Qmcchem_config.qmc_create_walkers)
[|"qmc_create_walkers" ; ezfio_filename|] [|"qmc_create_walkers" ; ezfio_filename|]
| pid -> | pid ->
begin begin
ignore @@ Unix.waitpid [] pid; ignore @@ Unix.waitpid [] pid;
Printf.printf "Initial walkers ready\n%!" Printf.printf "Initial walkers ready\n%!"
end end
end end
in in
@ -74,7 +74,7 @@ let run ?(daemon=true) ezfio_filename =
in in
(** Status variable (mutable) *) (** Status variable (mutable) *)
let status = let status =
ref Status.Queued ref Status.Queued
in in
@ -91,7 +91,7 @@ let run ?(daemon=true) ezfio_filename =
let adress_prefix = let adress_prefix =
"tcp://*:" "tcp://*:"
in in
let result = let result =
List.fold_left (fun accu i -> List.fold_left (fun accu i ->
let address = let address =
adress_prefix ^ (string_of_int (n+i)) adress_prefix ^ (string_of_int (n+i))
@ -99,14 +99,14 @@ let run ?(daemon=true) ezfio_filename =
let socket = let socket =
Zmq.Socket.create zmq_context Zmq.Socket.rep Zmq.Socket.create zmq_context Zmq.Socket.rep
in in
let result = let result =
try try
Zmq.Socket.bind socket address; Zmq.Socket.bind socket address;
accu accu
with with
| _ -> false | _ -> false
in in
Zmq.Socket.close socket; Zmq.Socket.close socket;
result result
) true [0;1;2;3] ) true [0;1;2;3]
in in
@ -120,7 +120,7 @@ let run ?(daemon=true) ezfio_filename =
(** Random port number between 49152 and 65535 *) (** Random port number between 49152 and 65535 *)
let port = let port =
let newport = let newport =
ref ( 1024 + (Random.int (49151-1024))) ref ( 1024 + (Random.int (49151-1024)))
in in
while ((check_port !newport) = `Unavailable) while ((check_port !newport) = `Unavailable)
@ -131,7 +131,7 @@ let run ?(daemon=true) ezfio_filename =
in in
let debug_socket = let debug_socket =
Zmq.Socket.create zmq_context Zmq.Socket.xpub Zmq.Socket.create zmq_context Zmq.Socket.xpub
and address = and address =
Printf.sprintf "tcp://*:%d" (port+4) Printf.sprintf "tcp://*:%d" (port+4)
@ -140,17 +140,17 @@ let run ?(daemon=true) ezfio_filename =
Zmq.Socket.set_linger_period debug_socket 100 ; Zmq.Socket.set_linger_period debug_socket 100 ;
let close_debug_socket () = let close_debug_socket () =
Zmq.Socket.close debug_socket Zmq.Socket.close debug_socket
in in
(** Sends a log text to the debug socket *) (** Sends a log text to the debug socket *)
let send_log socket size t0 text = let send_log socket size t0 text =
let dt = let dt =
delta_t t0 delta_t t0
in in
let message = let message =
Printf.sprintf "%20s : %8d : %10s : %e" Printf.sprintf "%20s : %8d : %10s : %e"
socket size text dt socket size text dt
in in
Zmq.Socket.send debug_socket message Zmq.Socket.send debug_socket message
in in
@ -159,7 +159,7 @@ let run ?(daemon=true) ezfio_filename =
(** {2 Walkers} *) (** {2 Walkers} *)
(** Number of electrons *) (** Number of electrons *)
let elec_num = let elec_num =
Lazy.force Qputils.elec_num Lazy.force Qputils.elec_num
in in
@ -181,14 +181,14 @@ let run ?(daemon=true) ezfio_filename =
in in
(** Array of walkers. The size is [walk_num_tot]. *) (** Array of walkers. The size is [walk_num_tot]. *)
let walkers_array = let walkers_array =
let t0 = let t0 =
Unix.gettimeofday () Unix.gettimeofday ()
in in
let j = let j =
3*elec_num + 3 3*elec_num + 3
in in
let result = let result =
let size = let size =
Ezfio.get_electrons_elec_coord_pool_size () Ezfio.get_electrons_elec_coord_pool_size ()
and ez = and ez =
@ -204,7 +204,7 @@ let run ?(daemon=true) ezfio_filename =
failwith "Walkers file is broken." failwith "Walkers file is broken."
in in
String.concat " " [ "Read" ; string_of_int (Array.length result) ; String.concat " " [ "Read" ; string_of_int (Array.length result) ;
"walkers"] "walkers"]
|> send_log "status" 0 t0 ; |> send_log "status" 0 t0 ;
result result
in in
@ -217,7 +217,7 @@ let run ?(daemon=true) ezfio_filename =
(** Last time when the walkers were saved to disk. *) (** Last time when the walkers were saved to disk. *)
let last_save_walkers = let last_save_walkers =
ref 0. ref 0.
in in
@ -227,14 +227,12 @@ let run ?(daemon=true) ezfio_filename =
if (delta_t !last_save_walkers > 10. ) then if (delta_t !last_save_walkers > 10. ) then
begin begin
Ezfio.set_electrons_elec_coord_pool_size walk_num_tot ; Ezfio.set_electrons_elec_coord_pool_size walk_num_tot ;
let walkers_list = let walkers_list =
Array.map Array.to_list walkers_array Array.to_list walkers_array
|> Array.to_list |> Array.concat
|> List.concat |> Array.map float_of_string
|> List.rev_map float_of_string
|> List.rev
in in
Ezfio.set_electrons_elec_coord_pool (Ezfio.ezfio_array_of_list Ezfio.set_electrons_elec_coord_pool (Ezfio.ezfio_array_of_array
~rank:3 ~dim:[| elec_num+1 ; 3 ; walk_num_tot |] ~data:walkers_list); ~rank:3 ~dim:[| elec_num+1 ; 3 ; walk_num_tot |] ~data:walkers_list);
let t0 = let t0 =
Unix.gettimeofday () Unix.gettimeofday ()
@ -263,7 +261,7 @@ let run ?(daemon=true) ezfio_filename =
(** {2 Set of workers} *) (** {2 Set of workers} *)
(** A hash table is kept to track the running workers. The keys are the (** A hash table is kept to track the running workers. The keys are the
built as string containing the couple ([Compute_node], [PID]), and built as string containing the couple ([Compute_node], [PID]), and
the values are the last communication time. the values are the last communication time.
*) *)
@ -271,16 +269,16 @@ let run ?(daemon=true) ezfio_filename =
(** The hash table for workers *) (** The hash table for workers *)
let workers_hash = let workers_hash =
Hashtbl.create 63 Hashtbl.create 63
in in
(** Creates a key using the couple ([Compute_node], [PID]) *) (** Creates a key using the couple ([Compute_node], [PID]) *)
let key compute_node pid = let key compute_node pid =
String.concat " " [ String.concat " " [
(Compute_node.to_string compute_node); (Compute_node.to_string compute_node);
(string_of_int pid) ] (string_of_int pid) ]
in in
@ -294,7 +292,7 @@ let run ?(daemon=true) ezfio_filename =
in in
match Hashtbl.find_opt workers_hash s with match Hashtbl.find_opt workers_hash s with
| Some _ -> failwith (s^" already registered") | Some _ -> failwith (s^" already registered")
| None -> Hashtbl.add workers_hash s (Unix.gettimeofday ()) | None -> Hashtbl.add workers_hash s (Unix.gettimeofday ())
in in
@ -307,14 +305,14 @@ let run ?(daemon=true) ezfio_filename =
in in
match Hashtbl.find_opt workers_hash s with match Hashtbl.find_opt workers_hash s with
| Some x -> Hashtbl.remove workers_hash s | Some x -> Hashtbl.remove workers_hash s
| None -> failwith (s^" not registered") | None -> failwith (s^" not registered")
in in
(** Sets the last access of the worker to [Unix.gettimeofday ()] *) (** Sets the last access of the worker to [Unix.gettimeofday ()] *)
let touch_worker w pid = let touch_worker w pid =
let s = let s =
key w pid key w pid
in in
Hashtbl.replace workers_hash s (Unix.gettimeofday ()) Hashtbl.replace workers_hash s (Unix.gettimeofday ())
@ -326,11 +324,11 @@ let run ?(daemon=true) ezfio_filename =
let delta = let delta =
initialization_timeout +. block_time *. 2. initialization_timeout +. block_time *. 2.
in in
Hashtbl.fold (fun k v accu -> Hashtbl.fold (fun k v accu ->
if (now -. v) <= delta then if (now -. v) <= delta then
v :: accu v :: accu
else else
accu ) hash [] accu ) hash []
|> List.length |> List.length
in in
@ -344,14 +342,14 @@ let run ?(daemon=true) ezfio_filename =
(** Name of the blocks file written by the current process. *) (** Name of the blocks file written by the current process. *)
let block_channel_filename = let block_channel_filename =
let dirname = let dirname =
Lazy.force Block.dir_name Lazy.force Block.dir_name
in in
let () = let () =
if not ( Sys.file_exists dirname ) then if not ( Sys.file_exists dirname ) then
Unix.mkdir dirname 0o755 Unix.mkdir dirname 0o755
in in
Filename.concat dirname ( Filename.concat dirname (
hostname ^ "." ^ (string_of_int dataserver_pid) hostname ^ "." ^ (string_of_int dataserver_pid)
) )
@ -387,7 +385,7 @@ let run ?(daemon=true) ezfio_filename =
the compute nodes. Happens when [max_file_size] is reached. the compute nodes. Happens when [max_file_size] is reached.
*) *)
let compress_block_file filename = let compress_block_file filename =
let t0 = let t0 =
Unix.gettimeofday () Unix.gettimeofday ()
in in
close_out !block_channel; close_out !block_channel;
@ -406,16 +404,16 @@ let run ?(daemon=true) ezfio_filename =
(** {3 Status thread} *) (** {3 Status thread} *)
let start_status_thread = let start_status_thread =
let t0 = let t0 =
Unix.gettimeofday () Unix.gettimeofday ()
in in
Thread.create (fun () -> Thread.create (fun () ->
send_log "status" 0 t0 "Starting status thread"; send_log "status" 0 t0 "Starting status thread";
let socket = let socket =
Zmq.Socket.create zmq_context Zmq.Socket.pub Zmq.Socket.create zmq_context Zmq.Socket.pub
and address = and address =
Printf.sprintf "tcp://*:%d" (port+1) Printf.sprintf "tcp://*:%d" (port+1)
in in
bind_socket "PUB" socket address; bind_socket "PUB" socket address;
@ -423,15 +421,15 @@ let run ?(daemon=true) ezfio_filename =
and delay_read = 2. and delay_read = 2.
in in
let start_time = let start_time =
Unix.gettimeofday () Unix.gettimeofday ()
and stop_time = and stop_time =
ref (Input.Stop_time.(read () |> to_float) ) ref (Input.Stop_time.(read () |> to_float) )
in in
let last_update = let last_update =
ref start_time ref start_time
in in
while (!status <> Status.Stopped) while (!status <> Status.Stopped)
do do
@ -439,46 +437,46 @@ let run ?(daemon=true) ezfio_filename =
let now = let now =
Unix.gettimeofday () Unix.gettimeofday ()
in in
let status_string = let status_string =
Status.to_string !status Status.to_string !status
in in
Zmq.Socket.send socket status_string; Zmq.Socket.send socket status_string;
send_log "status" (String.length status_string) now status_string; send_log "status" (String.length status_string) now status_string;
let test = let test =
if (now -. !last_update > delay_read) then if (now -. !last_update > delay_read) then
let n_connect = let n_connect =
n_connected workers_hash now n_connected workers_hash now
in in
`Update n_connect `Update n_connect
else if (now -. start_time > !stop_time) then else if (now -. start_time > !stop_time) then
`Terminate `Terminate
else if (now -. start_time > initialization_timeout) then else if (now -. start_time > initialization_timeout) then
`Timeout `Timeout
else else
`None `None
in in
match (daemon, !status, test) with match (daemon, !status, test) with
| (_ , _ , `None ) -> () | (_ , _ , `None ) -> ()
| (_ , Status.Running , `Terminate ) -> change_status Status.Stopping | (_ , Status.Running , `Terminate ) -> change_status Status.Stopping
| (false, Status.Running , `Update 0 ) -> change_status Status.Stopped | (false, Status.Running , `Update 0 ) -> change_status Status.Stopped
| (true , Status.Running , `Update 0 ) -> change_status Status.Queued | (true , Status.Running , `Update 0 ) -> change_status Status.Queued
| (_ , _ , `Update i ) -> | (_ , _ , `Update i ) ->
begin begin
status := Status.read (); status := Status.read ();
last_update := now; last_update := now;
stop_time := Input.Stop_time.(read () |> to_float) ; stop_time := Input.Stop_time.(read () |> to_float) ;
let n_tot = let n_tot =
Hashtbl.length workers_hash Hashtbl.length workers_hash
in in
if (i <> n_tot) then if (i <> n_tot) then
begin begin
Printf.sprintf "Connected workers : %d / %d" i n_tot Printf.sprintf "Connected workers : %d / %d" i n_tot
|> send_log "status" 0 now |> send_log "status" 0 now
end end
end end
| (false, Status.Queued , `Timeout ) -> change_status Status.Stopped | (false, Status.Queued , `Timeout ) -> change_status Status.Stopped
| (_, _, _) -> () | (_, _, _) -> ()
; ;
done; done;
@ -487,37 +485,37 @@ let run ?(daemon=true) ezfio_filename =
Zmq.Socket.close socket Zmq.Socket.close socket
) )
in in
(** {3 Log thread} *) (** {3 Log thread} *)
let start_log_thread = let start_log_thread =
let t0 = let t0 =
Unix.gettimeofday () Unix.gettimeofday ()
in in
Thread.create (fun () -> Thread.create (fun () ->
send_log "status" 0 t0 "Starting log thread"; send_log "status" 0 t0 "Starting log thread";
let socket = let socket =
Zmq.Socket.create zmq_context Zmq.Socket.xsub Zmq.Socket.create zmq_context Zmq.Socket.xsub
and address = and address =
Printf.sprintf "tcp://*:%d" (port+3) Printf.sprintf "tcp://*:%d" (port+3)
in in
bind_socket "XSUB" socket address; bind_socket "XSUB" socket address;
let pollitem = let pollitem =
Zmq.Poll.mask_of Zmq.Poll.mask_of
[| (socket , Zmq.Poll.In) ; [| (socket , Zmq.Poll.In) ;
(debug_socket , Zmq.Poll.In) (debug_socket , Zmq.Poll.In)
|] |]
in in
while (!status <> Status.Stopped) while (!status <> Status.Stopped)
do do
let polling = let polling =
Zmq.Poll.poll ~timeout:1000 pollitem Zmq.Poll.poll ~timeout:1000 pollitem
in in
if (polling.(0) = Some Zmq.Poll.In) then if (polling.(0) = Some Zmq.Poll.In) then
begin begin
let message = let message =
Zmq.Socket.recv_all ~block:false socket Zmq.Socket.recv_all ~block:false socket
|> String.concat " " |> String.concat " "
in in
@ -530,7 +528,7 @@ let run ?(daemon=true) ezfio_filename =
begin begin
(* Forward subscription from XPUB to XSUB *) (* Forward subscription from XPUB to XSUB *)
Zmq.Socket.recv_all ~block:false debug_socket Zmq.Socket.recv_all ~block:false debug_socket
|> Zmq.Socket.send_all socket |> Zmq.Socket.send_all socket
end end
done; done;
Zmq.Socket.set_linger_period socket 1000 ; Zmq.Socket.set_linger_period socket 1000 ;
@ -539,17 +537,17 @@ let run ?(daemon=true) ezfio_filename =
in in
(** {3 Main thread} *) (** {3 Main thread} *)
let random_walkers n_walks = let random_walkers n_walks =
let rec walkers accu = function let rec walkers accu = function
| 0 -> accu | 0 -> accu
| n -> | n ->
let random_int = let random_int =
Random.int (Strictly_positive_int.to_int n_walks) Random.int (Strictly_positive_int.to_int n_walks)
in in
let new_accu = let new_accu =
walkers_array.(random_int) :: accu walkers_array.(random_int) :: accu
in in
walkers new_accu (n-1) walkers new_accu (n-1)
in in
walkers [] (Strictly_positive_int.to_int n_walks) walkers [] (Strictly_positive_int.to_int n_walks)
|> Array.concat |> Array.concat
@ -560,21 +558,21 @@ let run ?(daemon=true) ezfio_filename =
let wall0 = let wall0 =
Unix.gettimeofday () Unix.gettimeofday ()
in in
let f () = let f () =
change_status Status.Queued; change_status Status.Queued;
send_log "status" 0 wall0 "Starting main thread"; send_log "status" 0 wall0 "Starting main thread";
(** Reply socket *) (** Reply socket *)
let rep_socket = let rep_socket =
Zmq.Socket.create zmq_context Zmq.Socket.rep Zmq.Socket.create zmq_context Zmq.Socket.rep
and address = and address =
Printf.sprintf "tcp://*:%d" port Printf.sprintf "tcp://*:%d" port
in in
bind_socket "REP" rep_socket address; bind_socket "REP" rep_socket address;
Zmq.Socket.set_receive_high_water_mark rep_socket 100_000; Zmq.Socket.set_receive_high_water_mark rep_socket 100_000;
Zmq.Socket.set_send_high_water_mark rep_socket 100_000; Zmq.Socket.set_send_high_water_mark rep_socket 100_000;
Zmq.Socket.set_immediate rep_socket true; Zmq.Socket.set_immediate rep_socket true;
Zmq.Socket.set_linger_period rep_socket 600_000 ; Zmq.Socket.set_linger_period rep_socket 600_000 ;
(** EZFIO Cache *) (** EZFIO Cache *)
@ -595,14 +593,14 @@ let run ?(daemon=true) ezfio_filename =
in in
List.iter (fun x -> List.iter (fun x ->
if handle_ezfio ("has_"^x) = "T" then if handle_ezfio ("has_"^x) = "T" then
try ignore @@ handle_ezfio ("get_"^x) try ignore @@ handle_ezfio ("get_"^x)
with Failure _ -> ()) with Failure _ -> ())
Qptypes.all_ezfio_messages; Qptypes.all_ezfio_messages;
(** Pull socket for computed data *) (** Pull socket for computed data *)
let pull_socket = let pull_socket =
Zmq.Socket.create zmq_context Zmq.Socket.pull Zmq.Socket.create zmq_context Zmq.Socket.pull
and address = and address =
Printf.sprintf "tcp://*:%d" (port+2) Printf.sprintf "tcp://*:%d" (port+2)
in in
bind_socket "PULL" pull_socket address; bind_socket "PULL" pull_socket address;
@ -611,7 +609,7 @@ let run ?(daemon=true) ezfio_filename =
(** Address of the dataserver *) (** Address of the dataserver *)
let server_address = let server_address =
let ip = let ip =
Lazy.force Qmcchem_config.ip_address Lazy.force Qmcchem_config.ip_address
in in
Printf.sprintf "tcp://%s:%d" ip port Printf.sprintf "tcp://%s:%d" ip port
in in
@ -621,7 +619,7 @@ let run ?(daemon=true) ezfio_filename =
(** Polling item to poll REP and PULL sockets. *) (** Polling item to poll REP and PULL sockets. *)
let pollitem = let pollitem =
Zmq.Poll.mask_of Zmq.Poll.mask_of
[| ( rep_socket, Zmq.Poll.In) ; [| ( rep_socket, Zmq.Poll.In) ;
( pull_socket, Zmq.Poll.In) ; ( pull_socket, Zmq.Poll.In) ;
|] |]
@ -629,27 +627,27 @@ let run ?(daemon=true) ezfio_filename =
(** Handles messages coming into the REP socket. *) (** Handles messages coming into the REP socket. *)
let handle_rep () = let handle_rep () =
let raw_msg = let raw_msg =
Zmq.Socket.recv_all ~block:false rep_socket Zmq.Socket.recv_all ~block:false rep_socket
in in
let t0 = let t0 =
Unix.gettimeofday () Unix.gettimeofday ()
in in
let msg = let msg =
List.rev_map String.trim raw_msg List.rev_map String.trim raw_msg
|> List.rev |> List.rev
|> Message.create |> Message.create
and msg_size = and msg_size =
List.fold_left (fun accu x -> accu + (String.length x)) 0 raw_msg List.fold_left (fun accu x -> accu + (String.length x)) 0 raw_msg
in in
let handle = function let handle = function
| Message.Error _ -> () | Message.Error _ -> ()
| Message.Ezfio ezfio_msg -> | Message.Ezfio ezfio_msg ->
let result = let result =
handle_ezfio ezfio_msg handle_ezfio ezfio_msg
in in
Zmq.Socket.send_all rep_socket Zmq.Socket.send_all rep_socket
[ String.length result [ String.length result
|> Printf.sprintf "%d " ; |> Printf.sprintf "%d " ;
result ] ; result ] ;
@ -657,53 +655,53 @@ let run ?(daemon=true) ezfio_filename =
| Message.GetWalkers n_walks -> | Message.GetWalkers n_walks ->
begin begin
send_log "req" msg_size t0 "get_walkers"; send_log "req" msg_size t0 "get_walkers";
let result = let result =
random_walkers n_walks random_walkers n_walks
in in
Zmq.Socket.send_all rep_socket result; Zmq.Socket.send_all rep_socket result;
send_log "rep" walkers_size t0 "get_walkers" send_log "rep" walkers_size t0 "get_walkers"
end end
| Message.Register (w,pid) -> | Message.Register (w,pid) ->
begin begin
match !status with match !status with
| Status.Queued | Status.Queued
| Status.Running -> | Status.Running ->
begin begin
String.concat " " [ "Register :" ; String.concat " " [ "Register :" ;
Compute_node.to_string w ; Compute_node.to_string w ;
string_of_int pid ] string_of_int pid ]
|> send_log "req" msg_size t0; |> send_log "req" msg_size t0;
add_worker w pid; add_worker w pid;
if (!status = Status.Queued) then if (!status = Status.Queued) then
change_status Status.Running ; change_status Status.Running ;
Zmq.Socket.send rep_socket "OK"; Zmq.Socket.send rep_socket "OK";
send_log "rep" 2 t0 "Register : OK" send_log "rep" 2 t0 "Register : OK"
end end
| Status.Stopping | Status.Stopping
| Status.Stopped -> | Status.Stopped ->
Zmq.Socket.send rep_socket "Failed"; Zmq.Socket.send rep_socket "Failed";
end end
| Message.Unregister (w,pid) -> | Message.Unregister (w,pid) ->
begin begin
String.concat " " [ "Unregister :" ; String.concat " " [ "Unregister :" ;
(Compute_node.to_string w) ; (Compute_node.to_string w) ;
(string_of_int pid) ] (string_of_int pid) ]
|> send_log "req" msg_size t0; |> send_log "req" msg_size t0;
Zmq.Socket.send rep_socket "OK"; Zmq.Socket.send rep_socket "OK";
del_worker w pid; del_worker w pid;
String.concat " " [ "Unregister :"; String.concat " " [ "Unregister :";
(Hashtbl.length workers_hash) |> string_of_int ; (Hashtbl.length workers_hash) |> string_of_int ;
"remaining" ] "remaining" ]
|> send_log "rep" 2 t0 ; |> send_log "rep" 2 t0 ;
let n_connect = let n_connect =
n_connected workers_hash t0 n_connected workers_hash t0
in in
match (daemon,n_connect) with match (daemon,n_connect) with
| (false,0) -> change_status Status.Stopped | (false,0) -> change_status Status.Stopped
| (true ,0) -> change_status Status.Queued | (true ,0) -> change_status Status.Queued
| _ -> () | _ -> ()
end end
| Message.Test -> | Message.Test ->
begin begin
Zmq.Socket.send rep_socket "OK"; Zmq.Socket.send rep_socket "OK";
send_log "rep" 2 t0 "Test" send_log "rep" 2 t0 "Test"
@ -719,17 +717,17 @@ let run ?(daemon=true) ezfio_filename =
let raw_msg = let raw_msg =
Zmq.Socket.recv_all ~block:false pull_socket Zmq.Socket.recv_all ~block:false pull_socket
in in
let t0 = let t0 =
Unix.gettimeofday () Unix.gettimeofday ()
in in
let msg = let msg =
List.rev_map String.trim raw_msg List.rev_map String.trim raw_msg
|> List.rev |> List.rev
|> Message.create |> Message.create
and msg_size = and msg_size =
List.fold_left (fun accu x -> accu + (String.length x)) 0 raw_msg List.fold_left (fun accu x -> accu + (String.length x)) 0 raw_msg
in in
let recv_log = let recv_log =
send_log "pull" msg_size t0 send_log "pull" msg_size t0
in in
@ -739,7 +737,7 @@ let run ?(daemon=true) ezfio_filename =
begin begin
if (status = Status.Running) then if (status = Status.Running) then
touch_worker h pid ; touch_worker h pid ;
let log_msg = let log_msg =
Printf.sprintf "Walkers from %s : %d / %d / %d" Printf.sprintf "Walkers from %s : %d / %d / %d"
(key h pid) (Array.length w) (!last_walker) walk_num_tot (key h pid) (Array.length w) (!last_walker) walk_num_tot
in in
@ -754,10 +752,10 @@ let run ?(daemon=true) ezfio_filename =
(Unix.gettimeofday () -. wall0) (Unix.gettimeofday () -. wall0)
1. (Property.to_string Property.Wall) 1. (Property.to_string Property.Wall)
hostname (string_of_int dataserver_pid) 1 hostname (string_of_int dataserver_pid) 1
|> Block.of_string |> Block.of_string
in in
match wall with match wall with
| Some wall -> | Some wall ->
begin begin
output_string !block_channel (Block.to_string wall); output_string !block_channel (Block.to_string wall);
output_char !block_channel '\n'; output_char !block_channel '\n';
@ -777,7 +775,7 @@ let run ?(daemon=true) ezfio_filename =
| Message.Ezfio _ | Message.Ezfio _
| Message.Register (_, _) | Message.Register (_, _)
| Message.Unregister (_, _) | Message.Unregister (_, _)
-> failwith "Bad message" -> failwith "Bad message"
in handle msg in handle msg
in in
@ -785,18 +783,18 @@ let run ?(daemon=true) ezfio_filename =
while (!status <> Status.Stopped) while (!status <> Status.Stopped)
do do
let polling = let polling =
Zmq.Poll.poll ~timeout:1000 pollitem Zmq.Poll.poll ~timeout:1000 pollitem
in in
match polling.(1) with match polling.(1) with
| Some Zmq.Poll.In -> handle_pull !status | Some Zmq.Poll.In -> handle_pull !status
| _ -> | _ ->
begin begin
match polling.(0) with match polling.(0) with
| Some Zmq.Poll.In -> handle_rep () | Some Zmq.Poll.In -> handle_rep ()
| _ -> | _ ->
begin begin
flush !block_channel ; flush !block_channel ;
let file_size = let file_size =
(Unix.stat block_channel_filename_locked).Unix.st_size (Unix.stat block_channel_filename_locked).Unix.st_size
in in
if (file_size > !max_file_size) then if (file_size > !max_file_size) then
@ -811,13 +809,13 @@ let run ?(daemon=true) ezfio_filename =
List.iter (fun socket -> List.iter (fun socket ->
Zmq.Socket.set_linger_period socket 1000 ; Zmq.Socket.set_linger_period socket 1000 ;
Zmq.Socket.close socket) Zmq.Socket.close socket)
[ rep_socket ; pull_socket ] [ rep_socket ; pull_socket ]
in in
Thread.create f Thread.create f
in in
(** {2 Finalization} *) (** {2 Finalization} *)
(** Cleans all the open files, sockets, etc. (** Cleans all the open files, sockets, etc.
@ -842,16 +840,16 @@ let run ?(daemon=true) ezfio_filename =
(** {3 Main function} *) (** {3 Main function} *)
let t0 = let t0 =
Unix.gettimeofday () Unix.gettimeofday ()
in in
(* Handle signals *) (* Handle signals *)
let handler s = let handler s =
Printf.printf "Dataserver received signal %d... killing\n%!" s; Printf.printf "Dataserver received signal %d... killing\n%!" s;
Watchdog.kill (); Watchdog.kill ();
in in
List.iter (fun s -> ignore @@ Sys.signal s (Sys.Signal_handle handler)) List.iter (fun s -> ignore @@ Sys.signal s (Sys.Signal_handle handler))
[ [
Sys.sigint ; Sys.sigint ;
Sys.sigterm ; Sys.sigterm ;
@ -864,7 +862,7 @@ let run ?(daemon=true) ezfio_filename =
begin begin
try try
(List.iter Thread.join (List.iter Thread.join
[ start_status_thread () ; [ start_status_thread () ;
start_log_thread () ; start_log_thread () ;
start_main_thread () ; start_main_thread () ;

View File

@ -1,13 +1,13 @@
open Qptypes open Qptypes
type t = type t =
{ property : Property.t ; { property : Property.t ;
data : Block.t list; data : Block.t list;
} }
module Average = struct module Average = struct
include Sample include Sample
end end
module Error = struct module Error = struct
@ -19,42 +19,42 @@ module Variance = struct
end end
module Skewness: sig module Skewness: sig
type t type t
val to_float : t -> float val to_float : t -> float
val of_float : float -> t val of_float : float -> t
val to_string : t -> string val to_string : t -> string
end = struct end = struct
type t = float type t = float
let to_string = string_of_float let to_string = string_of_float
let to_float x = x let to_float x = x
let of_float x = x let of_float x = x
end end
module Kurtosis: sig module Kurtosis: sig
type t type t
val to_float : t -> float val to_float : t -> float
val of_float : float -> t val of_float : float -> t
val to_string : t -> string val to_string : t -> string
end = struct end = struct
type t = float type t = float
let to_string = string_of_float let to_string = string_of_float
let to_float x = x let to_float x = x
let of_float x = x let of_float x = x
end end
module GaussianDist: sig module GaussianDist: sig
type t type t
val create : mu:Average.t -> sigma2:Variance.t -> t val create : mu:Average.t -> sigma2:Variance.t -> t
val eval : g:t -> x:float -> float val eval : g:t -> x:float -> float
end = struct end = struct
type t = { mu: Average.t ; sigma2: Variance.t } type t = { mu: Average.t ; sigma2: Variance.t }
let create ~mu ~sigma2 = let create ~mu ~sigma2 =
{ mu ; sigma2 } { mu ; sigma2 }
let eval ~g ~x = let eval ~g ~x =
let { mu ; sigma2 } = let { mu ; sigma2 } =
g g
in in
let mu = let mu =
Average.to_float mu Average.to_float mu
and sigma2 = and sigma2 =
Variance.to_float sigma2 Variance.to_float sigma2
@ -62,10 +62,10 @@ end = struct
let x2 = let x2 =
(x -. mu) *. ( x -. mu) /. sigma2 (x -. mu) *. ( x -. mu) /. sigma2
in in
let pi = let pi =
acos (-1.) acos (-1.)
in in
let c = let c =
1. /. (sqrt (sigma2 *. (pi +. pi))) 1. /. (sqrt (sigma2 *. (pi +. pi)))
in in
c *. exp ( -0.5 *. x2) c *. exp ( -0.5 *. x2)
@ -78,7 +78,7 @@ let hashtbl_to_alist table =
let hashtbl_change table key f = let hashtbl_change table key f =
let elt = let elt =
try try
Some (Hashtbl.find table key) Some (Hashtbl.find table key)
with with
| Not_found -> None | Not_found -> None
@ -91,7 +91,7 @@ let hashtbl_change table key f =
(** Build from raw data. Range values are given in percent. *) (** Build from raw data. Range values are given in percent. *)
let of_raw_data ?(locked=true) ~range property = let of_raw_data ?(locked=true) ~range property =
let data = let data =
Block.raw_data ~locked () Block.raw_data ~locked ()
|> List.filter (fun x -> x.Block.property = property) |> List.filter (fun x -> x.Block.property = property)
|> List.sort (fun x y -> |> List.sort (fun x y ->
@ -109,7 +109,7 @@ let of_raw_data ?(locked=true) ~range property =
(Weight.to_float x.Block.weight) +. accu (Weight.to_float x.Block.weight) +. accu
) 0. data ) 0. data
in in
let wmin, wmax = let wmin, wmax =
rmin *. total_weight *. 0.01, rmin *. total_weight *. 0.01,
rmax *. total_weight *. 0.01 rmax *. total_weight *. 0.01
@ -128,13 +128,13 @@ let of_raw_data ?(locked=true) ~range property =
(wsum_new, x::l) (wsum_new, x::l)
else else
(wsum_new, l) (wsum_new, l)
end end
) (0.,[]) data ) (0.,[]) data
in in
List.rev new_data List.rev new_data
in in
let result = let result =
match range with match range with
| (0.,100.) -> { property ; data } | (0.,100.) -> { property ; data }
| (rmin,rmax) -> { property ; data=data_in_range rmin rmax } | (rmin,rmax) -> { property ; data=data_in_range rmin rmax }
@ -146,7 +146,7 @@ let of_raw_data ?(locked=true) ~range property =
(** Compute average *) (** Compute average *)
let average { property ; data } = let average { property ; data } =
if Property.is_scalar property then if Property.is_scalar property then
let (num,denom) = let (num,denom) =
List.fold_left (fun (an, ad) x -> List.fold_left (fun (an, ad) x ->
let num = let num =
(Weight.to_float x.Block.weight) *. (Sample.to_float x.Block.value) (Weight.to_float x.Block.weight) *. (Sample.to_float x.Block.value)
@ -154,7 +154,7 @@ let average { property ; data } =
(Weight.to_float x.Block.weight) (Weight.to_float x.Block.weight)
in (an +. num, ad +. den) in (an +. num, ad +. den)
) (0., 0.) data ) (0., 0.) data
in in
num /. denom num /. denom
|> Average.of_float |> Average.of_float
else else
@ -163,15 +163,15 @@ let average { property ; data } =
| [] -> 1 | [] -> 1
| x :: tl -> Sample.dimension x.Block.value | x :: tl -> Sample.dimension x.Block.value
in in
let (num,denom) = let (num,denom) =
List.fold_left (fun (an, ad) x -> List.fold_left (fun (an, ad) x ->
let num = let num =
Array.map (fun y -> (Weight.to_float x.Block.weight) *. y) Array.map (fun y -> (Weight.to_float x.Block.weight) *. y)
(Sample.to_float_array x.Block.value) (Sample.to_float_array x.Block.value)
and den = (Weight.to_float x.Block.weight) and den = (Weight.to_float x.Block.weight)
in ( Array.mapi (fun i y -> y +. num.(i)) an , ad +. den) in ( Array.mapi (fun i y -> y +. num.(i)) an , ad +. den)
) (Array.make dim 0. , 0.) data ) (Array.make dim 0. , 0.) data
in in
let denom_inv = let denom_inv =
1. /. denom 1. /. denom
in in
@ -180,22 +180,28 @@ let average { property ; data } =
(** Compute sum (for CPU/Wall time) *) (** Compute sum (for CPU/Wall time) *)
let sum { property ; data } = let sum { property ; data } =
List.fold_left (fun accu x -> List.fold_left (fun accu x ->
let num = (Weight.to_float x.Block.weight) *. (Sample.to_float x.Block.value) let num = (Weight.to_float x.Block.weight) *. (Sample.to_float x.Block.value)
in accu +. num in accu +. num
) 0. data ) 0. data
(** Calculation of the average and error bar *) (** Calculation of the average and error bar *)
let ave_error { property ; data } = let ave_error { property ; data } =
let rec loop ~sum ~avsq ~ansum ~avsum ~n ?idx = function (* sum: \sum_k x_k *. w_k
| [] -> ansum: \sum_k w_k
avsum: \sum_k x_k *. w_k
avcu0: avsum / ansum
avsq: \sum_k (1. -. (w_k /. ansum_k)) *. (x_k -. avcu0)^2 *. w_k)
*)
let rec loop ~sum ~avsq ~ansum ~avsum ~n ?idx = function
| [] ->
begin begin
if (n > 0.) then if (n > 0.) then
( Average.of_float (sum /. ansum), ( Average.of_float (sum /. ansum),
@ -205,12 +211,8 @@ let ave_error { property ; data } =
end end
| (x,w) :: tail -> | (x,w) :: tail ->
begin begin
let avcu0 = let avcu0 = avsum /. ansum in
avsum /. ansum let xw = x *. w in
in
let xw =
x *. w
in
let ansum, avsum, sum = let ansum, avsum, sum =
ansum +. w , ansum +. w ,
avsum +. xw , avsum +. xw ,
@ -220,9 +222,9 @@ let ave_error { property ; data } =
~sum:sum ~sum:sum
~avsq:(avsq +. (1. -. (w /. ansum)) *. (x -. avcu0) ~avsq:(avsq +. (1. -. (w /. ansum)) *. (x -. avcu0)
*. (x -. avcu0) *. w) *. (x -. avcu0) *. w)
~avsum:avsum ~avsum:avsum
~ansum:ansum ~ansum:ansum
~n:(n +. 1.) ~n:(n +. 1.)
end end
in in
@ -242,7 +244,7 @@ let ave_error { property ; data } =
(Sample.to_float x.Block.value, (Sample.to_float x.Block.value,
Weight.to_float x.Block.weight) Weight.to_float x.Block.weight)
) data ) data
|> ave_error_scalar |> ave_error_scalar
else else
match data with match data with
| [] -> (Average.of_float 0., None) | [] -> (Average.of_float 0., None)
@ -251,7 +253,7 @@ let ave_error { property ; data } =
head.Block.value head.Block.value
|> Sample.dimension |> Sample.dimension
in in
let result = let result =
Array.init dim (fun idx -> Array.init dim (fun idx ->
List.rev_map (fun x -> List.rev_map (fun x ->
(Sample.to_float ~idx x.Block.value, (Sample.to_float ~idx x.Block.value,
@ -260,16 +262,16 @@ let ave_error { property ; data } =
|> ave_error_scalar |> ave_error_scalar
) )
in in
( Array.map (fun (x,_) -> Average.to_float x) result ( Array.map (fun (x,_) -> Average.to_float x) result
|> Average.of_float_array ~dim , |> Average.of_float_array ~dim ,
if (Array.length result < 2) then if (Array.length result < 2) then
None None
else else
Some (Array.map (function Some (Array.map (function
| (_,Some y) -> Error.to_float y | (_,Some y) -> Error.to_float y
| (_,None) -> 0.) result | (_,None) -> 0.) result
|> Average.of_float_array ~dim) |> Average.of_float_array ~dim)
) )
@ -286,14 +288,14 @@ let fold_blocks ~f { property ; data } =
List.fold_left (fun accu block -> List.fold_left (fun accu block ->
let x = Sample.to_float block.Block.value let x = Sample.to_float block.Block.value
in f accu x in f accu x
) init data ) init data
(** Convergence plot *) (** Convergence plot *)
let convergence { property ; data } = let convergence { property ; data } =
let rec loop ~sum ~avsq ~ansum ~avsum ~n ~accu = function let rec loop ~sum ~avsq ~ansum ~avsum ~n ~accu = function
| [] -> List.rev accu | [] -> List.rev accu
| head :: tail -> | head :: tail ->
begin begin
@ -307,7 +309,7 @@ let convergence { property ; data } =
and avsum = avsum +. xw and avsum = avsum +. xw
and sum = sum +. xw and sum = sum +. xw
in in
let accu = let accu =
if (n > 0.) then if (n > 0.) then
(sum /. ansum, sqrt ( abs_float ( avsq /.( ansum *. n))))::accu (sum /. ansum, sqrt ( abs_float ( avsq /.( ansum *. n))))::accu
else else
@ -317,9 +319,9 @@ let convergence { property ; data } =
~sum:sum ~sum:sum
~avsq:(avsq +. (1. -. (w /. ansum)) *. (x -. avcu0) ~avsq:(avsq +. (1. -. (w /. ansum)) *. (x -. avcu0)
*. (x -. avcu0) *. w) *. (x -. avcu0) *. w)
~avsum:avsum ~avsum:avsum
~ansum:ansum ~ansum:ansum
~n:(n +. 1.) ~n:(n +. 1.)
~accu:accu ~accu:accu
end end
in in
@ -365,7 +367,7 @@ let max_block =
(** Create a hash table for merging *) (** Create a hash table for merging *)
let create_hash ~create_key ?(update_block_id=(fun x->x)) let create_hash ~create_key ?(update_block_id=(fun x->x))
?(update_value=(fun wc vc wb vb sw -> (wc *. vc +. wb *. vb) /. sw) ) ?(update_value=(fun wc vc wb vb sw -> (wc *. vc +. wb *. vb) /. sw) )
?(update_weight=(fun wc wb -> wc +. wb) ) t = ?(update_weight=(fun wc wb -> wc +. wb) ) t =
let table = Hashtbl.create 63 let table = Hashtbl.create 63
@ -374,7 +376,7 @@ let create_hash ~create_key ?(update_block_id=(fun x->x))
let key = create_key block let key = create_key block
in in
let open Block in let open Block in
hashtbl_change table key (function hashtbl_change table key (function
| Some current -> | Some current ->
let wc, wb = let wc, wb =
Weight.to_float current.weight, Weight.to_float current.weight,
@ -384,7 +386,7 @@ let create_hash ~create_key ?(update_block_id=(fun x->x))
update_weight wc wb update_weight wc wb
in in
if (Property.is_scalar current.property) then if (Property.is_scalar current.property) then
let vc, vb = let vc, vb =
Sample.to_float current.value, Sample.to_float current.value,
Sample.to_float block.value Sample.to_float block.value
in Some in Some
@ -396,15 +398,15 @@ let create_hash ~create_key ?(update_block_id=(fun x->x))
compute_node = block.compute_node; compute_node = block.compute_node;
} }
else else
let vc, vb = let vc, vb =
Sample.to_float_array current.value, Sample.to_float_array current.value,
Sample.to_float_array block.value Sample.to_float_array block.value
and dim = and dim =
Sample.dimension current.value Sample.dimension current.value
in Some in Some
{ property = current.property ; { property = current.property ;
weight = Weight.of_float sw ; weight = Weight.of_float sw ;
value = value =
Array.init dim (fun i -> update_value wc vc.(i) wb vb.(i) sw) Array.init dim (fun i -> update_value wc vc.(i) wb vb.(i) sw)
|> Sample.of_float_array ~dim ; |> Sample.of_float_array ~dim ;
block_id = update_block_id block.block_id; block_id = update_block_id block.block_id;
@ -443,15 +445,15 @@ let merge ~create_key ?update_block_id ?update_value ?update_weight t =
(** Merge per block id *) (** Merge per block id *)
let merge_per_block_id = let merge_per_block_id =
merge merge
~create_key:(fun block -> Block_id.to_string block.Block.block_id) ~create_key:(fun block -> Block_id.to_string block.Block.block_id)
(** Merge per compute_node *) (** Merge per compute_node *)
let merge_per_compute_node = let merge_per_compute_node =
merge merge
~create_key:(fun block -> ~create_key:(fun block ->
Printf.sprintf "%s" Printf.sprintf "%s"
(Compute_node.to_string block.Block.compute_node) ) (Compute_node.to_string block.Block.compute_node) )
@ -459,8 +461,8 @@ let merge_per_compute_node =
(** Merge per Compute_node and PID *) (** Merge per Compute_node and PID *)
let merge_per_compute_node_and_pid = let merge_per_compute_node_and_pid =
merge merge
~create_key:(fun block -> ~create_key:(fun block ->
Printf.sprintf "%s %10.10d" Printf.sprintf "%s %10.10d"
(Compute_node.to_string block.Block.compute_node) (Compute_node.to_string block.Block.compute_node)
(block.Block.pid) ) (block.Block.pid) )
@ -469,8 +471,8 @@ let merge_per_compute_node_and_pid =
(** Merge per Compute_node and BlockId *) (** Merge per Compute_node and BlockId *)
let merge_per_compute_node_and_block_id = let merge_per_compute_node_and_block_id =
merge merge
~create_key:(fun block -> ~create_key:(fun block ->
Printf.sprintf "%s %10.10d" Printf.sprintf "%s %10.10d"
(Compute_node.to_string block.Block.compute_node) (Compute_node.to_string block.Block.compute_node)
(Block_id.to_int block.Block.block_id) ) (Block_id.to_int block.Block.block_id) )
@ -510,48 +512,48 @@ let error_x_over_y = function
(** Create float, variable operators *) (** Create float, variable operators *)
let one_variable_operator ~update_value p f = let one_variable_operator ~update_value p f =
{ p with { p with
data = List.rev @@ List.rev_map (fun b -> { b with data = List.rev @@ List.rev_map (fun b -> { b with
Block.value = Sample.of_float (update_value (Sample.to_float b.Block.value) ) } Block.value = Sample.of_float (update_value (Sample.to_float b.Block.value) ) }
) p.data } ) p.data }
let ( +@ ) p f = one_variable_operator p f let ( +@ ) p f = one_variable_operator p f
~update_value: (fun x -> x +. f ) ~update_value: (fun x -> x +. f )
let ( *@ ) p f = one_variable_operator p f let ( *@ ) p f = one_variable_operator p f
~update_value: (fun x -> x *. f ) ~update_value: (fun x -> x *. f )
let ( -@ ) p f = one_variable_operator p f let ( -@ ) p f = one_variable_operator p f
~update_value: (fun x -> x -. f ) ~update_value: (fun x -> x -. f )
let ( /@ ) p f = one_variable_operator p f let ( /@ ) p f = one_variable_operator p f
~update_value: (fun x -> x /. f ) ~update_value: (fun x -> x /. f )
(** Create two variable operators *) (** Create two variable operators *)
let two_variable_operator ~update_value p1 p2 = let two_variable_operator ~update_value p1 p2 =
merge merge
~update_value ~update_value
~create_key:(fun block -> ~create_key:(fun block ->
Printf.sprintf "%s %10.10d %10.10d" Printf.sprintf "%s %10.10d %10.10d"
(Compute_node.to_string block.Block.compute_node) (Compute_node.to_string block.Block.compute_node)
(Block_id.to_int block.Block.block_id) (Block_id.to_int block.Block.block_id)
(block.Block.pid) ) (block.Block.pid) )
~update_weight:(fun wc wb -> wc ) ~update_weight:(fun wc wb -> wc )
{ property = p1.property ; { property = p1.property ;
data = List.concat [ p1.data ; p2.data ] } data = List.concat [ p1.data ; p2.data ] }
let ( +! ) = two_variable_operator let ( +! ) = two_variable_operator
~update_value: (fun wc vc wb vb sw -> (vc +. vb) ) ~update_value: (fun wc vc wb vb sw -> (vc +. vb) )
let ( *! ) = two_variable_operator let ( *! ) = two_variable_operator
~update_value: (fun wc vc wb vb sw -> (vc *. vb) ) ~update_value: (fun wc vc wb vb sw -> (vc *. vb) )
let ( -! ) = two_variable_operator let ( -! ) = two_variable_operator
~update_value: (fun wc vc wb vb sw -> (vc -. vb) ) ~update_value: (fun wc vc wb vb sw -> (vc -. vb) )
let ( /! ) = two_variable_operator let ( /! ) = two_variable_operator
~update_value: (fun wc vc wb vb sw -> (vc /. vb) ) ~update_value: (fun wc vc wb vb sw -> (vc /. vb) )
@ -560,11 +562,11 @@ let ( /! ) = two_variable_operator
(** Merge two consecutive blocks *) (** Merge two consecutive blocks *)
let compress = let compress =
merge merge
~create_key:(fun block -> ~create_key:(fun block ->
Printf.sprintf "%s %10.10d %10.10d" Printf.sprintf "%s %10.10d %10.10d"
(Compute_node.to_string block.Block.compute_node) block.Block.pid (Compute_node.to_string block.Block.compute_node) block.Block.pid
(((Block_id.to_int block.Block.block_id)+1)/2)) (((Block_id.to_int block.Block.block_id)+1)/2))
~update_block_id:(fun block_id -> ~update_block_id:(fun block_id ->
((Block_id.to_int block_id)+1)/2 ((Block_id.to_int block_id)+1)/2
|> Block_id.of_int ) |> Block_id.of_int )
@ -576,15 +578,15 @@ let max_value_per_compute_node t =
let table = Hashtbl.create 63 let table = Hashtbl.create 63
in in
let create_key block = let create_key block =
Printf.sprintf "%s %10.10d" Printf.sprintf "%s %10.10d"
(Compute_node.to_string block.Block.compute_node) (Compute_node.to_string block.Block.compute_node)
(block.Block.pid) (block.Block.pid)
in in
List.iter (fun block -> List.iter (fun block ->
let key = create_key block let key = create_key block
in in
let open Block in let open Block in
hashtbl_change table key (function hashtbl_change table key (function
| Some current -> | Some current ->
let vc = Sample.to_float current.value let vc = Sample.to_float current.value
and vb = Sample.to_float block.value and vb = Sample.to_float block.value
@ -610,36 +612,36 @@ let max_value_per_compute_node t =
(** String representation *) (** String representation *)
let to_string p = let to_string p =
match p.property with match p.property with
| Property.Cpu -> Printf.sprintf "%s" (Time.string_of_sec (sum p)) | Property.Cpu -> Printf.sprintf "%s" (Time.string_of_sec (sum p))
| Property.Wall -> Printf.sprintf "%s" (Time.string_of_sec (sum (max_value_per_compute_node p))) | Property.Wall -> Printf.sprintf "%s" (Time.string_of_sec (sum (max_value_per_compute_node p)))
| Property.Accep -> Printf.sprintf "%16.10f" (average p |> Average.to_float) | Property.Accep -> Printf.sprintf "%16.10f" (average p |> Average.to_float)
| _ -> | _ ->
begin begin
if Property.is_scalar p.property then if Property.is_scalar p.property then
match ave_error p with match ave_error p with
| (ave, Some error) -> | (ave, Some error) ->
let (ave, error) = let (ave, error) =
Average.to_float ave, Average.to_float ave,
Error.to_float error Error.to_float error
in in
Printf.sprintf "%16.10f +/- %16.10f" ave error Printf.sprintf "%16.10f +/- %16.10f" ave error
| (ave, None) -> | (ave, None) ->
let ave = let ave =
Average.to_float ave Average.to_float ave
in in
Printf.sprintf "%16.10f" ave Printf.sprintf "%16.10f" ave
else else
match ave_error p with match ave_error p with
| (ave, Some error) -> | (ave, Some error) ->
let idxmax = let idxmax =
Average.dimension ave Average.dimension ave
in in
let rec f accu idx = let rec f accu idx =
if (idx < idxmax) then if (idx < idxmax) then
let (ave, error) = let (ave, error) =
Average.to_float ~idx ave, Average.to_float ~idx ave,
Error.to_float ~idx error Error.to_float ~idx error
in in
let s = let s =
@ -650,9 +652,9 @@ let to_string p =
accu accu
in in
(f "[ \n" 0) ^ " ]" (f "[ \n" 0) ^ " ]"
| (ave, None) -> | (ave, None) ->
Average.to_float ave Average.to_float ave
|> Printf.sprintf "%16.10f" |> Printf.sprintf "%16.10f"
end end
@ -666,19 +668,19 @@ let compress_files () =
let properties = let properties =
Lazy.force Block.properties Lazy.force Block.properties
in in
(* Create temporary file *) (* Create temporary file *)
let dir_name = let dir_name =
Block.dir_name Block.dir_name
in in
let dir_name = let dir_name =
Lazy.force dir_name Lazy.force dir_name
in in
let files = let files =
Sys.readdir dir_name Sys.readdir dir_name
|> Array.to_list |> Array.to_list
|> List.filter (fun x -> |> List.filter (fun x ->
try try
Str.search_backward (Str.regexp "locked") x (String.length x) >= 0 Str.search_backward (Str.regexp "locked") x (String.length x) >= 0
with with
@ -688,10 +690,10 @@ let compress_files () =
|> List.rev |> List.rev
in in
let out_channel_dir = let out_channel_dir =
let rand_num = Random.int 1000000 |> string_of_int in let rand_num = Random.int 1000000 |> string_of_int in
let dirname = let dirname =
Filename.concat !Ezfio.ezfio_filename "blocks" Filename.concat !Ezfio.ezfio_filename "blocks"
in in
if not ( Sys.file_exists dirname ) then if not ( Sys.file_exists dirname ) then
Unix.mkdir dirname 0o755; Unix.mkdir dirname 0o755;
@ -706,31 +708,31 @@ let compress_files () =
raise (Sys_error message) raise (Sys_error message)
in in
let out_channel_name = let out_channel_name =
let hostname = let hostname =
Lazy.force Qmcchem_config.hostname Lazy.force Qmcchem_config.hostname
and suffix = and suffix =
Unix.getpid () Unix.getpid ()
|> string_of_int |> string_of_int
in in
String.concat "." [ hostname ; suffix ] String.concat "." [ hostname ; suffix ]
in in
let block_channel = let block_channel =
Filename.concat out_channel_dir out_channel_name Filename.concat out_channel_dir out_channel_name
|> open_out |> open_out
in in
List.iter (fun p -> List.iter (fun p ->
let l = let l =
match p with match p with
| Property.Cpu | Property.Cpu
| Property.Accep -> | Property.Accep ->
of_raw_data ~locked:false ~range:(0.,100.) p of_raw_data ~locked:false ~range:(0.,100.) p
|> merge_per_compute_node |> merge_per_compute_node
| Property.Wall -> | Property.Wall ->
of_raw_data ~locked:false ~range:(0.,100.) p of_raw_data ~locked:false ~range:(0.,100.) p
|> max_value_per_compute_node |> max_value_per_compute_node
| _ -> | _ ->
of_raw_data ~locked:false ~range:(0.,100.) p of_raw_data ~locked:false ~range:(0.,100.) p
(* (*
@ -740,8 +742,8 @@ let compress_files () =
List.iter (fun x -> List.iter (fun x ->
output_string block_channel (Block.to_string x); output_string block_channel (Block.to_string x);
output_char block_channel '\n'; output_char block_channel '\n';
) l.data ) l.data
) properties ; ) properties ;
close_out block_channel; close_out block_channel;
List.iter Unix.unlink files ; List.iter Unix.unlink files ;
@ -760,17 +762,17 @@ let autocovariance { property ; data } =
match (merge_per_block_id { property ; data }) match (merge_per_block_id { property ; data })
with { property ; data } -> Array.of_list data with { property ; data } -> Array.of_list data
in in
let x_t = let x_t =
Array.map (fun x -> (Sample.to_float x.Block.value) -. ave) data Array.map (fun x -> (Sample.to_float x.Block.value) -. ave) data
in in
let f i = let f i =
let denom = let denom =
if (i > 1) then (float_of_int i) else 1. if (i > 1) then (float_of_int i) else 1.
in in
let r = let r =
Array.sub x_t 0 i Array.sub x_t 0 i
|> Array.fold_left (fun accu x -> |> Array.fold_left (fun accu x ->
accu +. x *. x_t.(i)) 0. accu +. x *. x_t.(i)) 0.
in in
r /. denom r /. denom
in in
@ -786,14 +788,14 @@ let centered_cumulants { property ; data } =
|> Average.to_float |> Average.to_float
in in
let centered_data = let centered_data =
List.rev_map (fun x -> List.rev_map (fun x ->
( (Weight.to_float x.Block.weight), ( (Weight.to_float x.Block.weight),
(Sample.to_float x.Block.value) -. ave ) (Sample.to_float x.Block.value) -. ave )
) data ) data
|> List.rev |> List.rev
in in
let var = let var =
let (num, denom) = let (num, denom) =
List.fold_left (fun (a2, ad) (w,x) -> List.fold_left (fun (a2, ad) (w,x) ->
let x2 = x *. x let x2 = x *. x
in in
@ -802,18 +804,18 @@ let centered_cumulants { property ; data } =
in (a2 +. var, ad +. den) in (a2 +. var, ad +. den)
) (0., 0.) centered_data ) (0., 0.) centered_data
in num /. denom in num /. denom
in in
let centered_data = let centered_data =
let sigma_inv = let sigma_inv =
1. /. (sqrt var) 1. /. (sqrt var)
in in
List.rev_map (fun x -> List.rev_map (fun x ->
( (Weight.to_float x.Block.weight), ( (Weight.to_float x.Block.weight),
( (Sample.to_float x.Block.value) -. ave ) *. sigma_inv ) ( (Sample.to_float x.Block.value) -. ave ) *. sigma_inv )
) data ) data
|> List.rev |> List.rev
in in
let (cum3,cum4) = let (cum3,cum4) =
let (cum3, cum4, denom) = let (cum3, cum4, denom) =
List.fold_left (fun (a3, a4, ad) (w,x) -> List.fold_left (fun (a3, a4, ad) (w,x) ->
let x2 = x *. x let x2 = x *. x
@ -823,9 +825,9 @@ let centered_cumulants { property ; data } =
and den = w and den = w
in (a3 +. cum3, a4 +. cum4, ad +. den) in (a3 +. cum3, a4 +. cum4, ad +. den)
) (0., 0., 0.) centered_data ) (0., 0., 0.) centered_data
in in
( cum3 /. denom, cum4 /. denom -. 3. ) ( cum3 /. denom, cum4 /. denom -. 3. )
in in
[| ave ; var ; cum3 ; cum4 |] [| ave ; var ; cum3 ; cum4 |]
@ -833,26 +835,26 @@ let centered_cumulants { property ; data } =
(** Computes a histogram *) (** Computes a histogram *)
let histogram { property ; data } = let histogram { property ; data } =
let min, max = let min, max =
(min_block { property ; data }), (min_block { property ; data }),
(max_block { property ; data }) (max_block { property ; data })
in in
let length = let length =
max -. min max -. min
and n = and n =
List.length data List.length data
|> float_of_int |> float_of_int
|> sqrt |> sqrt
in in
let delta_x = let delta_x =
length /. (n-.1.) length /. (n-.1.)
and result = and result =
Array.init (int_of_float n + 1) (fun _ -> 0.) Array.init (int_of_float n + 1) (fun _ -> 0.)
in in
List.iter (fun x -> List.iter (fun x ->
let w = let w =
(Weight.to_float x.Block.weight) (Weight.to_float x.Block.weight)
and x = and x =
(Sample.to_float x.Block.value) (Sample.to_float x.Block.value)
in in
let i = let i =
@ -862,7 +864,7 @@ let histogram { property ; data } =
result.(i) <- result.(i) +. w result.(i) <- result.(i) +. w
) data ) data
; ;
let norm = let norm =
1. /. ( delta_x *. ( 1. /. ( delta_x *. (
Array.fold_left (fun accu x -> accu +. x) 0. result Array.fold_left (fun accu x -> accu +. x) 0. result
) ) ) )

View File

@ -43,7 +43,6 @@ let to_string = function
Array.map string_of_float x Array.map string_of_float x
|> Array.to_list |> Array.to_list
|> String.concat " " |> String.concat " "
|> Printf.sprintf "%s"
let to_bytes = function let to_bytes = function
| One_dimensional x -> Qptypes.bytes_of_float x | One_dimensional x -> Qptypes.bytes_of_float x
@ -54,3 +53,12 @@ let to_bytes = function
|> Bytes.set_int64_le b (i*8) ) x; |> Bytes.set_int64_le b (i*8) ) x;
b b
let of_bytes b =
match Bytes.length b with
| 8 -> let x = Qptypes.float_of_bytes b in
One_dimensional x
| l -> let len = l/8 in
Multidimensional ( Array.init len (fun i ->
Bytes.get_int64_le b (i*8)
|> Int64.float_of_bits ),
len )

View File

@ -5,5 +5,6 @@ val of_float : float -> t
val of_float_array : dim:int -> float array -> t val of_float_array : dim:int -> float array -> t
val to_string : t -> string val to_string : t -> string
val to_bytes : t -> bytes val to_bytes : t -> bytes
val of_bytes : bytes -> t
val dimension : t -> int val dimension : t -> int

View File

@ -7,9 +7,12 @@ let global_replace x =
|> Str.global_replace (Str.regexp "Int.to_bytes") "bytes_of_int" |> Str.global_replace (Str.regexp "Int.to_bytes") "bytes_of_int"
|> Str.global_replace (Str.regexp "Int64.to_bytes") "bytes_of_int64" |> Str.global_replace (Str.regexp "Int64.to_bytes") "bytes_of_int64"
|> Str.global_replace (Str.regexp "Float.to_bytes") "bytes_of_float" |> Str.global_replace (Str.regexp "Float.to_bytes") "bytes_of_float"
|> Str.global_replace (Str.regexp "Float.of_bytes") "float_of_bytes"
|> Str.global_replace (Str.regexp "Int.of_bytes") "int_of_bytes" |> Str.global_replace (Str.regexp "Int.of_bytes") "int_of_bytes"
|> Str.global_replace (Str.regexp "Int64.of_bytes") "int64_of_bytes"
|> Str.global_replace (Str.regexp "String.\\(to\\|of\\)_string") "" |> Str.global_replace (Str.regexp "String.\\(to\\|of\\)_string") ""
|> Str.global_replace (Str.regexp "String.to_bytes") "Bytes.of_string" |> Str.global_replace (Str.regexp "String.to_bytes") "Bytes.of_string"
|> Str.global_replace (Str.regexp "String.of_bytes") "Bytes.to_string"
let input_data = " let input_data = "
* Positive_float : float * Positive_float : float
@ -182,8 +185,22 @@ let bytes_of_int i =
|> bytes_of_int64 |> bytes_of_int64
let int64_of_bytes b =
Bytes.get_int64_le b 0
let int_of_bytes b =
int64_of_bytes b
|> Int64.to_int
let float_of_bytes b =
int64_of_bytes b
|> Int64.float_of_bits
let bytes_of_float f = let bytes_of_float f =
Int64.of_float f Int64.bits_of_float f
|> bytes_of_int64 |> bytes_of_int64
" "
@ -195,12 +212,14 @@ module %s : sig
val of_%s : %s %s -> t val of_%s : %s %s -> t
val to_string : t -> string val to_string : t -> string
val to_bytes : t -> bytes val to_bytes : t -> bytes
val of_bytes : bytes -> t
end = struct end = struct
type t = %s [@@deriving sexp] type t = %s [@@deriving sexp]
let to_%s x = x let to_%s x = x
let of_%s %s x = ( %s x ) let of_%s %s x = ( %s x )
let to_string x = %s.to_string x let to_string x = %s.to_string x
let to_bytes x = %s.to_bytes x let to_bytes x = %s.to_bytes x
let of_bytes b = %s.of_bytes b
end end
" "
@ -224,7 +243,7 @@ let parse_input input=
and name = String_ext.strip name in and name = String_ext.strip name in
let typ_cap = String.capitalize_ascii typ in let typ_cap = String.capitalize_ascii typ in
let newstring = Printf.sprintf template name typ typ typ params_val typ typ let newstring = Printf.sprintf template name typ typ typ params_val typ typ
typ typ params ( String_ext.strip text ) typ_cap typ_cap typ typ params ( String_ext.strip text ) typ_cap typ_cap typ_cap
in in
List.rev (parse (newstring::result) tail ) List.rev (parse (newstring::result) tail )
in in
@ -274,6 +293,10 @@ end = struct
end end
" "
(*
val of_bytes : bytes -> t
let of_bytes x = %s.of_bytes x
*)
let parse_input_ezfio input= let parse_input_ezfio input=
let parse s = let parse s =
@ -320,7 +343,8 @@ let input_lines filename =
let create_ezfio_handler () = let create_ezfio_handler () =
let lines = let lines =
input_lines "ezfio.ml" input_lines "ezfio.ml"
|> List.mapi (fun i l -> if i > 417 then Some l else None) (* /!\ Change when ezfio.ml changes *)
|> List.mapi (fun i l -> if i > 444 then Some l else None)
|> List.filter (fun x -> x <> None) |> List.filter (fun x -> x <> None)
|> List.map (fun x -> |> List.map (fun x ->
match x with match x with

125
src/MAIN/admc.py Executable file
View File

@ -0,0 +1,125 @@
#!/usr/bin/env python3
from mpi4py import MPI
import sys
import gzip
import random
import math
import subprocess
admc_exec = "/home/scemama/qmcchem/src/MAIN/admc"
n_walk_per_proc = 10
def start():
return subprocess.Popen(
[ admc_exec, sys.argv[1] ],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
def read(process,len_walk):
line = process.stdout.readline().decode("utf-8").strip()
walk_num = int(line)
walkers = []
print(walk_num)
for k in range(walk_num):
w = []
for i in range(len_walk):
line = process.stdout.readline().decode("utf-8").strip()
w.append( line )
w = '\n'.join(w)
walkers.append(w)
_, E, W = process.stdout.readline().decode("utf-8").split()
return walkers, float(E), float(W)
def write(process, message):
process.stdin.write(f"{message}\n".encode("utf-8"))
process.stdin.flush()
def terminate(process):
process.stdin.close()
process.terminate()
process.wait(timeout=0.2)
def print_energy(EnergyWeight, Energy2Weight, Weight, N):
e = EnergyWeight / Weight
e2 = Energy2Weight / Weight
err = math.sqrt(abs(e*e - e2) / max(1,(N-1)) )
print("%f +/- %f"%(e, err))
return err
def main():
try:
input_dir = sys.argv[1]
except:
print("syntax: argv[0] [FILE]")
sys.exit(-1)
# Pool of electron coordinates
with gzip.open(input_dir+"/electrons/elec_coord_pool.gz","r") as f:
data = f.read().decode("utf-8").split()
len_walk = int(data[1])*int(data[2])
icount = 0
buffer = []
walkers = []
for d in data[4:]:
buffer.append(d)
icount += 1
if (icount == len_walk):
walkers.append(buffer)
buffer = []
icount = 0
walkers = [ '\n'.join(x) for x in walkers ]
do_loop = True
EnergyWeight = 0.
Energy2Weight = 0.
Weight = 0.
NSamples = 0.
# Start processes
proc = start()
while do_loop:
# Once every 1000, shuffle the list of walkers
if random.random() < 0.01:
print("SHUFFLE")
random.shuffle(walkers)
# Pick new walkers
new_coords = walkers[:n_walk_per_proc]
walkers = walkers[n_walk_per_proc:]
# Send new walkers to the process
write(proc, '\n'.join(new_coords))
# Fetch new walkers from the process
new_coords, e_new, w_new = read(proc, len_walk)
walkers += new_coords
# Print energy
ew = e_new * w_new
EnergyWeight += ew
Energy2Weight += e_new * ew
Weight += w_new
NSamples += 1.
print (len(walkers))
err = print_energy(EnergyWeight, Energy2Weight, Weight, NSamples)
if err < 1.e-3:
do_loop = False
terminate(proc)
return
if __name__ == "__main__":
main()