Introduced IPC for qp_run and sub sockets

This commit is contained in:
Anthony Scemama 2016-10-12 12:09:25 +02:00
parent e356e97e16
commit 5f83602578
2 changed files with 22 additions and 42 deletions

View File

@ -501,7 +501,7 @@ subroutine davidson_miniserver_end()
integer rc
character*(64) buf
address = trim(qp_run_address)//':11223'
address = trim(qp_run_address_tcp)//':11223'
requester = f77_zmq_socket(zmq_context, ZMQ_REQ)
rc = f77_zmq_connect(requester,address)
@ -520,7 +520,7 @@ subroutine davidson_miniserver_get()
character*(20) buffer
integer rc
address = trim(qp_run_address)//':11223'
address = trim(qp_run_address_tcp)//':11223'
requester = f77_zmq_socket(zmq_context, ZMQ_REQ)
rc = f77_zmq_connect(requester,address)

View File

@ -17,6 +17,8 @@ END_PROVIDER
BEGIN_PROVIDER [ character*(128), qp_run_address ]
&BEGIN_PROVIDER [ character*(128), qp_run_address_ipc ]
&BEGIN_PROVIDER [ character*(128), qp_run_address_tcp ]
&BEGIN_PROVIDER [ integer, zmq_port_start ]
use f77_zmq
implicit none
@ -34,19 +36,22 @@ END_PROVIDER
integer :: i
do i=len(buffer),1,-1
if ( buffer(i:i) == ':') then
qp_run_address = trim(buffer(1:i-1))
qp_run_address_tcp = trim(buffer(1:i-1))
read(buffer(i+1:), *) zmq_port_start
exit
endif
enddo
qp_run_address_ipc = 'ipc:///tmp/qp_run'
qp_run_address = qp_run_address_ipc
END_PROVIDER
BEGIN_PROVIDER [ character*(128), zmq_socket_pull_tcp_address ]
&BEGIN_PROVIDER [ character*(128), zmq_socket_pull_inproc_address ]
&BEGIN_PROVIDER [ character*(128), zmq_socket_pair_inproc_address ]
&BEGIN_PROVIDER [ character*(128), zmq_socket_push_tcp_address ]
&BEGIN_PROVIDER [ character*(128), zmq_socket_pull_inproc_address ]
&BEGIN_PROVIDER [ character*(128), zmq_socket_push_inproc_address ]
&BEGIN_PROVIDER [ character*(128), zmq_socket_sub_tcp_address ]
&BEGIN_PROVIDER [ character*(128), zmq_socket_sub_address ]
use f77_zmq
implicit none
BEGIN_DOC
@ -54,12 +59,12 @@ END_PROVIDER
END_DOC
character*(8), external :: zmq_port
zmq_socket_sub_tcp_address = trim(qp_run_address)//':'//zmq_port(1)//' '
zmq_socket_pull_tcp_address = 'tcp://*:'//zmq_port(2)//' '
zmq_socket_push_tcp_address = trim(qp_run_address)//':'//zmq_port(2)//' '
zmq_socket_pull_inproc_address = 'inproc://'//zmq_port(2)//' '
zmq_socket_push_inproc_address = zmq_socket_pull_inproc_address
zmq_socket_pair_inproc_address = 'inproc://'//zmq_port(3)//' '
zmq_socket_push_tcp_address = trim(qp_run_address_tcp)//':'//zmq_port(2)//' '
zmq_socket_push_inproc_address = zmq_socket_pull_inproc_address
zmq_socket_sub_address = trim(qp_run_address)//':'//zmq_port(1)//' '
! /!\ Don't forget to change subroutine reset_zmq_addresses
END_PROVIDER
@ -72,12 +77,13 @@ subroutine reset_zmq_addresses
END_DOC
character*(8), external :: zmq_port
zmq_socket_sub_tcp_address = trim(qp_run_address)//':'//zmq_port(1)//' '
zmq_socket_pull_tcp_address = 'tcp://*:'//zmq_port(2)//' '
zmq_socket_push_tcp_address = trim(qp_run_address)//':'//zmq_port(2)//' '
zmq_socket_pull_inproc_address = 'inproc://'//zmq_port(2)//' '
zmq_socket_push_inproc_address = zmq_socket_pull_inproc_address
zmq_socket_pair_inproc_address = 'inproc://'//zmq_port(3)//' '
zmq_socket_push_tcp_address = trim(qp_run_address_tcp)//':'//zmq_port(2)//' '
zmq_socket_push_inproc_address = zmq_socket_pull_inproc_address
zmq_socket_sub_address = trim(qp_run_address)//':'//zmq_port(1)//' '
end
@ -105,6 +111,7 @@ subroutine switch_qp_run_to_master
exit
endif
enddo
qp_run_address_tcp = qp_run_address
call reset_zmq_addresses
end
@ -367,7 +374,7 @@ function new_zmq_sub_socket()
stop 'Unable to subscribe new_zmq_sub_socket'
endif
rc = f77_zmq_connect(new_zmq_sub_socket, zmq_socket_sub_tcp_address)
rc = f77_zmq_connect(new_zmq_sub_socket, zmq_socket_sub_address)
if (rc /= 0) then
stop 'Unable to connect new_zmq_sub_socket'
endif
@ -403,17 +410,6 @@ subroutine end_zmq_pair_socket(zmq_socket_pair)
character*(8), external :: zmq_port
rc = f77_zmq_unbind(zmq_socket_pair,zmq_socket_pair_inproc_address)
! if (rc /= 0) then
! print *, rc
! print *, irp_here, 'f77_zmq_unbind(zmq_socket_pair,zmq_socket_pair_inproc_address)'
! stop 'error'
! endif
! rc = f77_zmq_setsockopt(zmq_socket_pair,0ZMQ_LINGER,1000,4)
! if (rc /= 0) then
! stop 'Unable to set ZMQ_LINGER on zmq_socket_pair'
! endif
rc = f77_zmq_close(zmq_socket_pair)
if (rc /= 0) then
print *, 'f77_zmq_close(zmq_socket_pair)'
@ -433,26 +429,7 @@ subroutine end_zmq_pull_socket(zmq_socket_pull)
character*(8), external :: zmq_port
rc = f77_zmq_unbind(zmq_socket_pull,zmq_socket_pull_inproc_address)
! if (rc /= 0) then
! print *, rc
! print *, irp_here, 'f77_zmq_unbind(zmq_socket_pull,zmq_socket_pull_inproc_address)'
! stop 'error'
! endif
rc = f77_zmq_unbind(zmq_socket_pull,zmq_socket_pull_tcp_address)
! if (rc /= 0) then
! print *, rc
! print *, irp_here, 'f77_zmq_unbind(zmq_socket_pull,zmq_socket_pull_tcp_address)'
! stop 'error'
! endif
! call sleep(1) ! see https://github.com/zeromq/libzmq/issues/1922
! rc = f77_zmq_setsockopt(zmq_socket_pull,ZMQ_LINGER,10000,4)
! if (rc /= 0) then
! stop 'Unable to set ZMQ_LINGER on zmq_socket_pull'
! endif
rc = f77_zmq_close(zmq_socket_pull)
if (rc /= 0) then
print *, 'f77_zmq_close(zmq_socket_pull)'
@ -784,6 +761,9 @@ subroutine get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task)
else if (trim(reply) == 'terminate') then
task_id = 0
task = 'terminate'
else if (trim(message) == 'error No job is running') then
task_id = 0
task = 'terminate'
else
print *, 'Unable to get the next task'
print *, trim(message)