10
1
mirror of https://gitlab.com/scemama/qmcchem.git synced 2024-11-15 02:23:38 +01:00
qmcchem/ocaml/Qmcchem_dataserver.ml

880 lines
23 KiB
OCaml
Raw Normal View History

2015-12-19 02:35:13 +01:00
open Qptypes
(** Data server of QMC=Chem.
5 ZeroMQ sockets are opened:
- a REP socket used for registering/unregistering the walkers and for the clients to fetch the initial walkers positions
- a PULL socket to pull the results computed by the clients
- a PUB socket to send the status to the clients for the termination
- a XSUB socket for receiving debug
- a XPUB socket for sending debug
@author A. Scemama
*)
(** Time-out, in seconds. It is added to twice the block time to decide
    whether a worker still counts as connected, and it is the delay after
    which the status thread reports a [`Timeout]. *)
let initialization_timeout = 600.
(** [bind_socket ~socket_type ~socket ~address] binds the ZeroMQ [socket] to
    [address].

    @param socket_type name of the socket kind, only used in the error text
    @param socket      ZeroMQ socket to bind
    @param address     ZeroMQ end point, e.g. ["tcp://*:1234"]
    @raise Failure when the bind fails with a [Unix.Unix_error]; the message
    contains the socket kind, the address, the name of the failing call and
    the description of the system error. Any other exception is propagated
    unchanged. *)
let bind_socket ~socket_type ~socket ~address =
  try
    Zmq.Socket.bind socket address
  with
  | Unix.Unix_error (error, fn_name, arg) ->
      (* The error code used to be dropped, which hid the actual reason
         (e.g. "Address already in use") from the user. *)
      failwith @@ Printf.sprintf
        "\n%s\nUnable to bind the dataserver's %s socket :\n %s\n%s : %s"
        arg socket_type address fn_name (Unix.error_message error)
(** [run ?daemon ezfio_filename] runs the data server.

    [ezfio_filename] is first registered with [Qputils.set_ezfio_filename].

    @param daemon when [true] (the default) the status goes back to
    [Status.Queued] once no worker is connected any more; when [false] the
    status becomes [Status.Stopped] in that situation, and also when the
    server is still queued after [initialization_timeout]. *)
let run ?(daemon=true) ezfio_filename =
Qputils.set_ezfio_filename ezfio_filename;
2015-12-19 02:35:13 +01:00
2016-03-29 15:16:16 +02:00
(** Check if walkers need to be created. When the EZFIO file has no pool of
    electron coordinates, the walker generator is executed in a child
    process and the data server waits for its termination. *)
let () =
  if not (Ezfio.has_electrons_elec_coord_pool ()) then
    begin
      Printf.printf "Generating initial walkers...\n%!";
      match Unix.fork () with
      | 0 ->
          begin
            (* Child process. If the exec fails, the child must not fall
               through and behave as a second data server: report and exit. *)
            try
              Unix.execvp
                (Lazy.force Qmcchem_config.qmc_create_walkers)
                [|"qmc_create_walkers" ; ezfio_filename|]
            with
            | Unix.Unix_error (error, _, _) ->
                Printf.eprintf "Unable to run qmc_create_walkers : %s\n%!"
                  (Unix.error_message error);
                exit 127
          end
      | pid ->
          begin
            (* Parent process: report how the generator terminated instead
               of unconditionally claiming success. *)
            match snd (Unix.waitpid [] pid) with
            | Unix.WEXITED 0 ->
                Printf.printf "Initial walkers ready\n%!"
            | Unix.WEXITED code ->
                Printf.eprintf
                  "Warning: qmc_create_walkers exited with code %d\n%!" code
            | Unix.WSIGNALED signal ->
                Printf.eprintf
                  "Warning: qmc_create_walkers killed by signal %d\n%!" signal
            | Unix.WSTOPPED signal ->
                Printf.eprintf
                  "Warning: qmc_create_walkers stopped by signal %d\n%!" signal
          end
    end
in
2019-07-19 17:06:01 +02:00
(** [delta_t t0] is the time elapsed, in seconds, between [t0] and the
    current value of [Unix.time ()]. *)
let delta_t t0 =
  Unix.time () -. t0
in
(** {2 ZeroMQ initialization} *)

(** ZeroMQ context from which all the sockets below are created. *)
let zmq_context = Zmq.Context.create () in

(** Maximum size of the blocks file before compressing (mutable). It is
    compared with the [st_size] of the blocks file in the main loop. *)
let max_file_size = ref (64 * 1024) in

(** Host name, as given by [Qmcchem_config.hostname]. *)
let hostname = Lazy.force Qmcchem_config.hostname in
(** Current status of the data server (mutable). *)
let status = ref Status.Queued in

(** [change_status s] makes [s] the current status, writes it with
    [Status.write] and prints it on the standard output. *)
let change_status s =
  status := s;
  Status.write s;
  Status.to_string s
  |> Printf.printf "Status : %s\n%!"
in

(* The initial status is published right away *)
change_status Status.Queued;
(** [check_port n] tells whether the data server can use [n] as its base
    port. The server binds five sockets, on ports [n] to [n+4], so each of
    these ports is probed by binding a temporary REP socket to it.

    @return [`Available] when the five ports could be bound, [`Unavailable]
    as soon as one of them could not. *)
let check_port n =
  let address_prefix = "tcp://*:" in
  (* Tries to bind a throw-away socket on port [n + offset] *)
  let is_free offset =
    let address = address_prefix ^ string_of_int (n + offset) in
    let socket = Zmq.Socket.create zmq_context Zmq.Socket.rep in
    let bound =
      try
        Zmq.Socket.bind socket address;
        true
      with
      (* Probe only: whatever the reason of the failure, the port is
         considered as not usable. *)
      | _ -> false
    in
    Zmq.Socket.close socket;
    bound
  in
  (* The offset 4 (debug socket) was previously not checked *)
  if List.for_all is_free [0;1;2;3;4] then
    `Available
  else
    `Unavailable
in
(** Base port of the data server: a random port number between 1024 and
    49150, drawn again until [check_port] reports it as available. *)
let port =
  (* Single place where the range of the candidate ports is defined *)
  let random_port () = 1024 + Random.int (49151 - 1024) in
  let rec pick candidate =
    match check_port candidate with
    | `Available -> candidate
    | `Unavailable -> pick (random_port ())
  in
  pick (random_port ())
in
(** XPUB socket bound on [port+4], with a linger period of 100. The log
    messages built by the data server are sent on it. *)
let debug_socket =
  let xpub = Zmq.Socket.create zmq_context Zmq.Socket.xpub in
  bind_socket "XPUB" xpub (Printf.sprintf "tcp://*:%d" (port+4));
  Zmq.Socket.set_linger_period xpub 100 ;
  xpub
in

(** Closes the debug socket. *)
let close_debug_socket () =
  Zmq.Socket.close debug_socket
in
(** [send_log socket size t0 text] sends a log line to the debug socket.
    The line contains the label [socket], the integer [size], [text] and the
    time elapsed since [t0]. *)
let send_log socket size t0 text =
  Printf.sprintf "%20s : %8d : %10s : %f" socket size text (delta_t t0)
  |> Zmq.Socket.send debug_socket
in
(** {2 Walkers} *)

(** Number of electrons *)
let elec_num = Lazy.force Qputils.elec_num in

(** Size of the walkers message, as reported in the log *)
let walkers_size = 20 * 3 * (elec_num + 1) in

(** Time, in seconds, after which a block is ended on the worker. *)
let block_time =
  Input.Block_time.to_float (Input.Block_time.read ())
in

(** Total number of walkers to keep for restart *)
let walk_num_tot = Input.Walk_num_tot.read () in
(** Array of walkers. The size is [walk_num_tot]. Each walker is an array of
    [3*elec_num + 3] values kept as strings, read from the pool of electron
    coordinates of the EZFIO file. When the pool holds fewer walkers than
    [walk_num_tot], its walkers are reused cyclically.

    @raise Failure when the pool is empty or shorter than its declared
    size. *)
let walkers_array =
  let t0 = Unix.time () in
  (* Number of values stored for one walker *)
  let j = 3*elec_num + 3 in
  let result =
    let size =
      Ezfio.get_electrons_elec_coord_pool_size ()
    and ez =
      Ezfio.get_electrons_elec_coord_pool ()
      |> Ezfio.flattened_ezfio
      |> Array.map string_of_float
    in
    (* Without this guard, [i mod size] raises [Division_by_zero] on an
       empty pool, which is not caught below. *)
    if size <= 0 then
      failwith "Walkers file is broken.";
    try
      Array.init walk_num_tot (fun i ->
        Array.sub ez (j*(i mod size)) j )
    with
    | Invalid_argument _ ->
        failwith "Walkers file is broken."
  in
  String.concat " " [ "Read" ; string_of_int (Array.length result) ;
                      "walkers"]
  |> send_log "status" 0 t0 ;
  result
in
(** Index in [walkers_array] where the next received walker is stored
    (mutable). *)
let last_walker = ref 0 in

(** Last time when the walkers were saved to disk (mutable). *)
let last_save_walkers = ref (Unix.time ()) in
(** Saves the walkers to disk, as the pool of electron coordinates of the
    EZFIO file. Nothing is done unless more than 10 seconds have elapsed
    since the previous save. *)
let save_walkers () =
  if delta_t !last_save_walkers > 10. then
    begin
      let t0 = Unix.time () in
      Ezfio.set_electrons_elec_coord_pool_size walk_num_tot ;
      (* The flattening and the conversion are done on arrays: the pool can
         hold millions of values, and [List.concat] / [List.map] are not
         tail-recursive with older compilers. *)
      let walkers_list =
        walkers_array
        |> Array.to_list
        |> Array.concat
        |> Array.map float_of_string
        |> Array.to_list
      in
      Ezfio.set_electrons_elec_coord_pool (Ezfio.ezfio_array_of_list
        ~rank:3 ~dim:[| elec_num+1 ; 3 ; walk_num_tot |] ~data:walkers_list);
      send_log "status" walk_num_tot t0 "Saved walkers";
      last_save_walkers := Unix.time ()
    end
in
(** Moves [last_walker] to the next slot of the array of walkers. When the
    end of the array is reached, [last_walker] is reset to 0 and
    [save_walkers] is called. *)
let increment_last_walker () =
  let next = succ !last_walker in
  if next = walk_num_tot then
    begin
      last_walker := 0 ;
      save_walkers ()
    end
  else
    last_walker := next
in
(** {2 Set of workers} *)

(** A hash table is kept to track the running workers. The keys are
    strings built from the couple ([Compute_node], [PID]), and the values
    are the times of the last communication. *)

(** The hash table for workers *)
let workers_hash = Hashtbl.create 63 in

(** [key compute_node pid] is the key of the worker: the name of the
    compute node and the PID, separated by a space. *)
let key compute_node pid =
  Printf.sprintf "%s %d" (Compute_node.to_string compute_node) pid
in
(** [add_worker w pid] records the worker in the hash table, with the
    current time as its last communication time.
    @raise Failure when the worker is already in the table. *)
let add_worker w pid =
  let worker_key = key w pid in
  if Hashtbl.mem workers_hash worker_key then
    failwith (worker_key^" already registered")
  else
    Hashtbl.add workers_hash worker_key (Unix.time ())
in
(** [del_worker w pid] removes the worker from the hash table.
    @raise Failure when the worker is not in the table. *)
let del_worker w pid =
  let worker_key = key w pid in
  if Hashtbl.mem workers_hash worker_key then
    Hashtbl.remove workers_hash worker_key
  else
    failwith (worker_key^" not registered")
in
2019-07-19 17:06:01 +02:00
(** [touch_worker w pid] sets the last communication time of the worker to
    [Unix.time ()]. The worker is added to the table if it was absent. *)
let touch_worker w pid =
  Hashtbl.replace workers_hash (key w pid) (Unix.time ())
in
(** [n_connected hash now] is the number of connected workers: the entries
    of [hash] whose last communication time is at most
    [initialization_timeout + 2 * block_time] before [now]. *)
let n_connected hash now =
  let max_silence = initialization_timeout +. block_time *. 2. in
  Hashtbl.fold
    (fun _ last_seen count ->
       if (now -. last_seen) <= max_silence then count + 1 else count)
    hash 0
in
(** Current PID. *)
let dataserver_pid = Unix.getpid () in

(** Name of the blocks file written by the current process:
    [<hostname>.<pid>] in the directory [Block.dir_name]. The directory is
    created if it does not exist. *)
let block_channel_filename =
  let dirname = Lazy.force Block.dir_name in
  let () =
    if not (Sys.file_exists dirname) then
      try
        Unix.mkdir dirname 0o755
      with
      (* Another process may have created the directory between the test
         and the call: this is not an error. *)
      | Unix.Unix_error (Unix.EEXIST, _, _) -> ()
  in
  Filename.concat dirname (
    hostname ^ "." ^ (string_of_int dataserver_pid)
  )
in
(** Name of the blocks file while the current process is writing to it. *)
let block_channel_filename_locked =
  Printf.sprintf "%s.locked" block_channel_filename
in

(** Name given to the blocks file when it is handed over for compression. *)
let block_channel_filename_tmp =
  Printf.sprintf "%s.tmp" block_channel_filename
in
(** Output channel (mutable) on the locked blocks file written by the
    current process. If the file can not be opened, a second and last
    attempt is made after 5 seconds. *)
let block_channel =
  let open_locked () = open_out block_channel_filename_locked in
  let channel =
    try
      open_locked ()
    with
    | Sys_error _ ->
        (* NFS Stale file handle :
         * Wait 5 seconds, and retry *)
        Unix.sleep 5;
        open_locked ()
  in
  ref channel
in
(** Compresses the blocks file by merging all blocks with the same block ID
    and the same host name, but different PIDs. The result is merging all
    the CPU cores of the compute nodes. Happens when [max_file_size] is
    reached.

    The current channel is closed, the locked file is renamed to the
    temporary name, [Random_variable.compress_files] is called, and a new
    channel is opened on the locked file name.

    The argument is not used; callers pass [()]. *)
let compress_block_file _filename =
  let t0 = Unix.time () in
  close_out !block_channel;
  Unix.rename block_channel_filename_locked block_channel_filename_tmp;
  Random_variable.compress_files ();
  send_log "status" 0 t0 "Compressed block file";
  block_channel :=
    if Sys.file_exists block_channel_filename_locked then
      (* [Open_wronly] is required: with [Open_append] alone the file is
         opened read-only and every later write on the channel fails. *)
      open_out_gen [ Open_wronly ; Open_append ] 0o660
        block_channel_filename_locked
    else
      open_out block_channel_filename_locked
in
(** {2 Threads} *)

(** {3 Status thread} *)

(** [start_status_thread ()] creates the thread which publishes the status.
    Every 0.3 second the current status is sent on a PUB socket bound on
    [port+1]. When more than 2 seconds have passed since the last update,
    the number of connected workers is computed and, unless the run has to
    stop or to be queued again, the status and the stop time are read
    again. The thread ends when the status is [Status.Stopped]. *)
let start_status_thread =
  let t0 = Unix.time () in
  let publish_status () =
    send_log "status" 0 t0 "Starting status thread";
    let socket = Zmq.Socket.create zmq_context Zmq.Socket.pub in
    bind_socket "PUB" socket (Printf.sprintf "tcp://*:%d" (port+1));
    (* Period of the loop, and period of the updates *)
    let delay = 0.3 in
    let delay_read = 2. in
    let start_time = Unix.time () in
    let stop_time = ref (Input.Stop_time.(read () |> to_float) ) in
    let last_update = ref start_time in
    while (!status <> Status.Stopped)
    do
      Unix.sleepf delay;
      let now = Unix.time () in
      let status_string = Status.to_string !status in
      Zmq.Socket.send socket status_string;
      send_log "status" (String.length status_string) now status_string;
      (* Event of this iteration; an update has the priority over the end
         of the run, which has the priority over the time-out. *)
      let test =
        if (now -. !last_update > delay_read) then
          `Update (n_connected workers_hash now)
        else if (now -. start_time > !stop_time) then
          `Terminate
        else if (now -. start_time > initialization_timeout) then
          `Timeout
        else
          `None
      in
      match (daemon, !status, test) with
      | (_    , _              , `None      ) -> ()
      | (_    , Status.Running , `Terminate ) -> change_status Status.Stopping
      | (false, Status.Running , `Update 0  ) -> change_status Status.Stopped
      | (true , Status.Running , `Update 0  ) -> change_status Status.Queued
      | (_    , _              , `Update i  ) ->
          begin
            status := Status.read ();
            last_update := now;
            stop_time := Input.Stop_time.(read () |> to_float) ;
            let n_tot = Hashtbl.length workers_hash in
            if (i <> n_tot) then
              Printf.sprintf "Connected workers : %d / %d" i n_tot
              |> send_log "status" 0 now
          end
      | (false, Status.Queued  , `Timeout   ) -> change_status Status.Stopped
      | (_, _, _) -> ()
    done;
    (* Last publication of the status before closing the socket *)
    Zmq.Socket.send socket (Status.to_string !status);
    Zmq.Socket.set_linger_period socket 1_000 ;
    Zmq.Socket.close socket
  in
  Thread.create publish_status
in
(** {3 Log thread} *)

(** [start_log_thread ()] creates the thread which handles the debug
    messages. An XSUB socket is bound on [port+3]. What is received on it is
    sent as a log line to the debug socket, and what is received on the
    debug socket is sent to the XSUB socket. The thread ends when the status
    is [Status.Stopped]. *)
let start_log_thread =
  let t0 = Unix.time () in
  let forward_logs () =
    send_log "status" 0 t0 "Starting log thread";
    let socket = Zmq.Socket.create zmq_context Zmq.Socket.xsub in
    bind_socket "XSUB" socket (Printf.sprintf "tcp://*:%d" (port+3));
    let pollitem =
      Zmq.Poll.mask_of
        [| (socket       , Zmq.Poll.In) ;
           (debug_socket , Zmq.Poll.In)
        |]
    in
    while (!status <> Status.Stopped)
    do
      let polling = Zmq.Poll.poll ~timeout:1000 pollitem in
      match polling.(0), polling.(1) with
      | Some Zmq.Poll.In, _ ->
          begin
            (* Message received on the XSUB socket: log it *)
            let message =
              Zmq.Socket.recv_all ~block:false socket
              |> String.concat " "
            in
            let now = Unix.time () in
            send_log "log" 0 now message
          end
      | _, Some Zmq.Poll.In ->
          begin
            (* Forward subscription from XPUB to XSUB *)
            Zmq.Socket.recv_all ~block:false debug_socket
            |> Zmq.Socket.send_all socket
          end
      | _ -> ()
    done;
    Zmq.Socket.set_linger_period socket 1000 ;
    Zmq.Socket.close socket
  in
  Thread.create forward_logs
in
(** {3 Main thread} *)

(** [random_walkers n_walks] draws [n_walks] walkers at random, with
    replacement, and returns their values concatenated in a single list.

    NOTE(review): the indices are drawn in [0, n_walks), not in
    [0, walk_num_tot) -- confirm that this is intended. *)
let random_walkers n_walks =
  let count = Strictly_positive_int.to_int n_walks in
  let picked = ref [] in
  for _i = 1 to count do
    (* Each new walker is put in front of the previous ones *)
    picked := walkers_array.(Random.int count) :: !picked
  done;
  !picked
  |> Array.concat
  |> Array.to_list
in
(** [start_main_thread ()] creates the main thread, which serves the
    requests of the workers on the REP socket and receives their results on
    the PULL socket. *)
let start_main_thread =
  (* Reference time of the thread *)
  let wall0 = Unix.time () in
  let f () =
    change_status Status.Queued;
    send_log "status" 0 wall0 "Starting main thread";

    (** Reply socket, bound on [port] *)
    let rep_socket = Zmq.Socket.create zmq_context Zmq.Socket.rep in
    bind_socket "REP" rep_socket (Printf.sprintf "tcp://*:%d" port);
    Zmq.Socket.set_receive_high_water_mark rep_socket 100_000;
    Zmq.Socket.set_send_high_water_mark rep_socket 100_000;
    Zmq.Socket.set_immediate rep_socket true;
    Zmq.Socket.set_linger_period rep_socket 600_000 ;
(** EZFIO Cache: answers already computed, indexed by the request *)
let ezfio_cache = Hashtbl.create 63 in

(** [handle_ezfio msg] is the answer to the EZFIO request [msg]. It is
    computed by [decode_ezfio_message] the first time, and then taken from
    the cache. *)
let handle_ezfio msg =
  try
    Hashtbl.find ezfio_cache msg
  with
  | Not_found ->
      let result = decode_ezfio_message msg in
      Hashtbl.add ezfio_cache msg result;
      result
in
(** Pull socket for computed data, bound on [port+2] *)
let pull_socket = Zmq.Socket.create zmq_context Zmq.Socket.pull in
bind_socket "PULL" pull_socket (Printf.sprintf "tcp://*:%d" (port+2));

(** Address of the dataserver, stored in the EZFIO file and printed *)
let server_address =
  Printf.sprintf "tcp://%s:%d" (Lazy.force Qmcchem_config.ip_address) port
in
Ezfio.set_simulation_http_server server_address;
Printf.printf "Server address: %s\n%!" server_address;

(** Polling item to poll REP and PULL sockets. *)
let pollitem =
  Zmq.Poll.mask_of
    [| ( rep_socket,  Zmq.Poll.In) ;
       ( pull_socket, Zmq.Poll.In) ;
    |]
in
(** Handles messages coming into the REP socket: EZFIO requests, requests
    for walkers, registration and unregistration of the workers, and tests.

    NOTE(review): nothing is replied to a [Message.Error]; a REP socket is
    normally expected to answer every request -- confirm that the clients
    can not trigger this case.

    @raise Failure on a [Message.Walkers] or a [Message.Property], which are
    not expected on this socket. *)
let handle_rep () =
  let raw_msg = Zmq.Socket.recv_all ~block:false rep_socket in
  let t0 = Unix.time () in
  let msg =
    List.map String.trim raw_msg
    |> Message.create
  in
  (* Total number of characters of the raw message *)
  let msg_size =
    List.fold_left (fun accu x -> accu + (String.length x)) 0 raw_msg
  in
  match msg with
  | Message.Error _ -> ()
  | Message.Ezfio ezfio_msg ->
      begin
        let result = handle_ezfio ezfio_msg in
        (* The length of the answer is sent in front of the answer *)
        Zmq.Socket.send_all rep_socket
          [ Printf.sprintf "%d " (String.length result) ;
            result ] ;
        send_log "rep" (String.length result) t0 ezfio_msg
      end
  | Message.GetWalkers n_walks ->
      begin
        send_log "req" msg_size t0 "get_walkers";
        Zmq.Socket.send_all rep_socket (random_walkers n_walks);
        send_log "rep" walkers_size t0 "get_walkers"
      end
  | Message.Register (w,pid) ->
      begin
        match !status with
        | Status.Queued
        | Status.Running ->
            begin
              Printf.sprintf "Register : %s %d"
                (Compute_node.to_string w) pid
              |> send_log "req" msg_size t0;
              add_worker w pid;
              (* The first registered worker starts the run *)
              if (!status = Status.Queued) then
                change_status Status.Running ;
              Zmq.Socket.send rep_socket "OK";
              send_log "rep" 2 t0 "Register : OK"
            end
        | Status.Stopping
        | Status.Stopped ->
            Zmq.Socket.send rep_socket "Failed"
      end
  | Message.Unregister (w,pid) ->
      begin
        Printf.sprintf "Unregister : %s %d"
          (Compute_node.to_string w) pid
        |> send_log "req" msg_size t0;
        Zmq.Socket.send rep_socket "OK";
        del_worker w pid;
        Printf.sprintf "Unregister : %d remaining"
          (Hashtbl.length workers_hash)
        |> send_log "rep" 2 t0 ;
        (* When no worker is connected any more, the run is stopped, or
           queued again in daemon mode. *)
        if n_connected workers_hash t0 = 0 then
          change_status
            (if daemon then Status.Queued else Status.Stopped)
      end
  | Message.Test ->
      begin
        Zmq.Socket.send rep_socket "OK";
        send_log "rep" 2 t0 "Test"
      end
  | Message.Walkers (_, _, _)
  | Message.Property _
    -> failwith "Bad message"
in
(** Handles messages coming into the PULL socket: walkers and blocks sent by
    the workers. The received walkers replace walkers of the array of
    walkers, and the blocks are appended to the blocks file.

    @param status status of the data server when the message is handled;
    the last communication time of the sender is updated only when it is
    [Status.Running].
    @raise Failure on the messages which are not expected on this socket. *)
let handle_pull status =
  let raw_msg = Zmq.Socket.recv_all ~block:false pull_socket in
  let t0 = Unix.time () in
  let msg =
    List.map String.trim raw_msg
    |> Message.create
  in
  (* Total number of characters of the raw message *)
  let msg_size =
    List.fold_left (fun accu x -> accu + (String.length x)) 0 raw_msg
  in
  let recv_log = send_log "pull" msg_size t0 in
  match msg with
  | Message.Error m -> Printf.eprintf "%s\n%!" m
  | Message.Walkers (h,pid,w) ->
      begin
        if (status = Status.Running) then
          touch_worker h pid ;
        Printf.sprintf "Walkers from %s : %d / %d / %d"
          (key h pid) (Array.length w) (!last_walker) walk_num_tot
        |> recv_log ;
        (* Each received walker takes the place of the walker at the
           position [last_walker], which is then moved forward. *)
        Array.iter (fun walker ->
          walkers_array.(!last_walker) <- Array.map string_of_float walker;
          increment_last_walker ()
        ) w;
        (* Block holding the wall time elapsed since the start of the
           thread, written to the blocks file if it can be built. *)
        let wall =
          Printf.sprintf "%f %f # %s %s %s %d"
            (Unix.time () -. wall0)
            1. (Property.to_string Property.Wall)
            hostname (string_of_int dataserver_pid) 1
          |> Block.of_string
        in
        match wall with
        | Some wall ->
            begin
              output_string !block_channel (Block.to_string wall);
              output_char !block_channel '\n'
            end
        | _ -> ()
      end
  | Message.Property b ->
      begin
        if (status = Status.Running) then
          touch_worker b.Block.compute_node b.Block.pid ;
        output_string !block_channel (Block.to_string b);
        output_char !block_channel '\n';
        recv_log (Block.to_string b)
      end
  | Message.Test
  | Message.GetWalkers _
  | Message.Ezfio _
  | Message.Register (_, _)
  | Message.Unregister (_, _)
    -> failwith "Bad message"
in
(* Main loop: the PULL socket is served first, then the REP socket. When
   nothing was received, the blocks file is flushed, and compressed if it
   is larger than [max_file_size]. *)
while (!status <> Status.Stopped)
do
  let polling = Zmq.Poll.poll ~timeout:1000 pollitem in
  match polling.(1), polling.(0) with
  | Some Zmq.Poll.In, _ -> handle_pull !status
  | _, Some Zmq.Poll.In -> handle_rep ()
  | _ ->
      begin
        flush !block_channel ;
        let file_size =
          (Unix.stat block_channel_filename_locked).Unix.st_size
        in
        if (file_size > !max_file_size) then
          begin
            compress_block_file ();
            (* The next compression happens at a size 20% larger *)
            max_file_size := (file_size * 12) / 10
          end
      end
done;
(* Close the sockets of the thread *)
let close_with_linger socket =
  Zmq.Socket.set_linger_period socket 1000 ;
  Zmq.Socket.close socket
in
close_with_linger rep_socket;
close_with_linger pull_socket
in
Thread.create f
in
(** {2 Finalization} *)

(** [finalize ~t0] sets the status to [Status.Stopped], compresses the
    blocks file, closes the debug socket, terminates the ZeroMQ context,
    closes and removes the locked blocks file, and displays the summary of
    the results.
    @param t0 is the initial time of the run, such that the wall time can
    be computed. *)
let finalize ~t0 =
  print_string "Finalizing...";
  change_status Status.Stopped;
  compress_block_file ();
  send_log "status" 0 t0 "Done";
  close_debug_socket ();
  Zmq.Context.terminate zmq_context;
  (* Best effort: a failure while removing the locked file is ignored *)
  (try
     close_out !block_channel;
     Unix.unlink block_channel_filename_locked
   with
   | _ -> ());
  Qmcchem_result.display_summary ~range:(0.,100.)
in
(** {3 Main function} *)

(** Initial time of the run *)
let t0 = Unix.time () in

(* Handle signals: on SIGINT, SIGTERM and SIGQUIT a message is printed and
   [Watchdog.kill] is called. *)
let handler s =
  Printf.printf "Dataserver received signal %d... killing\n%!" s;
  Watchdog.kill ()
in
[ Sys.sigint ; Sys.sigterm ; Sys.sigquit ]
|> List.iter (fun s -> Sys.set_signal s (Sys.Signal_handle handler))
;
2019-07-19 17:06:01 +02:00
2015-12-19 02:35:13 +01:00
(* Run threads: the three threads are created and waited for. If an
   exception is trapped, the status is set to [Status.Stopping], and after
   10 seconds the run is finalized and the exception is raised again. *)
begin
  try
    [ start_status_thread () ;
      start_log_thread () ;
      start_main_thread () ;
    ]
    |> List.iter Thread.join
  with
  | err ->
      begin
        print_endline "Trapped error. Waiting 10 seconds...";
        change_status Status.Stopping;
        Unix.sleep 10;
        finalize ~t0;
        raise err
      end
end;
finalize ~t0