Asynchronous ZMQ

This commit is contained in:
Anthony Scemama 2019-01-31 17:23:47 +01:00
parent 3cf722374b
commit 8bd05b2c3f
5 changed files with 298 additions and 38 deletions

View File

@ -350,7 +350,8 @@ subroutine pt2_collector(zmq_socket_pull, E, relative_error, pt2, error, varianc
double precision, allocatable :: nI(:,:), nI_task(:,:), T3(:)
integer(ZMQ_PTR),external :: new_zmq_to_qp_run_socket
integer(ZMQ_PTR) :: zmq_to_qp_run_socket
integer, external :: zmq_delete_tasks
integer, external :: zmq_delete_tasks_async_send
integer, external :: zmq_delete_tasks_async_recv
integer, external :: zmq_abort
integer, external :: pt2_find_sample_lr
@ -364,7 +365,7 @@ subroutine pt2_collector(zmq_socket_pull, E, relative_error, pt2, error, varianc
integer, allocatable :: f(:)
logical, allocatable :: d(:)
logical :: do_exit, stop_now
logical :: do_exit, stop_now, sending
logical, external :: qp_stop
type(selection_buffer) :: b2
@ -372,6 +373,8 @@ subroutine pt2_collector(zmq_socket_pull, E, relative_error, pt2, error, varianc
double precision :: rss
double precision, external :: memory_of_double, memory_of_int
sending =.False.
rss = memory_of_int(pt2_n_tasks_max*2+N_det_generators*2)
rss += memory_of_double(N_states*N_det_generators)*3.d0
rss += memory_of_double(N_states*pt2_n_tasks_max)*3.d0
@ -499,12 +502,10 @@ subroutine pt2_collector(zmq_socket_pull, E, relative_error, pt2, error, varianc
end if
n += 1
else if(more == 0) then
!print *, 'more == 0'
exit
else
!print *, 'pulling...'
call pull_pt2_results(zmq_socket_pull, index, eI_task, vI_task, nI_task, task_id, n_tasks, b2)
if (zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more) == -1) then
if (zmq_delete_tasks_async_send(zmq_to_qp_run_socket,task_id,n_tasks,sending) == -1) then
stop 'Unable to delete tasks'
endif
do i=1,n_tasks
@ -518,7 +519,9 @@ subroutine pt2_collector(zmq_socket_pull, E, relative_error, pt2, error, varianc
! We assume the pulled buffer is sorted
if (b2%val(i) > b%mini) exit
end do
!print *, 'done pulling'
if (zmq_delete_tasks_async_recv(zmq_to_qp_run_socket,more,sending) == -1) then
stop 'Unable to delete tasks'
endif
end if
end do
!print *, 'deleting b2'

View File

@ -17,8 +17,6 @@ subroutine run_pt2_slave(thread,iproc,energy)
integer(ZMQ_PTR), external :: new_zmq_push_socket
integer(ZMQ_PTR) :: zmq_socket_push
double precision, save :: mini_omp_shared ! Max value of b%mini, shared among omp tasks via save
type(selection_buffer) :: b
logical :: done, buffer_ready
@ -29,10 +27,8 @@ subroutine run_pt2_slave(thread,iproc,energy)
double precision :: rss
double precision, external :: memory_of_double, memory_of_int
integer :: bsize ! Size of selection buffers
logical :: sending
!$OMP CRITICAL
mini_omp_shared = 0.d0
!$OMP END CRITICAL
rss = memory_of_int(pt2_n_tasks_max)*67.d0
rss += memory_of_double(pt2_n_tasks_max)*(N_states*3)
call check_mem(rss,irp_here)
@ -56,6 +52,7 @@ subroutine run_pt2_slave(thread,iproc,energy)
buffer_ready = .False.
n_tasks = 1
sending = .False.
done = .False.
n_tasks = 1
do while (.not.done)
@ -92,7 +89,6 @@ subroutine run_pt2_slave(thread,iproc,energy)
variance(:,k) = 0.d0
norm(:,k) = 0.d0
b%cur = 0
b%mini = mini_omp_shared
!double precision :: time2
!call wall_time(time2)
call select_connected(i_generator(k),energy,pt2(1,k),variance(1,k),norm(1,k),b,subset(k),pt2_F(i_generator(k)))
@ -107,15 +103,14 @@ subroutine run_pt2_slave(thread,iproc,energy)
done = .true.
endif
call sort_selection_buffer(b)
call push_pt2_results(zmq_socket_push, i_generator, pt2, variance, norm, b, task_id, n_tasks)
!$OMP CRITICAL
mini_omp_shared = min(b%mini,mini_omp_shared) ! For sharing with other threads
!$OMP END CRITICAL
call push_pt2_results_async_recv(zmq_socket_push,sending)
call push_pt2_results_async_send(zmq_socket_push, i_generator, pt2, variance, norm, b, task_id, n_tasks,sending)
b%cur=0
! Try to adjust n_tasks around nproc/8 seconds per job
n_tasks = min(2*n_tasks,int( dble(n_tasks * nproc/8) / (time1 - time0 + 1.d0)))
end do
call push_pt2_results_async_recv(zmq_socket_push,sending)
integer, external :: disconnect_from_taskserver
do i=1,300
@ -233,6 +228,127 @@ IRP_ENDIF
end subroutine
subroutine push_pt2_results_async_send(zmq_socket_push, index, pt2, variance, norm, b, task_id, n_tasks, sending)
use f77_zmq
use selection_types
implicit none
integer(ZMQ_PTR), intent(in) :: zmq_socket_push
double precision, intent(in) :: pt2(N_states,n_tasks)
double precision, intent(in) :: variance(N_states,n_tasks)
double precision, intent(in) :: norm(N_states,n_tasks)
integer, intent(in) :: n_tasks, index(n_tasks), task_id(n_tasks)
type(selection_buffer), intent(inout) :: b
logical, intent(inout) :: sending
integer :: rc
if (sending) then
print *, irp_here, ': sending is true'
stop -1
endif
sending = .True.
rc = f77_zmq_send( zmq_socket_push, n_tasks, 4, ZMQ_SNDMORE)
if (rc == -1) then
return
else if(rc /= 4) then
stop 'push'
endif
rc = f77_zmq_send( zmq_socket_push, index, 4*n_tasks, ZMQ_SNDMORE)
if (rc == -1) then
return
else if(rc /= 4*n_tasks) then
stop 'push'
endif
rc = f77_zmq_send( zmq_socket_push, pt2, 8*N_states*n_tasks, ZMQ_SNDMORE)
if (rc == -1) then
return
else if(rc /= 8*N_states*n_tasks) then
stop 'push'
endif
rc = f77_zmq_send( zmq_socket_push, variance, 8*N_states*n_tasks, ZMQ_SNDMORE)
if (rc == -1) then
return
else if(rc /= 8*N_states*n_tasks) then
stop 'push'
endif
rc = f77_zmq_send( zmq_socket_push, norm, 8*N_states*n_tasks, ZMQ_SNDMORE)
if (rc == -1) then
return
else if(rc /= 8*N_states*n_tasks) then
stop 'push'
endif
rc = f77_zmq_send( zmq_socket_push, task_id, n_tasks*4, ZMQ_SNDMORE)
if (rc == -1) then
return
else if(rc /= 4*n_tasks) then
stop 'push'
endif
rc = f77_zmq_send( zmq_socket_push, b%cur, 4, ZMQ_SNDMORE)
if (rc == -1) then
return
else if(rc /= 4) then
stop 'push'
endif
rc = f77_zmq_send( zmq_socket_push, b%val, 8*b%cur, ZMQ_SNDMORE)
if (rc == -1) then
return
else if(rc /= 8*b%cur) then
stop 'push'
endif
rc = f77_zmq_send( zmq_socket_push, b%det, bit_kind*N_int*2*b%cur, 0)
if (rc == -1) then
return
else if(rc /= N_int*2*8*b%cur) then
stop 'push'
endif
end subroutine
subroutine push_pt2_results_async_recv(zmq_socket_push,sending)
use f77_zmq
use selection_types
implicit none
integer(ZMQ_PTR), intent(in) :: zmq_socket_push
integer(ZMQ_PTR), intent(inout) :: sending
integer :: rc
if (.not.sending) return
! Activate is zmq_socket_push is a REQ
IRP_IF ZMQ_PUSH
IRP_ELSE
character*(2) :: ok
rc = f77_zmq_recv( zmq_socket_push, ok, 2, 0)
if (rc == -1) then
return
else if ((rc /= 2).and.(ok(1:2) /= 'ok')) then
print *, irp_here//': error in receiving ok'
stop -1
endif
IRP_ENDIF
sending = .False.
end subroutine
subroutine pull_pt2_results(zmq_socket_pull, index, pt2, variance, norm, task_id, n_tasks, b)
use f77_zmq
use selection_types

View File

@ -228,7 +228,7 @@ subroutine select_singles_and_doubles(i_generator,hole_mask,particle_mask,fock_d
deallocate(exc_degree)
nmax=k-1
allocate(iorder(nmax))
do i=1,nmax
iorder(i) = i
@ -238,8 +238,8 @@ subroutine select_singles_and_doubles(i_generator,hole_mask,particle_mask,fock_d
allocate(preinteresting(0:32), prefullinteresting(0:32), &
interesting(0:32), fullinteresting(0:32))
preinteresting(0) = 0
prefullinteresting(0) = 0
preinteresting(:) = 0
prefullinteresting(:) = 0
do i=1,N_int
negMask(i,1) = not(psi_det_generators(i,1,i_generator))
@ -642,13 +642,11 @@ subroutine splash_pq(mask, sp, det, i_gen, N_sel, bannedOrb, banned, mat, intere
negMask(i,2) = not(mask(i,2))
end do
do i=1, N_sel ! interesting(0)
!i = interesting(ii)
do i=1, N_sel
if (interesting(i) < 0) then
stop 'prefetch interesting(i) and det(i)'
endif
mobMask(1,1) = iand(negMask(1,1), det(1,1,i))
mobMask(1,2) = iand(negMask(1,2), det(1,2,i))
nt = popcnt(mobMask(1, 1)) + popcnt(mobMask(1, 2))
@ -679,10 +677,10 @@ subroutine splash_pq(mask, sp, det, i_gen, N_sel, bannedOrb, banned, mat, intere
end if
end if
call bitstring_to_list_in_selection(mobMask(1,1), p(1,1), p(0,1), N_int)
call bitstring_to_list_in_selection(mobMask(1,2), p(1,2), p(0,2), N_int)
if (interesting(i) >= i_gen) then
call bitstring_to_list_in_selection(mobMask(1,1), p(1,1), p(0,1), N_int)
call bitstring_to_list_in_selection(mobMask(1,2), p(1,2), p(0,2), N_int)
perMask(1,1) = iand(mask(1,1), not(det(1,1,i)))
perMask(1,2) = iand(mask(1,2), not(det(1,2,i)))
do j=2,N_int
@ -701,9 +699,14 @@ subroutine splash_pq(mask, sp, det, i_gen, N_sel, bannedOrb, banned, mat, intere
else
call get_d0(det(1,1,i), phasemask, bannedOrb, banned, mat, mask, h, p, sp, psi_selectors_coef_transp(1, interesting(i)))
end if
else
if(nt == 4) call past_d2(banned, p, sp)
if(nt == 3) call past_d1(bannedOrb, p)
else if(nt == 4) then
call bitstring_to_list_in_selection(mobMask(1,1), p(1,1), p(0,1), N_int)
call bitstring_to_list_in_selection(mobMask(1,2), p(1,2), p(0,2), N_int)
call past_d2(banned, p, sp)
else if(nt == 3) then
call bitstring_to_list_in_selection(mobMask(1,1), p(1,1), p(0,1), N_int)
call bitstring_to_list_in_selection(mobMask(1,2), p(1,2), p(0,2), N_int)
call past_d1(bannedOrb, p)
end if
end do

View File

@ -139,6 +139,8 @@ subroutine davidson_slave_work(zmq_to_qp_run_socket, zmq_socket_push, N_st, sze,
! Run tasks
! ---------
logical :: sending
sending=.False.
allocate(v_t(N_st,N_det), s_t(N_st,N_det))
do
@ -158,9 +160,11 @@ subroutine davidson_slave_work(zmq_to_qp_run_socket, zmq_socket_push, N_st, sze,
if (task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id) == -1) then
print *, irp_here, 'Unable to send task_done'
endif
call davidson_push_results(zmq_socket_push, v_t, s_t, imin, imax, task_id)
call davidson_push_results_async_recv(zmq_socket_push, sending)
call davidson_push_results_async_send(zmq_socket_push, v_t, s_t, imin, imax, task_id, sending)
end do
deallocate(u_t,v_t, s_t)
call davidson_push_results_async_recv(zmq_socket_push, sending)
end subroutine
@ -210,6 +214,73 @@ IRP_ENDIF
end subroutine
subroutine davidson_push_results_async_send(zmq_socket_push, v_t, s_t, imin, imax, task_id,sending)
use f77_zmq
implicit none
BEGIN_DOC
! Push the results of $H | U \rangle$ from a worker to the master.
END_DOC
integer(ZMQ_PTR) ,intent(in) :: zmq_socket_push
integer ,intent(in) :: task_id, imin, imax
double precision ,intent(in) :: v_t(N_states_diag,N_det)
double precision ,intent(in) :: s_t(N_states_diag,N_det)
logical ,intent(inout) :: sending
integer :: rc, sz
integer*8 :: rc8
if (sending) then
print *, irp_here, ': sending=true'
stop -1
endif
sending = .True.
sz = (imax-imin+1)*N_states_diag
rc = f77_zmq_send( zmq_socket_push, task_id, 4, ZMQ_SNDMORE)
if(rc /= 4) stop 'davidson_push_results failed to push task_id'
rc = f77_zmq_send( zmq_socket_push, imin, 4, ZMQ_SNDMORE)
if(rc /= 4) stop 'davidson_push_results failed to push imin'
rc = f77_zmq_send( zmq_socket_push, imax, 4, ZMQ_SNDMORE)
if(rc /= 4) stop 'davidson_push_results failed to push imax'
rc8 = f77_zmq_send8( zmq_socket_push, v_t(1,imin), 8_8*sz, ZMQ_SNDMORE)
if(rc8 /= 8_8*sz) stop 'davidson_push_results failed to push vt'
rc8 = f77_zmq_send8( zmq_socket_push, s_t(1,imin), 8_8*sz, 0)
if(rc8 /= 8_8*sz) stop 'davidson_push_results failed to push st'
end subroutine
subroutine davidson_push_results_async_recv(zmq_socket_push,sending)
use f77_zmq
implicit none
BEGIN_DOC
! Push the results of $H | U \rangle$ from a worker to the master.
END_DOC
integer(ZMQ_PTR) ,intent(in) :: zmq_socket_push
logical ,intent(inout) :: sending
integer :: rc
if (.not.sending) return
! Activate is zmq_socket_push is a REQ
IRP_IF ZMQ_PUSH
IRP_ELSE
character*(2) :: ok
rc = f77_zmq_recv( zmq_socket_push, ok, 2, 0)
if ((rc /= 2).and.(ok(1:2)/='ok')) then
print *, irp_here, ': f77_zmq_recv( zmq_socket_push, ok, 2, 0)'
stop -1
endif
IRP_ENDIF
sending = .False.
end subroutine
subroutine davidson_pull_results(zmq_socket_pull, v_t, s_t, imin, imax, task_id)
@ -275,22 +346,28 @@ subroutine davidson_collector(zmq_to_qp_run_socket, zmq_socket_pull, v0, s0, sze
integer :: more, task_id, imin, imax
double precision, allocatable :: v_t(:,:), s_t(:,:)
logical :: sending
integer :: i,j
integer, external :: zmq_delete_task_async_send
integer, external :: zmq_delete_task_async_recv
allocate(v_t(N_st,N_det), s_t(N_st,N_det))
v0 = 0.d0
s0 = 0.d0
more = 1
sending = .False.
do while (more == 1)
call davidson_pull_results(zmq_socket_pull, v_t, s_t, imin, imax, task_id)
if (zmq_delete_task_async_send(zmq_to_qp_run_socket,task_id,sending) == -1) then
stop 'Unable to delete task'
endif
do j=1,N_st
do i=imin,imax
v0(i,j) = v0(i,j) + v_t(j,i)
s0(i,j) = s0(i,j) + s_t(j,i)
enddo
enddo
integer, external :: zmq_delete_task
if (zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more) == -1) then
if (zmq_delete_task_async_recv(zmq_to_qp_run_socket,more,sending) == -1) then
stop 'Unable to delete task'
endif
end do

View File

@ -246,7 +246,7 @@ IRP_ENDIF
! stop 'Unable to set ZMQ_RCVBUF on pull socket'
! endif
rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_RCVHWM,8,4)
rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_RCVHWM,50,4)
if (rc /= 0) then
stop 'Unable to set ZMQ_RCVHWM on pull socket'
endif
@ -1085,6 +1085,62 @@ integer function zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,mo
endif
end
integer function zmq_delete_task_async_send(zmq_to_qp_run_socket,task_id,sending)
use f77_zmq
implicit none
BEGIN_DOC
! When a task is done, it has to be removed from the list of tasks on the qp_run
! queue. This guarantees that the results have been received in the pull.
END_DOC
integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket
integer, intent(in) :: task_id
logical, intent(inout) :: sending
integer :: rc
character*(512) :: message
if (sending) then
print *, irp_here, ': sending=true'
stop -1
endif
zmq_delete_task_async_send = 0
write(message,*) 'del_task ', zmq_state, task_id
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(message),len(trim(message)),0)
if (rc /= len(trim(message))) then
zmq_delete_task_async_send = -1
return
endif
sending = .True.
end
integer function zmq_delete_task_async_recv(zmq_to_qp_run_socket,more,sending)
use f77_zmq
implicit none
BEGIN_DOC
! When a task is done, it has to be removed from the list of tasks on the qp_run
! queue. This guarantees that the results have been received in the pull.
END_DOC
integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket
integer, intent(out) :: more
logical, intent(inout) :: sending
integer :: rc
character*(512) :: message
character*(64) :: reply
if (.not.sending) return
sending = .False.
reply = ''
rc = f77_zmq_recv(zmq_to_qp_run_socket,reply,64,0)
if (reply(16:19) == 'more') then
more = 1
else if (reply(16:19) == 'done') then
more = 0
else
zmq_delete_task_async_recv = -1
return
endif
end
integer function zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more)
use f77_zmq
implicit none
@ -1128,7 +1184,7 @@ integer function zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n
endif
end
integer function zmq_delete_tasks_async_send(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more)
integer function zmq_delete_tasks_async_send(zmq_to_qp_run_socket,task_id,n_tasks,sending)
use f77_zmq
implicit none
BEGIN_DOC
@ -1136,13 +1192,17 @@ integer function zmq_delete_tasks_async_send(zmq_to_qp_run_socket,zmq_socket_pul
! queue. This guarantees that the results have been received in the pull.
END_DOC
integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket
integer(ZMQ_PTR) :: zmq_socket_pull
integer, intent(in) :: n_tasks, task_id(n_tasks)
integer, intent(in) :: more
integer, intent(inout) :: sending
integer :: rc, k
character*(64) :: fmt, reply
character(LEN=:), allocatable :: message
if (sending) then
print *, irp_here, ': sending is true'
stop -1
endif
sending = .True.
zmq_delete_tasks_async_send = 0
allocate(character(LEN=64+n_tasks*12) :: message)
@ -1162,7 +1222,7 @@ integer function zmq_delete_tasks_async_send(zmq_to_qp_run_socket,zmq_socket_pul
end
integer function zmq_delete_tasks_async_recv(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more)
integer function zmq_delete_tasks_async_recv(zmq_to_qp_run_socket,more,sending)
use f77_zmq
implicit none
BEGIN_DOC
@ -1170,12 +1230,12 @@ integer function zmq_delete_tasks_async_recv(zmq_to_qp_run_socket,zmq_socket_pul
! queue. This guarantees that the results have been received in the pull.
END_DOC
integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket
integer(ZMQ_PTR) :: zmq_socket_pull
integer, intent(in) :: n_tasks, task_id(n_tasks)
integer, intent(out) :: more
integer, intent(inout) :: sending
integer :: rc
character*(64) :: reply
if (.not.sending) return
zmq_delete_tasks_async_recv = 0
reply = ''
@ -1188,6 +1248,7 @@ integer function zmq_delete_tasks_async_recv(zmq_to_qp_run_socket,zmq_socket_pul
else
zmq_delete_tasks_async_recv = -1
endif
sending = .False.
end