(* Single process function *) let run_sequential f stream = let rec next _ = try let task = Stream.next stream in Some (f task) with Stream.Failure -> None in Stream.from next (* Multi-process functions *) type task_id = int (* Server side *) let run_parallel_server ~ordered stream = let fetch_result () : (task_id * 'a) option * int = let (message, rank, _tag) : (task_id * 'a) option * int * int = Mpi.receive_status Mpi.any_source Mpi.any_tag Mpi.comm_world in message, rank in let send_task (client_rank : int) : unit = let task = try let task_id = Stream.count stream in let element = Stream.next stream in Some (task_id, element) with Stream.Failure -> None in Mpi.send task client_rank 0 Mpi.comm_world in (* n_todo is required for clean termination. It is the number of tasks to wait for when the input stream is empty. *) let n_todo = ref (Mpi.comm_size Mpi.comm_world ) in let rec get_result () : (task_id * 'a ) option = begin match Stream.peek stream with | None -> decr n_todo | _ -> () end; match !n_todo with | 0 -> begin Mpi.barrier Mpi.comm_world; None end | _ -> begin let message, rank = fetch_result () in send_task rank; match message with | None -> get_result () | Some (task_id, result) -> Some (task_id, result) end in let f = if ordered then (* buffer of finished tasks with a task_id greater than the current result_id. It allows to put back the results in the correct order. *) let buffer = Hashtbl.create 67 in fun i -> begin match Hashtbl.find_opt buffer i with | Some x -> begin Hashtbl.remove buffer i; Some x end | None -> let rec loop () = match get_result () with | None -> None | Some (task_id, result) -> if task_id = i then Some result else (Hashtbl.add buffer task_id result; loop () ) in loop () end else fun _ -> match get_result () with | Some (_, result) -> Some result | None -> None in Stream.from f (** Client side *) let run_parallel_client f = Mpi.send None 0 0 Mpi.comm_world; let rec run () = let message = Mpi.receive 0 Mpi.any_tag Mpi.comm_world in match message with | None -> Mpi.barrier Mpi.comm_world | Some (task_id, task) -> let result = f task in begin Mpi.send (Some (task_id, result)) 0 0 Mpi.comm_world; run () end in run (); Stream.of_list [] let run_parallel ~ordered f stream = match Mpi.comm_rank Mpi.comm_world with | 0 -> run_parallel_server ~ordered stream | _ -> run_parallel_client f let run ?(ordered=true) ~f stream = match Mpi.comm_size Mpi.comm_world with | 1 -> run_sequential f stream | _ -> run_parallel ~ordered f stream