From 68967d21017a7a68ddbab9b8985dc9fd2724a193 Mon Sep 17 00:00:00 2001 From: Anthony Scemama Date: Mon, 27 Nov 2017 23:18:18 +0100 Subject: [PATCH] Improved Parallelism --- config/ifort.cfg | 2 +- ocaml/TaskServer.ml | 17 +- plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f | 35 ++- plugins/Full_CI_ZMQ/run_pt2_slave.irp.f | 42 +++- plugins/Full_CI_ZMQ/selection.irp.f | 2 +- plugins/Full_CI_ZMQ/zmq_selection.irp.f | 14 +- src/Davidson/davidson_parallel.irp.f | 15 +- src/Determinants/H_apply_zmq.template.f | 14 +- src/Integrals_Bielec/ao_bi_integrals.irp.f | 10 +- .../ao_bielec_integrals_in_map_slave.irp.f | 6 +- src/ZMQ/utils.irp.f | 203 +++++++++++++----- 11 files changed, 244 insertions(+), 116 deletions(-) diff --git a/config/ifort.cfg b/config/ifort.cfg index 843e887b..77761c85 100644 --- a/config/ifort.cfg +++ b/config/ifort.cfg @@ -9,7 +9,7 @@ FC : ifort LAPACK_LIB : -mkl=parallel IRPF90 : irpf90 -IRPF90_FLAGS : --ninja --align=32 +IRPF90_FLAGS : --ninja --align=32 -DZMQ_PUSH # Global options ################ diff --git a/ocaml/TaskServer.ml b/ocaml/TaskServer.ml index a98efd66..773a3da4 100644 --- a/ocaml/TaskServer.ml +++ b/ocaml/TaskServer.ml @@ -194,14 +194,21 @@ let end_job msg program_state rep_socket pair_socket = reply_wrong_state rep_socket; program_state - and success state = + and success () = reply_ok rep_socket; { program_state with state = None ; progress_bar = Progress_bar.clear (); } + and wait n = + Printf.sprintf "waiting %d" n + |> Message.Error_msg.create + |> Message.Error_msg.to_string + |> ZMQ.Socket.send rep_socket ; + program_state in + match program_state.state with | None -> failure () | Some state -> @@ -210,7 +217,10 @@ let end_job msg program_state rep_socket pair_socket = begin string_of_pub_state Waiting |> ZMQ.Socket.send pair_socket ; - success state + if (Queuing_system.number_of_clients program_state.queue = 0) then + success () + else + wait (Queuing_system.number_of_clients program_state.queue) end else failure () @@ -600,6 +610,9 @@ let abort program_state rep_socket = List.fold ~f:(fun queue task_id -> Queuing_system.del_task ~task_id queue) ~init:queue tasks in + let queue = + Queuing_system.del_client ~client_id queue + in reply_ok rep_socket; { program_state with diff --git a/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f b/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f index d08fb3f0..1da4921f 100644 --- a/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f +++ b/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f @@ -10,7 +10,7 @@ subroutine ZMQ_pt2(E, pt2,relative_error, absolute_error, error) implicit none character(len=64000) :: task - integer(ZMQ_PTR) :: zmq_to_qp_run_socket, zmq_to_qp_run_socket2 + integer(ZMQ_PTR) :: zmq_to_qp_run_socket, zmq_socket_pull type(selection_buffer) :: b integer, external :: omp_get_thread_num double precision, intent(in) :: relative_error, absolute_error, E(N_states) @@ -27,6 +27,7 @@ subroutine ZMQ_pt2(E, pt2,relative_error, absolute_error, error) double precision, external :: omp_get_wtime double precision :: time double precision :: w(N_states) + integer(ZMQ_PTR), external :: new_zmq_to_qp_run_socket if (N_det < max(10,N_states)) then pt2=0.d0 @@ -55,22 +56,21 @@ subroutine ZMQ_pt2(E, pt2,relative_error, absolute_error, error) computed(i) = .true. end do + Ncomb=size(comb) + call get_carlo_workbatch(computed, comb, Ncomb, tbc) + pt2_detail = 0d0 print *, '========== ================= ================= =================' print *, ' Samples Energy Stat. Error Seconds ' print *, '========== ================= ================= =================' - call new_parallel_job(zmq_to_qp_run_socket,'pt2') + call new_parallel_job(zmq_to_qp_run_socket, zmq_socket_pull, 'pt2') call zmq_put_psi(zmq_to_qp_run_socket,1) call zmq_put_N_det_generators(zmq_to_qp_run_socket, 1) call zmq_put_N_det_selectors(zmq_to_qp_run_socket, 1) call zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',pt2_e0_denominator,size(pt2_e0_denominator)) call create_selection_buffer(1, 1*2, b) - Ncomb=size(comb) - call get_carlo_workbatch(computed, comb, Ncomb, tbc) - - integer(ZMQ_PTR), external :: new_zmq_to_qp_run_socket integer :: ipos ipos=1 @@ -103,14 +103,14 @@ subroutine ZMQ_pt2(E, pt2,relative_error, absolute_error, error) !$OMP PRIVATE(i) i = omp_get_thread_num() if (i==0) then - call pt2_collector(E(pt2_stoch_istate), b, tbc, comb, Ncomb, computed, pt2_detail, sumabove, sum2above, Nabove, relative_error, absolute_error, w, error) + call pt2_collector(zmq_socket_pull,E(pt2_stoch_istate), b, tbc, comb, Ncomb, computed, pt2_detail, sumabove, sum2above, Nabove, relative_error, absolute_error, w, error) pt2(pt2_stoch_istate) = w(pt2_stoch_istate) else call pt2_slave_inproc(i) endif !$OMP END PARALLEL call delete_selection_buffer(b) - call end_parallel_job(zmq_to_qp_run_socket, 'pt2') + call end_parallel_job(zmq_to_qp_run_socket, zmq_socket_pull, 'pt2') print *, '========== ================= ================= =================' @@ -163,7 +163,7 @@ subroutine pt2_slave_inproc(i) call run_pt2_slave(1,i,pt2_e0_denominator) end -subroutine pt2_collector(E, b, tbc, comb, Ncomb, computed, pt2_detail, sumabove, sum2above, Nabove, relative_error, absolute_error, pt2,error) +subroutine pt2_collector(zmq_socket_pull, E, b, tbc, comb, Ncomb, computed, pt2_detail, sumabove, sum2above, Nabove, relative_error, absolute_error, pt2,error) use f77_zmq use selection_types use bitmasks @@ -171,6 +171,7 @@ subroutine pt2_collector(E, b, tbc, comb, Ncomb, computed, pt2_detail, sumabove, integer, intent(in) :: Ncomb + integer(ZMQ_PTR), intent(in) :: zmq_socket_pull double precision, intent(inout) :: pt2_detail(N_states, N_det_generators) double precision, intent(in) :: comb(Ncomb), relative_error, absolute_error, E logical, intent(inout) :: computed(N_det_generators) @@ -184,8 +185,6 @@ subroutine pt2_collector(E, b, tbc, comb, Ncomb, computed, pt2_detail, sumabove, integer(ZMQ_PTR),external :: new_zmq_to_qp_run_socket integer(ZMQ_PTR) :: zmq_to_qp_run_socket - integer(ZMQ_PTR), external :: new_zmq_pull_socket - integer(ZMQ_PTR) :: zmq_socket_pull integer :: msg_size, rc, more integer :: acc, i, j, robin, N, n_tasks @@ -227,7 +226,6 @@ subroutine pt2_collector(E, b, tbc, comb, Ncomb, computed, pt2_detail, sumabove, firstTBDcomb = 1 zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() - zmq_socket_pull = new_zmq_pull_socket() allocate(val(b%N), det(N_int, 2, b%N), task_id(n_tasks_max), index(n_tasks_max)) more = 1 call wall_time(time0) @@ -253,16 +251,14 @@ subroutine pt2_collector(E, b, tbc, comb, Ncomb, computed, pt2_detail, sumabove, if(parts_to_get(index(i)) == 0) actually_computed(index(i)) = .true. enddo - do i=1, n_tasks - call zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id(i),more) - if (more /= 1) then - loop = .False. - endif - end do + call zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more) + if (more == 0) then + loop = .False. + endif time = omp_get_wtime() - if(time - timeLast > 10d0 .or. more /= 1) then + if(time - timeLast > 10d0 .or. (.not.loop)) then timeLast = time do i=1, first_det_of_teeth(1)-1 if(.not.(actually_computed(i))) then @@ -313,7 +309,6 @@ subroutine pt2_collector(E, b, tbc, comb, Ncomb, computed, pt2_detail, sumabove, pt2(pt2_stoch_istate) = E0 + (sumabove(tooth) / Nabove(tooth)) call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) - call end_zmq_pull_socket(zmq_socket_pull) call sort_selection_buffer(b) end subroutine diff --git a/plugins/Full_CI_ZMQ/run_pt2_slave.irp.f b/plugins/Full_CI_ZMQ/run_pt2_slave.irp.f index 2a743dad..5b6dc7c8 100644 --- a/plugins/Full_CI_ZMQ/run_pt2_slave.irp.f +++ b/plugins/Full_CI_ZMQ/run_pt2_slave.irp.f @@ -37,7 +37,7 @@ subroutine run_pt2_slave(thread,iproc,energy) return end if buf%N = 0 - n_tasks = 1 + n_tasks = 0 call create_selection_buffer(1, 2, buf) done = .False. @@ -48,6 +48,7 @@ subroutine run_pt2_slave(thread,iproc,energy) call get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task, n_tasks) done = task_id(n_tasks) == 0 if (done) n_tasks = n_tasks-1 + if (n_tasks == 0) exit do k=1,n_tasks read (task(k),*) subset(k), i_generator(k) @@ -58,9 +59,7 @@ subroutine run_pt2_slave(thread,iproc,energy) buf%cur = 0 call select_connected(i_generator(k),energy,pt2(1,k),buf,subset(k)) enddo - do k=1,n_tasks - call task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id(k)) - enddo + call tasks_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id,n_tasks) call push_pt2_results(zmq_socket_push, i_generator, pt2, task_id, n_tasks) end do call disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id) @@ -81,17 +80,29 @@ subroutine push_pt2_results(zmq_socket_push, index, pt2, task_id, n_tasks) integer :: rc rc = f77_zmq_send( zmq_socket_push, n_tasks, 4, ZMQ_SNDMORE) + if (rc == -1) then + return + endif if(rc /= 4) stop 'push' rc = f77_zmq_send( zmq_socket_push, index, 4*n_tasks, ZMQ_SNDMORE) + if (rc == -1) then + return + endif if(rc /= 4*n_tasks) stop 'push' rc = f77_zmq_send( zmq_socket_push, pt2, 8*N_states*n_tasks, ZMQ_SNDMORE) + if (rc == -1) then + return + endif if(rc /= 8*N_states*n_tasks) stop 'push' rc = f77_zmq_send( zmq_socket_push, task_id, n_tasks*4, 0) + if (rc == -1) then + return + endif if(rc /= 4*n_tasks) stop 'push' ! Activate is zmq_socket_push is a REQ @@ -99,6 +110,9 @@ IRP_IF ZMQ_PUSH IRP_ELSE character*(2) :: ok rc = f77_zmq_recv( zmq_socket_push, ok, 2, 0) + if (rc == -1) then + return + endif if ((rc /= 2).and.(ok(1:2) /= 'ok')) then print *, irp_here//': error in receiving ok' stop -1 @@ -119,21 +133,41 @@ subroutine pull_pt2_results(zmq_socket_pull, index, pt2, task_id, n_tasks) integer :: rc, rn, i rc = f77_zmq_recv( zmq_socket_pull, n_tasks, 4, 0) + if (rc == -1) then + n_tasks = 1 + task_id(1) = 0 + endif if(rc /= 4) stop 'pull' rc = f77_zmq_recv( zmq_socket_pull, index, 4*n_tasks, 0) + if (rc == -1) then + n_tasks = 1 + task_id(1) = 0 + endif if(rc /= 4*n_tasks) stop 'pull' rc = f77_zmq_recv( zmq_socket_pull, pt2, N_states*8*n_tasks, 0) + if (rc == -1) then + n_tasks = 1 + task_id(1) = 0 + endif if(rc /= 8*N_states*n_tasks) stop 'pull' rc = f77_zmq_recv( zmq_socket_pull, task_id, n_tasks*4, 0) + if (rc == -1) then + n_tasks = 1 + task_id(1) = 0 + endif if(rc /= 4*n_tasks) stop 'pull' ! Activate is zmq_socket_pull is a REP IRP_IF ZMQ_PUSH IRP_ELSE rc = f77_zmq_send( zmq_socket_pull, 'ok', 2, 0) + if (rc == -1) then + n_tasks = 1 + task_id(1) = 0 + endif if (rc /= 2) then print *, irp_here//': error in sending ok' stop -1 diff --git a/plugins/Full_CI_ZMQ/selection.irp.f b/plugins/Full_CI_ZMQ/selection.irp.f index cf0cfe18..81ff5795 100644 --- a/plugins/Full_CI_ZMQ/selection.irp.f +++ b/plugins/Full_CI_ZMQ/selection.irp.f @@ -5,8 +5,8 @@ BEGIN_PROVIDER [ integer, fragment_count ] BEGIN_DOC ! Number of fragments for the deterministic part END_DOC +! fragment_count = (elec_alpha_num-n_core_orb)*mo_tot_num fragment_count = (elec_alpha_num-n_core_orb)**2 -! fragment_count = mo_tot_num*mo_tot_num END_PROVIDER diff --git a/plugins/Full_CI_ZMQ/zmq_selection.irp.f b/plugins/Full_CI_ZMQ/zmq_selection.irp.f index 78d0939c..5ef0f407 100644 --- a/plugins/Full_CI_ZMQ/zmq_selection.irp.f +++ b/plugins/Full_CI_ZMQ/zmq_selection.irp.f @@ -4,7 +4,7 @@ subroutine ZMQ_selection(N_in, pt2) implicit none - integer(ZMQ_PTR) :: zmq_to_qp_run_socket + integer(ZMQ_PTR) :: zmq_to_qp_run_socket , zmq_socket_pull integer, intent(in) :: N_in type(selection_buffer) :: b integer :: i, N @@ -23,7 +23,7 @@ subroutine ZMQ_selection(N_in, pt2) PROVIDE psi_bilinear_matrix_transp_rows_loc psi_bilinear_matrix_transp_columns PROVIDE psi_bilinear_matrix_transp_order - call new_parallel_job(zmq_to_qp_run_socket,"selection") + call new_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,"selection") call zmq_put_psi(zmq_to_qp_run_socket,1) call zmq_put_N_det_generators(zmq_to_qp_run_socket, 1) call zmq_put_N_det_selectors(zmq_to_qp_run_socket, 1) @@ -55,12 +55,12 @@ subroutine ZMQ_selection(N_in, pt2) !$OMP PARALLEL DEFAULT(shared) SHARED(b, pt2) PRIVATE(i) NUM_THREADS(nproc+1) i = omp_get_thread_num() if (i==0) then - call selection_collector(b, N, pt2) + call selection_collector(zmq_socket_pull, b, N, pt2) else call selection_slave_inproc(i) endif !$OMP END PARALLEL - call end_parallel_job(zmq_to_qp_run_socket, 'selection') + call end_parallel_job(zmq_to_qp_run_socket, zmq_socket_pull, 'selection') do i=N_det+1,N_states pt2(i) = 0.d0 enddo @@ -84,13 +84,14 @@ subroutine selection_slave_inproc(i) call run_selection_slave(1,i,pt2_e0_denominator) end -subroutine selection_collector(b, N, pt2) +subroutine selection_collector(zmq_socket_pull, b, N, pt2) use f77_zmq use selection_types use bitmasks implicit none + integer(ZMQ_PTR), intent(in) :: zmq_socket_pull type(selection_buffer), intent(inout) :: b integer, intent(in) :: N double precision, intent(out) :: pt2(N_states) @@ -99,7 +100,6 @@ subroutine selection_collector(b, N, pt2) integer(ZMQ_PTR) :: zmq_to_qp_run_socket integer(ZMQ_PTR), external :: new_zmq_pull_socket - integer(ZMQ_PTR) :: zmq_socket_pull integer :: msg_size, rc, more integer :: acc, i, j, robin, ntask @@ -109,7 +109,6 @@ subroutine selection_collector(b, N, pt2) type(selection_buffer) :: b2 zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() - zmq_socket_pull = new_zmq_pull_socket() call create_selection_buffer(N, N*2, b2) allocate(task_id(N_det_generators)) more = 1 @@ -136,6 +135,5 @@ subroutine selection_collector(b, N, pt2) call delete_selection_buffer(b2) call sort_selection_buffer(b) call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) - call end_zmq_pull_socket(zmq_socket_pull) end subroutine diff --git a/src/Davidson/davidson_parallel.irp.f b/src/Davidson/davidson_parallel.irp.f index ec2440d5..59f3e759 100644 --- a/src/Davidson/davidson_parallel.irp.f +++ b/src/Davidson/davidson_parallel.irp.f @@ -200,10 +200,11 @@ end subroutine -subroutine davidson_collector(zmq_to_qp_run_socket, v0, s0, sze, N_st) +subroutine davidson_collector(zmq_to_qp_run_socket, zmq_socket_pull, v0, s0, sze, N_st) use f77_zmq implicit none + integer(ZMQ_PTR), intent(in) :: zmq_socket_pull integer, intent(in) :: sze, N_st integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket @@ -214,14 +215,11 @@ subroutine davidson_collector(zmq_to_qp_run_socket, v0, s0, sze, N_st) double precision, allocatable :: v_t(:,:), s_t(:,:) integer :: i,j - integer(ZMQ_PTR), external :: new_zmq_pull_socket - integer(ZMQ_PTR) :: zmq_socket_pull allocate(v_t(N_st,N_det), s_t(N_st,N_det)) v0 = 0.d0 s0 = 0.d0 more = 1 - zmq_socket_pull = new_zmq_pull_socket() do while (more == 1) call davidson_pull_results(zmq_socket_pull, v_t, s_t, imin, imax, task_id) do j=1,N_st @@ -233,7 +231,6 @@ subroutine davidson_collector(zmq_to_qp_run_socket, v0, s0, sze, N_st) call zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more) end do deallocate(v_t,s_t) - call end_zmq_pull_socket(zmq_socket_pull) end subroutine @@ -280,12 +277,12 @@ subroutine H_S2_u_0_nstates_zmq(v_0,s_0,u_0,N_st,sze) N_det, N_st) - integer(ZMQ_PTR) :: zmq_to_qp_run_socket + integer(ZMQ_PTR) :: zmq_to_qp_run_socket, zmq_socket_pull ASSERT (N_st == N_states_diag) ASSERT (sze >= N_det) - call new_parallel_job(zmq_to_qp_run_socket,'davidson') + call new_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,'davidson') character*(512) :: task integer :: rc @@ -342,12 +339,12 @@ subroutine H_S2_u_0_nstates_zmq(v_0,s_0,u_0,N_st,sze) !$OMP PARALLEL NUM_THREADS(2) PRIVATE(ithread) ithread = omp_get_thread_num() if (ithread == 0 ) then - call davidson_collector(zmq_to_qp_run_socket, v_0, s_0, N_det, N_st) + call davidson_collector(zmq_to_qp_run_socket, zmq_socket_pull, v_0, s_0, N_det, N_st) else call davidson_slave_inproc(1) endif !$OMP END PARALLEL - call end_parallel_job(zmq_to_qp_run_socket, 'davidson') + call end_parallel_job(zmq_to_qp_run_socket, zmq_socket_pull, 'davidson') do k=1,N_st call dset_order(v_0(1,k),psi_bilinear_matrix_order_reverse,N_det) diff --git a/src/Determinants/H_apply_zmq.template.f b/src/Determinants/H_apply_zmq.template.f index 51efb1b2..3b44a9f7 100644 --- a/src/Determinants/H_apply_zmq.template.f +++ b/src/Determinants/H_apply_zmq.template.f @@ -22,7 +22,7 @@ subroutine $subroutine($params_main) $initialization PROVIDE H_apply_buffer_allocated mo_bielec_integrals_in_map psi_det_generators psi_coef_generators - integer(ZMQ_PTR), external :: new_zmq_pair_socket + integer(ZMQ_PTR), external :: new_zmq_pair_socket, zmq_socket_pull integer(ZMQ_PTR) :: zmq_socket_pair integer(ZMQ_PTR) :: zmq_to_qp_run_socket @@ -30,7 +30,7 @@ subroutine $subroutine($params_main) double precision, allocatable :: H_pert_diag_generators(:,:) double precision :: energy(N_st) - call new_parallel_job(zmq_to_qp_run_socket,'$subroutine') + call new_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,'$subroutine') zmq_socket_pair = new_zmq_pair_socket(.True.) call zmq_put_psi(zmq_to_qp_run_socket,1) @@ -55,7 +55,7 @@ subroutine $subroutine($params_main) !$OMP num_threads(nproc+1) i = omp_get_thread_num() if (i == 0) then - call $subroutine_collector() + call $subroutine_collector(zmq_socket_pull) integer :: n, task_id call pull_pt2(zmq_socket_pair, pt2_generators, norm_pert_generators, H_pert_diag_generators, i_generator, size(pt2_generators), n, task_id) else @@ -65,7 +65,7 @@ subroutine $subroutine($params_main) call end_zmq_pair_socket(zmq_socket_pair) - call end_parallel_job(zmq_to_qp_run_socket,'$subroutine') + call end_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,'$subroutine') $copy_buffer @@ -192,7 +192,7 @@ subroutine $subroutine_slave(thread, iproc) end -subroutine $subroutine_collector +subroutine $subroutine_collector(zmq_socket_pull) use f77_zmq implicit none BEGIN_DOC @@ -202,7 +202,7 @@ subroutine $subroutine_collector integer :: k, rc integer(ZMQ_PTR), external :: new_zmq_pull_socket - integer(ZMQ_PTR) :: zmq_socket_pull + integer(ZMQ_PTR), intent(in) :: zmq_socket_pull integer*8 :: control, accu integer :: n, more, task_id, i_generator @@ -210,7 +210,6 @@ subroutine $subroutine_collector integer(ZMQ_PTR) :: zmq_to_qp_run_socket zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() - zmq_socket_pull = new_zmq_pull_socket() double precision, allocatable :: pt2(:), norm_pert(:), H_pert_diag(:) double precision, allocatable :: pt2_result(:,:), norm_pert_result(:,:), H_pert_diag_result(:,:) @@ -238,7 +237,6 @@ subroutine $subroutine_collector enddo - call end_zmq_pull_socket(zmq_socket_pull) call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) diff --git a/src/Integrals_Bielec/ao_bi_integrals.irp.f b/src/Integrals_Bielec/ao_bi_integrals.irp.f index 2ee14962..0d9345c2 100644 --- a/src/Integrals_Bielec/ao_bi_integrals.irp.f +++ b/src/Integrals_Bielec/ao_bi_integrals.irp.f @@ -365,8 +365,8 @@ BEGIN_PROVIDER [ logical, ao_bielec_integrals_in_map ] call wall_time(wall_1) call cpu_time(cpu_1) - integer(ZMQ_PTR) :: zmq_to_qp_run_socket - call new_parallel_job(zmq_to_qp_run_socket,'ao_integrals') + integer(ZMQ_PTR) :: zmq_to_qp_run_socket, zmq_socket_pull + call new_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,'ao_integrals') character(len=:), allocatable :: task allocate(character(len=ao_num*12) :: task) @@ -380,16 +380,16 @@ BEGIN_PROVIDER [ logical, ao_bielec_integrals_in_map ] call zmq_set_running(zmq_to_qp_run_socket) PROVIDE nproc - !$OMP PARALLEL DEFAULT(private) num_threads(nproc+1) + !$OMP PARALLEL DEFAULT(shared) private(i) num_threads(nproc+1) i = omp_get_thread_num() if (i==0) then - call ao_bielec_integrals_in_map_collector(i) + call ao_bielec_integrals_in_map_collector(zmq_socket_pull) else call ao_bielec_integrals_in_map_slave_inproc(i) endif !$OMP END PARALLEL - call end_parallel_job(zmq_to_qp_run_socket, 'ao_integrals') + call end_parallel_job(zmq_to_qp_run_socket, zmq_socket_pull, 'ao_integrals') print*, 'Sorting the map' diff --git a/src/Integrals_Bielec/ao_bielec_integrals_in_map_slave.irp.f b/src/Integrals_Bielec/ao_bielec_integrals_in_map_slave.irp.f index 1333753d..e86032e0 100644 --- a/src/Integrals_Bielec/ao_bielec_integrals_in_map_slave.irp.f +++ b/src/Integrals_Bielec/ao_bielec_integrals_in_map_slave.irp.f @@ -122,7 +122,7 @@ subroutine ao_bielec_integrals_in_map_slave(thread,iproc) end -subroutine ao_bielec_integrals_in_map_collector +subroutine ao_bielec_integrals_in_map_collector(zmq_socket_pull) use map_module use f77_zmq implicit none @@ -130,6 +130,7 @@ subroutine ao_bielec_integrals_in_map_collector ! Collects results from the AO integral calculation END_DOC + integer(ZMQ_PTR), intent(in) :: zmq_socket_pull integer :: j,l,n_integrals integer :: rc @@ -140,13 +141,11 @@ subroutine ao_bielec_integrals_in_map_collector integer(ZMQ_PTR) :: zmq_to_qp_run_socket integer(ZMQ_PTR), external :: new_zmq_pull_socket - integer(ZMQ_PTR) :: zmq_socket_pull integer*8 :: control, accu, sze integer :: task_id, more zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() - zmq_socket_pull = new_zmq_pull_socket() sze = ao_num*ao_num allocate ( buffer_i(sze), buffer_value(sze) ) @@ -223,7 +222,6 @@ IRP_ENDIF endif call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) - call end_zmq_pull_socket(zmq_socket_pull) end diff --git a/src/ZMQ/utils.irp.f b/src/ZMQ/utils.irp.f index 06cff585..276590f1 100644 --- a/src/ZMQ/utils.irp.f +++ b/src/ZMQ/utils.irp.f @@ -140,15 +140,15 @@ function new_zmq_to_qp_run_socket() stop 'Unable to create zmq req socket' endif -! rc = f77_zmq_setsockopt(new_zmq_to_qp_run_socket, ZMQ_SNDTIMEO, 300000, 4) -! if (rc /= 0) then -! stop 'Unable to set send timeout in new_zmq_to_qp_run_socket' -! endif -! -! rc = f77_zmq_setsockopt(new_zmq_to_qp_run_socket, ZMQ_RCVTIMEO, 300000, 4) -! if (rc /= 0) then -! stop 'Unable to set recv timeout in new_zmq_to_qp_run_socket' -! endif + rc = f77_zmq_setsockopt(new_zmq_to_qp_run_socket, ZMQ_SNDTIMEO, 10000, 4) + if (rc /= 0) then + stop 'Unable to set send timeout in new_zmq_to_qp_run_socket' + endif + + rc = f77_zmq_setsockopt(new_zmq_to_qp_run_socket, ZMQ_RCVTIMEO, 10000, 4) + if (rc /= 0) then + stop 'Unable to set recv timeout in new_zmq_to_qp_run_socket' + endif rc = f77_zmq_connect(new_zmq_to_qp_run_socket, trim(qp_run_address)//':'//trim(zmq_port(0))) if (rc /= 0) then @@ -242,17 +242,17 @@ IRP_ENDIF stop 'Unable to create zmq pull socket' endif -! rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_LINGER,300000,4) -! if (rc /= 0) then -! stop 'Unable to set ZMQ_LINGER on pull socket' -! endif -! + rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_LINGER,30000,4) + if (rc /= 0) then + stop 'Unable to set ZMQ_LINGER on pull socket' + endif + ! rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_RCVBUF,100000000,4) ! if (rc /= 0) then ! stop 'Unable to set ZMQ_RCVBUF on pull socket' ! endif - rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_RCVHWM,3,4) + rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_RCVHWM,5,4) if (rc /= 0) then stop 'Unable to set ZMQ_RCVHWM on pull socket' endif @@ -281,7 +281,9 @@ IRP_ENDIF rc = f77_zmq_bind(new_zmq_pull_socket, zmq_socket_pull_tcp_address) if (rc /= 0) then icount = icount-1 - call sleep(3) +! call sleep(3) + zmq_socket_pull_tcp_address = 'tcp://*:'//zmq_port(2+icount*100)//' ' + zmq_socket_push_tcp_address = trim(qp_run_address)//':'//zmq_port(2+icount*100)//' ' else exit endif @@ -322,12 +324,12 @@ IRP_ENDIF stop 'Unable to create zmq push socket' endif -! rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_LINGER,300000,4) -! if (rc /= 0) then -! stop 'Unable to set ZMQ_LINGER on push socket' -! endif + rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_LINGER,30000,4) + if (rc /= 0) then + stop 'Unable to set ZMQ_LINGER on push socket' + endif - rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_SNDHWM,2,4) + rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_SNDHWM,5,4) if (rc /= 0) then stop 'Unable to set ZMQ_SNDHWM on push socket' endif @@ -336,16 +338,16 @@ IRP_ENDIF ! if (rc /= 0) then ! stop 'Unable to set ZMQ_SNDBUF on push socket' ! endif -! - rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_IMMEDIATE,1,4) + + rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_IMMEDIATE,0,4) if (rc /= 0) then stop 'Unable to set ZMQ_IMMEDIATE on push socket' endif -! rc = f77_zmq_setsockopt(new_zmq_push_socket, ZMQ_SNDTIMEO, 100000, 4) -! if (rc /= 0) then -! stop 'Unable to set send timout in new_zmq_push_socket' -! endif + rc = f77_zmq_setsockopt(new_zmq_push_socket, ZMQ_SNDTIMEO, 10000, 4) + if (rc /= 0) then + stop 'Unable to set send timout in new_zmq_push_socket' + endif if (thread == 1) then rc = f77_zmq_connect(new_zmq_push_socket, zmq_socket_push_inproc_address) @@ -478,10 +480,10 @@ subroutine end_zmq_push_socket(zmq_socket_push,thread) integer :: rc character*(8), external :: zmq_port -! rc = f77_zmq_setsockopt(zmq_socket_push,ZMQ_LINGER,300000,4) -! if (rc /= 0) then -! stop 'Unable to set ZMQ_LINGER on push socket' -! endif + rc = f77_zmq_setsockopt(zmq_socket_push,ZMQ_LINGER,30000,4) + if (rc /= 0) then + stop 'Unable to set ZMQ_LINGER on push socket' + endif call omp_set_lock(zmq_lock) rc = f77_zmq_close(zmq_socket_push) @@ -503,7 +505,7 @@ BEGIN_PROVIDER [ character*(128), zmq_state ] zmq_state = 'No_state' END_PROVIDER -subroutine new_parallel_job(zmq_to_qp_run_socket,name_in) +subroutine new_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,name_in) use f77_zmq implicit none BEGIN_DOC @@ -514,7 +516,8 @@ subroutine new_parallel_job(zmq_to_qp_run_socket,name_in) character*(512) :: message, name integer :: rc, sze integer(ZMQ_PTR),external :: new_zmq_to_qp_run_socket - integer(ZMQ_PTR), intent(out) :: zmq_to_qp_run_socket + integer(ZMQ_PTR),external :: new_zmq_pull_socket + integer(ZMQ_PTR), intent(out) :: zmq_to_qp_run_socket, zmq_socket_pull call omp_set_lock(zmq_lock) zmq_context = f77_zmq_ctx_new () @@ -527,7 +530,9 @@ subroutine new_parallel_job(zmq_to_qp_run_socket,name_in) ! print *, 'Unable to set the number of ZMQ IO threads to', nproc ! endif + zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() + zmq_socket_pull = new_zmq_pull_socket () name = name_in sze = len(trim(name)) call lowercase(name,sze) @@ -580,13 +585,13 @@ subroutine zmq_set_running(zmq_to_qp_run_socket) end -subroutine end_parallel_job(zmq_to_qp_run_socket,name_in) +subroutine end_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,name_in) use f77_zmq implicit none BEGIN_DOC ! End a new parallel job with name 'name'. The slave tasks execute subroutine 'slave' END_DOC - integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket + integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket, zmq_socket_pull character*(*), intent(in) :: name_in character*(512) :: message, name @@ -599,15 +604,20 @@ subroutine end_parallel_job(zmq_to_qp_run_socket,name_in) stop 'Wrong end of job' endif - call sleep(1) - rc = f77_zmq_send(zmq_to_qp_run_socket, 'end_job '//trim(zmq_state),8+len(trim(zmq_state)),0) - rc = f77_zmq_recv(zmq_to_qp_run_socket, zmq_state, 2, 0) - if (rc /= 2) then - print *, 'f77_zmq_recv(zmq_to_qp_run_socket, zmq_state, 2, 0)' - stop 'error' - endif + do i=1,30 + rc = f77_zmq_send(zmq_to_qp_run_socket, 'end_job '//trim(zmq_state),8+len(trim(zmq_state)),0) + rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 512, 0) + if (trim(message(1:13)) == 'error waiting') then + print *, trim(message(6:rc)) + call sleep(1) + cycle + else if (message(1:2) == 'ok') then + exit + endif + end do zmq_state = 'No_state' call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) + call end_zmq_pull_socket(zmq_socket_pull) call omp_set_lock(zmq_lock) rc = f77_zmq_ctx_term(zmq_context) @@ -649,6 +659,7 @@ subroutine connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0) message = trim(message(1:rc)) if(message(1:5) == "error") then + print *, trim(message(1:rc)) worker_id = -1 return end if @@ -679,9 +690,9 @@ subroutine disconnect_from_taskserver(zmq_to_qp_run_socket, & sze = len(trim(message)) rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0) - if (rc == -1) then - return - endif +! if (rc == -1) then +! return +! endif if (rc /= sze) then print *, rc, sze @@ -838,6 +849,46 @@ subroutine task_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id) end +subroutine tasks_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id, n_tasks) + use f77_zmq + implicit none + BEGIN_DOC + ! Get a task from the task server + END_DOC + integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket + integer, intent(in) :: n_tasks, worker_id, task_id(n_tasks) + + integer :: rc, sze, k + character(LEN=:), allocatable :: message + character*(64) :: fmt + + allocate(character(LEN=64+n_tasks*12) :: message) + write(fmt,*) '(A,X,A,I10,X,', n_tasks, '(I11,1X))' + write(message,*) 'task_done '//trim(zmq_state), worker_id, (task_id(k), k=1,n_tasks) + + sze = len(trim(message)) + rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0) + if (rc == -1) then + ! Server is shut down + deallocate(message) + return + endif + + if (rc /= sze) then + print *, irp_here, 'f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)' + stop 'error' + endif + + rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 64, 0) + if (trim(message(1:rc)) /= 'ok') then + print *, trim(message(1:rc)) + print *, 'Unable to send task_done message' + stop -1 + endif + deallocate(message) + +end + subroutine get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task) use f77_zmq implicit none @@ -913,11 +964,13 @@ subroutine get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task sze = len(trim(message)) rc = f77_zmq_send(zmq_to_qp_run_socket, message, sze, 0) if (rc /= sze) then + print *, trim(message) + print *, rc, sze print *, irp_here, ':f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)' stop 'error' endif - message = repeat(' ',512) + message = repeat(' ',1024) rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 1024, 0) rc = min(1024,rc) read(message(1:rc),*) reply @@ -970,10 +1023,10 @@ subroutine end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) character*(8), external :: zmq_port integer :: rc -! rc = f77_zmq_setsockopt(zmq_to_qp_run_socket,ZMQ_LINGER,300000,4) -! if (rc /= 0) then -! stop 'Unable to set ZMQ_LINGER on zmq_to_qp_run_socket' -! endif + rc = f77_zmq_setsockopt(zmq_to_qp_run_socket,ZMQ_LINGER,30000,4) + if (rc /= 0) then + stop 'Unable to set ZMQ_LINGER on zmq_to_qp_run_socket' + endif rc = f77_zmq_close(zmq_to_qp_run_socket) if (rc /= 0) then @@ -995,11 +1048,11 @@ subroutine zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more) integer, intent(in) :: task_id integer, intent(out) :: more integer :: rc - character*(512) :: msg + character*(512) :: message - write(msg,*) 'del_task ', zmq_state, task_id - rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0) - if (rc /= len(trim(msg))) then + write(message,*) 'del_task ', zmq_state, task_id + rc = f77_zmq_send(zmq_to_qp_run_socket,trim(message),len(trim(message)),0) + if (rc /= len(trim(message))) then print *, irp_here stop 'error' endif @@ -1019,6 +1072,48 @@ subroutine zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more) endif end +subroutine zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more) + use f77_zmq + implicit none + BEGIN_DOC +! When a task is done, it has to be removed from the list of tasks on the qp_run +! queue. This guarantees that the results have been received in the pull. + END_DOC + integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket + integer(ZMQ_PTR) :: zmq_socket_pull + integer, intent(in) :: n_tasks, task_id(n_tasks) + integer, intent(out) :: more + integer :: rc, k + character*(64) :: fmt, reply + character(LEN=:), allocatable :: message + + allocate(character(LEN=64+n_tasks*12) :: message) + + write(fmt,*) '(A,1X,A,1X,', n_tasks, '(I11,1X))' + write(message,*) 'del_task '//trim(zmq_state), (task_id(k), k=1,n_tasks) + + + rc = f77_zmq_send(zmq_to_qp_run_socket,trim(message),len(trim(message)),0) + if (rc /= len(trim(message))) then + print *, irp_here + stop 'error' + endif + deallocate(message) + + reply = '' + rc = f77_zmq_recv(zmq_to_qp_run_socket,reply,64,0) + + if (reply(16:19) == 'more') then + more = 1 + else if (reply(16:19) == 'done') then + more = 0 + else + print *, reply + print *, irp_here + stop 'error' + endif +end + subroutine wait_for_next_state(state) use f77_zmq