From df3a4fce2b57787e51cba5a991753745a8d3a12a Mon Sep 17 00:00:00 2001 From: Anthony Scemama Date: Mon, 4 Feb 2019 13:20:24 +0100 Subject: [PATCH] Optimized communications --- src/cipsi/pt2_stoch_routines.irp.f | 24 +-- src/cipsi/run_pt2_slave.irp.f | 262 ++++++++++++++-------------- src/cipsi/selection_buffer.irp.f | 5 + src/cipsi/slave_cipsi.irp.f | 1 + src/determinants/slater_rules.irp.f | 15 +- src/zmq/utils.irp.f | 2 +- 6 files changed, 163 insertions(+), 146 deletions(-) diff --git a/src/cipsi/pt2_stoch_routines.irp.f b/src/cipsi/pt2_stoch_routines.irp.f index ea339bee..0ca69bf9 100644 --- a/src/cipsi/pt2_stoch_routines.irp.f +++ b/src/cipsi/pt2_stoch_routines.irp.f @@ -16,7 +16,7 @@ END_PROVIDER call write_int(6,pt2_n_tasks_max,'pt2_n_tasks_max') pt2_F(:) = int(sqrt(float(pt2_n_tasks_max))) - do i=1,pt2_n_0(pt2_N_teeth/4) + do i=1,pt2_n_0(1+pt2_N_teeth/4) pt2_F(i) = pt2_n_tasks_max enddo do i=pt2_n_0(pt2_N_teeth-pt2_N_teeth/4), N_det_generators @@ -115,7 +115,6 @@ subroutine ZMQ_pt2(E, pt2,relative_error, error, variance, norm, N_in) integer(ZMQ_PTR) :: zmq_to_qp_run_socket, zmq_socket_pull integer, intent(in) :: N_in - integer, external :: omp_get_thread_num double precision, intent(in) :: relative_error, E(N_states) double precision, intent(out) :: pt2(N_states),error(N_states) double precision, intent(out) :: variance(N_states),norm(N_states) @@ -123,7 +122,6 @@ subroutine ZMQ_pt2(E, pt2,relative_error, error, variance, norm, N_in) integer :: i, N - double precision, external :: omp_get_wtime double precision :: state_average_weight_save(N_states), w(N_states,4) integer(ZMQ_PTR), external :: new_zmq_to_qp_run_socket type(selection_buffer) :: b @@ -132,7 +130,7 @@ subroutine ZMQ_pt2(E, pt2,relative_error, error, variance, norm, N_in) PROVIDE psi_bilinear_matrix_rows psi_det_sorted_order psi_bilinear_matrix_order PROVIDE psi_bilinear_matrix_transp_rows_loc psi_bilinear_matrix_transp_columns PROVIDE psi_bilinear_matrix_transp_order psi_selectors_coef_transp psi_det_sorted - PROVIDE psi_det_hii + PROVIDE psi_det_hii N_generators_bitmask if (s2_eig) then PROVIDE psi_occ_pattern_hii det_to_occ_pattern @@ -148,6 +146,10 @@ subroutine ZMQ_pt2(E, pt2,relative_error, error, variance, norm, N_in) N = max(N_in,1) * N_states state_average_weight_save(:) = state_average_weight(:) + if (int(N,8)*2_8 > huge(1)) then + print *, irp_here, ': integer too large' + stop -1 + endif call create_selection_buffer(N, N*2, b) ASSERT (associated(b%det)) ASSERT (associated(b%val)) @@ -291,6 +293,7 @@ subroutine ZMQ_pt2(E, pt2,relative_error, error, variance, norm, N_in) print '(A)', ' Samples Energy Stat. Err Variance Norm Seconds ' print '(A)', '========== ================= =========== =============== =============== =================' + PROVIDE global_selection_buffer !$OMP PARALLEL DEFAULT(shared) NUM_THREADS(nproc_target+1) & !$OMP PRIVATE(i) i = omp_get_thread_num() @@ -338,6 +341,7 @@ subroutine pt2_slave_inproc(i) implicit none integer, intent(in) :: i + PROVIDE global_selection_buffer call run_pt2_slave(1,i,pt2_e0_denominator) end @@ -371,7 +375,6 @@ subroutine pt2_collector(zmq_socket_pull, E, relative_error, pt2, error, varianc integer, allocatable :: task_id(:) integer, allocatable :: index(:) - double precision, external :: omp_get_wtime double precision :: v, x, x2, x3, avg, avg2, avg3, eqt, E0, v0, n0 double precision :: time, time1, time0 @@ -437,7 +440,6 @@ subroutine pt2_collector(zmq_socket_pull, E, relative_error, pt2, error, varianc stop_now = .false. do while (n <= N_det_generators) if(f(pt2_J(n)) == 0) then -!print *, 'f(pt2_J(n)) == 0' d(pt2_J(n)) = .true. do while(d(U+1)) U += 1 @@ -490,6 +492,7 @@ subroutine pt2_collector(zmq_socket_pull, E, relative_error, pt2, error, varianc pt2(pt2_stoch_istate) = avg variance(pt2_stoch_istate) = avg2 norm(pt2_stoch_istate) = avg3 + call wall_time(time) ! 1/(N-1.5) : see Brugger, The American Statistician (23) 4 p. 32 (1969) if(c > 2) then eqt = dabs((S2(t) / c) - (S(t)/c)**2) ! dabs for numerical stability @@ -510,7 +513,6 @@ subroutine pt2_collector(zmq_socket_pull, E, relative_error, pt2, error, varianc endif endif endif - call wall_time(time) end if n += 1 else if(more == 0) then @@ -521,15 +523,15 @@ subroutine pt2_collector(zmq_socket_pull, E, relative_error, pt2, error, varianc stop 'Unable to delete tasks' endif do i=1,n_tasks - eI(:, index(i)) += eI_task(:,i) - vI(:, index(i)) += vI_task(:,i) - nI(:, index(i)) += nI_task(:,i) + eI(1:N_states, index(i)) += eI_task(1:N_states,i) + vI(1:N_states, index(i)) += vI_task(1:N_states,i) + nI(1:N_states, index(i)) += nI_task(1:N_states,i) f(index(i)) -= 1 end do do i=1, b2%cur - call add_to_selection_buffer(b, b2%det(1,1,i), b2%val(i)) ! We assume the pulled buffer is sorted if (b2%val(i) > b%mini) exit + call add_to_selection_buffer(b, b2%det(1,1,i), b2%val(i)) end do if (zmq_delete_tasks_async_recv(zmq_to_qp_run_socket,more,sending) == -1) then stop 'Unable to delete tasks' diff --git a/src/cipsi/run_pt2_slave.irp.f b/src/cipsi/run_pt2_slave.irp.f index be3184c9..a32a4172 100644 --- a/src/cipsi/run_pt2_slave.irp.f +++ b/src/cipsi/run_pt2_slave.irp.f @@ -1,6 +1,31 @@ + use omp_lib + use selection_types + use f77_zmq +BEGIN_PROVIDER [ integer(omp_lock_kind), global_selection_buffer_lock ] + use omp_lib + implicit none + BEGIN_DOC + ! Global buffer for the OpenMP selection + END_DOC + call omp_init_lock(global_selection_buffer_lock) +END_PROVIDER + +BEGIN_PROVIDER [ type(selection_buffer), global_selection_buffer ] + use omp_lib + implicit none + BEGIN_DOC + ! Global buffer for the OpenMP selection + END_DOC + call omp_set_lock(global_selection_buffer_lock) + call delete_selection_buffer(global_selection_buffer) + call create_selection_buffer(N_det_generators, 2*N_det_generators, & + global_selection_buffer) + call omp_unset_lock(global_selection_buffer_lock) +END_PROVIDER + subroutine run_pt2_slave(thread,iproc,energy) - use f77_zmq - use selection_types + use selection_types + use f77_zmq implicit none double precision, intent(in) :: energy(N_states_diag) @@ -28,6 +53,7 @@ subroutine run_pt2_slave(thread,iproc,energy) double precision, external :: memory_of_double, memory_of_int integer :: bsize ! Size of selection buffers logical :: sending + PROVIDE global_selection_buffer global_selection_buffer_lock rss = memory_of_int(pt2_n_tasks_max)*67.d0 rss += memory_of_double(pt2_n_tasks_max)*(N_states*3) @@ -103,10 +129,21 @@ subroutine run_pt2_slave(thread,iproc,energy) endif call sort_selection_buffer(b) call push_pt2_results_async_recv(zmq_socket_push,b%mini,sending) - call push_pt2_results_async_send(zmq_socket_push, i_generator, pt2, variance, norm, b, task_id, n_tasks,sending) + call omp_set_lock(global_selection_buffer_lock) + global_selection_buffer%mini = b%mini + call merge_selection_buffers(b,global_selection_buffer) b%cur=0 + call omp_unset_lock(global_selection_buffer_lock) + if ( iproc == 1 ) then + call omp_set_lock(global_selection_buffer_lock) + call push_pt2_results_async_send(zmq_socket_push, i_generator, pt2, variance, norm, global_selection_buffer, task_id, n_tasks,sending) + global_selection_buffer%cur = 0 + call omp_unset_lock(global_selection_buffer_lock) + else + call push_pt2_results_async_send(zmq_socket_push, i_generator, pt2, variance, norm, b, task_id, n_tasks,sending) + endif - ! Try to adjust n_tasks around nproc/2 seconds per job +! ! Try to adjust n_tasks around nproc/2 seconds per job ! n_tasks = min(2*n_tasks,int( dble(n_tasks * nproc/2) / (time1 - time0 + 1.d0))) n_tasks = 1 end do @@ -124,12 +161,13 @@ subroutine run_pt2_slave(thread,iproc,energy) if (buffer_ready) then call delete_selection_buffer(b) endif + FREE global_selection_buffer end subroutine subroutine push_pt2_results(zmq_socket_push, index, pt2, variance, norm, b, task_id, n_tasks) - use f77_zmq - use selection_types + use selection_types + use f77_zmq implicit none integer(ZMQ_PTR), intent(in) :: zmq_socket_push @@ -138,99 +176,17 @@ subroutine push_pt2_results(zmq_socket_push, index, pt2, variance, norm, b, task double precision, intent(in) :: norm(N_states,n_tasks) integer, intent(in) :: n_tasks, index(n_tasks), task_id(n_tasks) type(selection_buffer), intent(inout) :: b - integer :: rc - - rc = f77_zmq_send( zmq_socket_push, n_tasks, 4, ZMQ_SNDMORE) - if (rc == -1) then - return - else if(rc /= 4) then - stop 'push' - endif - - - rc = f77_zmq_send( zmq_socket_push, index, 4*n_tasks, ZMQ_SNDMORE) - if (rc == -1) then - return - else if(rc /= 4*n_tasks) then - stop 'push' - endif - - - rc = f77_zmq_send( zmq_socket_push, pt2, 8*N_states*n_tasks, ZMQ_SNDMORE) - if (rc == -1) then - return - else if(rc /= 8*N_states*n_tasks) then - stop 'push' - endif - - - rc = f77_zmq_send( zmq_socket_push, variance, 8*N_states*n_tasks, ZMQ_SNDMORE) - if (rc == -1) then - return - else if(rc /= 8*N_states*n_tasks) then - stop 'push' - endif - - - rc = f77_zmq_send( zmq_socket_push, norm, 8*N_states*n_tasks, ZMQ_SNDMORE) - if (rc == -1) then - return - else if(rc /= 8*N_states*n_tasks) then - stop 'push' - endif - - - rc = f77_zmq_send( zmq_socket_push, task_id, n_tasks*4, ZMQ_SNDMORE) - if (rc == -1) then - return - else if(rc /= 4*n_tasks) then - stop 'push' - endif - - - rc = f77_zmq_send( zmq_socket_push, b%cur, 4, ZMQ_SNDMORE) - if (rc == -1) then - return - else if(rc /= 4) then - stop 'push' - endif - - - rc = f77_zmq_send( zmq_socket_push, b%val, 8*b%cur, ZMQ_SNDMORE) - if (rc == -1) then - return - else if(rc /= 8*b%cur) then - stop 'push' - endif - - - rc = f77_zmq_send( zmq_socket_push, b%det, bit_kind*N_int*2*b%cur, 0) - if (rc == -1) then - return - else if(rc /= N_int*2*8*b%cur) then - stop 'push' - endif - - -! Activate is zmq_socket_push is a REQ -IRP_IF ZMQ_PUSH -IRP_ELSE - character*(2) :: ok - rc = f77_zmq_recv( zmq_socket_push, ok, 2, 0) - if (rc == -1) then - return - else if ((rc /= 2).and.(ok(1:2) /= 'ok')) then - print *, irp_here//': error in receiving ok' - stop -1 - endif -IRP_ENDIF + logical :: sending + sending = .False. + call push_pt2_results_async_send(zmq_socket_push, index, pt2, variance, norm, b, task_id, n_tasks, sending) + call push_pt2_results_async_recv(zmq_socket_push, b%mini, sending) end subroutine subroutine push_pt2_results_async_send(zmq_socket_push, index, pt2, variance, norm, b, task_id, n_tasks, sending) - use f77_zmq - use selection_types + use selection_types + use f77_zmq implicit none integer(ZMQ_PTR), intent(in) :: zmq_socket_push @@ -241,6 +197,7 @@ subroutine push_pt2_results_async_send(zmq_socket_push, index, pt2, variance, no type(selection_buffer), intent(inout) :: b logical, intent(inout) :: sending integer :: rc + integer*8 :: rc8 if (sending) then print *, irp_here, ': sending is true' @@ -250,6 +207,8 @@ subroutine push_pt2_results_async_send(zmq_socket_push, index, pt2, variance, no rc = f77_zmq_send( zmq_socket_push, n_tasks, 4, ZMQ_SNDMORE) if (rc == -1) then + print *, irp_here, ': error sending result' + stop 1 return else if(rc /= 4) then stop 'push' @@ -258,6 +217,8 @@ subroutine push_pt2_results_async_send(zmq_socket_push, index, pt2, variance, no rc = f77_zmq_send( zmq_socket_push, index, 4*n_tasks, ZMQ_SNDMORE) if (rc == -1) then + print *, irp_here, ': error sending result' + stop 2 return else if(rc /= 4*n_tasks) then stop 'push' @@ -266,6 +227,8 @@ subroutine push_pt2_results_async_send(zmq_socket_push, index, pt2, variance, no rc = f77_zmq_send( zmq_socket_push, pt2, 8*N_states*n_tasks, ZMQ_SNDMORE) if (rc == -1) then + print *, irp_here, ': error sending result' + stop 3 return else if(rc /= 8*N_states*n_tasks) then stop 'push' @@ -274,6 +237,8 @@ subroutine push_pt2_results_async_send(zmq_socket_push, index, pt2, variance, no rc = f77_zmq_send( zmq_socket_push, variance, 8*N_states*n_tasks, ZMQ_SNDMORE) if (rc == -1) then + print *, irp_here, ': error sending result' + stop 4 return else if(rc /= 8*N_states*n_tasks) then stop 'push' @@ -282,6 +247,8 @@ subroutine push_pt2_results_async_send(zmq_socket_push, index, pt2, variance, no rc = f77_zmq_send( zmq_socket_push, norm, 8*N_states*n_tasks, ZMQ_SNDMORE) if (rc == -1) then + print *, irp_here, ': error sending result' + stop 5 return else if(rc /= 8*N_states*n_tasks) then stop 'push' @@ -290,40 +257,63 @@ subroutine push_pt2_results_async_send(zmq_socket_push, index, pt2, variance, no rc = f77_zmq_send( zmq_socket_push, task_id, n_tasks*4, ZMQ_SNDMORE) if (rc == -1) then + print *, irp_here, ': error sending result' + stop 6 return else if(rc /= 4*n_tasks) then stop 'push' endif - rc = f77_zmq_send( zmq_socket_push, b%cur, 4, ZMQ_SNDMORE) - if (rc == -1) then - return - else if(rc /= 4) then - stop 'push' - endif + if (b%cur == 0) then + + rc = f77_zmq_send( zmq_socket_push, b%cur, 4, 0) + if (rc == -1) then + print *, irp_here, ': error sending result' + stop 7 + return + else if(rc /= 4) then + stop 'push' + endif + + else + + rc = f77_zmq_send( zmq_socket_push, b%cur, 4, ZMQ_SNDMORE) + if (rc == -1) then + print *, irp_here, ': error sending result' + stop 7 + return + else if(rc /= 4) then + stop 'push' + endif - rc = f77_zmq_send( zmq_socket_push, b%val, 8*b%cur, ZMQ_SNDMORE) - if (rc == -1) then - return - else if(rc /= 8*b%cur) then - stop 'push' - endif + rc8 = f77_zmq_send8( zmq_socket_push, b%val, 8_8*int(b%cur,8), ZMQ_SNDMORE) + if (rc8 == -1_8) then + print *, irp_here, ': error sending result' + stop 8 + return + else if(rc8 /= 8_8*int(b%cur,8)) then + stop 'push' + endif - rc = f77_zmq_send( zmq_socket_push, b%det, bit_kind*N_int*2*b%cur, 0) - if (rc == -1) then - return - else if(rc /= N_int*2*8*b%cur) then - stop 'push' + rc8 = f77_zmq_send8( zmq_socket_push, b%det, int(bit_kind*N_int*2,8)*int(b%cur,8), 0) + if (rc8 == -1_8) then + print *, irp_here, ': error sending result' + stop 9 + return + else if(rc8 /= int(N_int*2*8,8)*int(b%cur,8)) then + stop 'push' + endif + endif end subroutine subroutine push_pt2_results_async_recv(zmq_socket_push,mini,sending) - use f77_zmq - use selection_types + use selection_types + use f77_zmq implicit none integer(ZMQ_PTR), intent(in) :: zmq_socket_push @@ -339,12 +329,22 @@ IRP_ELSE character*(2) :: ok rc = f77_zmq_recv( zmq_socket_push, ok, 2, 0) if (rc == -1) then + print *, irp_here, ': error sending result' + stop 10 return else if ((rc /= 2).and.(ok(1:2) /= 'ok')) then print *, irp_here//': error in receiving ok' stop -1 endif rc = f77_zmq_recv( zmq_socket_push, mini, 8, 0) + if (rc == -1) then + print *, irp_here, ': error sending result' + stop 11 + return + else if (rc /= 8) then + print *, irp_here//': error in receiving mini' + stop 12 + endif IRP_ENDIF sending = .False. end subroutine @@ -352,8 +352,8 @@ end subroutine subroutine pull_pt2_results(zmq_socket_pull, index, pt2, variance, norm, task_id, n_tasks, b) - use f77_zmq - use selection_types + use selection_types + use f77_zmq implicit none integer(ZMQ_PTR), intent(in) :: zmq_socket_pull double precision, intent(inout) :: pt2(N_states,*) @@ -363,6 +363,7 @@ subroutine pull_pt2_results(zmq_socket_pull, index, pt2, variance, norm, task_id integer, intent(out) :: index(*) integer, intent(out) :: n_tasks, task_id(*) integer :: rc, rn, i + integer*8 :: rc8 rc = f77_zmq_recv( zmq_socket_pull, n_tasks, 4, 0) if (rc == -1) then @@ -420,22 +421,25 @@ subroutine pull_pt2_results(zmq_socket_pull, index, pt2, variance, norm, task_id stop 'pull' endif - rc = f77_zmq_recv( zmq_socket_pull, b%val, 8*b%cur, 0) - if (rc == -1) then - n_tasks = 1 - task_id(1) = 0 - else if(rc /= 8*b%cur) then - stop 'pull' - endif + if (b%cur > 0) then - rc = f77_zmq_recv( zmq_socket_pull, b%det, bit_kind*N_int*2*b%cur, 0) - if (rc == -1) then - n_tasks = 1 - task_id(1) = 0 - else if(rc /= N_int*2*8*b%cur) then - stop 'pull' - endif + rc8 = f77_zmq_recv8( zmq_socket_pull, b%val, 8_8*int(b%cur,8), 0) + if (rc8 == -1_8) then + n_tasks = 1 + task_id(1) = 0 + else if(rc8 /= 8_8*int(b%cur,8)) then + stop 'pull' + endif + rc8 = f77_zmq_recv8( zmq_socket_pull, b%det, int(bit_kind*N_int*2,8)*int(b%cur,8), 0) + if (rc8 == -1_8) then + n_tasks = 1 + task_id(1) = 0 + else if(rc8 /= int(N_int*2*8,8)*int(b%cur,8)) then + stop 'pull' + endif + + endif ! Activate is zmq_socket_pull is a REP IRP_IF ZMQ_PUSH diff --git a/src/cipsi/selection_buffer.irp.f b/src/cipsi/selection_buffer.irp.f index 91fdcd3a..2dda924f 100644 --- a/src/cipsi/selection_buffer.irp.f +++ b/src/cipsi/selection_buffer.irp.f @@ -37,6 +37,11 @@ subroutine delete_selection_buffer(b) if (associated(b%val)) then deallocate(b%val) endif + NULLIFY(b%det) + NULLIFY(b%val) + b%cur = 0 + b%mini = 0.d0 + b%N = 0 end diff --git a/src/cipsi/slave_cipsi.irp.f b/src/cipsi/slave_cipsi.irp.f index 0de07e67..74b3df99 100644 --- a/src/cipsi/slave_cipsi.irp.f +++ b/src/cipsi/slave_cipsi.irp.f @@ -288,6 +288,7 @@ subroutine run_slave_main endif endif + PROVIDE global_selection_buffer !$OMP PARALLEL PRIVATE(i) NUM_THREADS(nproc_target+1) i = omp_get_thread_num() call run_pt2_slave(0,i,pt2_e0_denominator) diff --git a/src/determinants/slater_rules.irp.f b/src/determinants/slater_rules.irp.f index a36d94a9..2c273f84 100644 --- a/src/determinants/slater_rules.irp.f +++ b/src/determinants/slater_rules.irp.f @@ -1793,11 +1793,16 @@ subroutine ac_operator(iorb,ispin,key,hjj,Nint,na,nb) key(k,ispin) = ibset(key(k,ispin),l) other_spin = iand(ispin,1)+1 -! if (iorb > mo_num) then -! print *, irp_here, 'iorb > mo_num' -! print *, iorb, mo_num -! stop -1 -! endif + if (iorb < 1) then + print *, irp_here, 'iorb < 1' + print *, iorb, mo_num + stop -1 + endif + if (iorb > mo_num) then + print *, irp_here, 'iorb > mo_num' + print *, iorb, mo_num + stop -1 + endif hjj = hjj + mo_one_e_integrals(iorb,iorb) ! Same spin diff --git a/src/zmq/utils.irp.f b/src/zmq/utils.irp.f index 4c838302..ae21b899 100644 --- a/src/zmq/utils.irp.f +++ b/src/zmq/utils.irp.f @@ -603,8 +603,8 @@ subroutine end_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,name_in) do i=3600,1,-1 rc = f77_zmq_send(zmq_to_qp_run_socket, 'end_job '//trim(zmq_state),8+len(trim(zmq_state)),0) rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 512, 0) + call sleep(1) if (trim(message(1:13)) == 'error waiting') then - call sleep(1) cycle else if (message(1:2) == 'ok') then exit