diff --git a/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f b/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f index 52f212bf..aea967eb 100644 --- a/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f +++ b/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f @@ -279,7 +279,13 @@ subroutine pt2_collector(zmq_socket_pull, E, b, tbc, comb, Ncomb, computed, pt2_ loop = .True. pullLoop : do while (loop) + integer, external :: zmq_delete_tasks_async_send, zmq_delete_tasks_async_recv + integer, external :: zmq_delete_tasks + call pull_pt2_results(zmq_socket_pull, index, pt2_mwen, task_id, n_tasks) + if (zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more) == -1) then + stop 'Unable to send delete tasks' + endif do i=1,n_tasks pt2_detail(1:N_states, index(i)) += pt2_mwen(1:N_states,i) parts_to_get(index(i)) -= 1 @@ -292,17 +298,13 @@ subroutine pt2_collector(zmq_socket_pull, E, b, tbc, comb, Ncomb, computed, pt2_ if(parts_to_get(index(i)) == 0) actually_computed(index(i)) = .true. enddo - integer, external :: zmq_delete_tasks - if (zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more) == -1) then - stop 'Unable to delete tasks' - endif + call wall_time(time) + if (more == 0) then loop = .False. endif - time = omp_get_wtime() - - if(time - timeLast > 10d0 .or. (.not.loop)) then + if(time - timeLast > 4d0 .or. (.not.loop)) then timeLast = time do i=1, first_det_of_teeth(1)-1 if(.not.(actually_computed(i))) then diff --git a/plugins/Full_CI_ZMQ/selection_davidson_slave.irp.f b/plugins/Full_CI_ZMQ/selection_davidson_slave.irp.f index b4a4d578..3ecf2fb4 100644 --- a/plugins/Full_CI_ZMQ/selection_davidson_slave.irp.f +++ b/plugins/Full_CI_ZMQ/selection_davidson_slave.irp.f @@ -51,20 +51,34 @@ subroutine run_wf do - call wait_for_states(states,zmq_state,size(states)) + if (mpi_master) then + print *, trim(zmq_state) + call wait_for_states(states,zmq_state,size(states)) + endif + + IRP_IF MPI + call MPI_BARRIER(MPI_COMM_WORLD, ierr) + if (ierr /= MPI_SUCCESS) then + print *, irp_here, 'error in barrier' + endif + call MPI_BCAST (zmq_state, 128, MPI_CHARACTER, 0, MPI_COMM_WORLD, ierr) + if (ierr /= MPI_SUCCESS) then + print *, irp_here, 'error in broadcast of zmq_state' + endif + IRP_ENDIF + if (zmq_state == old_state) then cycle else old_state = zmq_state endif - print *, trim(zmq_state) - if(zmq_state(1:7) == 'Stopped') then - exit + endif - else if (zmq_state(1:9) == 'selection') then + + if (zmq_state(1:9) == 'selection') then ! Selection ! --------- @@ -80,12 +94,14 @@ subroutine run_wf psi_energy(1:N_states) = energy(1:N_states) TOUCH psi_energy state_average_weight threshold_selectors threshold_generators - print *, 'N_det', N_det - print *, 'N_det_generators', N_det_generators - print *, 'N_det_selectors', N_det_selectors - print *, 'psi_energy', psi_energy - print *, 'pt2_stoch_istate', pt2_stoch_istate - print *, 'state_average_weight', state_average_weight + if (mpi_master) then + print *, 'N_det', N_det + print *, 'N_det_generators', N_det_generators + print *, 'N_det_selectors', N_det_selectors + print *, 'psi_energy', psi_energy + print *, 'pt2_stoch_istate', pt2_stoch_istate + print *, 'state_average_weight', state_average_weight + endif call wall_time(t1) call write_double(6,(t1-t0),'Broadcast time') @@ -100,14 +116,15 @@ subroutine run_wf ! Davidson ! -------- - print *, 'Davidson' call wall_time(t0) if (zmq_get_psi(zmq_to_qp_run_socket,1) == -1) cycle if (zmq_get_N_states_diag(zmq_to_qp_run_socket,1) == -1) cycle if (zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states_diag) == -1) cycle call wall_time(t1) - call write_double(6,(t1-t0),'Broadcast time') + if (mpi_master) then + call write_double(6,(t1-t0),'Broadcast time') + endif call omp_set_nested(.True.) call davidson_slave_tcp(0) @@ -119,7 +136,6 @@ subroutine run_wf ! PT2 ! --- - print *, 'PT2' call wall_time(t0) if (zmq_get_psi(zmq_to_qp_run_socket,1) == -1) cycle if (zmq_get_N_det_generators (zmq_to_qp_run_socket, 1) == -1) cycle @@ -131,12 +147,14 @@ subroutine run_wf if (zmq_get_dvector(zmq_to_qp_run_socket,1,'state_average_weight',state_average_weight,N_states) == -1) cycle psi_energy(1:N_states) = energy(1:N_states) TOUCH psi_energy state_average_weight pt2_stoch_istate threshold_selectors threshold_generators - print *, 'N_det', N_det - print *, 'N_det_generators', N_det_generators - print *, 'N_det_selectors', N_det_selectors - print *, 'psi_energy', psi_energy - print *, 'pt2_stoch_istate', pt2_stoch_istate - print *, 'state_average_weight', state_average_weight + if (mpi_master) then + print *, 'N_det', N_det + print *, 'N_det_generators', N_det_generators + print *, 'N_det_selectors', N_det_selectors + print *, 'psi_energy', psi_energy + print *, 'pt2_stoch_istate', pt2_stoch_istate + print *, 'state_average_weight', state_average_weight + endif call wall_time(t1) call write_double(6,(t1-t0),'Broadcast time') @@ -152,13 +170,6 @@ subroutine run_wf endif - IRP_IF MPI - call MPI_BARRIER(MPI_COMM_WORLD, ierr) - if (ierr /= MPI_SUCCESS) then - print *, irp_here, 'error in barrier' - endif - IRP_ENDIF - end do IRP_IF MPI call MPI_finalize(i) diff --git a/src/ZMQ/utils.irp.f b/src/ZMQ/utils.irp.f index c263c884..14ab4302 100644 --- a/src/ZMQ/utils.irp.f +++ b/src/ZMQ/utils.irp.f @@ -148,15 +148,15 @@ function new_zmq_to_qp_run_socket() stop 'Unable to create zmq req socket' endif - rc = f77_zmq_setsockopt(new_zmq_to_qp_run_socket, ZMQ_SNDTIMEO, 300000, 4) - if (rc /= 0) then - stop 'Unable to set send timeout in new_zmq_to_qp_run_socket' - endif - - rc = f77_zmq_setsockopt(new_zmq_to_qp_run_socket, ZMQ_RCVTIMEO, 300000, 4) - if (rc /= 0) then - stop 'Unable to set recv timeout in new_zmq_to_qp_run_socket' - endif +! rc = f77_zmq_setsockopt(new_zmq_to_qp_run_socket, ZMQ_SNDTIMEO, 300000, 4) +! if (rc /= 0) then +! stop 'Unable to set send timeout in new_zmq_to_qp_run_socket' +! endif +! +! rc = f77_zmq_setsockopt(new_zmq_to_qp_run_socket, ZMQ_RCVTIMEO, 300000, 4) +! if (rc /= 0) then +! stop 'Unable to set recv timeout in new_zmq_to_qp_run_socket' +! endif rc = f77_zmq_connect(new_zmq_to_qp_run_socket, trim(qp_run_address)//':'//trim(zmq_port(0))) if (rc /= 0) then @@ -250,20 +250,20 @@ IRP_ENDIF stop 'Unable to create zmq pull socket' endif - rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_LINGER,300000,4) - if (rc /= 0) then - stop 'Unable to set ZMQ_LINGER on pull socket' - endif +! rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_LINGER,300000,4) +! if (rc /= 0) then +! stop 'Unable to set ZMQ_LINGER on pull socket' +! endif ! rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_RCVBUF,100000000,4) ! if (rc /= 0) then ! stop 'Unable to set ZMQ_RCVBUF on pull socket' ! endif - rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_RCVHWM,5,4) - if (rc /= 0) then - stop 'Unable to set ZMQ_RCVHWM on pull socket' - endif +! rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_RCVHWM,5,4) +! if (rc /= 0) then +! stop 'Unable to set ZMQ_RCVHWM on pull socket' +! endif integer :: icount @@ -332,15 +332,15 @@ IRP_ENDIF stop 'Unable to create zmq push socket' endif - rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_LINGER,300000,4) - if (rc /= 0) then - stop 'Unable to set ZMQ_LINGER on push socket' - endif - - rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_SNDHWM,1,4) - if (rc /= 0) then - stop 'Unable to set ZMQ_SNDHWM on push socket' - endif +! rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_LINGER,300000,4) +! if (rc /= 0) then +! stop 'Unable to set ZMQ_LINGER on push socket' +! endif +! +! rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_SNDHWM,1,4) +! if (rc /= 0) then +! stop 'Unable to set ZMQ_SNDHWM on push socket' +! endif ! rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_SNDBUF,100000000,4) ! if (rc /= 0) then @@ -352,10 +352,10 @@ IRP_ENDIF stop 'Unable to set ZMQ_IMMEDIATE on push socket' endif - rc = f77_zmq_setsockopt(new_zmq_push_socket, ZMQ_SNDTIMEO, 300000, 4) - if (rc /= 0) then - stop 'Unable to set send timout in new_zmq_push_socket' - endif +! rc = f77_zmq_setsockopt(new_zmq_push_socket, ZMQ_SNDTIMEO, 300000, 4) +! if (rc /= 0) then +! stop 'Unable to set send timout in new_zmq_push_socket' +! endif if (thread == 1) then rc = f77_zmq_connect(new_zmq_push_socket, zmq_socket_push_inproc_address) @@ -488,10 +488,10 @@ subroutine end_zmq_push_socket(zmq_socket_push,thread) integer :: rc character*(8), external :: zmq_port - rc = f77_zmq_setsockopt(zmq_socket_push,ZMQ_LINGER,300000,4) - if (rc /= 0) then - stop 'Unable to set ZMQ_LINGER on push socket' - endif +! rc = f77_zmq_setsockopt(zmq_socket_push,ZMQ_LINGER,300000,4) +! if (rc /= 0) then +! stop 'Unable to set ZMQ_LINGER on push socket' +! endif call omp_set_lock(zmq_lock) rc = f77_zmq_close(zmq_socket_push) @@ -1019,10 +1019,10 @@ subroutine end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) character*(8), external :: zmq_port integer :: rc - rc = f77_zmq_setsockopt(zmq_to_qp_run_socket,ZMQ_LINGER,300000,4) - if (rc /= 0) then - stop 'Unable to set ZMQ_LINGER on zmq_to_qp_run_socket' - endif +! rc = f77_zmq_setsockopt(zmq_to_qp_run_socket,ZMQ_LINGER,300000,4) +! if (rc /= 0) then +! stop 'Unable to set ZMQ_LINGER on zmq_to_qp_run_socket' +! endif rc = f77_zmq_close(zmq_to_qp_run_socket) if (rc /= 0) then @@ -1112,6 +1112,68 @@ integer function zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n endif end +integer function zmq_delete_tasks_async_send(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more) + use f77_zmq + implicit none + BEGIN_DOC +! When a task is done, it has to be removed from the list of tasks on the qp_run +! queue. This guarantees that the results have been received in the pull. + END_DOC + integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket + integer(ZMQ_PTR) :: zmq_socket_pull + integer, intent(in) :: n_tasks, task_id(n_tasks) + integer, intent(in) :: more + integer :: rc, k + character*(64) :: fmt, reply + character(LEN=:), allocatable :: message + + zmq_delete_tasks_async_send = 0 + + allocate(character(LEN=64+n_tasks*12) :: message) + + write(fmt,*) '(A,1X,A,1X,', n_tasks, '(I11,1X))' + write(message,*) 'del_task '//trim(zmq_state), (task_id(k), k=1,n_tasks) + + + rc = f77_zmq_send(zmq_to_qp_run_socket,trim(message),len(trim(message)),0) + if (rc /= len(trim(message))) then + zmq_delete_tasks_async_send = -1 + deallocate(message) + return + endif + deallocate(message) + +end + + +integer function zmq_delete_tasks_async_recv(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more) + use f77_zmq + implicit none + BEGIN_DOC +! When a task is done, it has to be removed from the list of tasks on the qp_run +! queue. This guarantees that the results have been received in the pull. + END_DOC + integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket + integer(ZMQ_PTR) :: zmq_socket_pull + integer, intent(in) :: n_tasks, task_id(n_tasks) + integer, intent(out) :: more + integer :: rc + character*(64) :: reply + + zmq_delete_tasks_async_recv = 0 + + reply = '' + rc = f77_zmq_recv(zmq_to_qp_run_socket,reply,64,0) + + if (reply(16:19) == 'more') then + more = 1 + else if (reply(16:19) == 'done') then + more = 0 + else + zmq_delete_tasks_async_recv = -1 + endif +end + subroutine wait_for_next_state(state) use f77_zmq