diff --git a/ocaml/Qpackage.ml b/ocaml/Qpackage.ml index bd0d34fc..8011b23b 100644 --- a/ocaml/Qpackage.ml +++ b/ocaml/Qpackage.ml @@ -127,3 +127,14 @@ let get_ezfio_default directory data = |> aux ;; +let ezfio_work ezfio_file = + let result = + Filename.concat ezfio_file "work" + in + begin + match Sys.is_directory result with + | `Yes -> () + | _ -> Unix.mkdir result + end; + result +;; diff --git a/ocaml/TaskServer.ml b/ocaml/TaskServer.ml index 6f012981..cfc22cfc 100644 --- a/ocaml/TaskServer.ml +++ b/ocaml/TaskServer.ml @@ -48,20 +48,21 @@ let zmq_context = ZMQ.Context.create () -let bind_socket ~socket_type ~socket ~address = +let bind_socket ~socket_type ~socket ~port = let rec loop = function | 0 -> failwith @@ Printf.sprintf - "Unable to bind the %s socket : %s " - socket_type address + "Unable to bind the %s socket to port : %d " + socket_type port | -1 -> () | i -> try - ZMQ.Socket.bind socket address; + ZMQ.Socket.bind socket @@ Printf.sprintf "tcp://*:%d" port; loop (-1) with | Unix.Unix_error _ -> (Time.pause @@ Time.Span.of_float 1. ; loop (i-1) ) | other_exception -> raise other_exception - in loop 60 + in loop 60; + ZMQ.Socket.bind socket @@ Printf.sprintf "ipc:///tmp/qp_run:%d" port let hostname = lazy ( @@ -115,7 +116,7 @@ let stop ~port = let req_socket = ZMQ.Socket.create zmq_context ZMQ.Socket.req and address = - Printf.sprintf "tcp://%s:%d" (Lazy.force ip_address) port + Printf.sprintf "ipc:///tmp/qp_run:%d" port in ZMQ.Socket.set_linger_period req_socket 1_000_000; ZMQ.Socket.connect req_socket address; @@ -567,10 +568,8 @@ let start_pub_thread ~port = let pub_socket = ZMQ.Socket.create zmq_context ZMQ.Socket.pub - and address = - Printf.sprintf "tcp://*:%d" port in - bind_socket ~socket_type:"PUB" ~socket:pub_socket ~address; + bind_socket ~socket_type:"PUB" ~socket:pub_socket ~port; let pollitem = ZMQ.Poll.mask_of @@ -608,7 +607,7 @@ let run ~port = and address = "inproc://pair" in - bind_socket "PAIR" pair_socket address; + ZMQ.Socket.bind pair_socket address; let pub_thread = start_pub_thread ~port:(port+1) () @@ -617,11 +616,9 @@ let run ~port = (** Bind REP socket *) let rep_socket = ZMQ.Socket.create zmq_context ZMQ.Socket.rep - and address = - Printf.sprintf "tcp://*:%d" port in ZMQ.Socket.set_linger_period rep_socket 1_000_000; - bind_socket "REP" rep_socket address; + bind_socket "REP" rep_socket port; let initial_program_state = { queue = Queuing_system.create () ; @@ -721,6 +718,7 @@ let run ~port = ZMQ.Socket.send pair_socket @@ string_of_pub_state Stopped; Thread.join pub_thread; + ZMQ.Socket.close rep_socket diff --git a/ocaml/TaskServer.mli b/ocaml/TaskServer.mli index f923a18a..e1baab12 100644 --- a/ocaml/TaskServer.mli +++ b/ocaml/TaskServer.mli @@ -23,9 +23,9 @@ val debug : string -> unit (** ZeroMQ context *) val zmq_context : ZMQ.Context.t -(** Bind a ZMQ socket *) +(** Bind a ZMQ socket to a TCP port and to an IPC file /tmp/qp_run. *) val bind_socket : - socket_type:string -> socket:'a ZMQ.Socket.t -> address:string -> unit + socket_type:string -> socket:'a ZMQ.Socket.t -> port:int -> unit (** Name of the host on which the server runs *) val hostname : string lazy_t diff --git a/ocaml/qp_run.ml b/ocaml/qp_run.ml index 8a221614..e8c8d05a 100644 --- a/ocaml/qp_run.ml +++ b/ocaml/qp_run.ml @@ -15,7 +15,7 @@ let print_list () = let () = Random.self_init () -let run ~master exe ezfio_file = +let run slave exe ezfio_file = (** Check availability of the ports *) @@ -28,7 +28,7 @@ let run ~master exe ezfio_file = in let rec try_new_port port_number = try - List.iter [ 0;1;2;3;4 ] ~f:(fun i -> + List.iter [ 0;1;2;3;4;5;6;7;8;9 ] ~f:(fun i -> let address = Printf.sprintf "tcp://%s:%d" (Lazy.force TaskServer.ip_address) (port_number+i) in @@ -75,16 +75,23 @@ let run ~master exe ezfio_file = | 0 -> () | i -> failwith "Error: Input inconsistent\n" end; - begin - match master with - | Some address -> Unix.putenv ~key:"QP_RUN_ADDRESS_MASTER" ~data:address - | None -> () - end; - (** Start task server *) - let address = - Printf.sprintf "tcp://%s:%d" (Lazy.force TaskServer.ip_address) port_number + let qp_run_address_filename = + Filename.concat (Qpackage.ezfio_work ezfio_file) "qp_run_address" in + + let () = + if slave then + try + let address = + In_channel.read_all qp_run_address_filename + |> String.strip + in + Unix.putenv ~key:"QP_RUN_ADDRESS_MASTER" ~data:address + with Sys_error _ -> failwith "No master is not running" + in + + (** Start task server *) let task_thread = let thread = Thread.create ( fun () -> @@ -92,7 +99,16 @@ let run ~master exe ezfio_file = in thread (); in + let address = + Printf.sprintf "tcp://%s:%d" (Lazy.force TaskServer.ip_address) port_number + in Unix.putenv ~key:"QP_RUN_ADDRESS" ~data:address; + let () = + if (not slave) then + Out_channel.with_file qp_run_address_filename ~f:( + fun oc -> Out_channel.output_lines oc [address]) + in + (** Run executable *) let prefix = @@ -111,6 +127,8 @@ let run ~master exe ezfio_file = TaskServer.stop ~port:port_number; Thread.join task_thread; + if (not slave) then + Sys.remove qp_run_address_filename; let duration = Time.diff (Time.now()) time_start |> Core.Span.to_string in @@ -119,8 +137,8 @@ let run ~master exe ezfio_file = let spec = let open Command.Spec in empty - +> flag "master" (optional string) - ~doc:("address Address of the master process") + +> flag "slave" no_arg + ~doc:(" Needed for slave tasks") +> anon ("executable" %: string) +> anon ("ezfio_file" %: string) ;; @@ -138,8 +156,8 @@ Executes a Quantum Package binary file among these:\n\n" ) ) spec - (fun master exe ezfio_file () -> - run ~master exe ezfio_file + (fun slave exe ezfio_file () -> + run slave exe ezfio_file ) |> Command.run ~version: Git.sha1 ~build_info: Git.message diff --git a/src/ZMQ/utils.irp.f b/src/ZMQ/utils.irp.f index c3a55a05..61fb45de 100644 --- a/src/ZMQ/utils.irp.f +++ b/src/ZMQ/utils.irp.f @@ -670,12 +670,16 @@ subroutine disconnect_from_taskserver(zmq_to_qp_run_socket, & message = trim(message(1:rc)) read(message,*) reply, state - if ( (trim(reply) /= 'disconnect_reply').or. & - (trim(state) /= zmq_state) ) then - print *, 'Unable to disconnect : ', zmq_state - print *, trim(message) - stop -1 + if ((trim(reply) == 'disconnect_reply').and.(trim(state) == trim(zmq_state))) then + return endif + if (trim(message) == 'error No job is running') then + return + endif + + print *, 'Unable to disconnect : ', trim(zmq_state) + print *, trim(message) + stop -1 end