diff --git a/ocaml/.gitignore b/ocaml/.gitignore index 5618a6c0..51fdb52b 100644 --- a/ocaml/.gitignore +++ b/ocaml/.gitignore @@ -39,9 +39,15 @@ test_excitation test_excitation.byte test_gto test_gto.byte +test_message +test_message.byte test_mo_label test_mo_label.byte test_molecule test_molecule.byte test_point3d test_point3d.byte +test_queuing_system +test_queuing_system.byte +test_task_server +test_task_server.byte diff --git a/ocaml/Message.ml b/ocaml/Message.ml index 43982059..f47d6cec 100644 --- a/ocaml/Message.ml +++ b/ocaml/Message.ml @@ -106,6 +106,27 @@ end = struct (Id.Client.to_int x.client_id) end +module DisconnectReply_msg : sig + type t = + { finished: bool ; + state: State.t ; + } + val create : state:State.t -> finished:bool -> t + val to_string : t -> string +end = struct + type t = + { finished: bool; + state: State.t ; + } + let create ~state ~finished = + { state ; finished } + let to_string x = + Printf.sprintf "disconnect_reply %s %d" + (State.to_string x.state) + (if x.finished then 1 else 0) +end + + (** AddTask : Add a new task to the queue *) module AddTask_msg : sig @@ -245,6 +266,7 @@ type t = | Connect of Connect_msg.t | ConnectReply of ConnectReply_msg.t | Disconnect of Disconnect_msg.t +| DisconnectReply of DisconnectReply_msg.t | GetTask of GetTask_msg.t | GetTaskReply of GetTaskReply_msg.t | AddTask of AddTask_msg.t @@ -289,6 +311,7 @@ let to_string = function | Connect x -> Connect_msg.to_string x | ConnectReply x -> ConnectReply_msg.to_string x | Disconnect x -> Disconnect_msg.to_string x +| DisconnectReply x -> DisconnectReply_msg.to_string x | GetTask x -> GetTask_msg.to_string x | GetTaskReply x -> GetTaskReply_msg.to_string x | AddTask x -> AddTask_msg.to_string x diff --git a/ocaml/TaskServer.ml b/ocaml/TaskServer.ml index 4c5b2ff2..1f882540 100644 --- a/ocaml/TaskServer.ml +++ b/ocaml/TaskServer.ml @@ -66,19 +66,15 @@ let ip_address = lazy ( end ) -(** Initial ZeroMQ port : - Random port number between 49152 and 65535 *) -let port = lazy ( - 1024 + (Random.int (49151-1024)) ) -let stop () = +let stop ~port = let zmq_context = ZMQ.Context.create () in let req_socket = ZMQ.Socket.create zmq_context ZMQ.Socket.req and address = - Printf.sprintf "tcp://%s:%d" (Lazy.force ip_address) (Lazy.force port) + Printf.sprintf "tcp://%s:%d" (Lazy.force ip_address) port in ZMQ.Socket.connect req_socket address; @@ -100,7 +96,7 @@ let stop () = (** Run the task server *) -let run () = +let run ~port = let zmq_context = ZMQ.Context.create () @@ -109,7 +105,7 @@ let run () = let rep_socket = ZMQ.Socket.create zmq_context ZMQ.Socket.rep and address = - Printf.sprintf "tcp://%s:%d" (Lazy.force ip_address) (Lazy.force port) + Printf.sprintf "tcp://%s:%d" (Lazy.force ip_address) port in bind_socket "REP" rep_socket address; @@ -118,6 +114,8 @@ let run () = [| (rep_socket, ZMQ.Poll.In) |] in + Printf.printf "Task server running : %s\n%!" address; + (** State variables *) let q = ref (Queuing_system.create ()) @@ -194,7 +192,13 @@ let run () = Queuing_system.del_client ~client_id:c !q in q := new_q; - Message.to_string ok + let finished = + Queuing_system.number_of_queued !q + + Queuing_system.number_of_running !q = 0 + in + Message.DisconnectReply (Message.DisconnectReply_msg.create + ~state ~finished) + |> Message.to_string |> ZMQ.Socket.send rep_socket and add_task state msg = @@ -257,8 +261,12 @@ let run () = let message = Message.of_string raw_message in - Printf.printf "%s\n%!" (Message.to_string message); - Printf.printf "%s\n%!" (Queuing_system.to_string !q); +(* + Printf.printf "%d %d : %s\n%!" + (Queuing_system.number_of_queued !q) + (Queuing_system.number_of_running !q) + (Message.to_string message); + Printf.printf "%s\n%!" (Queuing_system.to_string !q); *) match (state, message) with | _ , Message.Terminate _ -> terminate () | None , Message.Newjob x -> newjob x @@ -280,7 +288,9 @@ let run () = ZMQ.Socket.close rep_socket +(* let () = Printf.printf "export QP_RUN_ADDRESS=tcp://%s:%d\n%!" (Lazy.force ip_address) (Lazy.force port) +*) diff --git a/ocaml/create_git_sha1.sh b/ocaml/create_git_sha1.sh new file mode 100755 index 00000000..75174319 --- /dev/null +++ b/ocaml/create_git_sha1.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +SHA1=$(git log -1 | head -1 | cut -d ' ' -f 2) +DATE=$(git log -1 | grep Date | cut -d ':' -f 2) +MESSAGE=$(git log -1 | tail -1) +cat << EOF > Git.ml +open Core.Std +let sha1 = "$SHA1" |> String.strip +let date = "$DATE" |> String.strip +let message = "$MESSAGE" |> String.strip +EOF + diff --git a/ocaml/qp_edit.ml b/ocaml/qp_edit.ml index dd22025a..a693aa2f 100644 --- a/ocaml/qp_edit.ml +++ b/ocaml/qp_edit.ml @@ -17,12 +17,12 @@ type keyword = | Electrons | Mo_basis | Nuclei -| Perturbation -| Hartree_fock -| Pseudo -| Integrals_bielec | Determinants +| Integrals_bielec +| Pseudo +| Perturbation | Properties +| Hartree_fock ;; @@ -32,12 +32,12 @@ let keyword_to_string = function | Electrons -> "Electrons" | Mo_basis -> "MO basis" | Nuclei -> "Molecule" -| Perturbation -> "Perturbation" -| Hartree_fock -> "Hartree_fock" -| Pseudo -> "Pseudo" -| Integrals_bielec -> "Integrals_bielec" | Determinants -> "Determinants" +| Integrals_bielec -> "Integrals_bielec" +| Pseudo -> "Pseudo" +| Perturbation -> "Perturbation" | Properties -> "Properties" +| Hartree_fock -> "Hartree_fock" ;; @@ -86,18 +86,18 @@ let get s = f Ao_basis.(read, to_rst) | Determinants_by_hand -> f Determinants_by_hand.(read, to_rst) - | Perturbation -> - f Perturbation.(read, to_rst) - | Hartree_fock -> - f Hartree_fock.(read, to_rst) - | Pseudo -> - f Pseudo.(read, to_rst) - | Integrals_bielec -> - f Integrals_bielec.(read, to_rst) | Determinants -> f Determinants.(read, to_rst) + | Integrals_bielec -> + f Integrals_bielec.(read, to_rst) + | Pseudo -> + f Pseudo.(read, to_rst) + | Perturbation -> + f Perturbation.(read, to_rst) | Properties -> f Properties.(read, to_rst) + | Hartree_fock -> + f Hartree_fock.(read, to_rst) end with | Sys_error msg -> (Printf.eprintf "Info: %s\n%!" msg ; "") @@ -135,12 +135,12 @@ let set str s = in let open Input in match s with - | Perturbation -> write Perturbation.(of_rst, write) s - | Hartree_fock -> write Hartree_fock.(of_rst, write) s - | Pseudo -> write Pseudo.(of_rst, write) s - | Integrals_bielec -> write Integrals_bielec.(of_rst, write) s | Determinants -> write Determinants.(of_rst, write) s + | Integrals_bielec -> write Integrals_bielec.(of_rst, write) s + | Pseudo -> write Pseudo.(of_rst, write) s + | Perturbation -> write Perturbation.(of_rst, write) s | Properties -> write Properties.(of_rst, write) s + | Hartree_fock -> write Hartree_fock.(of_rst, write) s | Electrons -> write Electrons.(of_rst, write) s | Determinants_by_hand -> write Determinants_by_hand.(of_rst, write) s | Nuclei -> write Nuclei.(of_rst, write) s @@ -169,7 +169,7 @@ let run check_only ezfio_filename = (* Open EZFIO *) if (not (Sys.file_exists_exn ezfio_filename)) then - failwith (ezfio_filename^" does not exist"); + failwith (ezfio_filename^" does not exists"); Ezfio.set_file ezfio_filename; @@ -188,12 +188,12 @@ let run check_only ezfio_filename = Nuclei ; Ao_basis; Electrons ; - Perturbation ; - Hartree_fock ; - Pseudo ; - Integrals_bielec ; Determinants ; + Integrals_bielec ; + Pseudo ; + Perturbation ; Properties ; + Hartree_fock ; Mo_basis; Determinants_by_hand ; ] diff --git a/ocaml/qp_run.ml b/ocaml/qp_run.ml index eb1445d8..600c6f24 100644 --- a/ocaml/qp_run.ml +++ b/ocaml/qp_run.ml @@ -17,13 +17,38 @@ let run exe ezfio_file = if (not (List.exists ~f:(fun (x,_) -> x = exe) executables)) then failwith ("Executable "^exe^" not found"); + Printf.printf "%s\n" (Time.to_string time_start); Printf.printf "===============\nQuantum Package\n===============\n\n"; - Printf.printf "Date : %s\n\n%!" (Time.to_string time_start); + Printf.printf "Git Commit: %s\n" Git.message; + Printf.printf "Git Date : %s\n" Git.date; + Printf.printf "Git SHA1 : %s\n" Git.sha1; + Printf.printf "\n\n%!"; + + (** Check input *) match (Sys.command ("qp_edit -c "^ezfio_file)) with | 0 -> () | i -> failwith "Error: Input inconsistent\n"; ; + + + (** Start task server *) + let port_number = + 12345 + in + let address = + Printf.sprintf "tcp://%s:%d" (Lazy.force TaskServer.ip_address) port_number + in + let task_thread = + let thread = + Thread.create ( fun () -> + TaskServer.run port_number ) + in + thread (); + in + Unix.putenv ~key:"QP_RUN_ADDRESS" ~data:address; + + (** Run executable *) let exe = match (List.find ~f:(fun (x,_) -> x = exe) executables) with | None -> assert false @@ -34,6 +59,9 @@ let run exe ezfio_file = | i -> Printf.printf "Program exited with code %d.\n%!" i; ; + TaskServer.stop ~port:port_number; + Thread.join task_thread; + let duration = Time.diff (Time.now()) time_start |> Core.Span.to_string in Printf.printf "Wall time : %s\n\n" duration; @@ -60,6 +88,7 @@ Executes a Quantum Package binary file among these:\n\n" (fun exe ezfio_file () -> run exe ezfio_file ) - |> Command.run + |> Command.run ~version: Git.sha1 ~build_info: Git.message ;; + diff --git a/ocaml/test_task_server.ml b/ocaml/test_task_server.ml index 55f74202..e6a6106b 100644 --- a/ocaml/test_task_server.ml +++ b/ocaml/test_task_server.ml @@ -1,5 +1,5 @@ open Core let () = - TaskServer.run () + TaskServer.run 12345 diff --git a/ocaml/test_task_server.py b/ocaml/test_task_server.py index 07835820..cb7da8ee 100755 --- a/ocaml/test_task_server.py +++ b/ocaml/test_task_server.py @@ -11,7 +11,7 @@ def main(): def send(msg,expected): print "Send : ", msg - socket.send(msg) + print " -> ", socket.send(msg) reply = socket.recv() print "Reply : ", reply print "" @@ -28,7 +28,7 @@ def main(): send("connect tcp","connect_reply ao_integrals 1 tcp://130.120.229.139:12345") send("connect inproc","connect_reply ao_integrals 2 inproc://ao_integrals") send("disconnect ao_integrals 3","error Queuing_system.ml:65:2 : disconnect ao_integrals 3") - send("disconnect ao_integrals 2","ok") + send("disconnect ao_integrals 2","disconnect_reply ao_integrals 1") send("connect inproc","connect_reply ao_integrals 3 inproc://ao_integrals") for i in range(10): diff --git a/src/Integrals_Bielec/ao_bi_integrals.irp.f b/src/Integrals_Bielec/ao_bi_integrals.irp.f index 53ce68e9..6987d06b 100644 --- a/src/Integrals_Bielec/ao_bi_integrals.irp.f +++ b/src/Integrals_Bielec/ao_bi_integrals.irp.f @@ -348,7 +348,7 @@ BEGIN_PROVIDER [ logical, ao_bielec_integrals_in_map ] real(integral_kind),allocatable :: buffer_value(:) integer :: n_integrals, rc - integer :: jl_pairs(2,ao_num*(ao_num+1)/2), kk, m, j1, i1, lmax + integer :: kk, m, j1, i1, lmax integral = ao_bielec_integral(1,1,1,1) @@ -368,55 +368,23 @@ BEGIN_PROVIDER [ logical, ao_bielec_integrals_in_map ] call wall_time(wall_1) call cpu_time(cpu_1) - integer(ZMQ_PTR) :: zmq_socket_rep_inproc, zmq_socket_push_inproc - zmq_socket_rep_inproc = f77_zmq_socket(zmq_context, ZMQ_REP) - rc = f77_zmq_bind(zmq_socket_rep_inproc, 'inproc://req_rep') - if (rc /= 0) then - stop 'Unable to connect zmq_socket_rep_inproc' - endif - - integer(ZMQ_PTR) :: thread(0:nproc) - external :: ao_bielec_integrals_in_map_slave, ao_bielec_integrals_in_map_collector - rc = pthread_create( thread(0), ao_bielec_integrals_in_map_collector ) - ! Create client threads - do i=1,nproc - rc = pthread_create( thread(i), ao_bielec_integrals_in_map_slave ) - enddo - - character*(64) :: message_string - - do l = ao_num, 1, -1 - rc = f77_zmq_recv( zmq_socket_rep_inproc, message_string, 64, 0) - print *, l - ! TODO : error handling - ASSERT (rc >= 0) - ASSERT (message == 'get_ao_integrals') - rc = f77_zmq_send( zmq_socket_rep_inproc, l, 4, 0) - enddo - do i=1,nproc - rc = f77_zmq_recv( zmq_socket_rep_inproc, message_string, 64, 0) - ! TODO : error handling - ASSERT (rc >= 0) - ASSERT (message == 'get_ao_integrals') - rc = f77_zmq_send( zmq_socket_rep_inproc, 0, 4, 0) - enddo - ! TODO terminer thread(0) - - rc = f77_zmq_unbind(zmq_socket_rep_inproc, 'inproc://req_rep') - do i=1,nproc - rc = pthread_join( thread(i) ) - enddo + integer(ZMQ_PTR) :: zmq_to_qp_run_socket + call new_parallel_job(zmq_to_qp_run_socket,'ao_integrals') - zmq_socket_push_inproc = f77_zmq_socket(zmq_context, ZMQ_PUSH) - rc = f77_zmq_connect(zmq_socket_push_inproc, 'inproc://push_pull') - if (rc /= 0) then - stop 'Unable to connect zmq_socket_push_inproc' - endif - rc = f77_zmq_send( zmq_socket_push_inproc, -1, 4, ZMQ_SNDMORE) - rc = f77_zmq_send( zmq_socket_push_inproc, 0_key_kind, key_kind, ZMQ_SNDMORE) - rc = f77_zmq_send( zmq_socket_push_inproc, 0_integral_kind, integral_kind, 0) + character*(32) :: task + do l=1,ao_num + do j = 1, l + if (ao_overlap_abs(j,l) < ao_integrals_threshold) then + cycle + endif + write(task,*) j, l + call add_task_to_taskserver(zmq_to_qp_run_socket,task) + enddo + enddo + external :: ao_bielec_integrals_in_map_slave_inproc, ao_bielec_integrals_in_map_collector + call new_parallel_threads(ao_bielec_integrals_in_map_slave_inproc, ao_bielec_integrals_in_map_collector) - rc = pthread_join( thread(0) ) + call end_parallel_job(zmq_to_qp_run_socket,'ao_integrals') print*, 'Sorting the map' call map_sort(ao_integrals_map) diff --git a/src/Integrals_Bielec/ao_bielec_integrals_in_map_slave.irp.f b/src/Integrals_Bielec/ao_bielec_integrals_in_map_slave.irp.f index 7aa59c0d..e21014b8 100644 --- a/src/Integrals_Bielec/ao_bielec_integrals_in_map_slave.irp.f +++ b/src/Integrals_Bielec/ao_bielec_integrals_in_map_slave.irp.f @@ -1,4 +1,20 @@ -subroutine ao_bielec_integrals_in_map_slave +subroutine ao_bielec_integrals_in_map_slave_tcp + implicit none + BEGIN_DOC +! Computes a buffer of integrals + END_DOC + call ao_bielec_integrals_in_map_slave(0) +end + +subroutine ao_bielec_integrals_in_map_slave_inproc + implicit none + BEGIN_DOC +! Computes a buffer of integrals + END_DOC + call ao_bielec_integrals_in_map_slave(1) +end + +subroutine ao_bielec_integrals_in_map_slave(thread) use map_module use f77_zmq implicit none @@ -6,51 +22,61 @@ subroutine ao_bielec_integrals_in_map_slave ! Computes a buffer of integrals END_DOC + integer, intent(in) :: thread + integer :: j,l,n_integrals integer :: rc - character*(8), external :: zmq_port - integer(ZMQ_PTR) :: zmq_socket_req_inproc, zmq_socket_push_inproc real(integral_kind), allocatable :: buffer_value(:) integer(key_kind), allocatable :: buffer_i(:) + + integer :: worker_id, task_id + character*(512) :: task + + integer(ZMQ_PTR),external :: new_zmq_to_qp_run_socket + integer(ZMQ_PTR) :: zmq_to_qp_run_socket + + zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() + + integer(ZMQ_PTR) :: zmq_socket_push + zmq_socket_push = f77_zmq_socket(zmq_context, ZMQ_PUSH) + if (thread == 1) then + rc = f77_zmq_connect(zmq_socket_push, trim(zmq_socket_pull_inproc_address)) + else + rc = f77_zmq_connect(zmq_socket_push, trim(zmq_socket_push_tcp_address)) + endif + if (rc /= 0) then + stop 'Unable to connect zmq_socket_push_tcp' + endif allocate ( buffer_i(ao_num*ao_num), buffer_value(ao_num*ao_num) ) - ! Sockets - zmq_socket_req_inproc = f77_zmq_socket(zmq_context, ZMQ_REQ) - rc = f77_zmq_connect(zmq_socket_req_inproc, 'inproc://req_rep') - if (rc /= 0) then - stop 'Unable to connect zmq_socket_req_inproc' - endif + call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) - zmq_socket_push_inproc = f77_zmq_socket(zmq_context, ZMQ_PUSH) - rc = f77_zmq_connect(zmq_socket_push_inproc, 'inproc://push_pull') - if (rc /= 0) then - stop 'Unable to connect zmq_socket_push_inproc' - endif - - - - rc = f77_zmq_send( zmq_socket_req_inproc, 'get_ao_integrals', 16, 0) - rc = f77_zmq_recv( zmq_socket_req_inproc, l, 4, 0) - - do while (l > 0) - rc = f77_zmq_send( zmq_socket_req_inproc, 'get_ao_integrals', 16, 0) - - do j = 1, l - if (ao_overlap_abs(j,l) < ao_integrals_threshold) then - cycle - endif - call compute_ao_integrals_jl(j,l,n_integrals,buffer_i,buffer_value) - rc = f77_zmq_send( zmq_socket_push_inproc, n_integrals, 4, ZMQ_SNDMORE) - rc = f77_zmq_send( zmq_socket_push_inproc, buffer_i, key_kind*n_integrals, ZMQ_SNDMORE) - rc = f77_zmq_send( zmq_socket_push_inproc, buffer_value, integral_kind*n_integrals, 0) - enddo - rc = f77_zmq_recv( zmq_socket_req_inproc, l, 4, 0) + do + call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task) + if (task_id == 0) then + exit + endif + read(task,*) j, l + call compute_ao_integrals_jl(j,l,n_integrals,buffer_i,buffer_value) + rc = f77_zmq_send( zmq_socket_push, n_integrals, 4, ZMQ_SNDMORE) + rc = f77_zmq_send( zmq_socket_push, buffer_i, key_kind*n_integrals, ZMQ_SNDMORE) + rc = f77_zmq_send( zmq_socket_push, buffer_value, integral_kind*n_integrals, 0) + call task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id) enddo deallocate( buffer_i, buffer_value ) - rc = f77_zmq_disconnect(zmq_socket_req_inproc, 'inproc://req_rep') + integer :: finished + call disconnect_from_taskserver(zmq_to_qp_run_socket,worker_id,finished) + + if (finished /= 0) then + rc = f77_zmq_send( zmq_socket_push, -1, 4, 0) + endif + + rc = f77_zmq_disconnect(zmq_socket_push,trim(zmq_socket_push_tcp_address)) + rc = f77_zmq_close(zmq_socket_push) + end @@ -64,36 +90,24 @@ subroutine ao_bielec_integrals_in_map_collector integer :: j,l,n_integrals integer :: rc - character*(8), external :: zmq_port - integer(ZMQ_PTR) :: zmq_socket_pull_inproc real(integral_kind), allocatable :: buffer_value(:) integer(key_kind), allocatable :: buffer_i(:) allocate ( buffer_i(ao_num*ao_num), buffer_value(ao_num*ao_num) ) - zmq_socket_pull_inproc = f77_zmq_socket(zmq_context, ZMQ_PULL) - rc = f77_zmq_bind(zmq_socket_pull_inproc, 'inproc://push_pull') - if (rc /= 0) then - stop 'Unable to connect zmq_socket_pull_inproc' - endif - n_integrals = 0 do while (n_integrals >= 0) - rc = f77_zmq_recv( zmq_socket_pull_inproc, n_integrals, 4, 0) - if (n_integrals > -1) then - rc = f77_zmq_recv( zmq_socket_pull_inproc, buffer_i, key_kind*n_integrals, 0) - rc = f77_zmq_recv( zmq_socket_pull_inproc, buffer_value, integral_kind*n_integrals, 0) + rc = f77_zmq_recv( zmq_socket_pull, n_integrals, 4, 0) + if (n_integrals >= 0) then + rc = f77_zmq_recv( zmq_socket_pull, buffer_i, key_kind*n_integrals, 0) + rc = f77_zmq_recv( zmq_socket_pull, buffer_value, integral_kind*n_integrals, 0) call insert_into_ao_integrals_map(n_integrals,buffer_i,buffer_value) - else - rc = f77_zmq_recv( zmq_socket_pull_inproc, buffer_i, key_kind, 0) - rc = f77_zmq_recv( zmq_socket_pull_inproc, buffer_value, integral_kind, 0) endif enddo - rc = f77_zmq_unbind(zmq_socket_pull_inproc, 'inproc://push_pull') - deallocate( buffer_i, buffer_value ) + end diff --git a/src/ZMQ/NEEDED_CHILDREN_MODULES b/src/ZMQ/NEEDED_CHILDREN_MODULES index 8b137891..19028952 100644 --- a/src/ZMQ/NEEDED_CHILDREN_MODULES +++ b/src/ZMQ/NEEDED_CHILDREN_MODULES @@ -1 +1 @@ - +Utils diff --git a/src/ZMQ/zmq.irp.f b/src/ZMQ/zmq.irp.f index 1577e12f..6a887422 100644 --- a/src/ZMQ/zmq.irp.f +++ b/src/ZMQ/zmq.irp.f @@ -20,10 +20,10 @@ END_PROVIDER character*(128) :: buffer call getenv('QP_RUN_ADDRESS',buffer) if (trim(buffer) == '') then - stop 'QP_RUN_ADDRESS environment variable not defined' + print *, 'This run should be started with the qp_run command' + stop -1 endif - print *, trim(buffer) integer :: i do i=len(buffer),1,-1 if ( buffer(i:i) == ':') then @@ -44,62 +44,293 @@ function zmq_port(ishift) end -BEGIN_PROVIDER [ integer(ZMQ_PTR), zmq_to_qp_run_socket ] +function new_zmq_to_qp_run_socket() implicit none BEGIN_DOC ! Socket on which the qp_run process replies END_DOC integer :: rc - zmq_to_qp_run_socket = f77_zmq_socket(zmq_context, ZMQ_REQ) - rc = f77_zmq_connect(zmq_to_qp_run_socket, trim(qp_run_address)) + character*(8), external :: zmq_port + integer(ZMQ_PTR) :: new_zmq_to_qp_run_socket + + new_zmq_to_qp_run_socket = f77_zmq_socket(zmq_context, ZMQ_REQ) + rc = f77_zmq_connect(new_zmq_to_qp_run_socket, trim(qp_run_address)//':'//trim(zmq_port(0))) if (rc /= 0) then - stop 'Unable to connect zmq_to_qp_run_socket' + stop 'Unable to connect new_zmq_to_qp_run_socket' endif integer :: i i=4 - rc = f77_zmq_setsockopt(zmq_to_qp_run_socket, ZMQ_SNDTIMEO, 120000, i) + rc = f77_zmq_setsockopt(new_zmq_to_qp_run_socket, ZMQ_SNDTIMEO, 120000, i) if (rc /= 0) then - stop 'Unable to set send timout in zmq_to_qp_run_socket' + stop 'Unable to set send timout in new_zmq_to_qp_run_socket' endif - rc = f77_zmq_setsockopt(zmq_to_qp_run_socket, ZMQ_RCVTIMEO, 120000, i) + rc = f77_zmq_setsockopt(new_zmq_to_qp_run_socket, ZMQ_RCVTIMEO, 120000, i) if (rc /= 0) then - stop 'Unable to set recv timout in zmq_to_qp_run_socket' + stop 'Unable to set recv timout in new_zmq_to_qp_run_socket' endif -END_PROVIDER +end -BEGIN_PROVIDER [ integer(ZMQ_PTR), zmq_socket_push ] - implicit none - BEGIN_DOC - ! Socket on which to push the results (1) - END_DOC - integer :: rc - character*(64) :: address - character*(8), external :: zmq_port - zmq_socket_push = f77_zmq_socket(zmq_context, ZMQ_PUSH) - address = trim(qp_run_address)//':'//zmq_port(1) - rc = f77_zmq_connect(zmq_socket_push, trim(address)) - if (rc /= 0) then - stop 'Unable to connect zmq_socket_push' - endif - -END_PROVIDER -BEGIN_PROVIDER [ integer(ZMQ_PTR), zmq_socket_pull ] + BEGIN_PROVIDER [ integer(ZMQ_PTR), zmq_socket_pull ] +&BEGIN_PROVIDER [ character*(128), zmq_socket_pull_tcp_address ] +&BEGIN_PROVIDER [ character*(128), zmq_socket_push_tcp_address ] +&BEGIN_PROVIDER [ character*(128), zmq_socket_pull_inproc_address ] implicit none BEGIN_DOC ! Socket which pulls the results (2) END_DOC integer :: rc - character*(64) :: address character*(8), external :: zmq_port + integer(ZMQ_PTR),external :: new_zmq_to_qp_run_socket + + zmq_socket_pull_tcp_address = 'tcp://*:'//zmq_port(1) + zmq_socket_push_tcp_address = trim(qp_run_address)//':'//zmq_port(1) + zmq_socket_pull_inproc_address = 'inproc://'//zmq_port(1) + zmq_socket_pull = f77_zmq_socket(zmq_context, ZMQ_PULL) - address = 'tcp://*:'//zmq_port(2) - rc = f77_zmq_bind(zmq_socket_pull, trim(address)) + rc = f77_zmq_bind(zmq_socket_pull, zmq_socket_pull_tcp_address) + rc = f77_zmq_bind(zmq_socket_pull, zmq_socket_pull_inproc_address) if (rc /= 0) then - stop 'Unable to connect zmq_socket_pull' + stop 'Unable to bind zmq_socket_pull (tcp)' endif - + END_PROVIDER + BEGIN_PROVIDER [ integer(ZMQ_PTR), zmq_thread, (0:nproc) ] +&BEGIN_PROVIDER [ character*(128), zmq_state ] + implicit none + BEGIN_DOC +! Threads executing work through the ZeroMQ interface + END_DOC + zmq_thread = 0_ZMQ_PTR + zmq_state = '' +END_PROVIDER + +subroutine new_parallel_job(zmq_to_qp_run_socket,name) + implicit none + BEGIN_DOC +! Start a new parallel job with name 'name'. The slave tasks execute subroutine 'slave' + END_DOC + character*(*), intent(in) :: name + + character*(512) :: message + integer :: rc + integer(ZMQ_PTR),external :: new_zmq_to_qp_run_socket + integer(ZMQ_PTR), intent(out) :: zmq_to_qp_run_socket + + zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() + message = 'new_job '//name//' '//zmq_socket_push_tcp_address//' '//zmq_socket_pull_inproc_address + rc = f77_zmq_send(zmq_to_qp_run_socket,message,len(trim(message)),0) + rc = f77_zmq_recv(zmq_to_qp_run_socket,message,510,0) + message = trim(message(1:rc)) + if (message(1:2) /= 'ok') then + print *, 'Unable to start parallel job : '//name + stop 1 + endif + + zmq_state = name + SOFT_TOUCH zmq_state zmq_thread + +end + +subroutine new_parallel_threads(slave,collector) + implicit none + BEGIN_DOC +! Start a new parallel job with name 'name'. The slave tasks execute subroutine 'slave' + END_DOC + external :: slave, collector + integer :: i,rc + + + rc = pthread_create( zmq_thread(0), collector) + do i=1,nproc + rc = pthread_create( zmq_thread(i), slave ) + enddo + SOFT_TOUCH zmq_thread zmq_state + +end + +subroutine connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) + implicit none + BEGIN_DOC +! Connect to the task server and obtain the worker ID + END_DOC + integer, intent(out) :: worker_id + integer, intent(in) :: thread + integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket + + character*(512) :: message + character*(128) :: reply, state, address + integer :: rc + + if (thread == 1) then + rc = f77_zmq_send(zmq_to_qp_run_socket, "connect inproc", 14, 0) + else + rc = f77_zmq_send(zmq_to_qp_run_socket, "connect tcp", 11, 0) + endif + + rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0) + message = trim(message(1:rc)) + read(message,*) reply, state, worker_id, address + if ( (trim(reply) /= 'connect_reply') .and. & + (trim(state) /= trim(zmq_state)) ) then + print *, 'Reply: ', trim(reply) + print *, 'State: ', trim(state), '/', trim(zmq_state) + print *, 'Address: ', trim(address) + stop -1 + endif + +end + +subroutine disconnect_from_taskserver(zmq_to_qp_run_socket,worker_id,finished) + implicit none + BEGIN_DOC +! Disconnect from the task server + END_DOC + integer, intent(in) :: worker_id + integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket + integer, intent(out) :: finished + + integer :: rc + character*(64) :: message, reply, state + write(message,*) 'disconnect '//trim(zmq_state), worker_id + + rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), len(trim(message)), 0) + + rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0) + message = trim(message(1:rc)) + + read(message,*) reply, state, finished + if ( (trim(reply) /= 'disconnect_reply').or. & + (trim(state) /= zmq_state) ) then + print *, 'Unable to disconnect' + print *, trim(message) + stop -1 + endif + +end + +subroutine add_task_to_taskserver(zmq_to_qp_run_socket,task) + implicit none + BEGIN_DOC +! Get a task from the task server + END_DOC + integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket + character*(*), intent(in) :: task + + integer :: rc + character*(512) :: message + write(message,*) 'add_task '//trim(zmq_state)//' '//trim(task) + + rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), len(trim(message)), 0) + + rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0) + message = trim(message(1:rc)) + if (trim(message) /= 'ok') then + print *, trim(task) + print *, 'Unable to add the next task' + stop -1 + endif + +end + +subroutine task_done_to_taskserver(zmq_to_qp_run_socket,worker_id, task_id) + implicit none + BEGIN_DOC +! Get a task from the task server + END_DOC + integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket + integer, intent(in) :: worker_id, task_id + + integer :: rc + character*(512) :: message + write(message,*) 'task_done '//trim(zmq_state), worker_id, task_id + + rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), len(trim(message)), 0) + + rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0) + message = trim(message(1:rc)) + if (trim(message) /= 'ok') then + print *, 'Unable to send task_done message' + stop -1 + endif + +end + +subroutine get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task) + implicit none + BEGIN_DOC +! Get a task from the task server + END_DOC + integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket + integer, intent(in) :: worker_id + integer, intent(out) :: task_id + character*(512), intent(out) :: task + + character*(512) :: message + character*(64) :: reply + integer :: rc + + write(message,*) 'get_task '//trim(zmq_state), worker_id + + rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), len(trim(message)), 0) + + rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0) + message = trim(message(1:rc)) + read(message,*) reply + if (trim(reply) == 'get_task_reply') then + read(message,*) reply, task_id + rc = 15 + do while (message(rc:rc) == ' ') + rc += 1 + enddo + do while (message(rc:rc) /= ' ') + rc += 1 + enddo + rc += 1 + task = message(rc:) + else if (trim(reply) == 'terminate') then + task_id = 0 + task = 'terminate' + else + print *, 'Unable to get the next task' + print *, trim(message) + stop -1 + endif + +end + + +subroutine end_parallel_job(zmq_to_qp_run_socket,name) + implicit none + BEGIN_DOC +! End a new parallel job with name 'name'. The slave tasks execute subroutine 'slave' + END_DOC + integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket + character*(*), intent(in) :: name + + character*(512) :: message + integer :: i,rc + + if (name /= zmq_state) then + stop 'Wrong end of job' + endif + + do i=1,nproc + rc = pthread_join( zmq_thread(i) ) + if (rc /= 0) then + print *, 'Unable to join thread : ', i + stop -1 + endif + zmq_thread(i) = 0 + enddo + zmq_state = 'None' + character*(8), external :: zmq_port + rc = f77_zmq_disconnect(zmq_to_qp_run_socket, trim(qp_run_address)//':'//trim(zmq_port(0))) + rc = f77_zmq_close(zmq_to_qp_run_socket) + + SOFT_TOUCH zmq_thread zmq_state + +end