diff --git a/plugins/dress_zmq/dress_stoch_routines.irp.f b/plugins/dress_zmq/dress_stoch_routines.irp.f index faf26c8a..025600d6 100644 --- a/plugins/dress_zmq/dress_stoch_routines.irp.f +++ b/plugins/dress_zmq/dress_stoch_routines.irp.f @@ -183,6 +183,7 @@ subroutine ZMQ_dress(E, dress, delta_out, delta_s2_out, relative_error) integer, external :: add_task_to_taskserver double precision :: state_average_weight_save(N_states) + print *, "ZMQ_dress" task(:) = CHAR(0) allocate(delta(N_states,N_det), delta_s2(N_states, N_det)) state_average_weight_save(:) = state_average_weight(:) @@ -232,7 +233,7 @@ subroutine ZMQ_dress(E, dress, delta_out, delta_s2_out, relative_error) do i=1,N_det_generators - do j=1,pt2_F(i) !!!!!!!!!!!! + do j=1,pt2_F(i) write(task(1:20),'(I9,1X,I9''|'')') j, pt2_J(i) if (add_task_to_taskserver(zmq_to_qp_run_socket,trim(task(1:20))) == -1) then stop 'Unable to add task to task server' @@ -286,7 +287,7 @@ end allocate(d(N_det_generators+1)) - dress_e(:,:) = 1d0 + dress_e(:,:) = 0d0 dress_dot_t(:) = 0 dress_dot_n_0(:) = 0 dress_dot_F = 0 @@ -319,6 +320,7 @@ end end do end do end do + do m=dress_N_cp, 2, -1 dress_e(:,m) -= dress_e(:,m-1) end do @@ -355,19 +357,20 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2, double precision, external :: omp_get_wtime integer, allocatable :: dot_f(:) integer, external :: zmq_delete_tasks, dress_find_sample - + logical :: found + found = .false. delta = 0d0 delta_s2 = 0d0 allocate(cp(N_states, N_det, dress_N_cp, 2), edI(N_states, N_det)) allocate(edI_task(N_states, N_det), edI_index(N_det)) allocate(breve_delta_m(N_states, N_det, 2)) - allocate(dot_f(dress_N_cp)) + allocate(dot_f(dress_N_cp+1)) allocate(S(pt2_N_teeth+1), S2(pt2_N_teeth+1)) edI = 0d0 cp = 0d0 - dot_f(:) = dress_dot_F(:) - + dot_f(:dress_N_cp) = dress_dot_F(:) + dot_f(dress_N_cp+1) = 1 zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() more = 1 m = 1 @@ -376,8 +379,8 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2, S2(:) = 0d0 time0 = omp_get_wtime() more = 1 - do while (m <= dress_N_cp) - if(more == 0 .and. dot_f(m) /= 0) exit + do while (.not. found) !(m <= dress_N_cp) + !if(more == 0 .and. dot_f(m) /= 0) exit if(dot_f(m) == 0) then E0 = 0 do i=dress_dot_n_0(m),1,-1 @@ -400,23 +403,22 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2, eqt = sqrt(eqt / dble(c-1)) error = eqt time = omp_get_wtime() - print '(G10.3, 2X, F16.10, 2X, G16.3, 2X, F16.4, A20)', c, avg+E0, eqt, time-time0, '' + print '(G10.3, 2X, F16.10, 2X, G16.3, 2X, F16.4, A20)', c, avg+E0+E(dress_stoch_istate), eqt, time-time0, '' m += 1 - if(eqt <= 0d0*relative_error) then - if (zmq_abort(zmq_to_qp_run_socket) == -1) then - call sleep(1) - if (zmq_abort(zmq_to_qp_run_socket) == -1) then - print *, irp_here, ': Error in sending abort signal (2)' - endif - endif + if(eqt <= 1d0*relative_error) then + found = .true. end if else do call pull_dress_results(zmq_socket_pull, m_task, f, edI_task, edI_index, breve_delta_m, task_id, n_tasks) - if(task_id == 0) exit - if (zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more) == -1) then - stop 'Unable to delete tasks' - endif + if(m_task == 0) then + if (zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more) == -1) then + stop 'Unable to delete tasks' + endif + else + i= zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,1,more) + exit + end if end do do i=1,n_tasks edI(:, edI_index(i)) += edI_task(:, i) @@ -427,7 +429,22 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2, dot_f(m_task) -= f end if end do - + + if (zmq_abort(zmq_to_qp_run_socket) == -1) then + call sleep(1) + if (zmq_abort(zmq_to_qp_run_socket) == -1) then + print *, irp_here, ': Error in sending abort signal (2)' + endif + endif + + do while(more /= 0) + call pull_dress_results(zmq_socket_pull, m_task, f, edI_task, edI_index, breve_delta_m, task_id, n_tasks) + if(m_task == 0) then + i = zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more) + else + i = zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,1,more) + end if + end do delta(:,:) = cp(:,:,m-1,1) delta_s2(:,:) = cp(:,:,m-1,2) dress(istate) = E(istate)+E0+avg diff --git a/plugins/dress_zmq/run_dress_slave.irp.f b/plugins/dress_zmq/run_dress_slave.irp.f index 6941b7b2..9a090b36 100644 --- a/plugins/dress_zmq/run_dress_slave.irp.f +++ b/plugins/dress_zmq/run_dress_slave.irp.f @@ -31,11 +31,10 @@ subroutine run_dress_slave(thread,iproce,energy) integer, allocatable :: f(:) integer :: cp_sent, cp_done integer :: cp_max(Nproc) - integer :: will_send, task_id + integer :: will_send, task_id, purge_task_id(dress_N_cp+1) integer(kind=OMP_LOCK_KIND) :: lck_det(0:pt2_N_teeth+1) integer(kind=OMP_LOCK_KIND) :: lck_sto(0:dress_N_cp+1), sending double precision :: fac - if(iproce /= 0) stop "RUN DRESS SLAVE is OMP" @@ -43,7 +42,6 @@ subroutine run_dress_slave(thread,iproce,energy) allocate(cp(N_states, N_det, dress_N_cp, 2)) allocate(edI(N_det_generators), f(N_det_generators)) allocate(edI_index(N_det_generators), edI_task(N_det_generators)) - edI = 0d0 f = 0 delta_det = 0d0 @@ -64,7 +62,8 @@ subroutine run_dress_slave(thread,iproce,energy) will_send = 0 double precision :: hij, sij, tmp - + logical :: purge + purge_task_id = 0 hij = E0_denominator(1) !PROVIDE BEFORE OMP PARALLEL !$OMP PARALLEL DEFAULT(SHARED) & @@ -72,7 +71,6 @@ subroutine run_dress_slave(thread,iproce,energy) !$OMP PRIVATE(tmp,fac,m,l,t,sum_f,n_tasks) & !$OMP PRIVATE(i,p,will_send, i_generator, subset, iproc) & !$OMP PRIVATE(zmq_to_qp_run_socket, zmq_socket_push, worker_id) - zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() zmq_socket_push = new_zmq_push_socket(thread) call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) @@ -82,12 +80,11 @@ subroutine run_dress_slave(thread,iproce,energy) stop "WORKER -1" end if - iproc = omp_get_thread_num()+1 allocate(breve_delta_m(N_states,N_det,2)) - do while(m /= dress_N_cp+1) + do while(cp_done > cp_sent .or. m /= dress_N_cp+1) call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task) task = task//" 0" if(task_id /= 0) then @@ -106,12 +103,16 @@ subroutine run_dress_slave(thread,iproce,energy) will_send = cp_sent + 1 cp_sent = will_send end if + if(purge_task_id(m) == 0) then + purge_task_id(m) = task_id + task_id = 0 + end if !$OMP END CRITICAL if(will_send /= 0) then breve_delta_m = 0d0 - do l=1, will_send + do l=will_send, 1,-1 breve_delta_m(:,:,1) += cp(:,:,l,1) breve_delta_m(:,:,2) += cp(:,:,l,2) end do @@ -134,7 +135,10 @@ subroutine run_dress_slave(thread,iproce,energy) sum_f += f(i) end if end do - call push_dress_results(zmq_socket_push, will_send, sum_f, edI_task, edI_index, breve_delta_m, 0, n_tasks) + if(purge_task_id(will_send) /= 0) then + call push_dress_results(zmq_socket_push, will_send, sum_f, edI_task, edI_index, breve_delta_m, purge_task_id(will_send), n_tasks) + end if + purge_task_id(will_send) = 0 call omp_unset_lock(sending) end if @@ -172,10 +176,20 @@ subroutine run_dress_slave(thread,iproce,energy) !$OMP ATOMIC f(i_generator) += 1 !push bidon - call push_dress_results(zmq_socket_push, 0, 0, edI_task, edI_index, breve_delta_m, task_id, 1) + if(task_id /= 0) then + call push_dress_results(zmq_socket_push, 0, 0, edI_task, edI_index, breve_delta_m, task_id, 1) + end if end if end do - + !$OMP BARRIER + !$OMP SINGLE + do m=1,dress_N_cp + if(purge_task_id(m) /= 0) then + call push_dress_results(zmq_socket_push, 0, 0, edI_task, edI_index, breve_delta_m, purge_task_id(m), 1) + end if + end do + !$OMP END SINGLE + call disconnect_from_taskserver(zmq_to_qp_run_socket,worker_id) call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) call end_zmq_push_socket(zmq_socket_push,thread) @@ -210,15 +224,20 @@ subroutine push_dress_results(zmq_socket_push, m_task, f, edI_task, edI_index, b rc = f77_zmq_send( zmq_socket_push, f, 4, ZMQ_SNDMORE) if(rc /= 4) stop "push4" - + rc = f77_zmq_send( zmq_socket_push, edI_task, 8*n_tasks, ZMQ_SNDMORE) if(rc /= 8*n_tasks) stop "push5" rc = f77_zmq_send( zmq_socket_push, edI_index, 4*n_tasks, ZMQ_SNDMORE) if(rc /= 4*n_tasks) stop "push6" - - rc = f77_zmq_send( zmq_socket_push, breve_delta_m, 8*N_det*N_states*2, 0) - if(rc /= 8*N_det*N_states*2) stop "push6" + + if(m_task /= 0) then + rc = f77_zmq_send( zmq_socket_push, breve_delta_m, 8*N_det*N_states*2, 0) + if(rc /= 8*N_det*N_states*2) stop "push6" + else + rc = f77_zmq_send( zmq_socket_push, breve_delta_m, 8, 0) + if(rc /= 8) stop "push6" + end if ! Activate is zmq_socket_pull is a REP IRP_IF ZMQ_PUSH IRP_ELSE @@ -256,9 +275,13 @@ subroutine pull_dress_results(zmq_socket_pull, m_task, f, edI_task, edI_index, b rc = f77_zmq_recv( zmq_socket_pull, edI_index, 4*n_tasks, 0) if(rc /= 4*n_tasks) stop "pullc" - - rc = f77_zmq_recv( zmq_socket_pull, breve_delta_m, 8*N_det*N_states*2, 0) - if(rc /= 8*N_det*N_states*2) stop "pullc" + if(m_task /= 0) then + rc = f77_zmq_recv( zmq_socket_pull, breve_delta_m, 8*N_det*N_states*2, 0) + if(rc /= 8*N_det*N_states*2) stop "pullc" + else + rc = f77_zmq_recv( zmq_socket_pull, breve_delta_m, 8, 0) + if(rc /= 8) stop "pullc" + end if ! Activate is zmq_socket_pull is a REP IRP_IF ZMQ_PUSH IRP_ELSE