10
0
mirror of https://github.com/LCPQ/quantum_package synced 2024-07-22 18:57:31 +02:00

Introduced Abort Keyword

This commit is contained in:
Anthony Scemama 2017-05-10 00:04:34 +02:00
parent ff20894479
commit 500bf757e3
5 changed files with 86 additions and 10 deletions

View File

@ -610,6 +610,17 @@ end = struct
let to_string x = "terminate"
end
(** Abort *)
module Abort_msg : sig
type t
val create : t
val to_string : t -> string
end = struct
type t = Abort
let create = Abort
let to_string x = "abort"
end
(** OK *)
module Ok_msg : sig
type t
@ -660,6 +671,7 @@ type t =
| AddTaskReply of AddTaskReply_msg.t
| TaskDone of TaskDone_msg.t
| Terminate of Terminate_msg.t
| Abort of Abort_msg.t
| Ok of Ok_msg.t
| Error of Error_msg.t
| SetStopped
@ -705,6 +717,7 @@ let of_string s =
| PutVector_ { client_id ; size } ->
PutVector (PutVector_msg.create ~client_id ~size ~data:None )
| Terminate_ -> Terminate (Terminate_msg.create )
| Abort_ -> Abort (Abort_msg.create )
| SetWaiting_ -> SetWaiting
| SetStopped_ -> SetStopped
| SetRunning_ -> SetRunning
@ -732,6 +745,7 @@ let to_string = function
| AddTaskReply x -> AddTaskReply_msg.to_string x
| TaskDone x -> TaskDone_msg.to_string x
| Terminate x -> Terminate_msg.to_string x
| Abort x -> Abort_msg.to_string x
| Ok x -> Ok_msg.to_string x
| Error x -> Error_msg.to_string x
| PutPsi x -> PutPsi_msg.to_string x

View File

@ -15,6 +15,7 @@ type kw_type =
| NEW_JOB
| END_JOB
| TERMINATE
| ABORT
| GET_PSI
| PUT_PSI
| GET_VECTOR
@ -44,6 +45,7 @@ type msg =
| NewJob_ of state_tcp_inproc
| EndJob_ of string
| Terminate_
| Abort_
| GetPsi_ of int
| PutPsi_ of psi
| GetVector_ of int
@ -88,6 +90,7 @@ and kw = parse
| "new_job" { NEW_JOB }
| "end_job" { END_JOB }
| "terminate" { TERMINATE }
| "abort" { ABORT }
| "get_psi" { GET_PSI }
| "put_psi" { PUT_PSI }
| "get_vector" { GET_PSI }
@ -218,6 +221,7 @@ and kw = parse
| SET_RUNNING -> SetRunning_
| SET_STOPPED -> SetStopped_
| TERMINATE -> Terminate_
| ABORT -> Abort_
| NONE -> parse_rec lexbuf
| _ -> failwith "Error in MessageLexer"
@ -242,6 +246,7 @@ and kw = parse
"new_job state_pouet tcp://test.com:12345 ipc:///dev/shm/x.socket";
"end_job state_pouet";
"terminate" ;
"abort" ;
"set_running" ;
"set_stopped" ;
"set_waiting" ;
@ -273,6 +278,7 @@ and kw = parse
| PutVector_ { client_id ; size } ->
Printf.sprintf "PUT_VECTOR client_id:%d size:%d" client_id size
| Terminate_ -> "TERMINATE"
| Abort_ -> "ABORT"
| SetWaiting_ -> "SET_WAITING"
| SetStopped_ -> "SET_STOPPED"
| SetRunning_ -> "SET_RUNNING"

View File

@ -567,6 +567,43 @@ let terminate program_state rep_socket =
}
let abort program_state rep_socket =
let queue, client_id =
Queuing_system.add_client program_state.queue
in
let rec aux accu queue = function
| 0 -> (queue, accu)
| rest ->
let new_queue, task_id, _ =
Queuing_system.pop_task ~client_id queue
in
let new_accu =
match task_id with
| Some task_id -> task_id::accu
| None -> accu
in
Queuing_system.number_of_queued new_queue
|> aux new_accu new_queue
in
let queue, tasks =
aux [] queue 1
in
let queue =
List.fold ~f:(fun queue task_id ->
Queuing_system.end_task ~task_id ~client_id queue)
~init:queue tasks
in
let queue =
List.fold ~f:(fun queue task_id -> Queuing_system.del_task ~task_id queue)
~init:queue tasks
in
reply_ok rep_socket;
{ program_state with
queue
}
let error msg program_state rep_socket =
Message.Error (Message.Error_msg.create msg)
|> Message.to_string
@ -714,6 +751,7 @@ let run ~port =
try
match program_state.state, message with
| _ , Message.Terminate _ -> terminate program_state rep_socket
| _ , Message.Abort _ -> abort program_state rep_socket
| _ , Message.PutVector x -> put_vector x rest program_state rep_socket
| _ , Message.GetVector x -> get_vector x program_state rep_socket
| _ , Message.PutPsi x -> put_psi x rest program_state rep_socket

View File

@ -266,16 +266,7 @@ subroutine pt2_collector(E, b, tbc, comb, Ncomb, computed, pt2_detail, sumabove,
! Termination
pt2(1) = avg
print '(G10.3, 2X, F16.10, 2X, G16.3, 2X, F16.4, A20)', Nabove(tooth), avg+E, eqt, time-time0, ''
integer :: worker_id
call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,0)
if(worker_id /= -1) then
do
call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id(1), task)
if (task_id(1) == 0) exit
call task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id(1))
call zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id(1),more)
enddo
end if
call zmq_abort(zmq_to_qp_run_socket)
else
if (Nabove(tooth) > Nabove_old) then
print '(G10.3, 2X, F16.10, 2X, G16.3, 2X, F16.4, A20)', Nabove(tooth), avg+E, eqt, time-time0, ''

View File

@ -771,6 +771,33 @@ subroutine add_task_to_taskserver_recv(zmq_to_qp_run_socket)
end
subroutine zmq_abort(zmq_to_qp_run_socket)
use f77_zmq
implicit none
BEGIN_DOC
! Aborts a running parallel computation
END_DOC
integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket
integer :: rc, sze
character*(512) :: message
write(message,*) 'abort '
sze = len(trim(message))
rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)
if (rc /= sze) then
print *, irp_here, 'f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)'
stop 'error'
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0)
if (trim(message(1:rc)) /= 'ok') then
print *, trim(message(1:rc))
print *, 'Unable to send abort message'
stop -1
endif
end
subroutine task_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id)
use f77_zmq
implicit none