10
0
mirror of https://github.com/LCPQ/quantum_package synced 2024-07-23 11:17:33 +02:00

buffered task_id send

This commit is contained in:
Yann Garniron 2018-09-03 14:18:04 +02:00
parent 99ea7948e0
commit 997a5a1265
2 changed files with 73 additions and 54 deletions

View File

@ -10,7 +10,7 @@ END_PROVIDER
implicit none
pt2_F(:) = 1
!pt2_F(:N_det_generators/1000*0+50) = 1
pt2_n_tasks_max = N_det_generators/100 + 1
pt2_n_tasks_max = 16 ! N_det_generators/100 + 1
if(N_det_generators < 256) then
pt2_minDetInFirstTeeth = 1
@ -176,7 +176,6 @@ subroutine ZMQ_dress(E, dress, delta_out, delta_s2_out, relative_error)
integer, external :: add_task_to_taskserver
double precision :: state_average_weight_save(N_states)
print *, "ZMQ_dress"
task(:) = CHAR(0)
allocate(delta(N_states,N_det), delta_s2(N_states, N_det))
state_average_weight_save(:) = state_average_weight(:)
@ -359,9 +358,9 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2,
integer(ZMQ_PTR) :: zmq_to_qp_run_socket
integer(ZMQ_PTR), external :: new_zmq_pull_socket, zmq_abort
integer :: more
integer, allocatable :: task_id(:)
integer :: i, c, j, k, f, t, m, p, m_task
integer :: task_id, n_tasks
integer :: more, n_tasks
double precision :: E0, error, x, v, time, time0
double precision :: avg, eqt
double precision, external :: omp_get_wtime
@ -376,6 +375,7 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2,
found = .false.
delta = 0d0
delta_s2 = 0d0
allocate(task_id(pt2_n_tasks_max))
allocate(edI(N_states, N_det))
allocate(edI_task(N_states, N_det), edI_index(N_det))
allocate(breve_delta_m(N_states, N_det, 2))
@ -431,7 +431,7 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2,
stop 'Unable to delete tasks'
endif
else
if(task_id /= 0) stop "TASKID"
!if(task_id(1) /= 0) stop "TASKID"
!i= zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,1,more)
exit
end if
@ -442,7 +442,6 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2,
dot_f(m_task) -= f
end if
end do
if (zmq_abort(zmq_to_qp_run_socket) == -1) then
call sleep(1)
if (zmq_abort(zmq_to_qp_run_socket) == -1) then
@ -458,10 +457,10 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2,
do while(more /= 0)
call pull_dress_results(zmq_socket_pull, m_task, f, edI_task, edI_index, breve_delta_m, task_id, n_tasks)
if(task_id == 0) cycle
!if(task_id(0) == 0) cycle
if(m_task == 0) then
i = zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more)
else
else if(m_task < 0) then
i = zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,1,more)
end if

View File

@ -31,7 +31,8 @@ subroutine run_dress_slave(thread,iproce,energy)
integer, allocatable :: f(:)
integer :: cp_sent, cp_done
integer :: cp_max(Nproc)
integer :: will_send, task_id, purge_task_id
integer :: will_send, task_id, purge_task_id, ntask_buf
integer, allocatable :: task_buf(:)
integer(kind=OMP_LOCK_KIND) :: lck_det(0:pt2_N_teeth+1)
integer(kind=OMP_LOCK_KIND) :: lck_sto(0:dress_N_cp+1), sending
double precision :: fac
@ -74,7 +75,7 @@ double precision :: time, time0
!$OMP PRIVATE(tmp,fac,m,l,t,sum_f,n_tasks) &
!$OMP PRIVATE(i,p,will_send, i_generator, subset, iproc) &
!$OMP PRIVATE(zmq_to_qp_run_socket, zmq_socket_push, worker_id) &
!$OMP PRIVATE(time, time0)
!$OMP PRIVATE(task_buf, ntask_buf,time, time0)
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
zmq_socket_push = new_zmq_push_socket(thread)
call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
@ -85,7 +86,8 @@ double precision :: time, time0
end if
iproc = omp_get_thread_num()+1
allocate(breve_delta_m(N_states,N_det,2))
allocate(task_buf(pt2_n_tasks_max))
ntask_buf = 0
do while(cp_done > cp_sent .or. m /= dress_N_cp+1)
call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task)
@ -110,6 +112,9 @@ double precision :: time, time0
if(purge_task_id == 0) then
purge_task_id = task_id
task_id = 0
else if(task_id /= 0) then
ntask_buf += 1
task_buf(ntask_buf) = task_id
end if
!$OMP END CRITICAL
@ -164,12 +169,17 @@ double precision :: time, time0
!$OMP ATOMIC
f(i_generator) += 1
!push bidon
if(task_id /= 0) then
call push_dress_results(zmq_socket_push, 0, 0, edI_task, edI_index, breve_delta_m, task_id, 1)
if(ntask_buf == size(task_buf)) then
call push_dress_results(zmq_socket_push, 0, 0, edI_task, edI_index, breve_delta_m, task_buf, ntask_buf)
ntask_buf = 0
end if
end if
end do
!$OMP BARRIER
if(ntask_buf /= 0) then
call push_dress_results(zmq_socket_push, 0, 0, edI_task, edI_index, breve_delta_m, task_buf, ntask_buf)
ntask_buf = 0
end if
!$OMP SINGLE
if(purge_task_id /= 0) then
do while(int(ending(1)) == dress_N_cp+1)
@ -220,32 +230,34 @@ subroutine push_dress_results(zmq_socket_push, m_task, f, edI_task, edI_index, b
integer(ZMQ_PTR), intent(in) :: zmq_socket_push
integer, intent(in) :: m_task, f, edI_index(n_tasks)
double precision, intent(in) :: breve_delta_m(N_states, N_det, 2), edI_task(n_tasks)
integer, intent(in) :: task_id, n_tasks
integer, intent(in) :: task_id(pt2_n_tasks_max), n_tasks
integer :: rc, i, j, k
rc = f77_zmq_send( zmq_socket_push, n_tasks, 4, ZMQ_SNDMORE)
if(rc /= 4) stop "push1"
rc = f77_zmq_send( zmq_socket_push, task_id, 4, ZMQ_SNDMORE)
if(rc /= 4) stop "push2"
rc = f77_zmq_send( zmq_socket_push, m_task, 4, ZMQ_SNDMORE)
if(rc /= 4) stop "push3"
if(m_task > 0) then
rc = f77_zmq_send( zmq_socket_push, n_tasks, 4, ZMQ_SNDMORE)
if(rc /= 4) stop "push1"
rc = f77_zmq_send( zmq_socket_push, f, 4, ZMQ_SNDMORE)
if(rc /= 4) stop "push4"
rc = f77_zmq_send( zmq_socket_push, edI_task, 8*n_tasks, ZMQ_SNDMORE)
if(rc /= 8*n_tasks) stop "push5"
rc = f77_zmq_send( zmq_socket_push, edI_index, 4*n_tasks, ZMQ_SNDMORE)
rc = f77_zmq_send( zmq_socket_push, edI_index, 4*n_tasks, 0)
if(rc /= 4*n_tasks) stop "push6"
else if(m_task == 0) then
rc = f77_zmq_send( zmq_socket_push, n_tasks, 4, ZMQ_SNDMORE)
if(rc /= 4) stop "push1"
if(m_task < 0) then
rc = f77_zmq_send( zmq_socket_push, breve_delta_m, 8*N_det*N_states*2, 0)
if(rc /= 8*N_det*N_states*2) stop "push6"
rc = f77_zmq_send( zmq_socket_push, task_id, 4*n_tasks, 0)
if(rc /= 4*n_tasks) stop "push2"
else
rc = f77_zmq_send( zmq_socket_push, breve_delta_m, 8, 0)
if(rc /= 8) stop "push6"
rc = f77_zmq_send( zmq_socket_push, breve_delta_m, 8*N_det*N_states*2, ZMQ_SNDMORE)
if(rc /= 8*N_det*N_states*2) stop "push6"
rc = f77_zmq_send( zmq_socket_push, task_id, 4, 0)
if(rc /= 4) stop "push6"
end if
! Activate is zmq_socket_pull is a REP
IRP_IF ZMQ_PUSH
@ -264,18 +276,17 @@ subroutine pull_dress_results(zmq_socket_pull, m_task, f, edI_task, edI_index, b
integer(ZMQ_PTR), intent(in) :: zmq_socket_pull
integer, intent(out) :: m_task, f, edI_index(N_det_generators)
double precision, intent(out) :: breve_delta_m(N_states, N_det, 2), edI_task(N_det_generators)
integer, intent(out) :: task_id, n_tasks
integer, intent(out) :: task_id(pt2_n_tasks_max), n_tasks
integer :: rc, i, j, k
rc = f77_zmq_recv( zmq_socket_pull, n_tasks, 4, 0)
if(rc /= 4) stop "pullc"
rc = f77_zmq_recv( zmq_socket_pull, task_id, 4, 0)
if(rc /= 4) stop "pull4"
rc = f77_zmq_recv( zmq_socket_pull, m_task, 4, 0)
if(rc /= 4) stop "pullc"
if(m_task > 0) then
rc = f77_zmq_recv( zmq_socket_pull, n_tasks, 4, 0)
if(rc /= 4) stop "pullc"
rc = f77_zmq_recv( zmq_socket_pull, f, 4, 0)
if(rc /= 4) stop "pullc"
@ -284,12 +295,21 @@ subroutine pull_dress_results(zmq_socket_pull, m_task, f, edI_task, edI_index, b
rc = f77_zmq_recv( zmq_socket_pull, edI_index, 4*n_tasks, 0)
if(rc /= 4*n_tasks) stop "pullc"
if(m_task < 0) then
else if(m_task==0) then
rc = f77_zmq_recv( zmq_socket_pull, n_tasks, 4, 0)
if(rc /= 4) stop "pullc"
rc = f77_zmq_recv( zmq_socket_pull, task_id, 4*n_tasks, 0)
if(rc /= 4*n_tasks) stop "pull4"
else
rc = f77_zmq_recv( zmq_socket_pull, breve_delta_m, 8*N_det*N_states*2, 0)
if(rc /= 8*N_det*N_states*2) stop "pullc"
else
rc = f77_zmq_recv( zmq_socket_pull, breve_delta_m, 8, 0)
if(rc /= 8) stop "pullc"
rc = f77_zmq_recv( zmq_socket_pull, task_id, 4, 0)
if(rc /= 4) stop "pull4"
end if
! Activate is zmq_socket_pull is a REP
IRP_IF ZMQ_PUSH