diff --git a/Parallel/Farm.ml b/Parallel/Farm.ml index f3aa656..a292289 100644 --- a/Parallel/Farm.ml +++ b/Parallel/Farm.ml @@ -17,15 +17,15 @@ let run_sequential f stream = let run_parallel_server stream = - let fetch_result () = + let fetch_result () : 'a option * int = let result, rank, _tag = Mpi.receive_status Mpi.any_source Mpi.any_tag Mpi.comm_world in - result, rank + result, rank in - let send_task client_rank = + let send_task (client_rank : int) : unit = let task = try Some (Stream.next stream) with Stream.Failure -> None @@ -34,34 +34,33 @@ let run_parallel_server stream = in - let rec run result_list n_todo = - let n_todo = - match Stream.peek stream with - | None -> n_todo-1 - | _ -> n_todo - in - match n_todo with - | 0 -> result_list - | _ -> + let n_todo = ref (Mpi.comm_size Mpi.comm_world ) in + + let f i = + let rec get_result () = begin - let result, client = fetch_result () in - let new_result_list = + match Stream.peek stream with + | None -> decr n_todo + | _ -> () + end; + match !n_todo with + | 0 -> + begin + Mpi.barrier Mpi.comm_world; + None + end + | _ -> + begin + let result, client = fetch_result () in + send_task client; match result with - | None -> result_list - | Some result -> result :: result_list - in - send_task client; - run new_result_list n_todo - end + | None -> get_result () + | _ -> result + end + in + get_result () in - - let result = - let n_todo = Mpi.comm_size Mpi.comm_world in - run [] n_todo - |> Stream.of_list - in - Mpi.barrier Mpi.comm_world; - result + Stream.from f (** Client side *) diff --git a/Parallel/Farm.mli b/Parallel/Farm.mli deleted file mode 100644 index d9a155c..0000000 --- a/Parallel/Farm.mli +++ /dev/null @@ -1,12 +0,0 @@ -(** The Farm skeleton, similar to SklMl. - -The input is a stream of input data, and the output is a stream of data. -*) - - -val run : ('a -> 'b) -> 'a Stream.t -> 'b Stream.t -(** Run the [worker_function] on every process by popping elements from the - input stream, and put the results on the output stream. The order of the - output is consistent with the order of the input. *) - - diff --git a/run_parallel.ml b/run_parallel.ml index e67818d..ff39b7f 100644 --- a/run_parallel.ml +++ b/run_parallel.ml @@ -26,6 +26,8 @@ let () = let input = Stream.of_list [ (1,2) ; (3,4) ; (5,6) ; (7,8) ; (9,10) ] in - Farm.run f input - |> Stream.iter (fun (x,y) -> Printf.printf "%d %d\n" x y) + let stream = + Farm.run f input + in + Stream.iter (fun (x,y) -> Printf.printf "%d %d\n%!" x y) stream