diff --git a/ocaml/TaskServer.ml b/ocaml/TaskServer.ml index 44a46f52..9fa27d14 100644 --- a/ocaml/TaskServer.ml +++ b/ocaml/TaskServer.ml @@ -208,7 +208,7 @@ let end_job msg program_state rep_socket pair_socket = address_tcp = None; address_inproc = None; running = true; - accepting_clients = false; + accepting_clients = false; data = StringHashtbl.create (); } diff --git a/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f b/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f index aea967eb..e6758e8f 100644 --- a/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f +++ b/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f @@ -89,12 +89,12 @@ subroutine ZMQ_pt2(E, pt2,relative_error, absolute_error, error) if (zmq_put_ivector(zmq_to_qp_run_socket,1,'pt2_stoch_istate',pt2_stoch_istate,1) == -1) then stop 'Unable to put pt2_stoch_istate on ZMQ server' endif - if (zmq_put_dvector(zmq_to_qp_run_socket,1,'threshold_selectors',threshold_selectors,1) == -1) then - stop 'Unable to put threshold_selectors on ZMQ server' - endif - if (zmq_put_dvector(zmq_to_qp_run_socket,1,'threshold_generators',threshold_generators,1) == -1) then - stop 'Unable to put threshold_generators on ZMQ server' - endif + if (zmq_put_dvector(zmq_to_qp_run_socket,1,'threshold_selectors',threshold_selectors,1) == -1) then + stop 'Unable to put threshold_selectors on ZMQ server' + endif + if (zmq_put_dvector(zmq_to_qp_run_socket,1,'threshold_generators',threshold_generators,1) == -1) then + stop 'Unable to put threshold_generators on ZMQ server' + endif call create_selection_buffer(1, 1*2, b) @@ -139,7 +139,6 @@ subroutine ZMQ_pt2(E, pt2,relative_error, absolute_error, error) endif - call omp_set_nested(.true.) !$OMP PARALLEL DEFAULT(shared) NUM_THREADS(nproc+1) & !$OMP PRIVATE(i) i = omp_get_thread_num() @@ -150,7 +149,6 @@ subroutine ZMQ_pt2(E, pt2,relative_error, absolute_error, error) call pt2_slave_inproc(i) endif !$OMP END PARALLEL - call omp_set_nested(.false.) call end_parallel_job(zmq_to_qp_run_socket, zmq_socket_pull, 'pt2') call delete_selection_buffer(b) @@ -279,13 +277,7 @@ subroutine pt2_collector(zmq_socket_pull, E, b, tbc, comb, Ncomb, computed, pt2_ loop = .True. pullLoop : do while (loop) - integer, external :: zmq_delete_tasks_async_send, zmq_delete_tasks_async_recv - integer, external :: zmq_delete_tasks - call pull_pt2_results(zmq_socket_pull, index, pt2_mwen, task_id, n_tasks) - if (zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more) == -1) then - stop 'Unable to send delete tasks' - endif do i=1,n_tasks pt2_detail(1:N_states, index(i)) += pt2_mwen(1:N_states,i) parts_to_get(index(i)) -= 1 @@ -298,13 +290,18 @@ subroutine pt2_collector(zmq_socket_pull, E, b, tbc, comb, Ncomb, computed, pt2_ if(parts_to_get(index(i)) == 0) actually_computed(index(i)) = .true. enddo - call wall_time(time) - + integer, external :: zmq_delete_tasks + if (zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more) == -1) then + cycle + endif if (more == 0) then loop = .False. endif - if(time - timeLast > 4d0 .or. (.not.loop)) then + call wall_time(time) + + + if(time - timeLast > 5d0 .or. (.not.loop)) then timeLast = time do i=1, first_det_of_teeth(1)-1 if(.not.(actually_computed(i))) then diff --git a/plugins/Full_CI_ZMQ/selection_davidson_slave.irp.f b/plugins/Full_CI_ZMQ/selection_davidson_slave.irp.f index 3ecf2fb4..88b30172 100644 --- a/plugins/Full_CI_ZMQ/selection_davidson_slave.irp.f +++ b/plugins/Full_CI_ZMQ/selection_davidson_slave.irp.f @@ -52,27 +52,23 @@ subroutine run_wf do if (mpi_master) then - print *, trim(zmq_state) call wait_for_states(states,zmq_state,size(states)) + if (zmq_state(1:64) == old_state(1:64)) then + call sleep(1) + cycle + else + old_state(1:64) = zmq_state(1:64) + endif + print *, trim(zmq_state) endif IRP_IF MPI - call MPI_BARRIER(MPI_COMM_WORLD, ierr) - if (ierr /= MPI_SUCCESS) then - print *, irp_here, 'error in barrier' - endif call MPI_BCAST (zmq_state, 128, MPI_CHARACTER, 0, MPI_COMM_WORLD, ierr) if (ierr /= MPI_SUCCESS) then print *, irp_here, 'error in broadcast of zmq_state' endif IRP_ENDIF - if (zmq_state == old_state) then - cycle - else - old_state = zmq_state - endif - if(zmq_state(1:7) == 'Stopped') then exit endif @@ -110,6 +106,13 @@ subroutine run_wf call run_selection_slave(0,i,energy) !$OMP END PARALLEL print *, 'Selection done' + IRP_IF MPI + call MPI_BARRIER(MPI_COMM_WORLD, ierr) + if (ierr /= MPI_SUCCESS) then + print *, irp_here, 'error in barrier' + endif + IRP_ENDIF + print *, 'All selection done' else if (zmq_state(1:8) == 'davidson') then @@ -130,6 +133,13 @@ subroutine run_wf call davidson_slave_tcp(0) call omp_set_nested(.False.) print *, 'Davidson done' + IRP_IF MPI + call MPI_BARRIER(MPI_COMM_WORLD, ierr) + if (ierr /= MPI_SUCCESS) then + print *, irp_here, 'error in barrier' + endif + IRP_ENDIF + print *, 'All Davidson done' else if (zmq_state(1:3) == 'pt2') then @@ -168,11 +178,19 @@ subroutine run_wf print *, 'PT2 done' FREE state_average_weight + IRP_IF MPI + call MPI_BARRIER(MPI_COMM_WORLD, ierr) + if (ierr /= MPI_SUCCESS) then + print *, irp_here, 'error in barrier' + endif + IRP_ENDIF + print *, 'All PT2 done' + endif end do IRP_IF MPI - call MPI_finalize(i) + call MPI_finalize(ierr) IRP_ENDIF end diff --git a/src/Determinants/density_matrix.irp.f b/src/Determinants/density_matrix.irp.f index a4e5f5f0..0bd0a187 100644 --- a/src/Determinants/density_matrix.irp.f +++ b/src/Determinants/density_matrix.irp.f @@ -376,9 +376,14 @@ BEGIN_PROVIDER [ double precision, l3_weight, (N_states) ] enddo l3_weight(i) = min(1.d0/l3_weight(i), 100.d0) enddo - print *, 'L3 weights' - print *, '----------' - print *, l3_weight(1:N_states) + if (mpi_master) then + print *, '' + print *, 'L3 weights' + print *, '----------' + print *, '' + print *, l3_weight(1:N_states) + print *, '' + endif END_PROVIDER diff --git a/src/ZMQ/utils.irp.f b/src/ZMQ/utils.irp.f index 14ab4302..d0c73f17 100644 --- a/src/ZMQ/utils.irp.f +++ b/src/ZMQ/utils.irp.f @@ -148,15 +148,15 @@ function new_zmq_to_qp_run_socket() stop 'Unable to create zmq req socket' endif -! rc = f77_zmq_setsockopt(new_zmq_to_qp_run_socket, ZMQ_SNDTIMEO, 300000, 4) -! if (rc /= 0) then -! stop 'Unable to set send timeout in new_zmq_to_qp_run_socket' -! endif -! -! rc = f77_zmq_setsockopt(new_zmq_to_qp_run_socket, ZMQ_RCVTIMEO, 300000, 4) -! if (rc /= 0) then -! stop 'Unable to set recv timeout in new_zmq_to_qp_run_socket' -! endif + rc = f77_zmq_setsockopt(new_zmq_to_qp_run_socket, ZMQ_SNDTIMEO, 300000, 4) + if (rc /= 0) then + stop 'Unable to set send timeout in new_zmq_to_qp_run_socket' + endif + + rc = f77_zmq_setsockopt(new_zmq_to_qp_run_socket, ZMQ_RCVTIMEO, 300000, 4) + if (rc /= 0) then + stop 'Unable to set recv timeout in new_zmq_to_qp_run_socket' + endif rc = f77_zmq_connect(new_zmq_to_qp_run_socket, trim(qp_run_address)//':'//trim(zmq_port(0))) if (rc /= 0) then @@ -188,25 +188,11 @@ function new_zmq_pair_socket(bind) endif -! rc = f77_zmq_setsockopt(new_zmq_pair_socket, ZMQ_SNDHWM, 2, 4) -! if (rc /= 0) then -! stop 'f77_zmq_setsockopt(new_zmq_pair_socket, ZMQ_SNDHWM, 2, 4)' -! endif -! -! rc = f77_zmq_setsockopt(new_zmq_pair_socket, ZMQ_RCVHWM, 2, 4) -! if (rc /= 0) then -! stop 'f77_zmq_setsockopt(new_zmq_pair_socket, ZMQ_RCVHWM, 2, 4)' -! endif -! rc = f77_zmq_setsockopt(new_zmq_pair_socket, ZMQ_IMMEDIATE, 1, 4) if (rc /= 0) then stop 'f77_zmq_setsockopt(new_zmq_pair_socket, ZMQ_IMMEDIATE, 1, 4)' endif -! -! rc = f77_zmq_setsockopt(new_zmq_pair_socket, ZMQ_LINGER, 600000, 4) -! if (rc /= 0) then -! stop 'f77_zmq_setsockopt(new_zmq_pair_socket, ZMQ_LINGER, 600000, 4)' -! endif + if (bind) then rc = f77_zmq_bind(new_zmq_pair_socket,zmq_socket_pair_inproc_address) @@ -250,20 +236,20 @@ IRP_ENDIF stop 'Unable to create zmq pull socket' endif -! rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_LINGER,300000,4) -! if (rc /= 0) then -! stop 'Unable to set ZMQ_LINGER on pull socket' -! endif + rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_LINGER,300000,4) + if (rc /= 0) then + stop 'Unable to set ZMQ_LINGER on pull socket' + endif ! rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_RCVBUF,100000000,4) ! if (rc /= 0) then ! stop 'Unable to set ZMQ_RCVBUF on pull socket' ! endif -! rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_RCVHWM,5,4) -! if (rc /= 0) then -! stop 'Unable to set ZMQ_RCVHWM on pull socket' -! endif + rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_RCVHWM,1,4) + if (rc /= 0) then + stop 'Unable to set ZMQ_RCVHWM on pull socket' + endif integer :: icount @@ -332,15 +318,15 @@ IRP_ENDIF stop 'Unable to create zmq push socket' endif -! rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_LINGER,300000,4) -! if (rc /= 0) then -! stop 'Unable to set ZMQ_LINGER on push socket' -! endif -! -! rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_SNDHWM,1,4) -! if (rc /= 0) then -! stop 'Unable to set ZMQ_SNDHWM on push socket' -! endif + rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_LINGER,300000,4) + if (rc /= 0) then + stop 'Unable to set ZMQ_LINGER on push socket' + endif + + rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_SNDHWM,1,4) + if (rc /= 0) then + stop 'Unable to set ZMQ_SNDHWM on push socket' + endif ! rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_SNDBUF,100000000,4) ! if (rc /= 0) then @@ -352,10 +338,10 @@ IRP_ENDIF stop 'Unable to set ZMQ_IMMEDIATE on push socket' endif -! rc = f77_zmq_setsockopt(new_zmq_push_socket, ZMQ_SNDTIMEO, 300000, 4) -! if (rc /= 0) then -! stop 'Unable to set send timout in new_zmq_push_socket' -! endif + rc = f77_zmq_setsockopt(new_zmq_push_socket, ZMQ_SNDTIMEO, 300000, 4) + if (rc /= 0) then + stop 'Unable to set send timout in new_zmq_push_socket' + endif if (thread == 1) then rc = f77_zmq_connect(new_zmq_push_socket, zmq_socket_push_inproc_address) @@ -488,10 +474,10 @@ subroutine end_zmq_push_socket(zmq_socket_push,thread) integer :: rc character*(8), external :: zmq_port -! rc = f77_zmq_setsockopt(zmq_socket_push,ZMQ_LINGER,300000,4) -! if (rc /= 0) then -! stop 'Unable to set ZMQ_LINGER on push socket' -! endif + rc = f77_zmq_setsockopt(zmq_socket_push,ZMQ_LINGER,300000,4) + if (rc /= 0) then + print *, 'warning: Unable to set ZMQ_LINGER on push socket' + endif call omp_set_lock(zmq_lock) rc = f77_zmq_close(zmq_socket_push) @@ -615,7 +601,7 @@ subroutine end_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,name_in) stop 'Wrong end of job' endif - do i=10,1,-1 + do i=300,1,-1 rc = f77_zmq_send(zmq_to_qp_run_socket, 'end_job '//trim(zmq_state),8+len(trim(zmq_state)),0) rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 512, 0) if (trim(message(1:13)) == 'error waiting') then @@ -685,6 +671,14 @@ integer function connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) connect_to_taskserver = -1 return endif + if (trim(state) /= zmq_state) then + integer, external :: disconnect_from_taskserver_state + if (disconnect_from_taskserver_state(zmq_to_qp_run_socket, worker_id, state) == -1) then + continue + endif + connect_to_taskserver = -1 + return + endif return 10 continue @@ -699,19 +693,32 @@ integer function disconnect_from_taskserver(zmq_to_qp_run_socket, worker_id) END_DOC integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket integer, intent(in) :: worker_id + integer, external :: disconnect_from_taskserver_state + disconnect_from_taskserver = disconnect_from_taskserver_state(zmq_to_qp_run_socket, worker_id, zmq_state(1:128)) +end + +integer function disconnect_from_taskserver_state(zmq_to_qp_run_socket, worker_id, state) + use f77_zmq + implicit none + BEGIN_DOC + ! Disconnect from the task server + END_DOC + integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket + integer, intent(in) :: worker_id integer :: rc, sze - character*(512) :: message, reply, state + character*(512) :: message, reply + character*(128) :: state - disconnect_from_taskserver = 0 + disconnect_from_taskserver_state = 0 - write(message,*) 'disconnect '//trim(zmq_state), worker_id + write(message,*) 'disconnect '//trim(state), worker_id sze = len(trim(message)) rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0) if (rc /= sze) then - disconnect_from_taskserver = -1 + disconnect_from_taskserver_state = -1 return endif @@ -720,20 +727,20 @@ integer function disconnect_from_taskserver(zmq_to_qp_run_socket, worker_id) read(message,*, end=10, err=10) reply, state if ((trim(reply) == 'disconnect_reply').and.(trim(state) == trim(zmq_state))) then - disconnect_from_taskserver = -1 + disconnect_from_taskserver_state = -1 return endif if (trim(message) == 'error Wrong state') then - disconnect_from_taskserver = -1 + disconnect_from_taskserver_state = -1 return else if (trim(message) == 'error No job is running') then - disconnect_from_taskserver = -1 + disconnect_from_taskserver_state = -1 return endif return 10 continue - disconnect_from_taskserver = -1 + disconnect_from_taskserver_state = -1 end integer function add_task_to_taskserver(zmq_to_qp_run_socket,task) @@ -1019,10 +1026,10 @@ subroutine end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) character*(8), external :: zmq_port integer :: rc -! rc = f77_zmq_setsockopt(zmq_to_qp_run_socket,ZMQ_LINGER,300000,4) -! if (rc /= 0) then -! stop 'Unable to set ZMQ_LINGER on zmq_to_qp_run_socket' -! endif + rc = f77_zmq_setsockopt(zmq_to_qp_run_socket,ZMQ_LINGER,300000,4) + if (rc /= 0) then + print *, 'warning: Unable to set ZMQ_LINGER on zmq_to_qp_run_socket' + endif rc = f77_zmq_close(zmq_to_qp_run_socket) if (rc /= 0) then