10
0
mirror of https://github.com/LCPQ/quantum_package synced 2024-12-25 05:43:47 +01:00

reduce tasks

This commit is contained in:
Yann Garniron 2018-04-27 12:41:39 +02:00
parent 46450f0826
commit 274bb043c2
3 changed files with 211 additions and 85 deletions

View File

@ -10,7 +10,7 @@ subroutine ZMQ_dress(E, dress, delta, delta_s2, relative_error)
implicit none
character(len=64000) :: task
character(len=3200) :: temp
integer(ZMQ_PTR) :: zmq_to_qp_run_socket, zmq_socket_pull
integer, external :: omp_get_thread_num
double precision, intent(in) :: E(N_states), relative_error
@ -27,6 +27,8 @@ subroutine ZMQ_dress(E, dress, delta, delta_s2, relative_error)
double precision :: state_average_weight_save(N_states)
task(:) = CHAR(0)
temp(:) = CHAR(0)
state_average_weight_save(:) = state_average_weight(:)
do dress_stoch_istate=1,N_states
SOFT_TOUCH dress_stoch_istate
@ -63,37 +65,68 @@ subroutine ZMQ_dress(E, dress, delta, delta_s2, relative_error)
endif
integer(ZMQ_PTR), external :: new_zmq_to_qp_run_socket
integer :: ipos
integer :: ipos, sz
integer :: block(50), block_i, cur_tooth_reduce, ntas
logical :: flushme
block = 0
block_i = 0
cur_tooth_reduce = 0
ipos=1
do i=1,N_dress_jobs
if(dress_jobs(i) > fragment_first) then
write(task(ipos:ipos+20),'(I9,1X,I9,''|'')') 0, dress_jobs(i)
ipos += 20
if (ipos > 63980) then
if (add_task_to_taskserver(zmq_to_qp_run_socket,trim(task(1:ipos))) == -1) then
stop 'Unable to add task to task server'
endif
ipos=1
endif
else
do j=1,fragment_count
write(task(ipos:ipos+20),'(I9,1X,I9,''|'')') j, dress_jobs(i)
ipos += 20
if (ipos > 63980) then
ntas = 0
do i=1,N_dress_jobs+1
flushme = (i==N_dress_jobs+1 .or. block_i == size(block))
if(.not. flushme) flushme = (tooth_reduce(dress_jobs(i)) == 0 .or. tooth_reduce(dress_jobs(i)) /= cur_tooth_reduce)
if(flushme .and. block_i > 0) then
if(block(1) > fragment_first) then
ntas += 1
write(temp, '(I9,1X,60(I9,1X))') 0, block(:block_i)
sz = len(trim(temp))+1
temp(sz:sz) = '|'
!write(task(ipos:ipos+20),'(I9,1X,I9,''|'')') 0, dress_jobs(i)
write(task(ipos:ipos+sz), *) temp(:sz)
!ipos += 20
ipos += sz+1
if (ipos > 63000 .or. i==N_dress_jobs+1) then
if (add_task_to_taskserver(zmq_to_qp_run_socket,trim(task(1:ipos))) == -1) then
stop 'Unable to add task to task server'
endif
ipos=1
endif
end do
else
if(block_i /= 1) stop "reduced fragmented dets"
do j=1,fragment_count
ntas += 1
write(task(ipos:ipos+20),'(I9,1X,I9,''|'')') j, block(1)
ipos += 20
if (ipos > 63000 .or. i==N_dress_jobs+1) then
ntas += 1
if (add_task_to_taskserver(zmq_to_qp_run_socket,trim(task(1:ipos))) == -1) then
stop 'Unable to add task to task server'
endif
ipos=1
endif
end do
end if
block_i = 0
block = 0
end if
if(i /= N_dress_jobs+1) then
cur_tooth_reduce = tooth_reduce(dress_jobs(i))
block_i += 1
block(block_i) = dress_jobs(i)
end if
end do
if (ipos > 1) then
if (add_task_to_taskserver(zmq_to_qp_run_socket,trim(task(1:ipos))) == -1) then
stop 'Unable to add task to task server'
endif
endif
print *, "ACTUAL TASK NUM", ntas
!stop
!if (ipos > 1) then
! if (add_task_to_taskserver(zmq_to_qp_run_socket,trim(task(1:ipos))) == -1) then
! stop 'Unable to add task to task server'
! endif
!endif
if (zmq_set_running(zmq_to_qp_run_socket) == -1) then
print *, irp_here, ': Failed in zmq_set_running'
endif
@ -134,7 +167,7 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2,
implicit none
integer, parameter :: delta_loc_N = 2
integer, parameter :: delta_loc_N = 1
integer(ZMQ_PTR), intent(in) :: zmq_socket_pull
integer, intent(in) :: istate
@ -205,22 +238,26 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2,
timeLast = time0
cur_cp = 0
old_cur_cp = 0
logical :: loop
integer :: felem, felem_loc
logical :: loop, last
integer :: felem(0:delta_loc_N), felem_loc
loop = .true.
felem = N_det+1
pullLoop : do while (loop)
call pull_dress_results(zmq_socket_pull, ind, delta_loc(1,1,1,delta_loc_cur), int_buf, double_buf, det_buf, N_buf, task_id, felem_loc)
call pull_dress_results(zmq_socket_pull, ind, last, delta_loc(1,1,1,delta_loc_cur), int_buf, double_buf, det_buf, N_buf, task_id, felem_loc)
call dress_pulled(ind, int_buf, double_buf, det_buf, N_buf)
felem = min(felem_loc, felem)
!print *, "felem", felem_loc, felem
felem(delta_loc_cur) = felem_loc
felem(0) = min(felem_loc, felem(0))
dress_mwen(:) = 0d0
integer, external :: zmq_delete_tasks
if (zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,1,more) == -1) then
stop 'Unable to delete tasks'
endif
if(more == 0) loop = .false.
if(last) then
if (zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,1,more) == -1) then
stop 'Unable to delete tasks'
endif
if(more == 0) loop = .false.
end if
do i_state=1,N_states
@ -245,29 +282,36 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2,
end if
end do
if(ok) then
do i=felem,N_det_generators
if(ok .and. .false.) then
do i=felem(0),N_det_generators
do is=1,N_states
cp(is,i,j,1) += delta_loc(is,i,1,1) * fac(1) &
+ delta_loc(is,i,1,2) * fac(2)
cp(is,i,j,1) += delta_loc(is,i,1,1) * fac(1)! &
!+ delta_loc(is,i,1,2) * fac(2) &
!+ delta_loc(is,i,1,3) * fac(3) &
!+ delta_loc(is,i,1,4) * fac(4) &
!+ delta_loc(:,i,1,5) * fac(5) &
!+ delta_loc(:,i,1,6) * fac(6) &
!+ delta_loc(:,i,1,7) * fac(7) &
!+ delta_loc(:,i,1,8) * fac(8)
!+ delta_loc(is,i,1,4) * fac(4) &
!+ delta_loc(is,i,1,5) * fac(5) &
!+ delta_loc(is,i,1,6) * fac(6) &
!+ delta_loc(is,i,1,7) * fac(7) &
!+ delta_loc(is,i,1,8) * fac(8)
end do
end do
cp(is,i,j,2) += delta_loc(is,i,2,1) * fac(1) &
+ delta_loc(is,i,2,2) * fac(2)
do i=felem(0),N_det_generators
do is=1,N_states
cp(is,i,j,2) += delta_loc(is,i,2,1) * fac(1)! &
!+ delta_loc(is,i,2,2) * fac(2) &
!+ delta_loc(is,i,2,3) * fac(3) &
!+ delta_loc(is,i,2,4) * fac(4) &
!+ delta_loc(:,i,2,5) * fac(5) &
!+ delta_loc(:,i,2,6) * fac(6) &
!+ delta_loc(:,i,2,7) * fac(7) &
!+ delta_loc(:,i,2,8) * fac(8)
!+ delta_loc(is,i,2,4) * fac(4) &
!+ delta_loc(is,i,2,5) * fac(5) &
!+ delta_loc(is,i,2,6) * fac(6) &
!+ delta_loc(is,i,2,7) * fac(7) &
!+ delta_loc(is,i,2,8) * fac(8)
end do
end do
end if
end if
end do
do i=1,delta_loc_cur
@ -279,14 +323,14 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2,
fracted = (toothMwen /= 0)
if(fracted) fracted = (ind == first_det_of_teeth(toothMwen))
if(fracted) then
delta_det(1:N_states,felem:N_det,toothMwen-1, 1) = delta_det(1:N_states,felem:N_det,toothMwen-1, 1) + delta_loc(1:N_states,felem:N_det,1,i) * (1d0-fractage(toothMwen))
delta_det(1:N_states,felem:N_det,toothMwen-1, 2) = delta_det(1:N_states,felem:N_det,toothMwen-1, 2) + delta_loc(1:N_states,felem:N_det,2,i) * (1d0-fractage(toothMwen))
delta_det(1:N_states,felem:N_det,toothMwen , 1) = delta_det(1:N_states,felem:N_det,toothMwen , 1) + delta_loc(1:N_states,felem:N_det,1,i) * (fractage(toothMwen))
delta_det(1:N_states,felem:N_det,toothMwen , 2) = delta_det(1:N_states,felem:N_det,toothMwen , 2) + delta_loc(1:N_states,felem:N_det,2,i) * (fractage(toothMwen))
else
delta_det(1:N_states,felem:N_det,toothMwen , 1) = delta_det(1:N_states,felem:N_det,toothMwen , 1) + delta_loc(1:N_states,felem:N_det,1,i)
delta_det(1:N_states,felem:N_det,toothMwen , 2) = delta_det(1:N_states,felem:N_det,toothMwen , 2) + delta_loc(1:N_states,felem:N_det,2,i)
if(fracted .and. .false.) then
delta_det(1:N_states,felem(i):N_det,toothMwen-1, 1) = delta_det(1:N_states,felem(i):N_det,toothMwen-1, 1) + delta_loc(1:N_states,felem(i):N_det,1,i) * (1d0-fractage(toothMwen))
delta_det(1:N_states,felem(i):N_det,toothMwen-1, 2) = delta_det(1:N_states,felem(i):N_det,toothMwen-1, 2) + delta_loc(1:N_states,felem(i):N_det,2,i) * (1d0-fractage(toothMwen))
delta_det(1:N_states,felem(i):N_det,toothMwen , 1) = delta_det(1:N_states,felem(i):N_det,toothMwen , 1) + delta_loc(1:N_states,felem(i):N_det,1,i) * (fractage(toothMwen))
delta_det(1:N_states,felem(i):N_det,toothMwen , 2) = delta_det(1:N_states,felem(i):N_det,toothMwen , 2) + delta_loc(1:N_states,felem(i):N_det,2,i) * (fractage(toothMwen))
else if(.false.) then
delta_det(1:N_states,felem(i):N_det,toothMwen , 1) = delta_det(1:N_states,felem(i):N_det,toothMwen , 1) + delta_loc(1:N_states,felem(i):N_det,1,i)
delta_det(1:N_states,felem(i):N_det,toothMwen , 2) = delta_det(1:N_states,felem(i):N_det,toothMwen , 2) + delta_loc(1:N_states,felem(i):N_det,2,i)
end if
parts_to_get(ind) -= 1
@ -295,7 +339,7 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2,
total_computed += 1
end if
end do
felem = N_det_generators+1
felem = N_det+1
delta_loc_cur = 1
else
delta_loc_cur += 1
@ -320,7 +364,7 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2,
exit
end if
end do
if(cur_cp == 0) cycle pullLoop
if(cur_cp == 0 .or. (cur_cp == old_cur_cp .and. total_computed /= N_det_generators)) cycle pullLoop
double precision :: su, su2, eqt, avg, E0, val
integer, external :: zmq_abort
@ -436,13 +480,17 @@ END_PROVIDER
&BEGIN_PROVIDER [ integer, N_dress_jobs ]
&BEGIN_PROVIDER [ integer, dress_jobs, (N_det_generators) ]
&BEGIN_PROVIDER [ double precision, comb, (N_det_generators) ]
&BEGIN_PROVIDER [ integer, tooth_reduce, (N_det_generators) ]
implicit none
logical, allocatable :: computed(:), comp_filler(:)
integer :: i, j, last_full, dets(comb_teeth)
integer :: k, l, cur_cp, under_det(comb_teeth+1)
integer :: cp_limit(N_cps_max)
integer, allocatable :: iorder(:), first_cp(:)
integer, allocatable :: filler(:)
integer :: nfiller, lfiller, cfiller
logical :: fracted
allocate(filler(n_det_generators))
allocate(iorder(N_det_generators), first_cp(N_cps_max+1))
@ -455,6 +503,15 @@ END_PROVIDER
comp_filler = .false.
computed = .false.
cps_N = 1d0
tooth_reduce = 0
integer :: fragsize
fragsize = N_det_generators / ((N_cps_max+1)*(N_cps_max+2)/2)
do i=1,N_cps_max
cp_limit(i) = fragsize * i * (i+1) / 2
end do
print *, "CP_LIMIT", cp_limit
N_dress_jobs = first_det_of_comb - 1
do i=1, N_dress_jobs
@ -471,7 +528,8 @@ END_PROVIDER
!DIR$ FORCEINLINE
call add_comb(comb(i), computed, cps(1, cur_cp), N_dress_jobs, dress_jobs)
if(N_dress_jobs / gen_per_cp > (cur_cp-1) .or. N_dress_jobs == N_det_generators) then
!if(N_dress_jobs / gen_per_cp > (cur_cp-1) .or. N_dress_jobs == N_det_generators) then
if(N_dress_jobs > cp_limit(cur_cp) .or. N_dress_jobs == N_det_generators) then
first_cp(cur_cp+1) = N_dress_jobs
done_cp_at(N_dress_jobs) = cur_cp
cps_N(cur_cp) = dfloat(i)
@ -561,11 +619,32 @@ END_PROVIDER
end do
iorder = -1
cps(:, N_cp) = 0d0
iloop : do i=fragment_first+1,N_det_generators
k = tooth_of_det(i)
if(k == 0) cycle
if (i == first_det_of_teeth(k)) cycle
do j=1,N_cp
if(cps(i, j) /= 0d0) cycle iloop
end do
tooth_reduce(i) = k
end do iloop
do i=1,N_det_generators
if(tooth_reduce(dress_jobs(i)) == 0) dress_jobs(i) = dress_jobs(i)+N_det*2
end do
do i=1,N_cp-1
call isort(dress_jobs(first_cp(i)+1:first_cp(i+1)),iorder,first_cp(i+1)-first_cp(i))
end do
cps(:, N_cp) = 0d0
do i=1,N_det_generators
if(dress_jobs(i) > N_det) dress_jobs(i) = dress_jobs(i) - N_det*2
end do
END_PROVIDER

View File

@ -15,10 +15,10 @@ subroutine run_dress_slave(thread,iproc,energy)
double precision, intent(in) :: energy(N_states_diag)
integer, intent(in) :: thread, iproc
integer :: rc, i, subset, i_generator
integer :: rc, i, subset, i_generator(60)
integer :: worker_id, task_id, ctask, ltask
character*(512) :: task
character*(5120) :: task
integer(ZMQ_PTR),external :: new_zmq_to_qp_run_socket
integer(ZMQ_PTR) :: zmq_to_qp_run_socket
@ -40,7 +40,9 @@ subroutine run_dress_slave(thread,iproc,energy)
double precision, allocatable :: double_buf(:)
integer(bit_kind), allocatable :: det_buf(:,:,:)
integer :: N_buf(3)
logical :: last
task(:) = CHAR(0)
allocate(int_buf(N_dress_int_buffer))
allocate(double_buf(N_dress_double_buffer))
@ -62,13 +64,22 @@ subroutine run_dress_slave(thread,iproc,energy)
do
call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task)
if(task_id /= 0) then
task = trim(task)//' 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0'
i_generator = 0
read (task,*) subset, i_generator
if(i_generator(size(i_generator)) /= 0) stop "i_generator buffer too small"
delta_ij_loc = 0d0
call generator_start(i_generator, iproc)
call alpha_callback(delta_ij_loc, i_generator, subset, iproc)
call generator_done(i_generator, int_buf, double_buf, det_buf, N_buf, iproc)
i=1
do while(i_generator(i) /= 0)
call generator_start(i_generator(i), iproc)
call alpha_callback(delta_ij_loc, i_generator(i), subset, iproc)
call generator_done(i_generator(i), int_buf, double_buf, det_buf, N_buf, iproc)
last = (i_generator(i+1) == 0)
call push_dress_results(zmq_socket_push, i_generator(i), last, delta_ij_loc, int_buf, double_buf, det_buf, N_buf, task_id)
i += 1
end do
call task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id)
call push_dress_results(zmq_socket_push, i_generator, delta_ij_loc, int_buf, double_buf, det_buf, N_buf, task_id)
else
exit
end if
@ -91,19 +102,24 @@ end subroutine
!subroutine pull_dress_results(zmq_socket_pull, ind, delta_loc, int_buf, double_buf, det_buf, N_buf, task_id, felem)
subroutine push_dress_results(zmq_socket_push, ind, delta_loc, int_buf, double_buf, det_buf, N_buf, task_id)
subroutine push_dress_results(zmq_socket_push, ind, last, delta_loc, int_buf, double_buf, det_buf, N_bufi, task_id)
use f77_zmq
implicit none
integer(ZMQ_PTR), intent(in) :: zmq_socket_push
double precision, intent(in) :: delta_loc(N_states, N_det, 2)
double precision, intent(in) :: double_buf(*)
logical, intent(in) :: last
integer, intent(in) :: int_buf(*)
integer(bit_kind), intent(in) :: det_buf(N_int, 2, *)
integer, intent(in) :: N_buf(3)
integer, intent(in) :: N_bufi(3)
integer :: N_buf(3)
integer, intent(in) :: ind, task_id
integer :: rc, i, j, felem
double precision :: vast_emptiness(N_states)
integer :: fillness
vast_emptiness = 0d0
felem = 1
dloop : do i=1, N_det
@ -114,21 +130,48 @@ subroutine push_dress_results(zmq_socket_push, ind, delta_loc, int_buf, double_b
end if
end do
end do dloop
if(last) then
fillness = 0
do i=felem,N_det
do j=1,N_states
if(delta_loc(j,i,1) /= 0d0 .or. delta_loc(j,i,2) /= 0d0) then
fillness += 1
end if
end do
end do
!print *, "FILLNESS", float(fillness) / float((N_det-felem+1)*N_states)
end if
rc = f77_zmq_send( zmq_socket_push, ind, 4, ZMQ_SNDMORE)
if(rc /= 4) stop "push"
rc = f77_zmq_send( zmq_socket_push, felem, 4, ZMQ_SNDMORE)
if(rc /= 4) stop "push"
rc = f77_zmq_send( zmq_socket_push, delta_loc(1,felem,1), 8*N_states*(N_det-felem+1), ZMQ_SNDMORE)
if(rc /= 8*N_states*(N_det+1-felem)) stop "push"
rc = f77_zmq_send( zmq_socket_push, delta_loc(1,felem,2), 8*N_states*(N_det-felem+1), ZMQ_SNDMORE)
if(rc /= 8*N_states*(N_det+1-felem)) stop "push"
rc = f77_zmq_send( zmq_socket_push, last, 1, ZMQ_SNDMORE)
if(rc /= 1) stop "push"
if(last) then
rc = f77_zmq_send( zmq_socket_push, felem, 4, ZMQ_SNDMORE)
if(rc /= 4) stop "push"
rc = f77_zmq_send( zmq_socket_push, delta_loc(1,felem,1), 8*N_states*(N_det-felem+1), ZMQ_SNDMORE)
if(rc /= 8*N_states*(N_det+1-felem)) stop "push"
rc = f77_zmq_send( zmq_socket_push, delta_loc(1,felem,2), 8*N_states*(N_det-felem+1), ZMQ_SNDMORE)
if(rc /= 8*N_states*(N_det+1-felem)) stop "push"
else
rc = f77_zmq_send( zmq_socket_push, N_det, 4, ZMQ_SNDMORE)
if(rc /= 4) stop "push"
rc = f77_zmq_send( zmq_socket_push, vast_emptiness, 8*N_states, ZMQ_SNDMORE)
if(rc /= 8*N_states) stop "push"
rc = f77_zmq_send( zmq_socket_push, vast_emptiness, 8*N_states, ZMQ_SNDMORE)
if(rc /= 8*N_states) stop "push"
end if
N_buf = N_bufi
rc = f77_zmq_send( zmq_socket_push, N_buf, 4*3, ZMQ_SNDMORE)
if(rc /= 4*3) stop "push5"
@ -137,7 +180,7 @@ subroutine push_dress_results(zmq_socket_push, ind, delta_loc, int_buf, double_b
if(N_buf(2) > N_dress_double_buffer) stop "run_dress_slave N_buf bad size?"
if(N_buf(3) > N_dress_det_buffer) stop "run_dress_slave N_buf bad size?"
if(N_buf(1) > 0) then
rc = f77_zmq_send( zmq_socket_push, int_buf, 4*N_buf(1), ZMQ_SNDMORE)
if(rc /= 4*N_buf(1)) stop "push6"
@ -166,10 +209,11 @@ IRP_ENDIF
end subroutine
subroutine pull_dress_results(zmq_socket_pull, ind, delta_loc, int_buf, double_buf, det_buf, N_buf, task_id, felem)
subroutine pull_dress_results(zmq_socket_pull, ind, last, delta_loc, int_buf, double_buf, det_buf, N_buf, task_id, felem)
use f77_zmq
implicit none
integer(ZMQ_PTR), intent(in) :: zmq_socket_pull
logical, intent(out) :: last
double precision, intent(inout) :: delta_loc(N_states, N_det, 2)
double precision, intent(out) :: double_buf(*)
integer, intent(out) :: int_buf(*)
@ -183,6 +227,9 @@ subroutine pull_dress_results(zmq_socket_pull, ind, delta_loc, int_buf, double_b
rc = f77_zmq_recv( zmq_socket_pull, ind, 4, 0)
if(rc /= 4) stop "pulla"
rc = f77_zmq_recv( zmq_socket_pull, last, 1, 0)
if(rc /= 1) stop "pulla"
rc = f77_zmq_recv( zmq_socket_pull, felem, 4, 0)
if(rc /= 4) stop "pullb"

View File

@ -62,7 +62,7 @@ subroutine generator_done(i_gen, int_buf, double_buf, det_buf, N_buf, iproc)
if(sb(iproc)%cur > 0) then
!$OMP CRITICAL
call merge_selection_buffers(sb(iproc), mini_sb)
call sort_selection_buffer(mini_sb)
!call sort_selection_buffer(mini_sb)
do i=1,Nproc
sb(i)%mini = min(sb(i)%mini, mini_sb%mini)
end do
@ -203,7 +203,7 @@ subroutine undress_with_alpha(old_generators, old_det_gen, alpha, n_alpha)
!global_sum_alpha2(:) -= c_alpha(:,1)
print *, "SUM ALPHA2 POST", c_alpha(:,1)
do i=1,N_states
delta_ij_tmp(i,:,:) = delta_ij_tmp(i,:,:) / (1d0 + global_sum_alpha2(i))
! delta_ij_tmp(i,:,:) = delta_ij_tmp(i,:,:) / (1d0 + global_sum_alpha2(i))
end do
global_sum_alpha2 = 0d0
end subroutine