diff --git a/src/Davidson/davidson_parallel.irp.f b/src/Davidson/davidson_parallel.irp.f index cede52c9..50b58f67 100644 --- a/src/Davidson/davidson_parallel.irp.f +++ b/src/Davidson/davidson_parallel.irp.f @@ -501,7 +501,7 @@ subroutine davidson_miniserver_end() integer rc character*(64) buf - address = trim(qp_run_address)//':11223' + address = trim(qp_run_address_tcp)//':11223' requester = f77_zmq_socket(zmq_context, ZMQ_REQ) rc = f77_zmq_connect(requester,address) @@ -520,7 +520,7 @@ subroutine davidson_miniserver_get() character*(20) buffer integer rc - address = trim(qp_run_address)//':11223' + address = trim(qp_run_address_tcp)//':11223' requester = f77_zmq_socket(zmq_context, ZMQ_REQ) rc = f77_zmq_connect(requester,address) diff --git a/src/ZMQ/utils.irp.f b/src/ZMQ/utils.irp.f index 61fb45de..84665199 100644 --- a/src/ZMQ/utils.irp.f +++ b/src/ZMQ/utils.irp.f @@ -17,6 +17,8 @@ END_PROVIDER BEGIN_PROVIDER [ character*(128), qp_run_address ] +&BEGIN_PROVIDER [ character*(128), qp_run_address_ipc ] +&BEGIN_PROVIDER [ character*(128), qp_run_address_tcp ] &BEGIN_PROVIDER [ integer, zmq_port_start ] use f77_zmq implicit none @@ -34,19 +36,22 @@ END_PROVIDER integer :: i do i=len(buffer),1,-1 if ( buffer(i:i) == ':') then - qp_run_address = trim(buffer(1:i-1)) + qp_run_address_tcp = trim(buffer(1:i-1)) read(buffer(i+1:), *) zmq_port_start exit endif enddo + qp_run_address_ipc = 'ipc:///tmp/qp_run' + qp_run_address = qp_run_address_ipc END_PROVIDER + BEGIN_PROVIDER [ character*(128), zmq_socket_pull_tcp_address ] +&BEGIN_PROVIDER [ character*(128), zmq_socket_pull_inproc_address ] &BEGIN_PROVIDER [ character*(128), zmq_socket_pair_inproc_address ] &BEGIN_PROVIDER [ character*(128), zmq_socket_push_tcp_address ] -&BEGIN_PROVIDER [ character*(128), zmq_socket_pull_inproc_address ] &BEGIN_PROVIDER [ character*(128), zmq_socket_push_inproc_address ] -&BEGIN_PROVIDER [ character*(128), zmq_socket_sub_tcp_address ] +&BEGIN_PROVIDER [ character*(128), zmq_socket_sub_address ] use f77_zmq implicit none BEGIN_DOC @@ -54,12 +59,12 @@ END_PROVIDER END_DOC character*(8), external :: zmq_port - zmq_socket_sub_tcp_address = trim(qp_run_address)//':'//zmq_port(1)//' ' zmq_socket_pull_tcp_address = 'tcp://*:'//zmq_port(2)//' ' - zmq_socket_push_tcp_address = trim(qp_run_address)//':'//zmq_port(2)//' ' zmq_socket_pull_inproc_address = 'inproc://'//zmq_port(2)//' ' - zmq_socket_push_inproc_address = zmq_socket_pull_inproc_address zmq_socket_pair_inproc_address = 'inproc://'//zmq_port(3)//' ' + zmq_socket_push_tcp_address = trim(qp_run_address_tcp)//':'//zmq_port(2)//' ' + zmq_socket_push_inproc_address = zmq_socket_pull_inproc_address + zmq_socket_sub_address = trim(qp_run_address)//':'//zmq_port(1)//' ' ! /!\ Don't forget to change subroutine reset_zmq_addresses END_PROVIDER @@ -72,12 +77,13 @@ subroutine reset_zmq_addresses END_DOC character*(8), external :: zmq_port - zmq_socket_sub_tcp_address = trim(qp_run_address)//':'//zmq_port(1)//' ' zmq_socket_pull_tcp_address = 'tcp://*:'//zmq_port(2)//' ' - zmq_socket_push_tcp_address = trim(qp_run_address)//':'//zmq_port(2)//' ' zmq_socket_pull_inproc_address = 'inproc://'//zmq_port(2)//' ' - zmq_socket_push_inproc_address = zmq_socket_pull_inproc_address zmq_socket_pair_inproc_address = 'inproc://'//zmq_port(3)//' ' + zmq_socket_push_tcp_address = trim(qp_run_address_tcp)//':'//zmq_port(2)//' ' + zmq_socket_push_inproc_address = zmq_socket_pull_inproc_address + zmq_socket_sub_address = trim(qp_run_address)//':'//zmq_port(1)//' ' + end @@ -105,6 +111,7 @@ subroutine switch_qp_run_to_master exit endif enddo + qp_run_address_tcp = qp_run_address call reset_zmq_addresses end @@ -367,7 +374,7 @@ function new_zmq_sub_socket() stop 'Unable to subscribe new_zmq_sub_socket' endif - rc = f77_zmq_connect(new_zmq_sub_socket, zmq_socket_sub_tcp_address) + rc = f77_zmq_connect(new_zmq_sub_socket, zmq_socket_sub_address) if (rc /= 0) then stop 'Unable to connect new_zmq_sub_socket' endif @@ -403,17 +410,6 @@ subroutine end_zmq_pair_socket(zmq_socket_pair) character*(8), external :: zmq_port rc = f77_zmq_unbind(zmq_socket_pair,zmq_socket_pair_inproc_address) -! if (rc /= 0) then -! print *, rc -! print *, irp_here, 'f77_zmq_unbind(zmq_socket_pair,zmq_socket_pair_inproc_address)' -! stop 'error' -! endif - -! rc = f77_zmq_setsockopt(zmq_socket_pair,0ZMQ_LINGER,1000,4) -! if (rc /= 0) then -! stop 'Unable to set ZMQ_LINGER on zmq_socket_pair' -! endif - rc = f77_zmq_close(zmq_socket_pair) if (rc /= 0) then print *, 'f77_zmq_close(zmq_socket_pair)' @@ -433,26 +429,7 @@ subroutine end_zmq_pull_socket(zmq_socket_pull) character*(8), external :: zmq_port rc = f77_zmq_unbind(zmq_socket_pull,zmq_socket_pull_inproc_address) -! if (rc /= 0) then -! print *, rc -! print *, irp_here, 'f77_zmq_unbind(zmq_socket_pull,zmq_socket_pull_inproc_address)' -! stop 'error' -! endif - rc = f77_zmq_unbind(zmq_socket_pull,zmq_socket_pull_tcp_address) -! if (rc /= 0) then -! print *, rc -! print *, irp_here, 'f77_zmq_unbind(zmq_socket_pull,zmq_socket_pull_tcp_address)' -! stop 'error' -! endif - -! call sleep(1) ! see https://github.com/zeromq/libzmq/issues/1922 - -! rc = f77_zmq_setsockopt(zmq_socket_pull,ZMQ_LINGER,10000,4) -! if (rc /= 0) then -! stop 'Unable to set ZMQ_LINGER on zmq_socket_pull' -! endif - rc = f77_zmq_close(zmq_socket_pull) if (rc /= 0) then print *, 'f77_zmq_close(zmq_socket_pull)' @@ -784,6 +761,9 @@ subroutine get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task) else if (trim(reply) == 'terminate') then task_id = 0 task = 'terminate' + else if (trim(message) == 'error No job is running') then + task_id = 0 + task = 'terminate' else print *, 'Unable to get the next task' print *, trim(message)