diff --git a/config/ifort.cfg b/config/ifort.cfg index b94d0cd4..77761c85 100644 --- a/config/ifort.cfg +++ b/config/ifort.cfg @@ -9,7 +9,7 @@ FC : ifort LAPACK_LIB : -mkl=parallel IRPF90 : irpf90 -IRPF90_FLAGS : --ninja --align=32 +IRPF90_FLAGS : --ninja --align=32 -DZMQ_PUSH # Global options ################ diff --git a/plugins/CAS_SD_ZMQ/run_selection_slave.irp.f b/plugins/CAS_SD_ZMQ/run_selection_slave.irp.f index 6619b6d0..3e0d3e07 100644 --- a/plugins/CAS_SD_ZMQ/run_selection_slave.irp.f +++ b/plugins/CAS_SD_ZMQ/run_selection_slave.irp.f @@ -22,21 +22,25 @@ subroutine run_selection_slave(thread,iproc,energy) double precision :: pt2(N_states) zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() - zmq_socket_push = new_zmq_push_socket(thread) - call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) - if(worker_id == -1) then - print *, "WORKER -1" - !call disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id) + + integer, external :: connect_to_taskserver + + if (connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) == -1) then call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) - call end_zmq_push_socket(zmq_socket_push,thread) return - end if + endif + + zmq_socket_push = new_zmq_push_socket(thread) + buf%N = 0 ctask = 1 pt2 = 0d0 do - call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id(ctask), task) + integer, external :: get_task_from_taskserver + if (get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id(ctask), task) == -1) then + exit + endif done = task_id(ctask) == 0 if (done) then ctask = ctask - 1 @@ -53,10 +57,18 @@ subroutine run_selection_slave(thread,iproc,energy) call select_connected(i_generator,energy,pt2,buf) endif + integer, external :: task_done_to_taskserver if(done .or. ctask == size(task_id)) then if(buf%N == 0 .and. ctask > 0) stop "uninitialized selection_buffer" do i=1, ctask - call task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id(i)) + if (task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id(i)) == -1) then + call sleep(1) + if (task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id(i)) == -1) then + done = .True. + ctask = 0 + exit + endif + endif end do if(ctask > 0) then call push_selection_results(zmq_socket_push, pt2, buf, task_id(1), ctask) @@ -74,7 +86,12 @@ subroutine run_selection_slave(thread,iproc,energy) if(done) exit ctask = ctask + 1 end do - call disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id) + + integer, external :: disconnect_from_taskserver + if (disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id) == -1) then + continue + endif + call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) call end_zmq_push_socket(zmq_socket_push,thread) end subroutine diff --git a/plugins/CAS_SD_ZMQ/selection.irp.f b/plugins/CAS_SD_ZMQ/selection.irp.f index c8f17ac4..f5cdfa86 100644 --- a/plugins/CAS_SD_ZMQ/selection.irp.f +++ b/plugins/CAS_SD_ZMQ/selection.irp.f @@ -1200,6 +1200,10 @@ subroutine ZMQ_selection(N_in, pt2) integer, external :: omp_get_thread_num double precision, intent(out) :: pt2(N_states) integer, parameter :: maxtasks=10000 + integer, external :: zmq_put_psi + integer, external :: zmq_put_N_det_generators + integer, external :: zmq_put_N_det_selectors + integer, external :: zmq_put_dvector N = max(N_in,1) @@ -1211,10 +1215,18 @@ subroutine ZMQ_selection(N_in, pt2) PROVIDE psi_bilinear_matrix_transp_order call new_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,'selection') - call zmq_put_psi(zmq_to_qp_run_socket,1) - call zmq_put_N_det_generators(zmq_to_qp_run_socket, 1) - call zmq_put_N_det_selectors(zmq_to_qp_run_socket, 1) - call zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',pt2_e0_denominator,size(pt2_e0_denominator)) + if (zmq_put_psi(zmq_to_qp_run_socket,1) == -1) then + stop 'Unable to put psi' + endif + if (zmq_put_N_det_generators(zmq_to_qp_run_socket, 1) == -1) then + stop 'Unable to put N_det_generators' + endif + if (zmq_put_N_det_selectors(zmq_to_qp_run_socket, 1) == -1) then + stop 'Unable to put N_det_selectors' + endif + if (zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',pt2_e0_denominator,size(pt2_e0_denominator)) == -1) then + stop 'Unable to put energy' + endif call create_selection_buffer(N, N*2, b) endif @@ -1222,17 +1234,22 @@ subroutine ZMQ_selection(N_in, pt2) task = ' ' integer :: k + integer, external :: add_task_to_taskserver k=0 do i= 1, N_det_generators k = k+1 write(task(20*(k-1)+1:20*k),'(I9,1X,I9,''|'')') i, N if (k>=maxtasks) then k=0 - call add_task_to_taskserver(zmq_to_qp_run_socket,task) + if (add_task_to_taskserver(zmq_to_qp_run_socket,task) == -1) then + stop 'Unable to add task to task server' + endif endif enddo if (k > 0) then - call add_task_to_taskserver(zmq_to_qp_run_socket,task) + if (add_task_to_taskserver(zmq_to_qp_run_socket,task) == -1) then + stop 'Unable to add task to task server' + endif endif call zmq_set_running(zmq_to_qp_run_socket) @@ -1308,7 +1325,10 @@ subroutine selection_collector(zmq_socket_pull, b, pt2) if(task_id(i) == 0) then print *, "Error in collector" endif - call zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id(i),more) + integer, external :: zmq_delete_task + if (zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id(i),more) == -1) then + stop 'Unable to delete task' + endif end do done += ntask call CPU_TIME(time) diff --git a/plugins/CAS_SD_ZMQ/selection_cassd_slave.irp.f b/plugins/CAS_SD_ZMQ/selection_cassd_slave.irp.f index f8a4997a..43281eba 100644 --- a/plugins/CAS_SD_ZMQ/selection_cassd_slave.irp.f +++ b/plugins/CAS_SD_ZMQ/selection_cassd_slave.irp.f @@ -27,6 +27,7 @@ subroutine run_wf character*(64) :: states(4) integer :: rc, i + integer, external :: zmq_get_psi call provide_everything zmq_context = f77_zmq_ctx_new () @@ -50,7 +51,7 @@ subroutine run_wf ! --------- print *, 'Selection' - call zmq_get_psi(zmq_to_qp_run_socket,1,energy,N_states) + if (zmq_get_psi(zmq_to_qp_run_socket,1,energy,N_states) == -1) cycle !$OMP PARALLEL PRIVATE(i) i = omp_get_thread_num() @@ -64,7 +65,7 @@ subroutine run_wf ! -------- print *, 'Davidson' - call zmq_get_psi(zmq_to_qp_run_socket,1,energy,N_states) + if (zmq_get_psi(zmq_to_qp_run_socket,1,energy,N_states) == -1) cycle call omp_set_nested(.True.) call davidson_slave_tcp(0) call omp_set_nested(.False.) @@ -76,7 +77,7 @@ subroutine run_wf ! --- print *, 'PT2' - call zmq_get_psi(zmq_to_qp_run_socket,1,energy,N_states) + if (zmq_get_psi(zmq_to_qp_run_socket,1,energy,N_states) == -1) cycle !$OMP PARALLEL PRIVATE(i) i = omp_get_thread_num() diff --git a/plugins/Full_CI_ZMQ/pt2_slave.irp.f b/plugins/Full_CI_ZMQ/pt2_slave.irp.f index 0e6ae715..718884d1 100644 --- a/plugins/Full_CI_ZMQ/pt2_slave.irp.f +++ b/plugins/Full_CI_ZMQ/pt2_slave.irp.f @@ -25,7 +25,9 @@ subroutine run_wf double precision :: energy(N_states_diag) character*(64) :: states(1) integer :: rc, i + integer, external :: zmq_get_dvector + integer, external :: zmq_get_psi call provide_everything @@ -48,7 +50,7 @@ subroutine run_wf ! --------- print *, 'PT2' - call zmq_get_psi(zmq_to_qp_run_socket,1) + if (zmq_get_psi(zmq_to_qp_run_socket,1) == -1) cycle if (zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states) == -1) cycle PROVIDE psi_bilinear_matrix_columns_loc psi_det_alpha_unique psi_det_beta_unique diff --git a/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f b/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f index 6d27daea..1039d0f4 100644 --- a/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f +++ b/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f @@ -28,7 +28,6 @@ subroutine ZMQ_pt2(E, pt2,relative_error, absolute_error, error) double precision :: time double precision :: w(N_states) integer(ZMQ_PTR), external :: new_zmq_to_qp_run_socket - integer, external :: zmq_put_dvector if (N_det < max(10,N_states)) then pt2=0.d0 @@ -66,9 +65,20 @@ subroutine ZMQ_pt2(E, pt2,relative_error, absolute_error, error) print *, '========== ================= ================= =================' call new_parallel_job(zmq_to_qp_run_socket, zmq_socket_pull, 'pt2') - call zmq_put_psi(zmq_to_qp_run_socket,1) - call zmq_put_N_det_generators(zmq_to_qp_run_socket, 1) - call zmq_put_N_det_selectors(zmq_to_qp_run_socket, 1) + + integer, external :: zmq_put_psi + integer, external :: zmq_put_N_det_generators + integer, external :: zmq_put_N_det_selectors + integer, external :: zmq_put_dvector + if (zmq_put_psi(zmq_to_qp_run_socket,1) == -1) then + stop 'Unable to put psi on ZMQ server' + endif + if (zmq_put_N_det_generators(zmq_to_qp_run_socket, 1) == -1) then + stop 'Unable to put N_det_generators on ZMQ server' + endif + if (zmq_put_N_det_selectors(zmq_to_qp_run_socket, 1) == -1) then + stop 'Unable to put N_det_selectors on ZMQ server' + endif if (zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',pt2_e0_denominator,size(pt2_e0_denominator)) == -1) then stop 'Unable to put energy on ZMQ server' endif @@ -76,13 +86,17 @@ subroutine ZMQ_pt2(E, pt2,relative_error, absolute_error, error) integer :: ipos ipos=1 + + integer, external :: add_task_to_taskserver do i=1,tbc(0) if(tbc(i) > fragment_first) then write(task(ipos:ipos+20),'(I9,1X,I9,''|'')') 0, tbc(i) ipos += 20 if (ipos > 63980) then - call add_task_to_taskserver(zmq_to_qp_run_socket,trim(task(1:ipos))) + if (add_task_to_taskserver(zmq_to_qp_run_socket,trim(task(1:ipos))) == -1) then + stop 'Unable to add task to task server' + endif ipos=1 endif else @@ -90,14 +104,18 @@ subroutine ZMQ_pt2(E, pt2,relative_error, absolute_error, error) write(task(ipos:ipos+20),'(I9,1X,I9,''|'')') j, tbc(i) ipos += 20 if (ipos > 63980) then - call add_task_to_taskserver(zmq_to_qp_run_socket,trim(task(1:ipos))) + if (add_task_to_taskserver(zmq_to_qp_run_socket,trim(task(1:ipos))) == -1) then + stop 'Unable to add task to task server' + endif ipos=1 endif end do end if end do if (ipos > 1) then - call add_task_to_taskserver(zmq_to_qp_run_socket,trim(task(1:ipos))) + if (add_task_to_taskserver(zmq_to_qp_run_socket,trim(task(1:ipos))) == -1) then + stop 'Unable to add task to task server' + endif endif call zmq_set_running(zmq_to_qp_run_socket) @@ -254,7 +272,10 @@ subroutine pt2_collector(zmq_socket_pull, E, b, tbc, comb, Ncomb, computed, pt2_ if(parts_to_get(index(i)) == 0) actually_computed(index(i)) = .true. enddo - call zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more) + integer, external :: zmq_delete_tasks + if (zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more) == -1) then + stop 'Unable to delete tasks' + endif if (more == 0) then loop = .False. endif @@ -269,11 +290,16 @@ subroutine pt2_collector(zmq_socket_pull, E, b, tbc, comb, Ncomb, computed, pt2_ end if end do - double precision :: E0, avg, prop + integer, external :: zmq_abort + if (firstTBDcomb > Ncomb) then - call zmq_abort(zmq_to_qp_run_socket) + if (zmq_abort(zmq_to_qp_run_socket) == -1) then + stop 'Error in sending abort signal' + endif exit pullLoop endif + + double precision :: E0, avg, prop call do_carlo(tbc, Ncomb+1-firstTBDcomb, comb(firstTBDcomb), pt2_detail, actually_computed, sumabove, sum2above, Nabove) firstTBDcomb = int(Nabove(1)) - orgTBDcomb + 1 if(Nabove(1) < 5d0) cycle @@ -295,7 +321,9 @@ subroutine pt2_collector(zmq_socket_pull, E, b, tbc, comb, Ncomb, computed, pt2_ pt2(pt2_stoch_istate) = avg error(pt2_stoch_istate) = eqt print '(G10.3, 2X, F16.10, 2X, G16.3, 2X, F16.4, A20)', Nabove(tooth), avg+E, eqt, time-time0, '' - call zmq_abort(zmq_to_qp_run_socket) + if (zmq_abort(zmq_to_qp_run_socket) == -1) then + stop 'Error in sending abort signal' + endif else if (Nabove(tooth) > Nabove_old) then print '(G10.3, 2X, F16.10, 2X, G16.3, 2X, F16.4, A20)', Nabove(tooth), avg+E, eqt, time-time0, '' diff --git a/plugins/Full_CI_ZMQ/run_pt2_slave.irp.f b/plugins/Full_CI_ZMQ/run_pt2_slave.irp.f index 5b6dc7c8..949eef3c 100644 --- a/plugins/Full_CI_ZMQ/run_pt2_slave.irp.f +++ b/plugins/Full_CI_ZMQ/run_pt2_slave.irp.f @@ -28,14 +28,17 @@ subroutine run_pt2_slave(thread,iproc,energy) n_tasks_max = N_det_generators/100+1 allocate(task_id(n_tasks_max), task(n_tasks_max)) allocate(pt2(N_states,n_tasks_max), i_generator(n_tasks_max), subset(n_tasks_max)) + zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() - zmq_socket_push = new_zmq_push_socket(thread) - call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) - if(worker_id == -1) then + + integer, external :: connect_to_taskserver + if (connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) == -1) then call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) - call end_zmq_push_socket(zmq_socket_push,thread) return - end if + endif + + zmq_socket_push = new_zmq_push_socket(thread) + buf%N = 0 n_tasks = 0 call create_selection_buffer(1, 2, buf) @@ -45,7 +48,10 @@ subroutine run_pt2_slave(thread,iproc,energy) n_tasks = min(n_tasks+1,n_tasks_max) - call get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task, n_tasks) + integer, external :: get_tasks_from_taskserver + if (get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task, n_tasks) == -1) then + exit + endif done = task_id(n_tasks) == 0 if (done) n_tasks = n_tasks-1 if (n_tasks == 0) exit @@ -59,10 +65,18 @@ subroutine run_pt2_slave(thread,iproc,energy) buf%cur = 0 call select_connected(i_generator(k),energy,pt2(1,k),buf,subset(k)) enddo - call tasks_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id,n_tasks) + integer, external :: tasks_done_to_taskserver + if (tasks_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id,n_tasks) == -1) then + done = .true. + endif call push_pt2_results(zmq_socket_push, i_generator, pt2, task_id, n_tasks) end do - call disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id) + + integer, external :: disconnect_from_taskserver + if (disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id) == -1) then + continue + endif + call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) call end_zmq_push_socket(zmq_socket_push,thread) call delete_selection_buffer(buf) diff --git a/plugins/Full_CI_ZMQ/run_selection_slave.irp.f b/plugins/Full_CI_ZMQ/run_selection_slave.irp.f index 030c052e..fc89ff14 100644 --- a/plugins/Full_CI_ZMQ/run_selection_slave.irp.f +++ b/plugins/Full_CI_ZMQ/run_selection_slave.irp.f @@ -27,20 +27,25 @@ subroutine run_selection_slave(thread,iproc,energy) PROVIDE psi_bilinear_matrix_transp_order zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() - zmq_socket_push = new_zmq_push_socket(thread) - call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) - if(worker_id == -1) then - call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) - call end_zmq_push_socket(zmq_socket_push,thread) + + integer, external :: connect_to_taskserver + if (connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) == -1) then + call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) return - end if + endif + + zmq_socket_push = new_zmq_push_socket(thread) + buf%N = 0 buffer_ready = .False. ctask = 1 pt2(:) = 0d0 do - call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id(ctask), task) + integer, external :: get_task_from_taskserver + if (get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id(ctask), task) == -1) then + exit + endif done = task_id(ctask) == 0 if (done) then ctask = ctask - 1 @@ -58,9 +63,18 @@ subroutine run_selection_slave(thread,iproc,energy) call select_connected(i_generator,energy,pt2,buf,0) endif + integer, external :: task_done_to_taskserver + if(done .or. ctask == size(task_id)) then do i=1, ctask - call task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id(i)) + if (task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id(i)) == -1) then + call sleep(1) + if (task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id(i)) == -1) then + ctask = 0 + done = .true. + exit + endif + endif end do if(ctask > 0) then call sort_selection_buffer(buf) @@ -76,7 +90,13 @@ subroutine run_selection_slave(thread,iproc,energy) if(done) exit ctask = ctask + 1 end do - call disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id) + + + integer, external :: disconnect_from_taskserver + if (disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id) == -1) then + continue + endif + call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) call end_zmq_push_socket(zmq_socket_push,thread) if (buffer_ready) then diff --git a/plugins/Full_CI_ZMQ/selection_davidson_slave.irp.f b/plugins/Full_CI_ZMQ/selection_davidson_slave.irp.f index 8357a616..49943ad1 100644 --- a/plugins/Full_CI_ZMQ/selection_davidson_slave.irp.f +++ b/plugins/Full_CI_ZMQ/selection_davidson_slave.irp.f @@ -33,7 +33,9 @@ subroutine run_wf integer :: rc, i, ierr double precision :: t0, t1 - integer, external :: zmq_get_dvector + integer, external :: zmq_get_dvector, zmq_get_N_det_generators + integer, external :: zmq_get_psi, zmq_get_N_det_selectors + integer, external :: zmq_get_N_states_diag call provide_everything @@ -59,10 +61,10 @@ subroutine run_wf ! --------- call wall_time(t0) - call zmq_get_psi(zmq_to_qp_run_socket,1) + if (zmq_get_psi(zmq_to_qp_run_socket,1) == -1) cycle + if (zmq_get_N_det_generators (zmq_to_qp_run_socket, 1) == -1) cycle + if (zmq_get_N_det_selectors(zmq_to_qp_run_socket, 1) == -1) cycle if (zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states) == -1) cycle - call zmq_get_N_det_generators (zmq_to_qp_run_socket, 1) - call zmq_get_N_det_selectors(zmq_to_qp_run_socket, 1) call wall_time(t1) call write_double(6,(t1-t0),'Broadcast time') @@ -80,8 +82,8 @@ subroutine run_wf print *, 'Davidson' call wall_time(t0) - call zmq_get_psi(zmq_to_qp_run_socket,1) - call zmq_get_N_states_diag(zmq_to_qp_run_socket,1) + if (zmq_get_psi(zmq_to_qp_run_socket,1) == -1) cycle + if (zmq_get_N_states_diag(zmq_to_qp_run_socket,1) == -1) cycle if (zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states_diag) == -1) cycle call wall_time(t1) @@ -99,10 +101,10 @@ subroutine run_wf print *, 'PT2' call wall_time(t0) - call zmq_get_psi(zmq_to_qp_run_socket,1) + if (zmq_get_psi(zmq_to_qp_run_socket,1) == -1) cycle if (zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states) == -1) cycle - call zmq_get_N_det_generators (zmq_to_qp_run_socket, 1) - call zmq_get_N_det_selectors(zmq_to_qp_run_socket, 1) + if (zmq_get_N_det_generators (zmq_to_qp_run_socket, 1) == -1) cycle + if (zmq_get_N_det_selectors(zmq_to_qp_run_socket, 1) == -1) cycle call wall_time(t1) call write_double(6,(t1-t0),'Broadcast time') diff --git a/plugins/Full_CI_ZMQ/selection_slave.irp.f b/plugins/Full_CI_ZMQ/selection_slave.irp.f index c0f60c89..aa749151 100644 --- a/plugins/Full_CI_ZMQ/selection_slave.irp.f +++ b/plugins/Full_CI_ZMQ/selection_slave.irp.f @@ -27,7 +27,9 @@ subroutine run_wf double precision :: energy(N_states) character*(64) :: states(4) integer :: rc, i, ierr + integer, external :: zmq_get_dvector + integer, external :: zmq_get_psi call provide_everything @@ -52,7 +54,7 @@ subroutine run_wf ! --------- print *, 'Selection' - call zmq_get_psi(zmq_to_qp_run_socket,1) + if (zmq_get_psi(zmq_to_qp_run_socket,1) == -1) cycle if (zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states) == -1) cycle !$OMP PARALLEL PRIVATE(i) @@ -67,7 +69,7 @@ subroutine run_wf ! --- print *, 'PT2' - call zmq_get_psi(zmq_to_qp_run_socket,1) + if (zmq_get_psi(zmq_to_qp_run_socket,1) == -1) cycle if (zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states) == -1) cycle logical :: lstop diff --git a/plugins/Full_CI_ZMQ/zmq_selection.irp.f b/plugins/Full_CI_ZMQ/zmq_selection.irp.f index 251256f8..a7b901dd 100644 --- a/plugins/Full_CI_ZMQ/zmq_selection.irp.f +++ b/plugins/Full_CI_ZMQ/zmq_selection.irp.f @@ -11,7 +11,6 @@ subroutine ZMQ_selection(N_in, pt2) integer, external :: omp_get_thread_num double precision, intent(out) :: pt2(N_states) integer, parameter :: maxtasks=10000 - integer, external :: zmq_put_dvector PROVIDE fragment_count @@ -25,15 +24,28 @@ subroutine ZMQ_selection(N_in, pt2) PROVIDE psi_bilinear_matrix_transp_order call new_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,'selection') - call zmq_put_psi(zmq_to_qp_run_socket,1) - call zmq_put_N_det_generators(zmq_to_qp_run_socket, 1) - call zmq_put_N_det_selectors(zmq_to_qp_run_socket, 1) + + integer, external :: zmq_put_psi + integer, external :: zmq_put_N_det_generators + integer, external :: zmq_put_N_det_selectors + integer, external :: zmq_put_dvector + + if (zmq_put_psi(zmq_to_qp_run_socket,1) == -1) then + stop 'Unable to put psi on ZMQ server' + endif + if (zmq_put_N_det_generators(zmq_to_qp_run_socket, 1) == -1) then + stop 'Unable to put N_det_generators on ZMQ server' + endif + if (zmq_put_N_det_selectors(zmq_to_qp_run_socket, 1) == -1) then + stop 'Unable to put N_det_selectors on ZMQ server' + endif if (zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',pt2_e0_denominator,size(pt2_e0_denominator)) == -1) then stop 'Unable to put energy on ZMQ server' endif call create_selection_buffer(N, N*2, b) endif + integer, external :: add_task_to_taskserver character*(20*maxtasks) :: task task = ' ' @@ -44,11 +56,15 @@ subroutine ZMQ_selection(N_in, pt2) write(task(20*(k-1)+1:20*k),'(I9,1X,I9,''|'')') i, N if (k>=maxtasks) then k=0 - call add_task_to_taskserver(zmq_to_qp_run_socket,task) + if (add_task_to_taskserver(zmq_to_qp_run_socket,task) == -1) then + stop 'Unable to add task to task server' + endif endif end do if (k > 0) then - call add_task_to_taskserver(zmq_to_qp_run_socket,task) + if (add_task_to_taskserver(zmq_to_qp_run_socket,task) == -1) then + stop 'Unable to add task to task server' + endif endif call zmq_set_running(zmq_to_qp_run_socket) @@ -130,7 +146,10 @@ subroutine selection_collector(zmq_socket_pull, b, N, pt2) if(task_id(i) == 0) then print *, "Error in collector" endif - call zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id(i),more) + integer, external :: zmq_delete_task + if (zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id(i),more) == -1) then + stop 'Unable to delete task' + endif end do end do diff --git a/plugins/Selectors_Utils/zmq.irp.f b/plugins/Selectors_Utils/zmq.irp.f index 111630b8..56d35ffb 100644 --- a/plugins/Selectors_Utils/zmq.irp.f +++ b/plugins/Selectors_Utils/zmq.irp.f @@ -1,6 +1,6 @@ BEGIN_TEMPLATE -subroutine zmq_put_$X(zmq_to_qp_run_socket,worker_id) +integer function zmq_put_$X(zmq_to_qp_run_socket,worker_id) use f77_zmq implicit none BEGIN_DOC @@ -11,29 +11,30 @@ subroutine zmq_put_$X(zmq_to_qp_run_socket,worker_id) integer :: rc character*(256) :: msg + zmq_put_$X = 0 + write(msg,'(A,1X,I8,1X,A200)') 'put_data '//trim(zmq_state), worker_id, '$X' rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),ZMQ_SNDMORE) if (rc /= len(trim(msg))) then - print *, irp_here, ': Error sending $X' - stop 'error' + zmq_put_$X = -1 + return endif rc = f77_zmq_send(zmq_to_qp_run_socket,$X,4,0) if (rc /= 4) then - print *, irp_here, ': Error sending $X' - stop 'error' + zmq_put_$X = -1 + return endif rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0) if (msg(1:rc) /= 'put_data_reply ok') then - print *, rc, trim(msg) - print *, irp_here, ': Error in put_data_reply' - stop 'error' + zmq_put_$X = -1 + return endif end -subroutine zmq_get_$X(zmq_to_qp_run_socket, worker_id) +integer function zmq_get_$X(zmq_to_qp_run_socket, worker_id) use f77_zmq implicit none BEGIN_DOC @@ -44,38 +45,53 @@ subroutine zmq_get_$X(zmq_to_qp_run_socket, worker_id) integer :: rc character*(256) :: msg - write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, '$X' - rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0) - if (rc /= len(trim(msg))) then - print *, irp_here, ': Error getting $X' - stop 'error' + zmq_get_$X = 0 + if (mpi_master) then + + write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, '$X' + rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0) + if (rc /= len(trim(msg))) go to 10 + + rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0) + if (msg(1:14) /= 'get_data_reply') go to 10 + + rc = f77_zmq_recv(zmq_to_qp_run_socket,$X,4,0) + if (rc /= 4) go to 10 + endif - rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0) - if (msg(1:14) /= 'get_data_reply') then - print *, rc, trim(msg) - print *, irp_here, ': Error in get_data_reply' - stop 'error' - endif - - rc = f77_zmq_recv(zmq_to_qp_run_socket,$X,4,0) - if (rc /= 4) then - print *, rc - print *, irp_here, ': Error getting $X' - stop 'error' - endif + ! Normal exit IRP_IF MPI include 'mpif.h' integer :: ierr - call MPI_BCAST ($X, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) + call MPI_BCAST (zmq_get_$X, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) + if (ierr /= MPI_SUCCESS) then + print *, irp_here//': Unable to broadcast N_det_generators' + stop -1 + endif + if (zmq_get_$X == 0) then + call MPI_BCAST ($X, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) + if (ierr /= MPI_SUCCESS) then + print *, irp_here//': Unable to broadcast N_det_generators' + stop -1 + endif + endif + IRP_ENDIF + + return + + ! Exception + 10 continue + zmq_get_$X = -1 + IRP_IF MPI + call MPI_BCAST (zmq_get_$X, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) if (ierr /= MPI_SUCCESS) then print *, irp_here//': Unable to broadcast N_det_generators' stop -1 endif IRP_ENDIF - end SUBST [ X ] diff --git a/plugins/mrcepa0/dressing_slave.irp.f b/plugins/mrcepa0/dressing_slave.irp.f index d7f081cd..7dfa33d5 100644 --- a/plugins/mrcepa0/dressing_slave.irp.f +++ b/plugins/mrcepa0/dressing_slave.irp.f @@ -56,12 +56,18 @@ subroutine mrsc2_dressing_slave(thread,iproc) logical, external :: is_in_wavefunction integer,allocatable :: komon(:) logical :: komoned + integer, external :: connect_to_taskserver, disconnect_from_taskserver !double precision, external :: get_dij - zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() - zmq_socket_push = new_zmq_push_socket(thread) + integer, external :: add_task_to_taskserver - call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) + zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() + if (connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) == -1) then + call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) + return + endif + + zmq_socket_push = new_zmq_push_socket(thread) allocate (delta(N_states,0:N_det_non_ref, 2)) allocate (delta_s2(N_states,0:N_det_non_ref, 2)) @@ -74,7 +80,10 @@ subroutine mrsc2_dressing_slave(thread,iproc) do - call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task) + integer, external :: get_task_from_taskserver + if (get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task) == -1) then + exit + endif if (task_id == 0) exit read (task,*) i_I, J, k1, k2 do i_state=1, N_states @@ -191,12 +200,17 @@ subroutine mrsc2_dressing_slave(thread,iproc) end do ! kk call push_mrsc2_results(zmq_socket_push, I_i, J, delta, delta_s2, task_id) - call task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id) + integer, external :: task_done_to_taskserver + if (task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id) == -1) then + stop 'Unable to send task_done to server' + endif enddo deallocate(delta) - call disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id) + if (disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id) == -1) + continue + endif call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) call end_zmq_push_socket(zmq_socket_push,thread) @@ -389,17 +403,18 @@ end -subroutine mrsc2_dressing_collector(delta_ii_,delta_ij_,delta_ii_s2_,delta_ij_s2_) +subroutine mrsc2_dressing_collector(zmq_socket_pull,delta_ii_,delta_ij_,delta_ii_s2_,delta_ij_s2_) use f77_zmq implicit none BEGIN_DOC ! Collects results from the AO integral calculation END_DOC - double precision,intent(inout) :: delta_ij_(N_states,N_det_non_ref,N_det_ref) - double precision,intent(inout) :: delta_ii_(N_states,N_det_ref) - double precision,intent(inout) :: delta_ij_s2_(N_states,N_det_non_ref,N_det_ref) - double precision,intent(inout) :: delta_ii_s2_(N_states,N_det_ref) + double precision,intent(inout) :: delta_ij_(N_states,N_det_non_ref,N_det_ref) + double precision,intent(inout) :: delta_ii_(N_states,N_det_ref) + double precision,intent(inout) :: delta_ij_s2_(N_states,N_det_non_ref,N_det_ref) + double precision,intent(inout) :: delta_ii_s2_(N_states,N_det_ref) + integer(ZMQ_PTR), intent(in) :: zmq_socket_pull ! integer :: j,l integer :: rc @@ -410,7 +425,6 @@ subroutine mrsc2_dressing_collector(delta_ii_,delta_ij_,delta_ii_s2_,delta_ij_s2 integer(ZMQ_PTR) :: zmq_to_qp_run_socket integer(ZMQ_PTR), external :: new_zmq_pull_socket - integer(ZMQ_PTR) :: zmq_socket_pull integer*8 :: control, accu integer :: task_id, more @@ -424,7 +438,6 @@ subroutine mrsc2_dressing_collector(delta_ii_,delta_ij_,delta_ii_s2_,delta_ij_s2 delta_ij_s2_(:,:,:) = 0d0 zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() - zmq_socket_pull = new_zmq_pull_socket() allocate ( delta(N_states,0:N_det_non_ref,2), delta_s2(N_states,0:N_det_non_ref,2) ) @@ -466,7 +479,10 @@ subroutine mrsc2_dressing_collector(delta_ii_,delta_ij_,delta_ii_s2_,delta_ij_s2 if (task_id /= 0) then - call zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more) + integer, external :: zmq_delete_task + if (zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more) == -1) then + stop 'Unable to delete task' + endif endif @@ -474,7 +490,6 @@ subroutine mrsc2_dressing_collector(delta_ii_,delta_ij_,delta_ii_s2_,delta_ij_s2 deallocate( delta, delta_s2 ) call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) - call end_zmq_pull_socket(zmq_socket_pull) end @@ -498,12 +513,12 @@ end integer, external :: get_index_in_psi_det_sorted_bit, searchDet, detCmp logical, external :: is_in_wavefunction, isInCassd, detEq character*(512) :: task - integer(ZMQ_PTR) :: zmq_to_qp_run_socket + integer(ZMQ_PTR) :: zmq_to_qp_run_socket, zmq_socket_pull integer :: KKsize = 1000000 - call new_parallel_job(zmq_to_qp_run_socket,'mrsc2') + call new_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,'mrsc2') call wall_time(iwall) @@ -573,14 +588,18 @@ end do kk = 1 , nlink(i_I), KKsize write(task,*) I_i, J, kk, int(min(kk+KKsize-1, nlink(i_I))) - call add_task_to_taskserver(zmq_to_qp_run_socket,task) + if (add_task_to_taskserver(zmq_to_qp_run_socket,task) == -1) then + stop 'Unable to add task to task server' + endif end do ! do kk = 1 , nlink(i_I) ! k = linked(kk,i_I) ! blok = blokMwen(kk,i_I) ! write(task,*) I_i, J, k, blok - ! call add_task_to_taskserver(zmq_to_qp_run_socket,task) + ! if (add_task_to_taskserver(zmq_to_qp_run_socket,task) == -1) then + ! stop 'Unable to add task to task server' + ! endif ! ! enddo !kk enddo !J @@ -593,17 +612,19 @@ end ! rc = pthread_create(collector_thread, mrsc2_dressing_collector) print *, nzer, ntot, float(nzer) / float(ntot) provide nproc - !$OMP PARALLEL DEFAULT(none) SHARED(delta_ii_old,delta_ij_old,delta_ii_s2_old,delta_ij_s2_old) PRIVATE(i) NUM_THREADS(nproc+1) + !$OMP PARALLEL DEFAULT(none) & + !$OMP SHARED(delta_ii_old,delta_ij_old,delta_ii_s2_old,delta_ij_s2_old,zmq_socket_pull)& + !$OMP PRIVATE(i) NUM_THREADS(nproc+1) i = omp_get_thread_num() if (i==0) then - call mrsc2_dressing_collector(delta_ii_old,delta_ij_old,delta_ii_s2_old,delta_ij_s2_old) + call mrsc2_dressing_collector(zmq_socket_pull,delta_ii_old,delta_ij_old,delta_ii_s2_old,delta_ij_s2_old) else call mrsc2_dressing_slave_inproc(i) endif !$OMP END PARALLEL ! rc = pthread_join(collector_thread) - call end_parallel_job(zmq_to_qp_run_socket, 'mrsc2') + call end_parallel_job(zmq_to_qp_run_socket, zmq_socket_pull, 'mrsc2') END_PROVIDER diff --git a/src/Davidson/davidson_parallel.irp.f b/src/Davidson/davidson_parallel.irp.f index 4e4680f0..fed80063 100644 --- a/src/Davidson/davidson_parallel.irp.f +++ b/src/Davidson/davidson_parallel.irp.f @@ -33,16 +33,22 @@ subroutine davidson_run_slave(thread,iproc) integer(ZMQ_PTR) :: zmq_socket_push zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() + + integer, external :: connect_to_taskserver + + if (connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) == -1) then + call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) + endif + zmq_socket_push = new_zmq_push_socket(thread) - call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) - if(worker_id == -1) then - call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) - call end_zmq_push_socket(zmq_socket_push,thread) - return - end if - + call davidson_slave_work(zmq_to_qp_run_socket, zmq_socket_push, N_states_diag, N_det, worker_id) - call disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id) + + integer, external :: disconnect_from_taskserver + if (disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id) == -1) then + continue + endif + call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) call end_zmq_push_socket(zmq_socket_push,thread) end subroutine @@ -62,6 +68,7 @@ subroutine davidson_slave_work(zmq_to_qp_run_socket, zmq_socket_push, N_st, sze, integer, allocatable :: psi_det_read(:,:,:) double precision, allocatable :: v_t(:,:), s_t(:,:), u_t(:,:) + !DIR$ ATTRIBUTES ALIGN : $IRP_ALIGN :: u_t, v_t, s_t ! Get wave function (u_t) @@ -102,13 +109,20 @@ subroutine davidson_slave_work(zmq_to_qp_run_socket, zmq_socket_push, N_st, sze, allocate(v_t(N_st,N_det), s_t(N_st,N_det)) do - call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, msg) + integer, external :: get_task_from_taskserver + integer, external :: task_done_to_taskserver + call sleep(1) + if (get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, msg) == -1) then + exit + endif if(task_id == 0) exit read (msg,*) imin, imax, ishift, istep v_t = 0.d0 s_t = 0.d0 call H_S2_u_0_nstates_openmp_work(v_t,s_t,u_t,N_st,N_det,imin,imax,ishift,istep) - call task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id) + if (task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id) == -1) then + print *, irp_here, 'Unable to send task_done' + endif call davidson_push_results(zmq_socket_push, v_t, s_t, imin, imax, task_id) end do deallocate(u_t,v_t, s_t) @@ -231,7 +245,10 @@ subroutine davidson_collector(zmq_to_qp_run_socket, zmq_socket_pull, v0, s0, sze s0(i,j) = s0(i,j) + s_t(j,i) enddo enddo - call zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more) + integer, external :: zmq_delete_task + if (zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more) == -1) then + stop 'Unable to delete task' + endif end do deallocate(v_t,s_t) @@ -292,12 +309,16 @@ subroutine H_S2_u_0_nstates_zmq(v_0,s_0,u_0,N_st,sze) integer*8 :: rc8 double precision :: energy(N_st) - integer, external :: zmq_put_dvector + integer, external :: zmq_put_dvector, zmq_put_psi, zmq_put_N_states_diag energy = 0.d0 - call zmq_put_N_states_diag(zmq_to_qp_run_socket, 1) - call zmq_put_psi(zmq_to_qp_run_socket,1) + if (zmq_put_N_states_diag(zmq_to_qp_run_socket, 1) == -1) then + stop 'Unable to put N_states_diag on ZMQ server' + endif + if (zmq_put_psi(zmq_to_qp_run_socket,1) == -1) then + stop 'Unable to put psi on ZMQ server' + endif if (zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',energy,size(energy)) == -1) then stop 'Unable to put energy on ZMQ server' endif @@ -313,6 +334,7 @@ subroutine H_S2_u_0_nstates_zmq(v_0,s_0,u_0,N_st,sze) integer :: istep, imin, imax, ishift double precision :: w, max_workload, N_det_inv, di + integer, external :: add_task_to_taskserver w = 0.d0 istep=1 ishift=0 @@ -325,7 +347,9 @@ subroutine H_S2_u_0_nstates_zmq(v_0,s_0,u_0,N_st,sze) if (w > max_workload) then do ishift=0,istep-1 write(task,'(4(I9,1X),1A)') imin, imax, ishift, istep, '|' - call add_task_to_taskserver(zmq_to_qp_run_socket,trim(task)) + if (add_task_to_taskserver(zmq_to_qp_run_socket,trim(task)) == -1) then + stop 'Unable to add task' + endif enddo imin = imax+1 w = 0.d0 @@ -335,7 +359,9 @@ subroutine H_S2_u_0_nstates_zmq(v_0,s_0,u_0,N_st,sze) imax = N_det do ishift=0,istep-1 write(task,'(4(I9,1X),1A)') imin, imax, ishift, istep, '|' - call add_task_to_taskserver(zmq_to_qp_run_socket,trim(task)) + if (add_task_to_taskserver(zmq_to_qp_run_socket,trim(task)) == -1) then + stop 'Unable to add task' + endif enddo endif @@ -378,7 +404,7 @@ BEGIN_PROVIDER [ integer, nthreads_davidson ] END_PROVIDER -subroutine zmq_put_N_states_diag(zmq_to_qp_run_socket,worker_id) +integer function zmq_put_N_states_diag(zmq_to_qp_run_socket,worker_id) use f77_zmq implicit none BEGIN_DOC @@ -389,29 +415,30 @@ subroutine zmq_put_N_states_diag(zmq_to_qp_run_socket,worker_id) integer :: rc character*(256) :: msg + zmq_put_N_states_diag = 0 + write(msg,'(A,1X,I8,1X,A200)') 'put_data '//trim(zmq_state), worker_id, 'N_states_diag' rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),ZMQ_SNDMORE) if (rc /= len(trim(msg))) then - print *, irp_here, ': Error sending N_states_diag' - stop 'error' + zmq_put_N_states_diag = -1 + return endif rc = f77_zmq_send(zmq_to_qp_run_socket,N_states_diag,4,0) if (rc /= 4) then - print *, irp_here, ': Error sending N_states_diag' - stop 'error' + zmq_put_N_states_diag = -1 + return endif rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0) if (msg(1:rc) /= 'put_data_reply ok') then - print *, rc, trim(msg) - print *, irp_here, ': Error in put_data_reply' - stop 'error' + zmq_put_N_states_diag = -1 + return endif end -subroutine zmq_get_N_states_diag(zmq_to_qp_run_socket, worker_id) +integer function zmq_get_N_states_diag(zmq_to_qp_run_socket, worker_id) use f77_zmq implicit none BEGIN_DOC @@ -422,31 +449,45 @@ subroutine zmq_get_N_states_diag(zmq_to_qp_run_socket, worker_id) integer :: rc character*(256) :: msg - write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, 'N_states_diag' - rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0) - if (rc /= len(trim(msg))) then - print *, irp_here, ': Error getting N_states_diag' - stop 'error' - endif + zmq_get_N_states_diag = 0 - rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0) - if (msg(1:14) /= 'get_data_reply') then - print *, rc, trim(msg) - print *, irp_here, ': Error in get_data_reply' - stop 'error' - endif - - rc = f77_zmq_recv(zmq_to_qp_run_socket,N_states_diag,4,0) - if (rc /= 4) then - print *, irp_here, ': Error getting N_states_diag' - stop 'error' + if (mpi_master) then + write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, 'N_states_diag' + rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0) + if (rc /= len(trim(msg))) go to 10 + + rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0) + if (msg(1:14) /= 'get_data_reply') go to 10 + + rc = f77_zmq_recv(zmq_to_qp_run_socket,N_states_diag,4,0) + if (rc /= 4) go to 10 endif IRP_IF MPI include 'mpif.h' integer :: ierr - call MPI_BCAST (N_states_diag, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) + call MPI_BCAST (zmq_get_N_states_diag, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) + if (ierr /= MPI_SUCCESS) then + print *, irp_here//': Unable to broadcast N_states' + stop -1 + endif + if (zmq_get_N_states_diag == 0) then + call MPI_BCAST (N_states_diag, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) + if (ierr /= MPI_SUCCESS) then + print *, irp_here//': Unable to broadcast N_states' + stop -1 + endif + endif + IRP_ENDIF + + return + + ! Exception + 10 continue + zmq_get_N_states_diag = -1 + IRP_IF MPI + call MPI_BCAST (zmq_get_N_states_diag, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) if (ierr /= MPI_SUCCESS) then print *, irp_here//': Unable to broadcast N_states' stop -1 diff --git a/src/Determinants/H_apply_zmq.template.f b/src/Determinants/H_apply_zmq.template.f index cf74b590..bcc30726 100644 --- a/src/Determinants/H_apply_zmq.template.f +++ b/src/Determinants/H_apply_zmq.template.f @@ -33,9 +33,20 @@ subroutine $subroutine($params_main) call new_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,'$subroutine') zmq_socket_pair = new_zmq_pair_socket(.True.) - call zmq_put_psi(zmq_to_qp_run_socket,1) - call zmq_put_N_det_generators(zmq_to_qp_run_socket, worker_id) - call zmq_put_N_det_selectors(zmq_to_qp_run_socket, worker_id) + integer, external :: zmq_put_psi + integer, external :: zmq_put_N_det_generators + integer, external :: zmq_put_N_det_selectors + integer, external :: zmq_put_dvector + + if (zmq_put_psi(zmq_to_qp_run_socket,1) == -1) then + stop 'Unable to put psi on ZMQ server' + endif + if (zmq_put_N_det_generators(zmq_to_qp_run_socket, worker_id) == -1) then + stop 'Unable to put N_det_generators on ZMQ server' + endif + if (zmq_put_N_det_selectors(zmq_to_qp_run_socket, worker_id) == -1) then + stop 'Unable to put N_det_selectors on ZMQ server' + endif if (zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',energy,size(energy)) == -1) then stop 'Unable to put energy on ZMQ server' endif @@ -43,7 +54,10 @@ subroutine $subroutine($params_main) do i_generator=1,N_det_generators $skip write(task,*) i_generator - call add_task_to_taskserver(zmq_to_qp_run_socket,trim(task)) + integer, external :: add_task_to_taskserver + if (add_task_to_taskserver(zmq_to_qp_run_socket,trim(task)) == -1) then + stop 'Unable to add task to taskserver' + endif enddo allocate ( pt2_generators(N_states,N_det_generators), & @@ -122,17 +136,24 @@ subroutine $subroutine_slave(thread, iproc) integer(ZMQ_PTR) :: zmq_socket_push zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() - zmq_socket_push = new_zmq_push_socket(thread) + integer, external :: connect_to_taskserver + if (connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) == -1) then + call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) + return + endif + + zmq_socket_push = new_zmq_push_socket(thread) N_st = N_states allocate( pt2(N_st), norm_pert(N_st), H_pert_diag(N_st), & mask(N_int,2,6), fock_diag_tmp(2,mo_tot_num+1) ) - call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) - do - call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task) + integer, external :: get_task_from_taskserver + if (get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task) == -1) then + exit + endif if (task_id == 0) exit read(task,*) i_generator @@ -180,15 +201,21 @@ subroutine $subroutine_slave(thread, iproc) fock_diag_tmp, i_generator, iproc $params_post) endif - call task_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id) + integer, external :: task_done_to_taskserver + if (task_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id) == -1) then + print *, irp_here, ': Unable to send task_done' + endif call push_pt2(zmq_socket_push,pt2,norm_pert,H_pert_diag,i_generator,N_st,task_id) enddo - call disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id) - deallocate( mask, fock_diag_tmp, pt2, norm_pert, H_pert_diag ) + + integer, external :: disconnect_from_taskserver + if (disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id) == -1) then + continue + endif call end_zmq_push_socket(zmq_socket_push,thread) call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) @@ -234,7 +261,10 @@ subroutine $subroutine_collector(zmq_socket_pull) H_pert_diag_result(k,i_generator) = H_pert_diag(k) enddo accu = accu + 1_8 - call zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more) + integer, external :: zmq_delete_task + if (zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more) == -1) then + stop 'Unable to delete task' + endif endif enddo diff --git a/src/Determinants/zmq.irp.f b/src/Determinants/zmq.irp.f index 6eab1f8d..57bbd3ca 100644 --- a/src/Determinants/zmq.irp.f +++ b/src/Determinants/zmq.irp.f @@ -1,4 +1,4 @@ -subroutine zmq_put_psi(zmq_to_qp_run_socket,worker_id) +integer function zmq_put_psi(zmq_to_qp_run_socket,worker_id) use f77_zmq implicit none BEGIN_DOC @@ -8,11 +8,33 @@ subroutine zmq_put_psi(zmq_to_qp_run_socket,worker_id) integer, intent(in) :: worker_id character*(256) :: msg - call zmq_put_N_states(zmq_to_qp_run_socket, worker_id) - call zmq_put_N_det(zmq_to_qp_run_socket, worker_id) - call zmq_put_psi_det_size(zmq_to_qp_run_socket, worker_id) - call zmq_put_psi_det(zmq_to_qp_run_socket, worker_id) - call zmq_put_psi_coef(zmq_to_qp_run_socket, worker_id) + integer, external :: zmq_put_N_states + integer, external :: zmq_put_N_det + integer, external :: zmq_put_psi_det_size + integer, external :: zmq_put_psi_det + integer, external :: zmq_put_psi_coef + + zmq_put_psi = 0 + if (zmq_put_N_states(zmq_to_qp_run_socket, worker_id) == -1) then + zmq_put_psi = -1 + return + endif + if (zmq_put_N_det(zmq_to_qp_run_socket, worker_id) == -1) then + zmq_put_psi = -1 + return + endif + if (zmq_put_psi_det_size(zmq_to_qp_run_socket, worker_id) == -1) then + zmq_put_psi = -1 + return + endif + if (zmq_put_psi_det(zmq_to_qp_run_socket, worker_id) == -1) then + zmq_put_psi = -1 + return + endif + if (zmq_put_psi_coef(zmq_to_qp_run_socket, worker_id) == -1) then + zmq_put_psi = -1 + return + endif end @@ -20,7 +42,7 @@ end BEGIN_TEMPLATE -subroutine zmq_put_$X(zmq_to_qp_run_socket,worker_id) +integer function zmq_put_$X(zmq_to_qp_run_socket,worker_id) use f77_zmq implicit none BEGIN_DOC @@ -31,29 +53,30 @@ subroutine zmq_put_$X(zmq_to_qp_run_socket,worker_id) integer :: rc character*(256) :: msg + zmq_put_$X = 0 + write(msg,'(A,1X,I8,1X,A200)') 'put_data '//trim(zmq_state), worker_id, '$X' rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),ZMQ_SNDMORE) if (rc /= len(trim(msg))) then - print *, irp_here, ': Error sending $X' - stop 'error' + zmq_put_$X = -1 + return endif rc = f77_zmq_send(zmq_to_qp_run_socket,$X,4,0) if (rc /= 4) then - print *, irp_here, ': Error sending $X' - stop 'error' + zmq_put_$X = -1 + return endif rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0) if (msg(1:rc) /= 'put_data_reply ok') then - print *, rc, trim(msg) - print *, irp_here, ': Error in put_data_reply' - stop 'error' + zmq_put_$X = -1 + return endif end -subroutine zmq_get_$X(zmq_to_qp_run_socket, worker_id) +integer function zmq_get_$X(zmq_to_qp_run_socket, worker_id) use f77_zmq implicit none BEGIN_DOC @@ -64,26 +87,48 @@ subroutine zmq_get_$X(zmq_to_qp_run_socket, worker_id) integer :: rc character*(256) :: msg - write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, '$X' - rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0) - if (rc /= len(trim(msg))) then - print *, irp_here, ': Error getting $X' - stop 'error' + if (mpi_master) then + write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, '$X' + rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0) + if (rc /= len(trim(msg))) go to 10 + + rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0) + if (msg(1:14) /= 'get_data_reply') go to 10 + + rc = f77_zmq_recv(zmq_to_qp_run_socket,$X,4,0) + if (rc /= 4) go to 10 + endif - rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0) - if (msg(1:14) /= 'get_data_reply') then - print *, rc, trim(msg) - print *, irp_here, ': Error in get_data_reply' - stop 'error' - endif + ! Normal exit + zmq_get_$X = 0 + IRP_IF MPI + include 'mpif.h' + integer :: ierr - rc = f77_zmq_recv(zmq_to_qp_run_socket,$X,4,0) - if (rc /= 4) then - print *, rc - print *, irp_here, ': Error getting $X' - stop 'error' - endif + call MPI_BCAST (zmq_get_$X, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) + if (ierr /= MPI_SUCCESS) then + stop 'Unable to broadcast zmq_get_psi_det' + endif + if (zmq_get_$X == 0) then + call MPI_BCAST ($X, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) + if (ierr /= MPI_SUCCESS) then + stop 'Unable to broadcast zmq_get_psi_det' + endif + endif + IRP_ENDIF + + return + + ! Exception + 10 continue + zmq_get_$X = -1 + IRP_IF MPI + call MPI_BCAST (zmq_get_$X, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) + if (ierr /= MPI_SUCCESS) then + stop 'Unable to broadcast zmq_get_psi_det' + endif + IRP_ENDIF end SUBST [ X ] @@ -94,7 +139,7 @@ psi_det_size ;; END_TEMPLATE -subroutine zmq_put_psi_det(zmq_to_qp_run_socket,worker_id) +integer function zmq_put_psi_det(zmq_to_qp_run_socket,worker_id) use f77_zmq implicit none BEGIN_DOC @@ -106,28 +151,29 @@ subroutine zmq_put_psi_det(zmq_to_qp_run_socket,worker_id) integer*8 :: rc8 character*(256) :: msg + zmq_put_psi_det = 0 + write(msg,'(A,1X,I8,1X,A200)') 'put_data '//trim(zmq_state), worker_id, 'psi_det' rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),ZMQ_SNDMORE) if (rc /= len(trim(msg))) then - print *, irp_here, ': Error sending psi_det' - stop 'error' + zmq_put_psi_det = -1 + return endif rc8 = f77_zmq_send8(zmq_to_qp_run_socket,psi_det,int(N_int*2_8*N_det*bit_kind,8),0) if (rc8 /= N_int*2_8*N_det*bit_kind) then - print *, irp_here, ': Error sending psi_det' - stop 'error' + zmq_put_psi_det = -1 + return endif rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0) if (msg(1:rc) /= 'put_data_reply ok') then - print *, rc, trim(msg) - print *, irp_here, ': Error in put_data_reply' - stop 'error' + zmq_put_psi_det = -1 + return endif end -subroutine zmq_put_psi_coef(zmq_to_qp_run_socket,worker_id) +integer function zmq_put_psi_coef(zmq_to_qp_run_socket,worker_id) use f77_zmq implicit none BEGIN_DOC @@ -139,31 +185,33 @@ subroutine zmq_put_psi_coef(zmq_to_qp_run_socket,worker_id) integer*8 :: rc8 character*(256) :: msg + zmq_put_psi_coef = 0 + write(msg,'(A,1X,I8,1X,A200)') 'put_data '//trim(zmq_state), worker_id, 'psi_coef' rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),ZMQ_SNDMORE) if (rc /= len(trim(msg))) then - print *, irp_here, ': Error sending psi_coef' - stop 'error' + zmq_put_psi_coef = -1 + return endif rc8 = f77_zmq_send8(zmq_to_qp_run_socket,psi_coef,int(psi_det_size*N_states*8_8,8),0) if (rc8 /= psi_det_size*N_states*8_8) then - print *, irp_here, ': Error sending psi_coef' - stop 'error' + zmq_put_psi_coef = -1 + return endif rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0) if (msg(1:rc) /= 'put_data_reply ok') then - print *, rc, trim(msg) - print *, irp_here, ': Error in put_data_reply' - stop 'error' + zmq_put_psi_coef = -1 + return endif + end !--------------------------------------------------------------------------- -subroutine zmq_get_psi(zmq_to_qp_run_socket, worker_id) +integer function zmq_get_psi(zmq_to_qp_run_socket, worker_id) use f77_zmq implicit none BEGIN_DOC @@ -172,9 +220,26 @@ subroutine zmq_get_psi(zmq_to_qp_run_socket, worker_id) integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket integer, intent(in) :: worker_id - call zmq_get_N_states(zmq_to_qp_run_socket, worker_id) - call zmq_get_N_det(zmq_to_qp_run_socket, worker_id) - call zmq_get_psi_det_size(zmq_to_qp_run_socket, worker_id) + integer, external :: zmq_get_N_states + integer, external :: zmq_get_N_det + integer, external :: zmq_get_psi_det_size + integer, external :: zmq_get_psi_det + integer, external :: zmq_get_psi_coef + + zmq_get_psi = 0 + + if (zmq_get_N_states(zmq_to_qp_run_socket, worker_id) == -1) then + zmq_get_psi = -1 + return + endif + if (zmq_get_N_det(zmq_to_qp_run_socket, worker_id) == -1) then + zmq_get_psi = -1 + return + endif + if (zmq_get_psi_det_size(zmq_to_qp_run_socket, worker_id) == -1) then + zmq_get_psi = -1 + return + endif if (size(psi_det) /= N_int*2_8*psi_det_size*bit_kind) then deallocate(psi_det) @@ -186,14 +251,20 @@ subroutine zmq_get_psi(zmq_to_qp_run_socket, worker_id) allocate(psi_coef(psi_det_size,N_states)) endif - call zmq_get_psi_det(zmq_to_qp_run_socket, worker_id) - call zmq_get_psi_coef(zmq_to_qp_run_socket, worker_id) + if (zmq_get_psi_det(zmq_to_qp_run_socket, worker_id) == -1) then + zmq_get_psi = -1 + return + endif + if (zmq_get_psi_coef(zmq_to_qp_run_socket, worker_id) == -1) then + zmq_get_psi = -1 + return + endif SOFT_TOUCH psi_det psi_coef psi_det_size N_det N_states end -subroutine zmq_get_psi_det(zmq_to_qp_run_socket, worker_id) +integer function zmq_get_psi_det(zmq_to_qp_run_socket, worker_id) use f77_zmq implicit none BEGIN_DOC @@ -205,33 +276,46 @@ subroutine zmq_get_psi_det(zmq_to_qp_run_socket, worker_id) integer*8 :: rc8 character*(256) :: msg - - write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, 'psi_det' - rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0) - if (rc /= len(trim(msg))) then - print *, irp_here, ': Error getting psi_det' - stop 'error' + if (mpi_master) then + write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, 'psi_det' + rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0) + if (rc /= len(trim(msg))) go to 10 + + rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0) + if (msg(1:14) /= 'get_data_reply') go to 10 + + rc8 = f77_zmq_recv8(zmq_to_qp_run_socket,psi_det,int(N_int*2_8*N_det*bit_kind,8),0) + if (rc8 /= N_int*2_8*N_det*bit_kind) go to 10 endif - rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0) - if (msg(1:14) /= 'get_data_reply') then - print *, rc, trim(msg) - print *, irp_here, ': Error in get_data_reply' - stop 'error' - endif - - rc8 = f77_zmq_recv8(zmq_to_qp_run_socket,psi_det,int(N_int*2_8*N_det*bit_kind,8),0) - if (rc8 /= N_int*2_8*N_det*bit_kind) then - print *, irp_here, ': Error getting psi_det', rc8, N_int*2_8*N_det*bit_kind - stop 'error' - endif + ! Normal exit + zmq_get_psi_det = 0 IRP_IF MPI - call broadcast_chunks_bit_kind(psi_det,N_det*N_int*2) + include 'mpif.h' + integer :: ierr + call MPI_BCAST (zmq_get_psi_det, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) + if (ierr /= MPI_SUCCESS) then + stop 'Unable to broadcast zmq_get_psi_det' + endif + if (zmq_get_psi_det == 0) then + call broadcast_chunks_bit_kind(psi_det,N_det*N_int*2) + endif IRP_ENDIF + return + + ! Exception + 10 continue + zmq_get_psi_det = -1 + IRP_IF MPI + call MPI_BCAST (zmq_get_psi_det, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) + if (ierr /= MPI_SUCCESS) then + stop 'Unable to broadcast zmq_get_psi_det' + endif + IRP_ENDIF end -subroutine zmq_get_psi_coef(zmq_to_qp_run_socket, worker_id) +integer function zmq_get_psi_coef(zmq_to_qp_run_socket, worker_id) use f77_zmq implicit none BEGIN_DOC @@ -243,31 +327,44 @@ subroutine zmq_get_psi_coef(zmq_to_qp_run_socket, worker_id) integer*8 :: rc8 character*(256) :: msg + if (mpi_master) then + write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, 'psi_coef' + rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0) + if (rc /= len(trim(msg))) go to 10 - write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, 'psi_coef' - rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0) - if (rc /= len(trim(msg))) then - print *, irp_here, ': Error getting psi_coef' - stop 'error' - endif + rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0) + if (msg(1:14) /= 'get_data_reply') go to 10 - rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0) - if (msg(1:14) /= 'get_data_reply') then - print *, rc, trim(msg) - print *, irp_here, ': Error in get_data_reply' - stop 'error' - endif - - rc8 = f77_zmq_recv8(zmq_to_qp_run_socket,psi_coef,int(psi_det_size*N_states*8_8,8),0) - if (rc8 /= psi_det_size*N_states*8_8) then - print *, irp_here, ': Error getting psi_coef' - stop 'error' + rc8 = f77_zmq_recv8(zmq_to_qp_run_socket,psi_coef,int(psi_det_size*N_states*8_8,8),0) + if (rc8 /= psi_det_size*N_states*8_8) go to 10 endif + + ! Normal exit + zmq_get_psi_coef = 0 IRP_IF MPI - call broadcast_chunks_double(psi_coef,N_states*N_det) + include 'mpif.h' + integer :: ierr + call MPI_BCAST (zmq_get_psi_coef, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) + if (ierr /= MPI_SUCCESS) then + stop 'Unable to broadcast zmq_get_psi_coef' + endif + if (zmq_get_psi_coef == 0) then + call broadcast_chunks_double(psi_coef,N_states*N_det) + endif IRP_ENDIF + return + + ! Exception + 10 continue + zmq_get_psi_coef = -1 + IRP_IF MPI + call MPI_BCAST (zmq_get_psi_coef, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) + if (ierr /= MPI_SUCCESS) then + stop 'Unable to broadcast zmq_get_psi_coef' + endif + IRP_ENDIF end diff --git a/src/Integrals_Bielec/ao_bi_integrals.irp.f b/src/Integrals_Bielec/ao_bi_integrals.irp.f index d7c503b4..3ede50df 100644 --- a/src/Integrals_Bielec/ao_bi_integrals.irp.f +++ b/src/Integrals_Bielec/ao_bi_integrals.irp.f @@ -373,7 +373,10 @@ BEGIN_PROVIDER [ logical, ao_bielec_integrals_in_map ] write(fmt,*) '(', ao_num, '(I5,X,I5,''|''))' do l=1,ao_num write(task,fmt) (i,l, i=1,l) - call add_task_to_taskserver(zmq_to_qp_run_socket,trim(task)) + integer, external :: add_task_to_taskserver + if (add_task_to_taskserver(zmq_to_qp_run_socket,trim(task)) == -1) then + stop 'Unable to add task to server' + endif enddo deallocate(task) diff --git a/src/Integrals_Bielec/ao_bielec_integrals_in_map_slave.irp.f b/src/Integrals_Bielec/ao_bielec_integrals_in_map_slave.irp.f index e86032e0..153993bc 100644 --- a/src/Integrals_Bielec/ao_bielec_integrals_in_map_slave.irp.f +++ b/src/Integrals_Bielec/ao_bielec_integrals_in_map_slave.irp.f @@ -98,23 +98,37 @@ subroutine ao_bielec_integrals_in_map_slave(thread,iproc) character*(64) :: state zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() + + integer, external :: connect_to_taskserver + if (connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) == -1) then + call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) + return + endif + zmq_socket_push = new_zmq_push_socket(thread) allocate ( buffer_i(ao_num*ao_num), buffer_value(ao_num*ao_num) ) - call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) do - call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task) + integer, external :: get_task_from_taskserver + if (get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task) == -1) then + exit + endif if (task_id == 0) exit read(task,*) j, l + integer, external :: task_done_to_taskserver call compute_ao_integrals_jl(j,l,n_integrals,buffer_i,buffer_value) - call task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id) + if (task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id) == -1) then + stop 'Unable to send task_done' + endif call push_integrals(zmq_socket_push, n_integrals, buffer_i, buffer_value, task_id) enddo - - call disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id) + integer, external :: disconnect_from_taskserver + if (disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id) == -1) then + continue + endif deallocate( buffer_i, buffer_value ) call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) call end_zmq_push_socket(zmq_socket_push,thread) @@ -200,7 +214,10 @@ IRP_ENDIF call insert_into_ao_integrals_map(n_integrals,buffer_i,buffer_value) accu += n_integrals if (task_id /= 0) then - call zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more) + integer, external :: zmq_delete_task + if (zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more) == -1) then + stop 'Unable to delete task' + endif endif endif diff --git a/src/ZMQ/put_get.irp.f b/src/ZMQ/put_get.irp.f index 230f54b9..2a7bbe74 100644 --- a/src/ZMQ/put_get.irp.f +++ b/src/ZMQ/put_get.irp.f @@ -12,32 +12,27 @@ integer function zmq_put_dvector(zmq_to_qp_run_socket, worker_id, name, x, size_ integer :: rc character*(256) :: msg - ! Failure - zmq_put_dvector = -1 + zmq_put_dvector = 0 write(msg,'(A,1X,I8,1X,A200)') 'put_data '//trim(zmq_state), worker_id, name rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),ZMQ_SNDMORE) if (rc /= len(trim(msg))) then - print *, irp_here, ': Error sending '//name + zmq_put_dvector = -1 return endif rc = f77_zmq_send(zmq_to_qp_run_socket,x,size_x*8,0) if (rc /= size_x*8) then - print *, irp_here, ': Error sending '//name + zmq_put_dvector = -1 return endif rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0) if (msg(1:rc) /= 'put_data_reply ok') then - print *, rc, trim(msg) - print *, irp_here, ': Error in put_data_reply' + zmq_put_dvector = -1 return endif - ! Success - zmq_put_dvector = 0 - end @@ -59,26 +54,19 @@ integer function zmq_get_dvector(zmq_to_qp_run_socket, worker_id, name, x, size_ ! Success zmq_get_dvector = 0 - write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, name - rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0) - if (rc /= len(trim(msg))) then - print *, irp_here, ': Error getting '//name - zmq_get_dvector = -1 - endif - - rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0) - if (msg(1:14) /= 'get_data_reply') then - print *, rc, trim(msg) - print *, irp_here, ': Error in get_data_reply' - zmq_get_dvector = -1 - endif - - rc = f77_zmq_recv(zmq_to_qp_run_socket,x,size_x*8,0) - if (rc /= size_x*8) then - print *, irp_here, ': Error getting '//name - zmq_get_dvector = -1 + if (mpi_master) then + write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, name + rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0) + if (rc /= len(trim(msg))) go to 10 + + rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0) + if (msg(1:14) /= 'get_data_reply') go to 10 + + rc = f77_zmq_recv(zmq_to_qp_run_socket,x,size_x*8,0) + if (rc /= size_x*8) go to 10 endif + ! Normal exit IRP_IF MPI integer :: ierr @@ -88,13 +76,27 @@ integer function zmq_get_dvector(zmq_to_qp_run_socket, worker_id, name, x, size_ print *, irp_here//': Unable to broadcast zmq_get_dvector' stop -1 endif - call MPI_BCAST (x, size_x, MPI_DOUBLE_PRECISION, 0, MPI_COMM_WORLD, ierr) - if (ierr /= MPI_SUCCESS) then - print *, irp_here//': Unable to broadcast dvector' - stop -1 + if (zmq_get_dvector == 0) then + call MPI_BCAST (x, size_x, MPI_DOUBLE_PRECISION, 0, MPI_COMM_WORLD, ierr) + if (ierr /= MPI_SUCCESS) then + print *, irp_here//': Unable to broadcast dvector' + stop -1 + endif endif IRP_ENDIF + return + + ! Exception + 10 continue + zmq_get_dvector = -1 + IRP_IF MPI + call MPI_BCAST (zmq_get_dvector, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) + if (ierr /= MPI_SUCCESS) then + print *, irp_here//': Unable to broadcast zmq_get_dvector' + stop -1 + endif + IRP_ENDIF end diff --git a/src/ZMQ/utils.irp.f b/src/ZMQ/utils.irp.f index 1189803c..47b2d965 100644 --- a/src/ZMQ/utils.irp.f +++ b/src/ZMQ/utils.irp.f @@ -644,7 +644,7 @@ subroutine end_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,name_in) endif end -subroutine connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) +integer function connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) use f77_zmq implicit none BEGIN_DOC @@ -657,38 +657,42 @@ subroutine connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) character*(512) :: message character*(128) :: reply, state, address integer :: rc + + !Success + connect_to_taskserver = 0 + if (thread == 1) then rc = f77_zmq_send(zmq_to_qp_run_socket, "connect inproc", 14, 0) if (rc /= 14) then - print *, 'f77_zmq_send(zmq_to_qp_run_socket, "connect inproc", 14, 0)' - stop 'error' + connect_to_taskserver = -1 + return endif else rc = f77_zmq_send(zmq_to_qp_run_socket, "connect tcp", 11, 0) if (rc /= 11) then - print *, 'f77_zmq_send(zmq_to_qp_run_socket, "connect tcp", 11, 0)' - stop 'error' + connect_to_taskserver = -1 + return endif endif rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0) message = trim(message(1:rc)) if(message(1:5) == "error") then - worker_id = -1 + connect_to_taskserver = -1 return end if read(message,*, end=10, err=10) reply, state, worker_id, address if (trim(reply) /= 'connect_reply') then - print *, trim(message) - stop -1 + connect_to_taskserver = -1 + return endif + return 10 continue - print *, irp_here, ': Error in read' - stop + connect_to_taskserver = -1 end -subroutine disconnect_from_taskserver(zmq_to_qp_run_socket, & +integer function disconnect_from_taskserver(zmq_to_qp_run_socket, & zmq_socket_push, worker_id) use f77_zmq implicit none @@ -701,18 +705,17 @@ subroutine disconnect_from_taskserver(zmq_to_qp_run_socket, & integer :: rc, sze character*(64) :: message, reply, state + + disconnect_from_taskserver = 0 + write(message,*) 'disconnect '//trim(zmq_state), worker_id sze = len(trim(message)) rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0) -! if (rc == -1) then -! return -! endif if (rc /= sze) then - print *, rc, sze - print *, irp_here, 'f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)' - stop 'error' + disconnect_from_taskserver = -1 + return endif rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0) @@ -720,26 +723,23 @@ subroutine disconnect_from_taskserver(zmq_to_qp_run_socket, & read(message,*, end=10, err=10) reply, state if ((trim(reply) == 'disconnect_reply').and.(trim(state) == trim(zmq_state))) then + disconnect_from_taskserver = -1 return endif if (trim(message) == 'error Wrong state') then + disconnect_from_taskserver = -1 + return + else if (trim(message) == 'error No job is running') then + disconnect_from_taskserver = -1 return endif - if (trim(message) == 'error No job is running') then - return - endif - - print *, 'Unable to disconnect : ', trim(zmq_state) - print *, trim(message) - stop -1 return 10 continue - print *, irp_here, ': Error in read' - stop + disconnect_from_taskserver = -1 end -subroutine add_task_to_taskserver(zmq_to_qp_run_socket,task) +integer function add_task_to_taskserver(zmq_to_qp_run_socket,task) use f77_zmq implicit none BEGIN_DOC @@ -751,71 +751,27 @@ subroutine add_task_to_taskserver(zmq_to_qp_run_socket,task) integer :: rc, sze character(len=:), allocatable :: message + add_task_to_taskserver = 0 + message='add_task '//trim(zmq_state)//' '//trim(task) sze = len(message) rc = f77_zmq_send(zmq_to_qp_run_socket, message, sze, 0) if (rc /= sze) then - print *, rc, sze - print *, irp_here,': f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)' - stop 'error' + add_task_to_taskserver = -1 + return endif rc = f77_zmq_recv(zmq_to_qp_run_socket, message, sze-1, 0) if (message(1:rc) /= 'ok') then - print *, trim(message(1:rc)) - print *, trim(task) - print *, 'Unable to add the next task' - stop -1 + add_task_to_taskserver = -1 + return endif end -subroutine add_task_to_taskserver_send(zmq_to_qp_run_socket,task) - use f77_zmq - implicit none - BEGIN_DOC - ! Get a task from the task server - END_DOC - integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket - character*(*), intent(in) :: task - - integer :: rc, sze - character(len=:), allocatable :: message - sze = len(trim(task))+12+len(trim(zmq_state)) - message = repeat(' ',sze) - write(message,*) 'add_task '//trim(zmq_state)//' '//trim(task) - - rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0) - if (rc /= sze) then - print *, rc, sze - print *, irp_here,': f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)' - stop 'error' - endif - -end - -subroutine add_task_to_taskserver_recv(zmq_to_qp_run_socket) - use f77_zmq - implicit none - BEGIN_DOC - ! Get a task from the task server - END_DOC - integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket - - integer :: rc, sze - character*(512) :: message - rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0) - if (message(1:rc) /= 'ok') then - print *, trim(message(1:rc)) - print *, 'Unable to add the next task' - stop -1 - endif - -end - -subroutine zmq_abort(zmq_to_qp_run_socket) +integer function zmq_abort(zmq_to_qp_run_socket) use f77_zmq implicit none BEGIN_DOC @@ -824,25 +780,26 @@ subroutine zmq_abort(zmq_to_qp_run_socket) integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket integer :: rc, sze character*(512) :: message + zmq_abort = 0 + write(message,*) 'abort ' sze = len(trim(message)) rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0) if (rc /= sze) then - print *, irp_here, 'f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)' - stop 'error' + zmq_abort = -1 + return endif rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0) if (trim(message(1:rc)) /= 'ok') then - print *, trim(message(1:rc)) - print *, 'Unable to send abort message' - stop -1 + zmq_abort = -1 + return endif end -subroutine task_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id) +integer function task_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id) use f77_zmq implicit none BEGIN_DOC @@ -853,25 +810,27 @@ subroutine task_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id) integer :: rc, sze character*(512) :: message + + task_done_to_taskserver = 0 + write(message,*) 'task_done '//trim(zmq_state), worker_id, task_id sze = len(trim(message)) rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0) if (rc /= sze) then - print *, irp_here, 'f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)' - stop 'error' + task_done_to_taskserver = -1 + return endif rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0) if (trim(message(1:rc)) /= 'ok') then - print *, trim(message(1:rc)) - print *, 'Unable to send task_done message' - stop -1 + task_done_to_taskserver = -1 + return endif end -subroutine tasks_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id, n_tasks) +integer function tasks_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id, n_tasks) use f77_zmq implicit none BEGIN_DOC @@ -884,6 +843,8 @@ subroutine tasks_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id, n_ character(LEN=:), allocatable :: message character*(64) :: fmt + tasks_done_to_taskserver = 0 + allocate(character(LEN=64+n_tasks*12) :: message) write(fmt,*) '(A,X,A,I10,X,', n_tasks, '(I11,1X))' write(message,*) 'task_done '//trim(zmq_state), worker_id, (task_id(k), k=1,n_tasks) @@ -891,27 +852,26 @@ subroutine tasks_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id, n_ sze = len(trim(message)) rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0) if (rc == -1) then - ! Server is shut down + tasks_done_to_taskserver = -1 deallocate(message) return endif if (rc /= sze) then - print *, irp_here, 'f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)' - stop 'error' + tasks_done_to_taskserver = -1 + deallocate(message) + return endif rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 64, 0) if (trim(message(1:rc)) /= 'ok') then - print *, trim(message(1:rc)) - print *, 'Unable to send task_done message' - stop -1 + tasks_done_to_taskserver = -1 endif deallocate(message) end -subroutine get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task) +integer function get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task) use f77_zmq implicit none BEGIN_DOC @@ -926,13 +886,15 @@ subroutine get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task) character*(64) :: reply integer :: rc, sze + get_task_from_taskserver = 0 + write(message,*) 'get_task '//trim(zmq_state), worker_id sze = len(trim(message)) rc = f77_zmq_send(zmq_to_qp_run_socket, message, sze, 0) if (rc /= sze) then - print *, irp_here, ':f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)' - stop 'error' + get_task_from_taskserver = -1 + return endif message = repeat(' ',512) @@ -960,19 +922,18 @@ subroutine get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task) task_id = 0 task = 'terminate' else - print *, 'Unable to get the next task' - print *, trim(message) - stop -1 + get_task_from_taskserver = -1 + return endif return + 10 continue - print *, irp_here, ': Error in read' - stop + get_task_from_taskserver = -1 end -subroutine get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task,n_tasks) +integer function get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task,n_tasks) use f77_zmq implicit none BEGIN_DOC @@ -988,15 +949,15 @@ subroutine get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task character*(64) :: reply integer :: rc, sze, i + get_tasks_from_taskserver = 0 + write(message,*) 'get_tasks '//trim(zmq_state), worker_id, n_tasks sze = len(trim(message)) rc = f77_zmq_send(zmq_to_qp_run_socket, message, sze, 0) if (rc /= sze) then - print *, trim(message) - print *, rc, sze - print *, irp_here, ':f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)' - stop 'error' + get_tasks_from_taskserver = -1 + return endif message = repeat(' ',1024) @@ -1004,17 +965,16 @@ subroutine get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task rc = min(1024,rc) read(message(1:rc),*, end=10, err=10) reply if (trim(message) == 'get_tasks_reply ok') then - continue + continue else if (trim(message) == 'terminate') then - task_id(1) = 0 - task(1) = 'terminate' + task_id(1) = 0 + task(1) = 'terminate' else if (trim(message) == 'error No job is running') then - task_id(1) = 0 - task(1) = 'terminate' + task_id(1) = 0 + task(1) = 'terminate' else - print *, 'Unable to get the next task' - print *, ':'//trim(message)//':' - stop -1 + get_tasks_from_taskserver = -1 + return endif task(:) = repeat(' ',512) @@ -1038,11 +998,11 @@ subroutine get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task rc += 1 task(i) = message(rc:) enddo - return + 10 continue - print *, irp_here, ': Error in read' - stop + get_tasks_from_taskserver = -1 + return end @@ -1070,7 +1030,7 @@ subroutine end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) end -subroutine zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more) +integer function zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more) use f77_zmq implicit none BEGIN_DOC @@ -1084,11 +1044,13 @@ subroutine zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more) integer :: rc character*(512) :: message + zmq_delete_task = 0 + write(message,*) 'del_task ', zmq_state, task_id rc = f77_zmq_send(zmq_to_qp_run_socket,trim(message),len(trim(message)),0) if (rc /= len(trim(message))) then - print *, irp_here - stop 'error' + zmq_delete_task = -1 + return endif character*(64) :: reply @@ -1100,13 +1062,12 @@ subroutine zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more) else if (reply(16:19) == 'done') then more = 0 else - print *, reply - print *, irp_here - stop 'error' + zmq_delete_task = -1 + return endif end -subroutine zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more) +integer function zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more) use f77_zmq implicit none BEGIN_DOC @@ -1121,6 +1082,8 @@ subroutine zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks character*(64) :: fmt, reply character(LEN=:), allocatable :: message + zmq_delete_tasks = 0 + allocate(character(LEN=64+n_tasks*12) :: message) write(fmt,*) '(A,1X,A,1X,', n_tasks, '(I11,1X))' @@ -1129,8 +1092,9 @@ subroutine zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks rc = f77_zmq_send(zmq_to_qp_run_socket,trim(message),len(trim(message)),0) if (rc /= len(trim(message))) then - print *, irp_here - stop 'error' + zmq_delete_tasks = -1 + deallocate(message) + return endif deallocate(message) @@ -1142,9 +1106,7 @@ subroutine zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks else if (reply(16:19) == 'done') then more = 0 else - print *, reply - print *, irp_here - stop 'error' + zmq_delete_tasks = -1 endif end