mirror of
https://github.com/LCPQ/quantum_package
synced 2024-12-31 16:45:54 +01:00
Improved Parallelism
This commit is contained in:
parent
a9aeda4958
commit
68967d2101
@ -9,7 +9,7 @@
|
|||||||
FC : ifort
|
FC : ifort
|
||||||
LAPACK_LIB : -mkl=parallel
|
LAPACK_LIB : -mkl=parallel
|
||||||
IRPF90 : irpf90
|
IRPF90 : irpf90
|
||||||
IRPF90_FLAGS : --ninja --align=32
|
IRPF90_FLAGS : --ninja --align=32 -DZMQ_PUSH
|
||||||
|
|
||||||
# Global options
|
# Global options
|
||||||
################
|
################
|
||||||
|
@ -194,14 +194,21 @@ let end_job msg program_state rep_socket pair_socket =
|
|||||||
reply_wrong_state rep_socket;
|
reply_wrong_state rep_socket;
|
||||||
program_state
|
program_state
|
||||||
|
|
||||||
and success state =
|
and success () =
|
||||||
reply_ok rep_socket;
|
reply_ok rep_socket;
|
||||||
{ program_state with
|
{ program_state with
|
||||||
state = None ;
|
state = None ;
|
||||||
progress_bar = Progress_bar.clear ();
|
progress_bar = Progress_bar.clear ();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
and wait n =
|
||||||
|
Printf.sprintf "waiting %d" n
|
||||||
|
|> Message.Error_msg.create
|
||||||
|
|> Message.Error_msg.to_string
|
||||||
|
|> ZMQ.Socket.send rep_socket ;
|
||||||
|
program_state
|
||||||
in
|
in
|
||||||
|
|
||||||
match program_state.state with
|
match program_state.state with
|
||||||
| None -> failure ()
|
| None -> failure ()
|
||||||
| Some state ->
|
| Some state ->
|
||||||
@ -210,7 +217,10 @@ let end_job msg program_state rep_socket pair_socket =
|
|||||||
begin
|
begin
|
||||||
string_of_pub_state Waiting
|
string_of_pub_state Waiting
|
||||||
|> ZMQ.Socket.send pair_socket ;
|
|> ZMQ.Socket.send pair_socket ;
|
||||||
success state
|
if (Queuing_system.number_of_clients program_state.queue = 0) then
|
||||||
|
success ()
|
||||||
|
else
|
||||||
|
wait (Queuing_system.number_of_clients program_state.queue)
|
||||||
end
|
end
|
||||||
else
|
else
|
||||||
failure ()
|
failure ()
|
||||||
@ -600,6 +610,9 @@ let abort program_state rep_socket =
|
|||||||
List.fold ~f:(fun queue task_id -> Queuing_system.del_task ~task_id queue)
|
List.fold ~f:(fun queue task_id -> Queuing_system.del_task ~task_id queue)
|
||||||
~init:queue tasks
|
~init:queue tasks
|
||||||
in
|
in
|
||||||
|
let queue =
|
||||||
|
Queuing_system.del_client ~client_id queue
|
||||||
|
in
|
||||||
reply_ok rep_socket;
|
reply_ok rep_socket;
|
||||||
|
|
||||||
{ program_state with
|
{ program_state with
|
||||||
|
@ -10,7 +10,7 @@ subroutine ZMQ_pt2(E, pt2,relative_error, absolute_error, error)
|
|||||||
implicit none
|
implicit none
|
||||||
|
|
||||||
character(len=64000) :: task
|
character(len=64000) :: task
|
||||||
integer(ZMQ_PTR) :: zmq_to_qp_run_socket, zmq_to_qp_run_socket2
|
integer(ZMQ_PTR) :: zmq_to_qp_run_socket, zmq_socket_pull
|
||||||
type(selection_buffer) :: b
|
type(selection_buffer) :: b
|
||||||
integer, external :: omp_get_thread_num
|
integer, external :: omp_get_thread_num
|
||||||
double precision, intent(in) :: relative_error, absolute_error, E(N_states)
|
double precision, intent(in) :: relative_error, absolute_error, E(N_states)
|
||||||
@ -27,6 +27,7 @@ subroutine ZMQ_pt2(E, pt2,relative_error, absolute_error, error)
|
|||||||
double precision, external :: omp_get_wtime
|
double precision, external :: omp_get_wtime
|
||||||
double precision :: time
|
double precision :: time
|
||||||
double precision :: w(N_states)
|
double precision :: w(N_states)
|
||||||
|
integer(ZMQ_PTR), external :: new_zmq_to_qp_run_socket
|
||||||
|
|
||||||
if (N_det < max(10,N_states)) then
|
if (N_det < max(10,N_states)) then
|
||||||
pt2=0.d0
|
pt2=0.d0
|
||||||
@ -55,22 +56,21 @@ subroutine ZMQ_pt2(E, pt2,relative_error, absolute_error, error)
|
|||||||
computed(i) = .true.
|
computed(i) = .true.
|
||||||
end do
|
end do
|
||||||
|
|
||||||
|
Ncomb=size(comb)
|
||||||
|
call get_carlo_workbatch(computed, comb, Ncomb, tbc)
|
||||||
|
|
||||||
pt2_detail = 0d0
|
pt2_detail = 0d0
|
||||||
print *, '========== ================= ================= ================='
|
print *, '========== ================= ================= ================='
|
||||||
print *, ' Samples Energy Stat. Error Seconds '
|
print *, ' Samples Energy Stat. Error Seconds '
|
||||||
print *, '========== ================= ================= ================='
|
print *, '========== ================= ================= ================='
|
||||||
|
|
||||||
call new_parallel_job(zmq_to_qp_run_socket,'pt2')
|
call new_parallel_job(zmq_to_qp_run_socket, zmq_socket_pull, 'pt2')
|
||||||
call zmq_put_psi(zmq_to_qp_run_socket,1)
|
call zmq_put_psi(zmq_to_qp_run_socket,1)
|
||||||
call zmq_put_N_det_generators(zmq_to_qp_run_socket, 1)
|
call zmq_put_N_det_generators(zmq_to_qp_run_socket, 1)
|
||||||
call zmq_put_N_det_selectors(zmq_to_qp_run_socket, 1)
|
call zmq_put_N_det_selectors(zmq_to_qp_run_socket, 1)
|
||||||
call zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',pt2_e0_denominator,size(pt2_e0_denominator))
|
call zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',pt2_e0_denominator,size(pt2_e0_denominator))
|
||||||
call create_selection_buffer(1, 1*2, b)
|
call create_selection_buffer(1, 1*2, b)
|
||||||
|
|
||||||
Ncomb=size(comb)
|
|
||||||
call get_carlo_workbatch(computed, comb, Ncomb, tbc)
|
|
||||||
|
|
||||||
integer(ZMQ_PTR), external :: new_zmq_to_qp_run_socket
|
|
||||||
integer :: ipos
|
integer :: ipos
|
||||||
ipos=1
|
ipos=1
|
||||||
|
|
||||||
@ -103,14 +103,14 @@ subroutine ZMQ_pt2(E, pt2,relative_error, absolute_error, error)
|
|||||||
!$OMP PRIVATE(i)
|
!$OMP PRIVATE(i)
|
||||||
i = omp_get_thread_num()
|
i = omp_get_thread_num()
|
||||||
if (i==0) then
|
if (i==0) then
|
||||||
call pt2_collector(E(pt2_stoch_istate), b, tbc, comb, Ncomb, computed, pt2_detail, sumabove, sum2above, Nabove, relative_error, absolute_error, w, error)
|
call pt2_collector(zmq_socket_pull,E(pt2_stoch_istate), b, tbc, comb, Ncomb, computed, pt2_detail, sumabove, sum2above, Nabove, relative_error, absolute_error, w, error)
|
||||||
pt2(pt2_stoch_istate) = w(pt2_stoch_istate)
|
pt2(pt2_stoch_istate) = w(pt2_stoch_istate)
|
||||||
else
|
else
|
||||||
call pt2_slave_inproc(i)
|
call pt2_slave_inproc(i)
|
||||||
endif
|
endif
|
||||||
!$OMP END PARALLEL
|
!$OMP END PARALLEL
|
||||||
call delete_selection_buffer(b)
|
call delete_selection_buffer(b)
|
||||||
call end_parallel_job(zmq_to_qp_run_socket, 'pt2')
|
call end_parallel_job(zmq_to_qp_run_socket, zmq_socket_pull, 'pt2')
|
||||||
|
|
||||||
print *, '========== ================= ================= ================='
|
print *, '========== ================= ================= ================='
|
||||||
|
|
||||||
@ -163,7 +163,7 @@ subroutine pt2_slave_inproc(i)
|
|||||||
call run_pt2_slave(1,i,pt2_e0_denominator)
|
call run_pt2_slave(1,i,pt2_e0_denominator)
|
||||||
end
|
end
|
||||||
|
|
||||||
subroutine pt2_collector(E, b, tbc, comb, Ncomb, computed, pt2_detail, sumabove, sum2above, Nabove, relative_error, absolute_error, pt2,error)
|
subroutine pt2_collector(zmq_socket_pull, E, b, tbc, comb, Ncomb, computed, pt2_detail, sumabove, sum2above, Nabove, relative_error, absolute_error, pt2,error)
|
||||||
use f77_zmq
|
use f77_zmq
|
||||||
use selection_types
|
use selection_types
|
||||||
use bitmasks
|
use bitmasks
|
||||||
@ -171,6 +171,7 @@ subroutine pt2_collector(E, b, tbc, comb, Ncomb, computed, pt2_detail, sumabove,
|
|||||||
|
|
||||||
|
|
||||||
integer, intent(in) :: Ncomb
|
integer, intent(in) :: Ncomb
|
||||||
|
integer(ZMQ_PTR), intent(in) :: zmq_socket_pull
|
||||||
double precision, intent(inout) :: pt2_detail(N_states, N_det_generators)
|
double precision, intent(inout) :: pt2_detail(N_states, N_det_generators)
|
||||||
double precision, intent(in) :: comb(Ncomb), relative_error, absolute_error, E
|
double precision, intent(in) :: comb(Ncomb), relative_error, absolute_error, E
|
||||||
logical, intent(inout) :: computed(N_det_generators)
|
logical, intent(inout) :: computed(N_det_generators)
|
||||||
@ -184,8 +185,6 @@ subroutine pt2_collector(E, b, tbc, comb, Ncomb, computed, pt2_detail, sumabove,
|
|||||||
integer(ZMQ_PTR),external :: new_zmq_to_qp_run_socket
|
integer(ZMQ_PTR),external :: new_zmq_to_qp_run_socket
|
||||||
integer(ZMQ_PTR) :: zmq_to_qp_run_socket
|
integer(ZMQ_PTR) :: zmq_to_qp_run_socket
|
||||||
|
|
||||||
integer(ZMQ_PTR), external :: new_zmq_pull_socket
|
|
||||||
integer(ZMQ_PTR) :: zmq_socket_pull
|
|
||||||
|
|
||||||
integer :: msg_size, rc, more
|
integer :: msg_size, rc, more
|
||||||
integer :: acc, i, j, robin, N, n_tasks
|
integer :: acc, i, j, robin, N, n_tasks
|
||||||
@ -227,7 +226,6 @@ subroutine pt2_collector(E, b, tbc, comb, Ncomb, computed, pt2_detail, sumabove,
|
|||||||
firstTBDcomb = 1
|
firstTBDcomb = 1
|
||||||
|
|
||||||
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
|
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
|
||||||
zmq_socket_pull = new_zmq_pull_socket()
|
|
||||||
allocate(val(b%N), det(N_int, 2, b%N), task_id(n_tasks_max), index(n_tasks_max))
|
allocate(val(b%N), det(N_int, 2, b%N), task_id(n_tasks_max), index(n_tasks_max))
|
||||||
more = 1
|
more = 1
|
||||||
call wall_time(time0)
|
call wall_time(time0)
|
||||||
@ -253,16 +251,14 @@ subroutine pt2_collector(E, b, tbc, comb, Ncomb, computed, pt2_detail, sumabove,
|
|||||||
if(parts_to_get(index(i)) == 0) actually_computed(index(i)) = .true.
|
if(parts_to_get(index(i)) == 0) actually_computed(index(i)) = .true.
|
||||||
enddo
|
enddo
|
||||||
|
|
||||||
do i=1, n_tasks
|
call zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more)
|
||||||
call zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id(i),more)
|
if (more == 0) then
|
||||||
if (more /= 1) then
|
loop = .False.
|
||||||
loop = .False.
|
endif
|
||||||
endif
|
|
||||||
end do
|
|
||||||
|
|
||||||
time = omp_get_wtime()
|
time = omp_get_wtime()
|
||||||
|
|
||||||
if(time - timeLast > 10d0 .or. more /= 1) then
|
if(time - timeLast > 10d0 .or. (.not.loop)) then
|
||||||
timeLast = time
|
timeLast = time
|
||||||
do i=1, first_det_of_teeth(1)-1
|
do i=1, first_det_of_teeth(1)-1
|
||||||
if(.not.(actually_computed(i))) then
|
if(.not.(actually_computed(i))) then
|
||||||
@ -313,7 +309,6 @@ subroutine pt2_collector(E, b, tbc, comb, Ncomb, computed, pt2_detail, sumabove,
|
|||||||
pt2(pt2_stoch_istate) = E0 + (sumabove(tooth) / Nabove(tooth))
|
pt2(pt2_stoch_istate) = E0 + (sumabove(tooth) / Nabove(tooth))
|
||||||
|
|
||||||
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
|
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
|
||||||
call end_zmq_pull_socket(zmq_socket_pull)
|
|
||||||
call sort_selection_buffer(b)
|
call sort_selection_buffer(b)
|
||||||
end subroutine
|
end subroutine
|
||||||
|
|
||||||
|
@ -37,7 +37,7 @@ subroutine run_pt2_slave(thread,iproc,energy)
|
|||||||
return
|
return
|
||||||
end if
|
end if
|
||||||
buf%N = 0
|
buf%N = 0
|
||||||
n_tasks = 1
|
n_tasks = 0
|
||||||
call create_selection_buffer(1, 2, buf)
|
call create_selection_buffer(1, 2, buf)
|
||||||
|
|
||||||
done = .False.
|
done = .False.
|
||||||
@ -48,6 +48,7 @@ subroutine run_pt2_slave(thread,iproc,energy)
|
|||||||
call get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task, n_tasks)
|
call get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task, n_tasks)
|
||||||
done = task_id(n_tasks) == 0
|
done = task_id(n_tasks) == 0
|
||||||
if (done) n_tasks = n_tasks-1
|
if (done) n_tasks = n_tasks-1
|
||||||
|
if (n_tasks == 0) exit
|
||||||
|
|
||||||
do k=1,n_tasks
|
do k=1,n_tasks
|
||||||
read (task(k),*) subset(k), i_generator(k)
|
read (task(k),*) subset(k), i_generator(k)
|
||||||
@ -58,9 +59,7 @@ subroutine run_pt2_slave(thread,iproc,energy)
|
|||||||
buf%cur = 0
|
buf%cur = 0
|
||||||
call select_connected(i_generator(k),energy,pt2(1,k),buf,subset(k))
|
call select_connected(i_generator(k),energy,pt2(1,k),buf,subset(k))
|
||||||
enddo
|
enddo
|
||||||
do k=1,n_tasks
|
call tasks_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id,n_tasks)
|
||||||
call task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id(k))
|
|
||||||
enddo
|
|
||||||
call push_pt2_results(zmq_socket_push, i_generator, pt2, task_id, n_tasks)
|
call push_pt2_results(zmq_socket_push, i_generator, pt2, task_id, n_tasks)
|
||||||
end do
|
end do
|
||||||
call disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id)
|
call disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id)
|
||||||
@ -81,17 +80,29 @@ subroutine push_pt2_results(zmq_socket_push, index, pt2, task_id, n_tasks)
|
|||||||
integer :: rc
|
integer :: rc
|
||||||
|
|
||||||
rc = f77_zmq_send( zmq_socket_push, n_tasks, 4, ZMQ_SNDMORE)
|
rc = f77_zmq_send( zmq_socket_push, n_tasks, 4, ZMQ_SNDMORE)
|
||||||
|
if (rc == -1) then
|
||||||
|
return
|
||||||
|
endif
|
||||||
if(rc /= 4) stop 'push'
|
if(rc /= 4) stop 'push'
|
||||||
|
|
||||||
|
|
||||||
rc = f77_zmq_send( zmq_socket_push, index, 4*n_tasks, ZMQ_SNDMORE)
|
rc = f77_zmq_send( zmq_socket_push, index, 4*n_tasks, ZMQ_SNDMORE)
|
||||||
|
if (rc == -1) then
|
||||||
|
return
|
||||||
|
endif
|
||||||
if(rc /= 4*n_tasks) stop 'push'
|
if(rc /= 4*n_tasks) stop 'push'
|
||||||
|
|
||||||
|
|
||||||
rc = f77_zmq_send( zmq_socket_push, pt2, 8*N_states*n_tasks, ZMQ_SNDMORE)
|
rc = f77_zmq_send( zmq_socket_push, pt2, 8*N_states*n_tasks, ZMQ_SNDMORE)
|
||||||
|
if (rc == -1) then
|
||||||
|
return
|
||||||
|
endif
|
||||||
if(rc /= 8*N_states*n_tasks) stop 'push'
|
if(rc /= 8*N_states*n_tasks) stop 'push'
|
||||||
|
|
||||||
rc = f77_zmq_send( zmq_socket_push, task_id, n_tasks*4, 0)
|
rc = f77_zmq_send( zmq_socket_push, task_id, n_tasks*4, 0)
|
||||||
|
if (rc == -1) then
|
||||||
|
return
|
||||||
|
endif
|
||||||
if(rc /= 4*n_tasks) stop 'push'
|
if(rc /= 4*n_tasks) stop 'push'
|
||||||
|
|
||||||
! Activate is zmq_socket_push is a REQ
|
! Activate is zmq_socket_push is a REQ
|
||||||
@ -99,6 +110,9 @@ IRP_IF ZMQ_PUSH
|
|||||||
IRP_ELSE
|
IRP_ELSE
|
||||||
character*(2) :: ok
|
character*(2) :: ok
|
||||||
rc = f77_zmq_recv( zmq_socket_push, ok, 2, 0)
|
rc = f77_zmq_recv( zmq_socket_push, ok, 2, 0)
|
||||||
|
if (rc == -1) then
|
||||||
|
return
|
||||||
|
endif
|
||||||
if ((rc /= 2).and.(ok(1:2) /= 'ok')) then
|
if ((rc /= 2).and.(ok(1:2) /= 'ok')) then
|
||||||
print *, irp_here//': error in receiving ok'
|
print *, irp_here//': error in receiving ok'
|
||||||
stop -1
|
stop -1
|
||||||
@ -119,21 +133,41 @@ subroutine pull_pt2_results(zmq_socket_pull, index, pt2, task_id, n_tasks)
|
|||||||
integer :: rc, rn, i
|
integer :: rc, rn, i
|
||||||
|
|
||||||
rc = f77_zmq_recv( zmq_socket_pull, n_tasks, 4, 0)
|
rc = f77_zmq_recv( zmq_socket_pull, n_tasks, 4, 0)
|
||||||
|
if (rc == -1) then
|
||||||
|
n_tasks = 1
|
||||||
|
task_id(1) = 0
|
||||||
|
endif
|
||||||
if(rc /= 4) stop 'pull'
|
if(rc /= 4) stop 'pull'
|
||||||
|
|
||||||
rc = f77_zmq_recv( zmq_socket_pull, index, 4*n_tasks, 0)
|
rc = f77_zmq_recv( zmq_socket_pull, index, 4*n_tasks, 0)
|
||||||
|
if (rc == -1) then
|
||||||
|
n_tasks = 1
|
||||||
|
task_id(1) = 0
|
||||||
|
endif
|
||||||
if(rc /= 4*n_tasks) stop 'pull'
|
if(rc /= 4*n_tasks) stop 'pull'
|
||||||
|
|
||||||
rc = f77_zmq_recv( zmq_socket_pull, pt2, N_states*8*n_tasks, 0)
|
rc = f77_zmq_recv( zmq_socket_pull, pt2, N_states*8*n_tasks, 0)
|
||||||
|
if (rc == -1) then
|
||||||
|
n_tasks = 1
|
||||||
|
task_id(1) = 0
|
||||||
|
endif
|
||||||
if(rc /= 8*N_states*n_tasks) stop 'pull'
|
if(rc /= 8*N_states*n_tasks) stop 'pull'
|
||||||
|
|
||||||
rc = f77_zmq_recv( zmq_socket_pull, task_id, n_tasks*4, 0)
|
rc = f77_zmq_recv( zmq_socket_pull, task_id, n_tasks*4, 0)
|
||||||
|
if (rc == -1) then
|
||||||
|
n_tasks = 1
|
||||||
|
task_id(1) = 0
|
||||||
|
endif
|
||||||
if(rc /= 4*n_tasks) stop 'pull'
|
if(rc /= 4*n_tasks) stop 'pull'
|
||||||
|
|
||||||
! Activate is zmq_socket_pull is a REP
|
! Activate is zmq_socket_pull is a REP
|
||||||
IRP_IF ZMQ_PUSH
|
IRP_IF ZMQ_PUSH
|
||||||
IRP_ELSE
|
IRP_ELSE
|
||||||
rc = f77_zmq_send( zmq_socket_pull, 'ok', 2, 0)
|
rc = f77_zmq_send( zmq_socket_pull, 'ok', 2, 0)
|
||||||
|
if (rc == -1) then
|
||||||
|
n_tasks = 1
|
||||||
|
task_id(1) = 0
|
||||||
|
endif
|
||||||
if (rc /= 2) then
|
if (rc /= 2) then
|
||||||
print *, irp_here//': error in sending ok'
|
print *, irp_here//': error in sending ok'
|
||||||
stop -1
|
stop -1
|
||||||
|
@ -5,8 +5,8 @@ BEGIN_PROVIDER [ integer, fragment_count ]
|
|||||||
BEGIN_DOC
|
BEGIN_DOC
|
||||||
! Number of fragments for the deterministic part
|
! Number of fragments for the deterministic part
|
||||||
END_DOC
|
END_DOC
|
||||||
|
! fragment_count = (elec_alpha_num-n_core_orb)*mo_tot_num
|
||||||
fragment_count = (elec_alpha_num-n_core_orb)**2
|
fragment_count = (elec_alpha_num-n_core_orb)**2
|
||||||
! fragment_count = mo_tot_num*mo_tot_num
|
|
||||||
END_PROVIDER
|
END_PROVIDER
|
||||||
|
|
||||||
|
|
||||||
|
@ -4,7 +4,7 @@ subroutine ZMQ_selection(N_in, pt2)
|
|||||||
|
|
||||||
implicit none
|
implicit none
|
||||||
|
|
||||||
integer(ZMQ_PTR) :: zmq_to_qp_run_socket
|
integer(ZMQ_PTR) :: zmq_to_qp_run_socket , zmq_socket_pull
|
||||||
integer, intent(in) :: N_in
|
integer, intent(in) :: N_in
|
||||||
type(selection_buffer) :: b
|
type(selection_buffer) :: b
|
||||||
integer :: i, N
|
integer :: i, N
|
||||||
@ -23,7 +23,7 @@ subroutine ZMQ_selection(N_in, pt2)
|
|||||||
PROVIDE psi_bilinear_matrix_transp_rows_loc psi_bilinear_matrix_transp_columns
|
PROVIDE psi_bilinear_matrix_transp_rows_loc psi_bilinear_matrix_transp_columns
|
||||||
PROVIDE psi_bilinear_matrix_transp_order
|
PROVIDE psi_bilinear_matrix_transp_order
|
||||||
|
|
||||||
call new_parallel_job(zmq_to_qp_run_socket,"selection")
|
call new_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,"selection")
|
||||||
call zmq_put_psi(zmq_to_qp_run_socket,1)
|
call zmq_put_psi(zmq_to_qp_run_socket,1)
|
||||||
call zmq_put_N_det_generators(zmq_to_qp_run_socket, 1)
|
call zmq_put_N_det_generators(zmq_to_qp_run_socket, 1)
|
||||||
call zmq_put_N_det_selectors(zmq_to_qp_run_socket, 1)
|
call zmq_put_N_det_selectors(zmq_to_qp_run_socket, 1)
|
||||||
@ -55,12 +55,12 @@ subroutine ZMQ_selection(N_in, pt2)
|
|||||||
!$OMP PARALLEL DEFAULT(shared) SHARED(b, pt2) PRIVATE(i) NUM_THREADS(nproc+1)
|
!$OMP PARALLEL DEFAULT(shared) SHARED(b, pt2) PRIVATE(i) NUM_THREADS(nproc+1)
|
||||||
i = omp_get_thread_num()
|
i = omp_get_thread_num()
|
||||||
if (i==0) then
|
if (i==0) then
|
||||||
call selection_collector(b, N, pt2)
|
call selection_collector(zmq_socket_pull, b, N, pt2)
|
||||||
else
|
else
|
||||||
call selection_slave_inproc(i)
|
call selection_slave_inproc(i)
|
||||||
endif
|
endif
|
||||||
!$OMP END PARALLEL
|
!$OMP END PARALLEL
|
||||||
call end_parallel_job(zmq_to_qp_run_socket, 'selection')
|
call end_parallel_job(zmq_to_qp_run_socket, zmq_socket_pull, 'selection')
|
||||||
do i=N_det+1,N_states
|
do i=N_det+1,N_states
|
||||||
pt2(i) = 0.d0
|
pt2(i) = 0.d0
|
||||||
enddo
|
enddo
|
||||||
@ -84,13 +84,14 @@ subroutine selection_slave_inproc(i)
|
|||||||
call run_selection_slave(1,i,pt2_e0_denominator)
|
call run_selection_slave(1,i,pt2_e0_denominator)
|
||||||
end
|
end
|
||||||
|
|
||||||
subroutine selection_collector(b, N, pt2)
|
subroutine selection_collector(zmq_socket_pull, b, N, pt2)
|
||||||
use f77_zmq
|
use f77_zmq
|
||||||
use selection_types
|
use selection_types
|
||||||
use bitmasks
|
use bitmasks
|
||||||
implicit none
|
implicit none
|
||||||
|
|
||||||
|
|
||||||
|
integer(ZMQ_PTR), intent(in) :: zmq_socket_pull
|
||||||
type(selection_buffer), intent(inout) :: b
|
type(selection_buffer), intent(inout) :: b
|
||||||
integer, intent(in) :: N
|
integer, intent(in) :: N
|
||||||
double precision, intent(out) :: pt2(N_states)
|
double precision, intent(out) :: pt2(N_states)
|
||||||
@ -99,7 +100,6 @@ subroutine selection_collector(b, N, pt2)
|
|||||||
integer(ZMQ_PTR) :: zmq_to_qp_run_socket
|
integer(ZMQ_PTR) :: zmq_to_qp_run_socket
|
||||||
|
|
||||||
integer(ZMQ_PTR), external :: new_zmq_pull_socket
|
integer(ZMQ_PTR), external :: new_zmq_pull_socket
|
||||||
integer(ZMQ_PTR) :: zmq_socket_pull
|
|
||||||
|
|
||||||
integer :: msg_size, rc, more
|
integer :: msg_size, rc, more
|
||||||
integer :: acc, i, j, robin, ntask
|
integer :: acc, i, j, robin, ntask
|
||||||
@ -109,7 +109,6 @@ subroutine selection_collector(b, N, pt2)
|
|||||||
type(selection_buffer) :: b2
|
type(selection_buffer) :: b2
|
||||||
|
|
||||||
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
|
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
|
||||||
zmq_socket_pull = new_zmq_pull_socket()
|
|
||||||
call create_selection_buffer(N, N*2, b2)
|
call create_selection_buffer(N, N*2, b2)
|
||||||
allocate(task_id(N_det_generators))
|
allocate(task_id(N_det_generators))
|
||||||
more = 1
|
more = 1
|
||||||
@ -136,6 +135,5 @@ subroutine selection_collector(b, N, pt2)
|
|||||||
call delete_selection_buffer(b2)
|
call delete_selection_buffer(b2)
|
||||||
call sort_selection_buffer(b)
|
call sort_selection_buffer(b)
|
||||||
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
|
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
|
||||||
call end_zmq_pull_socket(zmq_socket_pull)
|
|
||||||
end subroutine
|
end subroutine
|
||||||
|
|
||||||
|
@ -200,10 +200,11 @@ end subroutine
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
subroutine davidson_collector(zmq_to_qp_run_socket, v0, s0, sze, N_st)
|
subroutine davidson_collector(zmq_to_qp_run_socket, zmq_socket_pull, v0, s0, sze, N_st)
|
||||||
use f77_zmq
|
use f77_zmq
|
||||||
implicit none
|
implicit none
|
||||||
|
|
||||||
|
integer(ZMQ_PTR), intent(in) :: zmq_socket_pull
|
||||||
integer, intent(in) :: sze, N_st
|
integer, intent(in) :: sze, N_st
|
||||||
integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket
|
integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket
|
||||||
|
|
||||||
@ -214,14 +215,11 @@ subroutine davidson_collector(zmq_to_qp_run_socket, v0, s0, sze, N_st)
|
|||||||
|
|
||||||
double precision, allocatable :: v_t(:,:), s_t(:,:)
|
double precision, allocatable :: v_t(:,:), s_t(:,:)
|
||||||
integer :: i,j
|
integer :: i,j
|
||||||
integer(ZMQ_PTR), external :: new_zmq_pull_socket
|
|
||||||
integer(ZMQ_PTR) :: zmq_socket_pull
|
|
||||||
|
|
||||||
allocate(v_t(N_st,N_det), s_t(N_st,N_det))
|
allocate(v_t(N_st,N_det), s_t(N_st,N_det))
|
||||||
v0 = 0.d0
|
v0 = 0.d0
|
||||||
s0 = 0.d0
|
s0 = 0.d0
|
||||||
more = 1
|
more = 1
|
||||||
zmq_socket_pull = new_zmq_pull_socket()
|
|
||||||
do while (more == 1)
|
do while (more == 1)
|
||||||
call davidson_pull_results(zmq_socket_pull, v_t, s_t, imin, imax, task_id)
|
call davidson_pull_results(zmq_socket_pull, v_t, s_t, imin, imax, task_id)
|
||||||
do j=1,N_st
|
do j=1,N_st
|
||||||
@ -233,7 +231,6 @@ subroutine davidson_collector(zmq_to_qp_run_socket, v0, s0, sze, N_st)
|
|||||||
call zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more)
|
call zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more)
|
||||||
end do
|
end do
|
||||||
deallocate(v_t,s_t)
|
deallocate(v_t,s_t)
|
||||||
call end_zmq_pull_socket(zmq_socket_pull)
|
|
||||||
|
|
||||||
end subroutine
|
end subroutine
|
||||||
|
|
||||||
@ -280,12 +277,12 @@ subroutine H_S2_u_0_nstates_zmq(v_0,s_0,u_0,N_st,sze)
|
|||||||
N_det, N_st)
|
N_det, N_st)
|
||||||
|
|
||||||
|
|
||||||
integer(ZMQ_PTR) :: zmq_to_qp_run_socket
|
integer(ZMQ_PTR) :: zmq_to_qp_run_socket, zmq_socket_pull
|
||||||
|
|
||||||
ASSERT (N_st == N_states_diag)
|
ASSERT (N_st == N_states_diag)
|
||||||
ASSERT (sze >= N_det)
|
ASSERT (sze >= N_det)
|
||||||
|
|
||||||
call new_parallel_job(zmq_to_qp_run_socket,'davidson')
|
call new_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,'davidson')
|
||||||
|
|
||||||
character*(512) :: task
|
character*(512) :: task
|
||||||
integer :: rc
|
integer :: rc
|
||||||
@ -342,12 +339,12 @@ subroutine H_S2_u_0_nstates_zmq(v_0,s_0,u_0,N_st,sze)
|
|||||||
!$OMP PARALLEL NUM_THREADS(2) PRIVATE(ithread)
|
!$OMP PARALLEL NUM_THREADS(2) PRIVATE(ithread)
|
||||||
ithread = omp_get_thread_num()
|
ithread = omp_get_thread_num()
|
||||||
if (ithread == 0 ) then
|
if (ithread == 0 ) then
|
||||||
call davidson_collector(zmq_to_qp_run_socket, v_0, s_0, N_det, N_st)
|
call davidson_collector(zmq_to_qp_run_socket, zmq_socket_pull, v_0, s_0, N_det, N_st)
|
||||||
else
|
else
|
||||||
call davidson_slave_inproc(1)
|
call davidson_slave_inproc(1)
|
||||||
endif
|
endif
|
||||||
!$OMP END PARALLEL
|
!$OMP END PARALLEL
|
||||||
call end_parallel_job(zmq_to_qp_run_socket, 'davidson')
|
call end_parallel_job(zmq_to_qp_run_socket, zmq_socket_pull, 'davidson')
|
||||||
|
|
||||||
do k=1,N_st
|
do k=1,N_st
|
||||||
call dset_order(v_0(1,k),psi_bilinear_matrix_order_reverse,N_det)
|
call dset_order(v_0(1,k),psi_bilinear_matrix_order_reverse,N_det)
|
||||||
|
@ -22,7 +22,7 @@ subroutine $subroutine($params_main)
|
|||||||
$initialization
|
$initialization
|
||||||
PROVIDE H_apply_buffer_allocated mo_bielec_integrals_in_map psi_det_generators psi_coef_generators
|
PROVIDE H_apply_buffer_allocated mo_bielec_integrals_in_map psi_det_generators psi_coef_generators
|
||||||
|
|
||||||
integer(ZMQ_PTR), external :: new_zmq_pair_socket
|
integer(ZMQ_PTR), external :: new_zmq_pair_socket, zmq_socket_pull
|
||||||
integer(ZMQ_PTR) :: zmq_socket_pair
|
integer(ZMQ_PTR) :: zmq_socket_pair
|
||||||
|
|
||||||
integer(ZMQ_PTR) :: zmq_to_qp_run_socket
|
integer(ZMQ_PTR) :: zmq_to_qp_run_socket
|
||||||
@ -30,7 +30,7 @@ subroutine $subroutine($params_main)
|
|||||||
double precision, allocatable :: H_pert_diag_generators(:,:)
|
double precision, allocatable :: H_pert_diag_generators(:,:)
|
||||||
double precision :: energy(N_st)
|
double precision :: energy(N_st)
|
||||||
|
|
||||||
call new_parallel_job(zmq_to_qp_run_socket,'$subroutine')
|
call new_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,'$subroutine')
|
||||||
zmq_socket_pair = new_zmq_pair_socket(.True.)
|
zmq_socket_pair = new_zmq_pair_socket(.True.)
|
||||||
|
|
||||||
call zmq_put_psi(zmq_to_qp_run_socket,1)
|
call zmq_put_psi(zmq_to_qp_run_socket,1)
|
||||||
@ -55,7 +55,7 @@ subroutine $subroutine($params_main)
|
|||||||
!$OMP num_threads(nproc+1)
|
!$OMP num_threads(nproc+1)
|
||||||
i = omp_get_thread_num()
|
i = omp_get_thread_num()
|
||||||
if (i == 0) then
|
if (i == 0) then
|
||||||
call $subroutine_collector()
|
call $subroutine_collector(zmq_socket_pull)
|
||||||
integer :: n, task_id
|
integer :: n, task_id
|
||||||
call pull_pt2(zmq_socket_pair, pt2_generators, norm_pert_generators, H_pert_diag_generators, i_generator, size(pt2_generators), n, task_id)
|
call pull_pt2(zmq_socket_pair, pt2_generators, norm_pert_generators, H_pert_diag_generators, i_generator, size(pt2_generators), n, task_id)
|
||||||
else
|
else
|
||||||
@ -65,7 +65,7 @@ subroutine $subroutine($params_main)
|
|||||||
|
|
||||||
|
|
||||||
call end_zmq_pair_socket(zmq_socket_pair)
|
call end_zmq_pair_socket(zmq_socket_pair)
|
||||||
call end_parallel_job(zmq_to_qp_run_socket,'$subroutine')
|
call end_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,'$subroutine')
|
||||||
|
|
||||||
|
|
||||||
$copy_buffer
|
$copy_buffer
|
||||||
@ -192,7 +192,7 @@ subroutine $subroutine_slave(thread, iproc)
|
|||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
subroutine $subroutine_collector
|
subroutine $subroutine_collector(zmq_socket_pull)
|
||||||
use f77_zmq
|
use f77_zmq
|
||||||
implicit none
|
implicit none
|
||||||
BEGIN_DOC
|
BEGIN_DOC
|
||||||
@ -202,7 +202,7 @@ subroutine $subroutine_collector
|
|||||||
integer :: k, rc
|
integer :: k, rc
|
||||||
|
|
||||||
integer(ZMQ_PTR), external :: new_zmq_pull_socket
|
integer(ZMQ_PTR), external :: new_zmq_pull_socket
|
||||||
integer(ZMQ_PTR) :: zmq_socket_pull
|
integer(ZMQ_PTR), intent(in) :: zmq_socket_pull
|
||||||
integer*8 :: control, accu
|
integer*8 :: control, accu
|
||||||
integer :: n, more, task_id, i_generator
|
integer :: n, more, task_id, i_generator
|
||||||
|
|
||||||
@ -210,7 +210,6 @@ subroutine $subroutine_collector
|
|||||||
integer(ZMQ_PTR) :: zmq_to_qp_run_socket
|
integer(ZMQ_PTR) :: zmq_to_qp_run_socket
|
||||||
|
|
||||||
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
|
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
|
||||||
zmq_socket_pull = new_zmq_pull_socket()
|
|
||||||
|
|
||||||
double precision, allocatable :: pt2(:), norm_pert(:), H_pert_diag(:)
|
double precision, allocatable :: pt2(:), norm_pert(:), H_pert_diag(:)
|
||||||
double precision, allocatable :: pt2_result(:,:), norm_pert_result(:,:), H_pert_diag_result(:,:)
|
double precision, allocatable :: pt2_result(:,:), norm_pert_result(:,:), H_pert_diag_result(:,:)
|
||||||
@ -238,7 +237,6 @@ subroutine $subroutine_collector
|
|||||||
|
|
||||||
enddo
|
enddo
|
||||||
|
|
||||||
call end_zmq_pull_socket(zmq_socket_pull)
|
|
||||||
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
|
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
|
||||||
|
|
||||||
|
|
||||||
|
@ -365,8 +365,8 @@ BEGIN_PROVIDER [ logical, ao_bielec_integrals_in_map ]
|
|||||||
call wall_time(wall_1)
|
call wall_time(wall_1)
|
||||||
call cpu_time(cpu_1)
|
call cpu_time(cpu_1)
|
||||||
|
|
||||||
integer(ZMQ_PTR) :: zmq_to_qp_run_socket
|
integer(ZMQ_PTR) :: zmq_to_qp_run_socket, zmq_socket_pull
|
||||||
call new_parallel_job(zmq_to_qp_run_socket,'ao_integrals')
|
call new_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,'ao_integrals')
|
||||||
|
|
||||||
character(len=:), allocatable :: task
|
character(len=:), allocatable :: task
|
||||||
allocate(character(len=ao_num*12) :: task)
|
allocate(character(len=ao_num*12) :: task)
|
||||||
@ -380,16 +380,16 @@ BEGIN_PROVIDER [ logical, ao_bielec_integrals_in_map ]
|
|||||||
call zmq_set_running(zmq_to_qp_run_socket)
|
call zmq_set_running(zmq_to_qp_run_socket)
|
||||||
|
|
||||||
PROVIDE nproc
|
PROVIDE nproc
|
||||||
!$OMP PARALLEL DEFAULT(private) num_threads(nproc+1)
|
!$OMP PARALLEL DEFAULT(shared) private(i) num_threads(nproc+1)
|
||||||
i = omp_get_thread_num()
|
i = omp_get_thread_num()
|
||||||
if (i==0) then
|
if (i==0) then
|
||||||
call ao_bielec_integrals_in_map_collector(i)
|
call ao_bielec_integrals_in_map_collector(zmq_socket_pull)
|
||||||
else
|
else
|
||||||
call ao_bielec_integrals_in_map_slave_inproc(i)
|
call ao_bielec_integrals_in_map_slave_inproc(i)
|
||||||
endif
|
endif
|
||||||
!$OMP END PARALLEL
|
!$OMP END PARALLEL
|
||||||
|
|
||||||
call end_parallel_job(zmq_to_qp_run_socket, 'ao_integrals')
|
call end_parallel_job(zmq_to_qp_run_socket, zmq_socket_pull, 'ao_integrals')
|
||||||
|
|
||||||
|
|
||||||
print*, 'Sorting the map'
|
print*, 'Sorting the map'
|
||||||
|
@ -122,7 +122,7 @@ subroutine ao_bielec_integrals_in_map_slave(thread,iproc)
|
|||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
subroutine ao_bielec_integrals_in_map_collector
|
subroutine ao_bielec_integrals_in_map_collector(zmq_socket_pull)
|
||||||
use map_module
|
use map_module
|
||||||
use f77_zmq
|
use f77_zmq
|
||||||
implicit none
|
implicit none
|
||||||
@ -130,6 +130,7 @@ subroutine ao_bielec_integrals_in_map_collector
|
|||||||
! Collects results from the AO integral calculation
|
! Collects results from the AO integral calculation
|
||||||
END_DOC
|
END_DOC
|
||||||
|
|
||||||
|
integer(ZMQ_PTR), intent(in) :: zmq_socket_pull
|
||||||
integer :: j,l,n_integrals
|
integer :: j,l,n_integrals
|
||||||
integer :: rc
|
integer :: rc
|
||||||
|
|
||||||
@ -140,13 +141,11 @@ subroutine ao_bielec_integrals_in_map_collector
|
|||||||
integer(ZMQ_PTR) :: zmq_to_qp_run_socket
|
integer(ZMQ_PTR) :: zmq_to_qp_run_socket
|
||||||
|
|
||||||
integer(ZMQ_PTR), external :: new_zmq_pull_socket
|
integer(ZMQ_PTR), external :: new_zmq_pull_socket
|
||||||
integer(ZMQ_PTR) :: zmq_socket_pull
|
|
||||||
|
|
||||||
integer*8 :: control, accu, sze
|
integer*8 :: control, accu, sze
|
||||||
integer :: task_id, more
|
integer :: task_id, more
|
||||||
|
|
||||||
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
|
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
|
||||||
zmq_socket_pull = new_zmq_pull_socket()
|
|
||||||
|
|
||||||
sze = ao_num*ao_num
|
sze = ao_num*ao_num
|
||||||
allocate ( buffer_i(sze), buffer_value(sze) )
|
allocate ( buffer_i(sze), buffer_value(sze) )
|
||||||
@ -223,7 +222,6 @@ IRP_ENDIF
|
|||||||
endif
|
endif
|
||||||
|
|
||||||
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
|
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
|
||||||
call end_zmq_pull_socket(zmq_socket_pull)
|
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -140,15 +140,15 @@ function new_zmq_to_qp_run_socket()
|
|||||||
stop 'Unable to create zmq req socket'
|
stop 'Unable to create zmq req socket'
|
||||||
endif
|
endif
|
||||||
|
|
||||||
! rc = f77_zmq_setsockopt(new_zmq_to_qp_run_socket, ZMQ_SNDTIMEO, 300000, 4)
|
rc = f77_zmq_setsockopt(new_zmq_to_qp_run_socket, ZMQ_SNDTIMEO, 10000, 4)
|
||||||
! if (rc /= 0) then
|
if (rc /= 0) then
|
||||||
! stop 'Unable to set send timeout in new_zmq_to_qp_run_socket'
|
stop 'Unable to set send timeout in new_zmq_to_qp_run_socket'
|
||||||
! endif
|
endif
|
||||||
!
|
|
||||||
! rc = f77_zmq_setsockopt(new_zmq_to_qp_run_socket, ZMQ_RCVTIMEO, 300000, 4)
|
rc = f77_zmq_setsockopt(new_zmq_to_qp_run_socket, ZMQ_RCVTIMEO, 10000, 4)
|
||||||
! if (rc /= 0) then
|
if (rc /= 0) then
|
||||||
! stop 'Unable to set recv timeout in new_zmq_to_qp_run_socket'
|
stop 'Unable to set recv timeout in new_zmq_to_qp_run_socket'
|
||||||
! endif
|
endif
|
||||||
|
|
||||||
rc = f77_zmq_connect(new_zmq_to_qp_run_socket, trim(qp_run_address)//':'//trim(zmq_port(0)))
|
rc = f77_zmq_connect(new_zmq_to_qp_run_socket, trim(qp_run_address)//':'//trim(zmq_port(0)))
|
||||||
if (rc /= 0) then
|
if (rc /= 0) then
|
||||||
@ -242,17 +242,17 @@ IRP_ENDIF
|
|||||||
stop 'Unable to create zmq pull socket'
|
stop 'Unable to create zmq pull socket'
|
||||||
endif
|
endif
|
||||||
|
|
||||||
! rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_LINGER,300000,4)
|
rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_LINGER,30000,4)
|
||||||
! if (rc /= 0) then
|
if (rc /= 0) then
|
||||||
! stop 'Unable to set ZMQ_LINGER on pull socket'
|
stop 'Unable to set ZMQ_LINGER on pull socket'
|
||||||
! endif
|
endif
|
||||||
!
|
|
||||||
! rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_RCVBUF,100000000,4)
|
! rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_RCVBUF,100000000,4)
|
||||||
! if (rc /= 0) then
|
! if (rc /= 0) then
|
||||||
! stop 'Unable to set ZMQ_RCVBUF on pull socket'
|
! stop 'Unable to set ZMQ_RCVBUF on pull socket'
|
||||||
! endif
|
! endif
|
||||||
|
|
||||||
rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_RCVHWM,3,4)
|
rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_RCVHWM,5,4)
|
||||||
if (rc /= 0) then
|
if (rc /= 0) then
|
||||||
stop 'Unable to set ZMQ_RCVHWM on pull socket'
|
stop 'Unable to set ZMQ_RCVHWM on pull socket'
|
||||||
endif
|
endif
|
||||||
@ -281,7 +281,9 @@ IRP_ENDIF
|
|||||||
rc = f77_zmq_bind(new_zmq_pull_socket, zmq_socket_pull_tcp_address)
|
rc = f77_zmq_bind(new_zmq_pull_socket, zmq_socket_pull_tcp_address)
|
||||||
if (rc /= 0) then
|
if (rc /= 0) then
|
||||||
icount = icount-1
|
icount = icount-1
|
||||||
call sleep(3)
|
! call sleep(3)
|
||||||
|
zmq_socket_pull_tcp_address = 'tcp://*:'//zmq_port(2+icount*100)//' '
|
||||||
|
zmq_socket_push_tcp_address = trim(qp_run_address)//':'//zmq_port(2+icount*100)//' '
|
||||||
else
|
else
|
||||||
exit
|
exit
|
||||||
endif
|
endif
|
||||||
@ -322,12 +324,12 @@ IRP_ENDIF
|
|||||||
stop 'Unable to create zmq push socket'
|
stop 'Unable to create zmq push socket'
|
||||||
endif
|
endif
|
||||||
|
|
||||||
! rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_LINGER,300000,4)
|
rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_LINGER,30000,4)
|
||||||
! if (rc /= 0) then
|
if (rc /= 0) then
|
||||||
! stop 'Unable to set ZMQ_LINGER on push socket'
|
stop 'Unable to set ZMQ_LINGER on push socket'
|
||||||
! endif
|
endif
|
||||||
|
|
||||||
rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_SNDHWM,2,4)
|
rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_SNDHWM,5,4)
|
||||||
if (rc /= 0) then
|
if (rc /= 0) then
|
||||||
stop 'Unable to set ZMQ_SNDHWM on push socket'
|
stop 'Unable to set ZMQ_SNDHWM on push socket'
|
||||||
endif
|
endif
|
||||||
@ -336,16 +338,16 @@ IRP_ENDIF
|
|||||||
! if (rc /= 0) then
|
! if (rc /= 0) then
|
||||||
! stop 'Unable to set ZMQ_SNDBUF on push socket'
|
! stop 'Unable to set ZMQ_SNDBUF on push socket'
|
||||||
! endif
|
! endif
|
||||||
!
|
|
||||||
rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_IMMEDIATE,1,4)
|
rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_IMMEDIATE,0,4)
|
||||||
if (rc /= 0) then
|
if (rc /= 0) then
|
||||||
stop 'Unable to set ZMQ_IMMEDIATE on push socket'
|
stop 'Unable to set ZMQ_IMMEDIATE on push socket'
|
||||||
endif
|
endif
|
||||||
|
|
||||||
! rc = f77_zmq_setsockopt(new_zmq_push_socket, ZMQ_SNDTIMEO, 100000, 4)
|
rc = f77_zmq_setsockopt(new_zmq_push_socket, ZMQ_SNDTIMEO, 10000, 4)
|
||||||
! if (rc /= 0) then
|
if (rc /= 0) then
|
||||||
! stop 'Unable to set send timout in new_zmq_push_socket'
|
stop 'Unable to set send timout in new_zmq_push_socket'
|
||||||
! endif
|
endif
|
||||||
|
|
||||||
if (thread == 1) then
|
if (thread == 1) then
|
||||||
rc = f77_zmq_connect(new_zmq_push_socket, zmq_socket_push_inproc_address)
|
rc = f77_zmq_connect(new_zmq_push_socket, zmq_socket_push_inproc_address)
|
||||||
@ -478,10 +480,10 @@ subroutine end_zmq_push_socket(zmq_socket_push,thread)
|
|||||||
integer :: rc
|
integer :: rc
|
||||||
character*(8), external :: zmq_port
|
character*(8), external :: zmq_port
|
||||||
|
|
||||||
! rc = f77_zmq_setsockopt(zmq_socket_push,ZMQ_LINGER,300000,4)
|
rc = f77_zmq_setsockopt(zmq_socket_push,ZMQ_LINGER,30000,4)
|
||||||
! if (rc /= 0) then
|
if (rc /= 0) then
|
||||||
! stop 'Unable to set ZMQ_LINGER on push socket'
|
stop 'Unable to set ZMQ_LINGER on push socket'
|
||||||
! endif
|
endif
|
||||||
|
|
||||||
call omp_set_lock(zmq_lock)
|
call omp_set_lock(zmq_lock)
|
||||||
rc = f77_zmq_close(zmq_socket_push)
|
rc = f77_zmq_close(zmq_socket_push)
|
||||||
@ -503,7 +505,7 @@ BEGIN_PROVIDER [ character*(128), zmq_state ]
|
|||||||
zmq_state = 'No_state'
|
zmq_state = 'No_state'
|
||||||
END_PROVIDER
|
END_PROVIDER
|
||||||
|
|
||||||
subroutine new_parallel_job(zmq_to_qp_run_socket,name_in)
|
subroutine new_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,name_in)
|
||||||
use f77_zmq
|
use f77_zmq
|
||||||
implicit none
|
implicit none
|
||||||
BEGIN_DOC
|
BEGIN_DOC
|
||||||
@ -514,7 +516,8 @@ subroutine new_parallel_job(zmq_to_qp_run_socket,name_in)
|
|||||||
character*(512) :: message, name
|
character*(512) :: message, name
|
||||||
integer :: rc, sze
|
integer :: rc, sze
|
||||||
integer(ZMQ_PTR),external :: new_zmq_to_qp_run_socket
|
integer(ZMQ_PTR),external :: new_zmq_to_qp_run_socket
|
||||||
integer(ZMQ_PTR), intent(out) :: zmq_to_qp_run_socket
|
integer(ZMQ_PTR),external :: new_zmq_pull_socket
|
||||||
|
integer(ZMQ_PTR), intent(out) :: zmq_to_qp_run_socket, zmq_socket_pull
|
||||||
|
|
||||||
call omp_set_lock(zmq_lock)
|
call omp_set_lock(zmq_lock)
|
||||||
zmq_context = f77_zmq_ctx_new ()
|
zmq_context = f77_zmq_ctx_new ()
|
||||||
@ -527,7 +530,9 @@ subroutine new_parallel_job(zmq_to_qp_run_socket,name_in)
|
|||||||
! print *, 'Unable to set the number of ZMQ IO threads to', nproc
|
! print *, 'Unable to set the number of ZMQ IO threads to', nproc
|
||||||
! endif
|
! endif
|
||||||
|
|
||||||
|
|
||||||
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
|
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
|
||||||
|
zmq_socket_pull = new_zmq_pull_socket ()
|
||||||
name = name_in
|
name = name_in
|
||||||
sze = len(trim(name))
|
sze = len(trim(name))
|
||||||
call lowercase(name,sze)
|
call lowercase(name,sze)
|
||||||
@ -580,13 +585,13 @@ subroutine zmq_set_running(zmq_to_qp_run_socket)
|
|||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
subroutine end_parallel_job(zmq_to_qp_run_socket,name_in)
|
subroutine end_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,name_in)
|
||||||
use f77_zmq
|
use f77_zmq
|
||||||
implicit none
|
implicit none
|
||||||
BEGIN_DOC
|
BEGIN_DOC
|
||||||
! End a new parallel job with name 'name'. The slave tasks execute subroutine 'slave'
|
! End a new parallel job with name 'name'. The slave tasks execute subroutine 'slave'
|
||||||
END_DOC
|
END_DOC
|
||||||
integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket
|
integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket, zmq_socket_pull
|
||||||
character*(*), intent(in) :: name_in
|
character*(*), intent(in) :: name_in
|
||||||
|
|
||||||
character*(512) :: message, name
|
character*(512) :: message, name
|
||||||
@ -599,15 +604,20 @@ subroutine end_parallel_job(zmq_to_qp_run_socket,name_in)
|
|||||||
stop 'Wrong end of job'
|
stop 'Wrong end of job'
|
||||||
endif
|
endif
|
||||||
|
|
||||||
call sleep(1)
|
do i=1,30
|
||||||
rc = f77_zmq_send(zmq_to_qp_run_socket, 'end_job '//trim(zmq_state),8+len(trim(zmq_state)),0)
|
rc = f77_zmq_send(zmq_to_qp_run_socket, 'end_job '//trim(zmq_state),8+len(trim(zmq_state)),0)
|
||||||
rc = f77_zmq_recv(zmq_to_qp_run_socket, zmq_state, 2, 0)
|
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 512, 0)
|
||||||
if (rc /= 2) then
|
if (trim(message(1:13)) == 'error waiting') then
|
||||||
print *, 'f77_zmq_recv(zmq_to_qp_run_socket, zmq_state, 2, 0)'
|
print *, trim(message(6:rc))
|
||||||
stop 'error'
|
call sleep(1)
|
||||||
endif
|
cycle
|
||||||
|
else if (message(1:2) == 'ok') then
|
||||||
|
exit
|
||||||
|
endif
|
||||||
|
end do
|
||||||
zmq_state = 'No_state'
|
zmq_state = 'No_state'
|
||||||
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
|
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
|
||||||
|
call end_zmq_pull_socket(zmq_socket_pull)
|
||||||
|
|
||||||
call omp_set_lock(zmq_lock)
|
call omp_set_lock(zmq_lock)
|
||||||
rc = f77_zmq_ctx_term(zmq_context)
|
rc = f77_zmq_ctx_term(zmq_context)
|
||||||
@ -649,6 +659,7 @@ subroutine connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
|
|||||||
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0)
|
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0)
|
||||||
message = trim(message(1:rc))
|
message = trim(message(1:rc))
|
||||||
if(message(1:5) == "error") then
|
if(message(1:5) == "error") then
|
||||||
|
print *, trim(message(1:rc))
|
||||||
worker_id = -1
|
worker_id = -1
|
||||||
return
|
return
|
||||||
end if
|
end if
|
||||||
@ -679,9 +690,9 @@ subroutine disconnect_from_taskserver(zmq_to_qp_run_socket, &
|
|||||||
|
|
||||||
sze = len(trim(message))
|
sze = len(trim(message))
|
||||||
rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)
|
rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)
|
||||||
if (rc == -1) then
|
! if (rc == -1) then
|
||||||
return
|
! return
|
||||||
endif
|
! endif
|
||||||
|
|
||||||
if (rc /= sze) then
|
if (rc /= sze) then
|
||||||
print *, rc, sze
|
print *, rc, sze
|
||||||
@ -838,6 +849,46 @@ subroutine task_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id)
|
|||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
subroutine tasks_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id, n_tasks)
|
||||||
|
use f77_zmq
|
||||||
|
implicit none
|
||||||
|
BEGIN_DOC
|
||||||
|
! Get a task from the task server
|
||||||
|
END_DOC
|
||||||
|
integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket
|
||||||
|
integer, intent(in) :: n_tasks, worker_id, task_id(n_tasks)
|
||||||
|
|
||||||
|
integer :: rc, sze, k
|
||||||
|
character(LEN=:), allocatable :: message
|
||||||
|
character*(64) :: fmt
|
||||||
|
|
||||||
|
allocate(character(LEN=64+n_tasks*12) :: message)
|
||||||
|
write(fmt,*) '(A,X,A,I10,X,', n_tasks, '(I11,1X))'
|
||||||
|
write(message,*) 'task_done '//trim(zmq_state), worker_id, (task_id(k), k=1,n_tasks)
|
||||||
|
|
||||||
|
sze = len(trim(message))
|
||||||
|
rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)
|
||||||
|
if (rc == -1) then
|
||||||
|
! Server is shut down
|
||||||
|
deallocate(message)
|
||||||
|
return
|
||||||
|
endif
|
||||||
|
|
||||||
|
if (rc /= sze) then
|
||||||
|
print *, irp_here, 'f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)'
|
||||||
|
stop 'error'
|
||||||
|
endif
|
||||||
|
|
||||||
|
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 64, 0)
|
||||||
|
if (trim(message(1:rc)) /= 'ok') then
|
||||||
|
print *, trim(message(1:rc))
|
||||||
|
print *, 'Unable to send task_done message'
|
||||||
|
stop -1
|
||||||
|
endif
|
||||||
|
deallocate(message)
|
||||||
|
|
||||||
|
end
|
||||||
|
|
||||||
subroutine get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task)
|
subroutine get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task)
|
||||||
use f77_zmq
|
use f77_zmq
|
||||||
implicit none
|
implicit none
|
||||||
@ -913,11 +964,13 @@ subroutine get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task
|
|||||||
sze = len(trim(message))
|
sze = len(trim(message))
|
||||||
rc = f77_zmq_send(zmq_to_qp_run_socket, message, sze, 0)
|
rc = f77_zmq_send(zmq_to_qp_run_socket, message, sze, 0)
|
||||||
if (rc /= sze) then
|
if (rc /= sze) then
|
||||||
|
print *, trim(message)
|
||||||
|
print *, rc, sze
|
||||||
print *, irp_here, ':f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)'
|
print *, irp_here, ':f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)'
|
||||||
stop 'error'
|
stop 'error'
|
||||||
endif
|
endif
|
||||||
|
|
||||||
message = repeat(' ',512)
|
message = repeat(' ',1024)
|
||||||
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 1024, 0)
|
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 1024, 0)
|
||||||
rc = min(1024,rc)
|
rc = min(1024,rc)
|
||||||
read(message(1:rc),*) reply
|
read(message(1:rc),*) reply
|
||||||
@ -970,10 +1023,10 @@ subroutine end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
|
|||||||
character*(8), external :: zmq_port
|
character*(8), external :: zmq_port
|
||||||
integer :: rc
|
integer :: rc
|
||||||
|
|
||||||
! rc = f77_zmq_setsockopt(zmq_to_qp_run_socket,ZMQ_LINGER,300000,4)
|
rc = f77_zmq_setsockopt(zmq_to_qp_run_socket,ZMQ_LINGER,30000,4)
|
||||||
! if (rc /= 0) then
|
if (rc /= 0) then
|
||||||
! stop 'Unable to set ZMQ_LINGER on zmq_to_qp_run_socket'
|
stop 'Unable to set ZMQ_LINGER on zmq_to_qp_run_socket'
|
||||||
! endif
|
endif
|
||||||
|
|
||||||
rc = f77_zmq_close(zmq_to_qp_run_socket)
|
rc = f77_zmq_close(zmq_to_qp_run_socket)
|
||||||
if (rc /= 0) then
|
if (rc /= 0) then
|
||||||
@ -995,11 +1048,11 @@ subroutine zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more)
|
|||||||
integer, intent(in) :: task_id
|
integer, intent(in) :: task_id
|
||||||
integer, intent(out) :: more
|
integer, intent(out) :: more
|
||||||
integer :: rc
|
integer :: rc
|
||||||
character*(512) :: msg
|
character*(512) :: message
|
||||||
|
|
||||||
write(msg,*) 'del_task ', zmq_state, task_id
|
write(message,*) 'del_task ', zmq_state, task_id
|
||||||
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0)
|
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(message),len(trim(message)),0)
|
||||||
if (rc /= len(trim(msg))) then
|
if (rc /= len(trim(message))) then
|
||||||
print *, irp_here
|
print *, irp_here
|
||||||
stop 'error'
|
stop 'error'
|
||||||
endif
|
endif
|
||||||
@ -1019,6 +1072,48 @@ subroutine zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more)
|
|||||||
endif
|
endif
|
||||||
end
|
end
|
||||||
|
|
||||||
|
subroutine zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more)
|
||||||
|
use f77_zmq
|
||||||
|
implicit none
|
||||||
|
BEGIN_DOC
|
||||||
|
! When a task is done, it has to be removed from the list of tasks on the qp_run
|
||||||
|
! queue. This guarantees that the results have been received in the pull.
|
||||||
|
END_DOC
|
||||||
|
integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket
|
||||||
|
integer(ZMQ_PTR) :: zmq_socket_pull
|
||||||
|
integer, intent(in) :: n_tasks, task_id(n_tasks)
|
||||||
|
integer, intent(out) :: more
|
||||||
|
integer :: rc, k
|
||||||
|
character*(64) :: fmt, reply
|
||||||
|
character(LEN=:), allocatable :: message
|
||||||
|
|
||||||
|
allocate(character(LEN=64+n_tasks*12) :: message)
|
||||||
|
|
||||||
|
write(fmt,*) '(A,1X,A,1X,', n_tasks, '(I11,1X))'
|
||||||
|
write(message,*) 'del_task '//trim(zmq_state), (task_id(k), k=1,n_tasks)
|
||||||
|
|
||||||
|
|
||||||
|
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(message),len(trim(message)),0)
|
||||||
|
if (rc /= len(trim(message))) then
|
||||||
|
print *, irp_here
|
||||||
|
stop 'error'
|
||||||
|
endif
|
||||||
|
deallocate(message)
|
||||||
|
|
||||||
|
reply = ''
|
||||||
|
rc = f77_zmq_recv(zmq_to_qp_run_socket,reply,64,0)
|
||||||
|
|
||||||
|
if (reply(16:19) == 'more') then
|
||||||
|
more = 1
|
||||||
|
else if (reply(16:19) == 'done') then
|
||||||
|
more = 0
|
||||||
|
else
|
||||||
|
print *, reply
|
||||||
|
print *, irp_here
|
||||||
|
stop 'error'
|
||||||
|
endif
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
subroutine wait_for_next_state(state)
|
subroutine wait_for_next_state(state)
|
||||||
use f77_zmq
|
use f77_zmq
|
||||||
|
Loading…
Reference in New Issue
Block a user