diff --git a/plugins/dress_zmq/dress_stoch_routines.irp.f b/plugins/dress_zmq/dress_stoch_routines.irp.f index b93e7c3b..7934beff 100644 --- a/plugins/dress_zmq/dress_stoch_routines.irp.f +++ b/plugins/dress_zmq/dress_stoch_routines.irp.f @@ -341,7 +341,6 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2, double precision, intent(in) :: relative_error, E(N_states) double precision, intent(out) :: dress(N_states) - double precision, allocatable :: cp(:,:,:,:) double precision, intent(out) :: delta(N_states, N_det) double precision, intent(out) :: delta_s2(N_states, N_det) @@ -361,20 +360,23 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2, integer, allocatable :: dot_f(:) integer, external :: zmq_delete_tasks, dress_find_sample logical :: found + integer :: worker_id + zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() + + call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,1) + found = .false. delta = 0d0 delta_s2 = 0d0 - allocate(cp(N_states, N_det, dress_N_cp, 2), edI(N_states, N_det)) + allocate(edI(N_states, N_det)) allocate(edI_task(N_states, N_det), edI_index(N_det)) allocate(breve_delta_m(N_states, N_det, 2)) allocate(dot_f(dress_N_cp+1)) allocate(S(pt2_N_teeth+1), S2(pt2_N_teeth+1)) edI = 0d0 - cp = 0d0 dot_f(:dress_N_cp) = dress_dot_F(:) dot_f(dress_N_cp+1) = 1 - zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() more = 1 m = 1 c = 0 @@ -408,7 +410,9 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2, time = omp_get_wtime() print '(G10.3, 2X, F16.10, 2X, G16.3, 2X, F16.4, A20)', c, avg+E0+E(dress_stoch_istate), eqt, time-time0, '' m += 1 - if(eqt <= 1d0*relative_error) then + if(eqt <= relative_error) then + integer, external :: zmq_put_dvector + i= zmq_put_dvector(zmq_to_qp_run_socket, worker_id, "ending", dble(m-1), 1) found = .true. end if else @@ -427,8 +431,6 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2, do i=1,n_tasks edI(:, edI_index(i)) += edI_task(:, i) end do - cp(:,:,m_task,1) += breve_delta_m(:,:,1) - cp(:,:,m_task,2) += breve_delta_m(:,:,2) dot_f(m_task) -= f end if end do @@ -439,19 +441,31 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2, print *, irp_here, ': Error in sending abort signal (2)' endif endif - + + integer :: ff + + ff = dress_dot_F(m-1) + delta= 0d0 + delta_s2 = 0d0 do while(more /= 0) call pull_dress_results(zmq_socket_pull, m_task, f, edI_task, edI_index, breve_delta_m, task_id, n_tasks) - if(task_id == 0) cycle - if(m_task == 0) then - i = zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more) - else - i = zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,1,more) - end if + + if(task_id == 0) cycle + if(m_task == 0) then + i = zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more) + else + i = zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,1,more) + end if + + + if(m_task >= 0) cycle + ff = ff - f + delta(:,:) += breve_delta_m(:,:,1) + delta_s2(:,:) += breve_delta_m(:,:,2) end do - delta(:,:) = cp(:,:,m-1,1) - delta_s2(:,:) = cp(:,:,m-1,2) dress(istate) = E(istate)+E0+avg + if(ff /= 0) stop "WRONG NUMBER OF FRAGMENTS COLLECTED" + call disconnect_from_taskserver(zmq_to_qp_run_socket,worker_id) call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) end subroutine diff --git a/plugins/dress_zmq/run_dress_slave.irp.f b/plugins/dress_zmq/run_dress_slave.irp.f index 99e67e67..5f53b7fc 100644 --- a/plugins/dress_zmq/run_dress_slave.irp.f +++ b/plugins/dress_zmq/run_dress_slave.irp.f @@ -35,7 +35,9 @@ subroutine run_dress_slave(thread,iproce,energy) integer(kind=OMP_LOCK_KIND) :: lck_det(0:pt2_N_teeth+1) integer(kind=OMP_LOCK_KIND) :: lck_sto(0:dress_N_cp+1), sending double precision :: fac - + double precision :: ending(1) + integer, external :: zmq_get_dvector + if(iproce /= 0) stop "RUN DRESS SLAVE is OMP" allocate(delta_det(N_states, N_det, 0:pt2_N_teeth+1, 2)) @@ -45,7 +47,6 @@ subroutine run_dress_slave(thread,iproce,energy) edI = 0d0 f = 0 delta_det = 0d0 - cp = 0d0 task(:) = CHAR(0) @@ -65,7 +66,7 @@ subroutine run_dress_slave(thread,iproce,energy) logical :: purge purge_task_id = 0 hij = E0_denominator(1) !PROVIDE BEFORE OMP PARALLEL - + ending(1) = dble(dress_N_cp+1) !$OMP PARALLEL DEFAULT(SHARED) & !$OMP PRIVATE(breve_delta_m, task, task_id) & !$OMP PRIVATE(tmp,fac,m,l,t,sum_f,n_tasks) & @@ -92,6 +93,7 @@ subroutine run_dress_slave(thread,iproce,energy) m = dress_P(i_generator) else m = dress_N_cp + 1 + i= zmq_get_dvector(zmq_to_qp_run_socket, worker_id, "ending", ending, 1) end if will_send = 0 @@ -109,7 +111,7 @@ subroutine run_dress_slave(thread,iproce,energy) end if !$OMP END CRITICAL - if(will_send /= 0) then + if(will_send /= 0 .and. will_send <= int(ending(1))) then breve_delta_m = 0d0 do l=will_send, 1,-1 @@ -136,7 +138,6 @@ subroutine run_dress_slave(thread,iproce,energy) end if end do call push_dress_results(zmq_socket_push, will_send, sum_f, edI_task, edI_index, breve_delta_m, 0, n_tasks) - !call task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,purge_task_id(will_send)) call omp_unset_lock(sending) end if @@ -175,26 +176,45 @@ subroutine run_dress_slave(thread,iproce,energy) f(i_generator) += 1 !push bidon if(task_id /= 0) then - !call task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id) call push_dress_results(zmq_socket_push, 0, 0, edI_task, edI_index, breve_delta_m, task_id, 1) end if end if end do !$OMP BARRIER !$OMP SINGLE - !do m=1,dress_N_cp if(purge_task_id /= 0) then - !call task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,purge_task_id(m)) - call push_dress_results(zmq_socket_push, 0, 0, edI_task, edI_index, breve_delta_m, purge_task_id, 1) - end if - !end do - !$OMP END SINGLE + do while(int(ending(1)) == dress_N_cp+1) + call sleep(1) + i= zmq_get_dvector(zmq_to_qp_run_socket, worker_id, "ending", ending, 1) + end do + will_send = int(ending(1)) + breve_delta_m = 0d0 + + do l=will_send, 1,-1 + breve_delta_m(:,:,1) += cp(:,:,l,1) + breve_delta_m(:,:,2) += cp(:,:,l,2) + end do + + breve_delta_m(:,:,:) = breve_delta_m(:,:,:) / dress_M_m(will_send) + + do t=dress_dot_t(will_send)-1,0,-1 + breve_delta_m(:,:,1) = breve_delta_m(:,:,1) + delta_det(:,:,t,1) + breve_delta_m(:,:,2) = breve_delta_m(:,:,2) + delta_det(:,:,t,2) + end do + + sum_f = 0 + do i=1,N_det_generators + if(dress_P(i) <= will_send) sum_f = sum_f + f(i) + end do + call push_dress_results(zmq_socket_push, -will_send, sum_f, edI_task, edI_index, breve_delta_m, purge_task_id, 1) + end if + + !$OMP END SINGLE call disconnect_from_taskserver(zmq_to_qp_run_socket,worker_id) call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) call end_zmq_push_socket(zmq_socket_push,thread) !$OMP END PARALLEL - do i=0,dress_N_cp+1 call omp_destroy_lock(lck_sto(i)) end do @@ -231,7 +251,7 @@ subroutine push_dress_results(zmq_socket_push, m_task, f, edI_task, edI_index, b rc = f77_zmq_send( zmq_socket_push, edI_index, 4*n_tasks, ZMQ_SNDMORE) if(rc /= 4*n_tasks) stop "push6" - if(m_task /= 0) then + if(m_task < 0) then rc = f77_zmq_send( zmq_socket_push, breve_delta_m, 8*N_det*N_states*2, 0) if(rc /= 8*N_det*N_states*2) stop "push6" else @@ -275,7 +295,7 @@ subroutine pull_dress_results(zmq_socket_pull, m_task, f, edI_task, edI_index, b rc = f77_zmq_recv( zmq_socket_pull, edI_index, 4*n_tasks, 0) if(rc /= 4*n_tasks) stop "pullc" - if(m_task /= 0) then + if(m_task < 0) then rc = f77_zmq_recv( zmq_socket_pull, breve_delta_m, 8*N_det*N_states*2, 0) if(rc /= 8*N_det*N_states*2) stop "pullc" else