10
0
mirror of https://github.com/LCPQ/quantum_package synced 2024-06-18 11:15:28 +02:00

Task Server

This commit is contained in:
Anthony Scemama 2015-12-07 22:03:33 +01:00
parent 989b087f59
commit cc56ac8d3a
12 changed files with 467 additions and 174 deletions

6
ocaml/.gitignore vendored
View File

@ -39,9 +39,15 @@ test_excitation
test_excitation.byte
test_gto
test_gto.byte
test_message
test_message.byte
test_mo_label
test_mo_label.byte
test_molecule
test_molecule.byte
test_point3d
test_point3d.byte
test_queuing_system
test_queuing_system.byte
test_task_server
test_task_server.byte

View File

@ -106,6 +106,27 @@ end = struct
(Id.Client.to_int x.client_id)
end
module DisconnectReply_msg : sig
type t =
{ finished: bool ;
state: State.t ;
}
val create : state:State.t -> finished:bool -> t
val to_string : t -> string
end = struct
type t =
{ finished: bool;
state: State.t ;
}
let create ~state ~finished =
{ state ; finished }
let to_string x =
Printf.sprintf "disconnect_reply %s %d"
(State.to_string x.state)
(if x.finished then 1 else 0)
end
(** AddTask : Add a new task to the queue *)
module AddTask_msg : sig
@ -245,6 +266,7 @@ type t =
| Connect of Connect_msg.t
| ConnectReply of ConnectReply_msg.t
| Disconnect of Disconnect_msg.t
| DisconnectReply of DisconnectReply_msg.t
| GetTask of GetTask_msg.t
| GetTaskReply of GetTaskReply_msg.t
| AddTask of AddTask_msg.t
@ -289,6 +311,7 @@ let to_string = function
| Connect x -> Connect_msg.to_string x
| ConnectReply x -> ConnectReply_msg.to_string x
| Disconnect x -> Disconnect_msg.to_string x
| DisconnectReply x -> DisconnectReply_msg.to_string x
| GetTask x -> GetTask_msg.to_string x
| GetTaskReply x -> GetTaskReply_msg.to_string x
| AddTask x -> AddTask_msg.to_string x

View File

@ -66,19 +66,15 @@ let ip_address = lazy (
end
)
(** Initial ZeroMQ port :
Random port number between 49152 and 65535 *)
let port = lazy (
1024 + (Random.int (49151-1024)) )
let stop () =
let stop ~port =
let zmq_context =
ZMQ.Context.create ()
in
let req_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.req
and address =
Printf.sprintf "tcp://%s:%d" (Lazy.force ip_address) (Lazy.force port)
Printf.sprintf "tcp://%s:%d" (Lazy.force ip_address) port
in
ZMQ.Socket.connect req_socket address;
@ -100,7 +96,7 @@ let stop () =
(** Run the task server *)
let run () =
let run ~port =
let zmq_context =
ZMQ.Context.create ()
@ -109,7 +105,7 @@ let run () =
let rep_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.rep
and address =
Printf.sprintf "tcp://%s:%d" (Lazy.force ip_address) (Lazy.force port)
Printf.sprintf "tcp://%s:%d" (Lazy.force ip_address) port
in
bind_socket "REP" rep_socket address;
@ -118,6 +114,8 @@ let run () =
[| (rep_socket, ZMQ.Poll.In) |]
in
Printf.printf "Task server running : %s\n%!" address;
(** State variables *)
let q = ref
(Queuing_system.create ())
@ -194,7 +192,13 @@ let run () =
Queuing_system.del_client ~client_id:c !q
in
q := new_q;
Message.to_string ok
let finished =
Queuing_system.number_of_queued !q +
Queuing_system.number_of_running !q = 0
in
Message.DisconnectReply (Message.DisconnectReply_msg.create
~state ~finished)
|> Message.to_string
|> ZMQ.Socket.send rep_socket
and add_task state msg =
@ -257,8 +261,12 @@ let run () =
let message =
Message.of_string raw_message
in
Printf.printf "%s\n%!" (Message.to_string message);
Printf.printf "%s\n%!" (Queuing_system.to_string !q);
(*
Printf.printf "%d %d : %s\n%!"
(Queuing_system.number_of_queued !q)
(Queuing_system.number_of_running !q)
(Message.to_string message);
Printf.printf "%s\n%!" (Queuing_system.to_string !q); *)
match (state, message) with
| _ , Message.Terminate _ -> terminate ()
| None , Message.Newjob x -> newjob x
@ -280,7 +288,9 @@ let run () =
ZMQ.Socket.close rep_socket
(*
let () =
Printf.printf "export QP_RUN_ADDRESS=tcp://%s:%d\n%!" (Lazy.force ip_address) (Lazy.force port)
*)

12
ocaml/create_git_sha1.sh Executable file
View File

@ -0,0 +1,12 @@
#!/bin/bash
SHA1=$(git log -1 | head -1 | cut -d ' ' -f 2)
DATE=$(git log -1 | grep Date | cut -d ':' -f 2)
MESSAGE=$(git log -1 | tail -1)
cat << EOF > Git.ml
open Core.Std
let sha1 = "$SHA1" |> String.strip
let date = "$DATE" |> String.strip
let message = "$MESSAGE" |> String.strip
EOF

View File

@ -17,12 +17,12 @@ type keyword =
| Electrons
| Mo_basis
| Nuclei
| Perturbation
| Hartree_fock
| Pseudo
| Integrals_bielec
| Determinants
| Integrals_bielec
| Pseudo
| Perturbation
| Properties
| Hartree_fock
;;
@ -32,12 +32,12 @@ let keyword_to_string = function
| Electrons -> "Electrons"
| Mo_basis -> "MO basis"
| Nuclei -> "Molecule"
| Perturbation -> "Perturbation"
| Hartree_fock -> "Hartree_fock"
| Pseudo -> "Pseudo"
| Integrals_bielec -> "Integrals_bielec"
| Determinants -> "Determinants"
| Integrals_bielec -> "Integrals_bielec"
| Pseudo -> "Pseudo"
| Perturbation -> "Perturbation"
| Properties -> "Properties"
| Hartree_fock -> "Hartree_fock"
;;
@ -86,18 +86,18 @@ let get s =
f Ao_basis.(read, to_rst)
| Determinants_by_hand ->
f Determinants_by_hand.(read, to_rst)
| Perturbation ->
f Perturbation.(read, to_rst)
| Hartree_fock ->
f Hartree_fock.(read, to_rst)
| Pseudo ->
f Pseudo.(read, to_rst)
| Integrals_bielec ->
f Integrals_bielec.(read, to_rst)
| Determinants ->
f Determinants.(read, to_rst)
| Integrals_bielec ->
f Integrals_bielec.(read, to_rst)
| Pseudo ->
f Pseudo.(read, to_rst)
| Perturbation ->
f Perturbation.(read, to_rst)
| Properties ->
f Properties.(read, to_rst)
| Hartree_fock ->
f Hartree_fock.(read, to_rst)
end
with
| Sys_error msg -> (Printf.eprintf "Info: %s\n%!" msg ; "")
@ -135,12 +135,12 @@ let set str s =
in
let open Input in
match s with
| Perturbation -> write Perturbation.(of_rst, write) s
| Hartree_fock -> write Hartree_fock.(of_rst, write) s
| Pseudo -> write Pseudo.(of_rst, write) s
| Integrals_bielec -> write Integrals_bielec.(of_rst, write) s
| Determinants -> write Determinants.(of_rst, write) s
| Integrals_bielec -> write Integrals_bielec.(of_rst, write) s
| Pseudo -> write Pseudo.(of_rst, write) s
| Perturbation -> write Perturbation.(of_rst, write) s
| Properties -> write Properties.(of_rst, write) s
| Hartree_fock -> write Hartree_fock.(of_rst, write) s
| Electrons -> write Electrons.(of_rst, write) s
| Determinants_by_hand -> write Determinants_by_hand.(of_rst, write) s
| Nuclei -> write Nuclei.(of_rst, write) s
@ -169,7 +169,7 @@ let run check_only ezfio_filename =
(* Open EZFIO *)
if (not (Sys.file_exists_exn ezfio_filename)) then
failwith (ezfio_filename^" does not exist");
failwith (ezfio_filename^" does not exists");
Ezfio.set_file ezfio_filename;
@ -188,12 +188,12 @@ let run check_only ezfio_filename =
Nuclei ;
Ao_basis;
Electrons ;
Perturbation ;
Hartree_fock ;
Pseudo ;
Integrals_bielec ;
Determinants ;
Integrals_bielec ;
Pseudo ;
Perturbation ;
Properties ;
Hartree_fock ;
Mo_basis;
Determinants_by_hand ;
]

View File

@ -17,13 +17,38 @@ let run exe ezfio_file =
if (not (List.exists ~f:(fun (x,_) -> x = exe) executables)) then
failwith ("Executable "^exe^" not found");
Printf.printf "%s\n" (Time.to_string time_start);
Printf.printf "===============\nQuantum Package\n===============\n\n";
Printf.printf "Date : %s\n\n%!" (Time.to_string time_start);
Printf.printf "Git Commit: %s\n" Git.message;
Printf.printf "Git Date : %s\n" Git.date;
Printf.printf "Git SHA1 : %s\n" Git.sha1;
Printf.printf "\n\n%!";
(** Check input *)
match (Sys.command ("qp_edit -c "^ezfio_file)) with
| 0 -> ()
| i -> failwith "Error: Input inconsistent\n";
;
(** Start task server *)
let port_number =
12345
in
let address =
Printf.sprintf "tcp://%s:%d" (Lazy.force TaskServer.ip_address) port_number
in
let task_thread =
let thread =
Thread.create ( fun () ->
TaskServer.run port_number )
in
thread ();
in
Unix.putenv ~key:"QP_RUN_ADDRESS" ~data:address;
(** Run executable *)
let exe =
match (List.find ~f:(fun (x,_) -> x = exe) executables) with
| None -> assert false
@ -34,6 +59,9 @@ let run exe ezfio_file =
| i -> Printf.printf "Program exited with code %d.\n%!" i;
;
TaskServer.stop ~port:port_number;
Thread.join task_thread;
let duration = Time.diff (Time.now()) time_start
|> Core.Span.to_string in
Printf.printf "Wall time : %s\n\n" duration;
@ -60,6 +88,7 @@ Executes a Quantum Package binary file among these:\n\n"
(fun exe ezfio_file () ->
run exe ezfio_file
)
|> Command.run
|> Command.run ~version: Git.sha1 ~build_info: Git.message
;;

View File

@ -1,5 +1,5 @@
open Core
let () =
TaskServer.run ()
TaskServer.run 12345

View File

@ -11,7 +11,7 @@ def main():
def send(msg,expected):
print "Send : ", msg
socket.send(msg)
print " -> ", socket.send(msg)
reply = socket.recv()
print "Reply : ", reply
print ""
@ -28,7 +28,7 @@ def main():
send("connect tcp","connect_reply ao_integrals 1 tcp://130.120.229.139:12345")
send("connect inproc","connect_reply ao_integrals 2 inproc://ao_integrals")
send("disconnect ao_integrals 3","error Queuing_system.ml:65:2 : disconnect ao_integrals 3")
send("disconnect ao_integrals 2","ok")
send("disconnect ao_integrals 2","disconnect_reply ao_integrals 1")
send("connect inproc","connect_reply ao_integrals 3 inproc://ao_integrals")
for i in range(10):

View File

@ -348,7 +348,7 @@ BEGIN_PROVIDER [ logical, ao_bielec_integrals_in_map ]
real(integral_kind),allocatable :: buffer_value(:)
integer :: n_integrals, rc
integer :: jl_pairs(2,ao_num*(ao_num+1)/2), kk, m, j1, i1, lmax
integer :: kk, m, j1, i1, lmax
integral = ao_bielec_integral(1,1,1,1)
@ -368,55 +368,23 @@ BEGIN_PROVIDER [ logical, ao_bielec_integrals_in_map ]
call wall_time(wall_1)
call cpu_time(cpu_1)
integer(ZMQ_PTR) :: zmq_socket_rep_inproc, zmq_socket_push_inproc
zmq_socket_rep_inproc = f77_zmq_socket(zmq_context, ZMQ_REP)
rc = f77_zmq_bind(zmq_socket_rep_inproc, 'inproc://req_rep')
if (rc /= 0) then
stop 'Unable to connect zmq_socket_rep_inproc'
endif
integer(ZMQ_PTR) :: thread(0:nproc)
external :: ao_bielec_integrals_in_map_slave, ao_bielec_integrals_in_map_collector
rc = pthread_create( thread(0), ao_bielec_integrals_in_map_collector )
! Create client threads
do i=1,nproc
rc = pthread_create( thread(i), ao_bielec_integrals_in_map_slave )
enddo
character*(64) :: message_string
do l = ao_num, 1, -1
rc = f77_zmq_recv( zmq_socket_rep_inproc, message_string, 64, 0)
print *, l
! TODO : error handling
ASSERT (rc >= 0)
ASSERT (message == 'get_ao_integrals')
rc = f77_zmq_send( zmq_socket_rep_inproc, l, 4, 0)
enddo
do i=1,nproc
rc = f77_zmq_recv( zmq_socket_rep_inproc, message_string, 64, 0)
! TODO : error handling
ASSERT (rc >= 0)
ASSERT (message == 'get_ao_integrals')
rc = f77_zmq_send( zmq_socket_rep_inproc, 0, 4, 0)
enddo
! TODO terminer thread(0)
rc = f77_zmq_unbind(zmq_socket_rep_inproc, 'inproc://req_rep')
do i=1,nproc
rc = pthread_join( thread(i) )
enddo
integer(ZMQ_PTR) :: zmq_to_qp_run_socket
call new_parallel_job(zmq_to_qp_run_socket,'ao_integrals')
zmq_socket_push_inproc = f77_zmq_socket(zmq_context, ZMQ_PUSH)
rc = f77_zmq_connect(zmq_socket_push_inproc, 'inproc://push_pull')
if (rc /= 0) then
stop 'Unable to connect zmq_socket_push_inproc'
endif
rc = f77_zmq_send( zmq_socket_push_inproc, -1, 4, ZMQ_SNDMORE)
rc = f77_zmq_send( zmq_socket_push_inproc, 0_key_kind, key_kind, ZMQ_SNDMORE)
rc = f77_zmq_send( zmq_socket_push_inproc, 0_integral_kind, integral_kind, 0)
character*(32) :: task
do l=1,ao_num
do j = 1, l
if (ao_overlap_abs(j,l) < ao_integrals_threshold) then
cycle
endif
write(task,*) j, l
call add_task_to_taskserver(zmq_to_qp_run_socket,task)
enddo
enddo
external :: ao_bielec_integrals_in_map_slave_inproc, ao_bielec_integrals_in_map_collector
call new_parallel_threads(ao_bielec_integrals_in_map_slave_inproc, ao_bielec_integrals_in_map_collector)
rc = pthread_join( thread(0) )
call end_parallel_job(zmq_to_qp_run_socket,'ao_integrals')
print*, 'Sorting the map'
call map_sort(ao_integrals_map)

View File

@ -1,4 +1,20 @@
subroutine ao_bielec_integrals_in_map_slave
subroutine ao_bielec_integrals_in_map_slave_tcp
implicit none
BEGIN_DOC
! Computes a buffer of integrals
END_DOC
call ao_bielec_integrals_in_map_slave(0)
end
subroutine ao_bielec_integrals_in_map_slave_inproc
implicit none
BEGIN_DOC
! Computes a buffer of integrals
END_DOC
call ao_bielec_integrals_in_map_slave(1)
end
subroutine ao_bielec_integrals_in_map_slave(thread)
use map_module
use f77_zmq
implicit none
@ -6,51 +22,61 @@ subroutine ao_bielec_integrals_in_map_slave
! Computes a buffer of integrals
END_DOC
integer, intent(in) :: thread
integer :: j,l,n_integrals
integer :: rc
character*(8), external :: zmq_port
integer(ZMQ_PTR) :: zmq_socket_req_inproc, zmq_socket_push_inproc
real(integral_kind), allocatable :: buffer_value(:)
integer(key_kind), allocatable :: buffer_i(:)
integer :: worker_id, task_id
character*(512) :: task
integer(ZMQ_PTR),external :: new_zmq_to_qp_run_socket
integer(ZMQ_PTR) :: zmq_to_qp_run_socket
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
integer(ZMQ_PTR) :: zmq_socket_push
zmq_socket_push = f77_zmq_socket(zmq_context, ZMQ_PUSH)
if (thread == 1) then
rc = f77_zmq_connect(zmq_socket_push, trim(zmq_socket_pull_inproc_address))
else
rc = f77_zmq_connect(zmq_socket_push, trim(zmq_socket_push_tcp_address))
endif
if (rc /= 0) then
stop 'Unable to connect zmq_socket_push_tcp'
endif
allocate ( buffer_i(ao_num*ao_num), buffer_value(ao_num*ao_num) )
! Sockets
zmq_socket_req_inproc = f77_zmq_socket(zmq_context, ZMQ_REQ)
rc = f77_zmq_connect(zmq_socket_req_inproc, 'inproc://req_rep')
if (rc /= 0) then
stop 'Unable to connect zmq_socket_req_inproc'
endif
call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
zmq_socket_push_inproc = f77_zmq_socket(zmq_context, ZMQ_PUSH)
rc = f77_zmq_connect(zmq_socket_push_inproc, 'inproc://push_pull')
if (rc /= 0) then
stop 'Unable to connect zmq_socket_push_inproc'
endif
rc = f77_zmq_send( zmq_socket_req_inproc, 'get_ao_integrals', 16, 0)
rc = f77_zmq_recv( zmq_socket_req_inproc, l, 4, 0)
do while (l > 0)
rc = f77_zmq_send( zmq_socket_req_inproc, 'get_ao_integrals', 16, 0)
do j = 1, l
if (ao_overlap_abs(j,l) < ao_integrals_threshold) then
cycle
endif
call compute_ao_integrals_jl(j,l,n_integrals,buffer_i,buffer_value)
rc = f77_zmq_send( zmq_socket_push_inproc, n_integrals, 4, ZMQ_SNDMORE)
rc = f77_zmq_send( zmq_socket_push_inproc, buffer_i, key_kind*n_integrals, ZMQ_SNDMORE)
rc = f77_zmq_send( zmq_socket_push_inproc, buffer_value, integral_kind*n_integrals, 0)
enddo
rc = f77_zmq_recv( zmq_socket_req_inproc, l, 4, 0)
do
call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task)
if (task_id == 0) then
exit
endif
read(task,*) j, l
call compute_ao_integrals_jl(j,l,n_integrals,buffer_i,buffer_value)
rc = f77_zmq_send( zmq_socket_push, n_integrals, 4, ZMQ_SNDMORE)
rc = f77_zmq_send( zmq_socket_push, buffer_i, key_kind*n_integrals, ZMQ_SNDMORE)
rc = f77_zmq_send( zmq_socket_push, buffer_value, integral_kind*n_integrals, 0)
call task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id)
enddo
deallocate( buffer_i, buffer_value )
rc = f77_zmq_disconnect(zmq_socket_req_inproc, 'inproc://req_rep')
integer :: finished
call disconnect_from_taskserver(zmq_to_qp_run_socket,worker_id,finished)
if (finished /= 0) then
rc = f77_zmq_send( zmq_socket_push, -1, 4, 0)
endif
rc = f77_zmq_disconnect(zmq_socket_push,trim(zmq_socket_push_tcp_address))
rc = f77_zmq_close(zmq_socket_push)
end
@ -64,36 +90,24 @@ subroutine ao_bielec_integrals_in_map_collector
integer :: j,l,n_integrals
integer :: rc
character*(8), external :: zmq_port
integer(ZMQ_PTR) :: zmq_socket_pull_inproc
real(integral_kind), allocatable :: buffer_value(:)
integer(key_kind), allocatable :: buffer_i(:)
allocate ( buffer_i(ao_num*ao_num), buffer_value(ao_num*ao_num) )
zmq_socket_pull_inproc = f77_zmq_socket(zmq_context, ZMQ_PULL)
rc = f77_zmq_bind(zmq_socket_pull_inproc, 'inproc://push_pull')
if (rc /= 0) then
stop 'Unable to connect zmq_socket_pull_inproc'
endif
n_integrals = 0
do while (n_integrals >= 0)
rc = f77_zmq_recv( zmq_socket_pull_inproc, n_integrals, 4, 0)
if (n_integrals > -1) then
rc = f77_zmq_recv( zmq_socket_pull_inproc, buffer_i, key_kind*n_integrals, 0)
rc = f77_zmq_recv( zmq_socket_pull_inproc, buffer_value, integral_kind*n_integrals, 0)
rc = f77_zmq_recv( zmq_socket_pull, n_integrals, 4, 0)
if (n_integrals >= 0) then
rc = f77_zmq_recv( zmq_socket_pull, buffer_i, key_kind*n_integrals, 0)
rc = f77_zmq_recv( zmq_socket_pull, buffer_value, integral_kind*n_integrals, 0)
call insert_into_ao_integrals_map(n_integrals,buffer_i,buffer_value)
else
rc = f77_zmq_recv( zmq_socket_pull_inproc, buffer_i, key_kind, 0)
rc = f77_zmq_recv( zmq_socket_pull_inproc, buffer_value, integral_kind, 0)
endif
enddo
rc = f77_zmq_unbind(zmq_socket_pull_inproc, 'inproc://push_pull')
deallocate( buffer_i, buffer_value )
end

View File

@ -1 +1 @@
Utils

View File

@ -20,10 +20,10 @@ END_PROVIDER
character*(128) :: buffer
call getenv('QP_RUN_ADDRESS',buffer)
if (trim(buffer) == '') then
stop 'QP_RUN_ADDRESS environment variable not defined'
print *, 'This run should be started with the qp_run command'
stop -1
endif
print *, trim(buffer)
integer :: i
do i=len(buffer),1,-1
if ( buffer(i:i) == ':') then
@ -44,62 +44,293 @@ function zmq_port(ishift)
end
BEGIN_PROVIDER [ integer(ZMQ_PTR), zmq_to_qp_run_socket ]
function new_zmq_to_qp_run_socket()
implicit none
BEGIN_DOC
! Socket on which the qp_run process replies
END_DOC
integer :: rc
zmq_to_qp_run_socket = f77_zmq_socket(zmq_context, ZMQ_REQ)
rc = f77_zmq_connect(zmq_to_qp_run_socket, trim(qp_run_address))
character*(8), external :: zmq_port
integer(ZMQ_PTR) :: new_zmq_to_qp_run_socket
new_zmq_to_qp_run_socket = f77_zmq_socket(zmq_context, ZMQ_REQ)
rc = f77_zmq_connect(new_zmq_to_qp_run_socket, trim(qp_run_address)//':'//trim(zmq_port(0)))
if (rc /= 0) then
stop 'Unable to connect zmq_to_qp_run_socket'
stop 'Unable to connect new_zmq_to_qp_run_socket'
endif
integer :: i
i=4
rc = f77_zmq_setsockopt(zmq_to_qp_run_socket, ZMQ_SNDTIMEO, 120000, i)
rc = f77_zmq_setsockopt(new_zmq_to_qp_run_socket, ZMQ_SNDTIMEO, 120000, i)
if (rc /= 0) then
stop 'Unable to set send timout in zmq_to_qp_run_socket'
stop 'Unable to set send timout in new_zmq_to_qp_run_socket'
endif
rc = f77_zmq_setsockopt(zmq_to_qp_run_socket, ZMQ_RCVTIMEO, 120000, i)
rc = f77_zmq_setsockopt(new_zmq_to_qp_run_socket, ZMQ_RCVTIMEO, 120000, i)
if (rc /= 0) then
stop 'Unable to set recv timout in zmq_to_qp_run_socket'
stop 'Unable to set recv timout in new_zmq_to_qp_run_socket'
endif
END_PROVIDER
end
BEGIN_PROVIDER [ integer(ZMQ_PTR), zmq_socket_push ]
implicit none
BEGIN_DOC
! Socket on which to push the results (1)
END_DOC
integer :: rc
character*(64) :: address
character*(8), external :: zmq_port
zmq_socket_push = f77_zmq_socket(zmq_context, ZMQ_PUSH)
address = trim(qp_run_address)//':'//zmq_port(1)
rc = f77_zmq_connect(zmq_socket_push, trim(address))
if (rc /= 0) then
stop 'Unable to connect zmq_socket_push'
endif
END_PROVIDER
BEGIN_PROVIDER [ integer(ZMQ_PTR), zmq_socket_pull ]
BEGIN_PROVIDER [ integer(ZMQ_PTR), zmq_socket_pull ]
&BEGIN_PROVIDER [ character*(128), zmq_socket_pull_tcp_address ]
&BEGIN_PROVIDER [ character*(128), zmq_socket_push_tcp_address ]
&BEGIN_PROVIDER [ character*(128), zmq_socket_pull_inproc_address ]
implicit none
BEGIN_DOC
! Socket which pulls the results (2)
END_DOC
integer :: rc
character*(64) :: address
character*(8), external :: zmq_port
integer(ZMQ_PTR),external :: new_zmq_to_qp_run_socket
zmq_socket_pull_tcp_address = 'tcp://*:'//zmq_port(1)
zmq_socket_push_tcp_address = trim(qp_run_address)//':'//zmq_port(1)
zmq_socket_pull_inproc_address = 'inproc://'//zmq_port(1)
zmq_socket_pull = f77_zmq_socket(zmq_context, ZMQ_PULL)
address = 'tcp://*:'//zmq_port(2)
rc = f77_zmq_bind(zmq_socket_pull, trim(address))
rc = f77_zmq_bind(zmq_socket_pull, zmq_socket_pull_tcp_address)
rc = f77_zmq_bind(zmq_socket_pull, zmq_socket_pull_inproc_address)
if (rc /= 0) then
stop 'Unable to connect zmq_socket_pull'
stop 'Unable to bind zmq_socket_pull (tcp)'
endif
END_PROVIDER
BEGIN_PROVIDER [ integer(ZMQ_PTR), zmq_thread, (0:nproc) ]
&BEGIN_PROVIDER [ character*(128), zmq_state ]
implicit none
BEGIN_DOC
! Threads executing work through the ZeroMQ interface
END_DOC
zmq_thread = 0_ZMQ_PTR
zmq_state = ''
END_PROVIDER
subroutine new_parallel_job(zmq_to_qp_run_socket,name)
implicit none
BEGIN_DOC
! Start a new parallel job with name 'name'. The slave tasks execute subroutine 'slave'
END_DOC
character*(*), intent(in) :: name
character*(512) :: message
integer :: rc
integer(ZMQ_PTR),external :: new_zmq_to_qp_run_socket
integer(ZMQ_PTR), intent(out) :: zmq_to_qp_run_socket
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
message = 'new_job '//name//' '//zmq_socket_push_tcp_address//' '//zmq_socket_pull_inproc_address
rc = f77_zmq_send(zmq_to_qp_run_socket,message,len(trim(message)),0)
rc = f77_zmq_recv(zmq_to_qp_run_socket,message,510,0)
message = trim(message(1:rc))
if (message(1:2) /= 'ok') then
print *, 'Unable to start parallel job : '//name
stop 1
endif
zmq_state = name
SOFT_TOUCH zmq_state zmq_thread
end
subroutine new_parallel_threads(slave,collector)
implicit none
BEGIN_DOC
! Start a new parallel job with name 'name'. The slave tasks execute subroutine 'slave'
END_DOC
external :: slave, collector
integer :: i,rc
rc = pthread_create( zmq_thread(0), collector)
do i=1,nproc
rc = pthread_create( zmq_thread(i), slave )
enddo
SOFT_TOUCH zmq_thread zmq_state
end
subroutine connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
implicit none
BEGIN_DOC
! Connect to the task server and obtain the worker ID
END_DOC
integer, intent(out) :: worker_id
integer, intent(in) :: thread
integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket
character*(512) :: message
character*(128) :: reply, state, address
integer :: rc
if (thread == 1) then
rc = f77_zmq_send(zmq_to_qp_run_socket, "connect inproc", 14, 0)
else
rc = f77_zmq_send(zmq_to_qp_run_socket, "connect tcp", 11, 0)
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0)
message = trim(message(1:rc))
read(message,*) reply, state, worker_id, address
if ( (trim(reply) /= 'connect_reply') .and. &
(trim(state) /= trim(zmq_state)) ) then
print *, 'Reply: ', trim(reply)
print *, 'State: ', trim(state), '/', trim(zmq_state)
print *, 'Address: ', trim(address)
stop -1
endif
end
subroutine disconnect_from_taskserver(zmq_to_qp_run_socket,worker_id,finished)
implicit none
BEGIN_DOC
! Disconnect from the task server
END_DOC
integer, intent(in) :: worker_id
integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket
integer, intent(out) :: finished
integer :: rc
character*(64) :: message, reply, state
write(message,*) 'disconnect '//trim(zmq_state), worker_id
rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), len(trim(message)), 0)
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0)
message = trim(message(1:rc))
read(message,*) reply, state, finished
if ( (trim(reply) /= 'disconnect_reply').or. &
(trim(state) /= zmq_state) ) then
print *, 'Unable to disconnect'
print *, trim(message)
stop -1
endif
end
subroutine add_task_to_taskserver(zmq_to_qp_run_socket,task)
implicit none
BEGIN_DOC
! Get a task from the task server
END_DOC
integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket
character*(*), intent(in) :: task
integer :: rc
character*(512) :: message
write(message,*) 'add_task '//trim(zmq_state)//' '//trim(task)
rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), len(trim(message)), 0)
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0)
message = trim(message(1:rc))
if (trim(message) /= 'ok') then
print *, trim(task)
print *, 'Unable to add the next task'
stop -1
endif
end
subroutine task_done_to_taskserver(zmq_to_qp_run_socket,worker_id, task_id)
implicit none
BEGIN_DOC
! Get a task from the task server
END_DOC
integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket
integer, intent(in) :: worker_id, task_id
integer :: rc
character*(512) :: message
write(message,*) 'task_done '//trim(zmq_state), worker_id, task_id
rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), len(trim(message)), 0)
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0)
message = trim(message(1:rc))
if (trim(message) /= 'ok') then
print *, 'Unable to send task_done message'
stop -1
endif
end
subroutine get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task)
implicit none
BEGIN_DOC
! Get a task from the task server
END_DOC
integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket
integer, intent(in) :: worker_id
integer, intent(out) :: task_id
character*(512), intent(out) :: task
character*(512) :: message
character*(64) :: reply
integer :: rc
write(message,*) 'get_task '//trim(zmq_state), worker_id
rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), len(trim(message)), 0)
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0)
message = trim(message(1:rc))
read(message,*) reply
if (trim(reply) == 'get_task_reply') then
read(message,*) reply, task_id
rc = 15
do while (message(rc:rc) == ' ')
rc += 1
enddo
do while (message(rc:rc) /= ' ')
rc += 1
enddo
rc += 1
task = message(rc:)
else if (trim(reply) == 'terminate') then
task_id = 0
task = 'terminate'
else
print *, 'Unable to get the next task'
print *, trim(message)
stop -1
endif
end
subroutine end_parallel_job(zmq_to_qp_run_socket,name)
implicit none
BEGIN_DOC
! End a new parallel job with name 'name'. The slave tasks execute subroutine 'slave'
END_DOC
integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket
character*(*), intent(in) :: name
character*(512) :: message
integer :: i,rc
if (name /= zmq_state) then
stop 'Wrong end of job'
endif
do i=1,nproc
rc = pthread_join( zmq_thread(i) )
if (rc /= 0) then
print *, 'Unable to join thread : ', i
stop -1
endif
zmq_thread(i) = 0
enddo
zmq_state = 'None'
character*(8), external :: zmq_port
rc = f77_zmq_disconnect(zmq_to_qp_run_socket, trim(qp_run_address)//':'//trim(zmq_port(0)))
rc = f77_zmq_close(zmq_to_qp_run_socket)
SOFT_TOUCH zmq_thread zmq_state
end