10
0
mirror of https://github.com/LCPQ/quantum_package synced 2024-12-23 04:43:50 +01:00

task server with pub socket

This commit is contained in:
Anthony Scemama 2016-07-26 18:29:52 +02:00
parent 91e412c783
commit e681b7c37d
6 changed files with 222 additions and 32 deletions

View File

@ -2,6 +2,23 @@ open Core.Std
open Qptypes
type pub_state =
| Waiting
| Running of string
| Stopped
let pub_state_of_string = function
| "Waiting" -> Waiting
| "Stopped" -> Stopped
| s -> Running s
let string_of_pub_state = function
| Waiting -> "Waiting"
| Stopped -> "Stopped"
| Running s -> s
type t =
{
queue : Queuing_system.t ;
@ -120,7 +137,7 @@ let stop ~port =
ZMQ.Socket.close req_socket
let new_job msg program_state rep_socket =
let new_job msg program_state rep_socket pair_socket =
let state =
msg.Message.Newjob_msg.state
@ -143,10 +160,12 @@ let new_job msg program_state rep_socket =
}
in
reply_ok rep_socket;
string_of_pub_state (Running (Message.State.to_string state))
|> ZMQ.Socket.send pair_socket ;
result
let end_job msg program_state rep_socket =
let end_job msg program_state rep_socket pair_socket =
let failure () =
reply_wrong_state rep_socket;
@ -165,7 +184,11 @@ let end_job msg program_state rep_socket =
| Some state ->
begin
if (msg.Message.Endjob_msg.state = state) then
success state
begin
string_of_pub_state Waiting
|> ZMQ.Socket.send pair_socket ;
success state
end
else
failure ()
end
@ -355,7 +378,7 @@ let add_task msg program_state rep_socket =
let get_task msg program_state rep_socket =
let get_task msg program_state rep_socket pair_socket =
let state, client_id =
msg.Message.GetTask_msg.state,
@ -371,6 +394,12 @@ let get_task msg program_state rep_socket =
let new_queue, task_id, task =
Queuing_system.pop_task ~client_id program_state.queue
in
if (Queuing_system.number_of_queued new_queue = 0) then
string_of_pub_state Waiting
|> ZMQ.Socket.send pair_socket
else
string_of_pub_state (Running (Message.State.to_string state))
|> ZMQ.Socket.send pair_socket;
let new_program_state =
{ program_state with
@ -512,18 +541,76 @@ let error msg program_state rep_socket =
|> ZMQ.Socket.send rep_socket ;
program_state
let start_pub_thread ~port =
Thread.create (fun () ->
let timeout =
1000
in
let pair_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.pair
and address =
"inproc://pair"
in
ZMQ.Socket.connect pair_socket address;
let pub_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.pub
and address =
Printf.sprintf "tcp://*:%d" port
in
bind_socket ~socket_type:"PUB" ~socket:pub_socket ~address;
let pollitem =
ZMQ.Poll.mask_of
[| (pair_socket, ZMQ.Poll.In) |]
in
let rec run state =
let new_state =
let polling =
ZMQ.Poll.poll ~timeout pollitem
in
if (polling.(0) = Some ZMQ.Poll.In) then
ZMQ.Socket.recv ~block:false pair_socket
|> pub_state_of_string
else
state
in
ZMQ.Socket.send pub_socket @@ string_of_pub_state new_state;
match state with
| Stopped -> ()
| _ -> run new_state
in
run Waiting;
ZMQ.Socket.set_linger_period pair_socket 1000 ;
ZMQ.Socket.close pair_socket;
ZMQ.Socket.set_linger_period pub_socket 1000 ;
ZMQ.Socket.close pub_socket;
)
let run ~port =
(** Bind inproc socket for changing state of pub *)
let pair_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.pair
and address =
"inproc://pair"
in
bind_socket "PAIR" pair_socket address;
let pub_thread =
start_pub_thread ~port:(port+1) ()
in
(** Bind REP socket *)
let rep_socket =
ZMQ.Socket.create zmq_context ZMQ.Socket.rep
and address =
Printf.sprintf "tcp://%s:%d" (Lazy.force ip_address) port
Printf.sprintf "tcp://*:%d" port
in
bind_socket "REP" rep_socket address;
ZMQ.Socket.set_linger_period rep_socket 1_000_000;
bind_socket "REP" rep_socket address;
let initial_program_state =
{ queue = Queuing_system.create () ;
@ -542,6 +629,9 @@ let run ~port =
[| (rep_socket, ZMQ.Poll.In) |]
in
let address =
Printf.sprintf "tcp://%s:%d" (Lazy.force ip_address) port
in
Printf.printf "Task server running : %s\n%!" address;
@ -591,15 +681,15 @@ let run ~port =
| _ , Message.Terminate _ -> terminate program_state rep_socket
| _ , Message.PutPsi x -> put_psi x rest program_state rep_socket
| _ , Message.GetPsi x -> get_psi x program_state rep_socket
| None , Message.Newjob x -> new_job x program_state rep_socket
| None , Message.Newjob x -> new_job x program_state rep_socket pair_socket
| _ , Message.Newjob _ -> error "A job is already running" program_state rep_socket
| Some _, Message.Endjob x -> end_job x program_state rep_socket
| Some _, Message.Endjob x -> end_job x program_state rep_socket pair_socket
| None , _ -> error "No job is running" program_state rep_socket
| Some _, Message.Connect x -> connect x program_state rep_socket
| Some _, Message.Disconnect x -> disconnect x program_state rep_socket
| Some _, Message.AddTask x -> add_task x program_state rep_socket
| Some _, Message.DelTask x -> del_task x program_state rep_socket
| Some _, Message.GetTask x -> get_task x program_state rep_socket
| Some _, Message.GetTask x -> get_task x program_state rep_socket pair_socket
| Some _, Message.TaskDone x -> task_done x program_state rep_socket
| _ , _ ->
error ("Invalid message : "^(Message.to_string message)) program_state rep_socket
@ -614,6 +704,10 @@ let run ~port =
end
in main_loop initial_program_state true;
ZMQ.Socket.send pair_socket @@ string_of_pub_state Stopped;
Thread.join pub_thread;

View File

@ -43,10 +43,10 @@ val stop : port:int -> unit
(** {1} Server functions *)
(** Create a new job *)
val new_job : Message.Newjob_msg.t -> t -> [> `Req ] ZMQ.Socket.t -> t
val new_job : Message.Newjob_msg.t -> t -> [> `Req ] ZMQ.Socket.t -> [> `Pair] ZMQ.Socket.t -> t
(** Finish a running job *)
val end_job : Message.Endjob_msg.t -> t -> [> `Req ] ZMQ.Socket.t -> t
val end_job : Message.Endjob_msg.t -> t -> [> `Req ] ZMQ.Socket.t -> [> `Pair] ZMQ.Socket.t -> t
(** Connect a client *)
val connect: Message.Connect_msg.t -> t -> [> `Req ] ZMQ.Socket.t -> t
@ -64,7 +64,7 @@ val task_done: Message.TaskDone_msg.t -> t -> [> `Req ] ZMQ.Socket.t -> t
val del_task: Message.DelTask_msg.t -> t -> [> `Req ] ZMQ.Socket.t -> t
(** The client get a new task to execute *)
val get_task: Message.GetTask_msg.t -> t -> [> `Req ] ZMQ.Socket.t -> t
val get_task: Message.GetTask_msg.t -> t -> [> `Req ] ZMQ.Socket.t -> [> `Pair] ZMQ.Socket.t -> t
(** Terminate server *)
val terminate : t -> [> `Req ] ZMQ.Socket.t -> t

View File

@ -28,6 +28,8 @@ subroutine run_wf
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
! TODO : do loop here
! TODO : wait_state
call zmq_get_psi(zmq_to_qp_run_socket, 1)
call write_double(6,ci_energy,'Energy')
zmq_state = 'h_apply_fci_pt2'

View File

@ -93,6 +93,8 @@ subroutine ao_bielec_integrals_in_map_slave(thread,iproc)
integer(ZMQ_PTR), external :: new_zmq_push_socket
integer(ZMQ_PTR) :: zmq_socket_push
character*(64) :: state
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
zmq_socket_push = new_zmq_push_socket(thread)
@ -109,18 +111,15 @@ subroutine ao_bielec_integrals_in_map_slave(thread,iproc)
call push_integrals(zmq_socket_push, n_integrals, buffer_i, buffer_value, task_id)
enddo
deallocate( buffer_i, buffer_value )
call disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id)
deallocate( buffer_i, buffer_value )
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
call end_zmq_push_socket(zmq_socket_push,thread)
end
subroutine ao_bielec_integrals_in_map_collector
use map_module
use f77_zmq

View File

@ -17,10 +17,15 @@ program qp_ao_ints
double precision :: integral, ao_bielec_integral
integral = ao_bielec_integral(1,1,1,1)
!$OMP PARALLEL DEFAULT(PRIVATE) PRIVATE(i)
i = omp_get_thread_num()
call ao_bielec_integrals_in_map_slave_tcp(i)
!$OMP END PARALLEL
character*(64) :: state
call wait_for_state(zmq_state,state)
do while (state /= 'Stopped')
!$OMP PARALLEL DEFAULT(PRIVATE) PRIVATE(i)
i = omp_get_thread_num()
call ao_bielec_integrals_in_map_slave_tcp(i)
!$OMP END PARALLEL
call wait_for_state(zmq_state,state)
enddo
print *, 'Done'
end

View File

@ -46,31 +46,39 @@ END_PROVIDER
&BEGIN_PROVIDER [ character*(128), zmq_socket_push_tcp_address ]
&BEGIN_PROVIDER [ character*(128), zmq_socket_pull_inproc_address ]
&BEGIN_PROVIDER [ character*(128), zmq_socket_push_inproc_address ]
&BEGIN_PROVIDER [ character*(128), zmq_socket_sub_tcp_address ]
use f77_zmq
implicit none
BEGIN_DOC
! Socket which pulls the results (2)
END_DOC
character*(8), external :: zmq_port
zmq_socket_pull_tcp_address = 'tcp://*:'//zmq_port(1)//' '
zmq_socket_push_tcp_address = trim(qp_run_address)//':'//zmq_port(1)//' '
zmq_socket_pull_inproc_address = 'inproc://'//zmq_port(1)//' '
zmq_socket_sub_tcp_address = trim(qp_run_address)//':'//zmq_port(1)//' '
zmq_socket_pull_tcp_address = 'tcp://*:'//zmq_port(2)//' '
zmq_socket_push_tcp_address = trim(qp_run_address)//':'//zmq_port(2)//' '
zmq_socket_pull_inproc_address = 'inproc://'//zmq_port(2)//' '
zmq_socket_push_inproc_address = zmq_socket_pull_inproc_address
zmq_socket_pair_inproc_address = 'inproc://'//zmq_port(2)//' '
zmq_socket_pair_inproc_address = 'inproc://'//zmq_port(3)//' '
! /!\ Don't forget to change subroutine reset_zmq_addresses
END_PROVIDER
subroutine reset_zmq_addresses
use f77_zmq
implicit none
BEGIN_DOC
! Socket which pulls the results (2)
END_DOC
character*(8), external :: zmq_port
zmq_socket_pull_tcp_address = 'tcp://*:'//zmq_port(1)//' '
zmq_socket_push_tcp_address = trim(qp_run_address)//':'//zmq_port(1)//' '
zmq_socket_pull_inproc_address = 'inproc://'//zmq_port(1)//' '
zmq_socket_sub_tcp_address = trim(qp_run_address)//':'//zmq_port(1)//' '
zmq_socket_pull_tcp_address = 'tcp://*:'//zmq_port(2)//' '
zmq_socket_push_tcp_address = trim(qp_run_address)//':'//zmq_port(2)//' '
zmq_socket_pull_inproc_address = 'inproc://'//zmq_port(2)//' '
zmq_socket_push_inproc_address = zmq_socket_pull_inproc_address
zmq_socket_pair_inproc_address = 'inproc://'//zmq_port(2)//' '
end
zmq_socket_pair_inproc_address = 'inproc://'//zmq_port(3)//' '
end
subroutine switch_qp_run_to_master
@ -87,6 +95,7 @@ subroutine switch_qp_run_to_master
stop -1
endif
qp_run_address = trim(buffer)
print *, 'Switched to qp_run master : ', trim(qp_run_address)
integer :: i
do i=len(buffer),1,-1
@ -96,7 +105,6 @@ subroutine switch_qp_run_to_master
exit
endif
enddo
call reset_zmq_addresses
end
@ -314,6 +322,60 @@ end
function new_zmq_sub_socket()
use f77_zmq
implicit none
BEGIN_DOC
! Socket to read the state published by the Task server
END_DOC
integer :: rc
integer(ZMQ_PTR) :: new_zmq_sub_socket
call omp_set_lock(zmq_lock)
if (zmq_context == 0_ZMQ_PTR) then
stop 'zmq_context is uninitialized'
endif
new_zmq_sub_socket = f77_zmq_socket(zmq_context, ZMQ_SUB)
call omp_unset_lock(zmq_lock)
if (new_zmq_sub_socket == 0_ZMQ_PTR) then
stop 'Unable to create zmq sub socket'
endif
rc = f77_zmq_setsockopt(new_zmq_sub_socket,ZMQ_RCVTIMEO,10000,4)
if (rc /= 0) then
stop 'Unable to set timeout in new_zmq_sub_socket'
endif
rc = f77_zmq_setsockopt(new_zmq_sub_socket,ZMQ_SUBSCRIBE,"",0)
if (rc /= 0) then
stop 'Unable to subscribe new_zmq_sub_socket'
endif
rc = f77_zmq_connect(new_zmq_sub_socket, zmq_socket_sub_tcp_address)
if (rc /= 0) then
stop 'Unable to connect new_zmq_sub_socket'
endif
end
subroutine end_zmq_sub_socket(zmq_socket_sub)
use f77_zmq
implicit none
BEGIN_DOC
! Terminate socket on which the results are sent.
END_DOC
integer(ZMQ_PTR), intent(in) :: zmq_socket_sub
integer :: rc
rc = f77_zmq_close(zmq_socket_sub)
if (rc /= 0) then
print *, 'f77_zmq_close(zmq_socket_sub)'
stop 'error'
endif
end
subroutine end_zmq_pair_socket(zmq_socket_pair)
use f77_zmq
implicit none
@ -766,3 +828,31 @@ subroutine zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more)
endif
end
subroutine wait_for_state(state_wait,state)
use f77_zmq
implicit none
BEGIN_DOC
! Wait for the ZMQ state to be ready
END_DOC
character*(64), intent(in) :: state_wait
character*(64), intent(out) :: state
integer(ZMQ_PTR) :: zmq_socket_sub
integer(ZMQ_PTR), external :: new_zmq_sub_socket
integer :: rc
zmq_socket_sub = new_zmq_sub_socket()
state = "Waiting"
do while (state /= state_wait .and. state /= "Stopped")
rc = f77_zmq_recv( zmq_socket_sub, state, 64, 0)
if (rc > 0) then
state = trim(state(1:rc))
else
print *, 'Timeout reached. Stopping'
state = "Stopped"
endif
end do
call end_zmq_sub_socket(zmq_socket_sub)
end