9
1
mirror of https://github.com/QuantumPackage/qp2.git synced 2025-01-02 08:35:38 +01:00

Added qp_tunnel

This commit is contained in:
Anthony Scemama 2019-06-15 00:43:09 +02:00
parent 7cb3321152
commit 191d8ff0af
8 changed files with 687 additions and 78 deletions

View File

@ -1,71 +0,0 @@
#!/usr/bin/env python2
"""
Creates an ssh tunnel for using slaves on another network.
Launch a server on the front-end node of the cluster on which the master
process runs. Then start a client ont the front-end node of the distant
cluster.
Usage:
qp_tunnel server EZFIO_DIR
qp_tunnel client <address> EZFIO_DIR
Options:
-h --help
"""
import os
import sys
import zmq
try:
import qp_path
except ImportError:
print "source .quantum_package.rc"
raise
from docopt import docopt
from ezfio import ezfio
def get_address(filename):
with open(os.path.join(filename,'work','qp_run_address'),'r') as f:
a = f.readlines()[0].strip()
return a
def set_address(filename,address):
with open(os.path.join(filename,'work','qp_run_address'),'r') as f:
backup = f.readlines()
with open(os.path.join(filename,'work','qp_run_address'),'w') as f:
f.write('\n'.join([address]+backup))
def main_server(arguments,filename):
destination = get_address(filename)
print destination
def main_client(arguments,filename):
destination = arguments["<address>"]
print destination
def main(arguments):
"""Main function"""
print arguments
filename = arguments["EZFIO_DIR"]
if arguments["server"]:
return main_server(arguments, filename)
if arguments["client"]:
return main_client(arguments, filename)
if __name__ == '__main__':
ARGUMENTS = docopt(__doc__)
main(ARGUMENTS)

View File

@ -0,0 +1,89 @@
.. _qp_tunnel:
=========
qp_tunnel
=========
.. TODO
.. program:: qp_tunnel
Establishes a tunnel to allow communications between machines within
different networks, for example multiple MPI slave jobs running on
different clusters.
Usage
-----
.. code:: bash
qp_tunnel [-g] (ADDRESS|EZFIO_DIR)
``EZFIO_DIR`` is the name of the |EZFIO| directory containing the data,
and ``ADDRESS`` is the address of another tunnel.
.. option:: -h, --help
Displays the help message
.. option:: -g, --get-input
Download the EZFIO directory from the remote instance of qp_tunnel.
Example
-------
.. code:: text
+-------------------+ +------------------+
| | | |
| N1_1 N1_2 N1_3 | | N2_1 N2_2 N2_3 |
| | | | | | | | | |
| +----+----+ | | +----+----+ |
| | | | | |
| C1 F1 | | F2 C2 |
| +---------=----=--------+ |
| | | |
+-------------------+ +------------------+
Imagine you have two clusters, C1 and C2. Each cluster is accessible via SSH
on a front-end named respectively F1 and F2. Groups of nodes N1 and N2 have
been reserved by the batch scheduling system on both clusters.
Each node in N1 is on the same network as the other nodes of N1, but they
can't access the network on which the nodes of N2 are.
1) Start a parallel simulation on the cluster C1, running on nodes N1.
We assume that there is a shared file system, such that F1 can access
the EZFIO directory. We also assume that F1 can communicate with the
nodes of N1.
2) Run a tunnel on the front-end F1 and keep it running:
.. code:: bash
me@f1 $ qp_tunnel my_directory.ezfio
Connect to:
tcp://31.122.230.47:42379
Ready
3) On the front-end F2, run another instance connecting to the other one,
which will fetch the |EZFIO| directory:
.. code:: bash
me@f2 $ qp_tunnel --get-input tcp://31.122.230.47:42379
Connect to:
tcp://31.122.209.139:42379
Communication [ OK ]
Getting input... my_directory.ezfio ...done
Ready
4) Keep the tunnel running, and you can now run a slave simulation within the
nodes N2.

139
man/qp_tunnel.1 Normal file
View File

@ -0,0 +1,139 @@
.\" Man page generated from reStructuredText.
.
.TH "QP_TUNNEL" "1" "Jun 15, 2019" "2.0" "Quantum Package"
.SH NAME
qp_tunnel \- | Quantum Package >
.
.nr rst2man-indent-level 0
.
.de1 rstReportMargin
\\$1 \\n[an-margin]
level \\n[rst2man-indent-level]
level margin: \\n[rst2man-indent\\n[rst2man-indent-level]]
-
\\n[rst2man-indent0]
\\n[rst2man-indent1]
\\n[rst2man-indent2]
..
.de1 INDENT
.\" .rstReportMargin pre:
. RS \\$1
. nr rst2man-indent\\n[rst2man-indent-level] \\n[an-margin]
. nr rst2man-indent-level +1
.\" .rstReportMargin post:
..
.de UNINDENT
. RE
.\" indent \\n[an-margin]
.\" old: \\n[rst2man-indent\\n[rst2man-indent-level]]
.nr rst2man-indent-level -1
.\" new: \\n[rst2man-indent\\n[rst2man-indent-level]]
.in \\n[rst2man-indent\\n[rst2man-indent-level]]u
..
.sp
Establishes a tunnel to allow communications between machines within
different networks, for example multiple MPI slave jobs running on
different clusters.
.SH USAGE
.INDENT 0.0
.INDENT 3.5
.sp
.nf
.ft C
qp_tunnel [\-g] (ADDRESS|EZFIO_DIR)
.ft P
.fi
.UNINDENT
.UNINDENT
.sp
\fBEZFIO_DIR\fP is the name of the \fI\%EZFIO\fP directory containing the data,
and \fBADDRESS\fP is the address of another tunnel.
.INDENT 0.0
.TP
.B \-h, \-\-help
Displays the help message
.UNINDENT
.INDENT 0.0
.TP
.B \-g, \-\-get\-input
Download the EZFIO directory from the remote instance of qp_tunnel.
.UNINDENT
.SH EXAMPLE
.INDENT 0.0
.INDENT 3.5
.sp
.nf
.ft C
+\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-+ +\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-+
| | | |
| N1_1 N1_2 N1_3 | | N2_1 N2_2 N2_3 |
| | | | | | | | | |
| +\-\-\-\-+\-\-\-\-+ | | +\-\-\-\-+\-\-\-\-+ |
| | | | | |
| C1 F1 | | F2 C2 |
| +\-\-\-\-\-\-\-\-\-=\-\-\-\-=\-\-\-\-\-\-\-\-+ |
| | | |
+\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-+ +\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-+
.ft P
.fi
.UNINDENT
.UNINDENT
.sp
Imagine you have two clusters, C1 and C2. Each cluster is accessible via SSH
on a front\-end named respectively F1 and F2. Groups of nodes N1 and N2 have
been reserved by the batch scheduling system on both clusters.
Each node in N1 is on the same network as the other nodes of N1, but they
cant access the network on which the nodes of N2 are.
.INDENT 0.0
.IP 1. 3
Start a parallel simulation on the cluster C1, running on nodes N1.
We assume that there is a shared file system, such that F1 can access
the EZFIO directory. We also assume that F1 can communicate with the
nodes of N1.
.IP 2. 3
Run a tunnel on the front\-end F1 and keep it running:
.UNINDENT
.INDENT 0.0
.INDENT 3.5
.sp
.nf
.ft C
me@f1 $ qp_tunnel my_directory.ezfio
Connect to:
tcp://31.122.230.47:42379
Ready
.ft P
.fi
.UNINDENT
.UNINDENT
.INDENT 0.0
.IP 3. 3
On the front\-end F2, run another instance connecting to the other one,
which will fetch the \fI\%EZFIO\fP directory:
.UNINDENT
.INDENT 0.0
.INDENT 3.5
.sp
.nf
.ft C
me@f2 $ qp_tunnel \-\-get\-input tcp://31.122.230.47:42379
Connect to:
tcp://31.122.209.139:42379
Communication [ OK ]
Getting input... my_directory.ezfio ...done
Ready
.ft P
.fi
.UNINDENT
.UNINDENT
.INDENT 0.0
.IP 4. 3
Keep the tunnel running, and you can now run a slave simulation within the
nodes N2.
.UNINDENT
.SH AUTHOR
A. Scemama, E. Giner
.SH COPYRIGHT
2019, A. Scemama, E. Giner
.\" Generated by docutils manpage writer.
.

View File

@ -34,7 +34,7 @@ level margin: \\n[rst2man-indent\\n[rst2man-indent-level]]
.INDENT 3.5 .INDENT 3.5
Rotates molecular orbitals i and j by combining them as Rotates molecular orbitals i and j by combining them as
$1/sqrt{2} ( phi_i + phi_j )$ and $1/sqrt{2} ( phi_i + phi_j )$ and
$1/sqrt{2} ( phi_i \- phi_j )$. $1/sqrt{2} ( phi_i - phi_j )$.
.sp .sp
Needs: Needs:
.INDENT 0.0 .INDENT 0.0

55
man/test.1 Normal file
View File

@ -0,0 +1,55 @@
.\" Man page generated from reStructuredText.
.
.TH "TEST" "1" "Jun 15, 2019" "2.0" "Quantum Package"
.SH NAME
test \- | Quantum Package >
.
.nr rst2man-indent-level 0
.
.de1 rstReportMargin
\\$1 \\n[an-margin]
level \\n[rst2man-indent-level]
level margin: \\n[rst2man-indent\\n[rst2man-indent-level]]
-
\\n[rst2man-indent0]
\\n[rst2man-indent1]
\\n[rst2man-indent2]
..
.de1 INDENT
.\" .rstReportMargin pre:
. RS \\$1
. nr rst2man-indent\\n[rst2man-indent-level] \\n[an-margin]
. nr rst2man-indent-level +1
.\" .rstReportMargin post:
..
.de UNINDENT
. RE
.\" indent \\n[an-margin]
.\" old: \\n[rst2man-indent\\n[rst2man-indent-level]]
.nr rst2man-indent-level -1
.\" new: \\n[rst2man-indent\\n[rst2man-indent-level]]
.in \\n[rst2man-indent\\n[rst2man-indent-level]]u
..
.INDENT 0.0
.INDENT 3.5
Calls:
.INDENT 0.0
.INDENT 2.0
.IP \(bu 2
\fBtwo_e_integrals_index()\fP
.UNINDENT
.INDENT 2.0
.IP \(bu 2
\fBtwo_e_integrals_index_reverse()\fP
.UNINDENT
.INDENT 2.0
.UNINDENT
.UNINDENT
.UNINDENT
.UNINDENT
.SH AUTHOR
A. Scemama, E. Giner
.SH COPYRIGHT
2019, A. Scemama, E. Giner
.\" Generated by docutils manpage writer.
.

View File

@ -1,4 +1,4 @@
PKG core ZMQ cryptokit PKG core zmq cryptokit
B _build/ B _build/

402
ocaml/qp_tunnel.ml Normal file
View File

@ -0,0 +1,402 @@
open Qputils
open Qptypes
type ezfio_or_address = EZFIO of string | ADDRESS of string
type req_or_sub = REQ | SUB
let localport = 42379
let () =
let open Command_line in
begin
"Creates an ssh tunnel for using slaves on another network. Launch a server on the front-end node of the cluster on which the master process runs. Then start a client ont the front-end node of the distant cluster."
|> set_footer_doc ;
[ { short='g' ; long="get-input" ; opt=Optional ;
doc="Downloads the EZFIO directory." ;
arg=Without_arg; } ;
anonymous
"(EZFIO_DIR|ADDRESS)"
Mandatory
"EZFIO directory or address.";
] |> set_specs
end;
let arg =
let x =
match Command_line.anon_args () with
| [x] -> x
| _ -> begin
Command_line.help () ;
failwith "EZFIO_FILE or ADDRESS is missing"
end
in
if Sys.file_exists x && Sys.is_directory x then
EZFIO x
else
ADDRESS x
in
let localhost =
Lazy.force TaskServer.ip_address
in
let long_address =
match arg with
| ADDRESS x -> x
| EZFIO x ->
let ic =
Filename.concat (Qpackage.ezfio_work x) "qp_run_address"
|> open_in
in
let result =
input_line ic
|> String.trim
in
close_in ic;
result
in
let protocol, address, port =
match String.split_on_char ':' long_address with
| t :: a :: p :: [] -> t, a, int_of_string p
| _ -> failwith @@
Printf.sprintf "%s : Malformed address" long_address
in
let zmq_context =
Zmq.Context.create ()
in
(** Check availability of the ports *)
let localport =
let dummy_socket =
Zmq.Socket.create zmq_context Zmq.Socket.rep
in
let rec try_new_port port_number =
try
List.iter (fun i ->
let address =
Printf.sprintf "tcp://%s:%d" localhost (port_number+i)
in
Zmq.Socket.bind dummy_socket address;
Zmq.Socket.unbind dummy_socket address
) [ 0;1;2;3;4;5;6;7;8;9 ] ;
port_number
with
| Unix.Unix_error _ -> try_new_port (port_number+100)
in
let result =
try_new_port localport
in
Zmq.Socket.close dummy_socket;
result
in
let create_socket sock_type bind_or_connect addr =
let socket =
Zmq.Socket.create zmq_context sock_type
in
let () =
try
bind_or_connect socket addr
with
| _ -> failwith @@
Printf.sprintf "Unable to establish connection to %s." addr
in
socket
in
(* Handle termination *)
let run_status = ref true in
let handler =
Sys.Signal_handle (fun signum ->
run_status := false;
Sys.set_signal signum Sys.Signal_default
)
in
Sys.set_signal Sys.sigusr1 handler;
Sys.set_signal Sys.sigint handler;
let new_thread req_or_sub addr_in addr_out =
let socket_in, socket_out =
match req_or_sub with
| REQ ->
create_socket Zmq.Socket.rep Zmq.Socket.bind addr_in,
create_socket Zmq.Socket.req Zmq.Socket.connect addr_out
| SUB ->
create_socket Zmq.Socket.sub Zmq.Socket.connect addr_in,
create_socket Zmq.Socket.pub Zmq.Socket.bind addr_out
in
if req_or_sub = SUB then
Zmq.Socket.subscribe socket_in "";
let action =
match req_or_sub with
| REQ -> (fun () ->
Zmq.Socket.recv_all socket_in |> Zmq.Socket.send_all socket_out;
Zmq.Socket.recv_all socket_out |> Zmq.Socket.send_all socket_in )
| SUB -> (fun () ->
Zmq.Socket.recv_all socket_in |> Zmq.Socket.send_all socket_out)
in
let pollitem =
Zmq.Poll.mask_of
[| (socket_in, Zmq.Poll.In) |]
in
while !run_status do
let polling =
Zmq.Poll.poll ~timeout:1000 pollitem
in
match polling.(0) with
| Some Zmq.Poll.In -> action ()
| None -> ()
| Some Zmq.Poll.In_out
| Some Zmq.Poll.Out -> ()
done;
Zmq.Socket.close socket_in;
Zmq.Socket.close socket_out;
in
let ocaml_thread =
let addr_out =
Printf.sprintf "tcp:%s:%d" address port
in
let addr_in =
Printf.sprintf "tcp://*:%d" localport
in
let f () =
new_thread REQ addr_in addr_out
in
(Thread.create f) ()
in
Printf.printf "Connect to:\ntcp://%s:%d\n%!" localhost localport;
let fortran_thread =
let addr_out =
Printf.sprintf "tcp:%s:%d" address (port+2)
in
let addr_in =
Printf.sprintf "tcp://*:%d" (localport+2)
in
let f () =
new_thread REQ addr_in addr_out
in
(Thread.create f) ()
in
let pub_thread =
let addr_in =
Printf.sprintf "tcp:%s:%d" address (port+1)
in
let addr_out =
Printf.sprintf "tcp://*:%d" (localport+1)
in
let f () =
new_thread SUB addr_in addr_out
in
(Thread.create f) ()
in
let input_thread =
let f () =
let addr_out =
match arg with
| EZFIO _ -> None
| ADDRESS _ -> Some (
Printf.sprintf "tcp:%s:%d" address (port+9) )
in
let addr_in =
Printf.sprintf "tcp://*:%d" (localport+9)
in
let socket_in =
create_socket Zmq.Socket.rep Zmq.Socket.bind addr_in
in
let socket_out =
match addr_out with
| Some addr_out -> Some (
create_socket Zmq.Socket.req Zmq.Socket.connect addr_out)
| None -> None
in
let temp_file =
Filename.temp_file "qp_tunnel" ".tar.gz"
in
let get_ezfio_filename () =
match arg with
| EZFIO x -> x
| ADDRESS _ ->
begin
match socket_out with
| None -> assert false
| Some socket_out -> (
Zmq.Socket.send socket_out "get_ezfio_filename" ;
Zmq.Socket.recv socket_out
)
end
in
let get_input () =
match arg with
| EZFIO x ->
begin
Printf.sprintf "tar -zcf %s %s" temp_file x
|> Sys.command |> ignore;
let fd =
Unix.openfile temp_file [Unix.O_RDONLY] 0o640
in
let len =
Unix.lseek fd 0 Unix.SEEK_END
in
ignore @@ Unix.lseek fd 0 Unix.SEEK_SET ;
let bstr =
Unix.map_file fd Bigarray.char
Bigarray.c_layout false [| len |]
|> Bigarray.array1_of_genarray
in
let result =
String.init len (fun i -> bstr.{i}) ;
in
Unix.close fd;
Sys.remove temp_file;
result
end
| ADDRESS _ ->
begin
match socket_out with
| None -> assert false
| Some socket_out -> (
Zmq.Socket.send socket_out "get_input" ;
Zmq.Socket.recv socket_out
)
end
in
let () =
match socket_out with
| None -> ()
| Some socket_out ->
Zmq.Socket.send socket_out "test";
Printf.printf "Communication [ %s ]\n%!" (Zmq.Socket.recv socket_out);
in
(* Download input if asked *)
if Command_line.get_bool "get-input" then
begin
match arg with
| EZFIO _ -> ()
| ADDRESS _ ->
begin
Printf.printf "Getting input... %!";
let ezfio_filename =
get_ezfio_filename ()
in
Printf.printf "%s%!" ezfio_filename;
let oc =
open_out temp_file
in
get_input ()
|> output_string oc;
close_out oc;
Printf.sprintf "tar -zxf %s" temp_file
|> Sys.command |> ignore ;
let oc =
Filename.concat (Qpackage.ezfio_work ezfio_filename) "qp_run_address"
|> open_out
in
Printf.fprintf oc "tcp://%s:%d\n" localhost localport;
close_out oc;
Printf.printf " ...done\n%!"
end
end;
(* Main loop *)
let pollitem =
Zmq.Poll.mask_of [| (socket_in, Zmq.Poll.In) |]
in
let action () =
match Zmq.Socket.recv socket_in with
| "get_input" -> get_input ()
|> Zmq.Socket.send socket_in
| "get_ezfio_filename" -> get_ezfio_filename ()
|> Zmq.Socket.send socket_in
| "test" -> Zmq.Socket.send socket_in "OK"
| x -> Printf.sprintf "Message '%s' not understood" x
|> Zmq.Socket.send socket_in
in
Printf.printf "Ready\n%!";
while !run_status do
let polling =
Zmq.Poll.poll ~timeout:1000 pollitem
in
match polling.(0) with
| Some Zmq.Poll.In -> action ()
| None -> ()
| Some Zmq.Poll.In_out
| Some Zmq.Poll.Out -> ()
done;
let () =
match socket_out with
| Some socket_out -> Zmq.Socket.close socket_out
| None -> ()
in
Zmq.Socket.close socket_in
in
(Thread.create f) ()
in
(* Termination *)
Thread.join input_thread;
Thread.join fortran_thread;
Thread.join pub_thread;
Thread.join ocaml_thread;
Zmq.Context.terminate zmq_context;
Printf.printf "qp_tunnel exited properly.\n"

View File

@ -301,7 +301,6 @@ function new_zmq_push_socket(thread)
END_DOC END_DOC
integer, intent(in) :: thread integer, intent(in) :: thread
integer :: rc integer :: rc
character*(8), external :: zmq_port
integer(ZMQ_PTR) :: new_zmq_push_socket integer(ZMQ_PTR) :: new_zmq_push_socket
call omp_set_lock(zmq_lock) call omp_set_lock(zmq_lock)
@ -425,7 +424,6 @@ subroutine end_zmq_pair_socket(zmq_socket_pair)
END_DOC END_DOC
integer(ZMQ_PTR), intent(in) :: zmq_socket_pair integer(ZMQ_PTR), intent(in) :: zmq_socket_pair
integer :: rc integer :: rc
character*(8), external :: zmq_port
call omp_set_lock(zmq_lock) call omp_set_lock(zmq_lock)
rc = f77_zmq_close(zmq_socket_pair) rc = f77_zmq_close(zmq_socket_pair)
@ -445,7 +443,6 @@ subroutine end_zmq_pull_socket(zmq_socket_pull)
END_DOC END_DOC
integer(ZMQ_PTR), intent(in) :: zmq_socket_pull integer(ZMQ_PTR), intent(in) :: zmq_socket_pull
integer :: rc integer :: rc
character*(8), external :: zmq_port
! rc = f77_zmq_setsockopt(zmq_socket_pull,ZMQ_LINGER,0,4) ! rc = f77_zmq_setsockopt(zmq_socket_pull,ZMQ_LINGER,0,4)
! if (rc /= 0) then ! if (rc /= 0) then
@ -472,7 +469,6 @@ subroutine end_zmq_push_socket(zmq_socket_push,thread)
integer, intent(in) :: thread integer, intent(in) :: thread
integer(ZMQ_PTR), intent(in) :: zmq_socket_push integer(ZMQ_PTR), intent(in) :: zmq_socket_push
integer :: rc integer :: rc
character*(8), external :: zmq_port
rc = f77_zmq_setsockopt(zmq_socket_push,ZMQ_LINGER,300000,4) rc = f77_zmq_setsockopt(zmq_socket_push,ZMQ_LINGER,300000,4)
if (rc /= 0) then if (rc /= 0) then
@ -1032,7 +1028,6 @@ subroutine end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
! Terminate the socket from the application to qp_run ! Terminate the socket from the application to qp_run
END_DOC END_DOC
integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket
character*(8), external :: zmq_port
integer :: rc integer :: rc
rc = f77_zmq_setsockopt(zmq_to_qp_run_socket,ZMQ_LINGER,300000,4) rc = f77_zmq_setsockopt(zmq_to_qp_run_socket,ZMQ_LINGER,300000,4)