10
1
mirror of https://gitlab.com/scemama/QCaml.git synced 2024-07-05 19:05:50 +02:00

Ordering is preserved

This commit is contained in:
Anthony Scemama 2018-10-22 19:10:38 +02:00
parent b95b80000c
commit 60b4bc6713
2 changed files with 78 additions and 37 deletions

View File

@ -13,52 +13,90 @@ let run_sequential f stream =
(* Multi-process functions *) (* Multi-process functions *)
type task_id = int
(* Server side *) (* Server side *)
let run_parallel_server stream = let run_parallel_server ~ordered stream =
let fetch_result () : 'a option * int = let fetch_result () : (task_id * 'a) option * int =
let result, rank, _tag = let (message, rank, _tag) : (task_id * 'a) option * int * int =
Mpi.receive_status Mpi.any_source Mpi.any_tag Mpi.comm_world Mpi.receive_status Mpi.any_source Mpi.any_tag Mpi.comm_world
in in
result, rank message, rank
in in
let send_task (client_rank : int) : unit = let send_task (client_rank : int) : unit =
let task = let task =
try Some (Stream.next stream) try
let task_id = Stream.count stream in
let element = Stream.next stream in
Some (task_id, element)
with Stream.Failure -> None with Stream.Failure -> None
in in
Mpi.send task client_rank 0 Mpi.comm_world Mpi.send task client_rank 0 Mpi.comm_world
in in
(* n_todo is required for clean termination. It is the
number of tasks to wait for when the input stream is
empty. *)
let n_todo = ref (Mpi.comm_size Mpi.comm_world ) in let n_todo = ref (Mpi.comm_size Mpi.comm_world ) in
let f i = (* buffer of finished tasks with a task_id greater than the
let rec get_result () = current result_id. It allows to put back the results in
the correct order.
*)
let rec get_result () : (task_id * 'a ) option =
begin
match Stream.peek stream with
| None -> decr n_todo
| _ -> ()
end;
match !n_todo with
| 0 ->
begin begin
match Stream.peek stream with Mpi.barrier Mpi.comm_world;
| None -> decr n_todo None
| _ -> () end
end; | _ ->
match !n_todo with begin
| 0 -> let message, rank = fetch_result () in
begin send_task rank;
Mpi.barrier Mpi.comm_world; match message with
None | None -> get_result ()
end | Some (task_id, result) -> Some (task_id, result)
| _ -> end
begin in
let result, client = fetch_result () in
send_task client; let f =
match result with if ordered then
| None -> get_result () let buffer = Hashtbl.create 67 in
| _ -> result fun i ->
end begin
in match Hashtbl.find_opt buffer i with
get_result () | Some x ->
begin
Hashtbl.remove buffer i;
Some x
end
| None ->
let rec loop () =
match get_result () with
| None -> None
| Some (task_id, result) ->
if task_id = i then Some result
else (Hashtbl.add buffer task_id result; loop () )
in loop ()
end
else
fun _ ->
match get_result () with
| Some (_, result) -> Some result
| None -> None
in in
Stream.from f Stream.from f
@ -68,16 +106,16 @@ let run_parallel_client f =
Mpi.send None 0 0 Mpi.comm_world; Mpi.send None 0 0 Mpi.comm_world;
let rec run () = let rec run () =
let task = let message =
Mpi.receive 0 Mpi.any_tag Mpi.comm_world Mpi.receive 0 Mpi.any_tag Mpi.comm_world
in in
match task with match message with
| None -> Mpi.barrier Mpi.comm_world | None -> Mpi.barrier Mpi.comm_world
| Some task -> | Some (task_id, task) ->
let result = f task in let result = f task in
begin begin
Mpi.send (Some result) 0 0 Mpi.comm_world; Mpi.send (Some (task_id, result)) 0 0 Mpi.comm_world;
run () run ()
end end
in in
@ -86,14 +124,14 @@ let run_parallel_client f =
let run_parallel f stream = let run_parallel ~ordered f stream =
match Mpi.comm_rank Mpi.comm_world with match Mpi.comm_rank Mpi.comm_world with
| 0 -> run_parallel_server stream | 0 -> run_parallel_server ~ordered stream
| _ -> run_parallel_client f | _ -> run_parallel_client f
let run f stream = let run ?(ordered=true) ~f stream =
match Mpi.comm_size Mpi.comm_world with match Mpi.comm_size Mpi.comm_world with
| 1 -> run_sequential f stream | 1 -> run_sequential f stream
| _ -> run_parallel f stream | _ -> run_parallel ~ordered f stream

View File

@ -24,10 +24,13 @@ let v = Parallel.Vec.init 47 (fun i -> float_of_int i) in
let () = let () =
let f (a,b) = (Parallel.rank, a+b) in let f (a,b) = (Parallel.rank, a+b) in
let input = Stream.of_list let input = Stream.of_list
[ (1,2) ; (3,4) ; (5,6) ; (7,8) ; (9,10) ] [ (1,2) ; (3,4) ; (5,6) ; (7,8) ; (9,10)
; (1,2) ; (3,4) ; (5,6) ; (7,8) ; (9,10)
; (1,2) ; (3,4) ; (5,6) ; (7,8) ; (9,10)
; (1,2) ; (3,4) ; (5,6) ; (7,8) ; (9,10) ]
in in
let stream = let stream =
Farm.run f input Farm.run ~f input
in in
Stream.iter (fun (x,y) -> Printf.printf "%d %d\n%!" x y) stream Stream.iter (fun (x,y) -> Printf.printf "%d %d\n%!" x y) stream