From d7b40e6d1fc4056b4d996b852493b153f9089ece Mon Sep 17 00:00:00 2001 From: Anthony Scemama Date: Thu, 13 Oct 2016 12:32:22 +0200 Subject: [PATCH] Removed core from queuing_system --- ocaml/Id.ml | 24 +++---- ocaml/Id.mli | 23 +++++++ ocaml/Queuing_system.ml | 143 ++++++++++++++++++++++++++++----------- ocaml/Queuing_system.mli | 63 +++++++++++++++++ ocaml/TaskServer.ml | 6 +- 5 files changed, 201 insertions(+), 58 deletions(-) create mode 100644 ocaml/Id.mli create mode 100644 ocaml/Queuing_system.mli diff --git a/ocaml/Id.ml b/ocaml/Id.ml index 660c3452..3e616922 100644 --- a/ocaml/Id.ml +++ b/ocaml/Id.ml @@ -1,26 +1,22 @@ -open Core.Std - -module Id : sig - type t - val of_int : int -> t - val to_int : t -> int - val of_string : string -> t - val to_string : t -> string - val increment : t -> t - val decrement : t -> t -end -= struct +module Id = struct type t = int + let of_int x = assert (x>0); x + let to_int x = x + let of_string x = - Int.of_string x + int_of_string x |> of_int + let to_string x = - Int.to_string x + string_of_int x + let increment x = x + 1 let decrement x = x - 1 + + let compare = compare end module Task = struct diff --git a/ocaml/Id.mli b/ocaml/Id.mli new file mode 100644 index 00000000..02d1efca --- /dev/null +++ b/ocaml/Id.mli @@ -0,0 +1,23 @@ +module Id : + sig + type t + val of_int : int -> t + val to_int : t -> int + val of_string : string -> t + val to_string : t -> string + val increment : t -> t + val decrement : t -> t + val compare : t -> t -> int + end + + +module Task : + sig + include (module type of Id) + end + + +module Client : + sig + include (module type of Id) + end diff --git a/ocaml/Queuing_system.ml b/ocaml/Queuing_system.ml index 29a60538..5ee263a1 100644 --- a/ocaml/Queuing_system.ml +++ b/ocaml/Queuing_system.ml @@ -1,27 +1,33 @@ -open Core.Std -open Qptypes - +module RunningMap = Map.Make (Id.Task) +module TasksMap = Map.Make (Id.Task) +module ClientsSet = Set.Make (Id.Client) type t = { queued : Id.Task.t list ; - running : (Id.Task.t, Id.Client.t) Map.Poly.t ; - tasks : (Id.Task.t, string) Map.Poly.t; - clients : Id.Client.t Set.Poly.t; + running : Id.Client.t RunningMap.t; + tasks : string TasksMap.t; + clients : ClientsSet.t; next_client_id : Id.Client.t; next_task_id : Id.Task.t; - number_of_queued : int; + number_of_queued : int; + number_of_running : int; + number_of_tasks : int; + number_of_clients : int; } let create () = { queued = [] ; - running = Map.Poly.empty ; - tasks = Map.Poly.empty; - clients = Set.Poly.empty; + running = RunningMap.empty ; + tasks = TasksMap.empty; + clients = ClientsSet.empty; next_client_id = Id.Client.of_int 1; next_task_id = Id.Task.of_int 1; - number_of_queued = 0; + number_of_queued = 0; + number_of_running = 0; + number_of_tasks = 0; + number_of_clients = 0; } @@ -33,9 +39,10 @@ let add_task ~task q = in { q with queued = task_id :: q.queued ; - tasks = Map.add q.tasks ~key:task_id ~data:task ; + tasks = TasksMap.add task_id task q.tasks; next_task_id = Id.Task.increment task_id ; number_of_queued = q.number_of_queued + 1; + number_of_tasks = q.number_of_tasks + 1; } @@ -46,8 +53,9 @@ let add_client q = q.next_client_id in { q with - clients = Set.add q.clients client_id; + clients = ClientsSet.add client_id q.clients; next_client_id = Id.Client.increment client_id; + number_of_clients = q.number_of_clients + 1; }, client_id @@ -55,47 +63,57 @@ let pop_task ~client_id q = let { queued ; running ; _ } = q in - assert (Set.mem q.clients client_id); + assert (ClientsSet.mem client_id q.clients); match queued with | task_id :: new_queue -> let new_q = { q with queued = new_queue ; - running = Map.add running ~key:task_id ~data:client_id ; - number_of_queued = q.number_of_queued - 1; + running = RunningMap.add task_id client_id running; + number_of_queued = q.number_of_queued - 1; + number_of_running = q.number_of_running + 1; } - in new_q, Some task_id, (Map.find q.tasks task_id) + and found = + try Some (TasksMap.find task_id q.tasks) + with Not_found -> None + in new_q, Some task_id, found | [] -> q, None, None let del_client ~client_id q = - assert (Set.mem q.clients client_id); + assert (ClientsSet.mem client_id q.clients); { q with - clients = Set.remove q.clients client_id } + clients = ClientsSet.remove client_id q.clients; + number_of_clients = q.number_of_clients - 1 + } let end_task ~task_id ~client_id q = let { running ; tasks ; _ } = q in - assert (Set.mem q.clients client_id); - let () = - match Map.Poly.find running task_id with - | None -> failwith "Task already finished" - | Some client_id_check -> assert (client_id_check = client_id) + assert (ClientsSet.mem client_id q.clients); + let () = + let client_id_check = + try RunningMap.find task_id running with + Not_found -> failwith "Task already finished" + in + assert (client_id_check = client_id) in { q with - running = Map.remove running task_id ; + running = RunningMap.remove task_id running ; + number_of_running = q.number_of_running - 1 } - + let del_task ~task_id q = let { tasks ; _ } = q in - if (Map.mem tasks task_id) then + if (TasksMap.mem task_id tasks) then { q with - tasks = Map.remove tasks task_id ; + tasks = TasksMap.remove task_id tasks; + number_of_tasks = q.number_of_tasks - 1; } else Printf.sprintf "Task %d is already deleted" (Id.Task.to_int task_id) @@ -103,36 +121,79 @@ let del_task ~task_id q = -let number q = - Map.length q.tasks +let number_of_tasks q = + assert (q.number_of_tasks >= 0); + q.number_of_tasks let number_of_queued q = + assert (q.number_of_queued >= 0); q.number_of_queued let number_of_running q = - Map.length q.running + assert (q.number_of_running >= 0); + q.number_of_running + +let number_of_clients q = + assert (q.number_of_clients >= 0); + q.number_of_clients -let to_string { queued ; running ; tasks ; _ } = +let to_string qs = + let { queued ; running ; tasks ; _ } = qs in let q = - List.map ~f:Id.Task.to_string queued - |> String.concat ~sep:" ; " + List.map Id.Task.to_string queued + |> String.concat " ; " and r = - Map.Poly.to_alist running - |> List.map ~f:(fun (t,c) -> "("^(Id.Task.to_string t)^", " + RunningMap.bindings running + |> List.map (fun (t,c) -> "("^(Id.Task.to_string t)^", " ^(Id.Client.to_string c)^")") - |> String.concat ~sep:" ; " + |> String.concat " ; " and t = - Map.Poly.to_alist tasks - |> List.map ~f:(fun (t,c) -> "("^(Id.Task.to_string t)^", \"" + TasksMap.bindings tasks + |> List.map (fun (t,c) -> "("^(Id.Task.to_string t)^", \"" ^c^"\")") - |> String.concat ~sep:" ; " + |> String.concat " ; " in Printf.sprintf "{ +Tasks : %d Queued : %d Running : %d Clients : %d queued : { %s } running : { %s } tasks : [ %s ] -}" q r t +}" +(number_of_tasks qs) (number_of_queued qs) (number_of_running qs) (number_of_clients qs) +q r t + +let test () = + let q = + create () + |> add_task ~task:"First Task" + |> add_task ~task:"Second Task" + in + let q, client_id = + add_client q + in + let q, task_id, task_content = + match pop_task ~client_id q with + | q, Some x, Some y -> q, Id.Task.to_int x, y + | _ -> assert false + in + Printf.printf "Task_id : %d \t\t Task : %s\n" task_id task_content; + let q, task_id, task_content = + match pop_task ~client_id q with + | q, Some x, Some y -> q, Id.Task.to_int x, y + | _ -> assert false + in + Printf.printf "Task_id : %d \t\t Task : %s\n" task_id task_content; + let q, task_id, task_content = + match pop_task ~client_id q with + | q, None, None -> q, 0, "None" + | _ -> assert false + in + Printf.printf "Task_id : %d \t\t Task : %s\n" task_id task_content; + q + |> to_string + |> print_endline + diff --git a/ocaml/Queuing_system.mli b/ocaml/Queuing_system.mli new file mode 100644 index 00000000..f0e8f941 --- /dev/null +++ b/ocaml/Queuing_system.mli @@ -0,0 +1,63 @@ +module RunningMap : Map.S with type key = Id.Task.t +module TasksMap : Map.S with type key = Id.Task.t +module ClientsSet : Set.S with type elt = Id.Client.t + +type t = { + queued : Id.Task.t list ; + running : Id.Client.t RunningMap.t ; + tasks : string TasksMap.t ; + clients : ClientsSet.t ; + next_client_id : Id.Client.t ; + next_task_id : Id.Task.t ; + number_of_queued : int ; + number_of_running : int ; + number_of_tasks : int ; + number_of_clients : int ; +} + +(** Creates a new queuing system. Returns the new queue. *) +val create : unit -> t + +(** Add a new task represented as a string. Returns the queue with the added task. *) +val add_task : task:string -> t -> t + +(** Add a new client. Returns the queue and a new client_id. *) +val add_client : t -> t * Id.Client.t + +(** Pops a task from the queue. The task is set as running on client client_id. + Returns the queue, a task_id and the content of the task. If the queue contains + no task, the task_id and the task content are None. *) +val pop_task : + client_id:ClientsSet.elt -> t -> t * Id.Task.t option * string option + +(** Deletes a client from the queuing system *) +val del_client : client_id:ClientsSet.elt -> t -> t + +(** Deletes a client from the queuing system. The client is assumed to be a member + of the set of clients. Returns the queue without the removed client. *) +val end_task : task_id:RunningMap.key -> client_id:ClientsSet.elt -> t -> t + +(** Deletes a task from the queuing system. The task is assumed to be a member + of the map of tasks. Returns the queue without the removed task. *) +val del_task : task_id:TasksMap.key -> t -> t + +(** Returns the number of tasks, assumed >= 0 *) +val number_of_tasks : t -> int + +(** Returns the number of queued tasks, assumed >= 0 *) +val number_of_queued : t -> int + +(** Returns the number of running tasks, assumed >= 0 *) +val number_of_running : t -> int + +(** Returns the number of connected clients, assumed >= 0 *) +val number_of_clients : t -> int + +(** Prints the content of the queue *) +val to_string : t -> string + +(** Test function for debug *) +val test : unit -> unit + + + diff --git a/ocaml/TaskServer.ml b/ocaml/TaskServer.ml index cfc22cfc..9a1797f8 100644 --- a/ocaml/TaskServer.ml +++ b/ocaml/TaskServer.ml @@ -306,7 +306,7 @@ let del_task msg program_state rep_socket = } in let more = - (Queuing_system.number new_program_state.queue > 0) + (Queuing_system.number_of_tasks new_program_state.queue > 0) in Message.DelTaskReply (Message.DelTaskReply_msg.create ~task_id ~more) |> Message.to_string @@ -678,9 +678,9 @@ let run ~port = (** Debug input *) Printf.sprintf "q:%d r:%d n:%d : %s\n%!" - (Queuing_system.number_of_queued program_state.queue) + (Queuing_system.number_of_queued program_state.queue) (Queuing_system.number_of_running program_state.queue) - (Queuing_system.number program_state.queue) + (Queuing_system.number_of_tasks program_state.queue) (Message.to_string message) |> debug;