diff --git a/plugins/dress_zmq/dress_stoch_routines.irp.f b/plugins/dress_zmq/dress_stoch_routines.irp.f index 9abe9095..fc8f031d 100644 --- a/plugins/dress_zmq/dress_stoch_routines.irp.f +++ b/plugins/dress_zmq/dress_stoch_routines.irp.f @@ -10,7 +10,7 @@ END_PROVIDER implicit none pt2_F(:) = 1 !pt2_F(:N_det_generators/1000*0+50) = 1 - pt2_n_tasks_max = N_det_generators/100 + 1 + pt2_n_tasks_max = 16 ! N_det_generators/100 + 1 if(N_det_generators < 256) then pt2_minDetInFirstTeeth = 1 @@ -176,7 +176,6 @@ subroutine ZMQ_dress(E, dress, delta_out, delta_s2_out, relative_error) integer, external :: add_task_to_taskserver double precision :: state_average_weight_save(N_states) - print *, "ZMQ_dress" task(:) = CHAR(0) allocate(delta(N_states,N_det), delta_s2(N_states, N_det)) state_average_weight_save(:) = state_average_weight(:) @@ -359,9 +358,9 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2, integer(ZMQ_PTR) :: zmq_to_qp_run_socket integer(ZMQ_PTR), external :: new_zmq_pull_socket, zmq_abort - integer :: more + integer, allocatable :: task_id(:) integer :: i, c, j, k, f, t, m, p, m_task - integer :: task_id, n_tasks + integer :: more, n_tasks double precision :: E0, error, x, v, time, time0 double precision :: avg, eqt double precision, external :: omp_get_wtime @@ -376,6 +375,7 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2, found = .false. delta = 0d0 delta_s2 = 0d0 + allocate(task_id(pt2_n_tasks_max)) allocate(edI(N_states, N_det)) allocate(edI_task(N_states, N_det), edI_index(N_det)) allocate(breve_delta_m(N_states, N_det, 2)) @@ -431,7 +431,7 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2, stop 'Unable to delete tasks' endif else - if(task_id /= 0) stop "TASKID" + !if(task_id(1) /= 0) stop "TASKID" !i= zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,1,more) exit end if @@ -442,7 +442,6 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2, dot_f(m_task) -= f end if end do - if (zmq_abort(zmq_to_qp_run_socket) == -1) then call sleep(1) if (zmq_abort(zmq_to_qp_run_socket) == -1) then @@ -458,10 +457,10 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2, do while(more /= 0) call pull_dress_results(zmq_socket_pull, m_task, f, edI_task, edI_index, breve_delta_m, task_id, n_tasks) - if(task_id == 0) cycle - if(m_task == 0) then + !if(task_id(0) == 0) cycle + if(m_task == 0) then i = zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more) - else + else if(m_task < 0) then i = zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,1,more) end if diff --git a/plugins/dress_zmq/run_dress_slave.irp.f b/plugins/dress_zmq/run_dress_slave.irp.f index 899cc3cf..95db9d92 100644 --- a/plugins/dress_zmq/run_dress_slave.irp.f +++ b/plugins/dress_zmq/run_dress_slave.irp.f @@ -31,7 +31,8 @@ subroutine run_dress_slave(thread,iproce,energy) integer, allocatable :: f(:) integer :: cp_sent, cp_done integer :: cp_max(Nproc) - integer :: will_send, task_id, purge_task_id + integer :: will_send, task_id, purge_task_id, ntask_buf + integer, allocatable :: task_buf(:) integer(kind=OMP_LOCK_KIND) :: lck_det(0:pt2_N_teeth+1) integer(kind=OMP_LOCK_KIND) :: lck_sto(0:dress_N_cp+1), sending double precision :: fac @@ -74,7 +75,7 @@ double precision :: time, time0 !$OMP PRIVATE(tmp,fac,m,l,t,sum_f,n_tasks) & !$OMP PRIVATE(i,p,will_send, i_generator, subset, iproc) & !$OMP PRIVATE(zmq_to_qp_run_socket, zmq_socket_push, worker_id) & - !$OMP PRIVATE(time, time0) + !$OMP PRIVATE(task_buf, ntask_buf,time, time0) zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() zmq_socket_push = new_zmq_push_socket(thread) call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) @@ -85,8 +86,9 @@ double precision :: time, time0 end if iproc = omp_get_thread_num()+1 allocate(breve_delta_m(N_states,N_det,2)) - - + allocate(task_buf(pt2_n_tasks_max)) + ntask_buf = 0 + do while(cp_done > cp_sent .or. m /= dress_N_cp+1) call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task) task = task//" 0" @@ -110,6 +112,9 @@ double precision :: time, time0 if(purge_task_id == 0) then purge_task_id = task_id task_id = 0 + else if(task_id /= 0) then + ntask_buf += 1 + task_buf(ntask_buf) = task_id end if !$OMP END CRITICAL @@ -164,12 +169,17 @@ double precision :: time, time0 !$OMP ATOMIC f(i_generator) += 1 !push bidon - if(task_id /= 0) then - call push_dress_results(zmq_socket_push, 0, 0, edI_task, edI_index, breve_delta_m, task_id, 1) + if(ntask_buf == size(task_buf)) then + call push_dress_results(zmq_socket_push, 0, 0, edI_task, edI_index, breve_delta_m, task_buf, ntask_buf) + ntask_buf = 0 end if end if end do !$OMP BARRIER + if(ntask_buf /= 0) then + call push_dress_results(zmq_socket_push, 0, 0, edI_task, edI_index, breve_delta_m, task_buf, ntask_buf) + ntask_buf = 0 + end if !$OMP SINGLE if(purge_task_id /= 0) then do while(int(ending(1)) == dress_N_cp+1) @@ -220,33 +230,35 @@ subroutine push_dress_results(zmq_socket_push, m_task, f, edI_task, edI_index, b integer(ZMQ_PTR), intent(in) :: zmq_socket_push integer, intent(in) :: m_task, f, edI_index(n_tasks) double precision, intent(in) :: breve_delta_m(N_states, N_det, 2), edI_task(n_tasks) - integer, intent(in) :: task_id, n_tasks + integer, intent(in) :: task_id(pt2_n_tasks_max), n_tasks integer :: rc, i, j, k - rc = f77_zmq_send( zmq_socket_push, n_tasks, 4, ZMQ_SNDMORE) - if(rc /= 4) stop "push1" - - rc = f77_zmq_send( zmq_socket_push, task_id, 4, ZMQ_SNDMORE) - if(rc /= 4) stop "push2" - rc = f77_zmq_send( zmq_socket_push, m_task, 4, ZMQ_SNDMORE) if(rc /= 4) stop "push3" - - rc = f77_zmq_send( zmq_socket_push, f, 4, ZMQ_SNDMORE) - if(rc /= 4) stop "push4" - rc = f77_zmq_send( zmq_socket_push, edI_task, 8*n_tasks, ZMQ_SNDMORE) - if(rc /= 8*n_tasks) stop "push5" - - rc = f77_zmq_send( zmq_socket_push, edI_index, 4*n_tasks, ZMQ_SNDMORE) - if(rc /= 4*n_tasks) stop "push6" + if(m_task > 0) then + rc = f77_zmq_send( zmq_socket_push, n_tasks, 4, ZMQ_SNDMORE) + if(rc /= 4) stop "push1" + rc = f77_zmq_send( zmq_socket_push, f, 4, ZMQ_SNDMORE) + if(rc /= 4) stop "push4" - if(m_task < 0) then - rc = f77_zmq_send( zmq_socket_push, breve_delta_m, 8*N_det*N_states*2, 0) - if(rc /= 8*N_det*N_states*2) stop "push6" + rc = f77_zmq_send( zmq_socket_push, edI_task, 8*n_tasks, ZMQ_SNDMORE) + if(rc /= 8*n_tasks) stop "push5" + + rc = f77_zmq_send( zmq_socket_push, edI_index, 4*n_tasks, 0) + if(rc /= 4*n_tasks) stop "push6" + else if(m_task == 0) then + rc = f77_zmq_send( zmq_socket_push, n_tasks, 4, ZMQ_SNDMORE) + if(rc /= 4) stop "push1" + + rc = f77_zmq_send( zmq_socket_push, task_id, 4*n_tasks, 0) + if(rc /= 4*n_tasks) stop "push2" else - rc = f77_zmq_send( zmq_socket_push, breve_delta_m, 8, 0) - if(rc /= 8) stop "push6" - end if + rc = f77_zmq_send( zmq_socket_push, breve_delta_m, 8*N_det*N_states*2, ZMQ_SNDMORE) + if(rc /= 8*N_det*N_states*2) stop "push6" + rc = f77_zmq_send( zmq_socket_push, task_id, 4, 0) + if(rc /= 4) stop "push6" + + end if ! Activate is zmq_socket_pull is a REP IRP_IF ZMQ_PUSH IRP_ELSE @@ -264,32 +276,40 @@ subroutine pull_dress_results(zmq_socket_pull, m_task, f, edI_task, edI_index, b integer(ZMQ_PTR), intent(in) :: zmq_socket_pull integer, intent(out) :: m_task, f, edI_index(N_det_generators) double precision, intent(out) :: breve_delta_m(N_states, N_det, 2), edI_task(N_det_generators) - integer, intent(out) :: task_id, n_tasks + integer, intent(out) :: task_id(pt2_n_tasks_max), n_tasks integer :: rc, i, j, k - - rc = f77_zmq_recv( zmq_socket_pull, n_tasks, 4, 0) - if(rc /= 4) stop "pullc" - - rc = f77_zmq_recv( zmq_socket_pull, task_id, 4, 0) - if(rc /= 4) stop "pull4" - + rc = f77_zmq_recv( zmq_socket_pull, m_task, 4, 0) if(rc /= 4) stop "pullc" - rc = f77_zmq_recv( zmq_socket_pull, f, 4, 0) - if(rc /= 4) stop "pullc" + if(m_task > 0) then + rc = f77_zmq_recv( zmq_socket_pull, n_tasks, 4, 0) + if(rc /= 4) stop "pullc" + - rc = f77_zmq_recv( zmq_socket_pull, edI_task, 8*n_tasks, 0) - if(rc /= 8*n_tasks) stop "pullc" + rc = f77_zmq_recv( zmq_socket_pull, f, 4, 0) + if(rc /= 4) stop "pullc" - rc = f77_zmq_recv( zmq_socket_pull, edI_index, 4*n_tasks, 0) - if(rc /= 4*n_tasks) stop "pullc" - if(m_task < 0) then + rc = f77_zmq_recv( zmq_socket_pull, edI_task, 8*n_tasks, 0) + if(rc /= 8*n_tasks) stop "pullc" + + rc = f77_zmq_recv( zmq_socket_pull, edI_index, 4*n_tasks, 0) + if(rc /= 4*n_tasks) stop "pullc" + else if(m_task==0) then + rc = f77_zmq_recv( zmq_socket_pull, n_tasks, 4, 0) + if(rc /= 4) stop "pullc" + + + + rc = f77_zmq_recv( zmq_socket_pull, task_id, 4*n_tasks, 0) + if(rc /= 4*n_tasks) stop "pull4" + else rc = f77_zmq_recv( zmq_socket_pull, breve_delta_m, 8*N_det*N_states*2, 0) if(rc /= 8*N_det*N_states*2) stop "pullc" - else - rc = f77_zmq_recv( zmq_socket_pull, breve_delta_m, 8, 0) - if(rc /= 8) stop "pullc" + + rc = f77_zmq_recv( zmq_socket_pull, task_id, 4, 0) + if(rc /= 4) stop "pull4" + end if ! Activate is zmq_socket_pull is a REP IRP_IF ZMQ_PUSH