10
1
mirror of https://gitlab.com/scemama/qmcchem.git synced 2024-12-22 12:23:30 +01:00

ZMQ -> zmq

This commit is contained in:
Anthony Scemama 2018-06-04 10:26:49 +02:00
parent b9a69fa62d
commit a07c4a98f2
5 changed files with 156 additions and 156 deletions

View File

@ -17,7 +17,7 @@ let initialization_timeout = 600.
let bind_socket ~socket_type ~socket ~address = let bind_socket ~socket_type ~socket ~address =
try try
ZMQ.Socket.bind socket address Zmq.Socket.bind socket address
with with
| Unix.Unix_error (_, message, f) -> | Unix.Unix_error (_, message, f) ->
failwith @@ Printf.sprintf failwith @@ Printf.sprintf
@ -54,7 +54,7 @@ let run ?(daemon=true) ezfio_filename =
(** {2 ZeroMQ initialization} *) (** {2 ZeroMQ initialization} *)
let zmq_context = let zmq_context =
ZMQ.Context.create () Zmq.Context.create ()
in in
@ -93,16 +93,16 @@ let run ?(daemon=true) ezfio_filename =
adress_prefix ^ (Int.to_string (n+i)) adress_prefix ^ (Int.to_string (n+i))
in in
let socket = let socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.rep Zmq.Socket.create zmq_context Zmq.Socket.rep
in in
let result = let result =
try try
ZMQ.Socket.bind socket address; Zmq.Socket.bind socket address;
accu accu
with with
| _ -> false | _ -> false
in in
ZMQ.Socket.close socket; Zmq.Socket.close socket;
result result
) )
in in
@ -128,15 +128,15 @@ let run ?(daemon=true) ezfio_filename =
let debug_socket = let debug_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.xpub Zmq.Socket.create zmq_context Zmq.Socket.xpub
and address = and address =
Printf.sprintf "tcp://*:%d" (port+4) Printf.sprintf "tcp://*:%d" (port+4)
in in
bind_socket "XPUB" debug_socket address; bind_socket "XPUB" debug_socket address;
ZMQ.Socket.set_linger_period debug_socket 100 ; Zmq.Socket.set_linger_period debug_socket 100 ;
let close_debug_socket () = let close_debug_socket () =
ZMQ.Socket.close debug_socket Zmq.Socket.close debug_socket
in in
(** Sends a log text to the debug socket *) (** Sends a log text to the debug socket *)
@ -148,7 +148,7 @@ let run ?(daemon=true) ezfio_filename =
Printf.sprintf "%20s : %8d : %10s : %s" Printf.sprintf "%20s : %8d : %10s : %s"
socket size text (Time.Span.to_string dt) socket size text (Time.Span.to_string dt)
in in
ZMQ.Socket.send debug_socket message Zmq.Socket.send debug_socket message
in in
@ -402,7 +402,7 @@ let run ?(daemon=true) ezfio_filename =
send_log "status" 0 t0 "Starting status thread"; send_log "status" 0 t0 "Starting status thread";
let socket = let socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.pub Zmq.Socket.create zmq_context Zmq.Socket.pub
and address = and address =
Printf.sprintf "tcp://*:%d" (port+1) Printf.sprintf "tcp://*:%d" (port+1)
in in
@ -432,7 +432,7 @@ let run ?(daemon=true) ezfio_filename =
let status_string = let status_string =
Status.to_string !status Status.to_string !status
in in
ZMQ.Socket.send socket status_string; Zmq.Socket.send socket status_string;
send_log "status" (String.length status_string) now status_string; send_log "status" (String.length status_string) now status_string;
let test = let test =
@ -472,9 +472,9 @@ let run ?(daemon=true) ezfio_filename =
| (_, _, _) -> () | (_, _, _) -> ()
; ;
done; done;
ZMQ.Socket.send socket (Status.to_string !status); Zmq.Socket.send socket (Status.to_string !status);
ZMQ.Socket.set_linger_period socket 1_000 ; Zmq.Socket.set_linger_period socket 1_000 ;
ZMQ.Socket.close socket Zmq.Socket.close socket
) )
in in
@ -488,27 +488,27 @@ let run ?(daemon=true) ezfio_filename =
send_log "status" 0 t0 "Starting log thread"; send_log "status" 0 t0 "Starting log thread";
let socket = let socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.xsub Zmq.Socket.create zmq_context Zmq.Socket.xsub
and address = and address =
Printf.sprintf "tcp://*:%d" (port+3) Printf.sprintf "tcp://*:%d" (port+3)
in in
bind_socket "XSUB" socket address; bind_socket "XSUB" socket address;
let pollitem = let pollitem =
ZMQ.Poll.mask_of Zmq.Poll.mask_of
[| (socket , ZMQ.Poll.In) ; [| (socket , Zmq.Poll.In) ;
(debug_socket , ZMQ.Poll.In) (debug_socket , Zmq.Poll.In)
|] |]
in in
while (!status <> Status.Stopped) while (!status <> Status.Stopped)
do do
let polling = let polling =
ZMQ.Poll.poll ~timeout:1000 pollitem Zmq.Poll.poll ~timeout:1000 pollitem
in in
if (polling.(0) = Some ZMQ.Poll.In) then if (polling.(0) = Some Zmq.Poll.In) then
begin begin
let message = let message =
ZMQ.Socket.recv_all ~block:false socket Zmq.Socket.recv_all ~block:false socket
|> String.concat ~sep:" " |> String.concat ~sep:" "
in in
let now = let now =
@ -516,15 +516,15 @@ let run ?(daemon=true) ezfio_filename =
in in
send_log "log" 0 now message send_log "log" 0 now message
end end
else if (polling.(1) = Some ZMQ.Poll.In) then else if (polling.(1) = Some Zmq.Poll.In) then
begin begin
(* Forward subscription from XPUB to XSUB *) (* Forward subscription from XPUB to XSUB *)
ZMQ.Socket.recv_all ~block:false debug_socket Zmq.Socket.recv_all ~block:false debug_socket
|> ZMQ.Socket.send_all socket |> Zmq.Socket.send_all socket
end end
done; done;
ZMQ.Socket.set_linger_period socket 1000 ; Zmq.Socket.set_linger_period socket 1000 ;
ZMQ.Socket.close socket Zmq.Socket.close socket
) )
in in
(** {3 Main thread} *) (** {3 Main thread} *)
@ -557,15 +557,15 @@ let run ?(daemon=true) ezfio_filename =
(** Reply socket *) (** Reply socket *)
let rep_socket = let rep_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.rep Zmq.Socket.create zmq_context Zmq.Socket.rep
and address = and address =
Printf.sprintf "tcp://*:%d" port Printf.sprintf "tcp://*:%d" port
in in
bind_socket "REP" rep_socket address; bind_socket "REP" rep_socket address;
ZMQ.Socket.set_receive_high_water_mark rep_socket 100_000; Zmq.Socket.set_receive_high_water_mark rep_socket 100_000;
ZMQ.Socket.set_send_high_water_mark rep_socket 100_000; Zmq.Socket.set_send_high_water_mark rep_socket 100_000;
ZMQ.Socket.set_immediate rep_socket true; Zmq.Socket.set_immediate rep_socket true;
ZMQ.Socket.set_linger_period rep_socket 600_000 ; Zmq.Socket.set_linger_period rep_socket 600_000 ;
(** EZFIO Cache *) (** EZFIO Cache *)
let ezfio_cache = let ezfio_cache =
@ -587,7 +587,7 @@ let run ?(daemon=true) ezfio_filename =
(** Pull socket for computed data *) (** Pull socket for computed data *)
let pull_socket = let pull_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.pull Zmq.Socket.create zmq_context Zmq.Socket.pull
and address = and address =
Printf.sprintf "tcp://*:%d" (port+2) Printf.sprintf "tcp://*:%d" (port+2)
in in
@ -607,9 +607,9 @@ let run ?(daemon=true) ezfio_filename =
(** Polling item to poll REP and PULL sockets. *) (** Polling item to poll REP and PULL sockets. *)
let pollitem = let pollitem =
ZMQ.Poll.mask_of Zmq.Poll.mask_of
[| ( rep_socket, ZMQ.Poll.In) ; [| ( rep_socket, Zmq.Poll.In) ;
( pull_socket, ZMQ.Poll.In) ; ( pull_socket, Zmq.Poll.In) ;
|] |]
in in
@ -617,7 +617,7 @@ let run ?(daemon=true) ezfio_filename =
(** Handles messages coming into the REP socket. *) (** Handles messages coming into the REP socket. *)
let handle_rep () = let handle_rep () =
let raw_msg = let raw_msg =
ZMQ.Socket.recv_all ~block:false rep_socket Zmq.Socket.recv_all ~block:false rep_socket
in in
let t0 = let t0 =
Time.now () Time.now ()
@ -634,7 +634,7 @@ let run ?(daemon=true) ezfio_filename =
let result = let result =
handle_ezfio ezfio_msg handle_ezfio ezfio_msg
in in
ZMQ.Socket.send_all rep_socket Zmq.Socket.send_all rep_socket
[ String.length result [ String.length result
|> Printf.sprintf "%d " ; |> Printf.sprintf "%d " ;
result ] ; result ] ;
@ -645,7 +645,7 @@ let run ?(daemon=true) ezfio_filename =
let result = let result =
random_walkers n_walks random_walkers n_walks
in in
ZMQ.Socket.send_all rep_socket result; Zmq.Socket.send_all rep_socket result;
send_log "rep" walkers_size t0 "get_walkers" send_log "rep" walkers_size t0 "get_walkers"
end end
| Message.Register (w,pid) -> | Message.Register (w,pid) ->
@ -661,12 +661,12 @@ let run ?(daemon=true) ezfio_filename =
add_worker w pid; add_worker w pid;
if (!status = Status.Queued) then if (!status = Status.Queued) then
change_status Status.Running ; change_status Status.Running ;
ZMQ.Socket.send rep_socket "OK"; Zmq.Socket.send rep_socket "OK";
send_log "rep" 2 t0 "Register : OK" send_log "rep" 2 t0 "Register : OK"
end end
| Status.Stopping | Status.Stopping
| Status.Stopped -> | Status.Stopped ->
ZMQ.Socket.send rep_socket "Failed"; Zmq.Socket.send rep_socket "Failed";
end end
| Message.Unregister (w,pid) -> | Message.Unregister (w,pid) ->
begin begin
@ -674,7 +674,7 @@ let run ?(daemon=true) ezfio_filename =
(Compute_node.to_string w) ; " " ; (Compute_node.to_string w) ; " " ;
(Pid.to_string pid) ] (Pid.to_string pid) ]
|> send_log "req" msg_size t0; |> send_log "req" msg_size t0;
ZMQ.Socket.send rep_socket "OK"; Zmq.Socket.send rep_socket "OK";
del_worker w pid; del_worker w pid;
String.concat [ "Unregister : "; String.concat [ "Unregister : ";
(Hashtbl.length workers_hash) |> Int.to_string ; (Hashtbl.length workers_hash) |> Int.to_string ;
@ -690,7 +690,7 @@ let run ?(daemon=true) ezfio_filename =
end end
| Message.Test -> | Message.Test ->
begin begin
ZMQ.Socket.send rep_socket "OK"; Zmq.Socket.send rep_socket "OK";
send_log "rep" 2 t0 "Test" send_log "rep" 2 t0 "Test"
end end
| Message.Walkers (_, _, _) | Message.Walkers (_, _, _)
@ -702,7 +702,7 @@ let run ?(daemon=true) ezfio_filename =
(** Handles messages coming into the PULL socket. *) (** Handles messages coming into the PULL socket. *)
let handle_pull status = let handle_pull status =
let raw_msg = let raw_msg =
ZMQ.Socket.recv_all ~block:false pull_socket Zmq.Socket.recv_all ~block:false pull_socket
in in
let t0 = let t0 =
Time.now () Time.now ()
@ -770,14 +770,14 @@ let run ?(daemon=true) ezfio_filename =
while (!status <> Status.Stopped) while (!status <> Status.Stopped)
do do
let polling = let polling =
ZMQ.Poll.poll ~timeout:1000 pollitem Zmq.Poll.poll ~timeout:1000 pollitem
in in
match polling.(1) with match polling.(1) with
| Some ZMQ.Poll.In -> handle_pull !status | Some Zmq.Poll.In -> handle_pull !status
| _ -> | _ ->
begin begin
match polling.(0) with match polling.(0) with
| Some ZMQ.Poll.In -> handle_rep () | Some Zmq.Poll.In -> handle_rep ()
| _ -> | _ ->
begin begin
Out_channel.flush !block_channel ; Out_channel.flush !block_channel ;
@ -796,8 +796,8 @@ let run ?(daemon=true) ezfio_filename =
done; done;
List.iter ~f:(fun socket -> List.iter ~f:(fun socket ->
ZMQ.Socket.set_linger_period socket 1000 ; Zmq.Socket.set_linger_period socket 1000 ;
ZMQ.Socket.close socket) Zmq.Socket.close socket)
[ rep_socket ; pull_socket ] [ rep_socket ; pull_socket ]
in in
Thread.create f Thread.create f
@ -816,7 +816,7 @@ let run ?(daemon=true) ezfio_filename =
compress_block_file (); compress_block_file ();
send_log "status" 0 t0 "Done"; send_log "status" 0 t0 "Done";
close_debug_socket (); close_debug_socket ();
ZMQ.Context.terminate zmq_context; Zmq.Context.terminate zmq_context;
begin begin
try try
Out_channel.close !block_channel; Out_channel.close !block_channel;

View File

@ -10,12 +10,12 @@ let run ~t ezfio_filename=
; ;
let zmq_context = let zmq_context =
ZMQ.Context.create () Zmq.Context.create ()
in in
Printf.printf "Debugging %s\n%!" ezfio_filename; Printf.printf "Debugging %s\n%!" ezfio_filename;
let socket = let socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.sub Zmq.Socket.create zmq_context Zmq.Socket.sub
in in
let address = let address =
@ -25,8 +25,8 @@ let run ~t ezfio_filename=
| Some (a,p) -> a^":"^( (Int.of_string p)+4 |> Int.to_string ) | Some (a,p) -> a^":"^( (Int.of_string p)+4 |> Int.to_string )
| None -> failwith "Badly formed address" | None -> failwith "Badly formed address"
in in
ZMQ.Socket.connect socket address; Zmq.Socket.connect socket address;
ZMQ.Socket.subscribe socket ""; Zmq.Socket.subscribe socket "";
if t then if t then
begin begin
@ -39,7 +39,7 @@ let run ~t ezfio_filename=
while true while true
do do
let msg = let msg =
ZMQ.Socket.recv socket Zmq.Socket.recv socket
in in
let (socket, bytes) = let (socket, bytes) =
match Str.split re_split msg with match Str.split re_split msg with
@ -57,7 +57,7 @@ let run ~t ezfio_filename=
while true while true
do do
let msg = let msg =
ZMQ.Socket.recv socket Zmq.Socket.recv socket
in in
Printf.printf "%s\n%!" msg; Printf.printf "%s\n%!" msg;
done done
@ -76,7 +76,7 @@ let spec =
let command = let command =
Command.basic_spec Command.basic_spec
~summary: "Debug ZeroMQ communications" ~summary: "Debug ZeroMQ communications"
~readme:(fun () -> "Gets debug information from the ZMQ debug sockets.") ~readme:(fun () -> "Gets debug information from the Zmq debug sockets.")
spec spec
(fun t ezfio_file () -> run t ezfio_file) (fun t ezfio_file () -> run t ezfio_file)

View File

@ -8,7 +8,7 @@ let bind_socket ~socket_type ~socket ~address =
| -1 -> () | -1 -> ()
| i -> | i ->
try try
ZMQ.Socket.bind socket address; Zmq.Socket.bind socket address;
loop (-1) loop (-1)
with with
| Unix.Unix_error _ -> (Time.pause @@ Time.Span.of_sec 1. ; loop (i-1) ) | Unix.Unix_error _ -> (Time.pause @@ Time.Span.of_sec 1. ; loop (i-1) )
@ -95,7 +95,7 @@ let run ezfio_filename dataserver =
(* Fetch input *) (* Fetch input *)
let zmq_context = let zmq_context =
ZMQ.Context.create () Zmq.Context.create ()
in in
let terminate () = let terminate () =
@ -108,7 +108,7 @@ let run ezfio_filename dataserver =
| Ok _ -> () | Ok _ -> ()
| _ -> print_endline "Unable to remove temporary directory" | _ -> print_endline "Unable to remove temporary directory"
; ;
ZMQ.Context.terminate zmq_context ; Zmq.Context.terminate zmq_context ;
for i=port to port+4 for i=port to port+4
do do
let filename = let filename =
@ -155,43 +155,43 @@ let run ezfio_filename dataserver =
let f () = let f () =
let pub_socket = let pub_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.pub Zmq.Socket.create zmq_context Zmq.Socket.pub
and address = and address =
Printf.sprintf "ipc://%s:%d" Qmcchem_config.dev_shm (port+1); Printf.sprintf "ipc://%s:%d" Qmcchem_config.dev_shm (port+1);
in in
bind_socket "PUB" pub_socket address; bind_socket "PUB" pub_socket address;
let sub_socket = let sub_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.sub Zmq.Socket.create zmq_context Zmq.Socket.sub
and address = and address =
Printf.sprintf "tcp://%s:%d" dataserver_address (port+1-10) Printf.sprintf "tcp://%s:%d" dataserver_address (port+1-10)
in in
ZMQ.Socket.connect sub_socket address; Zmq.Socket.connect sub_socket address;
ZMQ.Socket.subscribe sub_socket ""; Zmq.Socket.subscribe sub_socket "";
let pollitem = let pollitem =
ZMQ.Poll.mask_of Zmq.Poll.mask_of
[| (sub_socket, ZMQ.Poll.In) ; [| (sub_socket, Zmq.Poll.In) ;
|] |]
in in
while (!status <> Status.Stopped) while (!status <> Status.Stopped)
do do
let polling = let polling =
ZMQ.Poll.poll ~timeout:1000 pollitem Zmq.Poll.poll ~timeout:1000 pollitem
in in
if (polling.(0) = Some ZMQ.Poll.In) then if (polling.(0) = Some Zmq.Poll.In) then
begin begin
let msg = let msg =
ZMQ.Socket.recv ~block:false sub_socket Zmq.Socket.recv ~block:false sub_socket
in in
ZMQ.Socket.send pub_socket msg; Zmq.Socket.send pub_socket msg;
status := Status.of_string msg; status := Status.of_string msg;
end; end;
done; done;
List.iter ~f:(fun socket -> List.iter ~f:(fun socket ->
ZMQ.Socket.set_linger_period socket 1000 ; Zmq.Socket.set_linger_period socket 1000 ;
ZMQ.Socket.close socket) Zmq.Socket.close socket)
[ sub_socket ; pub_socket ] [ sub_socket ; pub_socket ]
in in
Thread.create f Thread.create f
@ -201,23 +201,23 @@ let run ezfio_filename dataserver =
let f () = let f () =
let sub_socket = let sub_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.xsub Zmq.Socket.create zmq_context Zmq.Socket.xsub
and address = and address =
Printf.sprintf "ipc://%s:%d" Qmcchem_config.dev_shm (port+3); Printf.sprintf "ipc://%s:%d" Qmcchem_config.dev_shm (port+3);
in in
bind_socket "XSUB" sub_socket address; bind_socket "XSUB" sub_socket address;
let pub_socket = let pub_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.xpub Zmq.Socket.create zmq_context Zmq.Socket.xpub
and address = and address =
Printf.sprintf "tcp://%s:%d" dataserver_address (port+3-10) Printf.sprintf "tcp://%s:%d" dataserver_address (port+3-10)
in in
ZMQ.Socket.connect pub_socket address; Zmq.Socket.connect pub_socket address;
let pollitem = let pollitem =
ZMQ.Poll.mask_of Zmq.Poll.mask_of
[| (sub_socket, ZMQ.Poll.In) ; [| (sub_socket, Zmq.Poll.In) ;
(pub_socket, ZMQ.Poll.In) ; (pub_socket, Zmq.Poll.In) ;
|] |]
in in
@ -225,23 +225,23 @@ let run ezfio_filename dataserver =
while (!status <> Status.Stopped) while (!status <> Status.Stopped)
do do
let polling = let polling =
ZMQ.Poll.poll ~timeout:1000 pollitem Zmq.Poll.poll ~timeout:1000 pollitem
in in
if (polling.(0) = Some ZMQ.Poll.In) then if (polling.(0) = Some Zmq.Poll.In) then
begin begin
ZMQ.Socket.recv ~block:false sub_socket Zmq.Socket.recv ~block:false sub_socket
|> ZMQ.Socket.send pub_socket ; |> Zmq.Socket.send pub_socket ;
end end
else if (polling.(1) = Some ZMQ.Poll.In) then else if (polling.(1) = Some Zmq.Poll.In) then
begin begin
Printf.eprintf "Forwarder subscribe\n%!"; Printf.eprintf "Forwarder subscribe\n%!";
ZMQ.Socket.recv ~block:false pub_socket Zmq.Socket.recv ~block:false pub_socket
|> ZMQ.Socket.send sub_socket ; |> Zmq.Socket.send sub_socket ;
end end
done; done;
List.iter ~f:(fun socket -> List.iter ~f:(fun socket ->
ZMQ.Socket.set_linger_period socket 1000 ; Zmq.Socket.set_linger_period socket 1000 ;
ZMQ.Socket.close socket) Zmq.Socket.close socket)
[ sub_socket ; pub_socket ] [ sub_socket ; pub_socket ]
in in
Thread.create f Thread.create f
@ -252,29 +252,29 @@ let run ezfio_filename dataserver =
let f () = let f () =
let req_socket = let req_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.req Zmq.Socket.create zmq_context Zmq.Socket.req
in in
ZMQ.Socket.connect req_socket dataserver; Zmq.Socket.connect req_socket dataserver;
ZMQ.Socket.set_receive_timeout req_socket 600_000; Zmq.Socket.set_receive_timeout req_socket 600_000;
let dealer_socket = let dealer_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.dealer Zmq.Socket.create zmq_context Zmq.Socket.dealer
in in
bind_socket "PROXY" dealer_socket "inproc://dealer"; bind_socket "PROXY" dealer_socket "inproc://dealer";
ZMQ.Socket.set_receive_high_water_mark dealer_socket 100_000; Zmq.Socket.set_receive_high_water_mark dealer_socket 100_000;
ZMQ.Socket.set_send_high_water_mark dealer_socket 100_000; Zmq.Socket.set_send_high_water_mark dealer_socket 100_000;
ZMQ.Socket.set_immediate dealer_socket true; Zmq.Socket.set_immediate dealer_socket true;
ZMQ.Socket.set_linger_period dealer_socket 600_000; Zmq.Socket.set_linger_period dealer_socket 600_000;
let fetch_walkers () = let fetch_walkers () =
ZMQ.Socket.send_all req_socket ["get_walkers" ; Int.to_string !walk_num ]; Zmq.Socket.send_all req_socket ["get_walkers" ; Int.to_string !walk_num ];
ZMQ.Socket.recv_all req_socket Zmq.Socket.recv_all req_socket
in in
let pollitem = let pollitem =
ZMQ.Poll.mask_of Zmq.Poll.mask_of
[| (dealer_socket, ZMQ.Poll.In) ; [| (dealer_socket, Zmq.Poll.In) ;
|] |]
in in
@ -287,9 +287,9 @@ let run ezfio_filename dataserver =
| Some result -> result | Some result -> result
| None -> | None ->
begin begin
ZMQ.Socket.send_all req_socket ["Ezfio" ; msg]; Zmq.Socket.send_all req_socket ["Ezfio" ; msg];
let result = let result =
ZMQ.Socket.recv_all req_socket Zmq.Socket.recv_all req_socket
in in
match (Hashtbl.add ezfio_cache ~key:msg ~data:result) with match (Hashtbl.add ezfio_cache ~key:msg ~data:result) with
| `Ok -> result | `Ok -> result
@ -302,12 +302,12 @@ let run ezfio_filename dataserver =
while (!status <> Status.Stopped) while (!status <> Status.Stopped)
do do
let polling = let polling =
ZMQ.Poll.poll ~timeout:1000 pollitem Zmq.Poll.poll ~timeout:1000 pollitem
in in
if (polling.(0) = Some ZMQ.Poll.In) then if (polling.(0) = Some Zmq.Poll.In) then
begin begin
let raw_msg = let raw_msg =
ZMQ.Socket.recv_all ~block:false dealer_socket Zmq.Socket.recv_all ~block:false dealer_socket
in in
let header, msg = let header, msg =
let rec aux header = function let rec aux header = function
@ -323,7 +323,7 @@ let run ezfio_filename dataserver =
let result = let result =
handle_ezfio ezfio_msg handle_ezfio ezfio_msg
in in
ZMQ.Socket.send_all dealer_socket (header @ result) Zmq.Socket.send_all dealer_socket (header @ result)
| Message.GetWalkers n_walks -> | Message.GetWalkers n_walks ->
begin begin
if (!walk_num = 0) then if (!walk_num = 0) then
@ -331,11 +331,11 @@ let run ezfio_filename dataserver =
walk_num := Qptypes.Strictly_positive_int.to_int n_walks; walk_num := Qptypes.Strictly_positive_int.to_int n_walks;
walkers := fetch_walkers (); walkers := fetch_walkers ();
end; end;
ZMQ.Socket.send_all dealer_socket (header @ !walkers); Zmq.Socket.send_all dealer_socket (header @ !walkers);
walkers := fetch_walkers (); walkers := fetch_walkers ();
end end
| Message.Test -> | Message.Test ->
ZMQ.Socket.send_all dealer_socket (header @ [ "OK" ]) Zmq.Socket.send_all dealer_socket (header @ [ "OK" ])
| Message.Error _ -> () | Message.Error _ -> ()
| Message.Register _ | Message.Register _
| Message.Unregister _ | Message.Unregister _
@ -345,10 +345,10 @@ let run ezfio_filename dataserver =
in handle msg in handle msg
end; end;
done; done;
ZMQ.Socket.set_linger_period dealer_socket 1000 ; Zmq.Socket.set_linger_period dealer_socket 1000 ;
ZMQ.Socket.set_linger_period req_socket 1000 ; Zmq.Socket.set_linger_period req_socket 1000 ;
ZMQ.Socket.close dealer_socket; Zmq.Socket.close dealer_socket;
ZMQ.Socket.close req_socket; Zmq.Socket.close req_socket;
in in
Thread.create f Thread.create f
in in
@ -358,38 +358,38 @@ let run ezfio_filename dataserver =
let f () = let f () =
let dealer_socket = let dealer_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.dealer Zmq.Socket.create zmq_context Zmq.Socket.dealer
in in
ZMQ.Socket.connect dealer_socket dataserver; Zmq.Socket.connect dealer_socket dataserver;
ZMQ.Socket.set_linger_period dealer_socket 600_000; Zmq.Socket.set_linger_period dealer_socket 600_000;
let proxy_socket = let proxy_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.dealer Zmq.Socket.create zmq_context Zmq.Socket.dealer
in in
ZMQ.Socket.connect proxy_socket "inproc://dealer"; Zmq.Socket.connect proxy_socket "inproc://dealer";
let router_socket = let router_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.router Zmq.Socket.create zmq_context Zmq.Socket.router
and address = and address =
Printf.sprintf "ipc://%s:%d" Qmcchem_config.dev_shm (port); Printf.sprintf "ipc://%s:%d" Qmcchem_config.dev_shm (port);
in in
bind_socket "ROUTER" router_socket address; bind_socket "ROUTER" router_socket address;
ZMQ.Socket.set_receive_high_water_mark router_socket 100000; Zmq.Socket.set_receive_high_water_mark router_socket 100000;
ZMQ.Socket.set_send_high_water_mark router_socket 100000; Zmq.Socket.set_send_high_water_mark router_socket 100000;
ZMQ.Socket.set_immediate router_socket true; Zmq.Socket.set_immediate router_socket true;
ZMQ.Socket.set_linger_period router_socket 600_000; Zmq.Socket.set_linger_period router_socket 600_000;
(* Pull socket for computed data *) (* Pull socket for computed data *)
let push_socket = let push_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.push Zmq.Socket.create zmq_context Zmq.Socket.push
and address = and address =
Printf.sprintf "tcp://%s:%d" dataserver_address (port+2-10) Printf.sprintf "tcp://%s:%d" dataserver_address (port+2-10)
in in
ZMQ.Socket.connect push_socket address; Zmq.Socket.connect push_socket address;
ZMQ.Socket.set_linger_period push_socket 600_000; Zmq.Socket.set_linger_period push_socket 600_000;
let pull_socket = let pull_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.pull Zmq.Socket.create zmq_context Zmq.Socket.pull
and address = and address =
Printf.sprintf "ipc://%s:%d" Qmcchem_config.dev_shm (port+2); Printf.sprintf "ipc://%s:%d" Qmcchem_config.dev_shm (port+2);
in in
@ -399,7 +399,7 @@ let run ezfio_filename dataserver =
(* Handles messages coming into the ROUTER socket. *) (* Handles messages coming into the ROUTER socket. *)
let handle_router () = let handle_router () =
let raw_msg = let raw_msg =
ZMQ.Socket.recv_all ~block:false router_socket Zmq.Socket.recv_all ~block:false router_socket
in in
let header, msg = let header, msg =
let rec aux header = function let rec aux header = function
@ -414,10 +414,10 @@ let run ezfio_filename dataserver =
| Message.GetWalkers _ | Message.GetWalkers _
| Message.Ezfio _ | Message.Ezfio _
| Message.Test -> | Message.Test ->
ZMQ.Socket.send_all proxy_socket raw_msg Zmq.Socket.send_all proxy_socket raw_msg
| Message.Register _ | Message.Register _
| Message.Unregister _ -> | Message.Unregister _ ->
ZMQ.Socket.send_all dealer_socket raw_msg Zmq.Socket.send_all dealer_socket raw_msg
| Message.Walkers (_, _, _) | Message.Walkers (_, _, _)
| Message.Property _ -> | Message.Property _ ->
failwith "Bad message" failwith "Bad message"
@ -426,13 +426,13 @@ let run ezfio_filename dataserver =
in in
let handle_dealer () = let handle_dealer () =
ZMQ.Socket.recv_all ~block:false dealer_socket Zmq.Socket.recv_all ~block:false dealer_socket
|> ZMQ.Socket.send_all router_socket |> Zmq.Socket.send_all router_socket
in in
let handle_proxy () = let handle_proxy () =
ZMQ.Socket.recv_all ~block:false proxy_socket Zmq.Socket.recv_all ~block:false proxy_socket
|> ZMQ.Socket.send_all router_socket |> Zmq.Socket.send_all router_socket
in in
let select_n_of ~n ~len l = let select_n_of ~n ~len l =
@ -463,7 +463,7 @@ let run ezfio_filename dataserver =
(* Handles messages coming into the PULL socket. *) (* Handles messages coming into the PULL socket. *)
let handle_pull () = let handle_pull () =
let message = let message =
ZMQ.Socket.recv_all ~block:false pull_socket Zmq.Socket.recv_all ~block:false pull_socket
in in
let new_message = let new_message =
match message with match message with
@ -481,36 +481,36 @@ let run ezfio_filename dataserver =
Int.to_string (5*len)] ; ( select_n_of ~n:5 ~len rest ) ] Int.to_string (5*len)] ; ( select_n_of ~n:5 ~len rest ) ]
| _ -> message | _ -> message
in in
ZMQ.Socket.send_all push_socket new_message Zmq.Socket.send_all push_socket new_message
in in
(* Polling item to poll ROUTER and PULL sockets. *) (* Polling item to poll ROUTER and PULL sockets. *)
let pollitem = let pollitem =
ZMQ.Poll.mask_of Zmq.Poll.mask_of
[| (router_socket , ZMQ.Poll.In) ; [| (router_socket , Zmq.Poll.In) ;
(pull_socket , ZMQ.Poll.In) ; (pull_socket , Zmq.Poll.In) ;
(dealer_socket, ZMQ.Poll.In) ; (dealer_socket, Zmq.Poll.In) ;
(proxy_socket , ZMQ.Poll.In) (proxy_socket , Zmq.Poll.In)
|] |]
in in
(* Main loop *) (* Main loop *)
while (!status <> Status.Stopped) while (!status <> Status.Stopped)
do do
let polling = let polling =
ZMQ.Poll.poll ~timeout:1000 pollitem Zmq.Poll.poll ~timeout:1000 pollitem
in in
if (polling.(0) = Some ZMQ.Poll.In) then if (polling.(0) = Some Zmq.Poll.In) then
handle_router (); handle_router ();
if (polling.(1) = Some ZMQ.Poll.In) then if (polling.(1) = Some Zmq.Poll.In) then
handle_pull (); handle_pull ();
if (polling.(2) = Some ZMQ.Poll.In) then if (polling.(2) = Some Zmq.Poll.In) then
handle_dealer (); handle_dealer ();
if (polling.(3) = Some ZMQ.Poll.In) then if (polling.(3) = Some Zmq.Poll.In) then
handle_proxy (); handle_proxy ();
done; done;
List.iter ~f:(fun socket -> List.iter ~f:(fun socket ->
ZMQ.Socket.set_linger_period socket 1000 ; Zmq.Socket.set_linger_period socket 1000 ;
ZMQ.Socket.close socket) Zmq.Socket.close socket)
[ router_socket ; dealer_socket ; push_socket ; pull_socket ; proxy_socket ] [ router_socket ; dealer_socket ; push_socket ; pull_socket ; proxy_socket ]
in in
Thread.create f Thread.create f

View File

@ -49,29 +49,29 @@ let full_run ?(start_dataserver=true) ezfio_filename =
end; end;
(* Check if the ZMQ Rep socket is open *) (* Check if the Zmq Rep socket is open *)
let test_open_rep_socket () = let test_open_rep_socket () =
let zmq_context = let zmq_context =
ZMQ.Context.create () Zmq.Context.create ()
in in
let socket = let socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.req Zmq.Socket.create zmq_context Zmq.Socket.req
and address = and address =
Ezfio.get_simulation_http_server () Ezfio.get_simulation_http_server ()
in in
let reply = let reply =
try try
( (
ZMQ.Socket.set_receive_timeout socket 100; Zmq.Socket.set_receive_timeout socket 100;
ZMQ.Socket.connect socket address; Zmq.Socket.connect socket address;
ZMQ.Socket.send socket (Message.(to_string Test)); Zmq.Socket.send socket (Message.(to_string Test));
ZMQ.Socket.recv socket Zmq.Socket.recv socket
) with ) with
| Unix.Unix_error (_,_,_) -> | Unix.Unix_error (_,_,_) ->
begin begin
ZMQ.Socket.set_linger_period socket 1 ; Zmq.Socket.set_linger_period socket 1 ;
ZMQ.Socket.close socket; Zmq.Socket.close socket;
ZMQ.Context.terminate zmq_context; Zmq.Context.terminate zmq_context;
"Failed" "Failed"
end end
in in

View File

@ -1,7 +1,7 @@
MAIN=qmcchem MAIN=qmcchem
# Main program to build # Main program to build
PACKAGES=-package core,cryptokit,str,ZMQ PACKAGES=-package core,cryptokit,str,zmq
#,ppx_sexp_conv #,ppx_sexp_conv
# Required opam packages, for example: # Required opam packages, for example:
# PACKAGES=-package core,sexplib.syntax # PACKAGES=-package core,sexplib.syntax