From 2e5752f8f5e4302e2af6861c42cf90f8c78a026a Mon Sep 17 00:00:00 2001 From: Anthony Scemama Date: Wed, 29 Nov 2017 19:10:27 +0100 Subject: [PATCH] Parallelism OK with 50 nodes --- config/ifort_mpi.cfg | 2 +- ocaml/TaskServer.ml | 148 ++++++++++++------- ocaml/TaskServer.mli | 15 +- plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f | 2 +- plugins/Full_CI_ZMQ/zmq_selection.irp.f | 2 +- plugins/Selectors_Utils/zmq.irp.f | 35 ++--- src/Determinants/zmq.irp.f | 104 ++++++------- src/ZMQ/put_get.irp.f | 38 ++--- src/ZMQ/utils.irp.f | 3 +- 9 files changed, 183 insertions(+), 166 deletions(-) diff --git a/config/ifort_mpi.cfg b/config/ifort_mpi.cfg index 41313f9d..735ffb68 100644 --- a/config/ifort_mpi.cfg +++ b/config/ifort_mpi.cfg @@ -9,7 +9,7 @@ FC : mpiifort LAPACK_LIB : -mkl=parallel IRPF90 : irpf90 -IRPF90_FLAGS : --ninja --align=32 -DMPI -DZMQ_PUSH +IRPF90_FLAGS : --ninja --align=32 -DMPI # Global options ################ diff --git a/ocaml/TaskServer.ml b/ocaml/TaskServer.ml index 2ea895cb..31d6ab3b 100644 --- a/ocaml/TaskServer.ml +++ b/ocaml/TaskServer.ml @@ -21,13 +21,14 @@ let string_of_pub_state = function type t = { - queue : Queuing_system.t ; - state : Message.State.t option ; - address_tcp : Address.Tcp.t option ; - address_inproc : Address.Inproc.t option ; - progress_bar : Progress_bar.t option ; - running : bool; - data : (string, string) Hashtbl.t; + queue : Queuing_system.t ; + state : Message.State.t option ; + address_tcp : Address.Tcp.t option ; + address_inproc : Address.Inproc.t option ; + progress_bar : Progress_bar.t option ; + running : bool; + accepting_clients : bool; + data : (string, string) Hashtbl.t; } @@ -159,6 +160,7 @@ let new_job msg program_state rep_socket pair_socket = progress_bar = Some progress_bar ; address_tcp = Some msg.Message.Newjob_msg.address_tcp; address_inproc = Some msg.Message.Newjob_msg.address_inproc; + accepting_clients = true; } in reply_ok rep_socket; @@ -198,9 +200,15 @@ let end_job msg program_state rep_socket pair_socket = and success () = reply_ok rep_socket; - { program_state with - state = None ; - progress_bar = Progress_bar.clear (); + { + queue = Queuing_system.create (); + state = None ; + progress_bar = Progress_bar.clear (); + address_tcp = None; + address_inproc = None; + running = true; + accepting_clients = false; + data = Hashtbl.create ~hashable:String.hashable (); } and wait n = @@ -215,7 +223,7 @@ let end_job msg program_state rep_socket pair_socket = | None -> failure () | Some state -> begin - if (state = force_state) then + if (msg.Message.Endjob_msg.state = force_state) then begin string_of_pub_state Waiting |> ZMQ.Socket.send pair_socket ; @@ -231,48 +239,50 @@ let end_job msg program_state rep_socket pair_socket = wait (Queuing_system.number_of_clients program_state.queue) end else -( -Printf.eprintf "STATE:%s%!" (Message.State.to_string state); failure () -) end let connect msg program_state rep_socket = - let state = - match program_state.state with - | Some state -> state - | None -> assert false + let failure () = + reply_wrong_state rep_socket; + program_state in - - let push_address = - match msg with - | Message.Connect_msg.Tcp -> - begin - match program_state.address_tcp with - | Some address -> Address.Tcp address - | None -> failwith "Error: No TCP address" - end - | Message.Connect_msg.Inproc -> - begin - match program_state.address_inproc with - | Some address -> Address.Inproc address - | None -> failwith "Error: No inproc address" - end - | Message.Connect_msg.Ipc -> assert false - in - - let new_queue, client_id = - Queuing_system.add_client program_state.queue - in - Message.ConnectReply (Message.ConnectReply_msg.create - ~state:state ~client_id ~push_address) - |> Message.to_string - |> ZMQ.Socket.send rep_socket ; - { program_state with - queue = new_queue - } + + if (not program_state.accepting_clients) then + failure () + else + match program_state.state with + | None -> failure () + | Some state -> + let push_address = + match msg with + | Message.Connect_msg.Tcp -> + begin + match program_state.address_tcp with + | Some address -> Address.Tcp address + | None -> failwith "Error: No TCP address" + end + | Message.Connect_msg.Inproc -> + begin + match program_state.address_inproc with + | Some address -> Address.Inproc address + | None -> failwith "Error: No inproc address" + end + | Message.Connect_msg.Ipc -> assert false + in + + let new_queue, client_id = + Queuing_system.add_client program_state.queue + in + Message.ConnectReply (Message.ConnectReply_msg.create + ~state:state ~client_id ~push_address) + |> Message.to_string + |> ZMQ.Socket.send rep_socket ; + { program_state with + queue = new_queue + } let disconnect msg program_state rep_socket = @@ -323,14 +333,21 @@ let del_task msg program_state rep_socket = and success () = + let queue = + List.fold ~f:(fun queue task_id -> Queuing_system.del_task ~task_id queue) + ~init:program_state.queue task_ids + in + let accepting_clients = + (Queuing_system.number_of_queued queue > Queuing_system.number_of_clients queue) + in let new_program_state = { program_state with - queue = List.fold ~f:(fun queue task_id -> Queuing_system.del_task ~task_id queue) - ~init:program_state.queue task_ids + accepting_clients ; + queue ; } in let more = - (Queuing_system.number_of_tasks new_program_state.queue > 0) + (Queuing_system.number_of_tasks queue > 0) in Message.DelTaskReply (Message.DelTaskReply_msg.create ~task_ids ~more) |> Message.to_string @@ -393,12 +410,17 @@ let get_task msg program_state rep_socket pair_socket = and success () = - let new_queue, task_id, task = + let queue, task_id, task = Queuing_system.pop_task ~client_id program_state.queue in + let accepting_clients = + (Queuing_system.number_of_queued queue > + Queuing_system.number_of_clients queue) + in + let no_task = - Queuing_system.number_of_queued new_queue = 0 + Queuing_system.number_of_queued queue = 0 in if no_task then @@ -410,7 +432,8 @@ let get_task msg program_state rep_socket pair_socket = let new_program_state = { program_state with - queue = new_queue + queue ; + accepting_clients; } in @@ -467,6 +490,11 @@ let get_tasks msg program_state rep_socket pair_socket = Queuing_system.number_of_queued new_queue = 0 in + let accepting_clients = + (Queuing_system.number_of_queued new_queue > + Queuing_system.number_of_clients new_queue) + in + if no_task then string_of_pub_state Waiting |> ZMQ.Socket.send pair_socket @@ -476,7 +504,8 @@ let get_tasks msg program_state rep_socket pair_socket = let new_program_state = { program_state with - queue = new_queue + queue = new_queue; + accepting_clients; } in @@ -522,10 +551,17 @@ let task_done msg program_state rep_socket = increment_progress_bar bar) ~init:(program_state.queue, program_state.progress_bar) task_ids in + + let accepting_clients = + (Queuing_system.number_of_queued new_queue > + Queuing_system.number_of_clients new_queue) + in + let result = { program_state with queue = new_queue; - progress_bar = new_bar + progress_bar = new_bar; + accepting_clients } in reply_ok rep_socket; @@ -654,7 +690,8 @@ let abort program_state rep_socket = reply_ok rep_socket; { program_state with - queue + queue ; + accepting_clients = false; } @@ -738,6 +775,7 @@ let run ~port = address_tcp = None; address_inproc = None; progress_bar = None ; + accepting_clients = false; data = Hashtbl.create ~hashable:String.hashable (); } in diff --git a/ocaml/TaskServer.mli b/ocaml/TaskServer.mli index 0fe5c2dc..c492756d 100644 --- a/ocaml/TaskServer.mli +++ b/ocaml/TaskServer.mli @@ -1,12 +1,13 @@ type t = { - queue : Queuing_system.t ; - state : Message.State.t option ; - address_tcp : Address.Tcp.t option ; - address_inproc : Address.Inproc.t option ; - progress_bar : Progress_bar.t option ; - running : bool; - data : (string, string) Core.Hashtbl.t ; + queue : Queuing_system.t ; + state : Message.State.t option ; + address_tcp : Address.Tcp.t option ; + address_inproc : Address.Inproc.t option ; + progress_bar : Progress_bar.t option ; + running : bool; + accepting_clients : bool; + data : (string, string) Core.Hashtbl.t ; } diff --git a/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f b/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f index 1039d0f4..1269e45d 100644 --- a/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f +++ b/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f @@ -130,8 +130,8 @@ subroutine ZMQ_pt2(E, pt2,relative_error, absolute_error, error) call pt2_slave_inproc(i) endif !$OMP END PARALLEL - call delete_selection_buffer(b) call end_parallel_job(zmq_to_qp_run_socket, zmq_socket_pull, 'pt2') + call delete_selection_buffer(b) print *, '========== ================= ================= =================' diff --git a/plugins/Full_CI_ZMQ/zmq_selection.irp.f b/plugins/Full_CI_ZMQ/zmq_selection.irp.f index a7b901dd..2099c7d6 100644 --- a/plugins/Full_CI_ZMQ/zmq_selection.irp.f +++ b/plugins/Full_CI_ZMQ/zmq_selection.irp.f @@ -66,11 +66,11 @@ subroutine ZMQ_selection(N_in, pt2) stop 'Unable to add task to task server' endif endif - call zmq_set_running(zmq_to_qp_run_socket) ASSERT (associated(b%det)) ASSERT (associated(b%val)) + call zmq_set_running(zmq_to_qp_run_socket) !$OMP PARALLEL DEFAULT(shared) SHARED(b, pt2) PRIVATE(i) NUM_THREADS(nproc+1) i = omp_get_thread_num() if (i==0) then diff --git a/plugins/Selectors_Utils/zmq.irp.f b/plugins/Selectors_Utils/zmq.irp.f index 56d35ffb..5f40cd4f 100644 --- a/plugins/Selectors_Utils/zmq.irp.f +++ b/plugins/Selectors_Utils/zmq.irp.f @@ -50,17 +50,26 @@ integer function zmq_get_$X(zmq_to_qp_run_socket, worker_id) write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, '$X' rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0) - if (rc /= len(trim(msg))) go to 10 + if (rc /= len(trim(msg))) then + zmq_get_$X = -1 + go to 10 + endif rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0) - if (msg(1:14) /= 'get_data_reply') go to 10 + if (msg(1:14) /= 'get_data_reply') then + zmq_get_$X = -1 + go to 10 + endif rc = f77_zmq_recv(zmq_to_qp_run_socket,$X,4,0) - if (rc /= 4) go to 10 + if (rc /= 4) then + zmq_get_$X = -1 + go to 10 + endif endif - ! Normal exit + 10 continue IRP_IF MPI include 'mpif.h' @@ -71,27 +80,13 @@ integer function zmq_get_$X(zmq_to_qp_run_socket, worker_id) print *, irp_here//': Unable to broadcast N_det_generators' stop -1 endif - if (zmq_get_$X == 0) then - call MPI_BCAST ($X, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) - if (ierr /= MPI_SUCCESS) then - print *, irp_here//': Unable to broadcast N_det_generators' - stop -1 - endif - endif - IRP_ENDIF - - return - - ! Exception - 10 continue - zmq_get_$X = -1 - IRP_IF MPI - call MPI_BCAST (zmq_get_$X, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) + call MPI_BCAST ($X, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) if (ierr /= MPI_SUCCESS) then print *, irp_here//': Unable to broadcast N_det_generators' stop -1 endif IRP_ENDIF + end SUBST [ X ] diff --git a/src/Determinants/zmq.irp.f b/src/Determinants/zmq.irp.f index 57bbd3ca..b0ef55d6 100644 --- a/src/Determinants/zmq.irp.f +++ b/src/Determinants/zmq.irp.f @@ -87,21 +87,30 @@ integer function zmq_get_$X(zmq_to_qp_run_socket, worker_id) integer :: rc character*(256) :: msg + zmq_get_$X = 0 if (mpi_master) then write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, '$X' rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0) - if (rc /= len(trim(msg))) go to 10 + if (rc /= len(trim(msg))) then + zmq_get_$X = -1 + go to 10 + endif rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0) - if (msg(1:14) /= 'get_data_reply') go to 10 + if (msg(1:14) /= 'get_data_reply') then + zmq_get_$X = -1 + go to 10 + endif rc = f77_zmq_recv(zmq_to_qp_run_socket,$X,4,0) - if (rc /= 4) go to 10 + if (rc /= 4) then + zmq_get_$X = -1 + go to 10 + endif endif - ! Normal exit - zmq_get_$X = 0 + 10 continue IRP_IF MPI include 'mpif.h' integer :: ierr @@ -110,25 +119,12 @@ integer function zmq_get_$X(zmq_to_qp_run_socket, worker_id) if (ierr /= MPI_SUCCESS) then stop 'Unable to broadcast zmq_get_psi_det' endif - if (zmq_get_$X == 0) then - call MPI_BCAST ($X, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) - if (ierr /= MPI_SUCCESS) then - stop 'Unable to broadcast zmq_get_psi_det' - endif - endif - IRP_ENDIF - - return - - ! Exception - 10 continue - zmq_get_$X = -1 - IRP_IF MPI - call MPI_BCAST (zmq_get_$X, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) + call MPI_BCAST ($X, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) if (ierr /= MPI_SUCCESS) then stop 'Unable to broadcast zmq_get_psi_det' endif IRP_ENDIF + end SUBST [ X ] @@ -276,20 +272,29 @@ integer function zmq_get_psi_det(zmq_to_qp_run_socket, worker_id) integer*8 :: rc8 character*(256) :: msg + zmq_get_psi_det = 0 if (mpi_master) then write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, 'psi_det' rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0) - if (rc /= len(trim(msg))) go to 10 + if (rc /= len(trim(msg))) then + zmq_get_psi_det = -1 + go to 10 + endif rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0) - if (msg(1:14) /= 'get_data_reply') go to 10 + if (msg(1:14) /= 'get_data_reply') then + zmq_get_psi_det = -1 + go to 10 + endif rc8 = f77_zmq_recv8(zmq_to_qp_run_socket,psi_det,int(N_int*2_8*N_det*bit_kind,8),0) - if (rc8 /= N_int*2_8*N_det*bit_kind) go to 10 + if (rc8 /= N_int*2_8*N_det*bit_kind) then + zmq_get_psi_det = -1 + go to 10 + endif endif - ! Normal exit - zmq_get_psi_det = 0 + 10 continue IRP_IF MPI include 'mpif.h' integer :: ierr @@ -297,22 +302,9 @@ integer function zmq_get_psi_det(zmq_to_qp_run_socket, worker_id) if (ierr /= MPI_SUCCESS) then stop 'Unable to broadcast zmq_get_psi_det' endif - if (zmq_get_psi_det == 0) then - call broadcast_chunks_bit_kind(psi_det,N_det*N_int*2) - endif + call broadcast_chunks_bit_kind(psi_det,N_det*N_int*2) IRP_ENDIF - return - - ! Exception - 10 continue - zmq_get_psi_det = -1 - IRP_IF MPI - call MPI_BCAST (zmq_get_psi_det, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) - if (ierr /= MPI_SUCCESS) then - stop 'Unable to broadcast zmq_get_psi_det' - endif - IRP_ENDIF end integer function zmq_get_psi_coef(zmq_to_qp_run_socket, worker_id) @@ -327,20 +319,29 @@ integer function zmq_get_psi_coef(zmq_to_qp_run_socket, worker_id) integer*8 :: rc8 character*(256) :: msg + zmq_get_psi_coef = 0 if (mpi_master) then write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, 'psi_coef' rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0) - if (rc /= len(trim(msg))) go to 10 + if (rc /= len(trim(msg))) then + zmq_get_psi_coef = -1 + go to 10 + endif rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0) - if (msg(1:14) /= 'get_data_reply') go to 10 + if (msg(1:14) /= 'get_data_reply') then + zmq_get_psi_coef = -1 + go to 10 + endif rc8 = f77_zmq_recv8(zmq_to_qp_run_socket,psi_coef,int(psi_det_size*N_states*8_8,8),0) - if (rc8 /= psi_det_size*N_states*8_8) go to 10 + if (rc8 /= psi_det_size*N_states*8_8) then + zmq_get_psi_coef = -1 + go to 10 + endif endif - ! Normal exit - zmq_get_psi_coef = 0 + 10 continue IRP_IF MPI include 'mpif.h' @@ -349,22 +350,9 @@ integer function zmq_get_psi_coef(zmq_to_qp_run_socket, worker_id) if (ierr /= MPI_SUCCESS) then stop 'Unable to broadcast zmq_get_psi_coef' endif - if (zmq_get_psi_coef == 0) then - call broadcast_chunks_double(psi_coef,N_states*N_det) - endif + call broadcast_chunks_double(psi_coef,N_states*N_det) IRP_ENDIF - return - - ! Exception - 10 continue - zmq_get_psi_coef = -1 - IRP_IF MPI - call MPI_BCAST (zmq_get_psi_coef, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) - if (ierr /= MPI_SUCCESS) then - stop 'Unable to broadcast zmq_get_psi_coef' - endif - IRP_ENDIF end diff --git a/src/ZMQ/put_get.irp.f b/src/ZMQ/put_get.irp.f index 2a7bbe74..4086b8ed 100644 --- a/src/ZMQ/put_get.irp.f +++ b/src/ZMQ/put_get.irp.f @@ -57,16 +57,25 @@ integer function zmq_get_dvector(zmq_to_qp_run_socket, worker_id, name, x, size_ if (mpi_master) then write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, name rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0) - if (rc /= len(trim(msg))) go to 10 + if (rc /= len(trim(msg))) then + zmq_get_dvector = -1 + go to 10 + endif rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0) - if (msg(1:14) /= 'get_data_reply') go to 10 + if (msg(1:14) /= 'get_data_reply') then + zmq_get_dvector = -1 + go to 10 + endif rc = f77_zmq_recv(zmq_to_qp_run_socket,x,size_x*8,0) - if (rc /= size_x*8) go to 10 + if (rc /= size_x*8) then + zmq_get_dvector = -1 + go to 10 + endif endif - ! Normal exit + 10 continue IRP_IF MPI integer :: ierr @@ -76,28 +85,13 @@ integer function zmq_get_dvector(zmq_to_qp_run_socket, worker_id, name, x, size_ print *, irp_here//': Unable to broadcast zmq_get_dvector' stop -1 endif - if (zmq_get_dvector == 0) then - call MPI_BCAST (x, size_x, MPI_DOUBLE_PRECISION, 0, MPI_COMM_WORLD, ierr) - if (ierr /= MPI_SUCCESS) then - print *, irp_here//': Unable to broadcast dvector' - stop -1 - endif - endif - IRP_ENDIF - - return - - ! Exception - 10 continue - zmq_get_dvector = -1 - IRP_IF MPI - call MPI_BCAST (zmq_get_dvector, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) + call MPI_BCAST (x, size_x, MPI_DOUBLE_PRECISION, 0, MPI_COMM_WORLD, ierr) if (ierr /= MPI_SUCCESS) then - print *, irp_here//': Unable to broadcast zmq_get_dvector' + print *, irp_here//': Unable to broadcast dvector' stop -1 endif IRP_ENDIF - + end diff --git a/src/ZMQ/utils.irp.f b/src/ZMQ/utils.irp.f index 47b2d965..bf3abd8b 100644 --- a/src/ZMQ/utils.irp.f +++ b/src/ZMQ/utils.irp.f @@ -620,6 +620,7 @@ subroutine end_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,name_in) rc = f77_zmq_send(zmq_to_qp_run_socket, 'end_job '//trim(zmq_state),8+len(trim(zmq_state)),0) rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 512, 0) if (trim(message(1:13)) == 'error waiting') then + print *, trim(message(1:rc)) call sleep(1) cycle else if (message(1:2) == 'ok') then @@ -630,11 +631,11 @@ subroutine end_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,name_in) rc = f77_zmq_send(zmq_to_qp_run_socket, 'end_job force',13,0) rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 512, 0) endif - zmq_state = 'No_state' call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) call end_zmq_pull_socket(zmq_socket_pull) call omp_set_lock(zmq_lock) + zmq_state = 'No_state' rc = f77_zmq_ctx_term(zmq_context) zmq_context = 0_ZMQ_PTR call omp_unset_lock(zmq_lock)