qmcchem/ocaml/Qmcchem_forwarder.ml

555 lines
15 KiB
OCaml
Raw Normal View History

2015-12-19 02:35:13 +01:00
let bind_socket ~socket_type ~socket ~address =
2016-03-05 00:25:39 +01:00
let rec loop = function
2019-07-19 17:06:01 +02:00
| 0 -> failwith @@ Printf.sprintf
2016-03-05 00:25:39 +01:00
"Unable to bind the forwarder's %s socket : %s\n"
socket_type address
2019-07-19 17:06:01 +02:00
| -1 -> ()
| i ->
2016-03-05 00:25:39 +01:00
try
2018-06-04 10:26:49 +02:00
Zmq.Socket.bind socket address;
2016-03-05 00:25:39 +01:00
loop (-1)
with
2019-07-19 17:06:01 +02:00
| Unix.Unix_error _ -> (Unix.sleep 1 ; loop (i-1) )
2016-03-05 00:25:39 +01:00
| other_exception -> raise other_exception
in loop 10
2015-12-19 02:35:13 +01:00
let run ezfio_filename dataserver =
let dataserver_address, dataserver_port =
2019-07-19 17:06:01 +02:00
String.sub dataserver 6 (String.length dataserver - 6)
|> String_ext.lsplit2_exn ~on:':'
2015-12-19 02:35:13 +01:00
and qmc =
Lazy.force Qmcchem_config.qmc
in
(* Go into /dev/shm *)
Unix.chdir Qmcchem_config.dev_shm;
let tmpdir =
ezfio_filename ^ "_" ^ dataserver_port
in
(* Port of the data server *)
let port =
2019-07-19 17:06:01 +02:00
(int_of_string dataserver_port)+10
2015-12-19 02:35:13 +01:00
in
(* Build qmc executable command *)
2017-10-10 09:39:58 +02:00
let prog, argv =
2015-12-19 02:35:13 +01:00
qmc,
2019-07-19 17:06:01 +02:00
[| qmc ; ezfio_filename ;
Printf.sprintf "ipc://%s:%d" Qmcchem_config.dev_shm port |];
2015-12-19 02:35:13 +01:00
in
(* Create the temporary directory. If it is possible, then the process is a
* master and the forwarder will start. Otherwise, only start a qmc process.
*)
let () =
try
2019-07-23 17:27:02 +02:00
Unix.mkdir tmpdir 0o755;
2016-03-05 00:25:39 +01:00
Unix.chdir tmpdir
2015-12-19 02:35:13 +01:00
with
| Unix.Unix_error _ ->
2019-07-19 17:06:01 +02:00
begin
Unix.chdir tmpdir;
2020-04-15 23:59:17 +02:00
Unix.sleep 1;
2019-07-19 17:06:01 +02:00
if Sys.file_exists "PID" then
begin
let pid =
let ic = open_in "PID" in
try
int_of_string (input_line ic)
with
| End_of_file -> -1
in
match pid with
| -1 -> ()
| pid ->
try
Unix.kill pid 0 ;
2019-07-23 17:27:02 +02:00
ignore @@ Unix.execvp prog argv
2019-07-19 17:06:01 +02:00
with
| Unix.Unix_error (Unix.ESRCH, _, _) -> ()
end
end
2015-12-19 02:35:13 +01:00
in
(* Now, only one forwarder will execute the following code *)
2019-07-19 17:06:01 +02:00
let oc = open_out "PID" in
Unix.getpid ()
|> Printf.sprintf "%d\n"
|> output_string oc
2020-04-15 23:59:17 +02:00
; close_out oc;
2015-12-19 02:35:13 +01:00
(* Fork a qmc *)
ignore @@
2019-07-19 17:06:01 +02:00
Watchdog.fork_exec ~prog ~args:argv ();
2015-12-19 02:35:13 +01:00
(* Fetch input *)
let zmq_context =
2018-06-04 10:26:49 +02:00
Zmq.Context.create ()
2015-12-19 02:35:13 +01:00
in
let terminate () =
(* Clean up the temp directory *)
Unix.chdir Qmcchem_config.dev_shm;
2018-06-04 10:26:49 +02:00
Zmq.Context.terminate zmq_context ;
for i=port to port+4
do
let filename =
2020-04-15 23:59:17 +02:00
Printf.sprintf ":%d" i
in
try
2020-04-15 23:59:17 +02:00
Sys.remove filename
with
| _ -> ()
;
2016-03-05 00:25:39 +01:00
done;
2020-04-15 23:59:17 +02:00
let command =
Printf.sprintf "rm -rf -- \"%s\" " tmpdir
in
try
ignore @@ Unix.system command
with
| Unix.Unix_error _ -> print_endline "Unable to remove temporary directory"
;
2016-03-05 00:25:39 +01:00
Watchdog.kill ()
2015-12-19 02:35:13 +01:00
in
(* Signal handler to Kill properly all the processes *)
let handler s =
2019-07-19 17:06:01 +02:00
Printf.printf "Forwarder received signal %d... killing\n%!" s;
2020-04-15 23:59:17 +02:00
terminate ()
2015-12-19 02:35:13 +01:00
in
2019-07-19 17:06:01 +02:00
List.iter (fun s -> ignore @@ Sys.signal s (Sys.Signal_handle handler))
[
Sys.sigint ;
Sys.sigterm ;
Sys.sigquit ;
2015-12-19 02:35:13 +01:00
]
;
(* Fetch walkers *)
let walk_num =
ref 0
and walkers =
ref []
in
(* Status thread *)
let status =
ref Status.Running
in
let start_status_thread =
let f () =
let pub_socket =
2018-06-04 10:26:49 +02:00
Zmq.Socket.create zmq_context Zmq.Socket.pub
2015-12-19 02:35:13 +01:00
and address =
Printf.sprintf "ipc://%s:%d" Qmcchem_config.dev_shm (port+1);
in
bind_socket "PUB" pub_socket address;
let sub_socket =
2018-06-04 10:26:49 +02:00
Zmq.Socket.create zmq_context Zmq.Socket.sub
2015-12-19 02:35:13 +01:00
and address =
Printf.sprintf "tcp://%s:%d" dataserver_address (port+1-10)
in
2018-06-04 10:26:49 +02:00
Zmq.Socket.connect sub_socket address;
Zmq.Socket.subscribe sub_socket "";
2015-12-19 02:35:13 +01:00
let pollitem =
2018-06-04 10:26:49 +02:00
Zmq.Poll.mask_of
[| (sub_socket, Zmq.Poll.In) ;
2015-12-19 02:35:13 +01:00
|]
in
while (!status <> Status.Stopped)
do
let polling =
2018-06-04 10:26:49 +02:00
Zmq.Poll.poll ~timeout:1000 pollitem
2015-12-19 02:35:13 +01:00
in
2018-06-04 10:26:49 +02:00
if (polling.(0) = Some Zmq.Poll.In) then
2015-12-19 02:35:13 +01:00
begin
let msg =
2018-06-04 10:26:49 +02:00
Zmq.Socket.recv ~block:false sub_socket
2015-12-19 02:35:13 +01:00
in
2018-06-04 10:26:49 +02:00
Zmq.Socket.send pub_socket msg;
2015-12-19 02:35:13 +01:00
status := Status.of_string msg;
end;
done;
2019-07-19 17:06:01 +02:00
List.iter (fun socket ->
2018-06-04 10:26:49 +02:00
Zmq.Socket.set_linger_period socket 1000 ;
Zmq.Socket.close socket)
2015-12-19 02:35:13 +01:00
[ sub_socket ; pub_socket ]
in
Thread.create f
in
let start_log_thread =
let f () =
let sub_socket =
2018-06-04 10:26:49 +02:00
Zmq.Socket.create zmq_context Zmq.Socket.xsub
2015-12-19 02:35:13 +01:00
and address =
Printf.sprintf "ipc://%s:%d" Qmcchem_config.dev_shm (port+3);
in
bind_socket "XSUB" sub_socket address;
let pub_socket =
2018-06-04 10:26:49 +02:00
Zmq.Socket.create zmq_context Zmq.Socket.xpub
2015-12-19 02:35:13 +01:00
and address =
Printf.sprintf "tcp://%s:%d" dataserver_address (port+3-10)
in
2018-06-04 10:26:49 +02:00
Zmq.Socket.connect pub_socket address;
2015-12-19 02:35:13 +01:00
let pollitem =
2018-06-04 10:26:49 +02:00
Zmq.Poll.mask_of
[| (sub_socket, Zmq.Poll.In) ;
(pub_socket, Zmq.Poll.In) ;
2015-12-19 02:35:13 +01:00
|]
in
(* Main loop *)
while (!status <> Status.Stopped)
do
let polling =
2018-06-04 10:26:49 +02:00
Zmq.Poll.poll ~timeout:1000 pollitem
2015-12-19 02:35:13 +01:00
in
2018-06-04 10:26:49 +02:00
if (polling.(0) = Some Zmq.Poll.In) then
2015-12-19 02:35:13 +01:00
begin
2018-06-04 10:26:49 +02:00
Zmq.Socket.recv ~block:false sub_socket
|> Zmq.Socket.send pub_socket ;
2015-12-19 02:35:13 +01:00
end
2018-06-04 10:26:49 +02:00
else if (polling.(1) = Some Zmq.Poll.In) then
2015-12-19 02:35:13 +01:00
begin
2016-04-05 00:48:37 +02:00
Printf.eprintf "Forwarder subscribe\n%!";
2018-06-04 10:26:49 +02:00
Zmq.Socket.recv ~block:false pub_socket
|> Zmq.Socket.send sub_socket ;
2015-12-19 02:35:13 +01:00
end
done;
2019-07-19 17:06:01 +02:00
List.iter (fun socket ->
2018-06-04 10:26:49 +02:00
Zmq.Socket.set_linger_period socket 1000 ;
Zmq.Socket.close socket)
2015-12-19 02:35:13 +01:00
[ sub_socket ; pub_socket ]
in
Thread.create f
in
(* Proxy thread *)
let start_proxy_thread =
let f () =
let req_socket =
2018-06-04 10:26:49 +02:00
Zmq.Socket.create zmq_context Zmq.Socket.req
2015-12-19 02:35:13 +01:00
in
2018-06-04 10:26:49 +02:00
Zmq.Socket.connect req_socket dataserver;
Zmq.Socket.set_receive_timeout req_socket 600_000;
2015-12-19 02:35:13 +01:00
let dealer_socket =
2018-06-04 10:26:49 +02:00
Zmq.Socket.create zmq_context Zmq.Socket.dealer
2015-12-19 02:35:13 +01:00
in
bind_socket "PROXY" dealer_socket "inproc://dealer";
2018-06-04 10:26:49 +02:00
Zmq.Socket.set_receive_high_water_mark dealer_socket 100_000;
Zmq.Socket.set_send_high_water_mark dealer_socket 100_000;
Zmq.Socket.set_immediate dealer_socket true;
Zmq.Socket.set_linger_period dealer_socket 600_000;
2015-12-19 02:35:13 +01:00
let fetch_walkers () =
2019-07-19 17:06:01 +02:00
Zmq.Socket.send_all req_socket ["get_walkers" ; string_of_int !walk_num ];
2018-06-04 10:26:49 +02:00
Zmq.Socket.recv_all req_socket
2015-12-19 02:35:13 +01:00
in
let pollitem =
2018-06-04 10:26:49 +02:00
Zmq.Poll.mask_of
[| (dealer_socket, Zmq.Poll.In) ;
2015-12-19 02:35:13 +01:00
|]
in
(* EZFIO Cache *)
let ezfio_cache =
2019-07-19 17:06:01 +02:00
Hashtbl.create 63
2015-12-19 02:35:13 +01:00
in
let handle_ezfio msg =
2019-07-19 17:06:01 +02:00
match Hashtbl.find_opt ezfio_cache msg with
2015-12-19 02:35:13 +01:00
| Some result -> result
| None ->
begin
2018-06-04 10:26:49 +02:00
Zmq.Socket.send_all req_socket ["Ezfio" ; msg];
2015-12-19 02:35:13 +01:00
let result =
2018-06-04 10:26:49 +02:00
Zmq.Socket.recv_all req_socket
2015-12-19 02:35:13 +01:00
in
2019-07-19 17:06:01 +02:00
Hashtbl.add ezfio_cache msg result;
result
2015-12-19 02:35:13 +01:00
end
in
(* Main loop *)
while (!status <> Status.Stopped)
do
let polling =
2018-06-04 10:26:49 +02:00
Zmq.Poll.poll ~timeout:1000 pollitem
2015-12-19 02:35:13 +01:00
in
2018-06-04 10:26:49 +02:00
if (polling.(0) = Some Zmq.Poll.In) then
2015-12-19 02:35:13 +01:00
begin
let raw_msg =
2018-06-04 10:26:49 +02:00
Zmq.Socket.recv_all ~block:false dealer_socket
2015-12-19 02:35:13 +01:00
in
let header, msg =
let rec aux header = function
| "" :: msg -> List.rev ("" :: header), Message.create msg
| head :: tail -> aux (head::header) tail
| _ -> failwith "Too many routers in the middle"
in
2020-04-15 14:40:33 +02:00
aux [] (List.rev @@ List.rev_map String.trim raw_msg)
2015-12-19 02:35:13 +01:00
in
let handle message =
match message with
| Message.Ezfio ezfio_msg ->
let result =
handle_ezfio ezfio_msg
in
2018-06-04 10:26:49 +02:00
Zmq.Socket.send_all dealer_socket (header @ result)
2015-12-19 02:35:13 +01:00
| Message.GetWalkers n_walks ->
begin
if (!walk_num = 0) then
begin
walk_num := Qptypes.Strictly_positive_int.to_int n_walks;
walkers := fetch_walkers ();
end;
2018-06-04 10:26:49 +02:00
Zmq.Socket.send_all dealer_socket (header @ !walkers);
2015-12-19 02:35:13 +01:00
walkers := fetch_walkers ();
end
| Message.Test ->
2018-06-04 10:26:49 +02:00
Zmq.Socket.send_all dealer_socket (header @ [ "OK" ])
| Message.Error _ -> ()
2015-12-19 02:35:13 +01:00
| Message.Register _
| Message.Unregister _
| Message.Walkers _
| Message.Property _ ->
failwith "Bad message"
in handle msg
end;
done;
2018-06-04 10:26:49 +02:00
Zmq.Socket.set_linger_period dealer_socket 1000 ;
Zmq.Socket.set_linger_period req_socket 1000 ;
Zmq.Socket.close dealer_socket;
Zmq.Socket.close req_socket;
2015-12-19 02:35:13 +01:00
in
Thread.create f
in
(* Main thread *)
let start_main_thread =
let f () =
let dealer_socket =
2018-06-04 10:26:49 +02:00
Zmq.Socket.create zmq_context Zmq.Socket.dealer
2015-12-19 02:35:13 +01:00
in
2018-06-04 10:26:49 +02:00
Zmq.Socket.connect dealer_socket dataserver;
Zmq.Socket.set_linger_period dealer_socket 600_000;
2015-12-19 02:35:13 +01:00
let proxy_socket =
2018-06-04 10:26:49 +02:00
Zmq.Socket.create zmq_context Zmq.Socket.dealer
2015-12-19 02:35:13 +01:00
in
2018-06-04 10:26:49 +02:00
Zmq.Socket.connect proxy_socket "inproc://dealer";
2015-12-19 02:35:13 +01:00
let router_socket =
2018-06-04 10:26:49 +02:00
Zmq.Socket.create zmq_context Zmq.Socket.router
2015-12-19 02:35:13 +01:00
and address =
Printf.sprintf "ipc://%s:%d" Qmcchem_config.dev_shm (port);
in
bind_socket "ROUTER" router_socket address;
2018-06-04 10:26:49 +02:00
Zmq.Socket.set_receive_high_water_mark router_socket 100000;
Zmq.Socket.set_send_high_water_mark router_socket 100000;
Zmq.Socket.set_immediate router_socket true;
Zmq.Socket.set_linger_period router_socket 600_000;
2015-12-19 02:35:13 +01:00
(* Pull socket for computed data *)
let push_socket =
2018-06-04 10:26:49 +02:00
Zmq.Socket.create zmq_context Zmq.Socket.push
2015-12-19 02:35:13 +01:00
and address =
Printf.sprintf "tcp://%s:%d" dataserver_address (port+2-10)
in
2018-06-04 10:26:49 +02:00
Zmq.Socket.connect push_socket address;
Zmq.Socket.set_linger_period push_socket 600_000;
2015-12-19 02:35:13 +01:00
let pull_socket =
2018-06-04 10:26:49 +02:00
Zmq.Socket.create zmq_context Zmq.Socket.pull
2015-12-19 02:35:13 +01:00
and address =
Printf.sprintf "ipc://%s:%d" Qmcchem_config.dev_shm (port+2);
in
bind_socket "PULL" pull_socket address;
(* Handles messages coming into the ROUTER socket. *)
let handle_router () =
let raw_msg =
2018-06-04 10:26:49 +02:00
Zmq.Socket.recv_all ~block:false router_socket
2015-12-19 02:35:13 +01:00
in
let header, msg =
let rec aux header = function
| "" :: msg -> List.rev ("" :: header), Message.create msg
| head :: tail -> aux (head::header) tail
| _ -> failwith "Too many routers in the middle"
in
2020-04-15 14:40:33 +02:00
aux [] (List.rev @@ List.rev_map String.trim raw_msg)
2015-12-19 02:35:13 +01:00
in
let handle message =
match message with
| Message.GetWalkers _
| Message.Ezfio _
| Message.Test ->
2018-06-04 10:26:49 +02:00
Zmq.Socket.send_all proxy_socket raw_msg
2015-12-19 02:35:13 +01:00
| Message.Register _
| Message.Unregister _ ->
2018-06-04 10:26:49 +02:00
Zmq.Socket.send_all dealer_socket raw_msg
2015-12-19 02:35:13 +01:00
| Message.Walkers (_, _, _)
| Message.Property _ ->
failwith "Bad message"
| Message.Error _ -> ()
2015-12-19 02:35:13 +01:00
in handle msg
in
let handle_dealer () =
2018-06-04 10:26:49 +02:00
Zmq.Socket.recv_all ~block:false dealer_socket
|> Zmq.Socket.send_all router_socket
2015-12-19 02:35:13 +01:00
in
let handle_proxy () =
2018-06-04 10:26:49 +02:00
Zmq.Socket.recv_all ~block:false proxy_socket
|> Zmq.Socket.send_all router_socket
2015-12-19 02:35:13 +01:00
in
2016-04-05 00:48:37 +02:00
let select_n_of ~n ~len l =
let a =
Array.of_list l
in
let s =
(Array.length a)/ len
in
let fetch i =
let rec loop accu = function
| -1 -> accu
2016-04-05 11:52:04 +02:00
| k -> loop ((Array.get a (i*len+k)) :: accu) (k-1)
2016-04-05 00:48:37 +02:00
in
loop [] (len-1)
in
let rec select accu = function
| 0 -> accu
| i -> let new_accu =
(fetch @@ Random.int s) :: accu
in
select new_accu (i-1)
in
select [] n
|> List.concat
in
2015-12-19 02:35:13 +01:00
(* Handles messages coming into the PULL socket. *)
let handle_pull () =
2016-04-05 00:48:37 +02:00
let message =
2018-06-04 10:26:49 +02:00
Zmq.Socket.recv_all ~block:false pull_socket
2016-04-05 00:48:37 +02:00
in
let new_message =
match message with
| "elec_coord":: hostname :: pid :: id :: n_str :: rest ->
let n =
2019-07-19 17:06:01 +02:00
int_of_string n_str
2016-04-05 00:48:37 +02:00
in
let len =
2019-10-01 10:41:48 +02:00
if !walk_num = 0 then n else
2016-04-05 00:48:37 +02:00
n / !walk_num
in
if (n < 5*len) then
message
else
List.concat [ [ "elec_coord" ; hostname ; pid ; id ;
2019-07-19 17:06:01 +02:00
string_of_int (5*len)] ; ( select_n_of ~n:5 ~len rest ) ]
2022-01-12 19:05:15 +01:00
| prop :: c :: pid :: b :: d :: w :: [] -> message
| prop :: c :: pid :: b :: d :: w :: l ->
if Qmcchem_config.binary_io then
match Message.create message with
| Message.Property block ->
prop :: c :: pid :: b :: d :: w :: "bin" ::
(Block.to_bytes block |> Bytes.unsafe_to_string ) :: []
| _ -> failwith "Inconsistent message"
else
message
2016-04-05 00:48:37 +02:00
| _ -> message
in
2018-06-04 10:26:49 +02:00
Zmq.Socket.send_all push_socket new_message
2015-12-19 02:35:13 +01:00
in
(* Polling item to poll ROUTER and PULL sockets. *)
let pollitem =
2018-06-04 10:26:49 +02:00
Zmq.Poll.mask_of
[| (router_socket , Zmq.Poll.In) ;
(pull_socket , Zmq.Poll.In) ;
(dealer_socket, Zmq.Poll.In) ;
(proxy_socket , Zmq.Poll.In)
2015-12-19 02:35:13 +01:00
|]
in
(* Main loop *)
while (!status <> Status.Stopped)
do
let polling =
2018-06-04 10:26:49 +02:00
Zmq.Poll.poll ~timeout:1000 pollitem
2015-12-19 02:35:13 +01:00
in
2018-06-04 10:26:49 +02:00
if (polling.(0) = Some Zmq.Poll.In) then
2015-12-19 02:35:13 +01:00
handle_router ();
2018-06-04 10:26:49 +02:00
if (polling.(1) = Some Zmq.Poll.In) then
2015-12-19 02:35:13 +01:00
handle_pull ();
2018-06-04 10:26:49 +02:00
if (polling.(2) = Some Zmq.Poll.In) then
2015-12-19 02:35:13 +01:00
handle_dealer ();
2018-06-04 10:26:49 +02:00
if (polling.(3) = Some Zmq.Poll.In) then
2015-12-19 02:35:13 +01:00
handle_proxy ();
done;
2019-07-19 17:06:01 +02:00
List.iter (fun socket ->
2018-06-04 10:26:49 +02:00
Zmq.Socket.set_linger_period socket 1000 ;
Zmq.Socket.close socket)
2015-12-19 02:35:13 +01:00
[ router_socket ; dealer_socket ; push_socket ; pull_socket ; proxy_socket ]
in
Thread.create f
in
(* Start the status thread and the main thread *)
begin
try
2019-07-19 17:06:01 +02:00
(List.iter Thread.join
2015-12-19 02:35:13 +01:00
[ start_status_thread ();
start_log_thread ();
start_proxy_thread ();
start_main_thread ();
])
with
| err ->
begin
print_endline "Trapped error. Waiting 10 seconds...";
status := Status.Stopping;
2019-07-19 17:06:01 +02:00
Unix.sleep 10 ;
2015-12-19 02:35:13 +01:00
raise err
end
end;
(* Wait for the qmc process to complete *)
2016-03-03 13:39:06 +01:00
try
ignore (Watchdog.join ());
terminate ()
with
| error ->
begin
terminate ();
raise error
end
2015-12-19 02:35:13 +01:00