10
0
mirror of https://github.com/LCPQ/quantum_package synced 2025-01-10 21:18:29 +01:00

Removed core from queuing_system

This commit is contained in:
Anthony Scemama 2016-10-13 12:32:22 +02:00
parent 5f83602578
commit d7b40e6d1f
5 changed files with 201 additions and 58 deletions

View File

@ -1,26 +1,22 @@
open Core.Std module Id = struct
module Id : sig
type t
val of_int : int -> t
val to_int : t -> int
val of_string : string -> t
val to_string : t -> string
val increment : t -> t
val decrement : t -> t
end
= struct
type t = int type t = int
let of_int x = let of_int x =
assert (x>0); x assert (x>0); x
let to_int x = x let to_int x = x
let of_string x = let of_string x =
Int.of_string x int_of_string x
|> of_int |> of_int
let to_string x = let to_string x =
Int.to_string x string_of_int x
let increment x = x + 1 let increment x = x + 1
let decrement x = x - 1 let decrement x = x - 1
let compare = compare
end end
module Task = struct module Task = struct

23
ocaml/Id.mli Normal file
View File

@ -0,0 +1,23 @@
module Id :
sig
type t
val of_int : int -> t
val to_int : t -> int
val of_string : string -> t
val to_string : t -> string
val increment : t -> t
val decrement : t -> t
val compare : t -> t -> int
end
module Task :
sig
include (module type of Id)
end
module Client :
sig
include (module type of Id)
end

View File

@ -1,27 +1,33 @@
open Core.Std module RunningMap = Map.Make (Id.Task)
open Qptypes module TasksMap = Map.Make (Id.Task)
module ClientsSet = Set.Make (Id.Client)
type t = type t =
{ queued : Id.Task.t list ; { queued : Id.Task.t list ;
running : (Id.Task.t, Id.Client.t) Map.Poly.t ; running : Id.Client.t RunningMap.t;
tasks : (Id.Task.t, string) Map.Poly.t; tasks : string TasksMap.t;
clients : Id.Client.t Set.Poly.t; clients : ClientsSet.t;
next_client_id : Id.Client.t; next_client_id : Id.Client.t;
next_task_id : Id.Task.t; next_task_id : Id.Task.t;
number_of_queued : int; number_of_queued : int;
number_of_running : int;
number_of_tasks : int;
number_of_clients : int;
} }
let create () = let create () =
{ queued = [] ; { queued = [] ;
running = Map.Poly.empty ; running = RunningMap.empty ;
tasks = Map.Poly.empty; tasks = TasksMap.empty;
clients = Set.Poly.empty; clients = ClientsSet.empty;
next_client_id = Id.Client.of_int 1; next_client_id = Id.Client.of_int 1;
next_task_id = Id.Task.of_int 1; next_task_id = Id.Task.of_int 1;
number_of_queued = 0; number_of_queued = 0;
number_of_running = 0;
number_of_tasks = 0;
number_of_clients = 0;
} }
@ -33,9 +39,10 @@ let add_task ~task q =
in in
{ q with { q with
queued = task_id :: q.queued ; queued = task_id :: q.queued ;
tasks = Map.add q.tasks ~key:task_id ~data:task ; tasks = TasksMap.add task_id task q.tasks;
next_task_id = Id.Task.increment task_id ; next_task_id = Id.Task.increment task_id ;
number_of_queued = q.number_of_queued + 1; number_of_queued = q.number_of_queued + 1;
number_of_tasks = q.number_of_tasks + 1;
} }
@ -46,8 +53,9 @@ let add_client q =
q.next_client_id q.next_client_id
in in
{ q with { q with
clients = Set.add q.clients client_id; clients = ClientsSet.add client_id q.clients;
next_client_id = Id.Client.increment client_id; next_client_id = Id.Client.increment client_id;
number_of_clients = q.number_of_clients + 1;
}, client_id }, client_id
@ -55,37 +63,46 @@ let pop_task ~client_id q =
let { queued ; running ; _ } = let { queued ; running ; _ } =
q q
in in
assert (Set.mem q.clients client_id); assert (ClientsSet.mem client_id q.clients);
match queued with match queued with
| task_id :: new_queue -> | task_id :: new_queue ->
let new_q = let new_q =
{ q with { q with
queued = new_queue ; queued = new_queue ;
running = Map.add running ~key:task_id ~data:client_id ; running = RunningMap.add task_id client_id running;
number_of_queued = q.number_of_queued - 1; number_of_queued = q.number_of_queued - 1;
number_of_running = q.number_of_running + 1;
} }
in new_q, Some task_id, (Map.find q.tasks task_id) and found =
try Some (TasksMap.find task_id q.tasks)
with Not_found -> None
in new_q, Some task_id, found
| [] -> q, None, None | [] -> q, None, None
let del_client ~client_id q = let del_client ~client_id q =
assert (Set.mem q.clients client_id); assert (ClientsSet.mem client_id q.clients);
{ q with { q with
clients = Set.remove q.clients client_id } clients = ClientsSet.remove client_id q.clients;
number_of_clients = q.number_of_clients - 1
}
let end_task ~task_id ~client_id q = let end_task ~task_id ~client_id q =
let { running ; tasks ; _ } = let { running ; tasks ; _ } =
q q
in in
assert (Set.mem q.clients client_id); assert (ClientsSet.mem client_id q.clients);
let () = let () =
match Map.Poly.find running task_id with let client_id_check =
| None -> failwith "Task already finished" try RunningMap.find task_id running with
| Some client_id_check -> assert (client_id_check = client_id) Not_found -> failwith "Task already finished"
in
assert (client_id_check = client_id)
in in
{ q with { q with
running = Map.remove running task_id ; running = RunningMap.remove task_id running ;
number_of_running = q.number_of_running - 1
} }
let del_task ~task_id q = let del_task ~task_id q =
@ -93,9 +110,10 @@ let del_task ~task_id q =
q q
in in
if (Map.mem tasks task_id) then if (TasksMap.mem task_id tasks) then
{ q with { q with
tasks = Map.remove tasks task_id ; tasks = TasksMap.remove task_id tasks;
number_of_tasks = q.number_of_tasks - 1;
} }
else else
Printf.sprintf "Task %d is already deleted" (Id.Task.to_int task_id) Printf.sprintf "Task %d is already deleted" (Id.Task.to_int task_id)
@ -103,36 +121,79 @@ let del_task ~task_id q =
let number q = let number_of_tasks q =
Map.length q.tasks assert (q.number_of_tasks >= 0);
q.number_of_tasks
let number_of_queued q = let number_of_queued q =
assert (q.number_of_queued >= 0);
q.number_of_queued q.number_of_queued
let number_of_running q = let number_of_running q =
Map.length q.running assert (q.number_of_running >= 0);
q.number_of_running
let number_of_clients q =
assert (q.number_of_clients >= 0);
q.number_of_clients
let to_string { queued ; running ; tasks ; _ } = let to_string qs =
let { queued ; running ; tasks ; _ } = qs in
let q = let q =
List.map ~f:Id.Task.to_string queued List.map Id.Task.to_string queued
|> String.concat ~sep:" ; " |> String.concat " ; "
and r = and r =
Map.Poly.to_alist running RunningMap.bindings running
|> List.map ~f:(fun (t,c) -> "("^(Id.Task.to_string t)^", " |> List.map (fun (t,c) -> "("^(Id.Task.to_string t)^", "
^(Id.Client.to_string c)^")") ^(Id.Client.to_string c)^")")
|> String.concat ~sep:" ; " |> String.concat " ; "
and t = and t =
Map.Poly.to_alist tasks TasksMap.bindings tasks
|> List.map ~f:(fun (t,c) -> "("^(Id.Task.to_string t)^", \"" |> List.map (fun (t,c) -> "("^(Id.Task.to_string t)^", \""
^c^"\")") ^c^"\")")
|> String.concat ~sep:" ; " |> String.concat " ; "
in in
Printf.sprintf "{ Printf.sprintf "{
Tasks : %d Queued : %d Running : %d Clients : %d
queued : { %s } queued : { %s }
running : { %s } running : { %s }
tasks : [ %s tasks : [ %s
] ]
}" q r t }"
(number_of_tasks qs) (number_of_queued qs) (number_of_running qs) (number_of_clients qs)
q r t
let test () =
let q =
create ()
|> add_task ~task:"First Task"
|> add_task ~task:"Second Task"
in
let q, client_id =
add_client q
in
let q, task_id, task_content =
match pop_task ~client_id q with
| q, Some x, Some y -> q, Id.Task.to_int x, y
| _ -> assert false
in
Printf.printf "Task_id : %d \t\t Task : %s\n" task_id task_content;
let q, task_id, task_content =
match pop_task ~client_id q with
| q, Some x, Some y -> q, Id.Task.to_int x, y
| _ -> assert false
in
Printf.printf "Task_id : %d \t\t Task : %s\n" task_id task_content;
let q, task_id, task_content =
match pop_task ~client_id q with
| q, None, None -> q, 0, "None"
| _ -> assert false
in
Printf.printf "Task_id : %d \t\t Task : %s\n" task_id task_content;
q
|> to_string
|> print_endline

63
ocaml/Queuing_system.mli Normal file
View File

@ -0,0 +1,63 @@
module RunningMap : Map.S with type key = Id.Task.t
module TasksMap : Map.S with type key = Id.Task.t
module ClientsSet : Set.S with type elt = Id.Client.t
type t = {
queued : Id.Task.t list ;
running : Id.Client.t RunningMap.t ;
tasks : string TasksMap.t ;
clients : ClientsSet.t ;
next_client_id : Id.Client.t ;
next_task_id : Id.Task.t ;
number_of_queued : int ;
number_of_running : int ;
number_of_tasks : int ;
number_of_clients : int ;
}
(** Creates a new queuing system. Returns the new queue. *)
val create : unit -> t
(** Add a new task represented as a string. Returns the queue with the added task. *)
val add_task : task:string -> t -> t
(** Add a new client. Returns the queue and a new client_id. *)
val add_client : t -> t * Id.Client.t
(** Pops a task from the queue. The task is set as running on client client_id.
Returns the queue, a task_id and the content of the task. If the queue contains
no task, the task_id and the task content are None. *)
val pop_task :
client_id:ClientsSet.elt -> t -> t * Id.Task.t option * string option
(** Deletes a client from the queuing system *)
val del_client : client_id:ClientsSet.elt -> t -> t
(** Deletes a client from the queuing system. The client is assumed to be a member
of the set of clients. Returns the queue without the removed client. *)
val end_task : task_id:RunningMap.key -> client_id:ClientsSet.elt -> t -> t
(** Deletes a task from the queuing system. The task is assumed to be a member
of the map of tasks. Returns the queue without the removed task. *)
val del_task : task_id:TasksMap.key -> t -> t
(** Returns the number of tasks, assumed >= 0 *)
val number_of_tasks : t -> int
(** Returns the number of queued tasks, assumed >= 0 *)
val number_of_queued : t -> int
(** Returns the number of running tasks, assumed >= 0 *)
val number_of_running : t -> int
(** Returns the number of connected clients, assumed >= 0 *)
val number_of_clients : t -> int
(** Prints the content of the queue *)
val to_string : t -> string
(** Test function for debug *)
val test : unit -> unit

View File

@ -306,7 +306,7 @@ let del_task msg program_state rep_socket =
} }
in in
let more = let more =
(Queuing_system.number new_program_state.queue > 0) (Queuing_system.number_of_tasks new_program_state.queue > 0)
in in
Message.DelTaskReply (Message.DelTaskReply_msg.create ~task_id ~more) Message.DelTaskReply (Message.DelTaskReply_msg.create ~task_id ~more)
|> Message.to_string |> Message.to_string
@ -680,7 +680,7 @@ let run ~port =
Printf.sprintf "q:%d r:%d n:%d : %s\n%!" Printf.sprintf "q:%d r:%d n:%d : %s\n%!"
(Queuing_system.number_of_queued program_state.queue) (Queuing_system.number_of_queued program_state.queue)
(Queuing_system.number_of_running program_state.queue) (Queuing_system.number_of_running program_state.queue)
(Queuing_system.number program_state.queue) (Queuing_system.number_of_tasks program_state.queue)
(Message.to_string message) (Message.to_string message)
|> debug; |> debug;