diff --git a/ocaml/Address.ml b/ocaml/Address.ml new file mode 100644 index 00000000..5f3bb748 --- /dev/null +++ b/ocaml/Address.ml @@ -0,0 +1,48 @@ +open Core.Std + +module Tcp : sig + type t + val of_string : string -> t + val to_string : t -> string +end = struct + type t = string + let of_string x = + assert (String.is_prefix ~prefix:"tcp://" x); + x + let to_string x = x +end + +module Ipc : sig + type t + val of_string : string -> t + val to_string : t -> string +end = struct + type t = string + let of_string x = + assert (String.is_prefix ~prefix:"ipc://" x); + x + let to_string x = x +end + +module Inproc : sig + type t + val of_string : string -> t + val to_string : t -> string +end = struct + type t = string + let of_string x = + assert (String.is_prefix ~prefix:"inproc://" x); + x + let to_string x = x +end + +type t = +| Tcp of Tcp.t +| Ipc of Ipc.t +| Inproc of Inproc.t + +let to_string = function +| Tcp x -> Tcp.to_string x +| Ipc x -> Ipc.to_string x +| Inproc x -> Inproc.to_string x + diff --git a/ocaml/Id.ml b/ocaml/Id.ml new file mode 100644 index 00000000..660c3452 --- /dev/null +++ b/ocaml/Id.ml @@ -0,0 +1,33 @@ +open Core.Std + +module Id : sig + type t + val of_int : int -> t + val to_int : t -> int + val of_string : string -> t + val to_string : t -> string + val increment : t -> t + val decrement : t -> t +end += struct + type t = int + let of_int x = + assert (x>0); x + let to_int x = x + let of_string x = + Int.of_string x + |> of_int + let to_string x = + Int.to_string x + let increment x = x + 1 + let decrement x = x - 1 +end + +module Task = struct + include Id +end + +module Client = struct + include Id +end + diff --git a/ocaml/Message.ml b/ocaml/Message.ml new file mode 100644 index 00000000..43982059 --- /dev/null +++ b/ocaml/Message.ml @@ -0,0 +1,301 @@ +open Core.Std + +(** New job : Request to create a new multi-tasked job *) + +module State : sig + type t + val of_string : string -> t + val to_string : t -> string +end = struct + type t = string + let of_string x = x + let to_string x = x +end + +module Newjob_msg : sig + type t = + { state: State.t; + address_tcp: Address.Tcp.t ; + address_inproc: Address.Inproc.t; + } + val create : address_tcp:string -> address_inproc:string -> state:string -> t + val to_string : t -> string +end = struct + type t = + { state: State.t; + address_tcp: Address.Tcp.t ; + address_inproc: Address.Inproc.t; + } + let create ~address_tcp ~address_inproc ~state = + { state = State.of_string state; + address_tcp = Address.Tcp.of_string address_tcp ; + address_inproc = Address.Inproc.of_string address_inproc ; + } + let to_string t = + Printf.sprintf "newjob %s %s %s" + ( State.to_string t.state ) + ( Address.Tcp.to_string t.address_tcp ) + ( Address.Inproc.to_string t.address_inproc ) +end + + +(** Connect : connect a new client to the task server *) + +module Connect_msg : sig + type t = Tcp | Inproc | Ipc + val create : typ:string -> t + val to_string : t -> string +end = struct + type t = Tcp | Inproc | Ipc + let create ~typ = + match typ with + | "tcp" -> Tcp + | "inproc" -> Inproc + | "ipc" -> Ipc + | _ -> assert false + let to_string = function + | Tcp -> "connect tcp" + | Inproc -> "connect inproc" + | Ipc -> "connect ipc" +end + +(** ConnectReply : Reply to the connect messsage *) + +module ConnectReply_msg : sig + type t = + { client_id: Id.Client.t ; + state: State.t ; + push_address: Address.t; + } + val create : state:State.t -> client_id:Id.Client.t -> push_address:Address.t -> t + val to_string : t -> string +end = struct + type t = + { client_id: Id.Client.t ; + state: State.t ; + push_address: Address.t; + } + let create ~state ~client_id ~push_address = + { client_id ; state ; push_address } + let to_string x = + Printf.sprintf "connect_reply %s %d %s" + (State.to_string x.state) + (Id.Client.to_int x.client_id) + (Address.to_string x.push_address) +end + + +(** Disconnect : disconnect a client from the task server *) +module Disconnect_msg : sig + type t = + { client_id: Id.Client.t ; + state: State.t ; + } + val create : state:string -> client_id:string -> t + val to_string : t -> string +end = struct + type t = + { client_id: Id.Client.t ; + state: State.t ; + } + let create ~state ~client_id = + { client_id = Id.Client.of_string client_id ; state = State.of_string state } + let to_string x = + Printf.sprintf "disconnect %s %d" + (State.to_string x.state) + (Id.Client.to_int x.client_id) +end + + +(** AddTask : Add a new task to the queue *) +module AddTask_msg : sig + type t = + { state: State.t; + task: string; + } + val create : state:string -> task:string -> t + val to_string : t -> string +end = struct + type t = + { state: State.t; + task: string; + } + let create ~state ~task = { state = State.of_string state ; task } + let to_string x = + Printf.sprintf "add_task %s %s" (State.to_string x.state) x.task +end + + +(** AddTaskReply : Reply to the AddTask message *) +module AddTaskReply_msg : sig + type t + val create : task_id:Id.Task.t -> t + val to_string : t -> string +end = struct + type t = Id.Task.t + let create ~task_id = task_id + let to_string x = + Printf.sprintf "add_task_reply %d" (Id.Task.to_int x) +end + + +(** GetTask : get a new task to do *) +module GetTask_msg : sig + type t = + { client_id: Id.Client.t ; + state: State.t ; + } + val create : state:string -> client_id:string -> t + val to_string : t -> string +end = struct + type t = + { client_id: Id.Client.t ; + state: State.t ; + } + let create ~state ~client_id = + { client_id = Id.Client.of_string client_id ; state = State.of_string state } + let to_string x = + Printf.sprintf "get_task %s %d" + (State.to_string x.state) + (Id.Client.to_int x.client_id) +end + +(** GetTaskReply : Reply to the GetTask message *) +module GetTaskReply_msg : sig + type t + val create : task_id:Id.Task.t -> task:string -> t + val to_string : t -> string +end = struct + type t = + { task_id: Id.Task.t ; + task : string ; + } + let create ~task_id ~task = { task_id ; task } + let to_string x = + Printf.sprintf "get_task_reply %d %s" (Id.Task.to_int x.task_id) x.task +end + + +(** TaskDone : Inform the server that a task is finished *) +module TaskDone_msg : sig + type t = + { client_id: Id.Client.t ; + state: State.t ; + task_id: Id.Task.t; + } + val create : state:string -> client_id:string -> task_id:string -> t + val to_string : t -> string +end = struct + type t = + { client_id: Id.Client.t ; + state: State.t ; + task_id: Id.Task.t; + } + let create ~state ~client_id ~task_id = + { client_id = Id.Client.of_string client_id ; + state = State.of_string state ; + task_id = Id.Task.of_string task_id } + let to_string x = + Printf.sprintf "task_done %s %d %d" + (State.to_string x.state) + (Id.Client.to_int x.client_id) + (Id.Task.to_int x.task_id) +end + +(** Terminate *) +module Terminate_msg : sig + type t + val create : unit -> t + val to_string : t -> string +end = struct + type t = Terminate + let create () = Terminate + let to_string x = "terminate" +end + +(** OK *) +module Ok_msg : sig + type t + val create : unit -> t + val to_string : t -> string +end = struct + type t = Ok + let create () = Ok + let to_string x = "ok" +end + +(** Error *) +module Error_msg : sig + type t + val create : string -> t + val to_string : t -> string +end = struct + type t = string + let create x = x + let to_string x = + String.concat ~sep:" " [ "error" ; x ] +end + + + +(** Message *) + +type t = +| Newjob of Newjob_msg.t +| Connect of Connect_msg.t +| ConnectReply of ConnectReply_msg.t +| Disconnect of Disconnect_msg.t +| GetTask of GetTask_msg.t +| GetTaskReply of GetTaskReply_msg.t +| AddTask of AddTask_msg.t +| AddTaskReply of AddTaskReply_msg.t +| TaskDone of TaskDone_msg.t +| Terminate of Terminate_msg.t +| Ok of Ok_msg.t +| Error of Error_msg.t + + +let of_string s = + let l = + String.split ~on:' ' s + |> List.map ~f:String.strip + |> List.map ~f:String.lowercase + |> List.filter ~f:(fun x -> (String.strip x) <> "") + in + match l with + | "add_task" :: state :: task -> + AddTask (AddTask_msg.create ~state ~task:(String.concat ~sep:" " task) ) + | "get_task" :: state :: client_id :: [] -> + GetTask (GetTask_msg.create ~state ~client_id) + | "task_done" :: state :: client_id :: task_id :: [] -> + TaskDone (TaskDone_msg.create ~state ~client_id ~task_id) + | "disconnect" :: state :: client_id :: [] -> + Disconnect (Disconnect_msg.create ~state ~client_id) + | "connect" :: t :: [] -> + Connect (Connect_msg.create t) + | "new_job" :: state :: push_address_tcp :: push_address_inproc :: [] -> + Newjob (Newjob_msg.create push_address_tcp push_address_inproc state) + | "terminate" :: [] -> + Terminate (Terminate_msg.create () ) + | "ok" :: [] -> + Ok (Ok_msg.create ()) + | "error" :: rest -> + Error (Error_msg.create (String.concat ~sep:" " rest)) + | _ -> failwith "Message not understood" + + +let to_string = function +| Newjob x -> Newjob_msg.to_string x +| Connect x -> Connect_msg.to_string x +| ConnectReply x -> ConnectReply_msg.to_string x +| Disconnect x -> Disconnect_msg.to_string x +| GetTask x -> GetTask_msg.to_string x +| GetTaskReply x -> GetTaskReply_msg.to_string x +| AddTask x -> AddTask_msg.to_string x +| AddTaskReply x -> AddTaskReply_msg.to_string x +| TaskDone x -> TaskDone_msg.to_string x +| Terminate x -> Terminate_msg.to_string x +| Ok x -> Ok_msg.to_string x +| Error x -> Error_msg.to_string x + + diff --git a/ocaml/Queuing_system.ml b/ocaml/Queuing_system.ml new file mode 100644 index 00000000..7a927a60 --- /dev/null +++ b/ocaml/Queuing_system.ml @@ -0,0 +1,116 @@ +open Core.Std + + +type t = +{ queued : Id.Task.t list ; + running : (Id.Task.t, Id.Client.t) Map.Poly.t ; + tasks : (Id.Task.t, string) Map.Poly.t; + clients : Id.Client.t Set.Poly.t; + next_client_id : Id.Client.t; + next_task_id : Id.Task.t; +} + + + +let create () = + { queued = [] ; + running = Map.Poly.empty ; + tasks = Map.Poly.empty; + clients = Set.Poly.empty; + next_client_id = Id.Client.of_int 1; + next_task_id = Id.Task.of_int 1; + } + + + + +let add_task ~task q = + let task_id = + q.next_task_id + in + { q with + queued = q.queued @ [ task_id ] ; + tasks = Map.add q.tasks ~key:task_id ~data:task ; + next_task_id = Id.Task.increment task_id ; + }, task_id + + +let add_client q = + let client_id = + q.next_client_id + in + { q with + clients = Set.add q.clients client_id; + next_client_id = Id.Client.increment client_id; + }, client_id + + +let pop_task ~client_id q = + let { queued ; running ; _ } = + q + in + assert (Set.mem q.clients client_id); + match queued with + | task_id :: new_queue -> + let new_q = + { q with + queued = new_queue ; + running = Map.add running ~key:task_id ~data:client_id ; + } + in new_q, Some task_id, (Map.find q.tasks task_id) + | [] -> q, None, None + + +let del_client ~client_id q = + assert (Set.mem q.clients client_id); + { q with + clients = Set.remove q.clients client_id } + + +let end_task ~task_id ~client_id q = + let { running ; tasks ; _ } = + q + in + assert (Set.mem q.clients client_id); + let () = + match Map.Poly.find running task_id with + | None -> failwith "Task already finished" + | Some client_id_check -> assert (client_id_check = client_id) + in + { q with + running = Map.remove running task_id ; + tasks = Map.remove tasks task_id ; + } + + + +let number_of_queued q = + List.length q.queued + +let number_of_running q = + Map.length q.running + + +let to_string { queued ; running ; tasks ; _ } = + let q = + List.map ~f:Id.Task.to_string queued + |> String.concat ~sep:" ; " + and r = + Map.Poly.to_alist running + |> List.map ~f:(fun (t,c) -> "("^(Id.Task.to_string t)^", " + ^(Id.Client.to_string c)^")") + |> String.concat ~sep:" ; " + and t = + Map.Poly.to_alist tasks + |> List.map ~f:(fun (t,c) -> "("^(Id.Task.to_string t)^", \"" + ^c^"\")") + |> String.concat ~sep:" ; " + in + Printf.sprintf "{ +queued : { %s } +running : { %s } +tasks : [ %s + ] +}" q r t + + diff --git a/ocaml/TaskServer.ml b/ocaml/TaskServer.ml new file mode 100644 index 00000000..4c5b2ff2 --- /dev/null +++ b/ocaml/TaskServer.ml @@ -0,0 +1,286 @@ +open Core.Std +open Qptypes + +(** +The tasks server listens on a REQ socket and accepts the following commands: + +* "new_job %s %s %s" state push_address_tcp push_address_inproc -> "OK" + -> "OK" + +* "connect %s" ["tcp"|"inproc"] + -> "%d %s %s" id state push_address + +* "disconnect %d" id + -> "OK" + +* "get_task %d %s" id state + -> "%d %s" task_id task + +* "task_done %d task_id %s" id state + -> "%d %s" task_id task + +*) + +let bind_socket ~socket_type ~socket ~address = + try + ZMQ.Socket.bind socket address + with + | Unix.Unix_error (_, message, f) -> + failwith @@ Printf.sprintf + "\n%s\nUnable to bind the %s socket :\n %s\n%s" + f socket_type address message + | other_exception -> raise other_exception + + +(** Name of the host on which the server runs *) +let hostname = lazy ( + try + Unix.gethostname () + with + | _ -> "localhost" + ) + + +(** IP address *) +let ip_address = lazy ( + match Sys.getenv "QP_NIC" with + | None -> + begin + try + Lazy.force hostname + |> Unix.Inet_addr.of_string_or_getbyname + |> Unix.Inet_addr.to_string + with + | Unix.Unix_error _ -> + failwith "Unable to find IP address from host name." + end + | Some interface -> + begin + try + ok_exn Linux_ext.get_ipv4_address_for_interface interface + with + | Unix.Unix_error _ -> + Lazy.force hostname + |> Unix.Inet_addr.of_string_or_getbyname + |> Unix.Inet_addr.to_string + end +) + +(** Initial ZeroMQ port : + Random port number between 49152 and 65535 *) +let port = lazy ( + 1024 + (Random.int (49151-1024)) ) + +let stop () = + let zmq_context = + ZMQ.Context.create () + in + let req_socket = + ZMQ.Socket.create zmq_context ZMQ.Socket.req + and address = + Printf.sprintf "tcp://%s:%d" (Lazy.force ip_address) (Lazy.force port) + in + ZMQ.Socket.connect req_socket address; + + Message.Terminate (Message.Terminate_msg.create ()) + |> Message.to_string + |> ZMQ.Socket.send req_socket ; + + let msg = + ZMQ.Socket.recv req_socket + |> Message.of_string + in + let () = + match msg with + | Message.Ok _ -> () + | _ -> failwith "Problem in termination" + in + ZMQ.Socket.set_linger_period req_socket 1000; + ZMQ.Socket.close req_socket + + +(** Run the task server *) +let run () = + + let zmq_context = + ZMQ.Context.create () + in + + let rep_socket = + ZMQ.Socket.create zmq_context ZMQ.Socket.rep + and address = + Printf.sprintf "tcp://%s:%d" (Lazy.force ip_address) (Lazy.force port) + in + bind_socket "REP" rep_socket address; + + let pollitem = + ZMQ.Poll.mask_of + [| (rep_socket, ZMQ.Poll.In) |] + in + + (** State variables *) + let q = ref + (Queuing_system.create ()) + and running = + ref true + and job = + ref None + in + + let get_state () = + match !job with + | None -> None + | Some j -> Some j.Message.Newjob_msg.state + in + + let get_tcp_address () = + match !job with + | Some j -> Address.Tcp j.Message.Newjob_msg.address_tcp + | None -> assert false + in + + let get_inproc_address () = + match !job with + | Some j -> Address.Inproc j.Message.Newjob_msg.address_inproc + | None -> assert false + in + + let ok = + Message.Ok (Message.Ok_msg.create ()) + in + + while ( !running ) + do + let state = + get_state () + and polling = + ZMQ.Poll.poll ~timeout:1000 pollitem + in + + let terminate () = + running := false; + Message.to_string ok + |> ZMQ.Socket.send rep_socket + + and newjob x = + q := Queuing_system.create (); + job := Some x; + Message.to_string ok + |> ZMQ.Socket.send rep_socket + + and connect state msg = + let push_address = + match msg with + | Message.Connect_msg.Tcp -> get_tcp_address () + | Message.Connect_msg.Inproc -> get_inproc_address () + | Message.Connect_msg.Ipc -> assert false + in + let new_q, client_id = + Queuing_system.add_client !q + in + q := new_q; + Message.ConnectReply (Message.ConnectReply_msg.create + ~state ~client_id ~push_address) + |> Message.to_string + |> ZMQ.Socket.send rep_socket + + and disconnect state msg = + let s, c = + msg.Message.Disconnect_msg.state , + msg.Message.Disconnect_msg.client_id + in + assert (s = state); + let new_q = + Queuing_system.del_client ~client_id:c !q + in + q := new_q; + Message.to_string ok + |> ZMQ.Socket.send rep_socket + + and add_task state msg = + let s, task = + msg.Message.AddTask_msg.state, + msg.Message.AddTask_msg.task + in + assert (s = state); + let new_q, task_id = + Queuing_system.add_task ~task !q + in + q := new_q; + Message.to_string ok + |> ZMQ.Socket.send rep_socket + + and get_task state msg = + let s, client_id = + msg.Message.GetTask_msg.state, + msg.Message.GetTask_msg.client_id + in + assert (s = state); + let new_q, task_id, task = + Queuing_system.pop_task ~client_id !q + in + q := new_q; + let reply = + match (task, task_id) with + | Some task, Some task_id -> + Message.GetTaskReply (Message.GetTaskReply_msg.create ~task ~task_id) + | _ -> Message.Terminate (Message.Terminate_msg.create ()) + in + Message.to_string reply + |> ZMQ.Socket.send rep_socket + + and task_done state msg = + let s, client_id, task_id = + msg.Message.TaskDone_msg.state, + msg.Message.TaskDone_msg.client_id, + msg.Message.TaskDone_msg.task_id + in + assert (s = state); + let new_q = + Queuing_system.end_task ~task_id ~client_id !q + in + q := new_q; + Message.to_string ok + |> ZMQ.Socket.send rep_socket + + and error msg = + Message.Error (Message.Error_msg.create msg) + |> Message.to_string + |> ZMQ.Socket.send rep_socket + in + + if (polling.(0) = Some ZMQ.Poll.In) then + let raw_message = + ZMQ.Socket.recv rep_socket + in + try + let message = + Message.of_string raw_message + in + Printf.printf "%s\n%!" (Message.to_string message); + Printf.printf "%s\n%!" (Queuing_system.to_string !q); + match (state, message) with + | _ , Message.Terminate _ -> terminate () + | None , Message.Newjob x -> newjob x + | None , _ -> error "No job is running" + | _ , Message.Newjob _ -> error "A job is already running" + | Some s, Message.Connect x -> connect s x + | Some s, Message.Disconnect x -> disconnect s x + | Some s, Message.AddTask x -> add_task s x + | Some s, Message.GetTask x -> get_task s x + | Some s, Message.TaskDone x -> task_done s x + | _ , _ -> + error ("Invalid message : "^(Message.to_string message)) + with + | Failure f -> error (f^" : "^raw_message) + | Assert_failure (f,i,j) -> error (Printf.sprintf "%s:%d:%d : %s" f i j raw_message) + + done; + ZMQ.Socket.set_linger_period rep_socket 1000; + ZMQ.Socket.close rep_socket + + +let () = + Printf.printf "export QP_RUN_ADDRESS=tcp://%s:%d\n%!" (Lazy.force ip_address) (Lazy.force port) + + diff --git a/ocaml/_tags b/ocaml/_tags index 519d558f..112ee73f 100644 --- a/ocaml/_tags +++ b/ocaml/_tags @@ -1,2 +1,2 @@ -true: package(core,sexplib.syntax,cryptokit) +true: package(core,sexplib.syntax,cryptokit,ZMQ) true: thread diff --git a/ocaml/test_message.ml b/ocaml/test_message.ml new file mode 100644 index 00000000..90b73d5e --- /dev/null +++ b/ocaml/test_message.ml @@ -0,0 +1,89 @@ +open Core.Std + +let () = + Message.of_string "new_job tcp://127.0.0.1 inproc://ao_ints:12345 ao_integrals" + |> Message.to_string + |> print_endline + ; + + Message.of_string "connect tcp" + |> Message.to_string + |> print_endline + ; + + Message.of_string "connect inproc" + |> Message.to_string + |> print_endline + ; + + Message.of_string "disconnect 3 mystate" + |> Message.to_string + |> print_endline + ; + + Message.of_string "get_task 3 mystate" + |> Message.to_string + |> print_endline + ; + + Message.of_string "task_done 1 mystate 3" + |> Message.to_string + |> print_endline + ; + + Message.of_string "add_task mystate 1 2 3 4 5 6" + |> Message.to_string + |> print_endline + ; + + try + Message.of_string "new_job inproc://ao_ints tcp://127.0.0.1:12345 ao_integrals" + |> Message.to_string + |> print_endline + ; + failwith "Should have failed" + with + | Assert_failure _ -> print_endline "OK" + ; + + try + Message.of_string "new_job tcp://ao_ints inproc://ao_ints" + |> Message.to_string + |> print_endline + ; + assert false + with + | Failure _ -> print_endline "OK" + ; + + try + Message.of_string "disconnect -4 mystate" + |> Message.to_string + |> print_endline + ; + assert false + with + | Assert_failure _ -> print_endline "OK" + ; + + try + Message.of_string "disconnect mystate 3" + |> Message.to_string + |> print_endline + ; + assert false + with + | Failure _ -> print_endline "OK" + ; + + try + Message.of_string "connect tcp tcp://127.0.0.1" + |> Message.to_string + |> print_endline + ; + assert false + with + | Failure _ -> print_endline "OK" + ; + + diff --git a/ocaml/test_queuing_system.ml b/ocaml/test_queuing_system.ml new file mode 100644 index 00000000..aa2fa280 --- /dev/null +++ b/ocaml/test_queuing_system.ml @@ -0,0 +1,102 @@ +open Core.Std + +let () = + + let nclients = + 8 + in + + let q = + Queuing_system.create () + in + + let tasks = + Array.init 20 ~f:(fun i -> Printf.sprintf "Task %d" i) + |> Array.to_list + in + + let (q,_) = + List.fold_left tasks ~init:(q, q.Queuing_system.next_task_id) + ~f:(fun (q,_) task -> Queuing_system.add_task ~task q) + in + print_endline @@ Queuing_system.to_string q ; + + let rec aux q clients = function + | 0 -> q, clients + | i -> + let new_q, client_id = + Queuing_system.add_client q + in + aux new_q (client_id::clients) (i-1) + in + let q, _ = + aux q [] nclients + in + + let rec aux q = function + | 0 -> q + | i -> + begin + let c = + Id.Client.of_int i + in + let new_q, task_id, task = + Queuing_system.pop_task ~client_id:c q + in + begin + match task_id, task with + | Some task_id, Some task -> + Printf.printf "Task Running: %d %s\n" (Id.Task.to_int task_id) task + | _ -> Printf.printf "Done!\n" + end; + aux new_q (i-1) + end + in + + let rec aux2 q = function + | 0 -> q + | i -> + begin + let task_id = + (Id.Task.of_int i) + in + try + let client_id = + Map.Poly.find_exn q.Queuing_system.running task_id + in + let new_q = + Queuing_system.end_task ~task_id ~client_id q + in + Printf.printf "Task Done : %d\n" (Id.Task.to_int task_id) ; + aux2 new_q (i-1) + with + | _ -> aux2 q 0 + end + in + let q = + aux q nclients + in + print_endline @@ Queuing_system.to_string q ; + + let q = + aux2 q nclients + in + print_endline @@ Queuing_system.to_string q ; + Printf.printf "Queued : %d\n Running : %d\n" + (Queuing_system.number_of_queued q) + (Queuing_system.number_of_running q) + ; + let q = + aux q nclients + in + print_endline @@ Queuing_system.to_string q ; + let q = + aux2 q nclients + in + print_endline @@ Queuing_system.to_string q ; + + +(* + List.map ~f:Id.Task.to_int tasks + |> List.iter ~f:(fun x -> Printf.printf "%d\n" x) +*) diff --git a/ocaml/test_task_server.ml b/ocaml/test_task_server.ml new file mode 100644 index 00000000..55f74202 --- /dev/null +++ b/ocaml/test_task_server.ml @@ -0,0 +1,5 @@ +open Core + +let () = + TaskServer.run () + diff --git a/ocaml/test_task_server.py b/ocaml/test_task_server.py new file mode 100755 index 00000000..07835820 --- /dev/null +++ b/ocaml/test_task_server.py @@ -0,0 +1,46 @@ +#!/usr/bin/python + +import zmq +import sys, os + + +def main(): + context = zmq.Context() + socket = context.socket(zmq.REQ) + socket.connect(os.environ["QP_RUN_ADDRESS"]) + + def send(msg,expected): + print "Send : ", msg + socket.send(msg) + reply = socket.recv() + print "Reply : ", reply + print "" + assert (reply == expected) + + + send("new_job ao_integrals tcp://130.120.229.139:12345 inproc://ao_integrals", + "ok") + send("new_job ao_integrals tcp://130.120.229.139:12345 inproc://ao_integrals", + "error A job is already running") + + send("connect","error Message not understood : connect") + + send("connect tcp","connect_reply ao_integrals 1 tcp://130.120.229.139:12345") + send("connect inproc","connect_reply ao_integrals 2 inproc://ao_integrals") + send("disconnect ao_integrals 3","error Queuing_system.ml:65:2 : disconnect ao_integrals 3") + send("disconnect ao_integrals 2","ok") + send("connect inproc","connect_reply ao_integrals 3 inproc://ao_integrals") + + for i in range(10): + send("add_task ao_integrals %d %d"%(i,i+10), "ok") + + for i in range(10): + send("get_task ao_integrals 3", "get_task_reply %d %d %d"%(i+1,i,i+10)) + send("task_done ao_integrals 3 %d"%(i+1), "ok") + + send("get_task ao_integrals 3", "terminate") + + send("terminate","ok") + +if __name__ == '__main__': + main()