From 471d01e11858fe51b709877dc601604a19d4d049 Mon Sep 17 00:00:00 2001 From: Anthony Scemama Date: Wed, 12 Jan 2022 19:05:15 +0100 Subject: [PATCH] Faster network transfers --- ocaml/Block.ml | 24 +----------------------- ocaml/Message.ml | 18 +++++++++++++++++- ocaml/Qmcchem_config.ml | 3 ++- ocaml/Qmcchem_dataserver.ml | 8 ++++++-- ocaml/Qmcchem_forwarder.ml | 10 ++++++++++ 5 files changed, 36 insertions(+), 27 deletions(-) diff --git a/ocaml/Block.ml b/ocaml/Block.ml index 0d84642..1eadfa5 100644 --- a/ocaml/Block.ml +++ b/ocaml/Block.ml @@ -116,28 +116,7 @@ let read_bytes b idx = -let of_bytes b = -(* - let rec loop accu s = - match read_bytes s with - | None -> [] - | Some (data, None) -> (data :: accu) - | Some (data, (Some rest)) -> loop (data :: accu) rest - in - let result = - match loop [] b with - | compute_node :: block_id :: pid :: weight :: value :: property :: [] -> - Some - { property = Property.of_bytes property; - value = Sample.of_bytes value; - weight = Weight.of_bytes weight; - pid = int_of_bytes pid; - block_id = Block_id.of_bytes block_id; - compute_node = Compute_node.of_bytes compute_node; - } - | _ -> None - in -*) +let of_bytes ?(idx=0) b = let get_x s idx = match read_bytes s idx with | Some ( data, i1) -> data, i1 @@ -146,7 +125,6 @@ let of_bytes b = let result = - let idx=0 in try let property, idx = get_x b idx in let value , idx = get_x b idx in diff --git a/ocaml/Message.ml b/ocaml/Message.ml index 8f6bb11..c90ccfc 100644 --- a/ocaml/Message.ml +++ b/ocaml/Message.ml @@ -11,7 +11,7 @@ type t = | Error of string -let create m = +let of_string_list m = try match m with | [ "cpu" ; c ; pid ; b ; "1" ; v ] -> @@ -80,7 +80,19 @@ let create m = | [ "unregister" ; c ; pid ] -> Unregister (Compute_node.of_string c, int_of_string pid) | [ "Test" ] -> Test | [ "Ezfio" ; ezfio_msg ] -> Ezfio ezfio_msg + | prop :: c :: pid :: b :: d :: w :: "bin" :: block :: [] -> + (* Block in binary format *) + let property = + Property.of_string prop + in + begin + assert (not (Property.is_scalar property)); + match Block.of_bytes ~idx:8 (Bytes.unsafe_of_string block) with + | Some block -> Property block + | None -> failwith "Invalid block" + end | prop :: c :: pid :: b :: d :: w :: l -> + (* Bock in text format *) let property = Property.of_string prop in @@ -109,6 +121,7 @@ let create m = | _ -> Error "Unknown error" + let to_string = function | Property b -> "Property : "^(Block.to_string b) | Walkers (h,p,w) -> Printf.sprintf "Walkers : %s %d : %d walkers" @@ -123,3 +136,6 @@ let to_string = function | Error msg -> "Error "^msg +let create m = + of_string_list m + diff --git a/ocaml/Qmcchem_config.ml b/ocaml/Qmcchem_config.ml index dd3679e..7934e5c 100644 --- a/ocaml/Qmcchem_config.ml +++ b/ocaml/Qmcchem_config.ml @@ -117,6 +117,7 @@ let ip_address = lazy ( let binary_io = try - Sys.getenv "QMCCHEM_IO" = "B" + let qmcchem_io = Sys.getenv "QMCCHEM_IO" in + qmcchem_io = "B" || qmcchem_io = "b" with Not_found -> false diff --git a/ocaml/Qmcchem_dataserver.ml b/ocaml/Qmcchem_dataserver.ml index c46c3e5..cf18db9 100644 --- a/ocaml/Qmcchem_dataserver.ml +++ b/ocaml/Qmcchem_dataserver.ml @@ -721,13 +721,17 @@ let run ?(daemon=true) ezfio_filename = Unix.gettimeofday () in let msg = + (* List.rev_map String.trim raw_msg |> List.rev |> Message.create - and msg_size = - List.fold_left (fun accu x -> accu + (String.length x)) 0 raw_msg + *) + Message.create raw_msg in let recv_log = + let msg_size = + List.fold_left (fun accu x -> accu + (String.length x)) 0 raw_msg + in send_log "pull" msg_size t0 in diff --git a/ocaml/Qmcchem_forwarder.ml b/ocaml/Qmcchem_forwarder.ml index dfe0a96..359b8a7 100644 --- a/ocaml/Qmcchem_forwarder.ml +++ b/ocaml/Qmcchem_forwarder.ml @@ -474,6 +474,16 @@ let run ezfio_filename dataserver = else List.concat [ [ "elec_coord" ; hostname ; pid ; id ; string_of_int (5*len)] ; ( select_n_of ~n:5 ~len rest ) ] + | prop :: c :: pid :: b :: d :: w :: [] -> message + | prop :: c :: pid :: b :: d :: w :: l -> + if Qmcchem_config.binary_io then + match Message.create message with + | Message.Property block -> + prop :: c :: pid :: b :: d :: w :: "bin" :: + (Block.to_bytes block |> Bytes.unsafe_to_string ) :: [] + | _ -> failwith "Inconsistent message" + else + message | _ -> message in Zmq.Socket.send_all push_socket new_message