From 271b004cfbdfb67c1f3f214f7c8302f67132b843 Mon Sep 17 00:00:00 2001 From: Anthony Scemama Date: Tue, 2 Oct 2018 14:50:37 +0200 Subject: [PATCH] Fixed parallel SBK --- plugins/dress_zmq/dress_stoch_routines.irp.f | 9 +- plugins/dress_zmq/run_dress_slave.irp.f | 52 +++- plugins/shiftedbk/selection_buffer.irp.f | 244 ------------------- src/ZMQ/put_get.irp.f | 61 +++++ 4 files changed, 111 insertions(+), 255 deletions(-) delete mode 100644 plugins/shiftedbk/selection_buffer.irp.f diff --git a/plugins/dress_zmq/dress_stoch_routines.irp.f b/plugins/dress_zmq/dress_stoch_routines.irp.f index ec35c5e4..e44a108e 100644 --- a/plugins/dress_zmq/dress_stoch_routines.irp.f +++ b/plugins/dress_zmq/dress_stoch_routines.irp.f @@ -298,6 +298,10 @@ subroutine ZMQ_dress(E, dress, delta_out, delta_s2_out, relative_error) stop 'Unable to put threshold_generators on ZMQ server' endif + if (zmq_put_int(zmq_to_qp_run_socket, 1, 'ending', (-1)) == -1) then + stop 'Unable to put initial ending' + endif + call write_int(6,pt2_n_tasks_max,'Max number of task fragments') @@ -551,7 +555,10 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2, if(do_exit .and. (dabs(error) / (1.d-20 + dabs(avg) ) <= relative_error)) then integer, external :: zmq_put_dvector integer, external :: zmq_put_int - i= zmq_put_int(zmq_to_qp_run_socket, worker_id, 'ending', (m-1)) + do while (zmq_put_int(zmq_to_qp_run_socket, worker_id, 'ending', (m-1)) == -1) + print *, 'Unable to put ending. Retrying...' + call sleep(1) + enddo exit end if else diff --git a/plugins/dress_zmq/run_dress_slave.irp.f b/plugins/dress_zmq/run_dress_slave.irp.f index 1e351368..c12d4908 100644 --- a/plugins/dress_zmq/run_dress_slave.irp.f +++ b/plugins/dress_zmq/run_dress_slave.irp.f @@ -37,7 +37,7 @@ subroutine run_dress_slave(thread,iproce,energy) ! 
integer(kind=OMP_LOCK_KIND) :: lck_sto(dress_N_cp) double precision :: fac integer :: ending, ending_tmp - integer, external :: zmq_get_dvector, zmq_get_int + integer, external :: zmq_get_dvector, zmq_get_int_nompi ! double precision, external :: omp_get_wtime double precision :: time, time0 integer :: ntask_tbd, task_tbd(Nproc), i_gen_tbd(Nproc), subset_tbd(Nproc) @@ -80,6 +80,7 @@ subroutine run_dress_slave(thread,iproce,energy) zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() zmq_socket_push = new_zmq_push_socket(thread) integer, external :: connect_to_taskserver + !$OMP CRITICAL if (connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) == -1) then print *, irp_here, ': Unable to connect to task server' stop -1 @@ -97,9 +98,23 @@ subroutine run_dress_slave(thread,iproce,energy) if(iproc==1) then call push_dress_results(zmq_socket_push, 0, 0, edI_task, edI_index, breve_delta_m, task_buf, ntask_buf) end if + !$OMP END CRITICAL m=0 + !$OMP MASTER + IRP_IF MPI_DEBUG + print *, irp_here, mpi_rank + call MPI_BARRIER(MPI_COMM_WORLD, ierr) + IRP_ENDIF + IRP_IF MPI + integer :: ierr + include 'mpif.h' + call MPI_BARRIER(MPI_COMM_WORLD,ierr) + IRP_ENDIF + !$OMP END MASTER + !$OMP BARRIER + do while( (cp_done > cp_sent) .or. (m /= dress_N_cp+1) ) !$OMP CRITICAL (send) if(ntask_tbd == 0) then @@ -119,9 +134,10 @@ subroutine run_dress_slave(thread,iproce,energy) ntask_tbd -= 1 else m = dress_N_cp + 1 - if (zmq_get_int(zmq_to_qp_run_socket, worker_id, "ending", ending_tmp) /= -1) then - ending = ending_tmp - endif + do while (zmq_get_int_nompi(zmq_to_qp_run_socket, worker_id, "ending", ending) == -1) + print *, 'unable to get ending. Retrying....' 
+ call sleep(1) + enddo end if will_send = 0 @@ -207,17 +223,19 @@ subroutine run_dress_slave(thread,iproce,energy) end if end do - !$OMP BARRIER if(ntask_buf /= 0) then call push_dress_results(zmq_socket_push, 0, 0, edI_task, edI_index, breve_delta_m, task_buf, ntask_buf) ntask_buf = 0 end if + !$OMP BARRIER - !$OMP SINGLE + !$OMP MASTER if(purge_task_id /= 0) then - do while (zmq_get_int(zmq_to_qp_run_socket, worker_id, "ending", ending) == -1) + ending = -1 + do while (ending == -1) + i = zmq_get_int_nompi(zmq_to_qp_run_socket, worker_id, "ending", ending) call sleep(1) - end do + enddo will_send = ending breve_delta_m = 0d0 @@ -248,10 +266,22 @@ subroutine run_dress_slave(thread,iproce,energy) end do task_buf(1) = purge_task_id call push_dress_results(zmq_socket_push, -will_send, sum_f, edI_task, edI_index, breve_delta_m, task_buf, 1) - end if + end if - !$OMP END SINGLE + !$OMP END MASTER + !$OMP BARRIER + !$OMP MASTER + IRP_IF MPI_DEBUG + print *, irp_here, mpi_rank + call MPI_BARRIER(MPI_COMM_WORLD, ierr) + IRP_ENDIF + IRP_IF MPI + call MPI_BARRIER(MPI_COMM_WORLD,ierr) + IRP_ENDIF + !$OMP END MASTER + + !$OMP CRITICAL integer, external :: disconnect_from_taskserver if (disconnect_from_taskserver(zmq_to_qp_run_socket,worker_id) == -1) then print *, irp_here, ': Unable to disconnect from task server' @@ -259,6 +289,8 @@ subroutine run_dress_slave(thread,iproce,energy) endif call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) call end_zmq_push_socket(zmq_socket_push,thread) + !$OMP END CRITICAL + !$OMP END PARALLEL ! do i=0,dress_N_cp+1 ! 
call omp_destroy_lock(lck_sto(i)) diff --git a/plugins/shiftedbk/selection_buffer.irp.f b/plugins/shiftedbk/selection_buffer.irp.f deleted file mode 100644 index 8b140666..00000000 --- a/plugins/shiftedbk/selection_buffer.irp.f +++ /dev/null @@ -1,244 +0,0 @@ - -subroutine create_selection_buffer(N, siz_, res) - use selection_types - implicit none - - integer, intent(in) :: N, siz_ - type(selection_buffer), intent(out) :: res - - integer :: siz - siz = max(siz_,1) - allocate(res%det(N_int, 2, siz), res%val(siz)) - - res%val(:) = 0d0 - res%det(:,:,:) = 0_8 - res%N = N - res%mini = 0d0 - res%cur = 0 -end subroutine - -subroutine delete_selection_buffer(b) - use selection_types - implicit none - type(selection_buffer), intent(inout) :: b - if (associated(b%det)) then - deallocate(b%det) - endif - if (associated(b%val)) then - deallocate(b%val) - endif -end - - -subroutine add_to_selection_buffer(b, det, val) - use selection_types - implicit none - - type(selection_buffer), intent(inout) :: b - integer(bit_kind), intent(in) :: det(N_int, 2) - double precision, intent(in) :: val - integer :: i - - if(b%N > 0 .and. val <= b%mini) then - b%cur += 1 - b%det(1:N_int,1:2,b%cur) = det(1:N_int,1:2) - b%val(b%cur) = val - if(b%cur == size(b%val)) then - call sort_selection_buffer(b) - end if - end if -end subroutine - -subroutine merge_selection_buffers(b1, b2) - use selection_types - implicit none - BEGIN_DOC -! 
Merges the selection buffers b1 and b2 into b2 - END_DOC - type(selection_buffer), intent(inout) :: b1 - type(selection_buffer), intent(inout) :: b2 - integer(bit_kind), pointer :: detmp(:,:,:) - double precision, pointer :: val(:) - integer :: i, i1, i2, k, nmwen - if (b1%cur == 0) return - do while (b1%val(b1%cur) > b2%mini) - b1%cur = b1%cur-1 - if (b1%cur == 0) then - return - endif - enddo - nmwen = min(b1%N, b1%cur+b2%cur) - allocate( val(size(b1%val)), detmp(N_int, 2, size(b1%det,3)) ) - i1=1 - i2=1 - do i=1,nmwen - if ( (i1 > b1%cur).and.(i2 > b2%cur) ) then - exit - else if (i1 > b1%cur) then - val(i) = b2%val(i2) - detmp(1:N_int,1,i) = b2%det(1:N_int,1,i2) - detmp(1:N_int,2,i) = b2%det(1:N_int,2,i2) - i2=i2+1 - else if (i2 > b2%cur) then - val(i) = b1%val(i1) - detmp(1:N_int,1,i) = b1%det(1:N_int,1,i1) - detmp(1:N_int,2,i) = b1%det(1:N_int,2,i1) - i1=i1+1 - else - if (b1%val(i1) <= b2%val(i2)) then - val(i) = b1%val(i1) - detmp(1:N_int,1,i) = b1%det(1:N_int,1,i1) - detmp(1:N_int,2,i) = b1%det(1:N_int,2,i1) - i1=i1+1 - else - val(i) = b2%val(i2) - detmp(1:N_int,1,i) = b2%det(1:N_int,1,i2) - detmp(1:N_int,2,i) = b2%det(1:N_int,2,i2) - i2=i2+1 - endif - endif - enddo - deallocate(b2%det, b2%val) - do i=nmwen+1,b2%N - val(i) = 0.d0 - detmp(1:N_int,1:2,i) = 0_bit_kind - enddo - b2%det => detmp - b2%val => val - b2%mini = min(b2%mini,b2%val(b2%N)) - b2%cur = nmwen -end - - -subroutine sort_selection_buffer(b) - use selection_types - implicit none - - type(selection_buffer), intent(inout) :: b - integer, allocatable :: iorder(:) - integer(bit_kind), pointer :: detmp(:,:,:) - integer :: i, nmwen - logical, external :: detEq - if (b%N == 0 .or. 
b%cur == 0) return - nmwen = min(b%N, b%cur) - - allocate(iorder(b%cur), detmp(N_int, 2, size(b%det,3))) - do i=1,b%cur - iorder(i) = i - end do - call dsort(b%val, iorder, b%cur) - do i=1, nmwen - detmp(1:N_int,1,i) = b%det(1:N_int,1,iorder(i)) - detmp(1:N_int,2,i) = b%det(1:N_int,2,iorder(i)) - end do - deallocate(b%det,iorder) - b%det => detmp - b%mini = min(b%mini,b%val(b%N)) - b%cur = nmwen -end subroutine - - - -subroutine truncate_to_mini(b) - use selection_types - implicit none - - type(selection_buffer), intent(inout) :: b - - do - if(b%cur == 0) exit - if(b%val(b%cur) <= b%mini) exit - b%cur -= 1 - end do -end subroutine - - - - - -subroutine unique_selection_buffer(b) - use selection_types - implicit none - BEGIN_DOC -! Removes duplicate determinants in the selection buffer - END_DOC - type(selection_buffer), intent(inout) :: b - integer, allocatable :: iorder(:) - integer(bit_kind), pointer :: detmp(:,:,:) - double precision, pointer :: val(:) - integer :: i,j,k - integer(bit_kind), allocatable :: bit_tmp(:) - logical,allocatable :: duplicate(:) - - logical :: found_duplicates - integer*8, external :: det_search_key - - if (b%N == 0 .or. b%cur == 0) return - allocate (duplicate(b%cur), val(size(b%val)), detmp(N_int, 2, size(b%val)), bit_tmp(b%cur)) - call sort_dets_by_det_search_key(b%cur, b%det, b%val, detmp, val, 1) - - deallocate(b%det, b%val) - do i=b%cur+1,b%N - val(i) = 0.d0 - detmp(1:N_int,1:2,i) = 0_bit_kind - enddo - b%det => detmp - b%val => val - - do i=1,b%cur - bit_tmp(i) = det_search_key(b%det(1,1,i),N_int) - duplicate(i) = .False. - enddo - - do i=1,b%cur-1 - if (duplicate(i)) then - cycle - endif - j = i+1 - do while (bit_tmp(j)==bit_tmp(i)) - if (duplicate(j)) then - j += 1 - if (j > b%cur) then - exit - else - cycle - endif - endif - duplicate(j) = .True. - do k=1,N_int - if ( (b%det(k,1,i) /= b%det(k,1,j) ) & - .or. (b%det(k,2,i) /= b%det(k,2,j) ) ) then - duplicate(j) = .False. 
-          exit
-        endif
-      enddo
-      j += 1
-      if (j > b%cur) then
-        exit
-      endif
-    enddo
-  enddo
-
-  found_duplicates = .False.
-  do i=1,b%cur
-    if (duplicate(i)) then
-      found_duplicates = .True.
-      exit
-    endif
-  enddo
-
-  if (found_duplicates) then
-    k=0
-    do i=1,N_det
-      if (.not.duplicate(i)) then
-        k += 1
-        b%det(:,:,k) = b%det(:,:,i)
-        b%val(k) = b%val(i)
-      endif
-    enddo
-    b%cur = k
-  endif
-  deallocate (duplicate,bit_tmp)
-end
-
-
diff --git a/src/ZMQ/put_get.irp.f b/src/ZMQ/put_get.irp.f
index 69acb107..4fb4ecea 100644
--- a/src/ZMQ/put_get.irp.f
+++ b/src/ZMQ/put_get.irp.f
@@ -599,6 +599,67 @@ integer function zmq_get_int(zmq_to_qp_run_socket, worker_id, name, x)
 
   10 continue
 
+  IRP_IF MPI_DEBUG
+    print *, irp_here, mpi_rank
+    call MPI_BARRIER(MPI_COMM_WORLD, ierr)
+  IRP_ENDIF
+  IRP_IF MPI
+    integer :: ierr
+    include 'mpif.h'
+    call MPI_BCAST (zmq_get_int, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
+    if (ierr /= MPI_SUCCESS) then
+      print *, irp_here//': Unable to broadcast zmq_get_int status'
+      stop -1
+    endif
+    call MPI_BARRIER(MPI_COMM_WORLD,ierr)
+    call MPI_BCAST (x, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
+    if (ierr /= MPI_SUCCESS) then
+      print *, irp_here//': Unable to broadcast zmq_get_int value'
+      stop -1
+    endif
+  IRP_ENDIF
+
+end
+
+
+integer function zmq_get_int_nompi(zmq_to_qp_run_socket, worker_id, name, x)
+  use f77_zmq
+  implicit none
+  BEGIN_DOC
+! Get a single integer from the qp_run scheduler, without the MPI broadcast done by zmq_get_int. Returns 0 on success, -1 on failure.
+  END_DOC
+  integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket
+  integer, intent(in) :: worker_id
+  character*(*), intent(in) :: name
+  integer, intent(out) :: x
+  integer :: rc
+  character*(256) :: msg
+
+  PROVIDE zmq_state
+  ! Assume success; every failure below sets -1 and jumps to label 10
+  zmq_get_int_nompi = 0
+
+  write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, name
+  rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0)
+  if (rc /= len(trim(msg))) then
+    zmq_get_int_nompi = -1
+    go to 10
+  endif
+
+  rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0)
+  if (msg(1:14) /= 'get_data_reply') then
+    zmq_get_int_nompi = -1
+    go to 10
+  endif
+
+  rc = f77_zmq_recv(zmq_to_qp_run_socket,x,4,0)
+  if (rc /= 4) then
+    zmq_get_int_nompi = -1
+    go to 10
+  endif
+
+  10 continue
+
 end