2019-06-15 00:43:09 +02:00
open Qputils
open Qptypes
(* Target given on the command line: [EZFIO] when the argument names an
   existing directory, [ADDRESS] for any other argument. *)
type ezfio_or_address = EZFIO of string | ADDRESS of string
(* Relay flavour used by [new_thread]: [REQ] pairs a bound rep socket with a
   connected req socket, [SUB] pairs a connected sub socket with a bound pub
   socket. *)
type req_or_sub = REQ | SUB
(* First local port tried by the port-availability probe; the probe moves up
   in steps of 100 from this value. *)
let localport = 42379
2019-07-31 20:56:00 +02:00
(* Running totals for the verbose transfer-speed report.
   [in_time_sum] accumulates elapsed seconds; it starts at a tiny non-zero
   value so the average printed by the verbose relay never divides by zero.
   [in_size_sum] accumulates the amount of forwarded data. *)
let in_time_sum = ref 1.e-9
let in_size_sum = ref 0.
2019-06-15 00:43:09 +02:00
let () =
(* Declare the command-line interface: a footer text, two optional flags
   (get-input and verbose, neither taking a value) and one mandatory
   anonymous argument. *)
Command_line.(
  set_footer_doc
    " Creates an ssh tunnel for using slaves on another network. Launch a server on the front-end node of the cluster on which the master process runs. Then start a client ont the front-end node of the distant cluster. " ;
  set_specs
    [ { short = 'g' ; long = " get-input " ; opt = Optional ;
        doc = " Downloads the EZFIO directory. " ;
        arg = Without_arg } ;
      { short = 'v' ; long = " verbose " ; opt = Optional ;
        doc = " Prints the transfer speed. " ;
        arg = Without_arg } ;
      anonymous " (EZFIO_DIR|ADDRESS) " Mandatory " EZFIO directory or address. " ]
) ;
(* Classify the single anonymous argument: an existing directory is treated
   as an EZFIO directory, anything else as an address.  When the number of
   anonymous arguments is not exactly one, the help text is printed and the
   program fails. *)
let arg =
  let classify target =
    if Sys.file_exists target && Sys.is_directory target
    then EZFIO target
    else ADDRESS target
  in
  match Command_line.anon_args () with
  | [ target ] -> classify target
  | _ ->
    Command_line.help () ;
    failwith " EZFIO_FILE or ADDRESS is missing "
in
2019-07-31 20:56:00 +02:00
(* Whether the verbose flag was passed; selects the speed-reporting relay. *)
let verbose = Command_line.get_bool " verbose " in
(* Address of this host, obtained by forcing the lazy value exported by
   TaskServer. *)
let localhost = Lazy.force TaskServer.ip_address in
(* Full address of the remote endpoint.  For an ADDRESS argument it is the
   argument itself; for an EZFIO directory it is the trimmed first line of
   the address file stored in the directory's work area.
   The channel is closed on every path, including when reading fails
   (for instance End_of_file on an empty file); the exception is re-raised
   unchanged. *)
let long_address =
  match arg with
  | ADDRESS x -> x
  | EZFIO x ->
    let ic =
      Filename.concat (Qpackage.ezfio_work x) " qp_run_address "
      |> open_in
    in
    begin
      match input_line ic with
      | line ->
        close_in ic ;
        String.trim line
      | exception e ->
        close_in_noerr ic ;
        raise e
    end
in
(* Split [long_address] on ':' into exactly three fields: protocol, address
   and integer port.
   Raises [Failure] with the "Malformed address" message when the field count
   is wrong or when the port field is not an integer (previously a bad port
   escaped as an uninformative [Failure "int_of_string"]). *)
let protocol , address , port =
  let malformed () =
    failwith @@
    Printf.sprintf " %s : Malformed address " long_address
  in
  match String.split_on_char ':' long_address with
  | [ t ; a ; p ] ->
    begin
      match int_of_string_opt p with
      | Some n -> t , a , n
      | None -> malformed ()
    end
  | _ -> malformed ()
in
(* ZeroMQ context shared by every socket created below. *)
let zmq_context = Zmq.Context.create () in

(* Check availability of the ports.
   Starting from the top-level [localport], look for a base port such that
   the ten consecutive ports base..base+9 can each be bound and unbound on
   [localhost]; on a Unix error the search restarts 100 ports higher.
   The search is bounded by the highest valid TCP port: a persistent error
   used to make it recurse forever.  The probe socket is closed on every
   path.
   Raises [Failure] when no suitable range exists below 65536. *)
let localport =
  let max_port = 65535 in
  let dummy_socket = Zmq.Socket.create zmq_context Zmq.Socket.rep in
  (* Bind then immediately unbind each of the ten ports of the range. *)
  let probe port_number =
    List.iter (fun i ->
        let address =
          Printf.sprintf " tcp://%s:%d " localhost (port_number + i)
        in
        Zmq.Socket.bind dummy_socket address ;
        Zmq.Socket.unbind dummy_socket address)
      [ 0 ; 1 ; 2 ; 3 ; 4 ; 5 ; 6 ; 7 ; 8 ; 9 ]
  in
  let rec try_new_port port_number =
    if port_number + 9 > max_port then
      failwith @@
      Printf.sprintf
        "No range of 10 free consecutive ports found starting from %d"
        localport
    else
      match probe port_number with
      | () -> port_number
      | exception Unix.Unix_error _ -> try_new_port (port_number + 100)
  in
  match try_new_port localport with
  | result ->
    Zmq.Socket.close dummy_socket ;
    result
  | exception e ->
    Zmq.Socket.close dummy_socket ;
    raise e
in
(* [create_socket sock_type bind_or_connect addr] creates a socket of kind
   [sock_type] in [zmq_context] and applies [bind_or_connect] (for example
   Zmq.Socket.bind or Zmq.Socket.connect) to it with [addr].
   Returns the ready socket.
   Raises [Failure] when [bind_or_connect] raises.  In that case the socket
   is closed first, so it cannot keep the context from terminating, and the
   underlying exception is reported on stderr instead of being dropped. *)
let create_socket sock_type bind_or_connect addr =
  let socket = Zmq.Socket.create zmq_context sock_type in
  match bind_or_connect socket addr with
  | () -> socket
  | exception exn ->
    Zmq.Socket.close socket ;
    prerr_endline (Printexc.to_string exn) ;
    failwith @@
    Printf.sprintf " Unable to establish connection to %s. " addr
in
(* Handle termination: the relay loops run while [run_status] is true.
   On SIGUSR1 or SIGINT the flag is cleared and the default disposition of
   the received signal is restored, so a second signal is handled by the
   system default. *)
let run_status = ref true in
List.iter
  (fun signal ->
     Sys.set_signal signal
       (Sys.Signal_handle (fun signum ->
            run_status := false ;
            Sys.set_signal signum Sys.Signal_default)))
  [ Sys.sigusr1 ; Sys.sigint ] ;
(* [new_thread req_or_sub addr_in addr_out] relays messages between two
   endpoints until [run_status] becomes false.
   - REQ: binds a rep socket on [addr_in] and connects a req socket to
     [addr_out]; every request is forwarded and its reply sent back.
   - SUB: connects a sub socket to [addr_in] and binds a pub socket on
     [addr_out]; every received message is republished.
   When [verbose] is set, REQ relays also print the speed of the current
   exchange and the running average.
   Both sockets are closed when the loop ends, including when it ends with
   an exception (which is re-raised). *)
let new_thread req_or_sub addr_in addr_out =
  let socket_in , socket_out =
    match req_or_sub with
    | REQ ->
      create_socket Zmq.Socket.rep Zmq.Socket.bind addr_in ,
      create_socket Zmq.Socket.req Zmq.Socket.connect addr_out
    | SUB ->
      create_socket Zmq.Socket.sub Zmq.Socket.connect addr_in ,
      create_socket Zmq.Socket.pub Zmq.Socket.bind addr_out
  in
  let close_sockets () =
    Zmq.Socket.close socket_in ;
    Zmq.Socket.close socket_out
  in
  begin
    match req_or_sub with
    | SUB -> Zmq.Socket.subscribe socket_in " "
    | REQ -> ()
  end ;
  (* Move one multipart message from [src] to [dst]. *)
  let forward src dst =
    Zmq.Socket.recv_all src |> Zmq.Socket.send_all dst
  in
  (* One request/reply exchange with timing.  The clock covers forwarding
     the request and waiting for the reply; the size is that of the request
     only. *)
  let timed_round_trip () =
    let msg = Zmq.Socket.recv_all socket_in in
    let t0 = Unix.gettimeofday () in
    Zmq.Socket.send_all socket_out msg ;
    let in_size =
      (* Bytes -> MiB.  The previous divisor (8192 * 1024) made the figure
         eight times smaller than the MiB/s it is printed as. *)
      float_of_int
        (List.fold_left (fun accu x -> accu + String.length x) 0 msg)
      /. 1024. /. 1024.
    in
    let reply = Zmq.Socket.recv_all socket_out in
    let t1 = Unix.gettimeofday () in
    Zmq.Socket.send_all socket_in reply ;
    let in_time = t1 -. t0 in
    in_time_sum := !in_time_sum +. in_time ;
    in_size_sum := !in_size_sum +. in_size ;
    Printf.printf " %16.2f MiB/s -- %16.2f MiB/s \n %! "
      (in_size /. in_time) (!in_size_sum /. !in_time_sum)
  in
  let action =
    match req_or_sub , verbose with
    | REQ , true -> timed_round_trip
    | REQ , false ->
      (fun () ->
         forward socket_in socket_out ;
         forward socket_out socket_in)
    | SUB , _ ->
      (fun () -> forward socket_in socket_out)
  in
  let pollitem =
    Zmq.Poll.mask_of [| (socket_in , Zmq.Poll.In) |]
  in
  (* Poll with a one-second timeout so the stop flag is checked regularly. *)
  let serve () =
    while !run_status do
      match (Zmq.Poll.poll ~timeout:1000 pollitem).(0) with
      | Some Zmq.Poll.In -> action ()
      | None
      | Some Zmq.Poll.In_out
      | Some Zmq.Poll.Out -> ()
    done
  in
  match serve () with
  | () -> close_sockets ()
  | exception e ->
    close_sockets () ;
    raise e
in
(* REQ relay running in its own thread: listens on [localport] and forwards
   to the remote [port].  The address clients should connect to is printed
   once the thread has been started. *)
let ocaml_thread =
  let addr_in = Printf.sprintf " tcp://*:%d " localport
  and addr_out = Printf.sprintf " tcp:%s:%d " address port in
  Thread.create (fun () -> new_thread REQ addr_in addr_out) ()
in
Printf.printf " Connect to: \n tcp://%s:%d \n %! " localhost localport ;
(* REQ relay running in its own thread: listens on [localport + 2] and
   forwards to the remote [port + 2]. *)
let fortran_thread =
  let addr_in = Printf.sprintf " tcp://*:%d " (localport + 2)
  and addr_out = Printf.sprintf " tcp:%s:%d " address (port + 2) in
  Thread.create (fun () -> new_thread REQ addr_in addr_out) ()
in
(* SUB relay running in its own thread: subscribes to the remote [port + 1]
   and republishes on [localport + 1]. *)
let pub_thread =
  let addr_out = Printf.sprintf " tcp://*:%d " (localport + 1)
  and addr_in = Printf.sprintf " tcp:%s:%d " address (port + 1) in
  Thread.create (fun () -> new_thread SUB addr_in addr_out) ()
in
let input_thread =
let f () =
(* Control channel.  [socket_in] is a rep socket bound on [localport + 9].
   [socket_out] exists only when the program was given an address: it is a
   req socket connected to the remote [port + 9].  [temp_file] is a scratch
   file used for the tar archive. *)
let socket_in =
  Printf.sprintf " tcp://*:%d " (localport + 9)
  |> create_socket Zmq.Socket.rep Zmq.Socket.bind
in
let socket_out =
  match arg with
  | ADDRESS _ ->
    let remote = Printf.sprintf " tcp:%s:%d " address (port + 9) in
    Some (create_socket Zmq.Socket.req Zmq.Socket.connect remote)
  | EZFIO _ -> None
in
let temp_file = Filename.temp_file " qp_tunnel " " .tar.gz " in
(* Name of the EZFIO directory: known locally when the program was started
   on a directory, otherwise requested from the remote side over
   [socket_out]. *)
let get_ezfio_filename () =
  match arg , socket_out with
  | EZFIO x , _ -> x
  | ADDRESS _ , Some remote ->
    Zmq.Socket.send remote " get_ezfio_filename " ;
    Zmq.Socket.recv remote
  | ADDRESS _ , None ->
    (* [socket_out] is always created when [arg] is an ADDRESS. *)
    assert false
in
(* Returns the gzipped tar archive of the EZFIO directory as a string.
   - Started on a directory: the archive is built with tar into
     [temp_file], read back in binary mode and the temporary file removed.
     Both paths are shell-quoted so names containing spaces or shell
     metacharacters are passed to tar intact.  The channel is closed even if
     reading fails.
   - Started on an address: the archive is requested from the remote side
     over [socket_out]. *)
let get_input () =
  match arg with
  | EZFIO x ->
    begin
      Printf.sprintf " tar -zcf %s %s "
        (Filename.quote temp_file) (Filename.quote x)
      |> Sys.command |> ignore ;
      let ic = open_in_bin temp_file in
      let result =
        match really_input_string ic (in_channel_length ic) with
        | contents ->
          close_in ic ;
          contents
        | exception e ->
          close_in_noerr ic ;
          raise e
      in
      Sys.remove temp_file ;
      result
    end
  | ADDRESS _ ->
    begin
      match socket_out with
      | None ->
        (* [socket_out] is always created when [arg] is an ADDRESS. *)
        assert false
      | Some socket_out ->
        Zmq.Socket.send socket_out " get_input " ;
        Zmq.Socket.recv socket_out
    end
in
(* When a remote control socket exists, send a test message and print the
   answer to show that the link works. *)
let () =
  match socket_out with
  | Some remote ->
    Zmq.Socket.send remote " test " ;
    let answer = Zmq.Socket.recv remote in
    Printf.printf " Communication [ %s ] \n %! " answer
  | None -> ()
in
(* Download input if asked.
   Only meaningful when the program was started on an address: the EZFIO
   directory name and its archive are fetched from the remote side, the
   archive is unpacked with tar in the current directory, and the address of
   this tunnel is written to the address file of the unpacked directory.
   The archive is fetched before the temporary file is opened and the
   channel is closed even if writing fails, so no channel is leaked.  The
   temporary archive is deleted after extraction and its path is
   shell-quoted. *)
if Command_line.get_bool " get-input " then
  begin
    match arg with
    | EZFIO _ -> ()
    | ADDRESS _ ->
      begin
        Printf.printf " Getting input... %! " ;
        let ezfio_filename = get_ezfio_filename () in
        Printf.printf " %s%! " ezfio_filename ;
        let archive = get_input () in
        let oc = open_out_bin temp_file in
        begin
          match output_string oc archive with
          | () -> close_out oc
          | exception e ->
            close_out_noerr oc ;
            raise e
        end ;
        Printf.sprintf " tar -zxf %s " (Filename.quote temp_file)
        |> Sys.command |> ignore ;
        Sys.remove temp_file ;
        let oc =
          Filename.concat (Qpackage.ezfio_work ezfio_filename) " qp_run_address "
          |> open_out
        in
        Printf.fprintf oc " tcp://%s:%d \n " localhost localport ;
        close_out oc ;
        Printf.printf " ...done \n %! "
      end
  end ;
(* Main loop of the control channel.
   Each request received on [socket_in] is answered with exactly one reply:
   the input archive, the EZFIO directory name, an acknowledgement of the
   test message, or an error text for anything else.  Before serving, the
   ssh commands needed to reach the four forwarded ports (base, +1, +2, +9)
   are printed.  The loop polls with a one-second timeout until
   [run_status] is cleared, then both control sockets are closed. *)
let pollitem =
  Zmq.Poll.mask_of [| (socket_in , Zmq.Poll.In) |]
in
let action () =
  let reply =
    match Zmq.Socket.recv socket_in with
    | " get_input " -> get_input ()
    | " get_ezfio_filename " -> get_ezfio_filename ()
    | " test " -> " OK "
    | x -> Printf.sprintf " Message '%s' not understood " x
  in
  Zmq.Socket.send socket_in reply
in
Printf.printf "
On remote hosts , create ssh tunnel using :
ssh - n - L % d : % s : % d - L % d : % s : % d - L % d : % s : % d - L % d : % s : % d % s &
Or from this host connect to clients using :
ssh - n - R % d : localhost : % d - R % d : localhost : % d - R % d : localhost : % d - R % d : localhost : % d < host > &
%! "
  port localhost localport
  (port + 1) localhost (localport + 1)
  (port + 2) localhost (localport + 2)
  (port + 9) localhost (localport + 9)
  (Unix.gethostname ())
  port localport
  (port + 1) (localport + 1)
  (port + 2) (localport + 2)
  (port + 9) (localport + 9) ;
Printf.printf " Ready \n %! " ;
let rec serve () =
  if !run_status then
    begin
      begin
        match (Zmq.Poll.poll ~timeout:1000 pollitem).(0) with
        | Some Zmq.Poll.In -> action ()
        | None
        | Some Zmq.Poll.In_out
        | Some Zmq.Poll.Out -> ()
      end ;
      serve ()
    end
in
serve () ;
begin
  match socket_out with
  | None -> ()
  | Some remote -> Zmq.Socket.close remote
end ;
Zmq.Socket.close socket_in
in
( Thread . create f ) ()
in
(* Termination: wait for the four worker threads in a fixed order, then
   release the ZeroMQ context and report a clean exit. *)
List.iter Thread.join
  [ input_thread ; fortran_thread ; pub_thread ; ocaml_thread ] ;
Zmq.Context.terminate zmq_context ;
Printf.printf " qp_tunnel exited properly. \n "