From d680c85b8c3632613d8f2c0f366d32c49fb8b9ee Mon Sep 17 00:00:00 2001 From: Anthony Scemama Date: Tue, 28 Nov 2017 15:51:21 +0100 Subject: [PATCH] Parallelism OK --- ocaml/TaskServer.ml | 16 ++++++- ocaml/qp_run.ml | 11 ++--- src/Davidson/davidson_parallel.irp.f | 1 - src/Determinants/zmq.irp.f | 42 ++++--------------- src/ZMQ/utils.irp.f | 62 +++++++++++++++++++++------- 5 files changed, 75 insertions(+), 57 deletions(-) diff --git a/ocaml/TaskServer.ml b/ocaml/TaskServer.ml index 773a3da4..1ec1bf7d 100644 --- a/ocaml/TaskServer.ml +++ b/ocaml/TaskServer.ml @@ -188,6 +188,9 @@ let change_pub_state msg program_state rep_socket pair_socket = program_state +let force_state = + Message.State.of_string "force" + let end_job msg program_state rep_socket pair_socket = let failure () = @@ -202,7 +205,7 @@ let end_job msg program_state rep_socket pair_socket = } and wait n = - Printf.sprintf "waiting %d" n + Printf.sprintf "waiting for %d slaves..." n |> Message.Error_msg.create |> Message.Error_msg.to_string |> ZMQ.Socket.send rep_socket ; @@ -213,7 +216,13 @@ let end_job msg program_state rep_socket pair_socket = | None -> failure () | Some state -> begin - if (msg.Message.Endjob_msg.state = state) then + if (state = force_state) then + begin + string_of_pub_state Waiting + |> ZMQ.Socket.send pair_socket ; + success () + end + else if (msg.Message.Endjob_msg.state = state) then begin string_of_pub_state Waiting |> ZMQ.Socket.send pair_socket ; @@ -223,7 +232,10 @@ let end_job msg program_state rep_socket pair_socket = wait (Queuing_system.number_of_clients program_state.queue) end else +( +Printf.eprintf "STATE:%s%!" (Message.State.to_string state); failure () +) end diff --git a/ocaml/qp_run.ml b/ocaml/qp_run.ml index 22286cb0..f426932b 100644 --- a/ocaml/qp_run.ml +++ b/ocaml/qp_run.ml @@ -70,11 +70,12 @@ let run slave exe ezfio_file = (** Check input *) - begin - match (Sys.command ("qp_edit -c "^ezfio_file)) with - | 0 -> () - | i -> failwith "Error: Input inconsistent\n" - end; + if (not slave) then + begin + match (Sys.command ("qp_edit -c "^ezfio_file)) with + | 0 -> () + | i -> failwith "Error: Input inconsistent\n" + end; let qp_run_address_filename = Filename.concat (Qpackage.ezfio_work ezfio_file) "qp_run_address" diff --git a/src/Davidson/davidson_parallel.irp.f b/src/Davidson/davidson_parallel.irp.f index 483577bf..e299aa90 100644 --- a/src/Davidson/davidson_parallel.irp.f +++ b/src/Davidson/davidson_parallel.irp.f @@ -36,7 +36,6 @@ subroutine davidson_run_slave(thread,iproc) zmq_socket_push = new_zmq_push_socket(thread) call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) if(worker_id == -1) then - print *, 'Exited' call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) call end_zmq_push_socket(zmq_socket_push,thread) return diff --git a/src/Determinants/zmq.irp.f b/src/Determinants/zmq.irp.f index cb8b515c..f098341e 100644 --- a/src/Determinants/zmq.irp.f +++ b/src/Determinants/zmq.irp.f @@ -175,41 +175,10 @@ subroutine zmq_get_psi(zmq_to_qp_run_socket, worker_id) call zmq_get_N_states(zmq_to_qp_run_socket, worker_id) call zmq_get_N_det(zmq_to_qp_run_socket, worker_id) call zmq_get_psi_det_size(zmq_to_qp_run_socket, worker_id) - IRP_IF MPI - include 'mpif.h' - integer :: ierr - - call MPI_BCAST (N_states, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) - if (ierr /= MPI_SUCCESS) then - print *, irp_here//': Unable to broadcast N_states' - stop -1 - endif - - call MPI_BCAST (N_det, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) - if (ierr /= MPI_SUCCESS) then - print *, irp_here//': Unable to broadcast N_det' - stop -1 - endif - - call MPI_BCAST (psi_det_size, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) - if (ierr /= MPI_SUCCESS) then - print *, irp_here//': Unable to broadcast psi_det_size' - stop -1 - endif - IRP_ENDIF - TOUCH psi_det_size N_det N_states - if (mpi_master) then - call zmq_get_psi_det(zmq_to_qp_run_socket, worker_id) - call zmq_get_psi_coef(zmq_to_qp_run_socket, worker_id) - endif - - IRP_IF MPI - call broadcast_chunks_bit_kind(psi_det,N_det*N_int*2) - call broadcast_chunks_double(psi_coef,N_states*N_det) - IRP_ENDIF - + call zmq_get_psi_det(zmq_to_qp_run_socket, worker_id) + call zmq_get_psi_coef(zmq_to_qp_run_socket, worker_id) SOFT_TOUCH psi_det psi_coef end @@ -247,6 +216,9 @@ subroutine zmq_get_psi_det(zmq_to_qp_run_socket, worker_id) print *, irp_here, ': Error getting psi_det', rc8, N_int*2_8*N_det*bit_kind stop 'error' endif + IRP_IF MPI + call broadcast_chunks_bit_kind(psi_det,N_det*N_int*2) + IRP_ENDIF end @@ -283,6 +255,10 @@ subroutine zmq_get_psi_coef(zmq_to_qp_run_socket, worker_id) stop 'error' endif + IRP_IF MPI + call broadcast_chunks_double(psi_coef,N_states*N_det) + IRP_ENDIF + end diff --git a/src/ZMQ/utils.irp.f b/src/ZMQ/utils.irp.f index 276590f1..a9b692ee 100644 --- a/src/ZMQ/utils.irp.f +++ b/src/ZMQ/utils.irp.f @@ -32,10 +32,14 @@ END_PROVIDER do i=len(buffer),1,-1 if ( buffer(i:i) == ':') then qp_run_address = trim(buffer(1:i-1)) - read(buffer(i+1:), *) zmq_port_start + read(buffer(i+1:), *, err=10,end=10) zmq_port_start exit endif enddo + return + 10 continue + print *, irp_here, ': Error in read' + stop -1 END_PROVIDER BEGIN_PROVIDER [ character*(128), zmq_socket_pull_tcp_address ] @@ -98,12 +102,16 @@ subroutine switch_qp_run_to_master do i=len(buffer),1,-1 if ( buffer(i:i) == ':') then qp_run_address = trim(buffer(1:i-1)) - read(buffer(i+1:), *) zmq_port_start + read(buffer(i+1:), *, end=10, err=10) zmq_port_start exit endif enddo call reset_zmq_addresses + return + 10 continue + print *, irp_here, ': Error in read' + stop -1 end @@ -604,17 +612,20 @@ subroutine end_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,name_in) stop 'Wrong end of job' endif - do i=1,30 + do i=6,1,-1 rc = f77_zmq_send(zmq_to_qp_run_socket, 'end_job '//trim(zmq_state),8+len(trim(zmq_state)),0) rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 512, 0) if (trim(message(1:13)) == 'error waiting') then - print *, trim(message(6:rc)) call sleep(1) cycle else if (message(1:2) == 'ok') then exit endif end do + if (i==0) then + rc = f77_zmq_send(zmq_to_qp_run_socket, 'end_job force',13,0) + rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 512, 0) + endif zmq_state = 'No_state' call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) call end_zmq_pull_socket(zmq_socket_pull) @@ -659,18 +670,18 @@ subroutine connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0) message = trim(message(1:rc)) if(message(1:5) == "error") then - print *, trim(message(1:rc)) worker_id = -1 return end if - read(message,*) reply, state, worker_id, address - if ( (trim(reply) /= 'connect_reply') .and. & - (trim(state) /= trim(zmq_state)) ) then - print *, 'Reply: ', trim(reply) - print *, 'State: ', trim(state), '/', trim(zmq_state) - print *, 'Address: ', trim(address) + read(message,*, end=10, err=10) reply, state, worker_id, address + if (trim(reply) /= 'connect_reply') then + print *, trim(message) stop -1 endif + return + 10 continue + print *, irp_here, ': Error in read' + stop end subroutine disconnect_from_taskserver(zmq_to_qp_run_socket, & @@ -703,10 +714,13 @@ subroutine disconnect_from_taskserver(zmq_to_qp_run_socket, & rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0) message = trim(message(1:rc)) - read(message,*) reply, state + read(message,*, end=10, err=10) reply, state if ((trim(reply) == 'disconnect_reply').and.(trim(state) == trim(zmq_state))) then return endif + if (trim(message) == 'error Wrong state') then + return + endif if (trim(message) == 'error No job is running') then return endif @@ -715,6 +729,10 @@ subroutine disconnect_from_taskserver(zmq_to_qp_run_socket, & print *, trim(message) stop -1 + return + 10 continue + print *, irp_here, ': Error in read' + stop end subroutine add_task_to_taskserver(zmq_to_qp_run_socket,task) @@ -916,9 +934,9 @@ subroutine get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task) message = repeat(' ',512) rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 1024, 0) rc = min(1024,rc) - read(message(1:rc),*) reply + read(message(1:rc),*, end=10, err=10) reply if (trim(reply) == 'get_task_reply') then - read(message(1:rc),*) reply, task_id + read(message(1:rc),*, end=10, err=10) reply, task_id rc = 15 do while (message(rc:rc) == ' ') rc += 1 @@ -934,11 +952,18 @@ subroutine get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task) else if (trim(message) == 'error No job is running') then task_id = 0 task = 'terminate' + else if (trim(message) == 'error Wrong state') then + task_id = 0 + task = 'terminate' else print *, 'Unable to get the next task' print *, trim(message) stop -1 endif + return + 10 continue + print *, irp_here, ': Error in read' + stop end @@ -973,7 +998,7 @@ subroutine get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task message = repeat(' ',1024) rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 1024, 0) rc = min(1024,rc) - read(message(1:rc),*) reply + read(message(1:rc),*, end=10, err=10) reply if (trim(message) == 'get_tasks_reply ok') then continue else if (trim(message) == 'terminate') then @@ -993,7 +1018,7 @@ subroutine get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task message = repeat(' ',512) rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 1024, 0) rc = min(1024,rc) - read(message(1:rc),*) task_id(i) + read(message(1:rc),*, end=10, err=10) task_id(i) if (task_id(i) == 0) then task(i) = 'terminate' n_tasks = i @@ -1009,6 +1034,11 @@ subroutine get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task rc += 1 task(i) = message(rc:) enddo + + return + 10 continue + print *, irp_here, ': Error in read' + stop end