let run_sequential f stream = (* Stream -> f -> Stream *) let rec next _ = try let task = Stream.next stream in Some (f task) with Stream.Failure -> None in Stream.from next let debug msg = Printf.eprintf "%d %s\n%!" Parallel.rank msg (** Server side *) let run_parallel_server stream = let fetch_result () = let result, rank, _ = Mpi.receive_status Mpi.any_source Mpi.any_tag Mpi.comm_world in let a, b = match result with | Some t -> t | None -> (Parallel.rank, (-1)) in Printf.eprintf "Master: Fetch result %d,%d from %d\n%!" a b rank; result, rank in let send_task client_rank task_list = let task, rest = match task_list with | [] -> None , [] | task :: rest -> Some task, rest in let a, b = match task with | Some t -> t | None -> 0, 0 in Printf.eprintf "Master: Send task %d,%d to %d \n%!" a b client_rank; Mpi.send task client_rank 0 Mpi.comm_world; rest in let rec run result_list task_list = function | 0 -> result_list | n_todo -> begin Printf.eprintf "n_todo = %d\n%!" n_todo; let result, client = fetch_result () in let new_result_list = result :: result_list in let new_tasklist = send_task client task_list in run new_result_list new_tasklist (n_todo-1) end in let result = let task_list = (* let rec to_list accu = try to_list (Stream.next stream :: accu) with Stream.Failure -> List.rev accu in to_list [] *) [] in let n_todo = List.length task_list in run [] task_list (n_todo + Parallel.size - 1) |> Stream.of_list in debug "Master: barrier"; Mpi.barrier Mpi.comm_world; debug "Master: barrier ok"; result (** Client side *) let run_parallel_client f = Unix.sleep (Parallel.rank); Mpi.send None 0 0 Mpi.comm_world; let rec run () = let task = Mpi.receive 0 Mpi.any_tag Mpi.comm_world in let a, b = match task with | Some t -> t | None -> 0, 0 in Printf.eprintf "Client %d: Received task %d,%d\n%!" Parallel.rank a b; debug "task received"; match task with | None -> begin Printf.eprintf "Client %d: barrier\n%!" Parallel.rank; Mpi.barrier Mpi.comm_world; Printf.eprintf "Client %d: barrier ok\n%!" Parallel.rank; end | Some task -> let result = f task in begin Unix.sleep 1; let a, b = result in Printf.eprintf "Client %d: Send result %d %d\n%!" Parallel.rank a b; Mpi.send (Some result) 0 0 Mpi.comm_world; run () end in run (); Printf.eprintf "Client %d: barrier ok" Parallel.rank; Stream.of_list [] let run_parallel f stream = match Mpi.comm_rank Mpi.comm_world with | 0 -> run_parallel_server stream | _ -> run_parallel_client f