Parallelism OK with 50 nodes

This commit is contained in:
Anthony Scemama 2017-11-29 19:10:27 +01:00
parent db0e74bf37
commit 2e5752f8f5
9 changed files with 183 additions and 166 deletions

View File

@ -9,7 +9,7 @@
FC : mpiifort
LAPACK_LIB : -mkl=parallel
IRPF90 : irpf90
IRPF90_FLAGS : --ninja --align=32 -DMPI -DZMQ_PUSH
IRPF90_FLAGS : --ninja --align=32 -DMPI
# Global options
################

View File

@ -21,13 +21,14 @@ let string_of_pub_state = function
type t =
{
queue : Queuing_system.t ;
state : Message.State.t option ;
address_tcp : Address.Tcp.t option ;
address_inproc : Address.Inproc.t option ;
progress_bar : Progress_bar.t option ;
running : bool;
data : (string, string) Hashtbl.t;
queue : Queuing_system.t ;
state : Message.State.t option ;
address_tcp : Address.Tcp.t option ;
address_inproc : Address.Inproc.t option ;
progress_bar : Progress_bar.t option ;
running : bool;
accepting_clients : bool;
data : (string, string) Hashtbl.t;
}
@ -159,6 +160,7 @@ let new_job msg program_state rep_socket pair_socket =
progress_bar = Some progress_bar ;
address_tcp = Some msg.Message.Newjob_msg.address_tcp;
address_inproc = Some msg.Message.Newjob_msg.address_inproc;
accepting_clients = true;
}
in
reply_ok rep_socket;
@ -198,9 +200,15 @@ let end_job msg program_state rep_socket pair_socket =
and success () =
reply_ok rep_socket;
{ program_state with
state = None ;
progress_bar = Progress_bar.clear ();
{
queue = Queuing_system.create ();
state = None ;
progress_bar = Progress_bar.clear ();
address_tcp = None;
address_inproc = None;
running = true;
accepting_clients = false;
data = Hashtbl.create ~hashable:String.hashable ();
}
and wait n =
@ -215,7 +223,7 @@ let end_job msg program_state rep_socket pair_socket =
| None -> failure ()
| Some state ->
begin
if (state = force_state) then
if (msg.Message.Endjob_msg.state = force_state) then
begin
string_of_pub_state Waiting
|> ZMQ.Socket.send pair_socket ;
@ -231,48 +239,50 @@ let end_job msg program_state rep_socket pair_socket =
wait (Queuing_system.number_of_clients program_state.queue)
end
else
(
Printf.eprintf "STATE:%s%!" (Message.State.to_string state);
failure ()
)
end
let connect msg program_state rep_socket =
let state =
match program_state.state with
| Some state -> state
| None -> assert false
let failure () =
reply_wrong_state rep_socket;
program_state
in
let push_address =
match msg with
| Message.Connect_msg.Tcp ->
begin
match program_state.address_tcp with
| Some address -> Address.Tcp address
| None -> failwith "Error: No TCP address"
end
| Message.Connect_msg.Inproc ->
begin
match program_state.address_inproc with
| Some address -> Address.Inproc address
| None -> failwith "Error: No inproc address"
end
| Message.Connect_msg.Ipc -> assert false
in
let new_queue, client_id =
Queuing_system.add_client program_state.queue
in
Message.ConnectReply (Message.ConnectReply_msg.create
~state:state ~client_id ~push_address)
|> Message.to_string
|> ZMQ.Socket.send rep_socket ;
{ program_state with
queue = new_queue
}
if (not program_state.accepting_clients) then
failure ()
else
match program_state.state with
| None -> failure ()
| Some state ->
let push_address =
match msg with
| Message.Connect_msg.Tcp ->
begin
match program_state.address_tcp with
| Some address -> Address.Tcp address
| None -> failwith "Error: No TCP address"
end
| Message.Connect_msg.Inproc ->
begin
match program_state.address_inproc with
| Some address -> Address.Inproc address
| None -> failwith "Error: No inproc address"
end
| Message.Connect_msg.Ipc -> assert false
in
let new_queue, client_id =
Queuing_system.add_client program_state.queue
in
Message.ConnectReply (Message.ConnectReply_msg.create
~state:state ~client_id ~push_address)
|> Message.to_string
|> ZMQ.Socket.send rep_socket ;
{ program_state with
queue = new_queue
}
let disconnect msg program_state rep_socket =
@ -323,14 +333,21 @@ let del_task msg program_state rep_socket =
and success () =
let queue =
List.fold ~f:(fun queue task_id -> Queuing_system.del_task ~task_id queue)
~init:program_state.queue task_ids
in
let accepting_clients =
(Queuing_system.number_of_queued queue > Queuing_system.number_of_clients queue)
in
let new_program_state =
{ program_state with
queue = List.fold ~f:(fun queue task_id -> Queuing_system.del_task ~task_id queue)
~init:program_state.queue task_ids
accepting_clients ;
queue ;
}
in
let more =
(Queuing_system.number_of_tasks new_program_state.queue > 0)
(Queuing_system.number_of_tasks queue > 0)
in
Message.DelTaskReply (Message.DelTaskReply_msg.create ~task_ids ~more)
|> Message.to_string
@ -393,12 +410,17 @@ let get_task msg program_state rep_socket pair_socket =
and success () =
let new_queue, task_id, task =
let queue, task_id, task =
Queuing_system.pop_task ~client_id program_state.queue
in
let accepting_clients =
(Queuing_system.number_of_queued queue >
Queuing_system.number_of_clients queue)
in
let no_task =
Queuing_system.number_of_queued new_queue = 0
Queuing_system.number_of_queued queue = 0
in
if no_task then
@ -410,7 +432,8 @@ let get_task msg program_state rep_socket pair_socket =
let new_program_state =
{ program_state with
queue = new_queue
queue ;
accepting_clients;
}
in
@ -467,6 +490,11 @@ let get_tasks msg program_state rep_socket pair_socket =
Queuing_system.number_of_queued new_queue = 0
in
let accepting_clients =
(Queuing_system.number_of_queued new_queue >
Queuing_system.number_of_clients new_queue)
in
if no_task then
string_of_pub_state Waiting
|> ZMQ.Socket.send pair_socket
@ -476,7 +504,8 @@ let get_tasks msg program_state rep_socket pair_socket =
let new_program_state =
{ program_state with
queue = new_queue
queue = new_queue;
accepting_clients;
}
in
@ -522,10 +551,17 @@ let task_done msg program_state rep_socket =
increment_progress_bar bar)
~init:(program_state.queue, program_state.progress_bar) task_ids
in
let accepting_clients =
(Queuing_system.number_of_queued new_queue >
Queuing_system.number_of_clients new_queue)
in
let result =
{ program_state with
queue = new_queue;
progress_bar = new_bar
progress_bar = new_bar;
accepting_clients
}
in
reply_ok rep_socket;
@ -654,7 +690,8 @@ let abort program_state rep_socket =
reply_ok rep_socket;
{ program_state with
queue
queue ;
accepting_clients = false;
}
@ -738,6 +775,7 @@ let run ~port =
address_tcp = None;
address_inproc = None;
progress_bar = None ;
accepting_clients = false;
data = Hashtbl.create ~hashable:String.hashable ();
}
in

View File

@ -1,12 +1,13 @@
type t =
{
queue : Queuing_system.t ;
state : Message.State.t option ;
address_tcp : Address.Tcp.t option ;
address_inproc : Address.Inproc.t option ;
progress_bar : Progress_bar.t option ;
running : bool;
data : (string, string) Core.Hashtbl.t ;
queue : Queuing_system.t ;
state : Message.State.t option ;
address_tcp : Address.Tcp.t option ;
address_inproc : Address.Inproc.t option ;
progress_bar : Progress_bar.t option ;
running : bool;
accepting_clients : bool;
data : (string, string) Core.Hashtbl.t ;
}

View File

@ -130,8 +130,8 @@ subroutine ZMQ_pt2(E, pt2,relative_error, absolute_error, error)
call pt2_slave_inproc(i)
endif
!$OMP END PARALLEL
call delete_selection_buffer(b)
call end_parallel_job(zmq_to_qp_run_socket, zmq_socket_pull, 'pt2')
call delete_selection_buffer(b)
print *, '========== ================= ================= ================='

View File

@ -66,11 +66,11 @@ subroutine ZMQ_selection(N_in, pt2)
stop 'Unable to add task to task server'
endif
endif
call zmq_set_running(zmq_to_qp_run_socket)
ASSERT (associated(b%det))
ASSERT (associated(b%val))
call zmq_set_running(zmq_to_qp_run_socket)
!$OMP PARALLEL DEFAULT(shared) SHARED(b, pt2) PRIVATE(i) NUM_THREADS(nproc+1)
i = omp_get_thread_num()
if (i==0) then

View File

@ -50,17 +50,26 @@ integer function zmq_get_$X(zmq_to_qp_run_socket, worker_id)
write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, '$X'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0)
if (rc /= len(trim(msg))) go to 10
if (rc /= len(trim(msg))) then
zmq_get_$X = -1
go to 10
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0)
if (msg(1:14) /= 'get_data_reply') go to 10
if (msg(1:14) /= 'get_data_reply') then
zmq_get_$X = -1
go to 10
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,$X,4,0)
if (rc /= 4) go to 10
if (rc /= 4) then
zmq_get_$X = -1
go to 10
endif
endif
! Normal exit
10 continue
IRP_IF MPI
include 'mpif.h'
@ -71,27 +80,13 @@ integer function zmq_get_$X(zmq_to_qp_run_socket, worker_id)
print *, irp_here//': Unable to broadcast N_det_generators'
stop -1
endif
if (zmq_get_$X == 0) then
call MPI_BCAST ($X, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
print *, irp_here//': Unable to broadcast N_det_generators'
stop -1
endif
endif
IRP_ENDIF
return
! Exception
10 continue
zmq_get_$X = -1
IRP_IF MPI
call MPI_BCAST (zmq_get_$X, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
call MPI_BCAST ($X, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
print *, irp_here//': Unable to broadcast N_det_generators'
stop -1
endif
IRP_ENDIF
end
SUBST [ X ]

View File

@ -87,21 +87,30 @@ integer function zmq_get_$X(zmq_to_qp_run_socket, worker_id)
integer :: rc
character*(256) :: msg
zmq_get_$X = 0
if (mpi_master) then
write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, '$X'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0)
if (rc /= len(trim(msg))) go to 10
if (rc /= len(trim(msg))) then
zmq_get_$X = -1
go to 10
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0)
if (msg(1:14) /= 'get_data_reply') go to 10
if (msg(1:14) /= 'get_data_reply') then
zmq_get_$X = -1
go to 10
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,$X,4,0)
if (rc /= 4) go to 10
if (rc /= 4) then
zmq_get_$X = -1
go to 10
endif
endif
! Normal exit
zmq_get_$X = 0
10 continue
IRP_IF MPI
include 'mpif.h'
integer :: ierr
@ -110,25 +119,12 @@ integer function zmq_get_$X(zmq_to_qp_run_socket, worker_id)
if (ierr /= MPI_SUCCESS) then
stop 'Unable to broadcast zmq_get_psi_det'
endif
if (zmq_get_$X == 0) then
call MPI_BCAST ($X, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
stop 'Unable to broadcast zmq_get_psi_det'
endif
endif
IRP_ENDIF
return
! Exception
10 continue
zmq_get_$X = -1
IRP_IF MPI
call MPI_BCAST (zmq_get_$X, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
call MPI_BCAST ($X, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
stop 'Unable to broadcast zmq_get_psi_det'
endif
IRP_ENDIF
end
SUBST [ X ]
@ -276,20 +272,29 @@ integer function zmq_get_psi_det(zmq_to_qp_run_socket, worker_id)
integer*8 :: rc8
character*(256) :: msg
zmq_get_psi_det = 0
if (mpi_master) then
write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, 'psi_det'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0)
if (rc /= len(trim(msg))) go to 10
if (rc /= len(trim(msg))) then
zmq_get_psi_det = -1
go to 10
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0)
if (msg(1:14) /= 'get_data_reply') go to 10
if (msg(1:14) /= 'get_data_reply') then
zmq_get_psi_det = -1
go to 10
endif
rc8 = f77_zmq_recv8(zmq_to_qp_run_socket,psi_det,int(N_int*2_8*N_det*bit_kind,8),0)
if (rc8 /= N_int*2_8*N_det*bit_kind) go to 10
if (rc8 /= N_int*2_8*N_det*bit_kind) then
zmq_get_psi_det = -1
go to 10
endif
endif
! Normal exit
zmq_get_psi_det = 0
10 continue
IRP_IF MPI
include 'mpif.h'
integer :: ierr
@ -297,22 +302,9 @@ integer function zmq_get_psi_det(zmq_to_qp_run_socket, worker_id)
if (ierr /= MPI_SUCCESS) then
stop 'Unable to broadcast zmq_get_psi_det'
endif
if (zmq_get_psi_det == 0) then
call broadcast_chunks_bit_kind(psi_det,N_det*N_int*2)
endif
call broadcast_chunks_bit_kind(psi_det,N_det*N_int*2)
IRP_ENDIF
return
! Exception
10 continue
zmq_get_psi_det = -1
IRP_IF MPI
call MPI_BCAST (zmq_get_psi_det, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
stop 'Unable to broadcast zmq_get_psi_det'
endif
IRP_ENDIF
end
integer function zmq_get_psi_coef(zmq_to_qp_run_socket, worker_id)
@ -327,20 +319,29 @@ integer function zmq_get_psi_coef(zmq_to_qp_run_socket, worker_id)
integer*8 :: rc8
character*(256) :: msg
zmq_get_psi_coef = 0
if (mpi_master) then
write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, 'psi_coef'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0)
if (rc /= len(trim(msg))) go to 10
if (rc /= len(trim(msg))) then
zmq_get_psi_coef = -1
go to 10
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0)
if (msg(1:14) /= 'get_data_reply') go to 10
if (msg(1:14) /= 'get_data_reply') then
zmq_get_psi_coef = -1
go to 10
endif
rc8 = f77_zmq_recv8(zmq_to_qp_run_socket,psi_coef,int(psi_det_size*N_states*8_8,8),0)
if (rc8 /= psi_det_size*N_states*8_8) go to 10
if (rc8 /= psi_det_size*N_states*8_8) then
zmq_get_psi_coef = -1
go to 10
endif
endif
! Normal exit
zmq_get_psi_coef = 0
10 continue
IRP_IF MPI
include 'mpif.h'
@ -349,22 +350,9 @@ integer function zmq_get_psi_coef(zmq_to_qp_run_socket, worker_id)
if (ierr /= MPI_SUCCESS) then
stop 'Unable to broadcast zmq_get_psi_coef'
endif
if (zmq_get_psi_coef == 0) then
call broadcast_chunks_double(psi_coef,N_states*N_det)
endif
call broadcast_chunks_double(psi_coef,N_states*N_det)
IRP_ENDIF
return
! Exception
10 continue
zmq_get_psi_coef = -1
IRP_IF MPI
call MPI_BCAST (zmq_get_psi_coef, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
stop 'Unable to broadcast zmq_get_psi_coef'
endif
IRP_ENDIF
end

View File

@ -57,16 +57,25 @@ integer function zmq_get_dvector(zmq_to_qp_run_socket, worker_id, name, x, size_
if (mpi_master) then
write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, name
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0)
if (rc /= len(trim(msg))) go to 10
if (rc /= len(trim(msg))) then
zmq_get_dvector = -1
go to 10
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0)
if (msg(1:14) /= 'get_data_reply') go to 10
if (msg(1:14) /= 'get_data_reply') then
zmq_get_dvector = -1
go to 10
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,x,size_x*8,0)
if (rc /= size_x*8) go to 10
if (rc /= size_x*8) then
zmq_get_dvector = -1
go to 10
endif
endif
! Normal exit
10 continue
IRP_IF MPI
integer :: ierr
@ -76,28 +85,13 @@ integer function zmq_get_dvector(zmq_to_qp_run_socket, worker_id, name, x, size_
print *, irp_here//': Unable to broadcast zmq_get_dvector'
stop -1
endif
if (zmq_get_dvector == 0) then
call MPI_BCAST (x, size_x, MPI_DOUBLE_PRECISION, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
print *, irp_here//': Unable to broadcast dvector'
stop -1
endif
endif
IRP_ENDIF
return
! Exception
10 continue
zmq_get_dvector = -1
IRP_IF MPI
call MPI_BCAST (zmq_get_dvector, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
call MPI_BCAST (x, size_x, MPI_DOUBLE_PRECISION, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
print *, irp_here//': Unable to broadcast zmq_get_dvector'
print *, irp_here//': Unable to broadcast dvector'
stop -1
endif
IRP_ENDIF
end

View File

@ -620,6 +620,7 @@ subroutine end_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,name_in)
rc = f77_zmq_send(zmq_to_qp_run_socket, 'end_job '//trim(zmq_state),8+len(trim(zmq_state)),0)
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 512, 0)
if (trim(message(1:13)) == 'error waiting') then
print *, trim(message(1:rc))
call sleep(1)
cycle
else if (message(1:2) == 'ok') then
@ -630,11 +631,11 @@ subroutine end_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,name_in)
rc = f77_zmq_send(zmq_to_qp_run_socket, 'end_job force',13,0)
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 512, 0)
endif
zmq_state = 'No_state'
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
call end_zmq_pull_socket(zmq_socket_pull)
call omp_set_lock(zmq_lock)
zmq_state = 'No_state'
rc = f77_zmq_ctx_term(zmq_context)
zmq_context = 0_ZMQ_PTR
call omp_unset_lock(zmq_lock)