From 4a7a80679ba86f3ba8a9e7a6e4c447bc738d1f3b Mon Sep 17 00:00:00 2001 From: Anthony Scemama Date: Wed, 29 Nov 2017 13:52:52 +0100 Subject: [PATCH] Control in put/get --- ocaml/Message.ml | 29 +++++---- ocaml/Message_lexer.mll | 16 ++--- ocaml/TaskServer.ml | 59 ++++++++++++++----- plugins/Full_CI_ZMQ/pt2_slave.irp.f | 3 +- plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f | 5 +- .../selection_davidson_slave.irp.f | 8 ++- plugins/Full_CI_ZMQ/selection_slave.irp.f | 5 +- plugins/Full_CI_ZMQ/zmq_selection.irp.f | 5 +- plugins/Selectors_Utils/zmq.irp.f | 4 +- src/Davidson/davidson_parallel.irp.f | 26 +++++--- src/Determinants/H_apply_zmq.template.f | 4 +- src/Determinants/zmq.irp.f | 12 ++-- src/ZMQ/put_get.irp.f | 36 +++++++---- 13 files changed, 143 insertions(+), 69 deletions(-) diff --git a/ocaml/Message.ml b/ocaml/Message.ml index 1ee1d910..2245a390 100644 --- a/ocaml/Message.ml +++ b/ocaml/Message.ml @@ -316,18 +316,21 @@ end module PutData_msg : sig type t = { client_id : Id.Client.t ; + state : State.t ; key : string; } - val create : client_id: int -> key: string -> t + val create : client_id: int -> state: string -> key: string -> t val to_string : t -> string end = struct type t = { client_id : Id.Client.t ; + state : State.t ; key : string; } - let create ~client_id ~key = + let create ~client_id ~state ~key = { client_id = Id.Client.of_int client_id ; + state = State.of_string state; key ; } let to_string x = - Printf.sprintf "put_data %d %s" + Printf.sprintf "put_data %s %d %s" (State.to_string x.state) (Id.Client.to_int x.client_id) x.key end @@ -349,17 +352,21 @@ end module GetData_msg : sig type t = { client_id : Id.Client.t ; + state : State.t ; key : string; } - val create : client_id: int -> key: string -> t + val create : client_id: int -> state: string -> key: string -> t val to_string : t -> string end = struct type t = { client_id : Id.Client.t ; + state : State.t ; key : string } - let create ~client_id ~key = - { client_id = Id.Client.of_int client_id ; key } + let create ~client_id ~state ~key = + { client_id = Id.Client.of_int client_id ; + state = State.of_string state; + key } let to_string x = - Printf.sprintf "get_data %d %s" + Printf.sprintf "get_data %s %d %s" (State.to_string x.state) (Id.Client.to_int x.client_id) x.key end @@ -510,10 +517,10 @@ let of_string s = Newjob (Newjob_msg.create push_address_tcp push_address_inproc state) | EndJob_ state -> Endjob (Endjob_msg.create state) - | GetData_ { client_id ; key } -> - GetData (GetData_msg.create ~client_id ~key) - | PutData_ { client_id ; key } -> - PutData (PutData_msg.create ~client_id ~key) + | GetData_ { state ; client_id ; key } -> + GetData (GetData_msg.create ~client_id ~state ~key) + | PutData_ { state ; client_id ; key } -> + PutData (PutData_msg.create ~client_id ~state ~key) | Terminate_ -> Terminate (Terminate_msg.create ) | Abort_ -> Abort (Abort_msg.create ) | SetWaiting_ -> SetWaiting diff --git a/ocaml/Message_lexer.mll b/ocaml/Message_lexer.mll index ef245270..44bbe122 100644 --- a/ocaml/Message_lexer.mll +++ b/ocaml/Message_lexer.mll @@ -33,7 +33,7 @@ type state_clientid_ntasks = { state : string ; client_id : int ; n_t type state_tcp_inproc = { state : string ; push_address_tcp : string ; push_address_inproc : string ; } type psi = { client_id: int ; n_state: int ; n_det: int ; psi_det_size: int ; n_det_generators: int option ; n_det_selectors: int option ; } -type client_id_key = { client_id: int ; key: string } +type state_client_id_key = { state: string ; client_id: int ; key: string } type msg = | AddTask_ of state_tasks @@ -47,8 +47,8 @@ type msg = | EndJob_ of string | Terminate_ | Abort_ - | GetData_ of client_id_key - | PutData_ of client_id_key + | GetData_ of state_client_id_key + | PutData_ of state_client_id_key | Ok_ | Error_ of string | SetStopped_ @@ -178,14 +178,16 @@ and kw = parse Disconnect_ { state ; client_id } | GET_DATA -> + let state = read_word lexbuf in let client_id = read_int lexbuf in let key = read_word lexbuf in - GetData_ { client_id ; key } + GetData_ { state ; client_id ; key } | PUT_DATA -> + let state = read_word lexbuf in let client_id = read_int lexbuf in let key = read_word lexbuf in - PutData_ { client_id ; key } + PutData_ { state ; client_id ; key } | CONNECT -> let socket = read_word lexbuf in @@ -258,8 +260,8 @@ and kw = parse | Connect_ socket -> Printf.sprintf "CONNECT socket:\"%s\"" socket | NewJob_ { state ; push_address_tcp ; push_address_inproc } -> Printf.sprintf "NEW_JOB state:\"%s\" tcp:\"%s\" inproc:\"%s\"" state push_address_tcp push_address_inproc | EndJob_ state -> Printf.sprintf "END_JOB state:\"%s\"" state - | GetData_ { client_id; key } -> Printf.sprintf "GET_DATA client_id:%d key:%s" client_id key - | PutData_ { client_id ; key } -> Printf.sprintf "PUT_DATA client_id:%d key:%s" client_id key + | GetData_ { state ; client_id; key } -> Printf.sprintf "GET_DATA state:%s client_id:%d key:%s" state client_id key + | PutData_ { state ; client_id ; key } -> Printf.sprintf "PUT_DATA state:%s client_id:%d key:%s" state client_id key | Terminate_ -> "TERMINATE" | Abort_ -> "ABORT" | SetWaiting_ -> "SET_WAITING" diff --git a/ocaml/TaskServer.ml b/ocaml/TaskServer.ml index 5bf2a5a3..2ea895cb 100644 --- a/ocaml/TaskServer.ml +++ b/ocaml/TaskServer.ml @@ -547,28 +547,44 @@ let task_done msg program_state rep_socket = let put_data msg rest_of_msg program_state rep_socket = debug (Message.PutData_msg.to_string msg); - let () = - let key, value = - msg.Message.PutData_msg.key, - match rest_of_msg with - | [ x ] -> x - | _ -> failwith "Badly formed put_data message" - in + let state, key, value = + msg.Message.PutData_msg.state, + msg.Message.PutData_msg.key, + match rest_of_msg with + | [ x ] -> x + | _ -> failwith "Badly formed put_data message" + in + + let success () = Hashtbl.set program_state.data ~key ~data:value ; - Message.PutDataReply (Message.PutDataReply_msg.create ()) |> Message.to_string - |> ZMQ.Socket.send rep_socket + |> ZMQ.Socket.send rep_socket; + program_state + + and failure () = + reply_wrong_state rep_socket; + program_state in - program_state + + match program_state.state with + | None -> assert false + | Some state' -> + if (state = state') then + success () + else + failure () + let get_data msg program_state rep_socket = debug (Message.GetData_msg.to_string msg); - let () = - let key = + let state, key = + msg.Message.GetData_msg.state, msg.Message.GetData_msg.key - in + in + + let success () = let value = match Hashtbl.find program_state.data key with | Some value -> value @@ -576,10 +592,21 @@ let get_data msg program_state rep_socket = in Message.GetDataReply (Message.GetDataReply_msg.create ~value) |> Message.to_string_list - |> ZMQ.Socket.send_all rep_socket - in - program_state + |> ZMQ.Socket.send_all rep_socket; + program_state + and failure () = + reply_wrong_state rep_socket; + program_state + in + + match program_state.state with + | None -> assert false + | Some state' -> + if (state = state') then + success () + else + failure () let terminate program_state rep_socket = diff --git a/plugins/Full_CI_ZMQ/pt2_slave.irp.f b/plugins/Full_CI_ZMQ/pt2_slave.irp.f index da9e201f..0e6ae715 100644 --- a/plugins/Full_CI_ZMQ/pt2_slave.irp.f +++ b/plugins/Full_CI_ZMQ/pt2_slave.irp.f @@ -25,6 +25,7 @@ subroutine run_wf double precision :: energy(N_states_diag) character*(64) :: states(1) integer :: rc, i + integer, external :: zmq_get_dvector call provide_everything @@ -48,7 +49,7 @@ subroutine run_wf print *, 'PT2' call zmq_get_psi(zmq_to_qp_run_socket,1) - call zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states) + if (zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states) == -1) cycle PROVIDE psi_bilinear_matrix_columns_loc psi_det_alpha_unique psi_det_beta_unique PROVIDE psi_bilinear_matrix_rows psi_det_sorted_order psi_bilinear_matrix_order diff --git a/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f b/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f index 1da4921f..6d27daea 100644 --- a/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f +++ b/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f @@ -28,6 +28,7 @@ subroutine ZMQ_pt2(E, pt2,relative_error, absolute_error, error) double precision :: time double precision :: w(N_states) integer(ZMQ_PTR), external :: new_zmq_to_qp_run_socket + integer, external :: zmq_put_dvector if (N_det < max(10,N_states)) then pt2=0.d0 @@ -68,7 +69,9 @@ subroutine ZMQ_pt2(E, pt2,relative_error, absolute_error, error) call zmq_put_psi(zmq_to_qp_run_socket,1) call zmq_put_N_det_generators(zmq_to_qp_run_socket, 1) call zmq_put_N_det_selectors(zmq_to_qp_run_socket, 1) - call zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',pt2_e0_denominator,size(pt2_e0_denominator)) + if (zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',pt2_e0_denominator,size(pt2_e0_denominator)) == -1) then + stop 'Unable to put energy on ZMQ server' + endif call create_selection_buffer(1, 1*2, b) integer :: ipos diff --git a/plugins/Full_CI_ZMQ/selection_davidson_slave.irp.f b/plugins/Full_CI_ZMQ/selection_davidson_slave.irp.f index 33b3f205..8357a616 100644 --- a/plugins/Full_CI_ZMQ/selection_davidson_slave.irp.f +++ b/plugins/Full_CI_ZMQ/selection_davidson_slave.irp.f @@ -33,6 +33,8 @@ subroutine run_wf integer :: rc, i, ierr double precision :: t0, t1 + integer, external :: zmq_get_dvector + call provide_everything zmq_context = f77_zmq_ctx_new () @@ -58,7 +60,7 @@ subroutine run_wf call wall_time(t0) call zmq_get_psi(zmq_to_qp_run_socket,1) - call zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states) + if (zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states) == -1) cycle call zmq_get_N_det_generators (zmq_to_qp_run_socket, 1) call zmq_get_N_det_selectors(zmq_to_qp_run_socket, 1) @@ -80,7 +82,7 @@ subroutine run_wf call wall_time(t0) call zmq_get_psi(zmq_to_qp_run_socket,1) call zmq_get_N_states_diag(zmq_to_qp_run_socket,1) - call zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states_diag) + if (zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states_diag) == -1) cycle call wall_time(t1) call write_double(6,(t1-t0),'Broadcast time') @@ -98,7 +100,7 @@ subroutine run_wf print *, 'PT2' call wall_time(t0) call zmq_get_psi(zmq_to_qp_run_socket,1) - call zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states) + if (zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states) == -1) cycle call zmq_get_N_det_generators (zmq_to_qp_run_socket, 1) call zmq_get_N_det_selectors(zmq_to_qp_run_socket, 1) diff --git a/plugins/Full_CI_ZMQ/selection_slave.irp.f b/plugins/Full_CI_ZMQ/selection_slave.irp.f index a3277c22..c0f60c89 100644 --- a/plugins/Full_CI_ZMQ/selection_slave.irp.f +++ b/plugins/Full_CI_ZMQ/selection_slave.irp.f @@ -27,6 +27,7 @@ subroutine run_wf double precision :: energy(N_states) character*(64) :: states(4) integer :: rc, i, ierr + integer, external :: zmq_get_dvector call provide_everything @@ -52,7 +53,7 @@ subroutine run_wf print *, 'Selection' call zmq_get_psi(zmq_to_qp_run_socket,1) - call zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states) + if (zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states) == -1) cycle !$OMP PARALLEL PRIVATE(i) i = omp_get_thread_num() @@ -67,7 +68,7 @@ subroutine run_wf print *, 'PT2' call zmq_get_psi(zmq_to_qp_run_socket,1) - call zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states) + if (zmq_get_dvector(zmq_to_qp_run_socket,1,'energy',energy,N_states) == -1) cycle logical :: lstop lstop = .False. diff --git a/plugins/Full_CI_ZMQ/zmq_selection.irp.f b/plugins/Full_CI_ZMQ/zmq_selection.irp.f index b0026f82..251256f8 100644 --- a/plugins/Full_CI_ZMQ/zmq_selection.irp.f +++ b/plugins/Full_CI_ZMQ/zmq_selection.irp.f @@ -11,6 +11,7 @@ subroutine ZMQ_selection(N_in, pt2) integer, external :: omp_get_thread_num double precision, intent(out) :: pt2(N_states) integer, parameter :: maxtasks=10000 + integer, external :: zmq_put_dvector PROVIDE fragment_count @@ -27,7 +28,9 @@ subroutine ZMQ_selection(N_in, pt2) call zmq_put_psi(zmq_to_qp_run_socket,1) call zmq_put_N_det_generators(zmq_to_qp_run_socket, 1) call zmq_put_N_det_selectors(zmq_to_qp_run_socket, 1) - call zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',pt2_e0_denominator,size(pt2_e0_denominator)) + if (zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',pt2_e0_denominator,size(pt2_e0_denominator)) == -1) then + stop 'Unable to put energy on ZMQ server' + endif call create_selection_buffer(N, N*2, b) endif diff --git a/plugins/Selectors_Utils/zmq.irp.f b/plugins/Selectors_Utils/zmq.irp.f index 82f972f4..111630b8 100644 --- a/plugins/Selectors_Utils/zmq.irp.f +++ b/plugins/Selectors_Utils/zmq.irp.f @@ -11,7 +11,7 @@ subroutine zmq_put_$X(zmq_to_qp_run_socket,worker_id) integer :: rc character*(256) :: msg - write(msg,'(A8,1X,I8,1X,A230)') 'put_data', worker_id, '$X' + write(msg,'(A,1X,I8,1X,A200)') 'put_data '//trim(zmq_state), worker_id, '$X' rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),ZMQ_SNDMORE) if (rc /= len(trim(msg))) then print *, irp_here, ': Error sending $X' @@ -44,7 +44,7 @@ subroutine zmq_get_$X(zmq_to_qp_run_socket, worker_id) integer :: rc character*(256) :: msg - write(msg,'(A8,1X,I8,1X,A230)') 'get_data', worker_id, '$X' + write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, '$X' rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0) if (rc /= len(trim(msg))) then print *, irp_here, ': Error getting $X' diff --git a/src/Davidson/davidson_parallel.irp.f b/src/Davidson/davidson_parallel.irp.f index e299aa90..4e4680f0 100644 --- a/src/Davidson/davidson_parallel.irp.f +++ b/src/Davidson/davidson_parallel.irp.f @@ -73,15 +73,19 @@ subroutine davidson_slave_work(zmq_to_qp_run_socket, zmq_socket_push, N_st, sze, integer :: N_det_selectors_read, N_det_generators_read double precision, allocatable :: energy(:) + integer, external :: zmq_get_dvector allocate(u_t(N_st,N_det)) allocate (energy(N_st)) - if (mpi_master) then - - call zmq_get_dvector(zmq_to_qp_run_socket, worker_id, 'u_t', u_t, size(u_t)) - call zmq_get_dvector(zmq_to_qp_run_socket, worker_id, 'energy', energy, size(energy)) + if (zmq_get_dvector(zmq_to_qp_run_socket, worker_id, 'u_t', u_t, size(u_t)) == -1) then + deallocate(u_t,energy) + return + endif + if (zmq_get_dvector(zmq_to_qp_run_socket, worker_id, 'energy', energy, size(energy)) == -1) then + deallocate(u_t,energy) + return endif IRP_IF MPI @@ -288,12 +292,18 @@ subroutine H_S2_u_0_nstates_zmq(v_0,s_0,u_0,N_st,sze) integer*8 :: rc8 double precision :: energy(N_st) + integer, external :: zmq_put_dvector + energy = 0.d0 call zmq_put_N_states_diag(zmq_to_qp_run_socket, 1) call zmq_put_psi(zmq_to_qp_run_socket,1) - call zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',energy,size(energy)) - call zmq_put_dvector(zmq_to_qp_run_socket, 1, 'u_t', u_t, size(u_t)) + if (zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',energy,size(energy)) == -1) then + stop 'Unable to put energy on ZMQ server' + endif + if (zmq_put_dvector(zmq_to_qp_run_socket, 1, 'u_t', u_t, size(u_t)) == -1) then + stop 'Unable to put u_t on ZMQ server' + endif deallocate(u_t) @@ -379,7 +389,7 @@ subroutine zmq_put_N_states_diag(zmq_to_qp_run_socket,worker_id) integer :: rc character*(256) :: msg - write(msg,'(A8,1X,I8,1X,A230)') 'put_data', worker_id, 'N_states_diag' + write(msg,'(A,1X,I8,1X,A200)') 'put_data '//trim(zmq_state), worker_id, 'N_states_diag' rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),ZMQ_SNDMORE) if (rc /= len(trim(msg))) then print *, irp_here, ': Error sending N_states_diag' @@ -412,7 +422,7 @@ subroutine zmq_get_N_states_diag(zmq_to_qp_run_socket, worker_id) integer :: rc character*(256) :: msg - write(msg,'(A8,1X,I8,1X,A230)') 'get_data', worker_id, 'N_states_diag' + write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, 'N_states_diag' rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0) if (rc /= len(trim(msg))) then print *, irp_here, ': Error getting N_states_diag' diff --git a/src/Determinants/H_apply_zmq.template.f b/src/Determinants/H_apply_zmq.template.f index 3b44a9f7..cf74b590 100644 --- a/src/Determinants/H_apply_zmq.template.f +++ b/src/Determinants/H_apply_zmq.template.f @@ -36,7 +36,9 @@ subroutine $subroutine($params_main) call zmq_put_psi(zmq_to_qp_run_socket,1) call zmq_put_N_det_generators(zmq_to_qp_run_socket, worker_id) call zmq_put_N_det_selectors(zmq_to_qp_run_socket, worker_id) - call zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',energy,size(energy)) + if (zmq_put_dvector(zmq_to_qp_run_socket,1,'energy',energy,size(energy)) == -1) then + stop 'Unable to put energy on ZMQ server' + endif do i_generator=1,N_det_generators $skip diff --git a/src/Determinants/zmq.irp.f b/src/Determinants/zmq.irp.f index f3d76065..6eab1f8d 100644 --- a/src/Determinants/zmq.irp.f +++ b/src/Determinants/zmq.irp.f @@ -31,7 +31,7 @@ subroutine zmq_put_$X(zmq_to_qp_run_socket,worker_id) integer :: rc character*(256) :: msg - write(msg,'(A8,1X,I8,1X,A230)') 'put_data', worker_id, '$X' + write(msg,'(A,1X,I8,1X,A200)') 'put_data '//trim(zmq_state), worker_id, '$X' rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),ZMQ_SNDMORE) if (rc /= len(trim(msg))) then print *, irp_here, ': Error sending $X' @@ -64,7 +64,7 @@ subroutine zmq_get_$X(zmq_to_qp_run_socket, worker_id) integer :: rc character*(256) :: msg - write(msg,'(A8,1X,I8,1X,A230)') 'get_data', worker_id, '$X' + write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, '$X' rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0) if (rc /= len(trim(msg))) then print *, irp_here, ': Error getting $X' @@ -106,7 +106,7 @@ subroutine zmq_put_psi_det(zmq_to_qp_run_socket,worker_id) integer*8 :: rc8 character*(256) :: msg - write(msg,'(A8,1X,I8,1X,A230)') 'put_data', worker_id, 'psi_det' + write(msg,'(A,1X,I8,1X,A200)') 'put_data '//trim(zmq_state), worker_id, 'psi_det' rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),ZMQ_SNDMORE) if (rc /= len(trim(msg))) then print *, irp_here, ': Error sending psi_det' @@ -139,7 +139,7 @@ subroutine zmq_put_psi_coef(zmq_to_qp_run_socket,worker_id) integer*8 :: rc8 character*(256) :: msg - write(msg,'(A8,1X,I8,1X,A230)') 'put_data', worker_id, 'psi_coef' + write(msg,'(A,1X,I8,1X,A200)') 'put_data '//trim(zmq_state), worker_id, 'psi_coef' rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),ZMQ_SNDMORE) if (rc /= len(trim(msg))) then print *, irp_here, ': Error sending psi_coef' @@ -206,7 +206,7 @@ subroutine zmq_get_psi_det(zmq_to_qp_run_socket, worker_id) character*(256) :: msg - write(msg,'(A8,1X,I8,1X,A230)') 'get_data', worker_id, 'psi_det' + write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, 'psi_det' rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0) if (rc /= len(trim(msg))) then print *, irp_here, ': Error getting psi_det' @@ -244,7 +244,7 @@ subroutine zmq_get_psi_coef(zmq_to_qp_run_socket, worker_id) character*(256) :: msg - write(msg,'(A8,1X,I8,1X,A230)') 'get_data', worker_id, 'psi_coef' + write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, 'psi_coef' rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0) if (rc /= len(trim(msg))) then print *, irp_here, ': Error getting psi_coef' diff --git a/src/ZMQ/put_get.irp.f b/src/ZMQ/put_get.irp.f index 09970bba..230f54b9 100644 --- a/src/ZMQ/put_get.irp.f +++ b/src/ZMQ/put_get.irp.f @@ -1,4 +1,4 @@ -subroutine zmq_put_dvector(zmq_to_qp_run_socket, worker_id, name, x, size_x) +integer function zmq_put_dvector(zmq_to_qp_run_socket, worker_id, name, x, size_x) use f77_zmq implicit none BEGIN_DOC @@ -12,31 +12,36 @@ subroutine zmq_put_dvector(zmq_to_qp_run_socket, worker_id, name, x, size_x) integer :: rc character*(256) :: msg + ! Failure + zmq_put_dvector = -1 - write(msg,'(A8,1X,I8,1X,A230)') 'put_data', worker_id, name + write(msg,'(A,1X,I8,1X,A200)') 'put_data '//trim(zmq_state), worker_id, name rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),ZMQ_SNDMORE) if (rc /= len(trim(msg))) then print *, irp_here, ': Error sending '//name - stop 'error' + return endif rc = f77_zmq_send(zmq_to_qp_run_socket,x,size_x*8,0) if (rc /= size_x*8) then print *, irp_here, ': Error sending '//name - stop 'error' + return endif rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0) if (msg(1:rc) /= 'put_data_reply ok') then print *, rc, trim(msg) print *, irp_here, ': Error in put_data_reply' - stop 'error' + return endif + ! Success + zmq_put_dvector = 0 + end -subroutine zmq_get_dvector(zmq_to_qp_run_socket, worker_id, name, x, size_x) +integer function zmq_get_dvector(zmq_to_qp_run_socket, worker_id, name, x, size_x) use f77_zmq implicit none BEGIN_DOC @@ -51,35 +56,46 @@ subroutine zmq_get_dvector(zmq_to_qp_run_socket, worker_id, name, x, size_x) integer*8 :: rc8 character*(256) :: msg - write(msg,'(A8,1X,I8,1X,A230)') 'get_data', worker_id, name + ! Success + zmq_get_dvector = 0 + + write(msg,'(A,1X,I8,1X,A200)') 'get_data '//trim(zmq_state), worker_id, name rc = f77_zmq_send(zmq_to_qp_run_socket,trim(msg),len(trim(msg)),0) if (rc /= len(trim(msg))) then print *, irp_here, ': Error getting '//name - stop 'error' + zmq_get_dvector = -1 endif rc = f77_zmq_recv(zmq_to_qp_run_socket,msg,len(msg),0) if (msg(1:14) /= 'get_data_reply') then print *, rc, trim(msg) print *, irp_here, ': Error in get_data_reply' - stop 'error' + zmq_get_dvector = -1 endif rc = f77_zmq_recv(zmq_to_qp_run_socket,x,size_x*8,0) if (rc /= size_x*8) then print *, irp_here, ': Error getting '//name - stop 'error' + zmq_get_dvector = -1 endif + IRP_IF MPI integer :: ierr include 'mpif.h' + call MPI_BCAST (zmq_get_dvector, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr) + if (ierr /= MPI_SUCCESS) then + print *, irp_here//': Unable to broadcast zmq_get_dvector' + stop -1 + endif call MPI_BCAST (x, size_x, MPI_DOUBLE_PRECISION, 0, MPI_COMM_WORLD, ierr) if (ierr /= MPI_SUCCESS) then print *, irp_here//': Unable to broadcast dvector' stop -1 endif IRP_ENDIF + + end