diff --git a/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f b/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f index 745dffac..70ce056f 100644 --- a/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f +++ b/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f @@ -9,7 +9,7 @@ subroutine ZMQ_pt2(pt2,relative_error) implicit none - character*(512) :: task + character(len=64000) :: task integer(ZMQ_PTR) :: zmq_to_qp_run_socket, zmq_to_qp_run_socket2 type(selection_buffer) :: b integer, external :: omp_get_thread_num @@ -62,49 +62,42 @@ subroutine ZMQ_pt2(pt2,relative_error) integer(ZMQ_PTR), external :: new_zmq_to_qp_run_socket + integer :: ipos + ipos=1 + do i=1,tbc(0) + if(tbc(i) > fragment_first) then + write(task(ipos:ipos+20),'(I9,X,I9,''|'')') 0, i + ipos += 20 + if (ipos > 64000) then + call add_task_to_taskserver(zmq_to_qp_run_socket,trim(task(1:ipos-20))) + ipos=1 + endif + else + do j=1,fragment_count + write(task(ipos:ipos+20),'(I9,X,I9,''|'')') j, i + ipos += 20 + if (ipos > 64000) then + call add_task_to_taskserver(zmq_to_qp_run_socket,trim(task(1:ipos-20))) + ipos=1 + endif + end do + end if + end do + if (ipos > 1) then + call add_task_to_taskserver(zmq_to_qp_run_socket,trim(task(1:ipos-20))) + endif + call zmq_set_running(zmq_to_qp_run_socket) - !$OMP PARALLEL DEFAULT(shared) SHARED(b, pt2, relative_error) NUM_THREADS(nproc+1) & - !$OMP PRIVATE(i,zmq_to_qp_run_socket2,i_generator_end,task,j) - zmq_to_qp_run_socket2 = new_zmq_to_qp_run_socket() - - !$OMP DO SCHEDULE(static,1) - do i=1,min(2000,tbc(0)) - i_generator_end = min(i+generator_per_task-1, tbc(0)) - if(tbc(i) > fragment_first) then - write(task,*) (i_generator_end-i+1), 0, tbc(i:i_generator_end) - call add_task_to_taskserver(zmq_to_qp_run_socket2,task) - else - do j=1,fragment_count - write(task,*) (i_generator_end-i+1), j, tbc(i:i_generator_end) - call add_task_to_taskserver(zmq_to_qp_run_socket2,task) - end do - end if - end do - !$OMP END DO NOWAIT - + !$OMP PARALLEL DEFAULT(shared) NUM_THREADS(nproc+1) & + !$OMP PRIVATE(i) i = omp_get_thread_num() if (i==0) then - call zmq_set_running(zmq_to_qp_run_socket) call pt2_collector(b, tbc, comb, Ncomb, computed, pt2_detail, sumabove, sum2above, Nabove, relative_error, pt2) - else if (i==1) then - do i=2001,tbc(0) - i_generator_end = min(i+generator_per_task-1, tbc(0)) - if(tbc(i) > fragment_first) then - write(task,*) (i_generator_end-i+1), 0, tbc(i:i_generator_end) - call add_task_to_taskserver(zmq_to_qp_run_socket2,task) - else - do j=1,fragment_count - write(task,*) (i_generator_end-i+1), j, tbc(i:i_generator_end) - call add_task_to_taskserver(zmq_to_qp_run_socket2,task) - end do - end if - end do - call pt2_slave_inproc(1) else call pt2_slave_inproc(i) endif - call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket2) !$OMP END PARALLEL + call end_parallel_job(zmq_to_qp_run_socket, 'pt2') tbc(0) = 0 if (pt2(1) /= 0.d0) then diff --git a/plugins/Full_CI_ZMQ/run_pt2_slave.irp.f b/plugins/Full_CI_ZMQ/run_pt2_slave.irp.f index 63033f82..f6f41ab3 100644 --- a/plugins/Full_CI_ZMQ/run_pt2_slave.irp.f +++ b/plugins/Full_CI_ZMQ/run_pt2_slave.irp.f @@ -9,7 +9,7 @@ subroutine run_pt2_slave(thread,iproc,energy) integer :: rc, i integer :: worker_id, task_id(1), ctask, ltask - character(len=:), allocatable :: task + character*(512) :: task integer(ZMQ_PTR),external :: new_zmq_to_qp_run_socket integer(ZMQ_PTR) :: zmq_to_qp_run_socket @@ -26,7 +26,6 @@ subroutine run_pt2_slave(thread,iproc,energy) integer :: Nindex allocate(pt2_detail(N_states, N_det), index(N_det)) - allocate(character(len=10000) :: task) zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() zmq_socket_push = new_zmq_push_socket(thread) call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) @@ -40,6 +39,7 @@ subroutine run_pt2_slave(thread,iproc,energy) ctask = 1 pt2 = 0d0 pt2_detail = 0d0 + Nindex=1 do call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id(ctask), task) @@ -125,7 +125,8 @@ subroutine push_pt2_results(zmq_socket_push, N, index, pt2_detail, task_id, ntas if(rc /= 4*ntask) stop "push" ! Activate is zmq_socket_push is a REQ - rc = f77_zmq_recv( zmq_socket_push, task_id(1), ntask*4, 0) + character*(2) :: ok + rc = f77_zmq_recv( zmq_socket_push, ok, 2, 0) end subroutine @@ -155,7 +156,7 @@ subroutine pull_pt2_results(zmq_socket_pull, N, index, pt2_detail, task_id, ntas if(rc /= 4*ntask) stop "pull" ! Activate is zmq_socket_pull is a REP - rc = f77_zmq_send( zmq_socket_pull, task_id(1), ntask*4, 0) + rc = f77_zmq_send( zmq_socket_pull, 'ok', 2, 0) end subroutine diff --git a/plugins/Full_CI_ZMQ/zmq_selection.irp.f b/plugins/Full_CI_ZMQ/zmq_selection.irp.f index e033b9c2..8aaddc19 100644 --- a/plugins/Full_CI_ZMQ/zmq_selection.irp.f +++ b/plugins/Full_CI_ZMQ/zmq_selection.irp.f @@ -25,12 +25,11 @@ subroutine ZMQ_selection(N_in, pt2) endif character(len=:), allocatable :: task - allocate(character(len=20*N_det_generators) :: task) + task = repeat(' ',20*N_det_generators) do i= 1, N_det_generators write(task(20*(i-1)+1:20*i),'(I9,X,I9,''|'')') i, N end do call add_task_to_taskserver(zmq_to_qp_run_socket,task) - deallocate(task) !$OMP PARALLEL DEFAULT(shared) SHARED(b, pt2) PRIVATE(i) NUM_THREADS(nproc+1) i = omp_get_thread_num() diff --git a/src/Davidson/u0Hu0.irp.f b/src/Davidson/u0Hu0.irp.f index 233919da..b1946a42 100644 --- a/src/Davidson/u0Hu0.irp.f +++ b/src/Davidson/u0Hu0.irp.f @@ -327,51 +327,43 @@ subroutine H_S2_u_0_nstates_zmq(v_0,s_0,u_0,H_jj,S2_jj,n,keys_tmp,Nint,N_st,sze_ PROVIDE nproc - !$OMP PARALLEL NUM_THREADS(nproc+2) PRIVATE(ithread,sh,i,j, & - !$OMP workload,istep,blockb2,task,ipos,iposmax,send) + character(len=:), allocatable :: task + task = repeat(' ', iposmax) + character(32) :: tmp_task + integer :: ipos, iposmax + iposmax = shortcut_(0,1)+32 + ipos = 1 + do sh=1,shortcut_(0,1),1 + workload = shortcut_(0,1)+dble(shortcut_(sh+1,1) - shortcut_(sh,1))**2 + do i=sh, shortcut_(0,2), shortcut_(0,1) + do j=i, min(i, shortcut_(0,2)) + workload += (shortcut_(j+1,2) - shortcut_(j, 2))**2 + end do + end do + istep = 1+ int(workload*target_workload_inv) + do blockb2=0, istep-1 + write(tmp_task,'(3(I9,X),''|'',X)') sh, blockb2, istep + task = task//tmp_task + ipos += 32 + if (ipos+32 > iposmax) then + call add_task_to_taskserver(handler, trim(task)) + ipos=1 + task = '' + endif + enddo + enddo + if (ipos>1) then + call add_task_to_taskserver(handler, trim(task)) + endif + + !$OMP PARALLEL NUM_THREADS(nproc+2) PRIVATE(ithread) ithread = omp_get_thread_num() if (ithread == 0 ) then - character(len=:), allocatable :: task - character(32) :: tmp_task - integer :: ipos, iposmax - logical :: send - iposmax = shortcut_(0,1)+32 - send = .False. - allocate(character(len=iposmax) :: task) - task = '' - ipos = 1 - do sh=1,shortcut_(0,1),1 - workload = shortcut_(0,1)+dble(shortcut_(sh+1,1) - shortcut_(sh,1))**2 - do i=sh, shortcut_(0,2), shortcut_(0,1) - do j=i, min(i, shortcut_(0,2)) - workload += (shortcut_(j+1,2) - shortcut_(j, 2))**2 - end do - end do - istep = 1+ int(workload*target_workload_inv) - do blockb2=0, istep-1 - write(tmp_task,'(3(I9,X),''|'',X)') sh, blockb2, istep - task = task//tmp_task - ipos += 32 - if (ipos+32 < iposmax) then - send = .True. - else - call add_task_to_taskserver(handler, trim(task)) - ipos=1 - task = '' - send = .False. - endif - enddo - enddo - if (send) call add_task_to_taskserver(handler, trim(task)) - deallocate(task) call zmq_set_running(handler) - !$OMP BARRIER call davidson_run(handler, v_0, s_0, size(v_0,1)) else if (ithread == 1 ) then - !$OMP BARRIER call davidson_miniserver_run (update_dets) else - !$OMP BARRIER call davidson_slave_inproc(ithread) endif !$OMP END PARALLEL diff --git a/src/ZMQ/utils.irp.f b/src/ZMQ/utils.irp.f index aff2707a..8e3a94e5 100644 --- a/src/ZMQ/utils.irp.f +++ b/src/ZMQ/utils.irp.f @@ -94,7 +94,7 @@ subroutine switch_qp_run_to_master print *, 'This run should be started with the qp_run command' stop -1 endif - qp_run_address = trim(buffer) + qp_run_address = adjustl(buffer) print *, 'Switched to qp_run master : ', trim(qp_run_address) integer :: i @@ -684,26 +684,24 @@ subroutine add_task_to_taskserver(zmq_to_qp_run_socket,task) character*(*), intent(in) :: task integer :: rc, sze - character(len=:), allocatable :: message + character(len=:), allocatable :: message + + message='add_task '//trim(zmq_state)//' '//trim(task) + sze = len(message) + rc = f77_zmq_send(zmq_to_qp_run_socket, message, sze, 0) - sze = len(trim(task))+12+len(trim(zmq_state)) - allocate(character(len=sze) :: message) - write(message,*) 'add_task '//trim(zmq_state)//' '//trim(task) - - rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0) if (rc /= sze) then print *, rc, sze print *, irp_here,': f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)' stop 'error' endif - rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0) + rc = f77_zmq_recv(zmq_to_qp_run_socket, message, sze-1, 0) if (message(1:rc) /= 'ok') then print *, trim(task) print *, 'Unable to add the next task' stop -1 endif - deallocate(message) end @@ -720,7 +718,7 @@ subroutine add_task_to_taskserver_send(zmq_to_qp_run_socket,task) character(len=:), allocatable :: message sze = len(trim(task))+12+len(trim(zmq_state)) - allocate(character(len=sze) :: message) + message = repeat(' ',sze) write(message,*) 'add_task '//trim(zmq_state)//' '//trim(task) rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0) @@ -729,7 +727,6 @@ subroutine add_task_to_taskserver_send(zmq_to_qp_run_socket,task) print *, irp_here,': f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)' stop 'error' endif - deallocate(message) end @@ -797,17 +794,17 @@ subroutine get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task) write(message,*) 'get_task '//trim(zmq_state), worker_id sze = len(trim(message)) - rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0) + rc = f77_zmq_send(zmq_to_qp_run_socket, message, sze, 0) if (rc /= sze) then print *, irp_here, ':f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)' stop 'error' endif + message = repeat(' ',512) rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0) - message = trim(message(1:rc)) - read(message,*) reply + read(message(1:rc),*) reply if (trim(reply) == 'get_task_reply') then - read(message,*) reply, task_id + read(message(1:rc),*) reply, task_id rc = 15 do while (message(rc:rc) == ' ') rc += 1