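(* qp_tunnel: relays ZeroMQ sockets between two networks; see the usage text
   registered with Command_line in the entry point below. *)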
open Qputils
open Qptypes
(* The single command-line argument is either an existing EZFIO directory
   or the address of a running server; the payload is the raw argument. *)
type ezfio_or_address =
  | EZFIO of string
  | ADDRESS of string
(* Kind of forwarding loop to run: REQ relays in both directions between a
   ROUTER and a DEALER socket, SUB relays one way from a SUB to a PUB
   socket. *)
type req_or_sub =
  | REQ
  | SUB
(* First local TCP port tried when looking for a free block of ports. *)
let localport = 42379
(* Mutable accumulators.  NOTE(review): nothing in the visible part of this
   file reads or updates them; presumably left over from timing
   instrumentation — confirm before removing. *)
let in_time_sum = ref 1.e-9
and in_size_sum = ref 0.
(* Program entry point.  The first step registers the command-line
   interface with Command_line: a footer text, the optional -g flag and one
   mandatory anonymous argument. *)
(* NOTE(review): string literals are kept verbatim, surrounding spaces
   included; the padding looks like an extraction artifact — confirm. *)
let () =
let open Command_line in
begin
  " Creates an ssh tunnel for using slaves on another network. Launch a server on the front-end node of the cluster on which the master process runs. Then start a client ont the front-end node of the distant cluster. "
  |> set_footer_doc ;
  [ { short = 'g' ; long = " get-input " ; opt = Optional ;
      doc = " Downloads the EZFIO directory. " ;
      arg = Without_arg ; } ;
    anonymous
      " (EZFIO_DIR|ADDRESS) "
      Mandatory
      " EZFIO directory or address. " ;
  ] |> set_specs
end ;
(* [arg] classifies the single anonymous argument: a path that exists and is
   a directory becomes [EZFIO], anything else becomes [ADDRESS].
   Prints the help and raises [Failure] unless exactly one anonymous
   argument was given. *)
(* NOTE(review): the error message is kept verbatim, padding included. *)
let arg =
  let x =
    match Command_line.anon_args () with
    | [ x ] -> x
    | _ ->
      begin
        Command_line.help () ;
        failwith " EZFIO_FILE or ADDRESS is missing "
      end
  in
  if Sys.file_exists x && Sys.is_directory x then
    EZFIO x
  else
    ADDRESS x
in
(* Forced value of TaskServer.ip_address; used below as the host part of
   the local endpoints and in the messages printed to the user. *)
let localhost =
  Lazy.force TaskServer.ip_address
in
(* [long_address] is the full address of the server to reach.  In ADDRESS
   mode it is the argument itself; in EZFIO mode it is the trimmed first
   line of the address file stored in the EZFIO work directory.
   Raises [Sys_error] if that file cannot be opened and [End_of_file] if it
   is empty. *)
(* NOTE(review): the file name literal is kept verbatim, padding included —
   confirm against the program that writes this file. *)
let long_address =
  match arg with
  | ADDRESS x -> x
  | EZFIO x ->
    let ic =
      Filename.concat (Qpackage.ezfio_work x) " qp_run_address "
      |> open_in
    in
    begin
      (* Close the channel on the error path too, so that an empty file
         does not leak the descriptor. *)
      match input_line ic with
      | line -> close_in ic ; String.trim line
      | exception e -> close_in_noerr ic ; raise e
    end
in
(* Splits [long_address] on ':' into exactly three fields: protocol,
   address and numeric port.  Any other shape, including a port that is not
   an integer, raises [Failure] with the "Malformed address" message. *)
(* NOTE(review): the message literal is kept verbatim, padding included. *)
let protocol , address , port =
  let malformed () =
    failwith @@ Printf.sprintf " %s : Malformed address " long_address
  in
  match String.split_on_char ':' long_address with
  | [ t ; a ; p ] ->
    begin
      match int_of_string_opt p with
      | Some n -> t , a , n
      | None -> malformed ()
    end
  | _ -> malformed ()
in
(* Single ZeroMQ context shared by every socket created below; it is
   terminated at the very end of the program. *)
let zmq_context =
  Zmq.Context.create ()
in
(* Check availability of the ports.
   Starting from the top-level [localport], looks for a base port such that
   the ten ports base .. base + 9 can all be bound on [localhost]; on a
   [Unix.Unix_error] the search restarts 100 ports higher.  The result
   shadows the top-level [localport].
   Raises [Failure] once the candidate block would exceed port 65535,
   instead of searching forever. *)
let localport =
  let dummy_socket =
    Zmq.Socket.create zmq_context Zmq.Socket.rep
  in
  (* Bind and immediately release one port.  The endpoint carries no
     surrounding blanks: a padded endpoint is not a valid ZeroMQ address. *)
  let probe port_number =
    let endpoint =
      Printf.sprintf "tcp://%s:%d" localhost port_number
    in
    Zmq.Socket.bind dummy_socket endpoint ;
    Zmq.Socket.unbind dummy_socket endpoint
  in
  let rec try_new_port port_number =
    if port_number + 9 > 65535 then
      begin
        Zmq.Socket.close dummy_socket ;
        failwith "No free block of 10 consecutive TCP ports could be found"
      end ;
    try
      for i = 0 to 9 do
        probe (port_number + i)
      done ;
      port_number
    with
    | Unix.Unix_error _ -> try_new_port (port_number + 100)
  in
  let result =
    try_new_port localport
  in
  Zmq.Socket.close dummy_socket ;
  result
in
(* [create_socket sock_type bind_or_connect addr] creates a socket of kind
   [sock_type] in [zmq_context] and applies [bind_or_connect] (either
   Zmq.Socket.bind or Zmq.Socket.connect) to it with [addr].
   Returns the socket.  On any exception the socket is closed and [Failure]
   is raised; an unclosed socket could keep the final context termination
   from returning. *)
(* NOTE(review): the message literal is kept verbatim, padding included. *)
let create_socket sock_type bind_or_connect addr =
  let socket =
    Zmq.Socket.create zmq_context sock_type
  in
  let () =
    try
      bind_or_connect socket addr
    with
    | _ ->
      Zmq.Socket.close socket ;
      failwith @@
      Printf.sprintf " Unable to establish connection to %s. " addr
  in
  socket
in
(* Handle termination.
   [run_status] is the flag polled by every loop below.  On SIGUSR1 or
   SIGINT the handler clears it and restores the default action for that
   signal, so a second delivery is no longer intercepted. *)
let run_status = ref true in
let handler =
  Sys.Signal_handle (fun signum ->
    run_status := false ;
    Sys.set_signal signum Sys.Signal_default)
in
List.iter
  (fun signum -> Sys.set_signal signum handler)
  [ Sys.sigusr1 ; Sys.sigint ] ;
(* [new_thread req_or_sub addr_in addr_out] runs one forwarding loop until
   [run_status] becomes false, then closes both sockets.
   - REQ: binds a ROUTER socket on [addr_in], connects a DEALER socket to
     [addr_out] and relays multipart messages in both directions.
   - SUB: connects a SUB socket to [addr_in], binds a PUB socket on
     [addr_out] and relays messages from the former to the latter only.
   Raises [Failure] (from [create_socket]) if a socket cannot be set up. *)
let new_thread req_or_sub addr_in addr_out =
  let socket_in , socket_out =
    match req_or_sub with
    | REQ ->
      create_socket Zmq.Socket.router Zmq.Socket.bind addr_in ,
      create_socket Zmq.Socket.dealer Zmq.Socket.connect addr_out
    | SUB ->
      create_socket Zmq.Socket.sub Zmq.Socket.connect addr_in ,
      create_socket Zmq.Socket.pub Zmq.Socket.bind addr_out
  in
  (* The empty prefix subscribes to every message.  A one-blank prefix
     would make the relay drop everything that does not start with a
     blank. *)
  begin
    match req_or_sub with
    | SUB -> Zmq.Socket.subscribe socket_in ""
    | REQ -> ()
  end ;
  (* Moves one complete multipart message from [src] to [dst]. *)
  let forward src dst () =
    Zmq.Socket.recv_all src
    |> Zmq.Socket.send_all dst
  in
  let action_in =
    forward socket_in socket_out
  in
  let action_out =
    match req_or_sub with
    | REQ -> forward socket_out socket_in
    | SUB -> (fun () -> ())
  in
  let pollitem =
    Zmq.Poll.mask_of
      [| (socket_in , Zmq.Poll.In) ; (socket_out , Zmq.Poll.In) |]
  in
  while !run_status do
    (* The timeout lets the loop re-check [run_status] regularly. *)
    match Zmq.Poll.poll ~timeout:1000 pollitem with
    | [| Some Zmq.Poll.In ; Some Zmq.Poll.In |] -> (action_out () ; action_in ())
    | [| _ ; Some Zmq.Poll.In |] -> action_out ()
    | [| Some Zmq.Poll.In ; _ |] -> action_in ()
    | _ -> ()
  done ;
  Zmq.Socket.close socket_in ;
  Zmq.Socket.close socket_out
in
(* Thread relaying REQ traffic between local port [localport] and the
   server's base port [port].  Afterwards the address clients must connect
   to is printed.
   The endpoints carry no surrounding blanks, since a padded endpoint is
   not a valid ZeroMQ address.  [address] is presumably of the form //host
   because the full address was split on ':' — confirm. *)
(* NOTE(review): the "Connect to" message is kept verbatim, padding
   included. *)
let ocaml_thread =
  let addr_out =
    Printf.sprintf "tcp:%s:%d" address port
  in
  let addr_in =
    Printf.sprintf "tcp://*:%d" localport
  in
  Thread.create (fun () -> new_thread REQ addr_in addr_out) ()
in
Printf.printf " Connect to: \n tcp://%s:%d \n %! " localhost localport ;
(* Thread relaying REQ traffic between local port [localport + 2] and the
   server's port [port + 2].
   The endpoints carry no surrounding blanks, since a padded endpoint is
   not a valid ZeroMQ address.  [address] is presumably of the form //host
   because the full address was split on ':' — confirm. *)
let fortran_thread =
  let addr_out =
    Printf.sprintf "tcp:%s:%d" address (port + 2)
  in
  let addr_in =
    Printf.sprintf "tcp://*:%d" (localport + 2)
  in
  Thread.create (fun () -> new_thread REQ addr_in addr_out) ()
in
(* Thread relaying published messages: subscribes to the server's port
   [port + 1] and republishes on local port [localport + 1].
   The endpoints carry no surrounding blanks, since a padded endpoint is
   not a valid ZeroMQ address.  [address] is presumably of the form //host
   because the full address was split on ':' — confirm. *)
let pub_thread =
  let addr_in =
    Printf.sprintf "tcp:%s:%d" address (port + 1)
  in
  let addr_out =
    Printf.sprintf "tcp://*:%d" (localport + 1)
  in
  Thread.create (fun () -> new_thread SUB addr_in addr_out) ()
in
(* Thread answering the input-transfer requests on local port
   [localport + 9] with a REP socket.
   - EZFIO mode: requests are answered locally; the input is a gzipped tar
     archive of the EZFIO directory.
   - ADDRESS mode: requests are relayed through a REQ socket to the
     server's port [port + 9]; with the get-input option the archive is
     first downloaded and unpacked, and the local address is written into
     the work directory of the downloaded EZFIO.
   The loop ends when [run_status] becomes false; the sockets are then
   closed and the temporary archive removed. *)
(* NOTE(review): protocol messages, option name, file names and user
   messages are kept verbatim, surrounding spaces included; the padding
   looks like an extraction artifact — confirm.  Only literals that cannot
   work as written were repaired: endpoints, the tar command line and the
   ssh usage text. *)
let input_thread =
  let f () =
    let addr_out =
      match arg with
      | EZFIO _ -> None
      | ADDRESS _ ->
        Some (Printf.sprintf "tcp:%s:%d" address (port + 9))
    in
    let addr_in =
      Printf.sprintf "tcp://*:%d" (localport + 9)
    in
    let socket_in =
      create_socket Zmq.Socket.rep Zmq.Socket.bind addr_in
    in
    let socket_out =
      match addr_out with
      | Some addr_out ->
        Some (create_socket Zmq.Socket.req Zmq.Socket.connect addr_out)
      | None -> None
    in
    let temp_file =
      Filename.temp_file " qp_tunnel " " .tar.gz "
    in
    (* Sends [msg] to the upstream server and returns its reply.  Only
       called in ADDRESS mode, where [socket_out] is always present. *)
    let ask_upstream msg =
      match socket_out with
      | None -> assert false
      | Some socket_out ->
        Zmq.Socket.send socket_out msg ;
        Zmq.Socket.recv socket_out
    in
    let get_ezfio_filename () =
      match arg with
      | EZFIO x -> x
      | ADDRESS _ -> ask_upstream " get_ezfio_filename "
    in
    (* Returns the content of the gzipped tar archive as a string. *)
    let get_input () =
      match arg with
      | EZFIO x ->
        begin
          (* Both paths are shell-quoted: the command goes through the
             shell and the paths may contain blanks.  The exit status of
             tar is deliberately ignored, as before. *)
          Printf.sprintf "tar --exclude=\"*.gz.*\" -zcf %s %s"
            (Filename.quote temp_file) (Filename.quote x)
          |> Sys.command |> ignore ;
          let fd =
            Unix.openfile temp_file [ Unix.O_RDONLY ] 0o640
          in
          let len =
            Unix.lseek fd 0 Unix.SEEK_END
          in
          ignore @@ Unix.lseek fd 0 Unix.SEEK_SET ;
          let bstr =
            Unix.map_file fd Bigarray.char
              Bigarray.c_layout false [| len |]
            |> Bigarray.array1_of_genarray
          in
          let result =
            String.init len (fun i -> bstr.{i})
          in
          Unix.close fd ;
          Sys.remove temp_file ;
          result
        end
      | ADDRESS _ -> ask_upstream " get_input "
    in
    (* Handshake with the upstream server, if any. *)
    let () =
      match socket_out with
      | None -> ()
      | Some socket_out ->
        Zmq.Socket.send socket_out " test " ;
        Printf.printf " Communication [ %s ] \n %! " (Zmq.Socket.recv socket_out)
    in
    (* Download input if asked *)
    if Command_line.get_bool " get-input " then
      begin
        match arg with
        | EZFIO _ -> ()
        | ADDRESS _ ->
          begin
            Printf.printf " Getting input... %! " ;
            let ezfio_filename =
              get_ezfio_filename ()
            in
            Printf.printf " %s%! " ezfio_filename ;
            (* The archive is binary data. *)
            let oc =
              open_out_bin temp_file
            in
            get_input ()
            |> output_string oc ;
            close_out oc ;
            Printf.sprintf " tar -zxf %s " (Filename.quote temp_file)
            |> Sys.command |> ignore ;
            let oc =
              Filename.concat (Qpackage.ezfio_work ezfio_filename) " qp_run_address "
              |> open_out
            in
            Printf.fprintf oc " tcp://%s:%d \n " localhost localport ;
            close_out oc ;
            Printf.printf " ...done \n %! "
          end
      end ;
    (* Main loop *)
    let pollitem =
      Zmq.Poll.mask_of [| (socket_in , Zmq.Poll.In) |]
    in
    (* Answers exactly one request; a REP socket must reply to every
       message, so unknown requests get an explanatory reply. *)
    let action () =
      match Zmq.Socket.recv socket_in with
      | " get_input " ->
        get_input ()
        |> Zmq.Socket.send socket_in
      | " get_ezfio_filename " ->
        get_ezfio_filename ()
        |> Zmq.Socket.send socket_in
      | " test " -> Zmq.Socket.send socket_in " OK "
      | x ->
        Printf.sprintf " Message '%s' not understood " x
        |> Zmq.Socket.send socket_in
    in
    Printf.printf "
On remote hosts, create ssh tunnel using:
ssh -L %d:%s:%d -L %d:%s:%d -L %d:%s:%d -L %d:%s:%d %s &
Or from this host connect to clients using:
ssh -R %d:localhost:%d -R %d:localhost:%d -R %d:localhost:%d -R %d:localhost:%d <host> &
%!"
      (port) localhost (localport)
      (port + 1) localhost (localport + 1)
      (port + 2) localhost (localport + 2)
      (port + 9) localhost (localport + 9)
      (Unix.gethostname ())
      (port) (localport)
      (port + 1) (localport + 1)
      (port + 2) (localport + 2)
      (port + 9) (localport + 9) ;
    Printf.printf " Ready \n %! " ;
    while !run_status do
      (* The timeout lets the loop re-check [run_status] regularly. *)
      match (Zmq.Poll.poll ~timeout:1000 pollitem).(0) with
      | Some Zmq.Poll.In -> action ()
      | None -> ()
      | Some Zmq.Poll.In_out
      | Some Zmq.Poll.Out -> ()
    done ;
    (* The temporary archive is created as soon as its name is chosen;
       remove it unless an earlier step already did. *)
    if Sys.file_exists temp_file then
      Sys.remove temp_file ;
    let () =
      match socket_out with
      | Some socket_out -> Zmq.Socket.close socket_out
      | None -> ()
    in
    Zmq.Socket.close socket_in
  in
  Thread.create f ()
in
(* Termination: wait for the four worker threads in a fixed order, then
   terminate the shared ZeroMQ context and report a clean exit. *)
(* NOTE(review): the message literal is kept verbatim, padding included. *)
List.iter Thread.join
  [ input_thread ; fortran_thread ; pub_thread ; ocaml_thread ] ;
Zmq.Context.terminate zmq_context ;
Printf.printf " qp_tunnel exited properly. \n "