From 4e65d70a520aec7c48262e1afc4531973371b0a5 Mon Sep 17 00:00:00 2001 From: Anthony Scemama Date: Thu, 28 Feb 2019 22:21:13 +0100 Subject: [PATCH] Improved parallel --- CI/CI.ml | 2 +- Parallel_mpi/Farm.ml | 163 ++++++++++++++++++++++++++++++------- 2 files changed, 117 insertions(+), 48 deletions(-) diff --git a/CI/CI.ml b/CI/CI.ml index f670f53..5e45724 100644 --- a/CI/CI.ml +++ b/CI/CI.ml @@ -91,7 +91,7 @@ let create_matrix_spin f det_space = (** Update function when ki and kj are connected *) let update i j ki kj = let x = f ki kj in - if x <> 0. then + if abs_float x > Constants.epsilon then result.(i) <- (j, x) :: result.(i) ; in diff --git a/Parallel_mpi/Farm.ml b/Parallel_mpi/Farm.ml index 4e9e86a..15965e0 100644 --- a/Parallel_mpi/Farm.ml +++ b/Parallel_mpi/Farm.ml @@ -1,4 +1,7 @@ -(* Single process function *) +(********************************************************************) +(* Single process *) +(********************************************************************) + let run_sequential f stream = let rec next _ = @@ -10,24 +13,47 @@ let run_sequential f stream = Stream.from next -(* Multi-process functions *) + + + +(********************************************************************) +(* Server side *) +(********************************************************************) type task_id = int +let debug _s = + () + (* + Printf.eprintf "%d : %s : %s\n%!" (Mpi.comm_rank Mpi.comm_world) (Unix.gettimeofday () |> string_of_float) _s + *) -(* Server side *) let run_parallel_server ~ordered stream = + (* n_running is the number of running tasks, required for clean + termination. It is the number of tasks to wait for when the input stream + is empty. + *) + let n_running = ref 0 in + (** Fetches a result coming from any client. Returns the result + as a (task_id * 'a) option and the rank of the client as an int. 
+ *) let fetch_result () : (task_id * 'a) option * int = let (message, rank, _tag) : (task_id * 'a) option * int * int = + debug "Before receive_status"; Mpi.receive_status Mpi.any_source Mpi.any_tag Mpi.comm_world in + debug @@ Printf.sprintf "After receive_status %d %d" rank _tag; + decr n_running; message, rank in - let send_task (client_rank : int) : unit = + (** Pops a task from the stream and sends it to a client. + If no task is available, sends [None]. + The return value is a boolean telling whether a task was sent, i.e. whether the stream was not yet empty. + *) + let send_task (client_rank : int) : bool = let task = try let task_id = Stream.count stream in @@ -35,46 +61,69 @@ let run_parallel_server ~ordered stream = Some (task_id, element) with Stream.Failure -> None in - Mpi.send task client_rank 0 Mpi.comm_world + debug @@ Printf.sprintf "Sending to %d\n" client_rank; + Mpi.send task client_rank 0 Mpi.comm_world; + debug @@ Printf.sprintf "Sent to %d : %s\n" client_rank + (if task = None then "None" else "Some") + ; + let running = task <> None in + if running then incr n_running; + running in - (* n_todo is required for clean termination. It is the - number of tasks to wait for when the input stream is - empty. *) - let n_todo = ref (Mpi.comm_size Mpi.comm_world ) in - let rec get_result () : (task_id * 'a ) option = - begin - match Stream.peek stream with - | None -> decr n_todo - | _ -> () - end; - match !n_todo with - | 0 -> - begin - Mpi.barrier Mpi.comm_world; - None - end - | _ -> - begin - let message, rank = fetch_result () in - send_task rank; - match message with - | None -> get_result () - | Some (task_id, result) -> Some (task_id, result) - end + + (** Main loop. + While [n_running > 0], fetch a result from a client + and send it back a new task. If no more tasks are + available, send [None]. If the result of the task + is None, loop back into [get_result]. + TODO : bug is probably here... 
+ *) + let rec get_result () : (task_id * 'a ) option = + let message, rank = fetch_result () in + let iterate = send_task rank in + match iterate, message with + | true , None -> (incr n_running ; get_result ()) + | true , Some (task_id, result) -> Some (task_id, result) + | false, Some (task_id, result) -> + if !n_running > 0 then + Some (task_id, result) + else + ( debug "Before barrier"; + Mpi.barrier Mpi.comm_world; + debug "After barrier"; + None;) + | false, None -> assert false in + + (** Function from which the output stream is built. *) let f = - if ordered then - - (* buffer of finished tasks with a task_id greater than the - current result_id. It allows to put back the results in - the correct order. + if not ordered then + (* If [ordered] is false, the values are popped out whenever they arrive + from the clients. *) - let buffer = Hashtbl.create 67 in + + fun _ -> + match get_result () with + | Some (_, result) -> Some result + | None -> None + + else + (* If [ordered] is true, values are put into the stream only when the next expected task has + been computed. + *) + + let buffer = + (* buffer of finished tasks with a task_id greater than the + current result_id. It allows to put back the results in + the correct order. + *) + Hashtbl.create 67 + in fun i -> begin @@ -94,36 +143,56 @@ let run_parallel_server ~ordered stream = in loop () end - else - - fun _ -> - match get_result () with - | Some (_, result) -> Some result - | None -> None - in Stream.from f -(** Client side *) -let run_parallel_client f = - Mpi.send None 0 0 Mpi.comm_world; + + +(********************************************************************) +(* Client side *) +(********************************************************************) + +let run_parallel_client f = + + (** Send a first message containing [None] to request a task *) + debug "Before send None"; + Mpi.send None 0 0 Mpi.comm_world; + debug "After send None"; + + (** Main loop. + Receive a message. 
If the message is [None], there are no more + tasks to compute and we can go to the barrier. + If the message is not [None], apply [f] to the task, send the + result back to the server and loop. + *) let rec run () = + let message = - Mpi.receive 0 Mpi.any_tag Mpi.comm_world + debug "Before receive"; + Mpi.receive 0 0 Mpi.comm_world in + debug "After receive" ; match message with - | None -> Mpi.barrier Mpi.comm_world + | None -> + ( debug "Before barrier"; + Mpi.barrier Mpi.comm_world; + debug "After barrier";) | Some (task_id, task) -> let result = f task in begin + debug @@ Printf.sprintf "Before send task_id %d" task_id ; Mpi.send (Some (task_id, result)) 0 0 Mpi.comm_world; + debug @@ Printf.sprintf "After send task_id %d" task_id ; run () end in run (); + + (* The output is an empty stream so that the type of run_parallel_client + is the same as the type of the server function. + *) Stream.of_list []