From a521f0cb82bd333ce9879b4dfd87ae1abcf2baaa Mon Sep 17 00:00:00 2001 From: Yann Garniron Date: Mon, 3 Sep 2018 16:08:02 +0200 Subject: [PATCH] tasks get by batches of Nproc --- plugins/dress_zmq/run_dress_slave.irp.f | 52 +++++++++++++------------ 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/plugins/dress_zmq/run_dress_slave.irp.f b/plugins/dress_zmq/run_dress_slave.irp.f index 95db9d92..f5398025 100644 --- a/plugins/dress_zmq/run_dress_slave.irp.f +++ b/plugins/dress_zmq/run_dress_slave.irp.f @@ -11,7 +11,7 @@ subroutine run_dress_slave(thread,iproce,energy) integer :: rc, i, subset, i_generator integer :: worker_id, ctask, ltask - character*(5120) :: task + character*(512) :: task(Nproc) integer(ZMQ_PTR),external :: new_zmq_to_qp_run_socket integer(ZMQ_PTR) :: zmq_to_qp_run_socket @@ -34,13 +34,13 @@ subroutine run_dress_slave(thread,iproce,energy) integer :: will_send, task_id, purge_task_id, ntask_buf integer, allocatable :: task_buf(:) integer(kind=OMP_LOCK_KIND) :: lck_det(0:pt2_N_teeth+1) - integer(kind=OMP_LOCK_KIND) :: lck_sto(0:dress_N_cp+1), sending + integer(kind=OMP_LOCK_KIND) :: lck_sto(0:dress_N_cp+1), sending, getting_task double precision :: fac double precision :: ending(1) integer, external :: zmq_get_dvector ! double precision, external :: omp_get_wtime double precision :: time, time0 - + integer :: ntask_tbd, task_tbd(Nproc), i_gen_tbd(Nproc), subset_tbd(Nproc) if(iproce /= 0) stop "RUN DRESS SLAVE is OMP" allocate(delta_det(N_states, N_det, 0:pt2_N_teeth+1, 2)) @@ -51,9 +51,10 @@ double precision :: time, time0 f = 0 delta_det = 0d0 - task(:) = CHAR(0) + task = CHAR(0) call omp_init_lock(sending) + call omp_init_lock(getting_task) do i=0,dress_N_cp+1 call omp_init_lock(lck_sto(i)) end do @@ -70,8 +71,9 @@ double precision :: time, time0 purge_task_id = 0 hij = E0_denominator(1) !PROVIDE BEFORE OMP PARALLEL ending(1) = dble(dress_N_cp+1) + ntask_tbd = 0 !$OMP PARALLEL DEFAULT(SHARED) & - !$OMP PRIVATE(breve_delta_m, task, task_id) & + !$OMP PRIVATE(breve_delta_m, task_id) & !$OMP PRIVATE(tmp,fac,m,l,t,sum_f,n_tasks) & !$OMP PRIVATE(i,p,will_send, i_generator, subset, iproc) & !$OMP PRIVATE(zmq_to_qp_run_socket, zmq_socket_push, worker_id) & @@ -90,16 +92,27 @@ double precision :: time, time0 ntask_buf = 0 do while(cp_done > cp_sent .or. m /= dress_N_cp+1) - call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task) - task = task//" 0" + call omp_set_lock(getting_task) + if(ntask_tbd == 0) then + ntask_tbd = size(task_tbd) + call get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id, task_tbd, task, ntask_tbd) + !task = task//" 0" + end if + + task_id = task_tbd(1) if(task_id /= 0) then - read (task,*) subset, i_generator + read (task(1),*) subset, i_generator + do i=1,size(task_tbd)-1 + task_tbd(i) = task_tbd(i+1) + task(i) = task(i+1) + end do m = dress_P(i_generator) + ntask_tbd -= 1 else m = dress_N_cp + 1 i= zmq_get_dvector(zmq_to_qp_run_socket, worker_id, "ending", ending, 1) end if - + call omp_unset_lock(getting_task) will_send = 0 !$OMP CRITICAL @@ -180,7 +193,7 @@ double precision :: time, time0 call push_dress_results(zmq_socket_push, 0, 0, edI_task, edI_index, breve_delta_m, task_buf, ntask_buf) ntask_buf = 0 end if - !$OMP SINGLE + !$OMP SINGLE if(purge_task_id /= 0) then do while(int(ending(1)) == dress_N_cp+1) call sleep(1) @@ -240,24 +253,22 @@ subroutine push_dress_results(zmq_socket_push, m_task, f, edI_task, edI_index, b if(rc /= 4) stop "push1" rc = f77_zmq_send( zmq_socket_push, f, 4, ZMQ_SNDMORE) if(rc /= 4) stop "push4" - rc = f77_zmq_send( zmq_socket_push, edI_task, 8*n_tasks, ZMQ_SNDMORE) if(rc /= 8*n_tasks) stop "push5" - rc = f77_zmq_send( zmq_socket_push, edI_index, 4*n_tasks, 0) if(rc /= 4*n_tasks) stop "push6" else if(m_task == 0) then rc = f77_zmq_send( zmq_socket_push, n_tasks, 4, ZMQ_SNDMORE) if(rc /= 4) stop "push1" - rc = f77_zmq_send( zmq_socket_push, task_id, 4*n_tasks, 0) if(rc /= 4*n_tasks) stop "push2" else + rc = f77_zmq_send( zmq_socket_push, f, 4, ZMQ_SNDMORE) + if(rc /= 4) stop "push4" rc = f77_zmq_send( zmq_socket_push, breve_delta_m, 8*N_det*N_states*2, ZMQ_SNDMORE) if(rc /= 8*N_det*N_states*2) stop "push6" rc = f77_zmq_send( zmq_socket_push, task_id, 4, 0) if(rc /= 4) stop "push6" - end if ! Activate is zmq_socket_pull is a REP IRP_IF ZMQ_PUSH @@ -285,31 +296,24 @@ subroutine pull_dress_results(zmq_socket_pull, m_task, f, edI_task, edI_index, b if(m_task > 0) then rc = f77_zmq_recv( zmq_socket_pull, n_tasks, 4, 0) if(rc /= 4) stop "pullc" - - - rc = f77_zmq_recv( zmq_socket_pull, f, 4, 0) + rc = f77_zmq_recv( zmq_socket_pull, f, 4, 0) if(rc /= 4) stop "pullc" - rc = f77_zmq_recv( zmq_socket_pull, edI_task, 8*n_tasks, 0) if(rc /= 8*n_tasks) stop "pullc" - rc = f77_zmq_recv( zmq_socket_pull, edI_index, 4*n_tasks, 0) if(rc /= 4*n_tasks) stop "pullc" else if(m_task==0) then rc = f77_zmq_recv( zmq_socket_pull, n_tasks, 4, 0) if(rc /= 4) stop "pullc" - - - rc = f77_zmq_recv( zmq_socket_pull, task_id, 4*n_tasks, 0) if(rc /= 4*n_tasks) stop "pull4" else + rc = f77_zmq_recv( zmq_socket_pull, f, 4, 0) + if(rc /= 4) stop "pullc" rc = f77_zmq_recv( zmq_socket_pull, breve_delta_m, 8*N_det*N_states*2, 0) if(rc /= 8*N_det*N_states*2) stop "pullc" - rc = f77_zmq_recv( zmq_socket_pull, task_id, 4, 0) if(rc /= 4) stop "pull4" - end if ! Activate is zmq_socket_pull is a REP IRP_IF ZMQ_PUSH