From 3826062b886e16d19e0b4239b7125c69fcb01f2d Mon Sep 17 00:00:00 2001 From: Anthony Scemama Date: Tue, 5 Apr 2016 00:48:37 +0200 Subject: [PATCH] Less walkers traffic --- ocaml/Message.ml | 5 +-- ocaml/Qmcchem_dataserver.ml | 2 +- ocaml/Qmcchem_forwarder.ml | 49 ++++++++++++++++++++++-- src/TOOLS/Util.irp.f | 4 +- src/ZMQ/worker.irp.f | 75 ++++++++++++++++++++----------------- 5 files changed, 91 insertions(+), 44 deletions(-) diff --git a/ocaml/Message.ml b/ocaml/Message.ml index 37b5b1a..8e103ae 100644 --- a/ocaml/Message.ml +++ b/ocaml/Message.ml @@ -47,15 +47,12 @@ let create m = } | "elec_coord" :: c :: pid :: _ :: n ::walkers -> begin - let walk_num = - Lazy.force Qputils.walk_num - and elec_num = + let elec_num = Lazy.force Qputils.elec_num and n = Int.of_string n in assert (n = List.length walkers); - assert (n = walk_num*(elec_num+1)*3); let rec build_walker accu = function | (0,tail) -> let result = diff --git a/ocaml/Qmcchem_dataserver.ml b/ocaml/Qmcchem_dataserver.ml index 4ca2032..3dce4f1 100644 --- a/ocaml/Qmcchem_dataserver.ml +++ b/ocaml/Qmcchem_dataserver.ml @@ -719,7 +719,7 @@ let run ?(daemon=true) ezfio_filename = in let handle = function - | Message.Error _ -> () + | Message.Error m -> Printf.eprintf "%s\n%!" m; | Message.Walkers (h,pid,w) -> begin if (status = Status.Running) then diff --git a/ocaml/Qmcchem_forwarder.ml b/ocaml/Qmcchem_forwarder.ml index 09caa49..9a75765 100644 --- a/ocaml/Qmcchem_forwarder.ml +++ b/ocaml/Qmcchem_forwarder.ml @@ -234,7 +234,7 @@ let run ezfio_filename dataserver = end else if (polling.(1) = Some ZMQ.Poll.In) then begin - Printf.printf "Forwarder subscribe\n%!"; + Printf.eprintf "Forwarder subscribe\n%!"; ZMQ.Socket.recv ~block:false pub_socket |> ZMQ.Socket.send sub_socket ; end @@ -435,10 +435,53 @@ let run ezfio_filename dataserver = |> ZMQ.Socket.send_all router_socket in + let select_n_of ~n ~len l = + let a = + Array.of_list l + in + let s = + (Array.length a)/ len + in + let fetch i = + let rec loop accu = function + | -1 -> accu + | k -> loop ((Array.get a (i+k)) :: accu) (k-1) + in + loop [] (len-1) + in + let rec select accu = function + | 0 -> accu + | i -> let new_accu = + (fetch @@ Random.int s) :: accu + in + select new_accu (i-1) + in + select [] n + |> List.concat + in + (* Handles messages coming into the PULL socket. *) let handle_pull () = - ZMQ.Socket.recv_all ~block:false pull_socket - |> ZMQ.Socket.send_all push_socket + let message = + ZMQ.Socket.recv_all ~block:false pull_socket + in + let new_message = + match message with + | "elec_coord":: hostname :: pid :: id :: n_str :: rest -> + let n = + Int.of_string n_str + in + let len = + n / !walk_num + in + if (n < 5*len) then + message + else + List.concat [ [ "elec_coord" ; hostname ; pid ; id ; + Int.to_string (5*len)] ; ( select_n_of ~n:5 ~len rest ) ] + | _ -> message + in + ZMQ.Socket.send_all push_socket new_message in (* Polling item to poll ROUTER and PULL sockets. *) diff --git a/src/TOOLS/Util.irp.f b/src/TOOLS/Util.irp.f index 53c780c..0424e01 100644 --- a/src/TOOLS/Util.irp.f +++ b/src/TOOLS/Util.irp.f @@ -1,4 +1,5 @@ -BEGIN_PROVIDER [ character*(8), current_PID ] + BEGIN_PROVIDER [ character*(8), current_PID ] +&BEGIN_PROVIDER [ integer, len_current_PID ] implicit none BEGIN_DOC ! Process ID @@ -6,6 +7,7 @@ BEGIN_PROVIDER [ character*(8), current_PID ] integer :: getpid write(current_PID,'(I8)') getpid() current_PID = adjustl(trim(current_PID)) + len_current_PID = len(trim(current_PID)) END_PROVIDER diff --git a/src/ZMQ/worker.irp.f b/src/ZMQ/worker.irp.f index d66e2e1..bcc773f 100644 --- a/src/ZMQ/worker.irp.f +++ b/src/ZMQ/worker.irp.f @@ -18,11 +18,12 @@ subroutine zmq_register_worker(msg) character*(64) :: buffer + integer :: size - rc = f77_zmq_msg_init_size(msg,64) - write(buffer,'(A)') trim(hostname) - buffer = adjustl(trim(buffer)) - rc = f77_zmq_msg_copy_to_data(msg, buffer, len(buffer)) + size = len(trim(hostname)) + buffer = trim(hostname) + rc = f77_zmq_msg_init_size(msg,size) + rc = f77_zmq_msg_copy_to_data(msg, buffer, size) rc = f77_zmq_msg_send(msg,zmq_to_dataserver_socket,ZMQ_SNDMORE) if (rc == -1) then call abrt(irp_here, 'Unable to send register message (1)') @@ -31,8 +32,8 @@ subroutine zmq_register_worker(msg) call worker_log(irp_here, 'Registering') - rc = f77_zmq_msg_init_size(msg,8) - rc = f77_zmq_msg_copy_to_data(msg, current_PID, 8) + rc = f77_zmq_msg_init_size(msg,len_current_PID) + rc = f77_zmq_msg_copy_to_data(msg, current_PID, len_current_PID) rc = f77_zmq_msg_send(msg,zmq_to_dataserver_socket,0) if (rc == -1) then call abrt(irp_here, 'Unable to send register message (2)') @@ -66,20 +67,21 @@ subroutine zmq_unregister_worker(msg) endif rc = f77_zmq_msg_close(msg) - character*(64) :: buffer + character*(64) :: buffer + integer :: size - rc = f77_zmq_msg_init_size(msg,64) - write(buffer,'(A)') trim(hostname) - buffer = adjustl(trim(buffer)) - rc = f77_zmq_msg_copy_to_data(msg, buffer, len(buffer)) + size = len(trim(hostname)) + buffer = trim(hostname) + rc = f77_zmq_msg_init_size(msg,size) + rc = f77_zmq_msg_copy_to_data(msg, buffer, size) rc = f77_zmq_msg_send(msg,zmq_to_dataserver_socket,ZMQ_SNDMORE) if (rc == -1) then call abrt(irp_here, 'Unable to send unregister message (2)') endif rc = f77_zmq_msg_close(msg) - rc = f77_zmq_msg_init_size(msg,8) - rc = f77_zmq_msg_copy_to_data(msg, current_PID, 8) + rc = f77_zmq_msg_init_size(msg,len_current_PID) + rc = f77_zmq_msg_copy_to_data(msg, current_PID, len_current_PID) rc = f77_zmq_msg_send(msg,zmq_to_dataserver_socket,0) if (rc == -1) then call abrt(irp_here, 'Unable to send unregister message (3)') @@ -124,23 +126,25 @@ subroutine zmq_send_header(msg,header,block_id) character*(64) :: buffer - rc = f77_zmq_msg_init_size(msg,64) - write(buffer,'(A)') trim(hostname) - buffer = adjustl(trim(buffer)) - rc = f77_zmq_msg_copy_to_data(msg, buffer, len(buffer)) + buffer = trim(hostname) + size = len(trim(hostname)) + rc = f77_zmq_msg_init_size(msg,size) + rc = f77_zmq_msg_copy_to_data(msg, buffer, size) rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE) rc = f77_zmq_msg_close(msg) call worker_log(irp_here, header) - rc = f77_zmq_msg_init_size(msg,8) - rc = f77_zmq_msg_copy_to_data(msg, current_PID, 8) + rc = f77_zmq_msg_init_size(msg,len_current_PID) + rc = f77_zmq_msg_copy_to_data(msg, current_PID, len_current_PID) rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE) rc = f77_zmq_msg_close(msg) - rc = f77_zmq_msg_init_size(msg,8) + write(buffer,'(I8)') block_id buffer = adjustl(trim(buffer)) - rc = f77_zmq_msg_copy_to_data(msg, buffer, 8) + size = len(trim(buffer)) + rc = f77_zmq_msg_init_size(msg,size) + rc = f77_zmq_msg_copy_to_data(msg, buffer, size) rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE) rc = f77_zmq_msg_close(msg) @@ -156,21 +160,22 @@ subroutine zmq_send_scalar_prop(msg,weight,value) END_DOC integer(ZMQ_PTR) :: msg double precision :: weight, value - integer :: rc,sze + integer :: rc,size character*(32) :: buffer write(buffer,'(E32.16)') weight buffer = adjustl(trim(buffer)) - sze = len(buffer) - rc = f77_zmq_msg_init_size(msg,len(buffer)) - rc = f77_zmq_msg_copy_to_data(msg, buffer,sze) + size = len(trim(buffer)) + rc = f77_zmq_msg_init_size(msg,len(trim(buffer))) + rc = f77_zmq_msg_copy_to_data(msg, buffer,size) rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE) rc = f77_zmq_msg_close(msg) write(buffer,'(E32.16)') value buffer = adjustl(trim(buffer)) - rc = f77_zmq_msg_init_size(msg,len(buffer)) - rc = f77_zmq_msg_copy_to_data(msg, buffer,len(buffer)) + size = len(trim(buffer)) + rc = f77_zmq_msg_init_size(msg,size) + rc = f77_zmq_msg_copy_to_data(msg, buffer,size) rc = f77_zmq_msg_send(msg,zmq_socket_push,0) rc = f77_zmq_msg_close(msg) @@ -192,7 +197,7 @@ subroutine zmq_send_array_prop(msg,weight,value,isize) write(buffer,'(I8)') isize buffer = adjustl(trim(buffer)) - l = len(buffer) + l = len(trim(buffer)) rc = f77_zmq_msg_init_size(msg,l) rc = f77_zmq_msg_copy_to_data(msg, buffer,l) rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE) @@ -201,7 +206,7 @@ subroutine zmq_send_array_prop(msg,weight,value,isize) write(buffer,'(E32.16)') weight buffer = adjustl(trim(buffer)) - l = len(buffer) + l = len(trim(buffer)) rc = f77_zmq_msg_init_size(msg,l) rc = f77_zmq_msg_copy_to_data(msg, buffer,l) rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE) @@ -211,7 +216,7 @@ subroutine zmq_send_array_prop(msg,weight,value,isize) do i=1,isize write(buffer,'(E32.16)') value(i) buffer = adjustl(trim(buffer)) - l = len(buffer) + l = len(trim(buffer)) sze += l rc = f77_zmq_msg_init_size(msg,l) rc = f77_zmq_msg_copy_to_data(msg, buffer,l) @@ -262,7 +267,7 @@ subroutine zmq_send_int(msg,value,isize) write(buffer,'(I8)') isize buffer = adjustl(trim(buffer)) - l = len(buffer) + l = len(trim(buffer)) rc = f77_zmq_msg_init_size(msg,l) rc = f77_zmq_msg_copy_to_data(msg, buffer,l) rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE) @@ -272,7 +277,7 @@ subroutine zmq_send_int(msg,value,isize) do i=1,isize write(buffer,'(I16)') value(i) buffer = adjustl(trim(buffer)) - l = len(buffer) + l = len(trim(buffer)) sze += l rc = f77_zmq_msg_init_size(msg,l) rc = f77_zmq_msg_copy_to_data(msg, buffer,l) @@ -302,7 +307,7 @@ subroutine zmq_send_real(msg,value,isize) write(buffer,'(I8)') isize buffer = adjustl(trim(buffer)) - l = len(buffer) + l = len(trim(buffer)) rc = f77_zmq_msg_init_size(msg,l) rc = f77_zmq_msg_copy_to_data(msg, buffer,l) rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE) @@ -312,7 +317,7 @@ subroutine zmq_send_real(msg,value,isize) do i=1,isize write(buffer,'(E32.16)') value(i) buffer = adjustl(trim(buffer)) - l = len(buffer) + l = len(trim(buffer)) sze += l rc = f77_zmq_msg_init_size(msg,l) rc = f77_zmq_msg_copy_to_data(msg, buffer,l) @@ -388,7 +393,7 @@ subroutine get_running(do_run) else do_run = t_Stopped endif - call worker_log(irp_here,buffer) +! call worker_log(irp_here,buffer) endif end