(********************************************************************)
(* Single process                                                   *)
(********************************************************************)

(** [run_sequential f stream] returns a stream whose elements are the
    elements of [stream] mapped through [f]. Elements are pulled from
    [stream] and [f] is applied only when the output stream is consumed. *)
let run_sequential f stream =
  let next _ =
    (* Only the exhaustion of the input stream ends the output stream.
       [f] is applied outside of the handler so that a [Stream.Failure]
       raised by [f] itself is not mistaken for the end of the input. *)
    match Stream.next stream with
    | task -> Some (f task)
    | exception Stream.Failure -> None
  in
  Stream.from next


(********************************************************************)
(* Server side                                                      *)
(********************************************************************)

(** Index of a task in the input stream. *)
type task_id = int

(** [debug s] is a tracing helper. It is currently disabled (the
    condition is hard-wired to [true]); when enabled it prints the rank
    in [Mpi.comm_world], the current time and [s] on stderr. *)
let debug _s =
  if true then
    ()
  else
    Printf.eprintf "%d : %s : %s\n%!"
      (Mpi.comm_rank Mpi.comm_world)
      (Unix.gettimeofday () |> string_of_float)
      _s

(** State of a rank, as tracked by the server. *)
type status =
  | Initializing  (** The rank has not obtained a task yet. *)
  | Running       (** The rank has been sent a task. *)
  | Done          (** The rank has been told there are no more tasks. *)

(** [run_parallel_server ~comm ~ordered stream] distributes the elements
    of [stream] to the other ranks of [comm] and returns the stream of
    the results they send back.

    If [ordered] is [true] the results come out in the order of the
    input tasks; otherwise they come out in their order of arrival.
    Once every rank is [Done], the server enters [Mpi.barrier comm] and
    the output stream ends. *)
let run_parallel_server ~comm ~ordered stream =

  (* [status.(rank)] is [Initializing] if rank has not yet obtained a
     task, [Running] if rank is running a task and [Done] if [rank] is
     waiting at the barrier. Rank 0 is the server: it never runs a
     task, so it is [Done] from the start. *)
  let status = Array.make (Mpi.comm_size comm) Initializing in
  status.(0) <- Done;

  (* Fetches a message coming from any client. Returns the message as a
     [(task_id * 'a) option] and the rank of the sending client. *)
  let fetch_result () : (task_id * 'a) option * int =
    let (message, rank, _tag) : (task_id * 'a) option * int * int =
      debug "Before receive_status";
      (* Avoid a busy receive: poll with [iprobe] and sleep 1 ms
         between two polls until a message is available. *)
      let rec wait_and_receive () =
        match Mpi.iprobe Mpi.any_source Mpi.any_tag comm with
        | Some _ -> Mpi.receive_status Mpi.any_source Mpi.any_tag comm
        | None ->
          Unix.sleepf 0.001;
          (wait_and_receive [@tailcall]) ()
      in
      wait_and_receive ()
    in
    debug @@ Printf.sprintf "After receive_status %d %d" rank _tag;
    message, rank
  in

  (* Pops a task from the stream and sends it to the client
     [client_rank] as [Some (task_id, element)]. If the stream is
     empty, sends [None] instead. Updates [status.(client_rank)]
     accordingly: [Running] if a task was sent, [Done] otherwise. *)
  let send_task (client_rank : int) : unit =
    let task =
      (* The count is read before popping: it is the index of the
         element about to be removed from the stream. *)
      let task_id = Stream.count stream in
      match Stream.next stream with
      | element -> Some (task_id, element)
      | exception Stream.Failure -> None
    in
    debug @@ Printf.sprintf "Sending to %d\n" client_rank;
    Mpi.send task client_rank 0 comm;
    debug @@ Printf.sprintf "Sent to %d : %s\n" client_rank
      (match task with None -> "None" | Some _ -> "Some");
    status.(client_rank) <-
      (match task with
       | Some _ -> Running
       | None -> Done)
  in

  (* [true] when every rank is [Done]. *)
  let all_done () =
    Array.for_all
      (function
        | Done -> true
        | Initializing | Running -> false)
      status
  in

  (* Main loop. While some rank is not [Done], fetch a message from a
     client and send that client a new task ([None] if no task is
     left). If the message carries no result (the initial request of a
     client), loop. When every rank is [Done], go through the barrier
     and return [None]. *)
  let rec get_result () : (task_id * 'a) option =
    if all_done () then begin
      debug "Before barrier";
      Mpi.barrier comm;
      debug "After barrier";
      None
    end
    else begin
      let message, rank = fetch_result () in
      (* Reply first so the client can resume working while the server
         handles the result. *)
      send_task rank;
      match message with
      | None -> get_result ()
      | Some (task_id, result) -> Some (task_id, result)
    end
  in

  (* Function from which the output stream is built. *)
  let f =
    if not ordered then
      (* If [ordered] is false, the values are popped out whenever they
         arrive from the clients. *)
      fun _ ->
        match get_result () with
        | Some (_, result) -> Some result
        | None -> None
    else
      (* If [ordered] is true, a value is put into the output stream
         only when the next expected task has been computed. *)
      let buffer =
        (* Buffer of finished tasks with a task_id greater than the
           current result id. It allows to put back the results in the
           correct order. *)
        Hashtbl.create 67
      in
      fun i ->
        match Hashtbl.find_opt buffer i with
        | Some x ->
          Hashtbl.remove buffer i;
          Some x
        | None ->
          (* Result [i] has not arrived yet: store whatever arrives
             until it does. *)
          let rec loop () =
            match get_result () with
            | None -> None
            | Some (task_id, result) ->
              if task_id = i then
                Some result
              else begin
                Hashtbl.add buffer task_id result;
                (loop [@tailcall]) ()
              end
          in
          loop ()
  in
  Stream.from f


(********************************************************************)
(* Client side                                                      *)
(********************************************************************)

(** [run_parallel_client ~comm f] requests tasks from rank 0 of [comm],
    applies [f] to each of them and sends the results back, until rank 0
    answers [None]. It then goes through [Mpi.barrier comm] and returns
    an empty stream. *)
let run_parallel_client ~comm f =

  (* Send a first message containing [None] to request a task. *)
  debug "Before send None";
  Mpi.send None 0 0 comm;
  debug "After send None";

  (* Main loop. Receive a message. If the message is [None], there are
     no more tasks to compute and we can go to the barrier. If the
     message is not [None], apply [f] to the task, send the result back
     to the server and loop. *)
  let rec run () =
    let message =
      debug "Before receive";
      Mpi.receive 0 0 comm
    in
    debug "After receive";
    match message with
    | None ->
      debug "Before barrier";
      Mpi.barrier comm;
      debug "After barrier"
    | Some (task_id, task) ->
      let result = f task in
      debug @@ Printf.sprintf "Before send task_id %d" task_id;
      Mpi.send (Some (task_id, result)) 0 0 comm;
      debug @@ Printf.sprintf "After send task_id %d" task_id;
      (run [@tailcall]) ()
  in
  run ();

  (* The output is an empty stream so that the type of
     run_parallel_client is the same as the type of the server
     function. *)
  Stream.of_list []


(** [run_parallel ~comm ~ordered f stream] runs the server on rank 0 of
    [comm] and a client on every other rank. *)
let run_parallel ~comm ~ordered f stream =
  match Mpi.comm_rank comm with
  | 0 -> run_parallel_server ~comm ~ordered stream
  | _ -> run_parallel_client ~comm f


(** [true] while a call to [run] is in progress; used to reject nested
    calls. *)
let nested = ref false

(** [run ?ordered ?comm ~f stream] applies [f] to the elements of
    [stream]: sequentially if [comm] contains a single process, through
    the server/client farm otherwise.

    [ordered] defaults to [true] and [comm] to [Mpi.comm_world].

    Note that the streams built by [run_sequential] and
    [run_parallel_server] are lazy, and [nested] is reset as soon as
    the stream has been built, not when it has been consumed.

    @raise Failure if called while another call to [run] is in
    progress. *)
let run ?(ordered=true) ?(comm=Mpi.comm_world) ~f stream =
  if !nested then begin
    let message =
      "Nested parallel regions are not supported by Farm.ml"
    in
    Printf.eprintf "%s\n%!" message;
    failwith message
  end;
  nested := true;
  let compute () =
    match Mpi.comm_size comm with
    | 1 -> run_sequential f stream
    | _ -> run_parallel ~comm ~ordered f stream
  in
  (* Reset the flag on both the normal and the exceptional path:
     otherwise an exception would leave [nested] set and make every
     later call to [run] fail. *)
  match compute () with
  | result ->
    nested := false;
    result
  | exception e ->
    nested := false;
    raise e