From a07c4a98f2733e22b90ae98a1d8692d967c5a07c Mon Sep 17 00:00:00 2001 From: Anthony Scemama Date: Mon, 4 Jun 2018 10:26:49 +0200 Subject: [PATCH] ZMQ -> zmq --- ocaml/Qmcchem_dataserver.ml | 98 ++++++++++---------- ocaml/Qmcchem_debug.ml | 14 +-- ocaml/Qmcchem_forwarder.ml | 178 ++++++++++++++++++------------------ ocaml/Qmcchem_run.ml | 20 ++-- ocaml/build.ninja | 2 +- 5 files changed, 156 insertions(+), 156 deletions(-) diff --git a/ocaml/Qmcchem_dataserver.ml b/ocaml/Qmcchem_dataserver.ml index 73cc34e..a9a9c14 100644 --- a/ocaml/Qmcchem_dataserver.ml +++ b/ocaml/Qmcchem_dataserver.ml @@ -17,7 +17,7 @@ let initialization_timeout = 600. let bind_socket ~socket_type ~socket ~address = try - ZMQ.Socket.bind socket address + Zmq.Socket.bind socket address with | Unix.Unix_error (_, message, f) -> failwith @@ Printf.sprintf @@ -54,7 +54,7 @@ let run ?(daemon=true) ezfio_filename = (** {2 ZeroMQ initialization} *) let zmq_context = - ZMQ.Context.create () + Zmq.Context.create () in @@ -93,16 +93,16 @@ let run ?(daemon=true) ezfio_filename = adress_prefix ^ (Int.to_string (n+i)) in let socket = - ZMQ.Socket.create zmq_context ZMQ.Socket.rep + Zmq.Socket.create zmq_context Zmq.Socket.rep in let result = try - ZMQ.Socket.bind socket address; + Zmq.Socket.bind socket address; accu with | _ -> false in - ZMQ.Socket.close socket; + Zmq.Socket.close socket; result ) in @@ -128,15 +128,15 @@ let run ?(daemon=true) ezfio_filename = let debug_socket = - ZMQ.Socket.create zmq_context ZMQ.Socket.xpub + Zmq.Socket.create zmq_context Zmq.Socket.xpub and address = Printf.sprintf "tcp://*:%d" (port+4) in bind_socket "XPUB" debug_socket address; - ZMQ.Socket.set_linger_period debug_socket 100 ; + Zmq.Socket.set_linger_period debug_socket 100 ; let close_debug_socket () = - ZMQ.Socket.close debug_socket + Zmq.Socket.close debug_socket in (** Sends a log text to the debug socket *) @@ -148,7 +148,7 @@ let run ?(daemon=true) ezfio_filename = Printf.sprintf "%20s : %8d : %10s : %s" socket size text (Time.Span.to_string dt) in - ZMQ.Socket.send debug_socket message + Zmq.Socket.send debug_socket message in @@ -402,7 +402,7 @@ let run ?(daemon=true) ezfio_filename = send_log "status" 0 t0 "Starting status thread"; let socket = - ZMQ.Socket.create zmq_context ZMQ.Socket.pub + Zmq.Socket.create zmq_context Zmq.Socket.pub and address = Printf.sprintf "tcp://*:%d" (port+1) in @@ -432,7 +432,7 @@ let run ?(daemon=true) ezfio_filename = let status_string = Status.to_string !status in - ZMQ.Socket.send socket status_string; + Zmq.Socket.send socket status_string; send_log "status" (String.length status_string) now status_string; let test = @@ -472,9 +472,9 @@ let run ?(daemon=true) ezfio_filename = | (_, _, _) -> () ; done; - ZMQ.Socket.send socket (Status.to_string !status); - ZMQ.Socket.set_linger_period socket 1_000 ; - ZMQ.Socket.close socket + Zmq.Socket.send socket (Status.to_string !status); + Zmq.Socket.set_linger_period socket 1_000 ; + Zmq.Socket.close socket ) in @@ -488,27 +488,27 @@ let run ?(daemon=true) ezfio_filename = send_log "status" 0 t0 "Starting log thread"; let socket = - ZMQ.Socket.create zmq_context ZMQ.Socket.xsub + Zmq.Socket.create zmq_context Zmq.Socket.xsub and address = Printf.sprintf "tcp://*:%d" (port+3) in bind_socket "XSUB" socket address; let pollitem = - ZMQ.Poll.mask_of - [| (socket , ZMQ.Poll.In) ; - (debug_socket , ZMQ.Poll.In) + Zmq.Poll.mask_of + [| (socket , Zmq.Poll.In) ; + (debug_socket , Zmq.Poll.In) |] in while (!status <> Status.Stopped) do let polling = - ZMQ.Poll.poll ~timeout:1000 pollitem + Zmq.Poll.poll ~timeout:1000 pollitem in - if (polling.(0) = Some ZMQ.Poll.In) then + if (polling.(0) = Some Zmq.Poll.In) then begin let message = - ZMQ.Socket.recv_all ~block:false socket + Zmq.Socket.recv_all ~block:false socket |> String.concat ~sep:" " in let now = @@ -516,15 +516,15 @@ let run ?(daemon=true) ezfio_filename = in send_log "log" 0 now message end - else if (polling.(1) = Some ZMQ.Poll.In) then + else if (polling.(1) = Some Zmq.Poll.In) then begin (* Forward subscription from XPUB to XSUB *) - ZMQ.Socket.recv_all ~block:false debug_socket - |> ZMQ.Socket.send_all socket + Zmq.Socket.recv_all ~block:false debug_socket + |> Zmq.Socket.send_all socket end done; - ZMQ.Socket.set_linger_period socket 1000 ; - ZMQ.Socket.close socket + Zmq.Socket.set_linger_period socket 1000 ; + Zmq.Socket.close socket ) in (** {3 Main thread} *) @@ -557,15 +557,15 @@ let run ?(daemon=true) ezfio_filename = (** Reply socket *) let rep_socket = - ZMQ.Socket.create zmq_context ZMQ.Socket.rep + Zmq.Socket.create zmq_context Zmq.Socket.rep and address = Printf.sprintf "tcp://*:%d" port in bind_socket "REP" rep_socket address; - ZMQ.Socket.set_receive_high_water_mark rep_socket 100_000; - ZMQ.Socket.set_send_high_water_mark rep_socket 100_000; - ZMQ.Socket.set_immediate rep_socket true; - ZMQ.Socket.set_linger_period rep_socket 600_000 ; + Zmq.Socket.set_receive_high_water_mark rep_socket 100_000; + Zmq.Socket.set_send_high_water_mark rep_socket 100_000; + Zmq.Socket.set_immediate rep_socket true; + Zmq.Socket.set_linger_period rep_socket 600_000 ; (** EZFIO Cache *) let ezfio_cache = @@ -587,7 +587,7 @@ let run ?(daemon=true) ezfio_filename = (** Pull socket for computed data *) let pull_socket = - ZMQ.Socket.create zmq_context ZMQ.Socket.pull + Zmq.Socket.create zmq_context Zmq.Socket.pull and address = Printf.sprintf "tcp://*:%d" (port+2) in @@ -607,9 +607,9 @@ let run ?(daemon=true) ezfio_filename = (** Polling item to poll REP and PULL sockets. *) let pollitem = - ZMQ.Poll.mask_of - [| ( rep_socket, ZMQ.Poll.In) ; - ( pull_socket, ZMQ.Poll.In) ; + Zmq.Poll.mask_of + [| ( rep_socket, Zmq.Poll.In) ; + ( pull_socket, Zmq.Poll.In) ; |] in @@ -617,7 +617,7 @@ let run ?(daemon=true) ezfio_filename = (** Handles messages coming into the REP socket. *) let handle_rep () = let raw_msg = - ZMQ.Socket.recv_all ~block:false rep_socket + Zmq.Socket.recv_all ~block:false rep_socket in let t0 = Time.now () @@ -634,7 +634,7 @@ let run ?(daemon=true) ezfio_filename = let result = handle_ezfio ezfio_msg in - ZMQ.Socket.send_all rep_socket + Zmq.Socket.send_all rep_socket [ String.length result |> Printf.sprintf "%d " ; result ] ; @@ -645,7 +645,7 @@ let run ?(daemon=true) ezfio_filename = let result = random_walkers n_walks in - ZMQ.Socket.send_all rep_socket result; + Zmq.Socket.send_all rep_socket result; send_log "rep" walkers_size t0 "get_walkers" end | Message.Register (w,pid) -> @@ -661,12 +661,12 @@ let run ?(daemon=true) ezfio_filename = add_worker w pid; if (!status = Status.Queued) then change_status Status.Running ; - ZMQ.Socket.send rep_socket "OK"; + Zmq.Socket.send rep_socket "OK"; send_log "rep" 2 t0 "Register : OK" end | Status.Stopping | Status.Stopped -> - ZMQ.Socket.send rep_socket "Failed"; + Zmq.Socket.send rep_socket "Failed"; end | Message.Unregister (w,pid) -> begin @@ -674,7 +674,7 @@ let run ?(daemon=true) ezfio_filename = (Compute_node.to_string w) ; " " ; (Pid.to_string pid) ] |> send_log "req" msg_size t0; - ZMQ.Socket.send rep_socket "OK"; + Zmq.Socket.send rep_socket "OK"; del_worker w pid; String.concat [ "Unregister : "; (Hashtbl.length workers_hash) |> Int.to_string ; @@ -690,7 +690,7 @@ let run ?(daemon=true) ezfio_filename = end | Message.Test -> begin - ZMQ.Socket.send rep_socket "OK"; + Zmq.Socket.send rep_socket "OK"; send_log "rep" 2 t0 "Test" end | Message.Walkers (_, _, _) @@ -702,7 +702,7 @@ let run ?(daemon=true) ezfio_filename = (** Handles messages coming into the PULL socket. *) let handle_pull status = let raw_msg = - ZMQ.Socket.recv_all ~block:false pull_socket + Zmq.Socket.recv_all ~block:false pull_socket in let t0 = Time.now () @@ -770,14 +770,14 @@ let run ?(daemon=true) ezfio_filename = while (!status <> Status.Stopped) do let polling = - ZMQ.Poll.poll ~timeout:1000 pollitem + Zmq.Poll.poll ~timeout:1000 pollitem in match polling.(1) with - | Some ZMQ.Poll.In -> handle_pull !status + | Some Zmq.Poll.In -> handle_pull !status | _ -> begin match polling.(0) with - | Some ZMQ.Poll.In -> handle_rep () + | Some Zmq.Poll.In -> handle_rep () | _ -> begin Out_channel.flush !block_channel ; @@ -796,8 +796,8 @@ let run ?(daemon=true) ezfio_filename = done; List.iter ~f:(fun socket -> - ZMQ.Socket.set_linger_period socket 1000 ; - ZMQ.Socket.close socket) + Zmq.Socket.set_linger_period socket 1000 ; + Zmq.Socket.close socket) [ rep_socket ; pull_socket ] in Thread.create f @@ -816,7 +816,7 @@ let run ?(daemon=true) ezfio_filename = compress_block_file (); send_log "status" 0 t0 "Done"; close_debug_socket (); - ZMQ.Context.terminate zmq_context; + Zmq.Context.terminate zmq_context; begin try Out_channel.close !block_channel; diff --git a/ocaml/Qmcchem_debug.ml b/ocaml/Qmcchem_debug.ml index e04f5f2..717b085 100644 --- a/ocaml/Qmcchem_debug.ml +++ b/ocaml/Qmcchem_debug.ml @@ -10,12 +10,12 @@ let run ~t ezfio_filename= ; let zmq_context = - ZMQ.Context.create () + Zmq.Context.create () in Printf.printf "Debugging %s\n%!" ezfio_filename; let socket = - ZMQ.Socket.create zmq_context ZMQ.Socket.sub + Zmq.Socket.create zmq_context Zmq.Socket.sub in let address = @@ -25,8 +25,8 @@ let run ~t ezfio_filename= | Some (a,p) -> a^":"^( (Int.of_string p)+4 |> Int.to_string ) | None -> failwith "Badly formed address" in - ZMQ.Socket.connect socket address; - ZMQ.Socket.subscribe socket ""; + Zmq.Socket.connect socket address; + Zmq.Socket.subscribe socket ""; if t then begin @@ -39,7 +39,7 @@ let run ~t ezfio_filename= while true do let msg = - ZMQ.Socket.recv socket + Zmq.Socket.recv socket in let (socket, bytes) = match Str.split re_split msg with @@ -57,7 +57,7 @@ let run ~t ezfio_filename= while true do let msg = - ZMQ.Socket.recv socket + Zmq.Socket.recv socket in Printf.printf "%s\n%!" msg; done @@ -76,7 +76,7 @@ let spec = let command = Command.basic_spec ~summary: "Debug ZeroMQ communications" - ~readme:(fun () -> "Gets debug information from the ZMQ debug sockets.") + ~readme:(fun () -> "Gets debug information from the Zmq debug sockets.") spec (fun t ezfio_file () -> run t ezfio_file) diff --git a/ocaml/Qmcchem_forwarder.ml b/ocaml/Qmcchem_forwarder.ml index 7e1dd5c..8b9fb5c 100644 --- a/ocaml/Qmcchem_forwarder.ml +++ b/ocaml/Qmcchem_forwarder.ml @@ -8,7 +8,7 @@ let bind_socket ~socket_type ~socket ~address = | -1 -> () | i -> try - ZMQ.Socket.bind socket address; + Zmq.Socket.bind socket address; loop (-1) with | Unix.Unix_error _ -> (Time.pause @@ Time.Span.of_sec 1. ; loop (i-1) ) @@ -95,7 +95,7 @@ let run ezfio_filename dataserver = (* Fetch input *) let zmq_context = - ZMQ.Context.create () + Zmq.Context.create () in let terminate () = @@ -108,7 +108,7 @@ let run ezfio_filename dataserver = | Ok _ -> () | _ -> print_endline "Unable to remove temporary directory" ; - ZMQ.Context.terminate zmq_context ; + Zmq.Context.terminate zmq_context ; for i=port to port+4 do let filename = @@ -155,43 +155,43 @@ let run ezfio_filename dataserver = let f () = let pub_socket = - ZMQ.Socket.create zmq_context ZMQ.Socket.pub + Zmq.Socket.create zmq_context Zmq.Socket.pub and address = Printf.sprintf "ipc://%s:%d" Qmcchem_config.dev_shm (port+1); in bind_socket "PUB" pub_socket address; let sub_socket = - ZMQ.Socket.create zmq_context ZMQ.Socket.sub + Zmq.Socket.create zmq_context Zmq.Socket.sub and address = Printf.sprintf "tcp://%s:%d" dataserver_address (port+1-10) in - ZMQ.Socket.connect sub_socket address; - ZMQ.Socket.subscribe sub_socket ""; + Zmq.Socket.connect sub_socket address; + Zmq.Socket.subscribe sub_socket ""; let pollitem = - ZMQ.Poll.mask_of - [| (sub_socket, ZMQ.Poll.In) ; + Zmq.Poll.mask_of + [| (sub_socket, Zmq.Poll.In) ; |] in while (!status <> Status.Stopped) do let polling = - ZMQ.Poll.poll ~timeout:1000 pollitem + Zmq.Poll.poll ~timeout:1000 pollitem in - if (polling.(0) = Some ZMQ.Poll.In) then + if (polling.(0) = Some Zmq.Poll.In) then begin let msg = - ZMQ.Socket.recv ~block:false sub_socket + Zmq.Socket.recv ~block:false sub_socket in - ZMQ.Socket.send pub_socket msg; + Zmq.Socket.send pub_socket msg; status := Status.of_string msg; end; done; List.iter ~f:(fun socket -> - ZMQ.Socket.set_linger_period socket 1000 ; - ZMQ.Socket.close socket) + Zmq.Socket.set_linger_period socket 1000 ; + Zmq.Socket.close socket) [ sub_socket ; pub_socket ] in Thread.create f @@ -201,23 +201,23 @@ let run ezfio_filename dataserver = let f () = let sub_socket = - ZMQ.Socket.create zmq_context ZMQ.Socket.xsub + Zmq.Socket.create zmq_context Zmq.Socket.xsub and address = Printf.sprintf "ipc://%s:%d" Qmcchem_config.dev_shm (port+3); in bind_socket "XSUB" sub_socket address; let pub_socket = - ZMQ.Socket.create zmq_context ZMQ.Socket.xpub + Zmq.Socket.create zmq_context Zmq.Socket.xpub and address = Printf.sprintf "tcp://%s:%d" dataserver_address (port+3-10) in - ZMQ.Socket.connect pub_socket address; + Zmq.Socket.connect pub_socket address; let pollitem = - ZMQ.Poll.mask_of - [| (sub_socket, ZMQ.Poll.In) ; - (pub_socket, ZMQ.Poll.In) ; + Zmq.Poll.mask_of + [| (sub_socket, Zmq.Poll.In) ; + (pub_socket, Zmq.Poll.In) ; |] in @@ -225,23 +225,23 @@ let run ezfio_filename dataserver = while (!status <> Status.Stopped) do let polling = - ZMQ.Poll.poll ~timeout:1000 pollitem + Zmq.Poll.poll ~timeout:1000 pollitem in - if (polling.(0) = Some ZMQ.Poll.In) then + if (polling.(0) = Some Zmq.Poll.In) then begin - ZMQ.Socket.recv ~block:false sub_socket - |> ZMQ.Socket.send pub_socket ; + Zmq.Socket.recv ~block:false sub_socket + |> Zmq.Socket.send pub_socket ; end - else if (polling.(1) = Some ZMQ.Poll.In) then + else if (polling.(1) = Some Zmq.Poll.In) then begin Printf.eprintf "Forwarder subscribe\n%!"; - ZMQ.Socket.recv ~block:false pub_socket - |> ZMQ.Socket.send sub_socket ; + Zmq.Socket.recv ~block:false pub_socket + |> Zmq.Socket.send sub_socket ; end done; List.iter ~f:(fun socket -> - ZMQ.Socket.set_linger_period socket 1000 ; - ZMQ.Socket.close socket) + Zmq.Socket.set_linger_period socket 1000 ; + Zmq.Socket.close socket) [ sub_socket ; pub_socket ] in Thread.create f @@ -252,29 +252,29 @@ let run ezfio_filename dataserver = let f () = let req_socket = - ZMQ.Socket.create zmq_context ZMQ.Socket.req + Zmq.Socket.create zmq_context Zmq.Socket.req in - ZMQ.Socket.connect req_socket dataserver; - ZMQ.Socket.set_receive_timeout req_socket 600_000; + Zmq.Socket.connect req_socket dataserver; + Zmq.Socket.set_receive_timeout req_socket 600_000; let dealer_socket = - ZMQ.Socket.create zmq_context ZMQ.Socket.dealer + Zmq.Socket.create zmq_context Zmq.Socket.dealer in bind_socket "PROXY" dealer_socket "inproc://dealer"; - ZMQ.Socket.set_receive_high_water_mark dealer_socket 100_000; - ZMQ.Socket.set_send_high_water_mark dealer_socket 100_000; - ZMQ.Socket.set_immediate dealer_socket true; - ZMQ.Socket.set_linger_period dealer_socket 600_000; + Zmq.Socket.set_receive_high_water_mark dealer_socket 100_000; + Zmq.Socket.set_send_high_water_mark dealer_socket 100_000; + Zmq.Socket.set_immediate dealer_socket true; + Zmq.Socket.set_linger_period dealer_socket 600_000; let fetch_walkers () = - ZMQ.Socket.send_all req_socket ["get_walkers" ; Int.to_string !walk_num ]; - ZMQ.Socket.recv_all req_socket + Zmq.Socket.send_all req_socket ["get_walkers" ; Int.to_string !walk_num ]; + Zmq.Socket.recv_all req_socket in let pollitem = - ZMQ.Poll.mask_of - [| (dealer_socket, ZMQ.Poll.In) ; + Zmq.Poll.mask_of + [| (dealer_socket, Zmq.Poll.In) ; |] in @@ -287,9 +287,9 @@ let run ezfio_filename dataserver = | Some result -> result | None -> begin - ZMQ.Socket.send_all req_socket ["Ezfio" ; msg]; + Zmq.Socket.send_all req_socket ["Ezfio" ; msg]; let result = - ZMQ.Socket.recv_all req_socket + Zmq.Socket.recv_all req_socket in match (Hashtbl.add ezfio_cache ~key:msg ~data:result) with | `Ok -> result @@ -302,12 +302,12 @@ let run ezfio_filename dataserver = while (!status <> Status.Stopped) do let polling = - ZMQ.Poll.poll ~timeout:1000 pollitem + Zmq.Poll.poll ~timeout:1000 pollitem in - if (polling.(0) = Some ZMQ.Poll.In) then + if (polling.(0) = Some Zmq.Poll.In) then begin let raw_msg = - ZMQ.Socket.recv_all ~block:false dealer_socket + Zmq.Socket.recv_all ~block:false dealer_socket in let header, msg = let rec aux header = function @@ -323,7 +323,7 @@ let run ezfio_filename dataserver = let result = handle_ezfio ezfio_msg in - ZMQ.Socket.send_all dealer_socket (header @ result) + Zmq.Socket.send_all dealer_socket (header @ result) | Message.GetWalkers n_walks -> begin if (!walk_num = 0) then @@ -331,11 +331,11 @@ let run ezfio_filename dataserver = walk_num := Qptypes.Strictly_positive_int.to_int n_walks; walkers := fetch_walkers (); end; - ZMQ.Socket.send_all dealer_socket (header @ !walkers); + Zmq.Socket.send_all dealer_socket (header @ !walkers); walkers := fetch_walkers (); end | Message.Test -> - ZMQ.Socket.send_all dealer_socket (header @ [ "OK" ]) + Zmq.Socket.send_all dealer_socket (header @ [ "OK" ]) | Message.Error _ -> () | Message.Register _ | Message.Unregister _ @@ -345,10 +345,10 @@ let run ezfio_filename dataserver = in handle msg end; done; - ZMQ.Socket.set_linger_period dealer_socket 1000 ; - ZMQ.Socket.set_linger_period req_socket 1000 ; - ZMQ.Socket.close dealer_socket; - ZMQ.Socket.close req_socket; + Zmq.Socket.set_linger_period dealer_socket 1000 ; + Zmq.Socket.set_linger_period req_socket 1000 ; + Zmq.Socket.close dealer_socket; + Zmq.Socket.close req_socket; in Thread.create f in @@ -358,38 +358,38 @@ let run ezfio_filename dataserver = let f () = let dealer_socket = - ZMQ.Socket.create zmq_context ZMQ.Socket.dealer + Zmq.Socket.create zmq_context Zmq.Socket.dealer in - ZMQ.Socket.connect dealer_socket dataserver; - ZMQ.Socket.set_linger_period dealer_socket 600_000; + Zmq.Socket.connect dealer_socket dataserver; + Zmq.Socket.set_linger_period dealer_socket 600_000; let proxy_socket = - ZMQ.Socket.create zmq_context ZMQ.Socket.dealer + Zmq.Socket.create zmq_context Zmq.Socket.dealer in - ZMQ.Socket.connect proxy_socket "inproc://dealer"; + Zmq.Socket.connect proxy_socket "inproc://dealer"; let router_socket = - ZMQ.Socket.create zmq_context ZMQ.Socket.router + Zmq.Socket.create zmq_context Zmq.Socket.router and address = Printf.sprintf "ipc://%s:%d" Qmcchem_config.dev_shm (port); in bind_socket "ROUTER" router_socket address; - ZMQ.Socket.set_receive_high_water_mark router_socket 100000; - ZMQ.Socket.set_send_high_water_mark router_socket 100000; - ZMQ.Socket.set_immediate router_socket true; - ZMQ.Socket.set_linger_period router_socket 600_000; + Zmq.Socket.set_receive_high_water_mark router_socket 100000; + Zmq.Socket.set_send_high_water_mark router_socket 100000; + Zmq.Socket.set_immediate router_socket true; + Zmq.Socket.set_linger_period router_socket 600_000; (* Pull socket for computed data *) let push_socket = - ZMQ.Socket.create zmq_context ZMQ.Socket.push + Zmq.Socket.create zmq_context Zmq.Socket.push and address = Printf.sprintf "tcp://%s:%d" dataserver_address (port+2-10) in - ZMQ.Socket.connect push_socket address; - ZMQ.Socket.set_linger_period push_socket 600_000; + Zmq.Socket.connect push_socket address; + Zmq.Socket.set_linger_period push_socket 600_000; let pull_socket = - ZMQ.Socket.create zmq_context ZMQ.Socket.pull + Zmq.Socket.create zmq_context Zmq.Socket.pull and address = Printf.sprintf "ipc://%s:%d" Qmcchem_config.dev_shm (port+2); in @@ -399,7 +399,7 @@ let run ezfio_filename dataserver = (* Handles messages coming into the ROUTER socket. *) let handle_router () = let raw_msg = - ZMQ.Socket.recv_all ~block:false router_socket + Zmq.Socket.recv_all ~block:false router_socket in let header, msg = let rec aux header = function @@ -414,10 +414,10 @@ let run ezfio_filename dataserver = | Message.GetWalkers _ | Message.Ezfio _ | Message.Test -> - ZMQ.Socket.send_all proxy_socket raw_msg + Zmq.Socket.send_all proxy_socket raw_msg | Message.Register _ | Message.Unregister _ -> - ZMQ.Socket.send_all dealer_socket raw_msg + Zmq.Socket.send_all dealer_socket raw_msg | Message.Walkers (_, _, _) | Message.Property _ -> failwith "Bad message" @@ -426,13 +426,13 @@ let run ezfio_filename dataserver = in let handle_dealer () = - ZMQ.Socket.recv_all ~block:false dealer_socket - |> ZMQ.Socket.send_all router_socket + Zmq.Socket.recv_all ~block:false dealer_socket + |> Zmq.Socket.send_all router_socket in let handle_proxy () = - ZMQ.Socket.recv_all ~block:false proxy_socket - |> ZMQ.Socket.send_all router_socket + Zmq.Socket.recv_all ~block:false proxy_socket + |> Zmq.Socket.send_all router_socket in let select_n_of ~n ~len l = @@ -463,7 +463,7 @@ let run ezfio_filename dataserver = (* Handles messages coming into the PULL socket. *) let handle_pull () = let message = - ZMQ.Socket.recv_all ~block:false pull_socket + Zmq.Socket.recv_all ~block:false pull_socket in let new_message = match message with @@ -481,36 +481,36 @@ let run ezfio_filename dataserver = Int.to_string (5*len)] ; ( select_n_of ~n:5 ~len rest ) ] | _ -> message in - ZMQ.Socket.send_all push_socket new_message + Zmq.Socket.send_all push_socket new_message in (* Polling item to poll ROUTER and PULL sockets. *) let pollitem = - ZMQ.Poll.mask_of - [| (router_socket , ZMQ.Poll.In) ; - (pull_socket , ZMQ.Poll.In) ; - (dealer_socket, ZMQ.Poll.In) ; - (proxy_socket , ZMQ.Poll.In) + Zmq.Poll.mask_of + [| (router_socket , Zmq.Poll.In) ; + (pull_socket , Zmq.Poll.In) ; + (dealer_socket, Zmq.Poll.In) ; + (proxy_socket , Zmq.Poll.In) |] in (* Main loop *) while (!status <> Status.Stopped) do let polling = - ZMQ.Poll.poll ~timeout:1000 pollitem + Zmq.Poll.poll ~timeout:1000 pollitem in - if (polling.(0) = Some ZMQ.Poll.In) then + if (polling.(0) = Some Zmq.Poll.In) then handle_router (); - if (polling.(1) = Some ZMQ.Poll.In) then + if (polling.(1) = Some Zmq.Poll.In) then handle_pull (); - if (polling.(2) = Some ZMQ.Poll.In) then + if (polling.(2) = Some Zmq.Poll.In) then handle_dealer (); - if (polling.(3) = Some ZMQ.Poll.In) then + if (polling.(3) = Some Zmq.Poll.In) then handle_proxy (); done; List.iter ~f:(fun socket -> - ZMQ.Socket.set_linger_period socket 1000 ; - ZMQ.Socket.close socket) + Zmq.Socket.set_linger_period socket 1000 ; + Zmq.Socket.close socket) [ router_socket ; dealer_socket ; push_socket ; pull_socket ; proxy_socket ] in Thread.create f diff --git a/ocaml/Qmcchem_run.ml b/ocaml/Qmcchem_run.ml index fd22d38..c9883b1 100644 --- a/ocaml/Qmcchem_run.ml +++ b/ocaml/Qmcchem_run.ml @@ -49,29 +49,29 @@ let full_run ?(start_dataserver=true) ezfio_filename = end; - (* Check if the ZMQ Rep socket is open *) + (* Check if the Zmq Rep socket is open *) let test_open_rep_socket () = let zmq_context = - ZMQ.Context.create () + Zmq.Context.create () in let socket = - ZMQ.Socket.create zmq_context ZMQ.Socket.req + Zmq.Socket.create zmq_context Zmq.Socket.req and address = Ezfio.get_simulation_http_server () in let reply = try ( - ZMQ.Socket.set_receive_timeout socket 100; - ZMQ.Socket.connect socket address; - ZMQ.Socket.send socket (Message.(to_string Test)); - ZMQ.Socket.recv socket + Zmq.Socket.set_receive_timeout socket 100; + Zmq.Socket.connect socket address; + Zmq.Socket.send socket (Message.(to_string Test)); + Zmq.Socket.recv socket ) with | Unix.Unix_error (_,_,_) -> begin - ZMQ.Socket.set_linger_period socket 1 ; - ZMQ.Socket.close socket; - ZMQ.Context.terminate zmq_context; + Zmq.Socket.set_linger_period socket 1 ; + Zmq.Socket.close socket; + Zmq.Context.terminate zmq_context; "Failed" end in diff --git a/ocaml/build.ninja b/ocaml/build.ninja index 76786c0..4e384a7 100644 --- a/ocaml/build.ninja +++ b/ocaml/build.ninja @@ -1,7 +1,7 @@ MAIN=qmcchem # Main program to build -PACKAGES=-package core,cryptokit,str,ZMQ +PACKAGES=-package core,cryptokit,str,zmq #,ppx_sexp_conv # Required opam packages, for example: # PACKAGES=-package core,sexplib.syntax