diff --git a/src/Integrals_Bielec/ao_bi_integrals.irp.f b/src/Integrals_Bielec/ao_bi_integrals.irp.f index 8bae1a1e..b1f1dba0 100644 --- a/src/Integrals_Bielec/ao_bi_integrals.irp.f +++ b/src/Integrals_Bielec/ao_bi_integrals.irp.f @@ -373,6 +373,7 @@ BEGIN_PROVIDER [ logical, ao_bielec_integrals_in_map ] character*(32) :: task + do l=1,ao_num write(task,*) 'triangle', l call add_task_to_taskserver(zmq_to_qp_run_socket,task) diff --git a/src/Integrals_Bielec/ao_bielec_integrals_in_map_slave.irp.f b/src/Integrals_Bielec/ao_bielec_integrals_in_map_slave.irp.f index e21014b8..818247ff 100644 --- a/src/Integrals_Bielec/ao_bielec_integrals_in_map_slave.irp.f +++ b/src/Integrals_Bielec/ao_bielec_integrals_in_map_slave.irp.f @@ -38,7 +38,8 @@ subroutine ao_bielec_integrals_in_map_slave(thread) zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() integer(ZMQ_PTR) :: zmq_socket_push - zmq_socket_push = f77_zmq_socket(zmq_context, ZMQ_PUSH) +! zmq_socket_push = f77_zmq_socket(zmq_context, ZMQ_PUSH) + zmq_socket_push = f77_zmq_socket(zmq_context, ZMQ_REQ ) if (thread == 1) then rc = f77_zmq_connect(zmq_socket_push, trim(zmq_socket_pull_inproc_address)) else @@ -63,6 +64,8 @@ subroutine ao_bielec_integrals_in_map_slave(thread) rc = f77_zmq_send( zmq_socket_push, buffer_i, key_kind*n_integrals, ZMQ_SNDMORE) rc = f77_zmq_send( zmq_socket_push, buffer_value, integral_kind*n_integrals, 0) call task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id) + character*(2) :: ok + rc = f77_zmq_recv( zmq_socket_push, ok, 2, 0) enddo deallocate( buffer_i, buffer_value ) @@ -72,6 +75,7 @@ subroutine ao_bielec_integrals_in_map_slave(thread) if (finished /= 0) then rc = f77_zmq_send( zmq_socket_push, -1, 4, 0) + rc = f77_zmq_recv( zmq_socket_push, ok, 2, ZMQ_NOBLOCK) endif rc = f77_zmq_disconnect(zmq_socket_push,trim(zmq_socket_push_tcp_address)) @@ -102,7 +106,10 @@ subroutine ao_bielec_integrals_in_map_collector if (n_integrals >= 0) then rc = f77_zmq_recv( zmq_socket_pull, buffer_i, key_kind*n_integrals, 0) rc = f77_zmq_recv( zmq_socket_pull, buffer_value, integral_kind*n_integrals, 0) + rc = f77_zmq_send( zmq_socket_pull, 'ok', 2, 0) call insert_into_ao_integrals_map(n_integrals,buffer_i,buffer_value) + else + rc = f77_zmq_send( zmq_socket_pull, 'ok', 2, 0) endif enddo diff --git a/src/Integrals_Bielec/qp_ao_ints.irp.f b/src/Integrals_Bielec/qp_ao_ints.irp.f new file mode 100644 index 00000000..dbeee9d7 --- /dev/null +++ b/src/Integrals_Bielec/qp_ao_ints.irp.f @@ -0,0 +1,20 @@ +program qp_ao_ints + implicit none + BEGIN_DOC +! Increments a running calculation to compute AO integrals + END_DOC + + ! Set the state of the ZMQ + zmq_state = 'ao_integrals' + + ! Provide everything needed + double precision :: integral, ao_bielec_integral + integral = ao_bielec_integral(1,1,1,1) + + !$OMP PARALLEL DEFAULT(PRIVATE) + call ao_bielec_integrals_in_map_slave_tcp + !$OMP END PARALLEL + + print *, 'Done' +end + diff --git a/src/ZMQ/zmq.irp.f b/src/ZMQ/zmq.irp.f index 6a887422..234271a0 100644 --- a/src/ZMQ/zmq.irp.f +++ b/src/ZMQ/zmq.irp.f @@ -87,7 +87,8 @@ end zmq_socket_push_tcp_address = trim(qp_run_address)//':'//zmq_port(1) zmq_socket_pull_inproc_address = 'inproc://'//zmq_port(1) - zmq_socket_pull = f77_zmq_socket(zmq_context, ZMQ_PULL) +! zmq_socket_pull = f77_zmq_socket(zmq_context, ZMQ_PULL) + zmq_socket_pull = f77_zmq_socket(zmq_context, ZMQ_REP ) rc = f77_zmq_bind(zmq_socket_pull, zmq_socket_pull_tcp_address) rc = f77_zmq_bind(zmq_socket_pull, zmq_socket_pull_inproc_address) if (rc /= 0) then @@ -104,7 +105,7 @@ END_PROVIDER ! Threads executing work through the ZeroMQ interface END_DOC zmq_thread = 0_ZMQ_PTR - zmq_state = '' + zmq_state = 'No_state' END_PROVIDER subroutine new_parallel_job(zmq_to_qp_run_socket,name) @@ -317,6 +318,7 @@ subroutine end_parallel_job(zmq_to_qp_run_socket,name) stop 'Wrong end of job' endif + ! Wait for Slaves do i=1,nproc rc = pthread_join( zmq_thread(i) ) if (rc /= 0) then @@ -324,12 +326,18 @@ subroutine end_parallel_job(zmq_to_qp_run_socket,name) stop -1 endif zmq_thread(i) = 0 + print *, 'joined ', i enddo - zmq_state = 'None' + ! Wait for collector + rc = pthread_join( zmq_thread(0) ) + zmq_thread(0) = 0 + print *, 'joined ', 0 + zmq_state = 'No_state' character*(8), external :: zmq_port rc = f77_zmq_disconnect(zmq_to_qp_run_socket, trim(qp_run_address)//':'//trim(zmq_port(0))) rc = f77_zmq_close(zmq_to_qp_run_socket) + SOFT_TOUCH zmq_thread zmq_state end