10
0
mirror of https://github.com/LCPQ/quantum_package synced 2024-12-24 13:23:41 +01:00

tasks get by batches of Nproc

This commit is contained in:
Yann Garniron 2018-09-03 16:08:02 +02:00
parent 997a5a1265
commit a521f0cb82

View File

@ -11,7 +11,7 @@ subroutine run_dress_slave(thread,iproce,energy)
integer :: rc, i, subset, i_generator integer :: rc, i, subset, i_generator
integer :: worker_id, ctask, ltask integer :: worker_id, ctask, ltask
character*(5120) :: task character*(512) :: task(Nproc)
integer(ZMQ_PTR),external :: new_zmq_to_qp_run_socket integer(ZMQ_PTR),external :: new_zmq_to_qp_run_socket
integer(ZMQ_PTR) :: zmq_to_qp_run_socket integer(ZMQ_PTR) :: zmq_to_qp_run_socket
@ -34,13 +34,13 @@ subroutine run_dress_slave(thread,iproce,energy)
integer :: will_send, task_id, purge_task_id, ntask_buf integer :: will_send, task_id, purge_task_id, ntask_buf
integer, allocatable :: task_buf(:) integer, allocatable :: task_buf(:)
integer(kind=OMP_LOCK_KIND) :: lck_det(0:pt2_N_teeth+1) integer(kind=OMP_LOCK_KIND) :: lck_det(0:pt2_N_teeth+1)
integer(kind=OMP_LOCK_KIND) :: lck_sto(0:dress_N_cp+1), sending integer(kind=OMP_LOCK_KIND) :: lck_sto(0:dress_N_cp+1), sending, getting_task
double precision :: fac double precision :: fac
double precision :: ending(1) double precision :: ending(1)
integer, external :: zmq_get_dvector integer, external :: zmq_get_dvector
! double precision, external :: omp_get_wtime ! double precision, external :: omp_get_wtime
double precision :: time, time0 double precision :: time, time0
integer :: ntask_tbd, task_tbd(Nproc), i_gen_tbd(Nproc), subset_tbd(Nproc)
if(iproce /= 0) stop "RUN DRESS SLAVE is OMP" if(iproce /= 0) stop "RUN DRESS SLAVE is OMP"
allocate(delta_det(N_states, N_det, 0:pt2_N_teeth+1, 2)) allocate(delta_det(N_states, N_det, 0:pt2_N_teeth+1, 2))
@ -51,9 +51,10 @@ double precision :: time, time0
f = 0 f = 0
delta_det = 0d0 delta_det = 0d0
task(:) = CHAR(0) task = CHAR(0)
call omp_init_lock(sending) call omp_init_lock(sending)
call omp_init_lock(getting_task)
do i=0,dress_N_cp+1 do i=0,dress_N_cp+1
call omp_init_lock(lck_sto(i)) call omp_init_lock(lck_sto(i))
end do end do
@ -70,8 +71,9 @@ double precision :: time, time0
purge_task_id = 0 purge_task_id = 0
hij = E0_denominator(1) !PROVIDE BEFORE OMP PARALLEL hij = E0_denominator(1) !PROVIDE BEFORE OMP PARALLEL
ending(1) = dble(dress_N_cp+1) ending(1) = dble(dress_N_cp+1)
ntask_tbd = 0
!$OMP PARALLEL DEFAULT(SHARED) & !$OMP PARALLEL DEFAULT(SHARED) &
!$OMP PRIVATE(breve_delta_m, task, task_id) & !$OMP PRIVATE(breve_delta_m, task_id) &
!$OMP PRIVATE(tmp,fac,m,l,t,sum_f,n_tasks) & !$OMP PRIVATE(tmp,fac,m,l,t,sum_f,n_tasks) &
!$OMP PRIVATE(i,p,will_send, i_generator, subset, iproc) & !$OMP PRIVATE(i,p,will_send, i_generator, subset, iproc) &
!$OMP PRIVATE(zmq_to_qp_run_socket, zmq_socket_push, worker_id) & !$OMP PRIVATE(zmq_to_qp_run_socket, zmq_socket_push, worker_id) &
@ -90,16 +92,27 @@ double precision :: time, time0
ntask_buf = 0 ntask_buf = 0
do while(cp_done > cp_sent .or. m /= dress_N_cp+1) do while(cp_done > cp_sent .or. m /= dress_N_cp+1)
call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task) call omp_set_lock(getting_task)
task = task//" 0" if(ntask_tbd == 0) then
ntask_tbd = size(task_tbd)
call get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id, task_tbd, task, ntask_tbd)
!task = task//" 0"
end if
task_id = task_tbd(1)
if(task_id /= 0) then if(task_id /= 0) then
read (task,*) subset, i_generator read (task(1),*) subset, i_generator
do i=1,size(task_tbd)-1
task_tbd(i) = task_tbd(i+1)
task(i) = task(i+1)
end do
m = dress_P(i_generator) m = dress_P(i_generator)
ntask_tbd -= 1
else else
m = dress_N_cp + 1 m = dress_N_cp + 1
i= zmq_get_dvector(zmq_to_qp_run_socket, worker_id, "ending", ending, 1) i= zmq_get_dvector(zmq_to_qp_run_socket, worker_id, "ending", ending, 1)
end if end if
call omp_unset_lock(getting_task)
will_send = 0 will_send = 0
!$OMP CRITICAL !$OMP CRITICAL
@ -180,7 +193,7 @@ double precision :: time, time0
call push_dress_results(zmq_socket_push, 0, 0, edI_task, edI_index, breve_delta_m, task_buf, ntask_buf) call push_dress_results(zmq_socket_push, 0, 0, edI_task, edI_index, breve_delta_m, task_buf, ntask_buf)
ntask_buf = 0 ntask_buf = 0
end if end if
!$OMP SINGLE !$OMP SINGLE
if(purge_task_id /= 0) then if(purge_task_id /= 0) then
do while(int(ending(1)) == dress_N_cp+1) do while(int(ending(1)) == dress_N_cp+1)
call sleep(1) call sleep(1)
@ -240,24 +253,22 @@ subroutine push_dress_results(zmq_socket_push, m_task, f, edI_task, edI_index, b
if(rc /= 4) stop "push1" if(rc /= 4) stop "push1"
rc = f77_zmq_send( zmq_socket_push, f, 4, ZMQ_SNDMORE) rc = f77_zmq_send( zmq_socket_push, f, 4, ZMQ_SNDMORE)
if(rc /= 4) stop "push4" if(rc /= 4) stop "push4"
rc = f77_zmq_send( zmq_socket_push, edI_task, 8*n_tasks, ZMQ_SNDMORE) rc = f77_zmq_send( zmq_socket_push, edI_task, 8*n_tasks, ZMQ_SNDMORE)
if(rc /= 8*n_tasks) stop "push5" if(rc /= 8*n_tasks) stop "push5"
rc = f77_zmq_send( zmq_socket_push, edI_index, 4*n_tasks, 0) rc = f77_zmq_send( zmq_socket_push, edI_index, 4*n_tasks, 0)
if(rc /= 4*n_tasks) stop "push6" if(rc /= 4*n_tasks) stop "push6"
else if(m_task == 0) then else if(m_task == 0) then
rc = f77_zmq_send( zmq_socket_push, n_tasks, 4, ZMQ_SNDMORE) rc = f77_zmq_send( zmq_socket_push, n_tasks, 4, ZMQ_SNDMORE)
if(rc /= 4) stop "push1" if(rc /= 4) stop "push1"
rc = f77_zmq_send( zmq_socket_push, task_id, 4*n_tasks, 0) rc = f77_zmq_send( zmq_socket_push, task_id, 4*n_tasks, 0)
if(rc /= 4*n_tasks) stop "push2" if(rc /= 4*n_tasks) stop "push2"
else else
rc = f77_zmq_send( zmq_socket_push, f, 4, ZMQ_SNDMORE)
if(rc /= 4) stop "push4"
rc = f77_zmq_send( zmq_socket_push, breve_delta_m, 8*N_det*N_states*2, ZMQ_SNDMORE) rc = f77_zmq_send( zmq_socket_push, breve_delta_m, 8*N_det*N_states*2, ZMQ_SNDMORE)
if(rc /= 8*N_det*N_states*2) stop "push6" if(rc /= 8*N_det*N_states*2) stop "push6"
rc = f77_zmq_send( zmq_socket_push, task_id, 4, 0) rc = f77_zmq_send( zmq_socket_push, task_id, 4, 0)
if(rc /= 4) stop "push6" if(rc /= 4) stop "push6"
end if end if
! Activate is zmq_socket_pull is a REP ! Activate is zmq_socket_pull is a REP
IRP_IF ZMQ_PUSH IRP_IF ZMQ_PUSH
@ -285,31 +296,24 @@ subroutine pull_dress_results(zmq_socket_pull, m_task, f, edI_task, edI_index, b
if(m_task > 0) then if(m_task > 0) then
rc = f77_zmq_recv( zmq_socket_pull, n_tasks, 4, 0) rc = f77_zmq_recv( zmq_socket_pull, n_tasks, 4, 0)
if(rc /= 4) stop "pullc" if(rc /= 4) stop "pullc"
rc = f77_zmq_recv( zmq_socket_pull, f, 4, 0)
rc = f77_zmq_recv( zmq_socket_pull, f, 4, 0)
if(rc /= 4) stop "pullc" if(rc /= 4) stop "pullc"
rc = f77_zmq_recv( zmq_socket_pull, edI_task, 8*n_tasks, 0) rc = f77_zmq_recv( zmq_socket_pull, edI_task, 8*n_tasks, 0)
if(rc /= 8*n_tasks) stop "pullc" if(rc /= 8*n_tasks) stop "pullc"
rc = f77_zmq_recv( zmq_socket_pull, edI_index, 4*n_tasks, 0) rc = f77_zmq_recv( zmq_socket_pull, edI_index, 4*n_tasks, 0)
if(rc /= 4*n_tasks) stop "pullc" if(rc /= 4*n_tasks) stop "pullc"
else if(m_task==0) then else if(m_task==0) then
rc = f77_zmq_recv( zmq_socket_pull, n_tasks, 4, 0) rc = f77_zmq_recv( zmq_socket_pull, n_tasks, 4, 0)
if(rc /= 4) stop "pullc" if(rc /= 4) stop "pullc"
rc = f77_zmq_recv( zmq_socket_pull, task_id, 4*n_tasks, 0) rc = f77_zmq_recv( zmq_socket_pull, task_id, 4*n_tasks, 0)
if(rc /= 4*n_tasks) stop "pull4" if(rc /= 4*n_tasks) stop "pull4"
else else
rc = f77_zmq_recv( zmq_socket_pull, f, 4, 0)
if(rc /= 4) stop "pullc"
rc = f77_zmq_recv( zmq_socket_pull, breve_delta_m, 8*N_det*N_states*2, 0) rc = f77_zmq_recv( zmq_socket_pull, breve_delta_m, 8*N_det*N_states*2, 0)
if(rc /= 8*N_det*N_states*2) stop "pullc" if(rc /= 8*N_det*N_states*2) stop "pullc"
rc = f77_zmq_recv( zmq_socket_pull, task_id, 4, 0) rc = f77_zmq_recv( zmq_socket_pull, task_id, 4, 0)
if(rc /= 4) stop "pull4" if(rc /= 4) stop "pull4"
end if end if
! Activate is zmq_socket_pull is a REP ! Activate is zmq_socket_pull is a REP
IRP_IF ZMQ_PUSH IRP_IF ZMQ_PUSH