From bfdda0b08a74c1842694d8179f13095dd6787705 Mon Sep 17 00:00:00 2001 From: Anthony Scemama Date: Mon, 27 Feb 2017 19:24:19 +0100 Subject: [PATCH] Added OcamlLex parser for messages --- ocaml/Makefile | 1 + ocaml/Message.ml | 135 +++++++++++----------- ocaml/Message_lexer.mll | 245 ++++++++++++++++++++++++++++++++++++++++ ocaml/TaskServer.ml | 4 +- 4 files changed, 314 insertions(+), 71 deletions(-) create mode 100644 ocaml/Message_lexer.mll diff --git a/ocaml/Makefile b/ocaml/Makefile index 7d51986f..8519c973 100644 --- a/ocaml/Makefile +++ b/ocaml/Makefile @@ -13,6 +13,7 @@ LIBS= PKGS= OCAMLCFLAGS="-g -warn-error A" OCAMLBUILD=ocamlbuild -j 0 -syntax camlp4o -cflags $(OCAMLCFLAGS) -lflags $(OCAMLCFLAGS) +MLLFILES=$(wildcard *.mll) MLFILES=$(wildcard *.ml) ezfio.ml Qptypes.ml Input_auto_generated.ml qp_edit.ml MLIFILES=$(wildcard *.mli) git ALL_TESTS=$(patsubst %.ml,%.byte,$(wildcard test_*.ml)) diff --git a/ocaml/Message.ml b/ocaml/Message.ml index 68b866d5..3a1f5c57 100644 --- a/ocaml/Message.ml +++ b/ocaml/Message.ml @@ -110,7 +110,7 @@ module Disconnect_msg : sig { client_id: Id.Client.t ; state: State.t ; } - val create : state:string -> client_id:string -> t + val create : state:string -> client_id:int -> t val to_string : t -> string end = struct type t = @@ -118,7 +118,7 @@ end = struct state: State.t ; } let create ~state ~client_id = - { client_id = Id.Client.of_string client_id ; state = State.of_string state } + { client_id = Id.Client.of_int client_id ; state = State.of_string state } let to_string x = Printf.sprintf "disconnect %s %d" (State.to_string x.state) @@ -184,7 +184,7 @@ module DelTask_msg : sig { state: State.t; task_id: Id.Task.t } - val create : state:string -> task_id:string -> t + val create : state:string -> task_id:int -> t val to_string : t -> string end = struct type t = @@ -193,7 +193,7 @@ end = struct } let create ~state ~task_id = { state = State.of_string state ; - task_id = Id.Task.of_string task_id + task_id = Id.Task.of_int task_id } let to_string x = Printf.sprintf "del_task %s %d" @@ -230,7 +230,7 @@ module GetTask_msg : sig { client_id: Id.Client.t ; state: State.t ; } - val create : state:string -> client_id:string -> t + val create : state:string -> client_id:int -> t val to_string : t -> string end = struct type t = @@ -238,7 +238,7 @@ end = struct state: State.t ; } let create ~state ~client_id = - { client_id = Id.Client.of_string client_id ; state = State.of_string state } + { client_id = Id.Client.of_int client_id ; state = State.of_string state } let to_string x = Printf.sprintf "get_task %s %d" (State.to_string x.state) @@ -269,14 +269,14 @@ module GetPsi_msg : sig type t = { client_id: Id.Client.t ; } - val create : client_id:string -> t + val create : client_id:int -> t val to_string : t -> string end = struct type t = { client_id: Id.Client.t ; } let create ~client_id = - { client_id = Id.Client.of_string client_id } + { client_id = Id.Client.of_int client_id } let to_string x = Printf.sprintf "get_psi %d" (Id.Client.to_int x.client_id) @@ -365,14 +365,14 @@ module PutPsi_msg : sig n_det_selectors : Strictly_positive_int.t option; psi : Psi.t option } val create : - client_id:string -> - n_state:string -> - n_det:string -> - psi_det_size:string -> + client_id:int -> + n_state:int -> + n_det:int -> + psi_det_size:int -> psi_det:string option -> psi_coef:string option -> - n_det_generators: string option -> - n_det_selectors:string option -> + n_det_generators: int option -> + n_det_selectors:int option -> energy:string option -> t val to_string_list : t -> string list val to_string : t -> string @@ -388,20 +388,17 @@ end = struct let create ~client_id ~n_state ~n_det ~psi_det_size ~psi_det ~psi_coef ~n_det_generators ~n_det_selectors ~energy = let n_state, n_det, psi_det_size = - Int.of_string n_state - |> Strictly_positive_int.of_int , - Int.of_string n_det - |> Strictly_positive_int.of_int , - Int.of_string psi_det_size - |> Strictly_positive_int.of_int + Strictly_positive_int.of_int n_state, + Strictly_positive_int.of_int n_det, + Strictly_positive_int.of_int psi_det_size in assert (Strictly_positive_int.to_int psi_det_size >= Strictly_positive_int.to_int n_det); let n_det_generators, n_det_selectors = match n_det_generators, n_det_selectors with | Some x, Some y -> - Some (Strictly_positive_int.of_int @@ Int.of_string x), - Some (Strictly_positive_int.of_int @@ Int.of_string y) + Some (Strictly_positive_int.of_int x), + Some (Strictly_positive_int.of_int y) | _ -> None, None in let psi = @@ -411,7 +408,7 @@ end = struct ~psi_coef ~n_det_generators ~n_det_selectors ~energy) | _ -> None in - { client_id = Id.Client.of_string client_id ; + { client_id = Id.Client.of_int client_id ; n_state ; n_det ; psi_det_size ; n_det_generators ; n_det_selectors ; psi } @@ -465,7 +462,7 @@ module TaskDone_msg : sig state: State.t ; task_id: Id.Task.t ; } - val create : state:string -> client_id:string -> task_id:string -> t + val create : state:string -> client_id:int -> task_id:int -> t val to_string : t -> string end = struct type t = @@ -474,9 +471,9 @@ end = struct task_id: Id.Task.t; } let create ~state ~client_id ~task_id = - { client_id = Id.Client.of_string client_id ; + { client_id = Id.Client.of_int client_id ; state = State.of_string state ; - task_id = Id.Task.of_string task_id; + task_id = Id.Task.of_int task_id; } let to_string x = @@ -489,22 +486,22 @@ end (** Terminate *) module Terminate_msg : sig type t - val create : unit -> t + val create : t val to_string : t -> string end = struct type t = Terminate - let create () = Terminate + let create = Terminate let to_string x = "terminate" end (** OK *) module Ok_msg : sig type t - val create : unit -> t + val create : t val to_string : t -> string end = struct type t = Ok - let create () = Ok + let create = Ok let to_string x = "ok" end @@ -551,45 +548,45 @@ type t = let of_string s = - let l = - String.split ~on:' ' s - |> List.filter ~f:(fun x -> (String.strip x) <> "") - |> List.map ~f:String.lowercase - in - match l with - | "add_task" :: state :: task -> - AddTask (AddTask_msg.create ~state ~task:(String.concat ~sep:" " task) ) - | "del_task" :: state :: task_id :: [] -> - DelTask (DelTask_msg.create ~state ~task_id) - | "get_task" :: state :: client_id :: [] -> - GetTask (GetTask_msg.create ~state ~client_id) - | "task_done" :: state :: client_id :: task_id :: [] -> - TaskDone (TaskDone_msg.create ~state ~client_id ~task_id) - | "disconnect" :: state :: client_id :: [] -> - Disconnect (Disconnect_msg.create ~state ~client_id) - | "connect" :: t :: [] -> - Connect (Connect_msg.create t) - | "new_job" :: state :: push_address_tcp :: push_address_inproc :: [] -> - Newjob (Newjob_msg.create push_address_tcp push_address_inproc state) - | "end_job" :: state :: [] -> - Endjob (Endjob_msg.create state) - | "terminate" :: [] -> - Terminate (Terminate_msg.create () ) - | "get_psi" :: client_id :: [] -> - GetPsi (GetPsi_msg.create ~client_id) - | "put_psi" :: client_id :: n_state :: n_det :: psi_det_size :: n_det_generators :: n_det_selectors :: [] -> - PutPsi (PutPsi_msg.create ~client_id ~n_state ~n_det ~psi_det_size - ~n_det_generators:(Some n_det_generators) ~n_det_selectors:(Some n_det_selectors) - ~psi_det:None ~psi_coef:None ~energy:None ) - | "put_psi" :: client_id :: n_state :: n_det :: psi_det_size :: [] -> - PutPsi (PutPsi_msg.create ~client_id ~n_state ~n_det ~psi_det_size ~n_det_generators:None - ~n_det_selectors:None ~psi_det:None ~psi_coef:None ~energy:None) - | "ok" :: [] -> Ok (Ok_msg.create ()) - | "error" :: rest -> Error (Error_msg.create (String.concat ~sep:" " rest)) - | "set_stopped" :: [] -> SetStopped - | "set_running" :: [] -> SetRunning - | "set_waiting" :: [] -> SetWaiting - | _ -> failwith "Message not understood" + let open Message_lexer in + match parse s with + | AddTask_ { state ; task } -> + AddTask (AddTask_msg.create ~state ~task) + | DelTask_ { state ; task_id } -> + DelTask (DelTask_msg.create ~state ~task_id) + | GetTask_ { state ; client_id } -> + GetTask (GetTask_msg.create ~state ~client_id) + | TaskDone_ { state ; task_id ; client_id } -> + TaskDone (TaskDone_msg.create ~state ~client_id ~task_id) + | Disconnect_ { state ; client_id } -> + Disconnect (Disconnect_msg.create ~state ~client_id) + | Connect_ socket -> + Connect (Connect_msg.create socket) + | NewJob_ { state ; push_address_tcp ; push_address_inproc } -> + Newjob (Newjob_msg.create push_address_tcp push_address_inproc state) + | EndJob_ state -> + Endjob (Endjob_msg.create state) + | GetPsi_ client_id -> + GetPsi (GetPsi_msg.create ~client_id) + | PutPsi_ { client_id ; n_state ; n_det ; psi_det_size ; n_det_generators ; n_det_selectors } -> + begin + match n_det_selectors, n_det_generators with + | Some s, Some g -> + PutPsi (PutPsi_msg.create ~client_id ~n_state ~n_det ~psi_det_size + ~n_det_generators:(Some g) ~n_det_selectors:(Some s) + ~psi_det:None ~psi_coef:None ~energy:None ) + | _ -> + PutPsi (PutPsi_msg.create ~client_id ~n_state ~n_det ~psi_det_size + ~n_det_generators:None ~n_det_selectors:None + ~psi_det:None ~psi_coef:None ~energy:None ) + end + | Terminate_ -> Terminate (Terminate_msg.create ) + | SetWaiting_ -> SetWaiting + | SetStopped_ -> SetStopped + | SetRunning_ -> SetRunning + | Ok_ -> Ok (Ok_msg.create) + | Error_ m -> Error (Error_msg.create m) + let to_string = function diff --git a/ocaml/Message_lexer.mll b/ocaml/Message_lexer.mll new file mode 100644 index 00000000..45ffc4d4 --- /dev/null +++ b/ocaml/Message_lexer.mll @@ -0,0 +1,245 @@ +{ + +type kw_type = + | TEXT of string + | WORD of string + | INTEGER of int + | FLOAT of float + | NONE + | END_OF_FILE + | ADD_TASK + | DEL_TASK + | GET_TASK + | TASK_DONE + | DISCONNECT + | CONNECT + | NEW_JOB + | END_JOB + | TERMINATE + | GET_PSI + | PUT_PSI + | OK + | ERROR + | SET_STOPPED + | SET_RUNNING + | SET_WAITING + +type state_task = { state : string ; task : string ; } +type state_taskid = { state : string ; task_id : int ; } +type state_clientid = { state : string ; client_id : int ; } +type state_taskid_clientid = { state : string ; task_id : int ; client_id : int ; } +type state_tcp_inproc = { state : string ; push_address_tcp : string ; push_address_inproc : string ; } +type psi = { client_id: int ; n_state: int ; n_det: int ; psi_det_size: int ; + n_det_generators: int option ; n_det_selectors: int option } + +type msg = + | AddTask_ of state_task + | DelTask_ of state_taskid + | GetTask_ of state_clientid + | TaskDone_ of state_taskid_clientid + | Disconnect_ of state_clientid + | Connect_ of string + | NewJob_ of state_tcp_inproc + | EndJob_ of string + | Terminate_ + | GetPsi_ of int + | PutPsi_ of psi + | Ok_ + | Error_ of string + | SetStopped_ + | SetRunning_ + | SetWaiting_ +} + +let word = [^' ' '\t' '\n']+ +let text = [^' ']+[^'\n']+ +let integer = ['0'-'9']+ +let real = '-'? integer '.' integer (['e' 'E'] '-'? integer)? + +let white = [' ' '\t']+ + + +rule get_text = parse + | text as t { TEXT t } + | _ { NONE } + +and kw = parse + | integer as i { INTEGER (int_of_string i) } + | real as r { FLOAT (float_of_string r)} + | "add_task" { ADD_TASK } + | "del_task" { DEL_TASK } + | "get_task" { GET_TASK } + | "task_done" { TASK_DONE } + | "disconnect" { DISCONNECT } + | "connect" { CONNECT } + | "new_job" { NEW_JOB } + | "end_job" { END_JOB } + | "terminate" { TERMINATE } + | "get_psi" { GET_PSI } + | "put_psi" { PUT_PSI } + | "ok" { OK } + | "error" { ERROR } + | "set_stopped" { SET_STOPPED } + | "set_running" { SET_RUNNING } + | "set_waiting" { SET_WAITING } + | word as w { WORD w } + | eof { END_OF_FILE } + | _ { NONE } + + +{ + let rec read_text lexbuf = + let token = + get_text lexbuf + in + match token with + | TEXT t -> t + | NONE -> read_text lexbuf + | _ -> failwith "Error in MessageLexer (2)" + + and read_word lexbuf = + let token = + kw lexbuf + in + match token with + | WORD w -> w + | NONE -> read_word lexbuf + | _ -> failwith "Error in MessageLexer (3)" + + and read_int lexbuf = + let token = + kw lexbuf + in + match token with + | INTEGER i -> i + | NONE -> read_int lexbuf + | _ -> failwith "Error in MessageLexer (4)" + + and parse_rec lexbuf = + let token = + kw lexbuf + in + match token with + | ADD_TASK -> + let state = read_word lexbuf in + let task = read_text lexbuf in + AddTask_ { state ; task } + + | DEL_TASK -> + let state = read_word lexbuf in + let task_id = read_int lexbuf in + DelTask_ { state ; task_id } + + | GET_TASK -> + let state = read_word lexbuf in + let client_id = read_int lexbuf in + GetTask_ { state ; client_id } + + | TASK_DONE -> + let state = read_word lexbuf in + let client_id = read_int lexbuf in + let task_id = read_int lexbuf in + TaskDone_ { state ; task_id ; client_id } + + | DISCONNECT -> + let state = read_word lexbuf in + let client_id = read_int lexbuf in + Disconnect_ { state ; client_id } + + | GET_PSI -> + let client_id = read_int lexbuf in + GetPsi_ client_id + + | PUT_PSI -> + let client_id = read_int lexbuf in + let n_state = read_int lexbuf in + let n_det = read_int lexbuf in + let psi_det_size = read_int lexbuf in + let n_det_generators, n_det_selectors = + try + (Some (read_int lexbuf), Some (read_int lexbuf)) + with (Failure _) -> (None, None) + in + PutPsi_ { client_id ; n_state ; n_det ; psi_det_size ; n_det_generators ; n_det_selectors } + + | CONNECT -> + let socket = read_word lexbuf in + Connect_ socket + + | NEW_JOB -> + let state = read_word lexbuf in + let push_address_tcp = read_word lexbuf in + let push_address_inproc = read_word lexbuf in + NewJob_ { state ; push_address_tcp ; push_address_inproc } + + | END_JOB -> + let state = read_word lexbuf in + EndJob_ state + + | ERROR -> + let message = read_text lexbuf in + Error_ message + + | OK -> Ok_ + | SET_WAITING -> SetWaiting_ + | SET_RUNNING -> SetRunning_ + | SET_STOPPED -> SetStopped_ + | TERMINATE -> Terminate_ + | NONE -> parse_rec lexbuf + | _ -> failwith "Error in MessageLexer" + + let parse message = + let lexbuf = + Lexing.from_string message + in + parse_rec lexbuf + + + let debug () = + let l = [ + "add_task state_pouet Task pouet zob" ; + "del_task state_pouet 12345" ; + "get_task state_pouet 12" ; + "task_done state_pouet 12 12345"; + "connect tcp"; + "disconnect state_pouet 12"; + "new_job state_pouet tcp://test.com:12345 ipc:///dev/shm/x.socket"; + "end_job state_pouet"; + "terminate" ; + "set_running" ; + "set_stopped" ; + "set_waiting" ; + "ok" ; + "error my_error" ; + "get_psi 12" ; + "put_psi 12 2 1000 10000 800 900" ; + "put_psi 12 2 1000 10000" + ] + |> List.map parse + in + List.map (function + | AddTask_ { state ; task } -> Printf.sprintf "ADD_TASK state:\"%s\" task:\"%s\"" state task + | DelTask_ { state ; task_id } -> Printf.sprintf "DEL_TASK state:\"%s\" task_id:%d" state task_id + | GetTask_ { state ; client_id } -> Printf.sprintf "GET_TASK state:\"%s\" task_id:%d" state client_id + | TaskDone_ { state ; task_id ; client_id } -> Printf.sprintf "TASK_DONE state:\"%s\" task_id:%d client_id:%d" state task_id client_id + | Disconnect_ { state ; client_id } -> Printf.sprintf "DISCONNECT state:\"%s\" client_id:%d" state client_id + | Connect_ socket -> Printf.sprintf "CONNECT socket:\"%s\"" socket + | NewJob_ { state ; push_address_tcp ; push_address_inproc } -> Printf.sprintf "NEW_JOB state:\"%s\" tcp:\"%s\" inproc:\"%s\"" state push_address_tcp push_address_inproc + | EndJob_ state -> Printf.sprintf "END_JOB state:\"%s\"" state + | GetPsi_ client_id -> Printf.sprintf "GET_PSI client_id:%d" client_id + | PutPsi_ { client_id ; n_state ; n_det ; psi_det_size ; n_det_generators ; n_det_selectors } -> + begin + match n_det_selectors, n_det_generators with + | Some s, Some g -> Printf.sprintf "PUT_PSI client_id:%d n_state:%d n_det:%d psi_det_size:%d n_det_generators:%d n_det_selectors:%d" client_id n_state n_det psi_det_size g s + | _ -> Printf.sprintf "PUT_PSI client_id:%d n_state:%d n_det:%d psi_det_size:%d" client_id n_state n_det psi_det_size + end + | Terminate_ -> "TERMINATE" + | SetWaiting_ -> "SET_WAITING" + | SetStopped_ -> "SET_STOPPED" + | SetRunning_ -> "SET_RUNNING" + | Ok_ -> "OK" + | Error_ s -> Printf.sprintf "ERROR: \"%s\"" s + ) l + |> List.iter print_endline + +} diff --git a/ocaml/TaskServer.ml b/ocaml/TaskServer.ml index 6edc8122..9d830437 100644 --- a/ocaml/TaskServer.ml +++ b/ocaml/TaskServer.ml @@ -99,7 +99,7 @@ let ip_address = lazy ( let reply_ok rep_socket = - Message.Ok_msg.create () + Message.Ok_msg.create |> Message.Ok_msg.to_string |> ZMQ.Socket.send rep_socket @@ -121,7 +121,7 @@ let stop ~port = ZMQ.Socket.set_linger_period req_socket 1_000_000; ZMQ.Socket.connect req_socket address; - Message.Terminate (Message.Terminate_msg.create ()) + Message.Terminate (Message.Terminate_msg.create) |> Message.to_string |> ZMQ.Socket.send req_socket ;