(* Single process function *) let run_sequential f stream = let rec next _ = try let task = Stream.next stream in Some (f task) with Stream.Failure -> None in Stream.from next (* Multi-process functions *) (* Server side *) let run_parallel_server stream = let fetch_result () = let result, rank, _tag = Mpi.receive_status Mpi.any_source Mpi.any_tag Mpi.comm_world in result, rank in let send_task client_rank = let task = try Some (Stream.next stream) with Stream.Failure -> None in Mpi.send task client_rank 0 Mpi.comm_world in let rec run result_list n_todo = let n_todo = match Stream.peek stream with | None -> n_todo-1 | _ -> n_todo in match n_todo with | 0 -> result_list | _ -> begin let result, client = fetch_result () in let new_result_list = match result with | None -> result_list | Some result -> result :: result_list in send_task client; run new_result_list n_todo end in let result = let n_todo = Mpi.comm_size Mpi.comm_world in run [] n_todo |> Stream.of_list in Mpi.barrier Mpi.comm_world; result (** Client side *) let run_parallel_client f = Mpi.send None 0 0 Mpi.comm_world; let rec run () = let task = Mpi.receive 0 Mpi.any_tag Mpi.comm_world in match task with | None -> Mpi.barrier Mpi.comm_world | Some task -> let result = f task in begin Mpi.send (Some result) 0 0 Mpi.comm_world; run () end in run (); Stream.of_list [] let run_parallel f stream = match Mpi.comm_rank Mpi.comm_world with | 0 -> run_parallel_server stream | _ -> run_parallel_client f let run f stream = match Mpi.comm_size Mpi.comm_world with | 1 -> run_sequential f stream | _ -> run_parallel f stream