From 500bf757e33be655854d9a70fb5f19a6d3f608b4 Mon Sep 17 00:00:00 2001 From: Anthony Scemama Date: Wed, 10 May 2017 00:04:34 +0200 Subject: [PATCH] Introduced Abort Keyword --- ocaml/Message.ml | 14 ++++++++ ocaml/Message_lexer.mll | 6 ++++ ocaml/TaskServer.ml | 38 ++++++++++++++++++++ plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f | 11 +----- src/ZMQ/utils.irp.f | 27 ++++++++++++++ 5 files changed, 86 insertions(+), 10 deletions(-) diff --git a/ocaml/Message.ml b/ocaml/Message.ml index 7a1d1712..72fb41b5 100644 --- a/ocaml/Message.ml +++ b/ocaml/Message.ml @@ -610,6 +610,17 @@ end = struct let to_string x = "terminate" end +(** Abort *) +module Abort_msg : sig + type t + val create : t + val to_string : t -> string +end = struct + type t = Abort + let create = Abort + let to_string x = "abort" +end + (** OK *) module Ok_msg : sig type t @@ -660,6 +671,7 @@ type t = | AddTaskReply of AddTaskReply_msg.t | TaskDone of TaskDone_msg.t | Terminate of Terminate_msg.t +| Abort of Abort_msg.t | Ok of Ok_msg.t | Error of Error_msg.t | SetStopped @@ -705,6 +717,7 @@ let of_string s = | PutVector_ { client_id ; size } -> PutVector (PutVector_msg.create ~client_id ~size ~data:None ) | Terminate_ -> Terminate (Terminate_msg.create ) + | Abort_ -> Abort (Abort_msg.create ) | SetWaiting_ -> SetWaiting | SetStopped_ -> SetStopped | SetRunning_ -> SetRunning @@ -732,6 +745,7 @@ let to_string = function | AddTaskReply x -> AddTaskReply_msg.to_string x | TaskDone x -> TaskDone_msg.to_string x | Terminate x -> Terminate_msg.to_string x +| Abort x -> Abort_msg.to_string x | Ok x -> Ok_msg.to_string x | Error x -> Error_msg.to_string x | PutPsi x -> PutPsi_msg.to_string x diff --git a/ocaml/Message_lexer.mll b/ocaml/Message_lexer.mll index b85baecf..f01a3eec 100644 --- a/ocaml/Message_lexer.mll +++ b/ocaml/Message_lexer.mll @@ -15,6 +15,7 @@ type kw_type = | NEW_JOB | END_JOB | TERMINATE + | ABORT | GET_PSI | PUT_PSI | GET_VECTOR @@ -44,6 +45,7 @@ type msg = | NewJob_ of state_tcp_inproc | EndJob_ of string | Terminate_ + | Abort_ | GetPsi_ of int | PutPsi_ of psi | GetVector_ of int @@ -88,6 +90,7 @@ and kw = parse | "new_job" { NEW_JOB } | "end_job" { END_JOB } | "terminate" { TERMINATE } + | "abort" { ABORT } | "get_psi" { GET_PSI } | "put_psi" { PUT_PSI } | "get_vector" { GET_PSI } @@ -218,6 +221,7 @@ and kw = parse | SET_RUNNING -> SetRunning_ | SET_STOPPED -> SetStopped_ | TERMINATE -> Terminate_ + | ABORT -> Abort_ | NONE -> parse_rec lexbuf | _ -> failwith "Error in MessageLexer" @@ -242,6 +246,7 @@ and kw = parse "new_job state_pouet tcp://test.com:12345 ipc:///dev/shm/x.socket"; "end_job state_pouet"; "terminate" ; + "abort" ; "set_running" ; "set_stopped" ; "set_waiting" ; @@ -273,6 +278,7 @@ and kw = parse | PutVector_ { client_id ; size } -> Printf.sprintf "PUT_VECTOR client_id:%d size:%d" client_id size | Terminate_ -> "TERMINATE" + | Abort_ -> "ABORT" | SetWaiting_ -> "SET_WAITING" | SetStopped_ -> "SET_STOPPED" | SetRunning_ -> "SET_RUNNING" diff --git a/ocaml/TaskServer.ml b/ocaml/TaskServer.ml index 8c7e4c8a..1b2acdee 100644 --- a/ocaml/TaskServer.ml +++ b/ocaml/TaskServer.ml @@ -567,6 +567,43 @@ let terminate program_state rep_socket = } +let abort program_state rep_socket = + let queue, client_id = + Queuing_system.add_client program_state.queue + in + let rec aux accu queue = function + | 0 -> (queue, accu) + | rest -> + let new_queue, task_id, _ = + Queuing_system.pop_task ~client_id queue + in + let new_accu = + match task_id with + | Some task_id -> task_id::accu + | None -> accu + in + Queuing_system.number_of_queued new_queue + |> aux new_accu new_queue + in + let queue, tasks = + aux [] queue 1 + in + let queue = + List.fold ~f:(fun queue task_id -> + Queuing_system.end_task ~task_id ~client_id queue) + ~init:queue tasks + in + let queue = + List.fold ~f:(fun queue task_id -> Queuing_system.del_task ~task_id queue) + ~init:queue tasks + in + reply_ok rep_socket; + + { program_state with + queue + } + + let error msg program_state rep_socket = Message.Error (Message.Error_msg.create msg) |> Message.to_string @@ -714,6 +751,7 @@ let run ~port = try match program_state.state, message with | _ , Message.Terminate _ -> terminate program_state rep_socket + | _ , Message.Abort _ -> abort program_state rep_socket | _ , Message.PutVector x -> put_vector x rest program_state rep_socket | _ , Message.GetVector x -> get_vector x program_state rep_socket | _ , Message.PutPsi x -> put_psi x rest program_state rep_socket diff --git a/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f b/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f index 34dde986..0d138493 100644 --- a/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f +++ b/plugins/Full_CI_ZMQ/pt2_stoch_routines.irp.f @@ -266,16 +266,7 @@ subroutine pt2_collector(E, b, tbc, comb, Ncomb, computed, pt2_detail, sumabove, ! Termination pt2(1) = avg print '(G10.3, 2X, F16.10, 2X, G16.3, 2X, F16.4, A20)', Nabove(tooth), avg+E, eqt, time-time0, '' - integer :: worker_id - call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,0) - if(worker_id /= -1) then - do - call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id(1), task) - if (task_id(1) == 0) exit - call task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id(1)) - call zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id(1),more) - enddo - end if + call zmq_abort(zmq_to_qp_run_socket) else if (Nabove(tooth) > Nabove_old) then print '(G10.3, 2X, F16.10, 2X, G16.3, 2X, F16.4, A20)', Nabove(tooth), avg+E, eqt, time-time0, '' diff --git a/src/ZMQ/utils.irp.f b/src/ZMQ/utils.irp.f index fbe09381..91c46caa 100644 --- a/src/ZMQ/utils.irp.f +++ b/src/ZMQ/utils.irp.f @@ -771,6 +771,33 @@ subroutine add_task_to_taskserver_recv(zmq_to_qp_run_socket) end +subroutine zmq_abort(zmq_to_qp_run_socket) + use f77_zmq + implicit none + BEGIN_DOC + ! Aborts a running parallel computation + END_DOC + integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket + integer :: rc, sze + character*(512) :: message + write(message,*) 'abort ' + + sze = len(trim(message)) + rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0) + if (rc /= sze) then + print *, irp_here, 'f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)' + stop 'error' + endif + + rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0) + if (trim(message(1:rc)) /= 'ok') then + print *, trim(message(1:rc)) + print *, 'Unable to send abort message' + stop -1 + endif + +end + subroutine task_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id) use f77_zmq implicit none