Changes for OCaml zmq-5.0

This commit is contained in:
Anthony Scemama 2018-05-09 11:31:32 +02:00
parent 596cb71894
commit 1311fd72bd
6 changed files with 63 additions and 63 deletions

View File

@ -5,7 +5,7 @@ QP_ROOT=$PWD
cd - cd -
# Normal installation # Normal installation
PACKAGES="core.v0.10.0 cryptokit ocamlfind sexplib.v0.10.0 ZMQ ppx_sexp_conv ppx_deriving jbuilder.1.0+beta17" PACKAGES="core.v0.10.0 cryptokit ocamlfind sexplib.v0.10.0 zmq ppx_sexp_conv ppx_deriving jbuilder.1.0+beta17"
# Needed for ZeroMQ # Needed for ZeroMQ
export C_INCLUDE_PATH="${QP_ROOT}"/include:"${C_INCLUDE_PATH}" export C_INCLUDE_PATH="${QP_ROOT}"/include:"${C_INCLUDE_PATH}"

View File

@ -47,10 +47,10 @@ let debug str =
let zmq_context = let zmq_context =
ZMQ.Context.create () Zmq.Context.create ()
let () = let () =
ZMQ.Context.set_io_threads zmq_context 8 Zmq.Context.set_io_threads zmq_context 8
let bind_socket ~socket_type ~socket ~port = let bind_socket ~socket_type ~socket ~port =
@ -61,7 +61,7 @@ let bind_socket ~socket_type ~socket ~port =
| -1 -> () | -1 -> ()
| i -> | i ->
try try
ZMQ.Socket.bind socket @@ Printf.sprintf "tcp://*:%d" port; Zmq.Socket.bind socket @@ Printf.sprintf "tcp://*:%d" port;
loop (-1) loop (-1)
with with
| Unix.Unix_error _ -> (Time.pause @@ Time.Span.of_sec 1. ; loop (i-1) ) | Unix.Unix_error _ -> (Time.pause @@ Time.Span.of_sec 1. ; loop (i-1) )
@ -105,31 +105,31 @@ let ip_address = lazy (
let reply_ok rep_socket = let reply_ok rep_socket =
Message.Ok_msg.create Message.Ok_msg.create
|> Message.Ok_msg.to_string |> Message.Ok_msg.to_string
|> ZMQ.Socket.send rep_socket |> Zmq.Socket.send rep_socket
let reply_wrong_state rep_socket = let reply_wrong_state rep_socket =
Message.Error_msg.create "Wrong state" Message.Error_msg.create "Wrong state"
|> Message.Error_msg.to_string |> Message.Error_msg.to_string
|> ZMQ.Socket.send rep_socket |> Zmq.Socket.send rep_socket
let stop ~port = let stop ~port =
debug "STOP"; debug "STOP";
let req_socket = let req_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.req Zmq.Socket.create zmq_context Zmq.Socket.req
and address = and address =
Printf.sprintf "tcp://localhost:%d" port Printf.sprintf "tcp://localhost:%d" port
in in
ZMQ.Socket.set_linger_period req_socket 1_000_000; Zmq.Socket.set_linger_period req_socket 1_000_000;
ZMQ.Socket.connect req_socket address; Zmq.Socket.connect req_socket address;
Message.Terminate (Message.Terminate_msg.create) Message.Terminate (Message.Terminate_msg.create)
|> Message.to_string |> Message.to_string
|> ZMQ.Socket.send req_socket ; |> Zmq.Socket.send req_socket ;
let msg = let msg =
ZMQ.Socket.recv req_socket Zmq.Socket.recv req_socket
|> Message.of_string |> Message.of_string
in in
let () = let () =
@ -137,8 +137,8 @@ let stop ~port =
| Message.Ok _ -> () | Message.Ok _ -> ()
| _ -> failwith "Problem in termination" | _ -> failwith "Problem in termination"
in in
ZMQ.Socket.set_linger_period req_socket 1_000; Zmq.Socket.set_linger_period req_socket 1_000;
ZMQ.Socket.close req_socket Zmq.Socket.close req_socket
let new_job msg program_state rep_socket pair_socket = let new_job msg program_state rep_socket pair_socket =
@ -166,7 +166,7 @@ let new_job msg program_state rep_socket pair_socket =
in in
reply_ok rep_socket; reply_ok rep_socket;
string_of_pub_state Waiting string_of_pub_state Waiting
|> ZMQ.Socket.send pair_socket ; |> Zmq.Socket.send pair_socket ;
result result
let change_pub_state msg program_state rep_socket pair_socket = let change_pub_state msg program_state rep_socket pair_socket =
@ -186,7 +186,7 @@ let change_pub_state msg program_state rep_socket pair_socket =
in in
reply_ok rep_socket; reply_ok rep_socket;
string_of_pub_state msg string_of_pub_state msg
|> ZMQ.Socket.send pair_socket ; |> Zmq.Socket.send pair_socket ;
program_state program_state
@ -216,7 +216,7 @@ let end_job msg program_state rep_socket pair_socket =
Printf.sprintf "waiting for %d slaves..." n Printf.sprintf "waiting for %d slaves..." n
|> Message.Error_msg.create |> Message.Error_msg.create
|> Message.Error_msg.to_string |> Message.Error_msg.to_string
|> ZMQ.Socket.send rep_socket ; |> Zmq.Socket.send rep_socket ;
program_state program_state
in in
@ -227,13 +227,13 @@ let end_job msg program_state rep_socket pair_socket =
if (msg.Message.Endjob_msg.state = force_state) then if (msg.Message.Endjob_msg.state = force_state) then
begin begin
string_of_pub_state Waiting string_of_pub_state Waiting
|> ZMQ.Socket.send pair_socket ; |> Zmq.Socket.send pair_socket ;
success () success ()
end end
else if (msg.Message.Endjob_msg.state = state) then else if (msg.Message.Endjob_msg.state = state) then
begin begin
string_of_pub_state Waiting string_of_pub_state Waiting
|> ZMQ.Socket.send pair_socket ; |> Zmq.Socket.send pair_socket ;
if (Queuing_system.number_of_clients program_state.queue = 0) then if (Queuing_system.number_of_clients program_state.queue = 0) then
success () success ()
else else
@ -280,7 +280,7 @@ let connect msg program_state rep_socket =
Message.ConnectReply (Message.ConnectReply_msg.create Message.ConnectReply (Message.ConnectReply_msg.create
~state:state ~client_id ~push_address) ~state:state ~client_id ~push_address)
|> Message.to_string |> Message.to_string
|> ZMQ.Socket.send rep_socket ; |> Zmq.Socket.send rep_socket ;
{ program_state with { program_state with
queue = new_queue queue = new_queue
} }
@ -306,7 +306,7 @@ let disconnect msg program_state rep_socket =
in in
Message.DisconnectReply (Message.DisconnectReply_msg.create ~state) Message.DisconnectReply (Message.DisconnectReply_msg.create ~state)
|> Message.to_string |> Message.to_string
|> ZMQ.Socket.send rep_socket ; |> Zmq.Socket.send rep_socket ;
new_program_state new_program_state
in in
@ -352,7 +352,7 @@ let del_task msg program_state rep_socket =
in in
Message.DelTaskReply (Message.DelTaskReply_msg.create ~task_ids ~more) Message.DelTaskReply (Message.DelTaskReply_msg.create ~task_ids ~more)
|> Message.to_string |> Message.to_string
|> ZMQ.Socket.send ~block:true rep_socket ; (** /!\ Has to be blocking *) |> Zmq.Socket.send ~block:true rep_socket ; (** /!\ Has to be blocking *)
new_program_state new_program_state
in in
@ -426,10 +426,10 @@ let get_task msg program_state rep_socket pair_socket =
if no_task then if no_task then
string_of_pub_state Waiting string_of_pub_state Waiting
|> ZMQ.Socket.send pair_socket |> Zmq.Socket.send pair_socket
else else
string_of_pub_state (Running (Message.State.to_string state)) string_of_pub_state (Running (Message.State.to_string state))
|> ZMQ.Socket.send pair_socket; |> Zmq.Socket.send pair_socket;
let new_program_state = let new_program_state =
{ program_state with { program_state with
@ -440,7 +440,7 @@ let get_task msg program_state rep_socket pair_socket =
Message.GetTaskReply (Message.GetTaskReply_msg.create ~task ~task_id) Message.GetTaskReply (Message.GetTaskReply_msg.create ~task ~task_id)
|> Message.to_string |> Message.to_string
|> ZMQ.Socket.send rep_socket ; |> Zmq.Socket.send rep_socket ;
new_program_state new_program_state
in in
@ -498,10 +498,10 @@ let get_tasks msg program_state rep_socket pair_socket =
if no_task then if no_task then
string_of_pub_state Waiting string_of_pub_state Waiting
|> ZMQ.Socket.send pair_socket |> Zmq.Socket.send pair_socket
else else
string_of_pub_state (Running (Message.State.to_string state)) string_of_pub_state (Running (Message.State.to_string state))
|> ZMQ.Socket.send pair_socket; |> Zmq.Socket.send pair_socket;
let new_program_state = let new_program_state =
{ program_state with { program_state with
@ -512,7 +512,7 @@ let get_tasks msg program_state rep_socket pair_socket =
Message.GetTasksReply (Message.GetTasksReply_msg.create result) Message.GetTasksReply (Message.GetTasksReply_msg.create result)
|> Message.to_string_list |> Message.to_string_list
|> ZMQ.Socket.send_all rep_socket ; |> Zmq.Socket.send_all rep_socket ;
new_program_state new_program_state
in in
@ -596,7 +596,7 @@ let put_data msg rest_of_msg program_state rep_socket =
StringHashtbl.set program_state.data ~key ~data:value ; StringHashtbl.set program_state.data ~key ~data:value ;
Message.PutDataReply (Message.PutDataReply_msg.create ()) Message.PutDataReply (Message.PutDataReply_msg.create ())
|> Message.to_string |> Message.to_string
|> ZMQ.Socket.send rep_socket; |> Zmq.Socket.send rep_socket;
program_state program_state
and failure () = and failure () =
@ -629,7 +629,7 @@ let get_data msg program_state rep_socket =
in in
Message.GetDataReply (Message.GetDataReply_msg.create ~value) Message.GetDataReply (Message.GetDataReply_msg.create ~value)
|> Message.to_string_list |> Message.to_string_list
|> ZMQ.Socket.send_all rep_socket; |> Zmq.Socket.send_all rep_socket;
program_state program_state
and failure () = and failure () =
@ -699,7 +699,7 @@ let abort program_state rep_socket =
let error msg program_state rep_socket = let error msg program_state rep_socket =
Message.Error (Message.Error_msg.create msg) Message.Error (Message.Error_msg.create msg)
|> Message.to_string |> Message.to_string
|> ZMQ.Socket.send rep_socket ; |> Zmq.Socket.send rep_socket ;
program_state program_state
let start_pub_thread ~port = let start_pub_thread ~port =
@ -709,54 +709,54 @@ let start_pub_thread ~port =
in in
let pair_socket = let pair_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.pair Zmq.Socket.create zmq_context Zmq.Socket.pair
and address = and address =
"inproc://pair" "inproc://pair"
in in
ZMQ.Socket.connect pair_socket address; Zmq.Socket.connect pair_socket address;
let pub_socket = let pub_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.pub Zmq.Socket.create zmq_context Zmq.Socket.pub
in in
bind_socket ~socket_type:"PUB" ~socket:pub_socket ~port; bind_socket ~socket_type:"PUB" ~socket:pub_socket ~port;
let pollitem = let pollitem =
ZMQ.Poll.mask_of Zmq.Poll.mask_of
[| (pair_socket, ZMQ.Poll.In) |] [| (pair_socket, Zmq.Poll.In) |]
in in
let rec run state = let rec run state =
let new_state = let new_state =
let polling = let polling =
ZMQ.Poll.poll ~timeout pollitem Zmq.Poll.poll ~timeout pollitem
in in
if (polling.(0) = Some ZMQ.Poll.In) then if (polling.(0) = Some Zmq.Poll.In) then
ZMQ.Socket.recv ~block:false pair_socket Zmq.Socket.recv ~block:false pair_socket
|> pub_state_of_string |> pub_state_of_string
else else
state state
in in
ZMQ.Socket.send pub_socket @@ string_of_pub_state new_state; Zmq.Socket.send pub_socket @@ string_of_pub_state new_state;
match state with match state with
| Stopped -> () | Stopped -> ()
| _ -> run new_state | _ -> run new_state
in in
run Waiting; run Waiting;
ZMQ.Socket.set_linger_period pair_socket 1000 ; Zmq.Socket.set_linger_period pair_socket 1000 ;
ZMQ.Socket.close pair_socket; Zmq.Socket.close pair_socket;
ZMQ.Socket.set_linger_period pub_socket 1000 ; Zmq.Socket.set_linger_period pub_socket 1000 ;
ZMQ.Socket.close pub_socket; Zmq.Socket.close pub_socket;
) )
let run ~port = let run ~port =
(** Bind inproc socket for changing state of pub *) (** Bind inproc socket for changing state of pub *)
let pair_socket = let pair_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.pair Zmq.Socket.create zmq_context Zmq.Socket.pair
and address = and address =
"inproc://pair" "inproc://pair"
in in
ZMQ.Socket.bind pair_socket address; Zmq.Socket.bind pair_socket address;
let pub_thread = let pub_thread =
start_pub_thread ~port:(port+1) () start_pub_thread ~port:(port+1) ()
@ -764,9 +764,9 @@ let run ~port =
(** Bind REP socket *) (** Bind REP socket *)
let rep_socket = let rep_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.rep Zmq.Socket.create zmq_context Zmq.Socket.rep
in in
ZMQ.Socket.set_linger_period rep_socket 1_000_000; Zmq.Socket.set_linger_period rep_socket 1_000_000;
bind_socket "REP" rep_socket port; bind_socket "REP" rep_socket port;
let initial_program_state = let initial_program_state =
@ -783,8 +783,8 @@ let run ~port =
(** ZMR polling item *) (** ZMR polling item *)
let pollitem = let pollitem =
ZMQ.Poll.mask_of Zmq.Poll.mask_of
[| (rep_socket, ZMQ.Poll.In) |] [| (rep_socket, Zmq.Poll.In) |]
in in
let address = let address =
@ -798,9 +798,9 @@ let run ~port =
| false -> () | false -> ()
| true -> | true ->
let polling = let polling =
ZMQ.Poll.poll ~timeout:1000 pollitem Zmq.Poll.poll ~timeout:1000 pollitem
in in
if (polling.(0) <> Some ZMQ.Poll.In) then if (polling.(0) <> Some Zmq.Poll.In) then
main_loop program_state true main_loop program_state true
else else
begin begin
@ -818,7 +818,7 @@ let run ~port =
(** Extract message *) (** Extract message *)
let raw_message, rest = let raw_message, rest =
match ZMQ.Socket.recv_all rep_socket with match Zmq.Socket.recv_all rep_socket with
| x :: rest -> x, rest | x :: rest -> x, rest
| [] -> failwith "Badly formed message" | [] -> failwith "Badly formed message"
in in
@ -873,9 +873,9 @@ let run ~port =
end end
in main_loop initial_program_state true; in main_loop initial_program_state true;
ZMQ.Socket.send pair_socket @@ string_of_pub_state Stopped; Zmq.Socket.send pair_socket @@ string_of_pub_state Stopped;
Thread.join pub_thread; Thread.join pub_thread;
ZMQ.Socket.close rep_socket Zmq.Socket.close rep_socket

View File

@ -1,3 +1,3 @@
true: package(core,cryptokit,ZMQ,str,ppx_sexp_conv,ppx_deriving) true: package(core,cryptokit,zmq,str,ppx_sexp_conv,ppx_deriving)
true: thread true: thread
false: profile false: profile

View File

@ -20,10 +20,10 @@ let run slave exe ezfio_file =
(** Check availability of the ports *) (** Check availability of the ports *)
let port_number = let port_number =
let zmq_context = let zmq_context =
ZMQ.Context.create () Zmq.Context.create ()
in in
let dummy_socket = let dummy_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.rep Zmq.Socket.create zmq_context Zmq.Socket.rep
in in
let rec try_new_port port_number = let rec try_new_port port_number =
try try
@ -31,8 +31,8 @@ let run slave exe ezfio_file =
let address = let address =
Printf.sprintf "tcp://%s:%d" (Lazy.force TaskServer.ip_address) (port_number+i) Printf.sprintf "tcp://%s:%d" (Lazy.force TaskServer.ip_address) (port_number+i)
in in
ZMQ.Socket.bind dummy_socket address; Zmq.Socket.bind dummy_socket address;
ZMQ.Socket.unbind dummy_socket address; Zmq.Socket.unbind dummy_socket address;
); );
port_number port_number
with with
@ -41,8 +41,8 @@ let run slave exe ezfio_file =
let result = let result =
try_new_port 41279 try_new_port 41279
in in
ZMQ.Socket.close dummy_socket; Zmq.Socket.close dummy_socket;
ZMQ.Context.terminate zmq_context; Zmq.Context.terminate zmq_context;
result result
in in

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python #!/usr/bin/env python2
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
Welcome to the ei_handler. Welcome to the ei_handler.

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python #!/usr/bin/env python2
""" """
convert output of gamess/GAU$$IAN to ezfio convert output of gamess/GAU$$IAN to ezfio