Control in put/get

This commit is contained in:
Anthony Scemama 2017-11-29 13:52:52 +01:00
parent 140a8e30fd
commit 4a7a80679b
13 changed files with 143 additions and 69 deletions

View File

@ -316,18 +316,21 @@ end
module PutData_msg : sig
type t =
{ client_id : Id.Client.t ;
state : State.t ;
key : string; }
val create : client_id: int -> key: string -> t
val create : client_id: int -> state: string -> key: string -> t
val to_string : t -> string
end = struct
type t =
{ client_id : Id.Client.t ;
state : State.t ;
key : string; }
let create ~client_id ~key =
let create ~client_id ~state ~key =
{ client_id = Id.Client.of_int client_id ;
state = State.of_string state;
key ; }
let to_string x =
Printf.sprintf "put_data %d %s"
Printf.sprintf "put_data %s %d %s" (State.to_string x.state)
(Id.Client.to_int x.client_id) x.key
end
@ -349,17 +352,21 @@ end
module GetData_msg : sig
type t =
{ client_id : Id.Client.t ;
state : State.t ;
key : string; }
val create : client_id: int -> key: string -> t
val create : client_id: int -> state: string -> key: string -> t
val to_string : t -> string
end = struct
type t =
{ client_id : Id.Client.t ;
state : State.t ;
key : string }
let create ~client_id ~key =
{ client_id = Id.Client.of_int client_id ; key }
let create ~client_id ~state ~key =
{ client_id = Id.Client.of_int client_id ;
state = State.of_string state;
key }
let to_string x =
Printf.sprintf "get_data %d %s"
Printf.sprintf "get_data %s %d %s" (State.to_string x.state)
(Id.Client.to_int x.client_id) x.key
end
@ -510,10 +517,10 @@ let of_string s =
Newjob (Newjob_msg.create push_address_tcp push_address_inproc state)
| EndJob_ state ->
Endjob (Endjob_msg.create state)
| GetData_ { client_id ; key } ->
GetData (GetData_msg.create ~client_id ~key)
| PutData_ { client_id ; key } ->
PutData (PutData_msg.create ~client_id ~key)
| GetData_ { state ; client_id ; key } ->
GetData (GetData_msg.create ~client_id ~state ~key)
| PutData_ { state ; client_id ; key } ->
PutData (PutData_msg.create ~client_id ~state ~key)
| Terminate_ -> Terminate (Terminate_msg.create )
| Abort_ -> Abort (Abort_msg.create )
| SetWaiting_ -> SetWaiting

View File

@ -33,7 +33,7 @@ type state_clientid_ntasks = { state : string ; client_id : int ; n_t
type state_tcp_inproc = { state : string ; push_address_tcp : string ; push_address_inproc : string ; }
type psi = { client_id: int ; n_state: int ; n_det: int ; psi_det_size: int ;
n_det_generators: int option ; n_det_selectors: int option ; }
type client_id_key = { client_id: int ; key: string }
type state_client_id_key = { state: string ; client_id: int ; key: string }
type msg =
| AddTask_ of state_tasks
@ -47,8 +47,8 @@ type msg =
| EndJob_ of string
| Terminate_
| Abort_
| GetData_ of client_id_key
| PutData_ of client_id_key
| GetData_ of state_client_id_key
| PutData_ of state_client_id_key
| Ok_
| Error_ of string
| SetStopped_
@ -178,14 +178,16 @@ and kw = parse
Disconnect_ { state ; client_id }
| GET_DATA ->
let state = read_word lexbuf in
let client_id = read_int lexbuf in
let key = read_word lexbuf in
GetData_ { client_id ; key }
GetData_ { state ; client_id ; key }
| PUT_DATA ->
let state = read_word lexbuf in
let client_id = read_int lexbuf in
let key = read_word lexbuf in
PutData_ { client_id ; key }
PutData_ { state ; client_id ; key }
| CONNECT ->
let socket = read_word lexbuf in
@ -258,8 +260,8 @@ and kw = parse
| Connect_ socket -> Printf.sprintf "CONNECT socket:\"%s\"" socket
| NewJob_ { state ; push_address_tcp ; push_address_inproc } -> Printf.sprintf "NEW_JOB state:\"%s\" tcp:\"%s\" inproc:\"%s\"" state push_address_tcp push_address_inproc
| EndJob_ state -> Printf.sprintf "END_JOB state:\"%s\"" state
| GetData_ { client_id; key } -> Printf.sprintf "GET_DATA client_id:%d key:%s" client_id key
| PutData_ { client_id ; key } -> Printf.sprintf "PUT_DATA client_id:%d key:%s" client_id key
| GetData_ { state ; client_id; key } -> Printf.sprintf "GET_DATA state:%s client_id:%d key:%s" state client_id key
| PutData_ { state ; client_id ; key } -> Printf.sprintf "PUT_DATA state:%s client_id:%d key:%s" state client_id key
| Terminate_ -> "TERMINATE"
| Abort_ -> "ABORT"
| SetWaiting_ -> "SET_WAITING"

View File

@ -547,28 +547,44 @@ let task_done msg program_state rep_socket =
let put_data msg rest_of_msg program_state rep_socket =
debug (Message.PutData_msg.to_string msg);
let () =
let key, value =
msg.Message.PutData_msg.key,
match rest_of_msg with
| [ x ] -> x
| _ -> failwith "Badly formed put_data message"
in
let state, key, value =
msg.Message.PutData_msg.state,
msg.Message.PutData_msg.key,
match rest_of_msg with
| [ x ] -> x
| _ -> failwith "Badly formed put_data message"
in
let success () =
Hashtbl.set program_state.data ~key ~data:value ;
Message.PutDataReply (Message.PutDataReply_msg.create ())
|> Message.to_string
|> ZMQ.Socket.send rep_socket
|> ZMQ.Socket.send rep_socket;
program_state
and failure () =
reply_wrong_state rep_socket;
program_state
in
program_state
match program_state.state with
| None -> assert false
| Some state' ->
if (state = state') then
success ()
else
failure ()
let get_data msg program_state rep_socket =
debug (Message.GetData_msg.to_string msg);
let () =
let key =
let state, key =
msg.Message.GetData_msg.state,
msg.Message.GetData_msg.key
in
in
let success () =
let value =
match Hashtbl.find program_state.data key with
| Some value -> value
@ -576,10 +592,21 @@ let get_data msg program_state rep_socket =
in
Message.GetDataReply (Message.GetDataReply_msg.create ~value)
|> Message.to_string_list
|> ZMQ.Socket.send_all rep_socket
in
program_state
|> ZMQ.Socket.send_all rep_socket;
program_state
and failure () =
reply_wrong_state rep_socket;
program_state
in
match program_state.state with
| None -> assert false
| Some state' ->
if (state = state') then
success ()
else
failure ()
let terminate program_state rep_socket =

View File

@ -25,6 +25,7 @@ subroutine run_wf
double precision :: energy(N_states_diag)
character*(64) :: states(1)
integer :: rc, i
integer, external :: zmq_get_dvector
call provide_everything
@ -48,7 +49,7 @@ subroutine run_wf
print *, 'PT2'
call zmq_get_psi(zmq_to_qp_run_socket,1)
call zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states)
if (zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states) == -1) cycle
PROVIDE psi_bilinear_matrix_columns_loc psi_det_alpha_unique psi_det_beta_unique
PROVIDE psi_bilinear_matrix_rows psi_det_sorted_order psi_bilinear_matrix_order

View File

@ -28,6 +28,7 @@ subroutine ZMQ_pt2(E, pt2,relative_error, absolute_error, error)
double precision :: time
double precision :: w(N_states)
integer(ZMQ_PTR), external :: new_zmq_to_qp_run_socket
integer, external :: zmq_put_dvector
if (N_det < max(10,N_states)) then
pt2=0.d0
@ -68,7 +69,9 @@ subroutine ZMQ_pt2(E, pt2,relative_error, absolute_error, error)
call zmq_put_psi(zmq_to_qp_run_socket,1)
call zmq_put_N_det_generators(zmq_to_qp_run_socket, 1)
call zmq_put_N_det_selectors(zmq_to_qp_run_socket, 1)
call zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',pt2_e0_denominator,size(pt2_e0_denominator))
if (zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',pt2_e0_denominator,size(pt2_e0_denominator)) == -1) then
stop 'Unable to put energy on ZMQ server'
endif
call create_selection_buffer(1, 1*2, b)
integer :: ipos

View File

@ -33,6 +33,8 @@ subroutine run_wf
integer :: rc, i, ierr
double precision :: t0, t1
integer, external :: zmq_get_dvector
call provide_everything
zmq_context = f77_zmq_ctx_new ()
@ -58,7 +60,7 @@ subroutine run_wf
call wall_time(t0)
call zmq_get_psi(zmq_to_qp_run_socket,1)
call zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states)
if (zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states) == -1) cycle
call zmq_get_N_det_generators (zmq_to_qp_run_socket, 1)
call zmq_get_N_det_selectors(zmq_to_qp_run_socket, 1)
@ -80,7 +82,7 @@ subroutine run_wf
call wall_time(t0)
call zmq_get_psi(zmq_to_qp_run_socket,1)
call zmq_get_N_states_diag(zmq_to_qp_run_socket,1)
call zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states_diag)
if (zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states_diag) == -1) cycle
call wall_time(t1)
call write_double(6,(t1-t0),'Broadcast time')
@ -98,7 +100,7 @@ subroutine run_wf
print *, 'PT2'
call wall_time(t0)
call zmq_get_psi(zmq_to_qp_run_socket,1)
call zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states)
if (zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states) == -1) cycle
call zmq_get_N_det_generators (zmq_to_qp_run_socket, 1)
call zmq_get_N_det_selectors(zmq_to_qp_run_socket, 1)

View File

@ -27,6 +27,7 @@ subroutine run_wf
double precision :: energy(N_states)
character*(64) :: states(4)
integer :: rc, i, ierr
integer, external :: zmq_get_dvector
call provide_everything
@ -52,7 +53,7 @@ subroutine run_wf
print *, 'Selection'
call zmq_get_psi(zmq_to_qp_run_socket,1)
call zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states)
if (zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states) == -1) cycle
!$OMP PARALLEL PRIVATE(i)
i = omp_get_thread_num()
@ -67,7 +68,7 @@ subroutine run_wf
print *, 'PT2'
call zmq_get_psi(zmq_to_qp_run_socket,1)
call zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states)
if (zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states) == -1) cycle
logical :: lstop
lstop = .False.

View File

@ -11,6 +11,7 @@ subroutine ZMQ_selection(N_in, pt2)
integer, external :: omp_get_thread_num
double precision, intent(out) :: pt2(N_states)
integer, parameter :: maxtasks=10000
integer, external :: zmq_put_dvector
PROVIDE fragment_count
@ -27,7 +28,9 @@ subroutine ZMQ_selection(N_in, pt2)
call zmq_put_psi(zmq_to_qp_run_socket,1)
call zmq_put_N_det_generators(zmq_to_qp_run_socket, 1)
call zmq_put_N_det_selectors(zmq_to_qp_run_socket, 1)
call zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',pt2_e0_denominator,size(pt2_e0_denominator))
if (zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',pt2_e0_denominator,size(pt2_e0_denominator)) == -1) then
stop 'Unable to put energy on ZMQ server'
endif
call create_selection_buffer(N, N*2, b)
endif

View File

@ -11,7 +11,7 @@ subroutine zmq_put_$X(zmq_to_qp_run_socket,worker_id)
integer :: rc
character*(256) :: msg
write(msg,'(A8,1X,I8,1X,A230)') 'put_data', worker_id, '$X'
write(msg,'(A,1X,I8,1X,A200)') 'put_data '//trim(zmq_state), worker_id, '$X'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),ZMQ_SNDMORE)
if (rc /= len(trim(msg))) then
print *, irp_here, ': Error sending $X'
@ -44,7 +44,7 @@ subroutine zmq_get_$X(zmq_to_qp_run_socket, worker_id)
integer :: rc
character*(256) :: msg
write(msg,'(A8,1X,I8,1X,A230)') 'get_data', worker_id, '$X'
write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, '$X'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0)
if (rc /= len(trim(msg))) then
print *, irp_here, ': Error getting $X'

View File

@ -73,15 +73,19 @@ subroutine davidson_slave_work(zmq_to_qp_run_socket, zmq_socket_push, N_st, sze,
integer :: N_det_selectors_read, N_det_generators_read
double precision, allocatable :: energy(:)
integer, external :: zmq_get_dvector
allocate(u_t(N_st,N_det))
allocate (energy(N_st))
if (mpi_master) then
call zmq_get_dvector(zmq_to_qp_run_socket, worker_id, 'u_t', u_t, size(u_t))
call zmq_get_dvector(zmq_to_qp_run_socket, worker_id, 'energy', energy, size(energy))
if (zmq_get_dvector(zmq_to_qp_run_socket, worker_id, 'u_t', u_t, size(u_t)) == -1) then
deallocate(u_t,energy)
return
endif
if (zmq_get_dvector(zmq_to_qp_run_socket, worker_id, 'energy', energy, size(energy)) == -1) then
deallocate(u_t,energy)
return
endif
IRP_IF MPI
@ -288,12 +292,18 @@ subroutine H_S2_u_0_nstates_zmq(v_0,s_0,u_0,N_st,sze)
integer*8 :: rc8
double precision :: energy(N_st)
integer, external :: zmq_put_dvector
energy = 0.d0
call zmq_put_N_states_diag(zmq_to_qp_run_socket, 1)
call zmq_put_psi(zmq_to_qp_run_socket,1)
call zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',energy,size(energy))
call zmq_put_dvector(zmq_to_qp_run_socket, 1, 'u_t', u_t, size(u_t))
if (zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',energy,size(energy)) == -1) then
stop 'Unable to put energy on ZMQ server'
endif
if (zmq_put_dvector(zmq_to_qp_run_socket, 1, 'u_t', u_t, size(u_t)) == -1) then
stop 'Unable to put u_t on ZMQ server'
endif
deallocate(u_t)
@ -379,7 +389,7 @@ subroutine zmq_put_N_states_diag(zmq_to_qp_run_socket,worker_id)
integer :: rc
character*(256) :: msg
write(msg,'(A8,1X,I8,1X,A230)') 'put_data', worker_id, 'N_states_diag'
write(msg,'(A,1X,I8,1X,A200)') 'put_data '//trim(zmq_state), worker_id, 'N_states_diag'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),ZMQ_SNDMORE)
if (rc /= len(trim(msg))) then
print *, irp_here, ': Error sending N_states_diag'
@ -412,7 +422,7 @@ subroutine zmq_get_N_states_diag(zmq_to_qp_run_socket, worker_id)
integer :: rc
character*(256) :: msg
write(msg,'(A8,1X,I8,1X,A230)') 'get_data', worker_id, 'N_states_diag'
write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, 'N_states_diag'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0)
if (rc /= len(trim(msg))) then
print *, irp_here, ': Error getting N_states_diag'

View File

@ -36,7 +36,9 @@ subroutine $subroutine($params_main)
call zmq_put_psi(zmq_to_qp_run_socket,1)
call zmq_put_N_det_generators(zmq_to_qp_run_socket, worker_id)
call zmq_put_N_det_selectors(zmq_to_qp_run_socket, worker_id)
call zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',energy,size(energy))
if (zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',energy,size(energy)) == -1) then
stop 'Unable to put energy on ZMQ server'
endif
do i_generator=1,N_det_generators
$skip

View File

@ -31,7 +31,7 @@ subroutine zmq_put_$X(zmq_to_qp_run_socket,worker_id)
integer :: rc
character*(256) :: msg
write(msg,'(A8,1X,I8,1X,A230)') 'put_data', worker_id, '$X'
write(msg,'(A,1X,I8,1X,A200)') 'put_data '//trim(zmq_state), worker_id, '$X'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),ZMQ_SNDMORE)
if (rc /= len(trim(msg))) then
print *, irp_here, ': Error sending $X'
@ -64,7 +64,7 @@ subroutine zmq_get_$X(zmq_to_qp_run_socket, worker_id)
integer :: rc
character*(256) :: msg
write(msg,'(A8,1X,I8,1X,A230)') 'get_data', worker_id, '$X'
write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, '$X'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0)
if (rc /= len(trim(msg))) then
print *, irp_here, ': Error getting $X'
@ -106,7 +106,7 @@ subroutine zmq_put_psi_det(zmq_to_qp_run_socket,worker_id)
integer*8 :: rc8
character*(256) :: msg
write(msg,'(A8,1X,I8,1X,A230)') 'put_data', worker_id, 'psi_det'
write(msg,'(A,1X,I8,1X,A200)') 'put_data '//trim(zmq_state), worker_id, 'psi_det'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),ZMQ_SNDMORE)
if (rc /= len(trim(msg))) then
print *, irp_here, ': Error sending psi_det'
@ -139,7 +139,7 @@ subroutine zmq_put_psi_coef(zmq_to_qp_run_socket,worker_id)
integer*8 :: rc8
character*(256) :: msg
write(msg,'(A8,1X,I8,1X,A230)') 'put_data', worker_id, 'psi_coef'
write(msg,'(A,1X,I8,1X,A200)') 'put_data '//trim(zmq_state), worker_id, 'psi_coef'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),ZMQ_SNDMORE)
if (rc /= len(trim(msg))) then
print *, irp_here, ': Error sending psi_coef'
@ -206,7 +206,7 @@ subroutine zmq_get_psi_det(zmq_to_qp_run_socket, worker_id)
character*(256) :: msg
write(msg,'(A8,1X,I8,1X,A230)') 'get_data', worker_id, 'psi_det'
write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, 'psi_det'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0)
if (rc /= len(trim(msg))) then
print *, irp_here, ': Error getting psi_det'
@ -244,7 +244,7 @@ subroutine zmq_get_psi_coef(zmq_to_qp_run_socket, worker_id)
character*(256) :: msg
write(msg,'(A8,1X,I8,1X,A230)') 'get_data', worker_id, 'psi_coef'
write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, 'psi_coef'
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0)
if (rc /= len(trim(msg))) then
print *, irp_here, ': Error getting psi_coef'

View File

@ -1,4 +1,4 @@
subroutine zmq_put_dvector(zmq_to_qp_run_socket, worker_id, name, x, size_x)
integer function zmq_put_dvector(zmq_to_qp_run_socket, worker_id, name, x, size_x)
use f77_zmq
implicit none
BEGIN_DOC
@ -12,31 +12,36 @@ subroutine zmq_put_dvector(zmq_to_qp_run_socket, worker_id, name, x, size_x)
integer :: rc
character*(256) :: msg
! Failure
zmq_put_dvector = -1
write(msg,'(A8,1X,I8,1X,A230)') 'put_data', worker_id, name
write(msg,'(A,1X,I8,1X,A200)') 'put_data '//trim(zmq_state), worker_id, name
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),ZMQ_SNDMORE)
if (rc /= len(trim(msg))) then
print *, irp_here, ': Error sending '//name
stop 'error'
return
endif
rc = f77_zmq_send(zmq_to_qp_run_socket,x,size_x*8,0)
if (rc /= size_x*8) then
print *, irp_here, ': Error sending '//name
stop 'error'
return
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0)
if (msg(1:rc) /= 'put_data_reply ok') then
print *, rc, trim(msg)
print *, irp_here, ': Error in put_data_reply'
stop 'error'
return
endif
! Success
zmq_put_dvector = 0
end
subroutine zmq_get_dvector(zmq_to_qp_run_socket, worker_id, name, x, size_x)
integer function zmq_get_dvector(zmq_to_qp_run_socket, worker_id, name, x, size_x)
use f77_zmq
implicit none
BEGIN_DOC
@ -51,35 +56,46 @@ subroutine zmq_get_dvector(zmq_to_qp_run_socket, worker_id, name, x, size_x)
integer*8 :: rc8
character*(256) :: msg
write(msg,'(A8,1X,I8,1X,A230)') 'get_data', worker_id, name
! Success
zmq_get_dvector = 0
write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, name
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0)
if (rc /= len(trim(msg))) then
print *, irp_here, ': Error getting '//name
stop 'error'
zmq_get_dvector = -1
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0)
if (msg(1:14) /= 'get_data_reply') then
print *, rc, trim(msg)
print *, irp_here, ': Error in get_data_reply'
stop 'error'
zmq_get_dvector = -1
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,x,size_x*8,0)
if (rc /= size_x*8) then
print *, irp_here, ': Error getting '//name
stop 'error'
zmq_get_dvector = -1
endif
IRP_IF MPI
integer :: ierr
include 'mpif.h'
call MPI_BCAST (zmq_get_dvector, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
print *, irp_here//': Unable to broadcast zmq_get_dvector'
stop -1
endif
call MPI_BCAST (x, size_x, MPI_DOUBLE_PRECISION, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
print *, irp_here//': Unable to broadcast dvector'
stop -1
endif
IRP_ENDIF
end