From 758c65e31ca466da45c7592c6cbbe4f00a6b2501 Mon Sep 17 00:00:00 2001 From: Anthony Scemama Date: Mon, 22 Oct 2018 13:12:43 +0200 Subject: [PATCH] Farm works --- Parallel/Farm.ml | 58 ++++++++++++++++++------------------------------ 1 file changed, 22 insertions(+), 36 deletions(-) diff --git a/Parallel/Farm.ml b/Parallel/Farm.ml index e081856..ee8e216 100644 --- a/Parallel/Farm.ml +++ b/Parallel/Farm.ml @@ -1,29 +1,26 @@ +(* Single process function *) let run_sequential f stream = - (* Stream -> f -> Stream *) - let rec next _ = - try - let task = Stream.next stream in - Some (f task) - with Stream.Failure -> None in - Stream.from next + let rec next _ = + try + let task = Stream.next stream in + Some (f task) + with Stream.Failure -> None in + Stream.from next +(* Multi-process functions *) -let debug msg = Printf.eprintf "%d %s\n%!" Parallel.rank msg - -(** Server side *) +(* Server side *) let run_parallel_server stream = let fetch_result () = - let result, rank, _ = + let result, rank, _tag = Mpi.receive_status Mpi.any_source Mpi.any_tag Mpi.comm_world in -let a, b = match result with | Some t -> t | None -> (Parallel.rank, (-1)) in -Printf.eprintf "Master: Fetch result %d,%d from %d\n%!" a b rank; result, rank in @@ -34,8 +31,6 @@ Printf.eprintf "Master: Fetch result %d,%d from %d\n%!" a b rank; | [] -> None , [] | task :: rest -> Some task, rest in -let a, b = match task with | Some t -> t | None -> 0, 0 in -Printf.eprintf "Master: Send task %d,%d to %d \n%!" a b client_rank; Mpi.send task client_rank 0 Mpi.comm_world; rest in @@ -45,9 +40,12 @@ Printf.eprintf "Master: Send task %d,%d to %d \n%!" a b client_rank; | 0 -> result_list | n_todo -> begin -Printf.eprintf "n_todo = %d\n%!" n_todo; let result, client = fetch_result () in - let new_result_list = result :: result_list in + let new_result_list = + match result with + | None -> result_list + | Some result -> result :: result_list + in let new_tasklist = send_task client task_list in run new_result_list new_tasklist (n_todo-1) end @@ -55,56 +53,38 @@ Printf.eprintf "n_todo = %d\n%!" n_todo; let result = let task_list = - (* let rec to_list accu = try to_list (Stream.next stream :: accu) with Stream.Failure -> List.rev accu in to_list [] - *) - [] in let n_todo = List.length task_list in run [] task_list (n_todo + Parallel.size - 1) |> Stream.of_list in -debug "Master: barrier"; Mpi.barrier Mpi.comm_world; -debug "Master: barrier ok"; result (** Client side *) let run_parallel_client f = - Unix.sleep (Parallel.rank); Mpi.send None 0 0 Mpi.comm_world; let rec run () = let task = Mpi.receive 0 Mpi.any_tag Mpi.comm_world in -let a, b = match task with | Some t -> t | None -> 0, 0 in -Printf.eprintf "Client %d: Received task %d,%d\n%!" Parallel.rank a b; -debug "task received"; match task with - | None -> -begin -Printf.eprintf "Client %d: barrier\n%!" Parallel.rank; - Mpi.barrier Mpi.comm_world; -Printf.eprintf "Client %d: barrier ok\n%!" Parallel.rank; -end + | None -> Mpi.barrier Mpi.comm_world | Some task -> let result = f task in begin - Unix.sleep 1; -let a, b = result in -Printf.eprintf "Client %d: Send result %d %d\n%!" Parallel.rank a b; Mpi.send (Some result) 0 0 Mpi.comm_world; run () end in run (); -Printf.eprintf "Client %d: barrier ok" Parallel.rank; Stream.of_list [] @@ -114,3 +94,9 @@ let run_parallel f stream = | 0 -> run_parallel_server stream | _ -> run_parallel_client f + +let run f stream = + match Mpi.comm_size Mpi.comm_world with + | 1 -> run_sequential f stream + | _ -> run_parallel f stream +