10
0
mirror of https://github.com/LCPQ/quantum_package synced 2024-06-26 07:02:14 +02:00

Replace subs by funcs in ZMQ

This commit is contained in:
Anthony Scemama 2017-11-29 15:15:10 +01:00
parent 2dea5ea1af
commit db0e74bf37
20 changed files with 751 additions and 437 deletions

View File

@ -9,7 +9,7 @@
FC : ifort
LAPACK_LIB : -mkl=parallel
IRPF90 : irpf90
IRPF90_FLAGS : --ninja --align=32
IRPF90_FLAGS : --ninja --align=32 -DZMQ_PUSH
# Global options
################

View File

@ -22,21 +22,25 @@ subroutine run_selection_slave(thread,iproc,energy)
double precision :: pt2(N_states)
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
zmq_socket_push = new_zmq_push_socket(thread)
call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
if(worker_id == -1) then
print *, "WORKER -1"
!call disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id)
integer, external :: connect_to_taskserver
if (connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) == -1) then
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
call end_zmq_push_socket(zmq_socket_push,thread)
return
end if
endif
zmq_socket_push = new_zmq_push_socket(thread)
buf%N = 0
ctask = 1
pt2 = 0d0
do
call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id(ctask), task)
integer, external :: get_task_from_taskserver
if (get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id(ctask), task) == -1) then
exit
endif
done = task_id(ctask) == 0
if (done) then
ctask = ctask - 1
@ -53,10 +57,18 @@ subroutine run_selection_slave(thread,iproc,energy)
call select_connected(i_generator,energy,pt2,buf)
endif
integer, external :: task_done_to_taskserver
if(done .or. ctask == size(task_id)) then
if(buf%N == 0 .and. ctask > 0) stop "uninitialized selection_buffer"
do i=1, ctask
call task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id(i))
if (task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id(i)) == -1) then
call sleep(1)
if (task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id(i)) == -1) then
done = .True.
ctask = 0
exit
endif
endif
end do
if(ctask > 0) then
call push_selection_results(zmq_socket_push, pt2, buf, task_id(1), ctask)
@ -74,7 +86,12 @@ subroutine run_selection_slave(thread,iproc,energy)
if(done) exit
ctask = ctask + 1
end do
call disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id)
integer, external :: disconnect_from_taskserver
if (disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id) == -1) then
continue
endif
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
call end_zmq_push_socket(zmq_socket_push,thread)
end subroutine

View File

@ -1200,6 +1200,10 @@ subroutine ZMQ_selection(N_in, pt2)
integer, external :: omp_get_thread_num
double precision, intent(out) :: pt2(N_states)
integer, parameter :: maxtasks=10000
integer, external :: zmq_put_psi
integer, external :: zmq_put_N_det_generators
integer, external :: zmq_put_N_det_selectors
integer, external :: zmq_put_dvector
N = max(N_in,1)
@ -1211,10 +1215,18 @@ subroutine ZMQ_selection(N_in, pt2)
PROVIDE psi_bilinear_matrix_transp_order
call new_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,'selection')
call zmq_put_psi(zmq_to_qp_run_socket,1)
call zmq_put_N_det_generators(zmq_to_qp_run_socket, 1)
call zmq_put_N_det_selectors(zmq_to_qp_run_socket, 1)
call zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',pt2_e0_denominator,size(pt2_e0_denominator))
if (zmq_put_psi(zmq_to_qp_run_socket,1) == -1) then
stop 'Unable to put psi'
endif
if (zmq_put_N_det_generators(zmq_to_qp_run_socket, 1) == -1) then
stop 'Unable to put N_det_generators'
endif
if (zmq_put_N_det_selectors(zmq_to_qp_run_socket, 1) == -1) then
stop 'Unable to put N_det_selectors'
endif
if (zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',pt2_e0_denominator,size(pt2_e0_denominator)) == -1) then
stop 'Unable to put energy'
endif
call create_selection_buffer(N, N*2, b)
endif
@ -1222,17 +1234,22 @@ subroutine ZMQ_selection(N_in, pt2)
task = ' '
integer :: k
integer, external :: add_task_to_taskserver
k=0
do i= 1, N_det_generators
k = k+1
write(task(20*(k-1)+1:20*k),'(I9,1X,I9,''|'')') i, N
if (k>=maxtasks) then
k=0
call add_task_to_taskserver(zmq_to_qp_run_socket,task)
if (add_task_to_taskserver(zmq_to_qp_run_socket,task) == -1) then
stop 'Unable to add task to task server'
endif
endif
enddo
if (k > 0) then
call add_task_to_taskserver(zmq_to_qp_run_socket,task)
if (add_task_to_taskserver(zmq_to_qp_run_socket,task) == -1) then
stop 'Unable to add task to task server'
endif
endif
call zmq_set_running(zmq_to_qp_run_socket)
@ -1308,7 +1325,10 @@ subroutine selection_collector(zmq_socket_pull, b, pt2)
if(task_id(i) == 0) then
print *, "Error in collector"
endif
call zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id(i),more)
integer, external :: zmq_delete_task
if (zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id(i),more) == -1) then
stop 'Unable to delete task'
endif
end do
done += ntask
call CPU_TIME(time)

View File

@ -27,6 +27,7 @@ subroutine run_wf
character*(64) :: states(4)
integer :: rc, i
integer, external :: zmq_get_psi
call provide_everything
zmq_context = f77_zmq_ctx_new ()
@ -50,7 +51,7 @@ subroutine run_wf
! ---------
print *, 'Selection'
call zmq_get_psi(zmq_to_qp_run_socket,1,energy,N_states)
if (zmq_get_psi(zmq_to_qp_run_socket,1,energy,N_states) == -1) cycle
!$OMP PARALLEL PRIVATE(i)
i = omp_get_thread_num()
@ -64,7 +65,7 @@ subroutine run_wf
! --------
print *, 'Davidson'
call zmq_get_psi(zmq_to_qp_run_socket,1,energy,N_states)
if (zmq_get_psi(zmq_to_qp_run_socket,1,energy,N_states) == -1) cycle
call omp_set_nested(.True.)
call davidson_slave_tcp(0)
call omp_set_nested(.False.)
@ -76,7 +77,7 @@ subroutine run_wf
! ---
print *, 'PT2'
call zmq_get_psi(zmq_to_qp_run_socket,1,energy,N_states)
if (zmq_get_psi(zmq_to_qp_run_socket,1,energy,N_states) == -1) cycle
!$OMP PARALLEL PRIVATE(i)
i = omp_get_thread_num()

View File

@ -25,7 +25,9 @@ subroutine run_wf
double precision :: energy(N_states_diag)
character*(64) :: states(1)
integer :: rc, i
integer, external :: zmq_get_dvector
integer, external :: zmq_get_psi
call provide_everything
@ -48,7 +50,7 @@ subroutine run_wf
! ---------
print *, 'PT2'
call zmq_get_psi(zmq_to_qp_run_socket,1)
if (zmq_get_psi(zmq_to_qp_run_socket,1) == -1) cycle
if (zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states) == -1) cycle
PROVIDE psi_bilinear_matrix_columns_loc psi_det_alpha_unique psi_det_beta_unique

View File

@ -28,7 +28,6 @@ subroutine ZMQ_pt2(E, pt2,relative_error, absolute_error, error)
double precision :: time
double precision :: w(N_states)
integer(ZMQ_PTR), external :: new_zmq_to_qp_run_socket
integer, external :: zmq_put_dvector
if (N_det < max(10,N_states)) then
pt2=0.d0
@ -66,9 +65,20 @@ subroutine ZMQ_pt2(E, pt2,relative_error, absolute_error, error)
print *, '========== ================= ================= ================='
call new_parallel_job(zmq_to_qp_run_socket, zmq_socket_pull, 'pt2')
call zmq_put_psi(zmq_to_qp_run_socket,1)
call zmq_put_N_det_generators(zmq_to_qp_run_socket, 1)
call zmq_put_N_det_selectors(zmq_to_qp_run_socket, 1)
integer, external :: zmq_put_psi
integer, external :: zmq_put_N_det_generators
integer, external :: zmq_put_N_det_selectors
integer, external :: zmq_put_dvector
if (zmq_put_psi(zmq_to_qp_run_socket,1) == -1) then
stop 'Unable to put psi on ZMQ server'
endif
if (zmq_put_N_det_generators(zmq_to_qp_run_socket, 1) == -1) then
stop 'Unable to put N_det_generators on ZMQ server'
endif
if (zmq_put_N_det_selectors(zmq_to_qp_run_socket, 1) == -1) then
stop 'Unable to put N_det_selectors on ZMQ server'
endif
if (zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',pt2_e0_denominator,size(pt2_e0_denominator)) == -1) then
stop 'Unable to put energy on ZMQ server'
endif
@ -76,13 +86,17 @@ subroutine ZMQ_pt2(E, pt2,relative_error, absolute_error, error)
integer :: ipos
ipos=1
integer, external :: add_task_to_taskserver
do i=1,tbc(0)
if(tbc(i) > fragment_first) then
write(task(ipos:ipos+20),'(I9,1X,I9,''|'')') 0, tbc(i)
ipos += 20
if (ipos > 63980) then
call add_task_to_taskserver(zmq_to_qp_run_socket,trim(task(1:ipos)))
if (add_task_to_taskserver(zmq_to_qp_run_socket,trim(task(1:ipos))) == -1) then
stop 'Unable to add task to task server'
endif
ipos=1
endif
else
@ -90,14 +104,18 @@ subroutine ZMQ_pt2(E, pt2,relative_error, absolute_error, error)
write(task(ipos:ipos+20),'(I9,1X,I9,''|'')') j, tbc(i)
ipos += 20
if (ipos > 63980) then
call add_task_to_taskserver(zmq_to_qp_run_socket,trim(task(1:ipos)))
if (add_task_to_taskserver(zmq_to_qp_run_socket,trim(task(1:ipos))) == -1) then
stop 'Unable to add task to task server'
endif
ipos=1
endif
end do
end if
end do
if (ipos > 1) then
call add_task_to_taskserver(zmq_to_qp_run_socket,trim(task(1:ipos)))
if (add_task_to_taskserver(zmq_to_qp_run_socket,trim(task(1:ipos))) == -1) then
stop 'Unable to add task to task server'
endif
endif
call zmq_set_running(zmq_to_qp_run_socket)
@ -254,7 +272,10 @@ subroutine pt2_collector(zmq_socket_pull, E, b, tbc, comb, Ncomb, computed, pt2_
if(parts_to_get(index(i)) == 0) actually_computed(index(i)) = .true.
enddo
call zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more)
integer, external :: zmq_delete_tasks
if (zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more) == -1) then
stop 'Unable to delete tasks'
endif
if (more == 0) then
loop = .False.
endif
@ -269,11 +290,16 @@ subroutine pt2_collector(zmq_socket_pull, E, b, tbc, comb, Ncomb, computed, pt2_
end if
end do
double precision :: E0, avg, prop
integer, external :: zmq_abort
if (firstTBDcomb > Ncomb) then
call zmq_abort(zmq_to_qp_run_socket)
if (zmq_abort(zmq_to_qp_run_socket) == -1) then
stop 'Error in sending abort signal'
endif
exit pullLoop
endif
double precision :: E0, avg, prop
call do_carlo(tbc, Ncomb+1-firstTBDcomb, comb(firstTBDcomb), pt2_detail, actually_computed, sumabove, sum2above, Nabove)
firstTBDcomb = int(Nabove(1)) - orgTBDcomb + 1
if(Nabove(1) < 5d0) cycle
@ -295,7 +321,9 @@ subroutine pt2_collector(zmq_socket_pull, E, b, tbc, comb, Ncomb, computed, pt2_
pt2(pt2_stoch_istate) = avg
error(pt2_stoch_istate) = eqt
print '(G10.3, 2X, F16.10, 2X, G16.3, 2X, F16.4, A20)', Nabove(tooth), avg+E, eqt, time-time0, ''
call zmq_abort(zmq_to_qp_run_socket)
if (zmq_abort(zmq_to_qp_run_socket) == -1) then
stop 'Error in sending abort signal'
endif
else
if (Nabove(tooth) > Nabove_old) then
print '(G10.3, 2X, F16.10, 2X, G16.3, 2X, F16.4, A20)', Nabove(tooth), avg+E, eqt, time-time0, ''

View File

@ -28,14 +28,17 @@ subroutine run_pt2_slave(thread,iproc,energy)
n_tasks_max = N_det_generators/100+1
allocate(task_id(n_tasks_max), task(n_tasks_max))
allocate(pt2(N_states,n_tasks_max), i_generator(n_tasks_max), subset(n_tasks_max))
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
zmq_socket_push = new_zmq_push_socket(thread)
call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
if(worker_id == -1) then
integer, external :: connect_to_taskserver
if (connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) == -1) then
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
call end_zmq_push_socket(zmq_socket_push,thread)
return
end if
endif
zmq_socket_push = new_zmq_push_socket(thread)
buf%N = 0
n_tasks = 0
call create_selection_buffer(1, 2, buf)
@ -45,7 +48,10 @@ subroutine run_pt2_slave(thread,iproc,energy)
n_tasks = min(n_tasks+1,n_tasks_max)
call get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task, n_tasks)
integer, external :: get_tasks_from_taskserver
if (get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task, n_tasks) == -1) then
exit
endif
done = task_id(n_tasks) == 0
if (done) n_tasks = n_tasks-1
if (n_tasks == 0) exit
@ -59,10 +65,18 @@ subroutine run_pt2_slave(thread,iproc,energy)
buf%cur = 0
call select_connected(i_generator(k),energy,pt2(1,k),buf,subset(k))
enddo
call tasks_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id,n_tasks)
integer, external :: tasks_done_to_taskserver
if (tasks_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id,n_tasks) == -1) then
done = .true.
endif
call push_pt2_results(zmq_socket_push, i_generator, pt2, task_id, n_tasks)
end do
call disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id)
integer, external :: disconnect_from_taskserver
if (disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id) == -1) then
continue
endif
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
call end_zmq_push_socket(zmq_socket_push,thread)
call delete_selection_buffer(buf)

View File

@ -27,20 +27,25 @@ subroutine run_selection_slave(thread,iproc,energy)
PROVIDE psi_bilinear_matrix_transp_order
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
zmq_socket_push = new_zmq_push_socket(thread)
call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
if(worker_id == -1) then
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
call end_zmq_push_socket(zmq_socket_push,thread)
integer, external :: connect_to_taskserver
if (connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) == -1) then
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
return
end if
endif
zmq_socket_push = new_zmq_push_socket(thread)
buf%N = 0
buffer_ready = .False.
ctask = 1
pt2(:) = 0d0
do
call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id(ctask), task)
integer, external :: get_task_from_taskserver
if (get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id(ctask), task) == -1) then
exit
endif
done = task_id(ctask) == 0
if (done) then
ctask = ctask - 1
@ -58,9 +63,18 @@ subroutine run_selection_slave(thread,iproc,energy)
call select_connected(i_generator,energy,pt2,buf,0)
endif
integer, external :: task_done_to_taskserver
if(done .or. ctask == size(task_id)) then
do i=1, ctask
call task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id(i))
if (task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id(i)) == -1) then
call sleep(1)
if (task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id(i)) == -1) then
ctask = 0
done = .true.
exit
endif
endif
end do
if(ctask > 0) then
call sort_selection_buffer(buf)
@ -76,7 +90,13 @@ subroutine run_selection_slave(thread,iproc,energy)
if(done) exit
ctask = ctask + 1
end do
call disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id)
integer, external :: disconnect_from_taskserver
if (disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id) == -1) then
continue
endif
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
call end_zmq_push_socket(zmq_socket_push,thread)
if (buffer_ready) then

View File

@ -33,7 +33,9 @@ subroutine run_wf
integer :: rc, i, ierr
double precision :: t0, t1
integer, external :: zmq_get_dvector
integer, external :: zmq_get_dvector, zmq_get_N_det_generators
integer, external :: zmq_get_psi, zmq_get_N_det_selectors
integer, external :: zmq_get_N_states_diag
call provide_everything
@ -59,10 +61,10 @@ subroutine run_wf
! ---------
call wall_time(t0)
call zmq_get_psi(zmq_to_qp_run_socket,1)
if (zmq_get_psi(zmq_to_qp_run_socket,1) == -1) cycle
if (zmq_get_N_det_generators (zmq_to_qp_run_socket, 1) == -1) cycle
if (zmq_get_N_det_selectors(zmq_to_qp_run_socket, 1) == -1) cycle
if (zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states) == -1) cycle
call zmq_get_N_det_generators (zmq_to_qp_run_socket, 1)
call zmq_get_N_det_selectors(zmq_to_qp_run_socket, 1)
call wall_time(t1)
call write_double(6,(t1-t0),'Broadcast time')
@ -80,8 +82,8 @@ subroutine run_wf
print *, 'Davidson'
call wall_time(t0)
call zmq_get_psi(zmq_to_qp_run_socket,1)
call zmq_get_N_states_diag(zmq_to_qp_run_socket,1)
if (zmq_get_psi(zmq_to_qp_run_socket,1) == -1) cycle
if (zmq_get_N_states_diag(zmq_to_qp_run_socket,1) == -1) cycle
if (zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states_diag) == -1) cycle
call wall_time(t1)
@ -99,10 +101,10 @@ subroutine run_wf
print *, 'PT2'
call wall_time(t0)
call zmq_get_psi(zmq_to_qp_run_socket,1)
if (zmq_get_psi(zmq_to_qp_run_socket,1) == -1) cycle
if (zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states) == -1) cycle
call zmq_get_N_det_generators (zmq_to_qp_run_socket, 1)
call zmq_get_N_det_selectors(zmq_to_qp_run_socket, 1)
if (zmq_get_N_det_generators (zmq_to_qp_run_socket, 1) == -1) cycle
if (zmq_get_N_det_selectors(zmq_to_qp_run_socket, 1) == -1) cycle
call wall_time(t1)
call write_double(6,(t1-t0),'Broadcast time')

View File

@ -27,7 +27,9 @@ subroutine run_wf
double precision :: energy(N_states)
character*(64) :: states(4)
integer :: rc, i, ierr
integer, external :: zmq_get_dvector
integer, external :: zmq_get_psi
call provide_everything
@ -52,7 +54,7 @@ subroutine run_wf
! ---------
print *, 'Selection'
call zmq_get_psi(zmq_to_qp_run_socket,1)
if (zmq_get_psi(zmq_to_qp_run_socket,1) == -1) cycle
if (zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states) == -1) cycle
!$OMP PARALLEL PRIVATE(i)
@ -67,7 +69,7 @@ subroutine run_wf
! ---
print *, 'PT2'
call zmq_get_psi(zmq_to_qp_run_socket,1)
if (zmq_get_psi(zmq_to_qp_run_socket,1) == -1) cycle
if (zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states) == -1) cycle
logical :: lstop

View File

@ -11,7 +11,6 @@ subroutine ZMQ_selection(N_in, pt2)
integer, external :: omp_get_thread_num
double precision, intent(out) :: pt2(N_states)
integer, parameter :: maxtasks=10000
integer, external :: zmq_put_dvector
PROVIDE fragment_count
@ -25,15 +24,28 @@ subroutine ZMQ_selection(N_in, pt2)
PROVIDE psi_bilinear_matrix_transp_order
call new_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,'selection')
call zmq_put_psi(zmq_to_qp_run_socket,1)
call zmq_put_N_det_generators(zmq_to_qp_run_socket, 1)
call zmq_put_N_det_selectors(zmq_to_qp_run_socket, 1)
integer, external :: zmq_put_psi
integer, external :: zmq_put_N_det_generators
integer, external :: zmq_put_N_det_selectors
integer, external :: zmq_put_dvector
if (zmq_put_psi(zmq_to_qp_run_socket,1) == -1) then
stop 'Unable to put psi on ZMQ server'
endif
if (zmq_put_N_det_generators(zmq_to_qp_run_socket, 1) == -1) then
stop 'Unable to put N_det_generators on ZMQ server'
endif
if (zmq_put_N_det_selectors(zmq_to_qp_run_socket, 1) == -1) then
stop 'Unable to put N_det_selectors on ZMQ server'
endif
if (zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',pt2_e0_denominator,size(pt2_e0_denominator)) == -1) then
stop 'Unable to put energy on ZMQ server'
endif
call create_selection_buffer(N, N*2, b)
endif
integer, external :: add_task_to_taskserver
character*(20*maxtasks) :: task
task = ' '
@ -44,11 +56,15 @@ subroutine ZMQ_selection(N_in, pt2)
write(task(20*(k-1)+1:20*k),'(I9,1X,I9,''|'')') i, N
if (k>=maxtasks) then
k=0
call add_task_to_taskserver(zmq_to_qp_run_socket,task)
if (add_task_to_taskserver(zmq_to_qp_run_socket,task) == -1) then
stop 'Unable to add task to task server'
endif
endif
end do
if (k > 0) then
call add_task_to_taskserver(zmq_to_qp_run_socket,task)
if (add_task_to_taskserver(zmq_to_qp_run_socket,task) == -1) then
stop 'Unable to add task to task server'
endif
endif
call zmq_set_running(zmq_to_qp_run_socket)
@ -130,7 +146,10 @@ subroutine selection_collector(zmq_socket_pull, b, N, pt2)
if(task_id(i) == 0) then
print *, "Error in collector"
endif
call zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id(i),more)
integer, external :: zmq_delete_task
if (zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id(i),more) == -1) then
stop 'Unable to delete task'
endif
end do
end do

View File

@ -1,6 +1,6 @@
BEGIN_TEMPLATE
subroutine zmq_put_$X(zmq_to_qp_run_socket,worker_id)
integer function zmq_put_$X(zmq_to_qp_run_socket,worker_id)
use f77_zmq
implicit none
BEGIN_DOC
@ -11,29 +11,30 @@ subroutine zmq_put_$X(zmq_to_qp_run_socket,worker_id)
integer :: rc
character*(256) :: msg
zmq_put_$X = 0
write(msg,'(A,1X,I8,1X,A200)') 'put_data '//trim(zmq_state), worker_id, '$X'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),ZMQ_SNDMORE)
if (rc /= len(trim(msg))) then
print *, irp_here, ': Error sending $X'
stop 'error'
zmq_put_$X = -1
return
endif
rc = f77_zmq_send(zmq_to_qp_run_socket,$X,4,0)
if (rc /= 4) then
print *, irp_here, ': Error sending $X'
stop 'error'
zmq_put_$X = -1
return
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0)
if (msg(1:rc) /= 'put_data_reply ok') then
print *, rc, trim(msg)
print *, irp_here, ': Error in put_data_reply'
stop 'error'
zmq_put_$X = -1
return
endif
end
subroutine zmq_get_$X(zmq_to_qp_run_socket, worker_id)
integer function zmq_get_$X(zmq_to_qp_run_socket, worker_id)
use f77_zmq
implicit none
BEGIN_DOC
@ -44,38 +45,53 @@ subroutine zmq_get_$X(zmq_to_qp_run_socket, worker_id)
integer :: rc
character*(256) :: msg
write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, '$X'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0)
if (rc /= len(trim(msg))) then
print *, irp_here, ': Error getting $X'
stop 'error'
zmq_get_$X = 0
if (mpi_master) then
write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, '$X'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0)
if (rc /= len(trim(msg))) go to 10
rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0)
if (msg(1:14) /= 'get_data_reply') go to 10
rc = f77_zmq_recv(zmq_to_qp_run_socket,$X,4,0)
if (rc /= 4) go to 10
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0)
if (msg(1:14) /= 'get_data_reply') then
print *, rc, trim(msg)
print *, irp_here, ': Error in get_data_reply'
stop 'error'
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,$X,4,0)
if (rc /= 4) then
print *, rc
print *, irp_here, ': Error getting $X'
stop 'error'
endif
! Normal exit
IRP_IF MPI
include 'mpif.h'
integer :: ierr
call MPI_BCAST ($X, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
call MPI_BCAST (zmq_get_$X, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
print *, irp_here//': Unable to broadcast N_det_generators'
stop -1
endif
if (zmq_get_$X == 0) then
call MPI_BCAST ($X, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
print *, irp_here//': Unable to broadcast N_det_generators'
stop -1
endif
endif
IRP_ENDIF
return
! Exception
10 continue
zmq_get_$X = -1
IRP_IF MPI
call MPI_BCAST (zmq_get_$X, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
print *, irp_here//': Unable to broadcast N_det_generators'
stop -1
endif
IRP_ENDIF
end
SUBST [ X ]

View File

@ -56,12 +56,18 @@ subroutine mrsc2_dressing_slave(thread,iproc)
logical, external :: is_in_wavefunction
integer,allocatable :: komon(:)
logical :: komoned
integer, external :: connect_to_taskserver, disconnect_from_taskserver
!double precision, external :: get_dij
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
zmq_socket_push = new_zmq_push_socket(thread)
integer, external :: add_task_to_taskserver
call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
if (connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) == -1) then
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
return
endif
zmq_socket_push = new_zmq_push_socket(thread)
allocate (delta(N_states,0:N_det_non_ref, 2))
allocate (delta_s2(N_states,0:N_det_non_ref, 2))
@ -74,7 +80,10 @@ subroutine mrsc2_dressing_slave(thread,iproc)
do
call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task)
integer, external :: get_task_from_taskserver
if (get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task) == -1) then
exit
endif
if (task_id == 0) exit
read (task,*) i_I, J, k1, k2
do i_state=1, N_states
@ -191,12 +200,17 @@ subroutine mrsc2_dressing_slave(thread,iproc)
end do ! kk
call push_mrsc2_results(zmq_socket_push, I_i, J, delta, delta_s2, task_id)
call task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id)
integer, external :: task_done_to_taskserver
if (task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id) == -1) then
stop 'Unable to send task_done to server'
endif
enddo
deallocate(delta)
call disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id)
if (disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id) == -1)
continue
endif
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
call end_zmq_push_socket(zmq_socket_push,thread)
@ -389,17 +403,18 @@ end
subroutine mrsc2_dressing_collector(delta_ii_,delta_ij_,delta_ii_s2_,delta_ij_s2_)
subroutine mrsc2_dressing_collector(zmq_socket_pull,delta_ii_,delta_ij_,delta_ii_s2_,delta_ij_s2_)
use f77_zmq
implicit none
BEGIN_DOC
! Collects results from the AO integral calculation
END_DOC
double precision,intent(inout) :: delta_ij_(N_states,N_det_non_ref,N_det_ref)
double precision,intent(inout) :: delta_ii_(N_states,N_det_ref)
double precision,intent(inout) :: delta_ij_s2_(N_states,N_det_non_ref,N_det_ref)
double precision,intent(inout) :: delta_ii_s2_(N_states,N_det_ref)
double precision,intent(inout) :: delta_ij_(N_states,N_det_non_ref,N_det_ref)
double precision,intent(inout) :: delta_ii_(N_states,N_det_ref)
double precision,intent(inout) :: delta_ij_s2_(N_states,N_det_non_ref,N_det_ref)
double precision,intent(inout) :: delta_ii_s2_(N_states,N_det_ref)
integer(ZMQ_PTR), intent(in) :: zmq_socket_pull
! integer :: j,l
integer :: rc
@ -410,7 +425,6 @@ subroutine mrsc2_dressing_collector(delta_ii_,delta_ij_,delta_ii_s2_,delta_ij_s2
integer(ZMQ_PTR) :: zmq_to_qp_run_socket
integer(ZMQ_PTR), external :: new_zmq_pull_socket
integer(ZMQ_PTR) :: zmq_socket_pull
integer*8 :: control, accu
integer :: task_id, more
@ -424,7 +438,6 @@ subroutine mrsc2_dressing_collector(delta_ii_,delta_ij_,delta_ii_s2_,delta_ij_s2
delta_ij_s2_(:,:,:) = 0d0
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
zmq_socket_pull = new_zmq_pull_socket()
allocate ( delta(N_states,0:N_det_non_ref,2), delta_s2(N_states,0:N_det_non_ref,2) )
@ -466,7 +479,10 @@ subroutine mrsc2_dressing_collector(delta_ii_,delta_ij_,delta_ii_s2_,delta_ij_s2
if (task_id /= 0) then
call zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more)
integer, external :: zmq_delete_task
if (zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more) == -1) then
stop 'Unable to delete task'
endif
endif
@ -474,7 +490,6 @@ subroutine mrsc2_dressing_collector(delta_ii_,delta_ij_,delta_ii_s2_,delta_ij_s2
deallocate( delta, delta_s2 )
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
call end_zmq_pull_socket(zmq_socket_pull)
end
@ -498,12 +513,12 @@ end
integer, external :: get_index_in_psi_det_sorted_bit, searchDet, detCmp
logical, external :: is_in_wavefunction, isInCassd, detEq
character*(512) :: task
integer(ZMQ_PTR) :: zmq_to_qp_run_socket
integer(ZMQ_PTR) :: zmq_to_qp_run_socket, zmq_socket_pull
integer :: KKsize = 1000000
call new_parallel_job(zmq_to_qp_run_socket,'mrsc2')
call new_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,'mrsc2')
call wall_time(iwall)
@ -573,14 +588,18 @@ end
do kk = 1 , nlink(i_I), KKsize
write(task,*) I_i, J, kk, int(min(kk+KKsize-1, nlink(i_I)))
call add_task_to_taskserver(zmq_to_qp_run_socket,task)
if (add_task_to_taskserver(zmq_to_qp_run_socket,task) == -1) then
stop 'Unable to add task to task server'
endif
end do
! do kk = 1 , nlink(i_I)
! k = linked(kk,i_I)
! blok = blokMwen(kk,i_I)
! write(task,*) I_i, J, k, blok
! call add_task_to_taskserver(zmq_to_qp_run_socket,task)
! if (add_task_to_taskserver(zmq_to_qp_run_socket,task) == -1) then
! stop 'Unable to add task to task server'
! endif
!
! enddo !kk
enddo !J
@ -593,17 +612,19 @@ end
! rc = pthread_create(collector_thread, mrsc2_dressing_collector)
print *, nzer, ntot, float(nzer) / float(ntot)
provide nproc
!$OMP PARALLEL DEFAULT(none) SHARED(delta_ii_old,delta_ij_old,delta_ii_s2_old,delta_ij_s2_old) PRIVATE(i) NUM_THREADS(nproc+1)
!$OMP PARALLEL DEFAULT(none) &
!$OMP SHARED(delta_ii_old,delta_ij_old,delta_ii_s2_old,delta_ij_s2_old,zmq_socket_pull)&
!$OMP PRIVATE(i) NUM_THREADS(nproc+1)
i = omp_get_thread_num()
if (i==0) then
call mrsc2_dressing_collector(delta_ii_old,delta_ij_old,delta_ii_s2_old,delta_ij_s2_old)
call mrsc2_dressing_collector(zmq_socket_pull,delta_ii_old,delta_ij_old,delta_ii_s2_old,delta_ij_s2_old)
else
call mrsc2_dressing_slave_inproc(i)
endif
!$OMP END PARALLEL
! rc = pthread_join(collector_thread)
call end_parallel_job(zmq_to_qp_run_socket, 'mrsc2')
call end_parallel_job(zmq_to_qp_run_socket, zmq_socket_pull, 'mrsc2')
END_PROVIDER

View File

@ -33,16 +33,22 @@ subroutine davidson_run_slave(thread,iproc)
integer(ZMQ_PTR) :: zmq_socket_push
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
integer, external :: connect_to_taskserver
if (connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) == -1) then
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
endif
zmq_socket_push = new_zmq_push_socket(thread)
call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
if(worker_id == -1) then
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
call end_zmq_push_socket(zmq_socket_push,thread)
return
end if
call davidson_slave_work(zmq_to_qp_run_socket, zmq_socket_push, N_states_diag, N_det, worker_id)
call disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id)
integer, external :: disconnect_from_taskserver
if (disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id) == -1) then
continue
endif
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
call end_zmq_push_socket(zmq_socket_push,thread)
end subroutine
@ -62,6 +68,7 @@ subroutine davidson_slave_work(zmq_to_qp_run_socket, zmq_socket_push, N_st, sze,
integer, allocatable :: psi_det_read(:,:,:)
double precision, allocatable :: v_t(:,:), s_t(:,:), u_t(:,:)
!DIR$ ATTRIBUTES ALIGN : $IRP_ALIGN :: u_t, v_t, s_t
! Get wave function (u_t)
@ -102,13 +109,20 @@ subroutine davidson_slave_work(zmq_to_qp_run_socket, zmq_socket_push, N_st, sze,
allocate(v_t(N_st,N_det), s_t(N_st,N_det))
do
call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, msg)
integer, external :: get_task_from_taskserver
integer, external :: task_done_to_taskserver
call sleep(1)
if (get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, msg) == -1) then
exit
endif
if(task_id == 0) exit
read (msg,*) imin, imax, ishift, istep
v_t = 0.d0
s_t = 0.d0
call H_S2_u_0_nstates_openmp_work(v_t,s_t,u_t,N_st,N_det,imin,imax,ishift,istep)
call task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id)
if (task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id) == -1) then
print *, irp_here, 'Unable to send task_done'
endif
call davidson_push_results(zmq_socket_push, v_t, s_t, imin, imax, task_id)
end do
deallocate(u_t,v_t, s_t)
@ -231,7 +245,10 @@ subroutine davidson_collector(zmq_to_qp_run_socket, zmq_socket_pull, v0, s0, sze
s0(i,j) = s0(i,j) + s_t(j,i)
enddo
enddo
call zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more)
integer, external :: zmq_delete_task
if (zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more) == -1) then
stop 'Unable to delete task'
endif
end do
deallocate(v_t,s_t)
@ -292,12 +309,16 @@ subroutine H_S2_u_0_nstates_zmq(v_0,s_0,u_0,N_st,sze)
integer*8 :: rc8
double precision :: energy(N_st)
integer, external :: zmq_put_dvector
integer, external :: zmq_put_dvector, zmq_put_psi, zmq_put_N_states_diag
energy = 0.d0
call zmq_put_N_states_diag(zmq_to_qp_run_socket, 1)
call zmq_put_psi(zmq_to_qp_run_socket,1)
if (zmq_put_N_states_diag(zmq_to_qp_run_socket, 1) == -1) then
stop 'Unable to put N_states_diag on ZMQ server'
endif
if (zmq_put_psi(zmq_to_qp_run_socket,1) == -1) then
stop 'Unable to put psi on ZMQ server'
endif
if (zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',energy,size(energy)) == -1) then
stop 'Unable to put energy on ZMQ server'
endif
@ -313,6 +334,7 @@ subroutine H_S2_u_0_nstates_zmq(v_0,s_0,u_0,N_st,sze)
integer :: istep, imin, imax, ishift
double precision :: w, max_workload, N_det_inv, di
integer, external :: add_task_to_taskserver
w = 0.d0
istep=1
ishift=0
@ -325,7 +347,9 @@ subroutine H_S2_u_0_nstates_zmq(v_0,s_0,u_0,N_st,sze)
if (w > max_workload) then
do ishift=0,istep-1
write(task,'(4(I9,1X),1A)') imin, imax, ishift, istep, '|'
call add_task_to_taskserver(zmq_to_qp_run_socket,trim(task))
if (add_task_to_taskserver(zmq_to_qp_run_socket,trim(task)) == -1) then
stop 'Unable to add task'
endif
enddo
imin = imax+1
w = 0.d0
@ -335,7 +359,9 @@ subroutine H_S2_u_0_nstates_zmq(v_0,s_0,u_0,N_st,sze)
imax = N_det
do ishift=0,istep-1
write(task,'(4(I9,1X),1A)') imin, imax, ishift, istep, '|'
call add_task_to_taskserver(zmq_to_qp_run_socket,trim(task))
if (add_task_to_taskserver(zmq_to_qp_run_socket,trim(task)) == -1) then
stop 'Unable to add task'
endif
enddo
endif
@ -378,7 +404,7 @@ BEGIN_PROVIDER [ integer, nthreads_davidson ]
END_PROVIDER
subroutine zmq_put_N_states_diag(zmq_to_qp_run_socket,worker_id)
integer function zmq_put_N_states_diag(zmq_to_qp_run_socket,worker_id)
use f77_zmq
implicit none
BEGIN_DOC
@ -389,29 +415,30 @@ subroutine zmq_put_N_states_diag(zmq_to_qp_run_socket,worker_id)
integer :: rc
character*(256) :: msg
zmq_put_N_states_diag = 0
write(msg,'(A,1X,I8,1X,A200)') 'put_data '//trim(zmq_state), worker_id, 'N_states_diag'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),ZMQ_SNDMORE)
if (rc /= len(trim(msg))) then
print *, irp_here, ': Error sending N_states_diag'
stop 'error'
zmq_put_N_states_diag = -1
return
endif
rc = f77_zmq_send(zmq_to_qp_run_socket,N_states_diag,4,0)
if (rc /= 4) then
print *, irp_here, ': Error sending N_states_diag'
stop 'error'
zmq_put_N_states_diag = -1
return
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0)
if (msg(1:rc) /= 'put_data_reply ok') then
print *, rc, trim(msg)
print *, irp_here, ': Error in put_data_reply'
stop 'error'
zmq_put_N_states_diag = -1
return
endif
end
subroutine zmq_get_N_states_diag(zmq_to_qp_run_socket, worker_id)
integer function zmq_get_N_states_diag(zmq_to_qp_run_socket, worker_id)
use f77_zmq
implicit none
BEGIN_DOC
@ -422,31 +449,45 @@ subroutine zmq_get_N_states_diag(zmq_to_qp_run_socket, worker_id)
integer :: rc
character*(256) :: msg
write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, 'N_states_diag'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0)
if (rc /= len(trim(msg))) then
print *, irp_here, ': Error getting N_states_diag'
stop 'error'
endif
zmq_get_N_states_diag = 0
rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0)
if (msg(1:14) /= 'get_data_reply') then
print *, rc, trim(msg)
print *, irp_here, ': Error in get_data_reply'
stop 'error'
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,N_states_diag,4,0)
if (rc /= 4) then
print *, irp_here, ': Error getting N_states_diag'
stop 'error'
if (mpi_master) then
write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, 'N_states_diag'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0)
if (rc /= len(trim(msg))) go to 10
rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0)
if (msg(1:14) /= 'get_data_reply') go to 10
rc = f77_zmq_recv(zmq_to_qp_run_socket,N_states_diag,4,0)
if (rc /= 4) go to 10
endif
IRP_IF MPI
include 'mpif.h'
integer :: ierr
call MPI_BCAST (N_states_diag, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
call MPI_BCAST (zmq_get_N_states_diag, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
print *, irp_here//': Unable to broadcast N_states'
stop -1
endif
if (zmq_get_N_states_diag == 0) then
call MPI_BCAST (N_states_diag, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
print *, irp_here//': Unable to broadcast N_states'
stop -1
endif
endif
IRP_ENDIF
return
! Exception
10 continue
zmq_get_N_states_diag = -1
IRP_IF MPI
call MPI_BCAST (zmq_get_N_states_diag, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
print *, irp_here//': Unable to broadcast N_states'
stop -1

View File

@ -33,9 +33,20 @@ subroutine $subroutine($params_main)
call new_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,'$subroutine')
zmq_socket_pair = new_zmq_pair_socket(.True.)
call zmq_put_psi(zmq_to_qp_run_socket,1)
call zmq_put_N_det_generators(zmq_to_qp_run_socket, worker_id)
call zmq_put_N_det_selectors(zmq_to_qp_run_socket, worker_id)
integer, external :: zmq_put_psi
integer, external :: zmq_put_N_det_generators
integer, external :: zmq_put_N_det_selectors
integer, external :: zmq_put_dvector
if (zmq_put_psi(zmq_to_qp_run_socket,1) == -1) then
stop 'Unable to put psi on ZMQ server'
endif
if (zmq_put_N_det_generators(zmq_to_qp_run_socket, worker_id) == -1) then
stop 'Unable to put N_det_generators on ZMQ server'
endif
if (zmq_put_N_det_selectors(zmq_to_qp_run_socket, worker_id) == -1) then
stop 'Unable to put N_det_selectors on ZMQ server'
endif
if (zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',energy,size(energy)) == -1) then
stop 'Unable to put energy on ZMQ server'
endif
@ -43,7 +54,10 @@ subroutine $subroutine($params_main)
do i_generator=1,N_det_generators
$skip
write(task,*) i_generator
call add_task_to_taskserver(zmq_to_qp_run_socket,trim(task))
integer, external :: add_task_to_taskserver
if (add_task_to_taskserver(zmq_to_qp_run_socket,trim(task)) == -1) then
stop 'Unable to add task to taskserver'
endif
enddo
allocate ( pt2_generators(N_states,N_det_generators), &
@ -122,17 +136,24 @@ subroutine $subroutine_slave(thread, iproc)
integer(ZMQ_PTR) :: zmq_socket_push
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
zmq_socket_push = new_zmq_push_socket(thread)
integer, external :: connect_to_taskserver
if (connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) == -1) then
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
return
endif
zmq_socket_push = new_zmq_push_socket(thread)
N_st = N_states
allocate( pt2(N_st), norm_pert(N_st), H_pert_diag(N_st), &
mask(N_int,2,6), fock_diag_tmp(2,mo_tot_num+1) )
call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
do
call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task)
integer, external :: get_task_from_taskserver
if (get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task) == -1) then
exit
endif
if (task_id == 0) exit
read(task,*) i_generator
@ -180,15 +201,21 @@ subroutine $subroutine_slave(thread, iproc)
fock_diag_tmp, i_generator, iproc $params_post)
endif
call task_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id)
integer, external :: task_done_to_taskserver
if (task_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id) == -1) then
print *, irp_here, ': Unable to send task_done'
endif
call push_pt2(zmq_socket_push,pt2,norm_pert,H_pert_diag,i_generator,N_st,task_id)
enddo
call disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id)
deallocate( mask, fock_diag_tmp, pt2, norm_pert, H_pert_diag )
integer, external :: disconnect_from_taskserver
if (disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id) == -1) then
continue
endif
call end_zmq_push_socket(zmq_socket_push,thread)
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
@ -234,7 +261,10 @@ subroutine $subroutine_collector(zmq_socket_pull)
H_pert_diag_result(k,i_generator) = H_pert_diag(k)
enddo
accu = accu + 1_8
call zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more)
integer, external :: zmq_delete_task
if (zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more) == -1) then
stop 'Unable to delete task'
endif
endif
enddo

View File

@ -1,4 +1,4 @@
subroutine zmq_put_psi(zmq_to_qp_run_socket,worker_id)
integer function zmq_put_psi(zmq_to_qp_run_socket,worker_id)
use f77_zmq
implicit none
BEGIN_DOC
@ -8,11 +8,33 @@ subroutine zmq_put_psi(zmq_to_qp_run_socket,worker_id)
integer, intent(in) :: worker_id
character*(256) :: msg
call zmq_put_N_states(zmq_to_qp_run_socket, worker_id)
call zmq_put_N_det(zmq_to_qp_run_socket, worker_id)
call zmq_put_psi_det_size(zmq_to_qp_run_socket, worker_id)
call zmq_put_psi_det(zmq_to_qp_run_socket, worker_id)
call zmq_put_psi_coef(zmq_to_qp_run_socket, worker_id)
integer, external :: zmq_put_N_states
integer, external :: zmq_put_N_det
integer, external :: zmq_put_psi_det_size
integer, external :: zmq_put_psi_det
integer, external :: zmq_put_psi_coef
zmq_put_psi = 0
if (zmq_put_N_states(zmq_to_qp_run_socket, worker_id) == -1) then
zmq_put_psi = -1
return
endif
if (zmq_put_N_det(zmq_to_qp_run_socket, worker_id) == -1) then
zmq_put_psi = -1
return
endif
if (zmq_put_psi_det_size(zmq_to_qp_run_socket, worker_id) == -1) then
zmq_put_psi = -1
return
endif
if (zmq_put_psi_det(zmq_to_qp_run_socket, worker_id) == -1) then
zmq_put_psi = -1
return
endif
if (zmq_put_psi_coef(zmq_to_qp_run_socket, worker_id) == -1) then
zmq_put_psi = -1
return
endif
end
@ -20,7 +42,7 @@ end
BEGIN_TEMPLATE
subroutine zmq_put_$X(zmq_to_qp_run_socket,worker_id)
integer function zmq_put_$X(zmq_to_qp_run_socket,worker_id)
use f77_zmq
implicit none
BEGIN_DOC
@ -31,29 +53,30 @@ subroutine zmq_put_$X(zmq_to_qp_run_socket,worker_id)
integer :: rc
character*(256) :: msg
zmq_put_$X = 0
write(msg,'(A,1X,I8,1X,A200)') 'put_data '//trim(zmq_state), worker_id, '$X'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),ZMQ_SNDMORE)
if (rc /= len(trim(msg))) then
print *, irp_here, ': Error sending $X'
stop 'error'
zmq_put_$X = -1
return
endif
rc = f77_zmq_send(zmq_to_qp_run_socket,$X,4,0)
if (rc /= 4) then
print *, irp_here, ': Error sending $X'
stop 'error'
zmq_put_$X = -1
return
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0)
if (msg(1:rc) /= 'put_data_reply ok') then
print *, rc, trim(msg)
print *, irp_here, ': Error in put_data_reply'
stop 'error'
zmq_put_$X = -1
return
endif
end
subroutine zmq_get_$X(zmq_to_qp_run_socket, worker_id)
integer function zmq_get_$X(zmq_to_qp_run_socket, worker_id)
use f77_zmq
implicit none
BEGIN_DOC
@ -64,26 +87,48 @@ subroutine zmq_get_$X(zmq_to_qp_run_socket, worker_id)
integer :: rc
character*(256) :: msg
write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, '$X'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0)
if (rc /= len(trim(msg))) then
print *, irp_here, ': Error getting $X'
stop 'error'
if (mpi_master) then
write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, '$X'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0)
if (rc /= len(trim(msg))) go to 10
rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0)
if (msg(1:14) /= 'get_data_reply') go to 10
rc = f77_zmq_recv(zmq_to_qp_run_socket,$X,4,0)
if (rc /= 4) go to 10
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0)
if (msg(1:14) /= 'get_data_reply') then
print *, rc, trim(msg)
print *, irp_here, ': Error in get_data_reply'
stop 'error'
endif
! Normal exit
zmq_get_$X = 0
IRP_IF MPI
include 'mpif.h'
integer :: ierr
rc = f77_zmq_recv(zmq_to_qp_run_socket,$X,4,0)
if (rc /= 4) then
print *, rc
print *, irp_here, ': Error getting $X'
stop 'error'
endif
call MPI_BCAST (zmq_get_$X, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
stop 'Unable to broadcast zmq_get_psi_det'
endif
if (zmq_get_$X == 0) then
call MPI_BCAST ($X, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
stop 'Unable to broadcast zmq_get_psi_det'
endif
endif
IRP_ENDIF
return
! Exception
10 continue
zmq_get_$X = -1
IRP_IF MPI
call MPI_BCAST (zmq_get_$X, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
stop 'Unable to broadcast zmq_get_psi_det'
endif
IRP_ENDIF
end
SUBST [ X ]
@ -94,7 +139,7 @@ psi_det_size ;;
END_TEMPLATE
subroutine zmq_put_psi_det(zmq_to_qp_run_socket,worker_id)
integer function zmq_put_psi_det(zmq_to_qp_run_socket,worker_id)
use f77_zmq
implicit none
BEGIN_DOC
@ -106,28 +151,29 @@ subroutine zmq_put_psi_det(zmq_to_qp_run_socket,worker_id)
integer*8 :: rc8
character*(256) :: msg
zmq_put_psi_det = 0
write(msg,'(A,1X,I8,1X,A200)') 'put_data '//trim(zmq_state), worker_id, 'psi_det'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),ZMQ_SNDMORE)
if (rc /= len(trim(msg))) then
print *, irp_here, ': Error sending psi_det'
stop 'error'
zmq_put_psi_det = -1
return
endif
rc8 = f77_zmq_send8(zmq_to_qp_run_socket,psi_det,int(N_int*2_8*N_det*bit_kind,8),0)
if (rc8 /= N_int*2_8*N_det*bit_kind) then
print *, irp_here, ': Error sending psi_det'
stop 'error'
zmq_put_psi_det = -1
return
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0)
if (msg(1:rc) /= 'put_data_reply ok') then
print *, rc, trim(msg)
print *, irp_here, ': Error in put_data_reply'
stop 'error'
zmq_put_psi_det = -1
return
endif
end
subroutine zmq_put_psi_coef(zmq_to_qp_run_socket,worker_id)
integer function zmq_put_psi_coef(zmq_to_qp_run_socket,worker_id)
use f77_zmq
implicit none
BEGIN_DOC
@ -139,31 +185,33 @@ subroutine zmq_put_psi_coef(zmq_to_qp_run_socket,worker_id)
integer*8 :: rc8
character*(256) :: msg
zmq_put_psi_coef = 0
write(msg,'(A,1X,I8,1X,A200)') 'put_data '//trim(zmq_state), worker_id, 'psi_coef'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),ZMQ_SNDMORE)
if (rc /= len(trim(msg))) then
print *, irp_here, ': Error sending psi_coef'
stop 'error'
zmq_put_psi_coef = -1
return
endif
rc8 = f77_zmq_send8(zmq_to_qp_run_socket,psi_coef,int(psi_det_size*N_states*8_8,8),0)
if (rc8 /= psi_det_size*N_states*8_8) then
print *, irp_here, ': Error sending psi_coef'
stop 'error'
zmq_put_psi_coef = -1
return
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0)
if (msg(1:rc) /= 'put_data_reply ok') then
print *, rc, trim(msg)
print *, irp_here, ': Error in put_data_reply'
stop 'error'
zmq_put_psi_coef = -1
return
endif
end
!---------------------------------------------------------------------------
subroutine zmq_get_psi(zmq_to_qp_run_socket, worker_id)
integer function zmq_get_psi(zmq_to_qp_run_socket, worker_id)
use f77_zmq
implicit none
BEGIN_DOC
@ -172,9 +220,26 @@ subroutine zmq_get_psi(zmq_to_qp_run_socket, worker_id)
integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket
integer, intent(in) :: worker_id
call zmq_get_N_states(zmq_to_qp_run_socket, worker_id)
call zmq_get_N_det(zmq_to_qp_run_socket, worker_id)
call zmq_get_psi_det_size(zmq_to_qp_run_socket, worker_id)
integer, external :: zmq_get_N_states
integer, external :: zmq_get_N_det
integer, external :: zmq_get_psi_det_size
integer, external :: zmq_get_psi_det
integer, external :: zmq_get_psi_coef
zmq_get_psi = 0
if (zmq_get_N_states(zmq_to_qp_run_socket, worker_id) == -1) then
zmq_get_psi = -1
return
endif
if (zmq_get_N_det(zmq_to_qp_run_socket, worker_id) == -1) then
zmq_get_psi = -1
return
endif
if (zmq_get_psi_det_size(zmq_to_qp_run_socket, worker_id) == -1) then
zmq_get_psi = -1
return
endif
if (size(psi_det) /= N_int*2_8*psi_det_size*bit_kind) then
deallocate(psi_det)
@ -186,14 +251,20 @@ subroutine zmq_get_psi(zmq_to_qp_run_socket, worker_id)
allocate(psi_coef(psi_det_size,N_states))
endif
call zmq_get_psi_det(zmq_to_qp_run_socket, worker_id)
call zmq_get_psi_coef(zmq_to_qp_run_socket, worker_id)
if (zmq_get_psi_det(zmq_to_qp_run_socket, worker_id) == -1) then
zmq_get_psi = -1
return
endif
if (zmq_get_psi_coef(zmq_to_qp_run_socket, worker_id) == -1) then
zmq_get_psi = -1
return
endif
SOFT_TOUCH psi_det psi_coef psi_det_size N_det N_states
end
subroutine zmq_get_psi_det(zmq_to_qp_run_socket, worker_id)
integer function zmq_get_psi_det(zmq_to_qp_run_socket, worker_id)
use f77_zmq
implicit none
BEGIN_DOC
@ -205,33 +276,46 @@ subroutine zmq_get_psi_det(zmq_to_qp_run_socket, worker_id)
integer*8 :: rc8
character*(256) :: msg
write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, 'psi_det'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0)
if (rc /= len(trim(msg))) then
print *, irp_here, ': Error getting psi_det'
stop 'error'
if (mpi_master) then
write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, 'psi_det'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0)
if (rc /= len(trim(msg))) go to 10
rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0)
if (msg(1:14) /= 'get_data_reply') go to 10
rc8 = f77_zmq_recv8(zmq_to_qp_run_socket,psi_det,int(N_int*2_8*N_det*bit_kind,8),0)
if (rc8 /= N_int*2_8*N_det*bit_kind) go to 10
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0)
if (msg(1:14) /= 'get_data_reply') then
print *, rc, trim(msg)
print *, irp_here, ': Error in get_data_reply'
stop 'error'
endif
rc8 = f77_zmq_recv8(zmq_to_qp_run_socket,psi_det,int(N_int*2_8*N_det*bit_kind,8),0)
if (rc8 /= N_int*2_8*N_det*bit_kind) then
print *, irp_here, ': Error getting psi_det', rc8, N_int*2_8*N_det*bit_kind
stop 'error'
endif
! Normal exit
zmq_get_psi_det = 0
IRP_IF MPI
call broadcast_chunks_bit_kind(psi_det,N_det*N_int*2)
include 'mpif.h'
integer :: ierr
call MPI_BCAST (zmq_get_psi_det, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
stop 'Unable to broadcast zmq_get_psi_det'
endif
if (zmq_get_psi_det == 0) then
call broadcast_chunks_bit_kind(psi_det,N_det*N_int*2)
endif
IRP_ENDIF
return
! Exception
10 continue
zmq_get_psi_det = -1
IRP_IF MPI
call MPI_BCAST (zmq_get_psi_det, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
stop 'Unable to broadcast zmq_get_psi_det'
endif
IRP_ENDIF
end
subroutine zmq_get_psi_coef(zmq_to_qp_run_socket, worker_id)
integer function zmq_get_psi_coef(zmq_to_qp_run_socket, worker_id)
use f77_zmq
implicit none
BEGIN_DOC
@ -243,31 +327,44 @@ subroutine zmq_get_psi_coef(zmq_to_qp_run_socket, worker_id)
integer*8 :: rc8
character*(256) :: msg
if (mpi_master) then
write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, 'psi_coef'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0)
if (rc /= len(trim(msg))) go to 10
write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, 'psi_coef'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0)
if (rc /= len(trim(msg))) then
print *, irp_here, ': Error getting psi_coef'
stop 'error'
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0)
if (msg(1:14) /= 'get_data_reply') go to 10
rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0)
if (msg(1:14) /= 'get_data_reply') then
print *, rc, trim(msg)
print *, irp_here, ': Error in get_data_reply'
stop 'error'
endif
rc8 = f77_zmq_recv8(zmq_to_qp_run_socket,psi_coef,int(psi_det_size*N_states*8_8,8),0)
if (rc8 /= psi_det_size*N_states*8_8) then
print *, irp_here, ': Error getting psi_coef'
stop 'error'
rc8 = f77_zmq_recv8(zmq_to_qp_run_socket,psi_coef,int(psi_det_size*N_states*8_8,8),0)
if (rc8 /= psi_det_size*N_states*8_8) go to 10
endif
! Normal exit
zmq_get_psi_coef = 0
IRP_IF MPI
call broadcast_chunks_double(psi_coef,N_states*N_det)
include 'mpif.h'
integer :: ierr
call MPI_BCAST (zmq_get_psi_coef, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
stop 'Unable to broadcast zmq_get_psi_coef'
endif
if (zmq_get_psi_coef == 0) then
call broadcast_chunks_double(psi_coef,N_states*N_det)
endif
IRP_ENDIF
return
! Exception
10 continue
zmq_get_psi_coef = -1
IRP_IF MPI
call MPI_BCAST (zmq_get_psi_coef, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
stop 'Unable to broadcast zmq_get_psi_coef'
endif
IRP_ENDIF
end

View File

@ -373,7 +373,10 @@ BEGIN_PROVIDER [ logical, ao_bielec_integrals_in_map ]
write(fmt,*) '(', ao_num, '(I5,X,I5,''|''))'
do l=1,ao_num
write(task,fmt) (i,l, i=1,l)
call add_task_to_taskserver(zmq_to_qp_run_socket,trim(task))
integer, external :: add_task_to_taskserver
if (add_task_to_taskserver(zmq_to_qp_run_socket,trim(task)) == -1) then
stop 'Unable to add task to server'
endif
enddo
deallocate(task)

View File

@ -98,23 +98,37 @@ subroutine ao_bielec_integrals_in_map_slave(thread,iproc)
character*(64) :: state
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
integer, external :: connect_to_taskserver
if (connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) == -1) then
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
return
endif
zmq_socket_push = new_zmq_push_socket(thread)
allocate ( buffer_i(ao_num*ao_num), buffer_value(ao_num*ao_num) )
call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
do
call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task)
integer, external :: get_task_from_taskserver
if (get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task) == -1) then
exit
endif
if (task_id == 0) exit
read(task,*) j, l
integer, external :: task_done_to_taskserver
call compute_ao_integrals_jl(j,l,n_integrals,buffer_i,buffer_value)
call task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id)
if (task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id) == -1) then
stop 'Unable to send task_done'
endif
call push_integrals(zmq_socket_push, n_integrals, buffer_i, buffer_value, task_id)
enddo
call disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id)
integer, external :: disconnect_from_taskserver
if (disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id) == -1) then
continue
endif
deallocate( buffer_i, buffer_value )
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
call end_zmq_push_socket(zmq_socket_push,thread)
@ -200,7 +214,10 @@ IRP_ENDIF
call insert_into_ao_integrals_map(n_integrals,buffer_i,buffer_value)
accu += n_integrals
if (task_id /= 0) then
call zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more)
integer, external :: zmq_delete_task
if (zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more) == -1) then
stop 'Unable to delete task'
endif
endif
endif

View File

@ -12,32 +12,27 @@ integer function zmq_put_dvector(zmq_to_qp_run_socket, worker_id, name, x, size_
integer :: rc
character*(256) :: msg
! Failure
zmq_put_dvector = -1
zmq_put_dvector = 0
write(msg,'(A,1X,I8,1X,A200)') 'put_data '//trim(zmq_state), worker_id, name
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),ZMQ_SNDMORE)
if (rc /= len(trim(msg))) then
print *, irp_here, ': Error sending '//name
zmq_put_dvector = -1
return
endif
rc = f77_zmq_send(zmq_to_qp_run_socket,x,size_x*8,0)
if (rc /= size_x*8) then
print *, irp_here, ': Error sending '//name
zmq_put_dvector = -1
return
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0)
if (msg(1:rc) /= 'put_data_reply ok') then
print *, rc, trim(msg)
print *, irp_here, ': Error in put_data_reply'
zmq_put_dvector = -1
return
endif
! Success
zmq_put_dvector = 0
end
@ -59,26 +54,19 @@ integer function zmq_get_dvector(zmq_to_qp_run_socket, worker_id, name, x, size_
! Success
zmq_get_dvector = 0
write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, name
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0)
if (rc /= len(trim(msg))) then
print *, irp_here, ': Error getting '//name
zmq_get_dvector = -1
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0)
if (msg(1:14) /= 'get_data_reply') then
print *, rc, trim(msg)
print *, irp_here, ': Error in get_data_reply'
zmq_get_dvector = -1
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,x,size_x*8,0)
if (rc /= size_x*8) then
print *, irp_here, ': Error getting '//name
zmq_get_dvector = -1
if (mpi_master) then
write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, name
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0)
if (rc /= len(trim(msg))) go to 10
rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0)
if (msg(1:14) /= 'get_data_reply') go to 10
rc = f77_zmq_recv(zmq_to_qp_run_socket,x,size_x*8,0)
if (rc /= size_x*8) go to 10
endif
! Normal exit
IRP_IF MPI
integer :: ierr
@ -88,13 +76,27 @@ integer function zmq_get_dvector(zmq_to_qp_run_socket, worker_id, name, x, size_
print *, irp_here//': Unable to broadcast zmq_get_dvector'
stop -1
endif
call MPI_BCAST (x, size_x, MPI_DOUBLE_PRECISION, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
print *, irp_here//': Unable to broadcast dvector'
stop -1
if (zmq_get_dvector == 0) then
call MPI_BCAST (x, size_x, MPI_DOUBLE_PRECISION, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
print *, irp_here//': Unable to broadcast dvector'
stop -1
endif
endif
IRP_ENDIF
return
! Exception
10 continue
zmq_get_dvector = -1
IRP_IF MPI
call MPI_BCAST (zmq_get_dvector, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
print *, irp_here//': Unable to broadcast zmq_get_dvector'
stop -1
endif
IRP_ENDIF
end

View File

@ -644,7 +644,7 @@ subroutine end_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,name_in)
endif
end
subroutine connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
integer function connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
use f77_zmq
implicit none
BEGIN_DOC
@ -657,38 +657,42 @@ subroutine connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
character*(512) :: message
character*(128) :: reply, state, address
integer :: rc
!Success
connect_to_taskserver = 0
if (thread == 1) then
rc = f77_zmq_send(zmq_to_qp_run_socket, "connect inproc", 14, 0)
if (rc /= 14) then
print *, 'f77_zmq_send(zmq_to_qp_run_socket, "connect inproc", 14, 0)'
stop 'error'
connect_to_taskserver = -1
return
endif
else
rc = f77_zmq_send(zmq_to_qp_run_socket, "connect tcp", 11, 0)
if (rc /= 11) then
print *, 'f77_zmq_send(zmq_to_qp_run_socket, "connect tcp", 11, 0)'
stop 'error'
connect_to_taskserver = -1
return
endif
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0)
message = trim(message(1:rc))
if(message(1:5) == "error") then
worker_id = -1
connect_to_taskserver = -1
return
end if
read(message,*, end=10, err=10) reply, state, worker_id, address
if (trim(reply) /= 'connect_reply') then
print *, trim(message)
stop -1
connect_to_taskserver = -1
return
endif
return
10 continue
print *, irp_here, ': Error in read'
stop
connect_to_taskserver = -1
end
subroutine disconnect_from_taskserver(zmq_to_qp_run_socket, &
integer function disconnect_from_taskserver(zmq_to_qp_run_socket, &
zmq_socket_push, worker_id)
use f77_zmq
implicit none
@ -701,18 +705,17 @@ subroutine disconnect_from_taskserver(zmq_to_qp_run_socket, &
integer :: rc, sze
character*(64) :: message, reply, state
disconnect_from_taskserver = 0
write(message,*) 'disconnect '//trim(zmq_state), worker_id
sze = len(trim(message))
rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)
! if (rc == -1) then
! return
! endif
if (rc /= sze) then
print *, rc, sze
print *, irp_here, 'f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)'
stop 'error'
disconnect_from_taskserver = -1
return
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0)
@ -720,26 +723,23 @@ subroutine disconnect_from_taskserver(zmq_to_qp_run_socket, &
read(message,*, end=10, err=10) reply, state
if ((trim(reply) == 'disconnect_reply').and.(trim(state) == trim(zmq_state))) then
disconnect_from_taskserver = -1
return
endif
if (trim(message) == 'error Wrong state') then
disconnect_from_taskserver = -1
return
else if (trim(message) == 'error No job is running') then
disconnect_from_taskserver = -1
return
endif
if (trim(message) == 'error No job is running') then
return
endif
print *, 'Unable to disconnect : ', trim(zmq_state)
print *, trim(message)
stop -1
return
10 continue
print *, irp_here, ': Error in read'
stop
disconnect_from_taskserver = -1
end
subroutine add_task_to_taskserver(zmq_to_qp_run_socket,task)
integer function add_task_to_taskserver(zmq_to_qp_run_socket,task)
use f77_zmq
implicit none
BEGIN_DOC
@ -751,71 +751,27 @@ subroutine add_task_to_taskserver(zmq_to_qp_run_socket,task)
integer :: rc, sze
character(len=:), allocatable :: message
add_task_to_taskserver = 0
message='add_task '//trim(zmq_state)//' '//trim(task)
sze = len(message)
rc = f77_zmq_send(zmq_to_qp_run_socket, message, sze, 0)
if (rc /= sze) then
print *, rc, sze
print *, irp_here,': f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)'
stop 'error'
add_task_to_taskserver = -1
return
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, sze-1, 0)
if (message(1:rc) /= 'ok') then
print *, trim(message(1:rc))
print *, trim(task)
print *, 'Unable to add the next task'
stop -1
add_task_to_taskserver = -1
return
endif
end
subroutine add_task_to_taskserver_send(zmq_to_qp_run_socket,task)
use f77_zmq
implicit none
BEGIN_DOC
! Get a task from the task server
END_DOC
integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket
character*(*), intent(in) :: task
integer :: rc, sze
character(len=:), allocatable :: message
sze = len(trim(task))+12+len(trim(zmq_state))
message = repeat(' ',sze)
write(message,*) 'add_task '//trim(zmq_state)//' '//trim(task)
rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)
if (rc /= sze) then
print *, rc, sze
print *, irp_here,': f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)'
stop 'error'
endif
end
subroutine add_task_to_taskserver_recv(zmq_to_qp_run_socket)
use f77_zmq
implicit none
BEGIN_DOC
! Get a task from the task server
END_DOC
integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket
integer :: rc, sze
character*(512) :: message
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0)
if (message(1:rc) /= 'ok') then
print *, trim(message(1:rc))
print *, 'Unable to add the next task'
stop -1
endif
end
subroutine zmq_abort(zmq_to_qp_run_socket)
integer function zmq_abort(zmq_to_qp_run_socket)
use f77_zmq
implicit none
BEGIN_DOC
@ -824,25 +780,26 @@ subroutine zmq_abort(zmq_to_qp_run_socket)
integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket
integer :: rc, sze
character*(512) :: message
zmq_abort = 0
write(message,*) 'abort '
sze = len(trim(message))
rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)
if (rc /= sze) then
print *, irp_here, 'f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)'
stop 'error'
zmq_abort = -1
return
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0)
if (trim(message(1:rc)) /= 'ok') then
print *, trim(message(1:rc))
print *, 'Unable to send abort message'
stop -1
zmq_abort = -1
return
endif
end
subroutine task_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id)
integer function task_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id)
use f77_zmq
implicit none
BEGIN_DOC
@ -853,25 +810,27 @@ subroutine task_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id)
integer :: rc, sze
character*(512) :: message
task_done_to_taskserver = 0
write(message,*) 'task_done '//trim(zmq_state), worker_id, task_id
sze = len(trim(message))
rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)
if (rc /= sze) then
print *, irp_here, 'f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)'
stop 'error'
task_done_to_taskserver = -1
return
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0)
if (trim(message(1:rc)) /= 'ok') then
print *, trim(message(1:rc))
print *, 'Unable to send task_done message'
stop -1
task_done_to_taskserver = -1
return
endif
end
subroutine tasks_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id, n_tasks)
integer function tasks_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id, n_tasks)
use f77_zmq
implicit none
BEGIN_DOC
@ -884,6 +843,8 @@ subroutine tasks_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id, n_
character(LEN=:), allocatable :: message
character*(64) :: fmt
tasks_done_to_taskserver = 0
allocate(character(LEN=64+n_tasks*12) :: message)
write(fmt,*) '(A,X,A,I10,X,', n_tasks, '(I11,1X))'
write(message,*) 'task_done '//trim(zmq_state), worker_id, (task_id(k), k=1,n_tasks)
@ -891,27 +852,26 @@ subroutine tasks_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id, n_
sze = len(trim(message))
rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)
if (rc == -1) then
! Server is shut down
tasks_done_to_taskserver = -1
deallocate(message)
return
endif
if (rc /= sze) then
print *, irp_here, 'f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)'
stop 'error'
tasks_done_to_taskserver = -1
deallocate(message)
return
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 64, 0)
if (trim(message(1:rc)) /= 'ok') then
print *, trim(message(1:rc))
print *, 'Unable to send task_done message'
stop -1
tasks_done_to_taskserver = -1
endif
deallocate(message)
end
subroutine get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task)
integer function get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task)
use f77_zmq
implicit none
BEGIN_DOC
@ -926,13 +886,15 @@ subroutine get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task)
character*(64) :: reply
integer :: rc, sze
get_task_from_taskserver = 0
write(message,*) 'get_task '//trim(zmq_state), worker_id
sze = len(trim(message))
rc = f77_zmq_send(zmq_to_qp_run_socket, message, sze, 0)
if (rc /= sze) then
print *, irp_here, ':f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)'
stop 'error'
get_task_from_taskserver = -1
return
endif
message = repeat(' ',512)
@ -960,19 +922,18 @@ subroutine get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task)
task_id = 0
task = 'terminate'
else
print *, 'Unable to get the next task'
print *, trim(message)
stop -1
get_task_from_taskserver = -1
return
endif
return
10 continue
print *, irp_here, ': Error in read'
stop
get_task_from_taskserver = -1
end
subroutine get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task,n_tasks)
integer function get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task,n_tasks)
use f77_zmq
implicit none
BEGIN_DOC
@ -988,15 +949,15 @@ subroutine get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task
character*(64) :: reply
integer :: rc, sze, i
get_tasks_from_taskserver = 0
write(message,*) 'get_tasks '//trim(zmq_state), worker_id, n_tasks
sze = len(trim(message))
rc = f77_zmq_send(zmq_to_qp_run_socket, message, sze, 0)
if (rc /= sze) then
print *, trim(message)
print *, rc, sze
print *, irp_here, ':f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)'
stop 'error'
get_tasks_from_taskserver = -1
return
endif
message = repeat(' ',1024)
@ -1004,17 +965,16 @@ subroutine get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task
rc = min(1024,rc)
read(message(1:rc),*, end=10, err=10) reply
if (trim(message) == 'get_tasks_reply ok') then
continue
continue
else if (trim(message) == 'terminate') then
task_id(1) = 0
task(1) = 'terminate'
task_id(1) = 0
task(1) = 'terminate'
else if (trim(message) == 'error No job is running') then
task_id(1) = 0
task(1) = 'terminate'
task_id(1) = 0
task(1) = 'terminate'
else
print *, 'Unable to get the next task'
print *, ':'//trim(message)//':'
stop -1
get_tasks_from_taskserver = -1
return
endif
task(:) = repeat(' ',512)
@ -1038,11 +998,11 @@ subroutine get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task
rc += 1
task(i) = message(rc:)
enddo
return
10 continue
print *, irp_here, ': Error in read'
stop
get_tasks_from_taskserver = -1
return
end
@ -1070,7 +1030,7 @@ subroutine end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
end
subroutine zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more)
integer function zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more)
use f77_zmq
implicit none
BEGIN_DOC
@ -1084,11 +1044,13 @@ subroutine zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more)
integer :: rc
character*(512) :: message
zmq_delete_task = 0
write(message,*) 'del_task ', zmq_state, task_id
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(message),len(trim(message)),0)
if (rc /= len(trim(message))) then
print *, irp_here
stop 'error'
zmq_delete_task = -1
return
endif
character*(64) :: reply
@ -1100,13 +1062,12 @@ subroutine zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more)
else if (reply(16:19) == 'done') then
more = 0
else
print *, reply
print *, irp_here
stop 'error'
zmq_delete_task = -1
return
endif
end
subroutine zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more)
integer function zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more)
use f77_zmq
implicit none
BEGIN_DOC
@ -1121,6 +1082,8 @@ subroutine zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks
character*(64) :: fmt, reply
character(LEN=:), allocatable :: message
zmq_delete_tasks = 0
allocate(character(LEN=64+n_tasks*12) :: message)
write(fmt,*) '(A,1X,A,1X,', n_tasks, '(I11,1X))'
@ -1129,8 +1092,9 @@ subroutine zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(message),len(trim(message)),0)
if (rc /= len(trim(message))) then
print *, irp_here
stop 'error'
zmq_delete_tasks = -1
deallocate(message)
return
endif
deallocate(message)
@ -1142,9 +1106,7 @@ subroutine zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks
else if (reply(16:19) == 'done') then
more = 0
else
print *, reply
print *, irp_here
stop 'error'
zmq_delete_tasks = -1
endif
end