10
0
mirror of https://github.com/LCPQ/quantum_package synced 2025-01-10 13:08:23 +01:00

Fixed communications

This commit is contained in:
Anthony Scemama 2017-03-03 22:20:57 +01:00
parent 317ca2fbaa
commit 8da52b8f59
5 changed files with 77 additions and 95 deletions

View File

@ -9,7 +9,7 @@ subroutine ZMQ_pt2(pt2,relative_error)
implicit none implicit none
character*(512) :: task character(len=64000) :: task
integer(ZMQ_PTR) :: zmq_to_qp_run_socket, zmq_to_qp_run_socket2 integer(ZMQ_PTR) :: zmq_to_qp_run_socket, zmq_to_qp_run_socket2
type(selection_buffer) :: b type(selection_buffer) :: b
integer, external :: omp_get_thread_num integer, external :: omp_get_thread_num
@ -62,49 +62,42 @@ subroutine ZMQ_pt2(pt2,relative_error)
integer(ZMQ_PTR), external :: new_zmq_to_qp_run_socket integer(ZMQ_PTR), external :: new_zmq_to_qp_run_socket
integer :: ipos
!$OMP PARALLEL DEFAULT(shared) SHARED(b, pt2, relative_error) NUM_THREADS(nproc+1) & ipos=1
!$OMP PRIVATE(i,zmq_to_qp_run_socket2,i_generator_end,task,j) do i=1,tbc(0)
zmq_to_qp_run_socket2 = new_zmq_to_qp_run_socket()
!$OMP DO SCHEDULE(static,1)
do i=1,min(2000,tbc(0))
i_generator_end = min(i+generator_per_task-1, tbc(0))
if(tbc(i) > fragment_first) then if(tbc(i) > fragment_first) then
write(task,*) (i_generator_end-i+1), 0, tbc(i:i_generator_end) write(task(ipos:ipos+20),'(I9,X,I9,''|'')') 0, i
call add_task_to_taskserver(zmq_to_qp_run_socket2,task) ipos += 20
if (ipos > 64000) then
call add_task_to_taskserver(zmq_to_qp_run_socket,trim(task(1:ipos-20)))
ipos=1
endif
else else
do j=1,fragment_count do j=1,fragment_count
write(task,*) (i_generator_end-i+1), j, tbc(i:i_generator_end) write(task(ipos:ipos+20),'(I9,X,I9,''|'')') j, i
call add_task_to_taskserver(zmq_to_qp_run_socket2,task) ipos += 20
if (ipos > 64000) then
call add_task_to_taskserver(zmq_to_qp_run_socket,trim(task(1:ipos-20)))
ipos=1
endif
end do end do
end if end if
end do end do
!$OMP END DO NOWAIT if (ipos > 1) then
call add_task_to_taskserver(zmq_to_qp_run_socket,trim(task(1:ipos-20)))
endif
call zmq_set_running(zmq_to_qp_run_socket)
!$OMP PARALLEL DEFAULT(shared) NUM_THREADS(nproc+1) &
!$OMP PRIVATE(i)
i = omp_get_thread_num() i = omp_get_thread_num()
if (i==0) then if (i==0) then
call zmq_set_running(zmq_to_qp_run_socket)
call pt2_collector(b, tbc, comb, Ncomb, computed, pt2_detail, sumabove, sum2above, Nabove, relative_error, pt2) call pt2_collector(b, tbc, comb, Ncomb, computed, pt2_detail, sumabove, sum2above, Nabove, relative_error, pt2)
else if (i==1) then
do i=2001,tbc(0)
i_generator_end = min(i+generator_per_task-1, tbc(0))
if(tbc(i) > fragment_first) then
write(task,*) (i_generator_end-i+1), 0, tbc(i:i_generator_end)
call add_task_to_taskserver(zmq_to_qp_run_socket2,task)
else
do j=1,fragment_count
write(task,*) (i_generator_end-i+1), j, tbc(i:i_generator_end)
call add_task_to_taskserver(zmq_to_qp_run_socket2,task)
end do
end if
end do
call pt2_slave_inproc(1)
else else
call pt2_slave_inproc(i) call pt2_slave_inproc(i)
endif endif
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket2)
!$OMP END PARALLEL !$OMP END PARALLEL
call end_parallel_job(zmq_to_qp_run_socket, 'pt2') call end_parallel_job(zmq_to_qp_run_socket, 'pt2')
tbc(0) = 0 tbc(0) = 0
if (pt2(1) /= 0.d0) then if (pt2(1) /= 0.d0) then

View File

@ -9,7 +9,7 @@ subroutine run_pt2_slave(thread,iproc,energy)
integer :: rc, i integer :: rc, i
integer :: worker_id, task_id(1), ctask, ltask integer :: worker_id, task_id(1), ctask, ltask
character(len=:), allocatable :: task character*(512) :: task
integer(ZMQ_PTR),external :: new_zmq_to_qp_run_socket integer(ZMQ_PTR),external :: new_zmq_to_qp_run_socket
integer(ZMQ_PTR) :: zmq_to_qp_run_socket integer(ZMQ_PTR) :: zmq_to_qp_run_socket
@ -26,7 +26,6 @@ subroutine run_pt2_slave(thread,iproc,energy)
integer :: Nindex integer :: Nindex
allocate(pt2_detail(N_states, N_det), index(N_det)) allocate(pt2_detail(N_states, N_det), index(N_det))
allocate(character(len=10000) :: task)
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
zmq_socket_push = new_zmq_push_socket(thread) zmq_socket_push = new_zmq_push_socket(thread)
call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
@ -40,6 +39,7 @@ subroutine run_pt2_slave(thread,iproc,energy)
ctask = 1 ctask = 1
pt2 = 0d0 pt2 = 0d0
pt2_detail = 0d0 pt2_detail = 0d0
Nindex=1
do do
call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id(ctask), task) call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id(ctask), task)
@ -125,7 +125,8 @@ subroutine push_pt2_results(zmq_socket_push, N, index, pt2_detail, task_id, ntas
if(rc /= 4*ntask) stop "push" if(rc /= 4*ntask) stop "push"
! Activate if zmq_socket_push is a REQ ! Activate if zmq_socket_push is a REQ
rc = f77_zmq_recv( zmq_socket_push, task_id(1), ntask*4, 0) character*(2) :: ok
rc = f77_zmq_recv( zmq_socket_push, ok, 2, 0)
end subroutine end subroutine
@ -155,7 +156,7 @@ subroutine pull_pt2_results(zmq_socket_pull, N, index, pt2_detail, task_id, ntas
if(rc /= 4*ntask) stop "pull" if(rc /= 4*ntask) stop "pull"
! Activate if zmq_socket_pull is a REP ! Activate if zmq_socket_pull is a REP
rc = f77_zmq_send( zmq_socket_pull, task_id(1), ntask*4, 0) rc = f77_zmq_send( zmq_socket_pull, 'ok', 2, 0)
end subroutine end subroutine

View File

@ -25,12 +25,11 @@ subroutine ZMQ_selection(N_in, pt2)
endif endif
character(len=:), allocatable :: task character(len=:), allocatable :: task
allocate(character(len=20*N_det_generators) :: task) task = repeat(' ',20*N_det_generators)
do i= 1, N_det_generators do i= 1, N_det_generators
write(task(20*(i-1)+1:20*i),'(I9,X,I9,''|'')') i, N write(task(20*(i-1)+1:20*i),'(I9,X,I9,''|'')') i, N
end do end do
call add_task_to_taskserver(zmq_to_qp_run_socket,task) call add_task_to_taskserver(zmq_to_qp_run_socket,task)
deallocate(task)
!$OMP PARALLEL DEFAULT(shared) SHARED(b, pt2) PRIVATE(i) NUM_THREADS(nproc+1) !$OMP PARALLEL DEFAULT(shared) SHARED(b, pt2) PRIVATE(i) NUM_THREADS(nproc+1)
i = omp_get_thread_num() i = omp_get_thread_num()

View File

@ -327,18 +327,11 @@ subroutine H_S2_u_0_nstates_zmq(v_0,s_0,u_0,H_jj,S2_jj,n,keys_tmp,Nint,N_st,sze_
PROVIDE nproc PROVIDE nproc
!$OMP PARALLEL NUM_THREADS(nproc+2) PRIVATE(ithread,sh,i,j, &
!$OMP workload,istep,blockb2,task,ipos,iposmax,send)
ithread = omp_get_thread_num()
if (ithread == 0 ) then
character(len=:), allocatable :: task character(len=:), allocatable :: task
task = repeat(' ', iposmax)
character(32) :: tmp_task character(32) :: tmp_task
integer :: ipos, iposmax integer :: ipos, iposmax
logical :: send
iposmax = shortcut_(0,1)+32 iposmax = shortcut_(0,1)+32
send = .False.
allocate(character(len=iposmax) :: task)
task = ''
ipos = 1 ipos = 1
do sh=1,shortcut_(0,1),1 do sh=1,shortcut_(0,1),1
workload = shortcut_(0,1)+dble(shortcut_(sh+1,1) - shortcut_(sh,1))**2 workload = shortcut_(0,1)+dble(shortcut_(sh+1,1) - shortcut_(sh,1))**2
@ -352,26 +345,25 @@ subroutine H_S2_u_0_nstates_zmq(v_0,s_0,u_0,H_jj,S2_jj,n,keys_tmp,Nint,N_st,sze_
write(tmp_task,'(3(I9,X),''|'',X)') sh, blockb2, istep write(tmp_task,'(3(I9,X),''|'',X)') sh, blockb2, istep
task = task//tmp_task task = task//tmp_task
ipos += 32 ipos += 32
if (ipos+32 < iposmax) then if (ipos+32 > iposmax) then
send = .True.
else
call add_task_to_taskserver(handler, trim(task)) call add_task_to_taskserver(handler, trim(task))
ipos=1 ipos=1
task = '' task = ''
send = .False.
endif endif
enddo enddo
enddo enddo
if (send) call add_task_to_taskserver(handler, trim(task)) if (ipos>1) then
deallocate(task) call add_task_to_taskserver(handler, trim(task))
endif
!$OMP PARALLEL NUM_THREADS(nproc+2) PRIVATE(ithread)
ithread = omp_get_thread_num()
if (ithread == 0 ) then
call zmq_set_running(handler) call zmq_set_running(handler)
!$OMP BARRIER
call davidson_run(handler, v_0, s_0, size(v_0,1)) call davidson_run(handler, v_0, s_0, size(v_0,1))
else if (ithread == 1 ) then else if (ithread == 1 ) then
!$OMP BARRIER
call davidson_miniserver_run (update_dets) call davidson_miniserver_run (update_dets)
else else
!$OMP BARRIER
call davidson_slave_inproc(ithread) call davidson_slave_inproc(ithread)
endif endif
!$OMP END PARALLEL !$OMP END PARALLEL

View File

@ -94,7 +94,7 @@ subroutine switch_qp_run_to_master
print *, 'This run should be started with the qp_run command' print *, 'This run should be started with the qp_run command'
stop -1 stop -1
endif endif
qp_run_address = trim(buffer) qp_run_address = adjustl(buffer)
print *, 'Switched to qp_run master : ', trim(qp_run_address) print *, 'Switched to qp_run master : ', trim(qp_run_address)
integer :: i integer :: i
@ -686,24 +686,22 @@ subroutine add_task_to_taskserver(zmq_to_qp_run_socket,task)
integer :: rc, sze integer :: rc, sze
character(len=:), allocatable :: message character(len=:), allocatable :: message
sze = len(trim(task))+12+len(trim(zmq_state)) message='add_task '//trim(zmq_state)//' '//trim(task)
allocate(character(len=sze) :: message) sze = len(message)
write(message,*) 'add_task '//trim(zmq_state)//' '//trim(task) rc = f77_zmq_send(zmq_to_qp_run_socket, message, sze, 0)
rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)
if (rc /= sze) then if (rc /= sze) then
print *, rc, sze print *, rc, sze
print *, irp_here,': f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)' print *, irp_here,': f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)'
stop 'error' stop 'error'
endif endif
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0) rc = f77_zmq_recv(zmq_to_qp_run_socket, message, sze-1, 0)
if (message(1:rc) /= 'ok') then if (message(1:rc) /= 'ok') then
print *, trim(task) print *, trim(task)
print *, 'Unable to add the next task' print *, 'Unable to add the next task'
stop -1 stop -1
endif endif
deallocate(message)
end end
@ -720,7 +718,7 @@ subroutine add_task_to_taskserver_send(zmq_to_qp_run_socket,task)
character(len=:), allocatable :: message character(len=:), allocatable :: message
sze = len(trim(task))+12+len(trim(zmq_state)) sze = len(trim(task))+12+len(trim(zmq_state))
allocate(character(len=sze) :: message) message = repeat(' ',sze)
write(message,*) 'add_task '//trim(zmq_state)//' '//trim(task) write(message,*) 'add_task '//trim(zmq_state)//' '//trim(task)
rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0) rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)
@ -729,7 +727,6 @@ subroutine add_task_to_taskserver_send(zmq_to_qp_run_socket,task)
print *, irp_here,': f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)' print *, irp_here,': f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)'
stop 'error' stop 'error'
endif endif
deallocate(message)
end end
@ -797,17 +794,17 @@ subroutine get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task)
write(message,*) 'get_task '//trim(zmq_state), worker_id write(message,*) 'get_task '//trim(zmq_state), worker_id
sze = len(trim(message)) sze = len(trim(message))
rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0) rc = f77_zmq_send(zmq_to_qp_run_socket, message, sze, 0)
if (rc /= sze) then if (rc /= sze) then
print *, irp_here, ':f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)' print *, irp_here, ':f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)'
stop 'error' stop 'error'
endif endif
message = repeat(' ',512)
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0) rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0)
message = trim(message(1:rc)) read(message(1:rc),*) reply
read(message,*) reply
if (trim(reply) == 'get_task_reply') then if (trim(reply) == 'get_task_reply') then
read(message,*) reply, task_id read(message(1:rc),*) reply, task_id
rc = 15 rc = 15
do while (message(rc:rc) == ' ') do while (message(rc:rc) == ' ')
rc += 1 rc += 1