10
0
mirror of https://github.com/LCPQ/quantum_package synced 2025-01-08 20:33:26 +01:00

Introduced qp_run -slave

This commit is contained in:
Anthony Scemama 2016-10-12 11:26:21 +02:00
parent c3a6b5ba6b
commit e356e97e16
5 changed files with 65 additions and 34 deletions

View File

@ -127,3 +127,14 @@ let get_ezfio_default directory data =
|> aux |> aux
;; ;;
let ezfio_work ezfio_file =
let result =
Filename.concat ezfio_file "work"
in
begin
match Sys.is_directory result with
| `Yes -> ()
| _ -> Unix.mkdir result
end;
result
;;

View File

@ -48,20 +48,21 @@ let zmq_context =
ZMQ.Context.create () ZMQ.Context.create ()
let bind_socket ~socket_type ~socket ~address = let bind_socket ~socket_type ~socket ~port =
let rec loop = function let rec loop = function
| 0 -> failwith @@ Printf.sprintf | 0 -> failwith @@ Printf.sprintf
"Unable to bind the %s socket : %s " "Unable to bind the %s socket to port : %d "
socket_type address socket_type port
| -1 -> () | -1 -> ()
| i -> | i ->
try try
ZMQ.Socket.bind socket address; ZMQ.Socket.bind socket @@ Printf.sprintf "tcp://*:%d" port;
loop (-1) loop (-1)
with with
| Unix.Unix_error _ -> (Time.pause @@ Time.Span.of_float 1. ; loop (i-1) ) | Unix.Unix_error _ -> (Time.pause @@ Time.Span.of_float 1. ; loop (i-1) )
| other_exception -> raise other_exception | other_exception -> raise other_exception
in loop 60 in loop 60;
ZMQ.Socket.bind socket @@ Printf.sprintf "ipc:///tmp/qp_run:%d" port
let hostname = lazy ( let hostname = lazy (
@ -115,7 +116,7 @@ let stop ~port =
let req_socket = let req_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.req ZMQ.Socket.create zmq_context ZMQ.Socket.req
and address = and address =
Printf.sprintf "tcp://%s:%d" (Lazy.force ip_address) port Printf.sprintf "ipc:///tmp/qp_run:%d" port
in in
ZMQ.Socket.set_linger_period req_socket 1_000_000; ZMQ.Socket.set_linger_period req_socket 1_000_000;
ZMQ.Socket.connect req_socket address; ZMQ.Socket.connect req_socket address;
@ -567,10 +568,8 @@ let start_pub_thread ~port =
let pub_socket = let pub_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.pub ZMQ.Socket.create zmq_context ZMQ.Socket.pub
and address =
Printf.sprintf "tcp://*:%d" port
in in
bind_socket ~socket_type:"PUB" ~socket:pub_socket ~address; bind_socket ~socket_type:"PUB" ~socket:pub_socket ~port;
let pollitem = let pollitem =
ZMQ.Poll.mask_of ZMQ.Poll.mask_of
@ -608,7 +607,7 @@ let run ~port =
and address = and address =
"inproc://pair" "inproc://pair"
in in
bind_socket "PAIR" pair_socket address; ZMQ.Socket.bind pair_socket address;
let pub_thread = let pub_thread =
start_pub_thread ~port:(port+1) () start_pub_thread ~port:(port+1) ()
@ -617,11 +616,9 @@ let run ~port =
(** Bind REP socket *) (** Bind REP socket *)
let rep_socket = let rep_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.rep ZMQ.Socket.create zmq_context ZMQ.Socket.rep
and address =
Printf.sprintf "tcp://*:%d" port
in in
ZMQ.Socket.set_linger_period rep_socket 1_000_000; ZMQ.Socket.set_linger_period rep_socket 1_000_000;
bind_socket "REP" rep_socket address; bind_socket "REP" rep_socket port;
let initial_program_state = let initial_program_state =
{ queue = Queuing_system.create () ; { queue = Queuing_system.create () ;
@ -721,6 +718,7 @@ let run ~port =
ZMQ.Socket.send pair_socket @@ string_of_pub_state Stopped; ZMQ.Socket.send pair_socket @@ string_of_pub_state Stopped;
Thread.join pub_thread; Thread.join pub_thread;
ZMQ.Socket.close rep_socket

View File

@ -23,9 +23,9 @@ val debug : string -> unit
(** ZeroMQ context *) (** ZeroMQ context *)
val zmq_context : ZMQ.Context.t val zmq_context : ZMQ.Context.t
(** Bind a ZMQ socket *) (** Bind a ZMQ socket to a TCP port and to an IPC file /tmp/qp_run.<port> *)
val bind_socket : val bind_socket :
socket_type:string -> socket:'a ZMQ.Socket.t -> address:string -> unit socket_type:string -> socket:'a ZMQ.Socket.t -> port:int -> unit
(** Name of the host on which the server runs *) (** Name of the host on which the server runs *)
val hostname : string lazy_t val hostname : string lazy_t

View File

@ -15,7 +15,7 @@ let print_list () =
let () = let () =
Random.self_init () Random.self_init ()
let run ~master exe ezfio_file = let run slave exe ezfio_file =
(** Check availability of the ports *) (** Check availability of the ports *)
@ -28,7 +28,7 @@ let run ~master exe ezfio_file =
in in
let rec try_new_port port_number = let rec try_new_port port_number =
try try
List.iter [ 0;1;2;3;4 ] ~f:(fun i -> List.iter [ 0;1;2;3;4;5;6;7;8;9 ] ~f:(fun i ->
let address = let address =
Printf.sprintf "tcp://%s:%d" (Lazy.force TaskServer.ip_address) (port_number+i) Printf.sprintf "tcp://%s:%d" (Lazy.force TaskServer.ip_address) (port_number+i)
in in
@ -75,16 +75,23 @@ let run ~master exe ezfio_file =
| 0 -> () | 0 -> ()
| i -> failwith "Error: Input inconsistent\n" | i -> failwith "Error: Input inconsistent\n"
end; end;
begin
match master with
| Some address -> Unix.putenv ~key:"QP_RUN_ADDRESS_MASTER" ~data:address
| None -> ()
end;
(** Start task server *) let qp_run_address_filename =
let address = Filename.concat (Qpackage.ezfio_work ezfio_file) "qp_run_address"
Printf.sprintf "tcp://%s:%d" (Lazy.force TaskServer.ip_address) port_number
in in
let () =
if slave then
try
let address =
In_channel.read_all qp_run_address_filename
|> String.strip
in
Unix.putenv ~key:"QP_RUN_ADDRESS_MASTER" ~data:address
with Sys_error _ -> failwith "No master is not running"
in
(** Start task server *)
let task_thread = let task_thread =
let thread = let thread =
Thread.create ( fun () -> Thread.create ( fun () ->
@ -92,7 +99,16 @@ let run ~master exe ezfio_file =
in in
thread (); thread ();
in in
let address =
Printf.sprintf "tcp://%s:%d" (Lazy.force TaskServer.ip_address) port_number
in
Unix.putenv ~key:"QP_RUN_ADDRESS" ~data:address; Unix.putenv ~key:"QP_RUN_ADDRESS" ~data:address;
let () =
if (not slave) then
Out_channel.with_file qp_run_address_filename ~f:(
fun oc -> Out_channel.output_lines oc [address])
in
(** Run executable *) (** Run executable *)
let prefix = let prefix =
@ -111,6 +127,8 @@ let run ~master exe ezfio_file =
TaskServer.stop ~port:port_number; TaskServer.stop ~port:port_number;
Thread.join task_thread; Thread.join task_thread;
if (not slave) then
Sys.remove qp_run_address_filename;
let duration = Time.diff (Time.now()) time_start let duration = Time.diff (Time.now()) time_start
|> Core.Span.to_string in |> Core.Span.to_string in
@ -119,8 +137,8 @@ let run ~master exe ezfio_file =
let spec = let spec =
let open Command.Spec in let open Command.Spec in
empty empty
+> flag "master" (optional string) +> flag "slave" no_arg
~doc:("address Address of the master process") ~doc:(" Needed for slave tasks")
+> anon ("executable" %: string) +> anon ("executable" %: string)
+> anon ("ezfio_file" %: string) +> anon ("ezfio_file" %: string)
;; ;;
@ -138,8 +156,8 @@ Executes a Quantum Package binary file among these:\n\n"
) )
) )
spec spec
(fun master exe ezfio_file () -> (fun slave exe ezfio_file () ->
run ~master exe ezfio_file run slave exe ezfio_file
) )
|> Command.run ~version: Git.sha1 ~build_info: Git.message |> Command.run ~version: Git.sha1 ~build_info: Git.message

View File

@ -670,12 +670,16 @@ subroutine disconnect_from_taskserver(zmq_to_qp_run_socket, &
message = trim(message(1:rc)) message = trim(message(1:rc))
read(message,*) reply, state read(message,*) reply, state
if ( (trim(reply) /= 'disconnect_reply').or. & if ((trim(reply) == 'disconnect_reply').and.(trim(state) == trim(zmq_state))) then
(trim(state) /= zmq_state) ) then return
print *, 'Unable to disconnect : ', zmq_state
print *, trim(message)
stop -1
endif endif
if (trim(message) == 'error No job is running') then
return
endif
print *, 'Unable to disconnect : ', trim(zmq_state)
print *, trim(message)
stop -1
end end