Faster network transfers

This commit is contained in:
Anthony Scemama 2022-01-12 19:05:15 +01:00
parent 70dbec8ab8
commit 471d01e118
5 changed files with 36 additions and 27 deletions

View File

@ -116,28 +116,7 @@ let read_bytes b idx =
let of_bytes b =
(*
let rec loop accu s =
match read_bytes s with
| None -> []
| Some (data, None) -> (data :: accu)
| Some (data, (Some rest)) -> loop (data :: accu) rest
in
let result =
match loop [] b with
| compute_node :: block_id :: pid :: weight :: value :: property :: [] ->
Some
{ property = Property.of_bytes property;
value = Sample.of_bytes value;
weight = Weight.of_bytes weight;
pid = int_of_bytes pid;
block_id = Block_id.of_bytes block_id;
compute_node = Compute_node.of_bytes compute_node;
}
| _ -> None
in
*)
let of_bytes ?(idx=0) b =
let get_x s idx =
match read_bytes s idx with
| Some ( data, i1) -> data, i1
@ -146,7 +125,6 @@ let of_bytes b =
let result =
let idx=0 in
try
let property, idx = get_x b idx in
let value , idx = get_x b idx in

View File

@ -11,7 +11,7 @@ type t =
| Error of string
let create m =
let of_string_list m =
try
match m with
| [ "cpu" ; c ; pid ; b ; "1" ; v ] ->
@ -80,7 +80,19 @@ let create m =
| [ "unregister" ; c ; pid ] -> Unregister (Compute_node.of_string c, int_of_string pid)
| [ "Test" ] -> Test
| [ "Ezfio" ; ezfio_msg ] -> Ezfio ezfio_msg
| prop :: c :: pid :: b :: d :: w :: "bin" :: block :: [] ->
(* Block in binary format *)
let property =
Property.of_string prop
in
begin
assert (not (Property.is_scalar property));
match Block.of_bytes ~idx:8 (Bytes.unsafe_of_string block) with
| Some block -> Property block
| None -> failwith "Invalid block"
end
| prop :: c :: pid :: b :: d :: w :: l ->
(* Bock in text format *)
let property =
Property.of_string prop
in
@ -109,6 +121,7 @@ let create m =
| _ -> Error "Unknown error"
let to_string = function
| Property b -> "Property : "^(Block.to_string b)
| Walkers (h,p,w) -> Printf.sprintf "Walkers : %s %d : %d walkers"
@ -123,3 +136,6 @@ let to_string = function
| Error msg -> "Error "^msg
let create m =
of_string_list m

View File

@ -117,6 +117,7 @@ let ip_address = lazy (
let binary_io =
try
Sys.getenv "QMCCHEM_IO" = "B"
let qmcchem_io = Sys.getenv "QMCCHEM_IO" in
qmcchem_io = "B" || qmcchem_io = "b"
with Not_found -> false

View File

@ -721,13 +721,17 @@ let run ?(daemon=true) ezfio_filename =
Unix.gettimeofday ()
in
let msg =
(*
List.rev_map String.trim raw_msg
|> List.rev
|> Message.create
and msg_size =
List.fold_left (fun accu x -> accu + (String.length x)) 0 raw_msg
*)
Message.create raw_msg
in
let recv_log =
let msg_size =
List.fold_left (fun accu x -> accu + (String.length x)) 0 raw_msg
in
send_log "pull" msg_size t0
in

View File

@ -474,6 +474,16 @@ let run ezfio_filename dataserver =
else
List.concat [ [ "elec_coord" ; hostname ; pid ; id ;
string_of_int (5*len)] ; ( select_n_of ~n:5 ~len rest ) ]
| prop :: c :: pid :: b :: d :: w :: [] -> message
| prop :: c :: pid :: b :: d :: w :: l ->
if Qmcchem_config.binary_io then
match Message.create message with
| Message.Property block ->
prop :: c :: pid :: b :: d :: w :: "bin" ::
(Block.to_bytes block |> Bytes.unsafe_to_string ) :: []
| _ -> failwith "Inconsistent message"
else
message
| _ -> message
in
Zmq.Socket.send_all push_socket new_message