diff --git a/ocaml/TaskServer.ml b/ocaml/TaskServer.ml index 67d5bb07..d6d6bc51 100644 --- a/ocaml/TaskServer.ml +++ b/ocaml/TaskServer.ml @@ -2,6 +2,23 @@ open Core.Std open Qptypes +type pub_state = +| Waiting +| Running of string +| Stopped + +let pub_state_of_string = function +| "Waiting" -> Waiting +| "Stopped" -> Stopped +| s -> Running s + +let string_of_pub_state = function +| Waiting -> "Waiting" +| Stopped -> "Stopped" +| Running s -> s + + + type t = { queue : Queuing_system.t ; @@ -120,7 +137,7 @@ let stop ~port = ZMQ.Socket.close req_socket -let new_job msg program_state rep_socket = +let new_job msg program_state rep_socket pair_socket = let state = msg.Message.Newjob_msg.state @@ -143,10 +160,12 @@ let new_job msg program_state rep_socket = } in reply_ok rep_socket; + string_of_pub_state (Running (Message.State.to_string state)) + |> ZMQ.Socket.send pair_socket ; result -let end_job msg program_state rep_socket = +let end_job msg program_state rep_socket pair_socket = let failure () = reply_wrong_state rep_socket; @@ -165,7 +184,11 @@ let end_job msg program_state rep_socket = | Some state -> begin if (msg.Message.Endjob_msg.state = state) then - success state + begin + string_of_pub_state Waiting + |> ZMQ.Socket.send pair_socket ; + success state + end else failure () end @@ -355,7 +378,7 @@ let add_task msg program_state rep_socket = -let get_task msg program_state rep_socket = +let get_task msg program_state rep_socket pair_socket = let state, client_id = msg.Message.GetTask_msg.state, @@ -371,6 +394,12 @@ let get_task msg program_state rep_socket = let new_queue, task_id, task = Queuing_system.pop_task ~client_id program_state.queue in + if (Queuing_system.number_of_queued new_queue = 0) then + string_of_pub_state Waiting + |> ZMQ.Socket.send pair_socket + else + string_of_pub_state (Running (Message.State.to_string state)) + |> ZMQ.Socket.send pair_socket; let new_program_state = { program_state with @@ -512,18 +541,76 @@ let error msg program_state rep_socket = |> ZMQ.Socket.send rep_socket ; program_state +let start_pub_thread ~port = + Thread.create (fun () -> + let timeout = + 1000 + in + let pair_socket = + ZMQ.Socket.create zmq_context ZMQ.Socket.pair + and address = + "inproc://pair" + in + ZMQ.Socket.connect pair_socket address; + + let pub_socket = + ZMQ.Socket.create zmq_context ZMQ.Socket.pub + and address = + Printf.sprintf "tcp://*:%d" port + in + bind_socket ~socket_type:"PUB" ~socket:pub_socket ~address; + + let pollitem = + ZMQ.Poll.mask_of + [| (pair_socket, ZMQ.Poll.In) |] + in + + let rec run state = + let new_state = + let polling = + ZMQ.Poll.poll ~timeout pollitem + in + if (polling.(0) = Some ZMQ.Poll.In) then + ZMQ.Socket.recv ~block:false pair_socket + |> pub_state_of_string + else + state + in + ZMQ.Socket.send pub_socket @@ string_of_pub_state new_state; + match state with + | Stopped -> () + | _ -> run new_state + in + run Waiting; + ZMQ.Socket.set_linger_period pair_socket 1000 ; + ZMQ.Socket.close pair_socket; + ZMQ.Socket.set_linger_period pub_socket 1000 ; + ZMQ.Socket.close pub_socket; + ) let run ~port = + (** Bind inproc socket for changing state of pub *) + let pair_socket = + ZMQ.Socket.create zmq_context ZMQ.Socket.pair + and address = + "inproc://pair" + in + bind_socket "PAIR" pair_socket address; + + let pub_thread = + start_pub_thread ~port:(port+1) () + in + (** Bind REP socket *) let rep_socket = ZMQ.Socket.create zmq_context ZMQ.Socket.rep and address = - Printf.sprintf "tcp://%s:%d" (Lazy.force ip_address) port + Printf.sprintf "tcp://*:%d" port in - bind_socket "REP" rep_socket address; ZMQ.Socket.set_linger_period rep_socket 1_000_000; + bind_socket "REP" rep_socket address; let initial_program_state = { queue = Queuing_system.create () ; @@ -542,6 +629,9 @@ let run ~port = [| (rep_socket, ZMQ.Poll.In) |] in + let address = + Printf.sprintf "tcp://%s:%d" (Lazy.force ip_address) port + in Printf.printf "Task server running : %s\n%!" address; @@ -591,15 +681,15 @@ let run ~port = | _ , Message.Terminate _ -> terminate program_state rep_socket | _ , Message.PutPsi x -> put_psi x rest program_state rep_socket | _ , Message.GetPsi x -> get_psi x program_state rep_socket - | None , Message.Newjob x -> new_job x program_state rep_socket + | None , Message.Newjob x -> new_job x program_state rep_socket pair_socket | _ , Message.Newjob _ -> error "A job is already running" program_state rep_socket - | Some _, Message.Endjob x -> end_job x program_state rep_socket + | Some _, Message.Endjob x -> end_job x program_state rep_socket pair_socket | None , _ -> error "No job is running" program_state rep_socket | Some _, Message.Connect x -> connect x program_state rep_socket | Some _, Message.Disconnect x -> disconnect x program_state rep_socket | Some _, Message.AddTask x -> add_task x program_state rep_socket | Some _, Message.DelTask x -> del_task x program_state rep_socket - | Some _, Message.GetTask x -> get_task x program_state rep_socket + | Some _, Message.GetTask x -> get_task x program_state rep_socket pair_socket | Some _, Message.TaskDone x -> task_done x program_state rep_socket | _ , _ -> error ("Invalid message : "^(Message.to_string message)) program_state rep_socket @@ -614,6 +704,10 @@ let run ~port = end in main_loop initial_program_state true; + ZMQ.Socket.send pair_socket @@ string_of_pub_state Stopped; + Thread.join pub_thread; + + diff --git a/ocaml/TaskServer.mli b/ocaml/TaskServer.mli index f16ddaab..f923a18a 100644 --- a/ocaml/TaskServer.mli +++ b/ocaml/TaskServer.mli @@ -43,10 +43,10 @@ val stop : port:int -> unit (** {1} Server functions *) (** Create a new job *) -val new_job : Message.Newjob_msg.t -> t -> [> `Req ] ZMQ.Socket.t -> t +val new_job : Message.Newjob_msg.t -> t -> [> `Req ] ZMQ.Socket.t -> [> `Pair] ZMQ.Socket.t -> t (** Finish a running job *) -val end_job : Message.Endjob_msg.t -> t -> [> `Req ] ZMQ.Socket.t -> t +val end_job : Message.Endjob_msg.t -> t -> [> `Req ] ZMQ.Socket.t -> [> `Pair] ZMQ.Socket.t -> t (** Connect a client *) val connect: Message.Connect_msg.t -> t -> [> `Req ] ZMQ.Socket.t -> t @@ -64,7 +64,7 @@ val task_done: Message.TaskDone_msg.t -> t -> [> `Req ] ZMQ.Socket.t -> t val del_task: Message.DelTask_msg.t -> t -> [> `Req ] ZMQ.Socket.t -> t (** The client get a new task to execute *) -val get_task: Message.GetTask_msg.t -> t -> [> `Req ] ZMQ.Socket.t -> t +val get_task: Message.GetTask_msg.t -> t -> [> `Req ] ZMQ.Socket.t -> [> `Pair] ZMQ.Socket.t -> t (** Terminate server *) val terminate : t -> [> `Req ] ZMQ.Socket.t -> t diff --git a/plugins/Full_CI/micro_pt2.irp.f b/plugins/Full_CI/micro_pt2.irp.f index 14cc52bf..9ce45eb5 100644 --- a/plugins/Full_CI/micro_pt2.irp.f +++ b/plugins/Full_CI/micro_pt2.irp.f @@ -28,6 +28,8 @@ subroutine run_wf zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() + ! TODO : do loop here + ! TODO : wait_state call zmq_get_psi(zmq_to_qp_run_socket, 1) call write_double(6,ci_energy,'Energy') zmq_state = 'h_apply_fci_pt2' diff --git a/src/Integrals_Bielec/ao_bielec_integrals_in_map_slave.irp.f b/src/Integrals_Bielec/ao_bielec_integrals_in_map_slave.irp.f index aa1d2420..ce4518cf 100644 --- a/src/Integrals_Bielec/ao_bielec_integrals_in_map_slave.irp.f +++ b/src/Integrals_Bielec/ao_bielec_integrals_in_map_slave.irp.f @@ -93,6 +93,8 @@ subroutine ao_bielec_integrals_in_map_slave(thread,iproc) integer(ZMQ_PTR), external :: new_zmq_push_socket integer(ZMQ_PTR) :: zmq_socket_push + character*(64) :: state + zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() zmq_socket_push = new_zmq_push_socket(thread) @@ -109,18 +111,15 @@ subroutine ao_bielec_integrals_in_map_slave(thread,iproc) call push_integrals(zmq_socket_push, n_integrals, buffer_i, buffer_value, task_id) enddo - deallocate( buffer_i, buffer_value ) call disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id) + deallocate( buffer_i, buffer_value ) call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) call end_zmq_push_socket(zmq_socket_push,thread) end - - - subroutine ao_bielec_integrals_in_map_collector use map_module use f77_zmq diff --git a/src/Integrals_Bielec/qp_ao_ints.irp.f b/src/Integrals_Bielec/qp_ao_ints.irp.f index c60b4e5d..93f62a7d 100644 --- a/src/Integrals_Bielec/qp_ao_ints.irp.f +++ b/src/Integrals_Bielec/qp_ao_ints.irp.f @@ -17,10 +17,15 @@ program qp_ao_ints double precision :: integral, ao_bielec_integral integral = ao_bielec_integral(1,1,1,1) - !$OMP PARALLEL DEFAULT(PRIVATE) PRIVATE(i) - i = omp_get_thread_num() - call ao_bielec_integrals_in_map_slave_tcp(i) - !$OMP END PARALLEL + character*(64) :: state + call wait_for_state(zmq_state,state) + do while (state /= 'Stopped') + !$OMP PARALLEL DEFAULT(PRIVATE) PRIVATE(i) + i = omp_get_thread_num() + call ao_bielec_integrals_in_map_slave_tcp(i) + !$OMP END PARALLEL + call wait_for_state(zmq_state,state) + enddo print *, 'Done' end diff --git a/src/ZMQ/utils.irp.f b/src/ZMQ/utils.irp.f index 7c87a0ef..d3b76f4f 100644 --- a/src/ZMQ/utils.irp.f +++ b/src/ZMQ/utils.irp.f @@ -46,31 +46,39 @@ END_PROVIDER &BEGIN_PROVIDER [ character*(128), zmq_socket_push_tcp_address ] &BEGIN_PROVIDER [ character*(128), zmq_socket_pull_inproc_address ] &BEGIN_PROVIDER [ character*(128), zmq_socket_push_inproc_address ] +&BEGIN_PROVIDER [ character*(128), zmq_socket_sub_tcp_address ] use f77_zmq implicit none BEGIN_DOC ! Socket which pulls the results (2) END_DOC - character*(8), external :: zmq_port - zmq_socket_pull_tcp_address = 'tcp://*:'//zmq_port(1)//' ' - zmq_socket_push_tcp_address = trim(qp_run_address)//':'//zmq_port(1)//' ' - zmq_socket_pull_inproc_address = 'inproc://'//zmq_port(1)//' ' + + zmq_socket_sub_tcp_address = trim(qp_run_address)//':'//zmq_port(1)//' ' + zmq_socket_pull_tcp_address = 'tcp://*:'//zmq_port(2)//' ' + zmq_socket_push_tcp_address = trim(qp_run_address)//':'//zmq_port(2)//' ' + zmq_socket_pull_inproc_address = 'inproc://'//zmq_port(2)//' ' zmq_socket_push_inproc_address = zmq_socket_pull_inproc_address - zmq_socket_pair_inproc_address = 'inproc://'//zmq_port(2)//' ' + zmq_socket_pair_inproc_address = 'inproc://'//zmq_port(3)//' ' + + ! /!\ Don't forget to change subroutine reset_zmq_addresses END_PROVIDER subroutine reset_zmq_addresses use f77_zmq implicit none + BEGIN_DOC + ! Socket which pulls the results (2) + END_DOC character*(8), external :: zmq_port - - zmq_socket_pull_tcp_address = 'tcp://*:'//zmq_port(1)//' ' - zmq_socket_push_tcp_address = trim(qp_run_address)//':'//zmq_port(1)//' ' - zmq_socket_pull_inproc_address = 'inproc://'//zmq_port(1)//' ' + + zmq_socket_sub_tcp_address = trim(qp_run_address)//':'//zmq_port(1)//' ' + zmq_socket_pull_tcp_address = 'tcp://*:'//zmq_port(2)//' ' + zmq_socket_push_tcp_address = trim(qp_run_address)//':'//zmq_port(2)//' ' + zmq_socket_pull_inproc_address = 'inproc://'//zmq_port(2)//' ' zmq_socket_push_inproc_address = zmq_socket_pull_inproc_address - zmq_socket_pair_inproc_address = 'inproc://'//zmq_port(2)//' ' -end + zmq_socket_pair_inproc_address = 'inproc://'//zmq_port(3)//' ' +end subroutine switch_qp_run_to_master @@ -87,6 +95,7 @@ subroutine switch_qp_run_to_master stop -1 endif qp_run_address = trim(buffer) + print *, 'Switched to qp_run master : ', trim(qp_run_address) integer :: i do i=len(buffer),1,-1 @@ -96,7 +105,6 @@ subroutine switch_qp_run_to_master exit endif enddo - call reset_zmq_addresses end @@ -314,6 +322,60 @@ end +function new_zmq_sub_socket() + use f77_zmq + implicit none + BEGIN_DOC + ! Socket to read the state published by the Task server + END_DOC + integer :: rc + integer(ZMQ_PTR) :: new_zmq_sub_socket + + call omp_set_lock(zmq_lock) + if (zmq_context == 0_ZMQ_PTR) then + stop 'zmq_context is uninitialized' + endif + new_zmq_sub_socket = f77_zmq_socket(zmq_context, ZMQ_SUB) + call omp_unset_lock(zmq_lock) + if (new_zmq_sub_socket == 0_ZMQ_PTR) then + stop 'Unable to create zmq sub socket' + endif + + rc = f77_zmq_setsockopt(new_zmq_sub_socket,ZMQ_RCVTIMEO,10000,4) + if (rc /= 0) then + stop 'Unable to set timeout in new_zmq_sub_socket' + endif + + rc = f77_zmq_setsockopt(new_zmq_sub_socket,ZMQ_SUBSCRIBE,"",0) + if (rc /= 0) then + stop 'Unable to subscribe new_zmq_sub_socket' + endif + + rc = f77_zmq_connect(new_zmq_sub_socket, zmq_socket_sub_tcp_address) + if (rc /= 0) then + stop 'Unable to connect new_zmq_sub_socket' + endif +end + + +subroutine end_zmq_sub_socket(zmq_socket_sub) + use f77_zmq + implicit none + BEGIN_DOC + ! Terminate socket on which the results are sent. + END_DOC + integer(ZMQ_PTR), intent(in) :: zmq_socket_sub + integer :: rc + + rc = f77_zmq_close(zmq_socket_sub) + if (rc /= 0) then + print *, 'f77_zmq_close(zmq_socket_sub)' + stop 'error' + endif + +end + + subroutine end_zmq_pair_socket(zmq_socket_pair) use f77_zmq implicit none @@ -766,3 +828,31 @@ subroutine zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more) endif end +subroutine wait_for_state(state_wait,state) + use f77_zmq + implicit none + BEGIN_DOC +! Wait for the ZMQ state to be ready + END_DOC + character*(64), intent(in) :: state_wait + character*(64), intent(out) :: state + integer(ZMQ_PTR) :: zmq_socket_sub + integer(ZMQ_PTR), external :: new_zmq_sub_socket + integer :: rc + + zmq_socket_sub = new_zmq_sub_socket() + state = "Waiting" + do while (state /= state_wait .and. state /= "Stopped") + rc = f77_zmq_recv( zmq_socket_sub, state, 64, 0) + if (rc > 0) then + state = trim(state(1:rc)) + else + print *, 'Timeout reached. Stopping' + state = "Stopped" + endif + end do + call end_zmq_sub_socket(zmq_socket_sub) +end + + +