diff --git a/Parallel/Farm.ml b/Parallel/Farm.ml index ee8e216..f3aa656 100644 --- a/Parallel/Farm.ml +++ b/Parallel/Farm.ml @@ -25,20 +25,24 @@ let run_parallel_server stream = in - let send_task client_rank task_list = - let task, rest = - match task_list with - | [] -> None , [] - | task :: rest -> Some task, rest + let send_task client_rank = + let task = + try Some (Stream.next stream) + with Stream.Failure -> None in - Mpi.send task client_rank 0 Mpi.comm_world; - rest + Mpi.send task client_rank 0 Mpi.comm_world in - let rec run result_list task_list = function + let rec run result_list n_todo = + let n_todo = + match Stream.peek stream with + | None -> n_todo-1 + | _ -> n_todo + in + match n_todo with | 0 -> result_list - | n_todo -> + | _ -> begin let result, client = fetch_result () in let new_result_list = @@ -46,20 +50,14 @@ let run_parallel_server stream = | None -> result_list | Some result -> result :: result_list in - let new_tasklist = send_task client task_list in - run new_result_list new_tasklist (n_todo-1) + send_task client; + run new_result_list n_todo end in let result = - let task_list = - let rec to_list accu = - try to_list (Stream.next stream :: accu) - with Stream.Failure -> List.rev accu - in to_list [] - in - let n_todo = List.length task_list in - run [] task_list (n_todo + Parallel.size - 1) + let n_todo = Mpi.comm_size Mpi.comm_world in + run [] n_todo |> Stream.of_list in Mpi.barrier Mpi.comm_world; diff --git a/Parallel/Farm.mli.bak b/Parallel/Farm.mli similarity index 72% rename from Parallel/Farm.mli.bak rename to Parallel/Farm.mli index 2130719..d9a155c 100644 --- a/Parallel/Farm.mli.bak +++ b/Parallel/Farm.mli @@ -4,8 +4,7 @@ The input is a stream of input data, and the output is a stream of data. *) -val run_parallel : ('a -> 'b) -> 'a Stream.t -> 'b Stream.t -val run_sequential : ('a -> 'b) -> 'a Stream.t -> 'b Stream.t +val run : ('a -> 'b) -> 'a Stream.t -> 'b Stream.t (** Run the [worker_function] on every process by popping elements from the input stream, and put the results on the output stream. The order of the output is consistent with the order of the input. *) diff --git a/run_parallel.ml b/run_parallel.ml index ca60d5b..e67818d 100644 --- a/run_parallel.ml +++ b/run_parallel.ml @@ -26,6 +26,6 @@ let () = let input = Stream.of_list [ (1,2) ; (3,4) ; (5,6) ; (7,8) ; (9,10) ] in - Farm.run_parallel f input + Farm.run f input |> Stream.iter (fun (x,y) -> Printf.printf "%d %d\n" x y)