diff --git a/ocaml/Message.ml b/ocaml/Message.ml index 2db843b..a6601ba 100644 --- a/ocaml/Message.ml +++ b/ocaml/Message.ml @@ -1,15 +1,14 @@ -open Core open Qptypes type t = -| Property of Block.t -| Walkers of Compute_node.t * Pid.t * (float array) array -| Register of Compute_node.t * Pid.t -| Unregister of Compute_node.t * Pid.t -| Test -| GetWalkers of Strictly_positive_int.t -| Ezfio of string -| Error of string + | Property of Block.t + | Walkers of Compute_node.t * int * (float array) array + | Register of Compute_node.t * int + | Unregister of Compute_node.t * int + | Test + | GetWalkers of Strictly_positive_int.t + | Ezfio of string + | Error of string let create m = @@ -19,45 +18,45 @@ let create m = let open Block in Property { property = Property.Cpu; - value = Sample.of_float (Float.of_string v) ; + value = Sample.of_float (float_of_string v) ; weight = Weight.of_float 1.; compute_node = Compute_node.of_string c; - pid = Pid.of_string pid; - block_id = Block_id.of_int (Int.of_string b); + pid = int_of_string pid; + block_id = Block_id.of_int (int_of_string b); } | [ "accep" ; c ; pid ; b ; "1" ; v ] -> let open Block in Property { property = Property.Accep; - value = Sample.of_float (Float.of_string v) ; + value = Sample.of_float (float_of_string v) ; weight = Weight.of_float 1.; compute_node = Compute_node.of_string c; - pid = Pid.of_string pid; - block_id = Block_id.of_int (Int.of_string b); + pid = int_of_string pid; + block_id = Block_id.of_int (int_of_string b); } | [ prop ; c ; pid ; b ; w ; v ] -> let open Block in Property { property = Property.of_string prop; - value = Sample.of_float (Float.of_string v); - weight = Weight.of_float (Float.of_string w); + value = Sample.of_float (float_of_string v); + weight = Weight.of_float (float_of_string w); compute_node = Compute_node.of_string c; - pid = Pid.of_string pid; - block_id = Block_id.of_int (Int.of_string b); + pid = int_of_string pid; + block_id = Block_id.of_int (int_of_string b); } | "elec_coord" :: c :: pid :: _ :: n ::walkers -> begin let elec_num = Lazy.force Qputils.elec_num and n = - Int.of_string n + int_of_string n in assert (n = List.length walkers); let rec build_walker accu = function | (0,tail) -> let result = List.rev accu - |> List.map ~f:Float.of_string + |> List.map float_of_string |> Array.of_list in (result, tail) @@ -73,11 +72,11 @@ let create m = in build (result::accu) tail in - Walkers (Compute_node.of_string c, Pid.of_string pid, build [] walkers) + Walkers (Compute_node.of_string c, int_of_string pid, build [] walkers) end - | [ "get_walkers" ; n ] -> GetWalkers (n |> Int.of_string |> Strictly_positive_int.of_int) - | [ "register" ; c ; pid ] -> Register (Compute_node.of_string c, Pid.of_string pid) - | [ "unregister" ; c ; pid ] -> Unregister (Compute_node.of_string c, Pid.of_string pid) + | [ "get_walkers" ; n ] -> GetWalkers (n |> int_of_string |> Strictly_positive_int.of_int) + | [ "register" ; c ; pid ] -> Register (Compute_node.of_string c, int_of_string pid) + | [ "unregister" ; c ; pid ] -> Unregister (Compute_node.of_string c, int_of_string pid) | [ "Test" ] -> Test | [ "Ezfio" ; ezfio_msg ] -> Ezfio ezfio_msg | prop :: c :: pid :: b :: d :: w :: l -> @@ -88,22 +87,22 @@ let create m = assert (not (Property.is_scalar property)); let a = Array.of_list l - |> Array.map ~f:Float.of_string + |> Array.map float_of_string and dim = - Int.of_string d + int_of_string d in assert (Array.length a = dim); let open Block in Property { property = property ; value = Sample.of_float_array ~dim a; - weight = Weight.of_float (Float.of_string w); + weight = Weight.of_float (float_of_string w); compute_node = Compute_node.of_string c; - pid = Pid.of_string pid; - block_id = Block_id.of_int (Int.of_string b); + pid = int_of_string pid; + block_id = Block_id.of_int (int_of_string b); } end - | l -> Error (String.concat ~sep:":" l) + | l -> Error (String.concat ":" l) with | Assert_failure (l,_,_) -> Error l | _ -> Error "Unknown error" @@ -111,14 +110,13 @@ let create m = let to_string = function | Property b -> "Property : "^(Block.to_string b) - | Walkers (h,p,w) -> Printf.sprintf "Walkers : %s %s : %d walkers" - (Compute_node.to_string h) (Pid.to_string p) - (Array.length w) + | Walkers (h,p,w) -> Printf.sprintf "Walkers : %s %d : %d walkers" + (Compute_node.to_string h) p (Array.length w) | GetWalkers n -> Printf.sprintf "GetWalkers %d" (Strictly_positive_int.to_int n) - | Register (h,p) -> Printf.sprintf "Register : %s %s" - (Compute_node.to_string h) (Pid.to_string p) - | Unregister (h,p) -> Printf.sprintf "Unregister : %s %s" - (Compute_node.to_string h) (Pid.to_string p) + | Register (h,p) -> Printf.sprintf "Register : %s %d" + (Compute_node.to_string h) p + | Unregister (h,p) -> Printf.sprintf "Unregister : %s %d" + (Compute_node.to_string h) p | Test -> "Test" | Ezfio msg -> "Ezfio "^msg | Error msg -> "Error "^msg diff --git a/ocaml/Qmcchem_dataserver.ml b/ocaml/Qmcchem_dataserver.ml index a9a9c14..fb46fae 100644 --- a/ocaml/Qmcchem_dataserver.ml +++ b/ocaml/Qmcchem_dataserver.ml @@ -1,4 +1,3 @@ -open Core open Qptypes (** Data server of QMC=Chem. @@ -35,20 +34,27 @@ let run ?(daemon=true) ezfio_filename = if ( not(Ezfio.has_electrons_elec_coord_pool ()) ) then begin Printf.printf "Generating initial walkers...\n%!"; - Unix.fork_exec ~prog:(Lazy.force Qmcchem_config.qmc_create_walkers) - ~argv:["qmc_create_walkers" ; ezfio_filename] () - |> Unix.waitpid_exn ; - Printf.printf "Initial walkers ready\n%!" - end ; + match Unix.fork () with + | 0 -> + Unix.execv + (Lazy.force Qmcchem_config.qmc_create_walkers) + [|"qmc_create_walkers" ; ezfio_filename|] + | pid -> + begin + let pid', status = Unix.waitpid [] pid in + assert (status = Unix.WEXITED 0); + Printf.printf "Initial walkers ready\n%!" + end + end in - (** Measures the time difference between [t0] and [Time.now ()] *) + (** Measures the time difference between [t0] and [Unix.time ()] *) let delta_t t0 = let t1 = - Time.now () + Unix.time () in - Time.abs_diff t1 t0 + t1 -. t0 in (** {2 ZeroMQ initialization} *) @@ -60,8 +66,7 @@ let run ?(daemon=true) ezfio_filename = (** Maximum size of the blocks file before compressing *) - let max_file_size = ref ( - Byte_units.create `Kilobytes 64.) + let max_file_size = ref ( 64 * 1024 ) in @@ -88,9 +93,9 @@ let run ?(daemon=true) ezfio_filename = "tcp://*:" in let result = - List.fold [0;1;2;3] ~init:true ~f:(fun accu i -> + List.fold_left (fun accu i -> let address = - adress_prefix ^ (Int.to_string (n+i)) + adress_prefix ^ (string_of_int (n+i)) in let socket = Zmq.Socket.create zmq_context Zmq.Socket.rep @@ -104,7 +109,7 @@ let run ?(daemon=true) ezfio_filename = in Zmq.Socket.close socket; result - ) + ) true [0;1;2;3] in if (result) then `Available @@ -145,8 +150,8 @@ let run ?(daemon=true) ezfio_filename = delta_t t0 in let message = - Printf.sprintf "%20s : %8d : %10s : %s" - socket size text (Time.Span.to_string dt) + Printf.sprintf "%20s : %8d : %10s : %f" + socket size text dt in Zmq.Socket.send debug_socket message in @@ -179,7 +184,7 @@ let run ?(daemon=true) ezfio_filename = (** Array of walkers. The size is [walk_num_tot]. *) let walkers_array = let t0 = - Time.now () + Unix.time () in let j = 3*elec_num + 3 @@ -190,17 +195,17 @@ let run ?(daemon=true) ezfio_filename = and ez = Ezfio.get_electrons_elec_coord_pool () |> Ezfio.flattened_ezfio - |> Array.map ~f:Float.to_string + |> Array.map string_of_float in try - Array.init walk_num_tot ~f:(fun i -> - Array.sub ~pos:(j*(i mod size)) ~len:j ez) + Array.init walk_num_tot (fun i -> + Array.sub ez (j*(i mod size)) j ) with | Invalid_argument _ -> failwith "Walkers file is broken." in - String.concat [ "Read " ; Int.to_string (Array.length result) ; - " walkers"] + String.concat " " [ "Read" ; string_of_int (Array.length result) ; + "walkers"] |> send_log "status" 0 t0 ; result in @@ -214,28 +219,28 @@ let run ?(daemon=true) ezfio_filename = (** Last time when the walkers were saved to disk. *) let last_save_walkers = - ref (Time.now ()) + ref (Unix.time ()) in (** Saves the walkers to disk. *) let save_walkers () = - if (delta_t !last_save_walkers > (Time.Span.of_sec 10.) ) then + if (delta_t !last_save_walkers > 10. ) then begin let t0 = - Time.now () + Unix.time () in Ezfio.set_electrons_elec_coord_pool_size walk_num_tot ; let walkers_list = - Array.map walkers_array ~f:Array.to_list + Array.map Array.to_list walkers_array |> Array.to_list |> List.concat - |> List.map ~f:Float.of_string + |> List.map float_of_string in Ezfio.set_electrons_elec_coord_pool (Ezfio.ezfio_array_of_list ~rank:3 ~dim:[| elec_num+1 ; 3 ; walk_num_tot |] ~data:walkers_list); send_log "status" walk_num_tot t0 "Saved walkers"; - last_save_walkers := Time.now (); + last_save_walkers := Unix.time (); end in @@ -245,7 +250,7 @@ let run ?(daemon=true) ezfio_filename = disk if the array of walkers is filled. In that case, sets the last_walker to 0. *) let increment_last_walker () = - last_walker := !last_walker + 1; + incr last_walker; if (!last_walker = walk_num_tot) then begin last_walker := 0 ; @@ -266,16 +271,16 @@ let run ?(daemon=true) ezfio_filename = (** The hash table for workers *) let workers_hash = - String.Table.create () + Hashtbl.create 63 in (** Creates a key using the couple ([Compute_node], [PID]) *) let key compute_node pid = - String.concat [ - (Compute_node.to_string compute_node); " "; - (Pid.to_string pid) ] + String.concat " " [ + (Compute_node.to_string compute_node); + (string_of_int pid) ] in @@ -286,9 +291,9 @@ let run ?(daemon=true) ezfio_filename = let s = key w pid in - match Hashtbl.add workers_hash ~key:s ~data:(Time.now ()) with - | `Ok -> () - | `Duplicate -> failwith (s^" already registered") + match Hashtbl.find_opt workers_hash s with + | Some _ -> failwith (s^" already registered") + | None -> Hashtbl.add workers_hash s (Unix.time ()) in @@ -299,29 +304,33 @@ let run ?(daemon=true) ezfio_filename = let s = key w pid in - match Hashtbl.find workers_hash s with + match Hashtbl.find_opt workers_hash s with | Some x -> Hashtbl.remove workers_hash s | None -> failwith (s^" not registered") in - (** Sets the last access of the worker to [Time.now ()] *) + (** Sets the last access of the worker to [Unix.time ()] *) let touch_worker w pid = let s = key w pid in - Hashtbl.set workers_hash ~key:s ~data:(Time.now ()) + Hashtbl.replace workers_hash s (Unix.time ()) in (** Returns the number of connected workers *) let n_connected hash now = let delta = - Time.Span.of_sec (initialization_timeout +. block_time *. 2.) + initialization_timeout +. block_time *. 2. in - Hashtbl.filter hash ~f:(fun x -> (Time.abs_diff now x) <= delta) - |> Hashtbl.length + Hashtbl.fold (fun k v accu -> + if (now -. v) <= delta then + v :: accu + else + accu ) hash [] + |> List.length in @@ -339,12 +348,11 @@ let run ?(daemon=true) ezfio_filename = Lazy.force Block.dir_name in let () = - match Sys.is_directory dirname with - | `Yes -> () - | _ -> Unix.mkdir_p dirname + if not (Sys.is_directory dirname) then + Unix.mkdir dirname 0o600 in Filename.concat dirname ( - hostname ^ "." ^ (Pid.to_string dataserver_pid) + hostname ^ "." ^ (string_of_int dataserver_pid) ) in @@ -361,14 +369,14 @@ let run ?(daemon=true) ezfio_filename = (** [Out_channel] corresponding to the blocks file written by the current process. *) let block_channel = try - ref (Out_channel.create block_channel_filename_locked) + ref (open_out block_channel_filename_locked) with | Sys_error _ -> begin (* NFS Stale file handle : * Wait 5 seconds, and retry *) - Time.Span.of_sec 5. |> Time.pause; - ref (Out_channel.create block_channel_filename_locked) + Unix.sleep 5; + ref (open_out block_channel_filename_locked) end in @@ -379,13 +387,13 @@ let run ?(daemon=true) ezfio_filename = *) let compress_block_file filename = let t0 = - Time.now () + Unix.time () in - Out_channel.close !block_channel; - Unix.rename ~src:block_channel_filename_locked ~dst:block_channel_filename_tmp; + close_out !block_channel; + Unix.rename block_channel_filename_locked block_channel_filename_tmp; Random_variable.compress_files (); send_log "status" 0 t0 "Compressed block file"; - block_channel := Out_channel.create ~append:true block_channel_filename_locked + block_channel := open_out_gen [ Open_append ] 0o660 block_channel_filename_locked in @@ -396,7 +404,7 @@ let run ?(daemon=true) ezfio_filename = let start_status_thread = let t0 = - Time.now () + Unix.time () in Thread.create (fun () -> send_log "status" 0 t0 "Starting status thread"; @@ -407,16 +415,14 @@ let run ?(daemon=true) ezfio_filename = Printf.sprintf "tcp://*:%d" (port+1) in bind_socket "PUB" socket address; - let delay = - Time.Span.of_ms 300. - and delay_read = - Time.Span.of_sec 2. + let delay = 0.3 + and delay_read = 2. in let start_time = - Time.now () + Unix.time () and stop_time = - ref (Time.Span.of_sec Input.Stop_time.(read () |> to_float) ) + ref (Input.Stop_time.(read () |> to_float) ) in let last_update = @@ -425,9 +431,9 @@ let run ?(daemon=true) ezfio_filename = while (!status <> Status.Stopped) do - Time.pause delay; + Unix.sleepf delay; let now = - Time.now () + Unix.time () in let status_string = Status.to_string !status @@ -436,14 +442,14 @@ let run ?(daemon=true) ezfio_filename = send_log "status" (String.length status_string) now status_string; let test = - if (Time.abs_diff now !last_update > delay_read) then + if (now -. !last_update > delay_read) then let n_connect = n_connected workers_hash now in `Update n_connect - else if (Time.abs_diff now start_time > !stop_time) then + else if (now -. start_time > !stop_time) then `Terminate - else if (Time.abs_diff now start_time > Time.Span.of_sec initialization_timeout) then + else if (now -. start_time > initialization_timeout) then `Timeout else `None @@ -458,7 +464,7 @@ let run ?(daemon=true) ezfio_filename = begin status := Status.read (); last_update := now; - stop_time := Time.Span.of_sec Input.Stop_time.(read () |> to_float) ; + stop_time := Input.Stop_time.(read () |> to_float) ; let n_tot = Hashtbl.length workers_hash in @@ -482,7 +488,7 @@ let run ?(daemon=true) ezfio_filename = let start_log_thread = let t0 = - Time.now () + Unix.time () in Thread.create (fun () -> send_log "status" 0 t0 "Starting log thread"; @@ -509,10 +515,10 @@ let run ?(daemon=true) ezfio_filename = begin let message = Zmq.Socket.recv_all ~block:false socket - |> String.concat ~sep:" " + |> String.concat " " in let now = - Time.now () + Unix.time () in send_log "log" 0 now message end @@ -548,7 +554,7 @@ let run ?(daemon=true) ezfio_filename = let start_main_thread = let wall0 = - Time.now () + Unix.time () in let f () = @@ -569,19 +575,18 @@ let run ?(daemon=true) ezfio_filename = (** EZFIO Cache *) let ezfio_cache = - String.Table.create () + Hashtbl.create 63 in let handle_ezfio msg = - match Hashtbl.find ezfio_cache msg with + match Hashtbl.find_opt ezfio_cache msg with | Some result -> result | None -> begin let result = decode_ezfio_message msg in - match (Hashtbl.add ezfio_cache ~key:msg ~data:result) with - | `Ok -> result - | `Duplicate -> result + Hashtbl.add ezfio_cache msg result; + result end in @@ -620,13 +625,13 @@ let run ?(daemon=true) ezfio_filename = Zmq.Socket.recv_all ~block:false rep_socket in let t0 = - Time.now () + Unix.time () in let msg = - List.map ~f:String.strip raw_msg + List.map String.trim raw_msg |> Message.create and msg_size = - List.fold ~init:0 ~f:(fun accu x -> accu + (String.length x)) raw_msg + List.fold_left (fun accu x -> accu + (String.length x)) 0 raw_msg in let handle = function | Message.Error _ -> () @@ -654,9 +659,9 @@ let run ?(daemon=true) ezfio_filename = | Status.Queued | Status.Running -> begin - String.concat [ "Register : " ; - (Compute_node.to_string w) ; " " ; - (Pid.to_string pid) ] + String.concat " " [ "Register :" ; + Compute_node.to_string w ; + string_of_int pid ] |> send_log "req" msg_size t0; add_worker w pid; if (!status = Status.Queued) then @@ -670,15 +675,15 @@ let run ?(daemon=true) ezfio_filename = end | Message.Unregister (w,pid) -> begin - String.concat [ "Unregister : " ; - (Compute_node.to_string w) ; " " ; - (Pid.to_string pid) ] + String.concat " " [ "Unregister :" ; + (Compute_node.to_string w) ; + (string_of_int pid) ] |> send_log "req" msg_size t0; Zmq.Socket.send rep_socket "OK"; del_worker w pid; - String.concat [ "Unregister : "; - (Hashtbl.length workers_hash) |> Int.to_string ; - " remaining" ] + String.concat " " [ "Unregister :"; + (Hashtbl.length workers_hash) |> string_of_int ; + "remaining" ] |> send_log "rep" 2 t0 ; let n_connect = n_connected workers_hash t0 @@ -705,13 +710,13 @@ let run ?(daemon=true) ezfio_filename = Zmq.Socket.recv_all ~block:false pull_socket in let t0 = - Time.now () + Unix.time () in let msg = - List.map ~f:String.strip raw_msg + List.map String.trim raw_msg |> Message.create and msg_size = - List.fold ~init:0 ~f:(fun accu x -> accu + (String.length x)) raw_msg + List.fold_left (fun accu x -> accu + (String.length x)) 0 raw_msg in let recv_log = send_log "pull" msg_size t0 @@ -730,22 +735,25 @@ let run ?(daemon=true) ezfio_filename = recv_log log_msg ; for i=0 to ((Array.length w)-1) do + (* Array.replace walkers_array (!last_walker) (fun _ -> Array.map - ~f:Float.to_string w.(i)); + string_of_float w.(i)); + *) + walkers_array.(!last_walker) <- Array.map string_of_float w.(i); increment_last_walker (); done; let wall = Printf.sprintf "%f %f # %s %s %s %d" - (Time.Span.to_sec (Time.abs_diff (Time.now ()) wall0)) + (Unix.time () -. wall0) 1. (Property.to_string Property.Wall) - hostname (Pid.to_string dataserver_pid) 1 + hostname (string_of_int dataserver_pid) 1 |> Block.of_string in match wall with | Some wall -> begin - Out_channel.output_string !block_channel (Block.to_string wall); - Out_channel.output_char !block_channel '\n'; + output_string !block_channel (Block.to_string wall); + output_char !block_channel '\n'; end | _ -> () end @@ -753,8 +761,8 @@ let run ?(daemon=true) ezfio_filename = begin if (status = Status.Running) then touch_worker b.Block.compute_node b.Block.pid ; - Out_channel.output_string !block_channel (Block.to_string b); - Out_channel.output_char !block_channel '\n'; + output_string !block_channel (Block.to_string b); + output_char !block_channel '\n'; recv_log (Block.to_string b) end | Message.Test @@ -780,22 +788,20 @@ let run ?(daemon=true) ezfio_filename = | Some Zmq.Poll.In -> handle_rep () | _ -> begin - Out_channel.flush !block_channel ; + flush !block_channel ; let file_size = (Unix.stat block_channel_filename_locked).Unix.st_size - |> Float.of_int64 - |> Byte_units.create `Bytes in if (file_size > !max_file_size) then begin compress_block_file (); - max_file_size := Byte_units.scale file_size 1.2; + max_file_size := (file_size * 12) / 10; end end end done; - List.iter ~f:(fun socket -> + List.iter (fun socket -> Zmq.Socket.set_linger_period socket 1000 ; Zmq.Socket.close socket) [ rep_socket ; pull_socket ] @@ -819,8 +825,8 @@ let run ?(daemon=true) ezfio_filename = Zmq.Context.terminate zmq_context; begin try - Out_channel.close !block_channel; - Unix.remove block_channel_filename_locked + close_out !block_channel; + Unix.unlink block_channel_filename_locked with | _ -> () end; @@ -830,27 +836,28 @@ let run ?(daemon=true) ezfio_filename = (** {3 Main function} *) let t0 = - Time.now () + Unix.time () in (* Handle signals *) let handler s = - Printf.printf "Dataserver received the %s signal... killing\n%!" (Signal.to_string s); + Printf.printf "Dataserver received signal %d... killing\n%!" s; Watchdog.kill (); in - List.iter [ - Signal.int ; - Signal.term ; - Signal.quit ; + List.iter (fun s -> ignore @@ Sys.signal s (Sys.Signal_handle handler)) + [ + Sys.sigint ; + Sys.sigterm ; + Sys.sigquit ; ] - ~f:(fun x -> Signal.Expert.handle x handler) ; + (* Run threads *) begin try - (List.iter ~f:Thread.join + (List.iter Thread.join [ start_status_thread () ; start_log_thread () ; start_main_thread () ; @@ -860,7 +867,7 @@ let run ?(daemon=true) ezfio_filename = begin print_endline "Trapped error. Waiting 10 seconds..."; change_status Status.Stopping; - Time.Span.of_sec 10. |> Time.pause; + Unix.sleep 10; finalize ~t0; raise err end diff --git a/ocaml/Qmcchem_debug.ml b/ocaml/Qmcchem_debug.ml index 717b085..55c5a59 100644 --- a/ocaml/Qmcchem_debug.ml +++ b/ocaml/Qmcchem_debug.ml @@ -1,6 +1,3 @@ -open Core - - let run ~t ezfio_filename= Qputils.set_ezfio_filename ezfio_filename; @@ -20,7 +17,7 @@ let run ~t ezfio_filename= let address = match (Ezfio.get_simulation_http_server () - |> String.rsplit2 ~on:':' ) + |> String_ext.rsplit2 ~on:':' ) with | Some (a,p) -> a^":"^( (Int.of_string p)+4 |> Int.to_string ) | None -> failwith "Badly formed address" diff --git a/ocaml/Qmcchem_edit.ml b/ocaml/Qmcchem_edit.ml index 3820c57..5f57ed2 100644 --- a/ocaml/Qmcchem_edit.ml +++ b/ocaml/Qmcchem_edit.ml @@ -339,7 +339,7 @@ let () = anonymous "EZFIO_DIR" Mandatory "EZFIO directory"; anonymous "FILE" Optional "Name of the input file"; ] - |> set_specs ; + |> set_specs end; let c = Command_line.get_bool "clear" in diff --git a/ocaml/Qmcchem_forwarder.ml b/ocaml/Qmcchem_forwarder.ml index 8b9fb5c..9852ab6 100644 --- a/ocaml/Qmcchem_forwarder.ml +++ b/ocaml/Qmcchem_forwarder.ml @@ -1,17 +1,15 @@ -open Core - let bind_socket ~socket_type ~socket ~address = let rec loop = function - | 0 -> failwith @@ Printf.sprintf + | 0 -> failwith @@ Printf.sprintf "Unable to bind the forwarder's %s socket : %s\n" socket_type address - | -1 -> () - | i -> + | -1 -> () + | i -> try Zmq.Socket.bind socket address; loop (-1) with - | Unix.Unix_error _ -> (Time.pause @@ Time.Span.of_sec 1. ; loop (i-1) ) + | Unix.Unix_error _ -> (Unix.sleep 1 ; loop (i-1) ) | other_exception -> raise other_exception in loop 10 @@ -20,9 +18,8 @@ let bind_socket ~socket_type ~socket ~address = let run ezfio_filename dataserver = let dataserver_address, dataserver_port = - Substring.create ~pos:6 (Bytes.of_string dataserver) - |> Substring.to_string - |> String.lsplit2_exn ~on:':' + String.sub dataserver 6 (String.length dataserver - 6) + |> String_ext.lsplit2_exn ~on:':' and qmc = Lazy.force Qmcchem_config.qmc in @@ -36,14 +33,14 @@ let run ezfio_filename dataserver = (* Port of the data server *) let port = - (Int.of_string dataserver_port)+10 + (int_of_string dataserver_port)+10 in (* Build qmc executable command *) let prog, argv = qmc, - [ qmc ; ezfio_filename ; - Printf.sprintf "ipc://%s:%d" Qmcchem_config.dev_shm port ]; + [| qmc ; ezfio_filename ; + Printf.sprintf "ipc://%s:%d" Qmcchem_config.dev_shm port |]; in (* Create the temporary directory. If it is possible, then the process is a @@ -51,48 +48,45 @@ let run ezfio_filename dataserver = *) let () = try - Unix.mkdir tmpdir; + Unix.mkdir tmpdir 0o600; Unix.chdir tmpdir with | Unix.Unix_error _ -> - begin - Unix.chdir tmpdir; - Time.pause @@ Time.Span.of_sec 0.1; - match (Sys.file_exists "PID") with - | `No - | `Unknown -> () - | `Yes -> - let pid = - In_channel.with_file "PID" ~f:(fun ic -> - match (In_channel.input_line ic) with - | Some x -> x - | None -> "-1" ) - |> Int.of_string - in - match pid with - | -1 -> () - | pid -> - begin - match Signal.send (Signal.of_system_int 0) (`Pid (Pid.of_int pid)) with - | `No_such_process -> () - | _ -> ignore @@ Unix.exec ~prog ~argv () - end - end + begin + Unix.chdir tmpdir; + Unix.sleepf 0.1 ; + if Sys.file_exists "PID" then + begin + let pid = + let ic = open_in "PID" in + try + int_of_string (input_line ic) + with + | End_of_file -> -1 + in + match pid with + | -1 -> () + | pid -> + try + Unix.kill pid 0 ; + ignore @@ Unix.execv prog argv + with + | Unix.Unix_error (Unix.ESRCH, _, _) -> () + end + end in (* Now, only one forwarder will execute the following code *) - Out_channel.with_file "PID" ~f:(fun oc -> - Unix.getpid () - |> Pid.to_int - |> Printf.sprintf "%d\n" - |> Out_channel.output_string oc); + let oc = open_out "PID" in + Unix.getpid () + |> Printf.sprintf "%d\n" + |> output_string oc + ; (* Fork a qmc *) ignore @@ - Watchdog.fork_exec ~prog ~argv (); + Watchdog.fork_exec ~prog ~args:argv (); - (* If there are MICs, use them here (TODO) *) - (* Fetch input *) let zmq_context = Zmq.Context.create () @@ -104,9 +98,10 @@ let run ezfio_filename dataserver = let command = Printf.sprintf "rm -rf -- \"%s\" " tmpdir in - match Unix.system command with - | Ok _ -> () - | _ -> print_endline "Unable to remove temporary directory" + try + ignore @@ Unix.system command + with + | Unix.Unix_error _ -> print_endline "Unable to remove temporary directory" ; Zmq.Context.terminate zmq_context ; for i=port to port+4 @@ -126,15 +121,15 @@ let run ezfio_filename dataserver = (* Signal handler to Kill properly all the processes *) let handler s = - Printf.printf "Forwarder received the %s signal... killing\n%!" (Signal.to_string s); + Printf.printf "Forwarder received signal %d... killing\n%!" s; terminate (); in - List.iter [ - Signal.int ; - Signal.term ; - Signal.quit ; + List.iter (fun s -> ignore @@ Sys.signal s (Sys.Signal_handle handler)) + [ + Sys.sigint ; + Sys.sigterm ; + Sys.sigquit ; ] - ~f:(fun x -> Signal.Expert.handle x handler) ; @@ -189,7 +184,7 @@ let run ezfio_filename dataserver = status := Status.of_string msg; end; done; - List.iter ~f:(fun socket -> + List.iter (fun socket -> Zmq.Socket.set_linger_period socket 1000 ; Zmq.Socket.close socket) [ sub_socket ; pub_socket ] @@ -239,7 +234,7 @@ let run ezfio_filename dataserver = |> Zmq.Socket.send sub_socket ; end done; - List.iter ~f:(fun socket -> + List.iter (fun socket -> Zmq.Socket.set_linger_period socket 1000 ; Zmq.Socket.close socket) [ sub_socket ; pub_socket ] @@ -268,7 +263,7 @@ let run ezfio_filename dataserver = Zmq.Socket.set_linger_period dealer_socket 600_000; let fetch_walkers () = - Zmq.Socket.send_all req_socket ["get_walkers" ; Int.to_string !walk_num ]; + Zmq.Socket.send_all req_socket ["get_walkers" ; string_of_int !walk_num ]; Zmq.Socket.recv_all req_socket in @@ -280,10 +275,10 @@ let run ezfio_filename dataserver = (* EZFIO Cache *) let ezfio_cache = - String.Table.create () + Hashtbl.create 63 in let handle_ezfio msg = - match Hashtbl.find ezfio_cache msg with + match Hashtbl.find_opt ezfio_cache msg with | Some result -> result | None -> begin @@ -291,9 +286,8 @@ let run ezfio_filename dataserver = let result = Zmq.Socket.recv_all req_socket in - match (Hashtbl.add ezfio_cache ~key:msg ~data:result) with - | `Ok -> result - | `Duplicate -> result + Hashtbl.add ezfio_cache msg result; + result end in @@ -315,7 +309,7 @@ let run ezfio_filename dataserver = | head :: tail -> aux (head::header) tail | _ -> failwith "Too many routers in the middle" in - aux [] (List.map ~f:String.strip raw_msg) + aux [] (List.map String.trim raw_msg) in let handle message = match message with @@ -407,7 +401,7 @@ let run ezfio_filename dataserver = | head :: tail -> aux (head::header) tail | _ -> failwith "Too many routers in the middle" in - aux [] (List.map ~f:String.strip raw_msg) + aux [] (List.map String.trim raw_msg) in let handle message = match message with @@ -469,7 +463,7 @@ let run ezfio_filename dataserver = match message with | "elec_coord":: hostname :: pid :: id :: n_str :: rest -> let n = - Int.of_string n_str + int_of_string n_str in let len = n / !walk_num @@ -478,7 +472,7 @@ let run ezfio_filename dataserver = message else List.concat [ [ "elec_coord" ; hostname ; pid ; id ; - Int.to_string (5*len)] ; ( select_n_of ~n:5 ~len rest ) ] + string_of_int (5*len)] ; ( select_n_of ~n:5 ~len rest ) ] | _ -> message in Zmq.Socket.send_all push_socket new_message @@ -508,7 +502,7 @@ let run ezfio_filename dataserver = if (polling.(3) = Some Zmq.Poll.In) then handle_proxy (); done; - List.iter ~f:(fun socket -> + List.iter (fun socket -> Zmq.Socket.set_linger_period socket 1000 ; Zmq.Socket.close socket) [ router_socket ; dealer_socket ; push_socket ; pull_socket ; proxy_socket ] @@ -520,7 +514,7 @@ let run ezfio_filename dataserver = (* Start the status thread and the main thread *) begin try - (List.iter ~f:Thread.join + (List.iter Thread.join [ start_status_thread (); start_log_thread (); start_proxy_thread (); @@ -531,7 +525,7 @@ let run ezfio_filename dataserver = begin print_endline "Trapped error. Waiting 10 seconds..."; status := Status.Stopping; - Time.Span.of_sec 10. |> Time.pause; + Unix.sleep 10 ; raise err end end; diff --git a/ocaml/Qmcchem_info.ml b/ocaml/Qmcchem_info.ml index 6360c4e..678f2dc 100644 --- a/ocaml/Qmcchem_info.ml +++ b/ocaml/Qmcchem_info.ml @@ -1,4 +1,3 @@ -open Core let run ezfio_filename = @@ -8,10 +7,10 @@ let run ezfio_filename = in let prog, argv = qmcchem_info, - [ qmcchem_info ; ezfio_filename ] + [| qmcchem_info ; ezfio_filename |] in ignore @@ - Unix.exec ~prog ~argv () + Unix.exec prog argv let spec = diff --git a/ocaml/Qmcchem_md5.ml b/ocaml/Qmcchem_md5.ml index 4287af9..073cbf7 100644 --- a/ocaml/Qmcchem_md5.ml +++ b/ocaml/Qmcchem_md5.ml @@ -1,4 +1,3 @@ -open Core let run ?c ?d ~l ~update ezfio_filename = @@ -22,7 +21,7 @@ let run ?c ?d ~l ~update ezfio_filename = let filename = filename_of_key key in - Sys.file_exists_exn filename + Sys.file_exists filename in if (update) then @@ -37,27 +36,28 @@ let run ?c ?d ~l ~update ezfio_filename = if (old_key <> new_key) then begin + let prefix = + Filename.concat ezfio_filename "blocks" + in let new_name = - String.concat ~sep:"/" [ ezfio_filename; "blocks"; new_key ] + Filename.concat prefix new_key and old_name = - String.concat ~sep:"/" [ ezfio_filename; "blocks"; old_key ] + Filename.concat prefix old_key in Printf.printf "Renaming %s -> %s\n" old_name new_name; try Sys.rename old_name new_name with | Sys_error _ -> (); let old_name = - String.concat ~sep:"/" [ ezfio_filename; "input"; old_key ] + String.concat "/" [ ezfio_filename; "input"; old_key ] in Printf.printf "Removing %s\n%!" old_name; - try Sys.remove old_name with + try Sys.unlink old_name with | Sys_error _ -> (); end in - let l = - Sys.ls_dir input_directory - in - List.iter l ~f:(fun x -> update_one x) ; + Sys.readdir input_directory + |> Array.iter (fun x -> update_one x) ; Printf.printf "Done\n%!" ; end ; @@ -76,8 +76,8 @@ let run ?c ?d ~l ~update ezfio_filename = match l with | false -> () | true -> - Sys.ls_dir input_directory - |> List.iter ~f:(fun md5 -> + Sys.readdir input_directory + |> Array.iter (fun md5 -> let filename = Filename.concat input_directory md5 in @@ -102,13 +102,12 @@ let run ?c ?d ~l ~update ezfio_filename = | Some other_key -> if (key_is_valid other_key) then let command = - String.concat ~sep:" " + String.concat " " [ "diff" ; "-u" ; "-w" ; (filename_of_key current_md5) ; (filename_of_key other_key) ] in - match (Unix.system command) with - | _ -> () + ignore @@ Unix.system command else failwith ("Error: " ^ other_key ^ " does not exist") in @@ -122,30 +121,39 @@ let run ?c ?d ~l ~update ezfio_filename = | _ -> handle_options () -let spec = - let open Command.Spec in - empty - +> flag "c" (optional string) - ~doc:(" Change to input to ") - +> flag "d" (optional string) - ~doc:(" Show input differences with ") - +> flag "l" no_arg - ~doc:(" List all the saved MD5 keys.") - +> flag "update" no_arg - ~doc:(" Update to the latest MD5 format.") - +> anon ("ezfio_file" %: string) +let () = + let open Command_line in + begin + set_header_doc (Sys.argv.(0) ^ " - QMC=Chem command"); + set_description_doc "Manipulate input MD5 keys"; + [ { short='c' ; long="clear" ; opt=Optional ; + doc="Change to input to " ; + arg=With_arg "" ; }; + { short='d' ; long="diff" ; opt=Optional ; + doc="Show input differences with " ; + arg=With_arg "" ; }; -let command = - Command.basic_spec - ~summary: "Manipulate input MD5 keys" - ~readme:(fun () -> - " -Manipulate input MD5 keys - ") - spec - (fun c d l update ezfio_file () -> run ?c ?d ~l ~update ezfio_file ) + { short='l' ; long="list" ; opt=Optional ; + doc="List all the saved MD5 keys." ; + arg=Without_arg ; }; + + { short='u' ; long="update" ; opt=Optional ; + doc="Update to the latest MD5 format." ; + arg=Without_arg ; }; + + anonymous "EZFIO_DIR" Mandatory "EZFIO directory"; + ] + |> set_specs + end; + + let update = Command_line.get_bool "update" in + let c = Command_line.get "clear" in + let d = Command_line.get "diff" in + let l = Command_line.get_bool "list" in + + run ?c ?d ~l ~update ezfio_file diff --git a/ocaml/Qmcchem_result.ml b/ocaml/Qmcchem_result.ml index 926af1a..bc26cf1 100644 --- a/ocaml/Qmcchem_result.ml +++ b/ocaml/Qmcchem_result.ml @@ -1,4 +1,3 @@ -open Core open Qptypes (** Display a table that can be plotted by gnuplot *) @@ -12,19 +11,17 @@ let display_table ~range property = and data = p.Random_variable.data in let results = - List.map2_exn conv rconv ~f:(fun (val1, err1) (val2,err2) -> (val1, err1, val2, err2)) + List.map2 (fun (val1, err1) (val2,err2) -> (val1, err1, val2, err2)) conv rconv in - List.iter2_exn results data ~f:(fun (val1, err1, val2, err2) block -> + List.iter2 (fun (val1, err1, val2, err2) block -> Printf.printf "%10.6f %10.6f %10.6f %10.6f %10.6f\n" val1 err1 val2 err2 (Sample.to_float block.Block.value) - ) -;; + ) results data (** Display a convergence plot of the requested property *) let display_plot ~range property = print_string ("display_plot "^property^".\n") -;; (** Display a convergence table of the error *) @@ -51,7 +48,7 @@ let display_err_convergence ~range property = | (ave, None) -> () in aux 1 p -;; + (** Display the centered cumulants of a property *) let display_cumulants ~range property = @@ -71,7 +68,7 @@ let display_cumulants ~range property = 1. /. 48. *. cum.(3) *. cum.(3) in Printf.printf "Non-gaussianity = %16.10f\n" n -;; + (** Display a table for the autocovariance of the property *) @@ -81,9 +78,9 @@ let display_autocovariance ~range property = |> Random_variable.of_raw_data ~range in Random_variable.autocovariance p - |> List.iteri ~f:(fun i x -> + |> List.iteri (fun i x -> Printf.printf "%10d %16.10f\n" i x) -;; + (** Display a histogram of the property *) let display_histogram ~range property = @@ -104,8 +101,8 @@ let display_histogram ~range property = let g = Random_variable.GaussianDist.eval ~g in - List.iter histo ~f:( fun (x,y) -> - Printf.printf "%16.10f %16.10f %16.10f\n" x y (g ~x)) + List.iter ( fun (x,y) -> + Printf.printf "%16.10f %16.10f %16.10f\n" x y (g ~x)) histo (* and sigma2 = (Random_variable.centered_cumulants p).(1) @@ -128,7 +125,7 @@ let display_histogram ~range property = |> List.iter ~f:(fun (x,y,g) -> Printf.printf "%16.10f %16.10f %16.10f\n" x y g) *) -;; + @@ -144,7 +141,7 @@ let display_summary ~range = (Property.to_string property) (Random_variable.to_string p) in - List.iter properties ~f:print_property ; + List.iter print_property properties ; let cpu = @@ -159,8 +156,8 @@ let display_summary ~range = let speedup = cpu /. wall in - Printf.printf "%20s : %10.2f x\n" "Speedup" speedup; -;; + Printf.printf "%20s : %10.2f x\n" "Speedup" speedup + let run ?a ?c ?e ?h ?t ?p ?rmin ?rmax ezfio_file = @@ -170,15 +167,15 @@ let run ?a ?c ?e ?h ?t ?p ?rmin ?rmax ezfio_file = let rmin = match rmin with | None -> 0. - | Some x when (x<0) -> failwith "rmin should be >= 0" - | Some x when (x>100) -> failwith "rmin should be <= 100" - | Some x -> Float.of_int x + | Some x when (float_of_string x < 0.) -> failwith "rmin should be >= 0" + | Some x when (float_of_string x > 100.) -> failwith "rmin should be <= 100" + | Some x -> float_of_string x and rmax = match rmax with | None -> 100. - | Some x when (x<0) -> failwith "rmax should be >= 0" - | Some x when (x>100) -> failwith "rmax should be <= 100" - | Some x -> Float.of_int x + | Some x when (float_of_string x < 0.) -> failwith "rmax should be >= 0" + | Some x when (float_of_string x > 100.) -> failwith "rmax should be <= 100" + | Some x -> float_of_string x in let range = (rmin, rmax) @@ -194,54 +191,80 @@ let run ?a ?c ?e ?h ?t ?p ?rmin ?rmax ezfio_file = ] in - let f (x,func) = - match x with - | Some property -> func ~range property - | None -> () - in + List.iter (fun (x,func) -> + match x with + | Some property -> func ~range property + | None -> () + ) l; - List.iter ~f l - ; - - if (List.fold ~init:true ~f:(fun accu x -> + if (List.fold_left (fun accu x -> match x with | (None, _) -> accu && true | (Some _,_) -> false - ) l + ) true l ) then display_summary ~range -;; -let spec = - let open Command.Spec in - empty - +> flag "a" (optional string) - ~doc:"property Display the autcovariance function of the property" - +> flag "c" (optional string) - ~doc:"property Print the centered cumulants of a property" - +> flag "e" (optional string) - ~doc:"property Display the convergence of the error of the property by merging blocks" - +> flag "h" (optional string) - ~doc:"property Display the histogram of the property blocks" - +> flag "p" (optional string) - ~doc:"property Display a convergence plot for a property" - +> flag "rmin" (optional int) - ~doc:"int Lower bound of the percentage of the total weight to consider (default 0)" - +> flag "rmax" (optional int) - ~doc:"int Upper bound of the percentage of the total weight to consider (default 100)" - +> flag "t" (optional string) - ~doc:"property Print a table for the convergence of a property" - +> anon ("ezfio_file" %: string) -;; +let () = + let open Command_line in + begin + set_header_doc (Sys.argv.(0) ^ " - QMC=Chem command"); + set_description_doc "Displays the results computed in an EZFIO directory."; -let command = - Command.basic_spec - ~summary: "Displays the results computed in an EZFIO directory." - ~readme:(fun () -> "Displays the results computed in an EZFIO directory.") - spec - (fun a c e h p rmin rmax t ezfio_file () -> run ?a ?c ?e ?h ?t ?p ?rmin ?rmax ezfio_file ) -;; + [ { short='a' ; long="autocovariance" ; opt=Optional ; + doc="Display the autcovariance function of the property"; + arg=With_arg "" ; }; + + { short='c' ; long="centered-cumulants" ; opt=Optional ; + doc="Print the centered cumulants of a property" ; + arg=With_arg ""; }; + + { short='e' ; long="error" ; opt=Optional ; + doc="Display the convergence of the error of the property by merging blocks"; + arg=With_arg ""; }; + + { short='h' ; long="histogram" ; opt=Optional ; + doc="Display the histogram of the property blocks" ; + arg=With_arg ""; }; + + { short='p' ; long="plot" ; opt=Optional ; + doc="Display a convergence plot for a property"; + arg=With_arg ""; }; + + { short='m' ; long="rmin" ; opt=Optional ; + doc="Lower bound of the percentage of the total weight to consider (default 0)" ; + arg=With_arg ""; }; + + { short='n' ; long="rmax" ; opt=Optional ; + doc="Upper bound of the percentage of the total weight to consider (default 100)" ; + arg=With_arg ""; }; + + { short='t' ; long="table" ; opt=Optional ; + doc="Print a table for the convergence of a property" ; + arg=With_arg ""; }; + + anonymous "EZFIO_DIR" Mandatory "EZFIO directory"; + ] + + |> set_specs ; + end; + + let a = Command_line.get "autocovariance" in + let c = Command_line.get "centered-cumulants" in + let e = Command_line.get "error" in + let h = Command_line.get "histogram" in + let t = Command_line.get "table" in + let p = Command_line.get "plot" in + let rmin = Command_line.get "m" in + let rmax = Command_line.get "n" in + + let ezfio_file = + match Command_line.anon_args () with + | ezfio_file :: [] -> ezfio_file + | _ -> (Command_line.help () ; failwith "Inconsistent command line") + in + run ?a ?c ?e ?h ?t ?p ?rmin ?rmax ezfio_file diff --git a/ocaml/Qmcchem_stop.ml b/ocaml/Qmcchem_stop.ml index 595084e..8204283 100644 --- a/ocaml/Qmcchem_stop.ml +++ b/ocaml/Qmcchem_stop.ml @@ -1,5 +1,3 @@ -open Core - let run ezfio_filename = Qputils.set_ezfio_filename ezfio_filename; diff --git a/ocaml/qptypes_generator.ml b/ocaml/qptypes_generator.ml index 04691f8..04d0c58 100644 --- a/ocaml/qptypes_generator.ml +++ b/ocaml/qptypes_generator.ml @@ -344,12 +344,14 @@ match msg with " ] @ ) @ [" | x -> failwith (x^\" : Unknown EZFIO function\")\n;;"] in String.concat "\n" result + |> print_endline (** Main *) let () = parse_input input_data ; parse_input_ezfio input_ezfio; - print_endline untouched + print_endline untouched; + create_ezfio_handler ()