Less walkers traffic

This commit is contained in:
Anthony Scemama 2016-04-05 00:48:37 +02:00
parent 321c969b0e
commit 3826062b88
5 changed files with 91 additions and 44 deletions

View File

@ -47,15 +47,12 @@ let create m =
}
| "elec_coord" :: c :: pid :: _ :: n ::walkers ->
begin
let walk_num =
Lazy.force Qputils.walk_num
and elec_num =
let elec_num =
Lazy.force Qputils.elec_num
and n =
Int.of_string n
in
assert (n = List.length walkers);
assert (n = walk_num*(elec_num+1)*3);
let rec build_walker accu = function
| (0,tail) ->
let result =

View File

@ -719,7 +719,7 @@ let run ?(daemon=true) ezfio_filename =
in
let handle = function
| Message.Error _ -> ()
| Message.Error m -> Printf.eprintf "%s\n%!" m;
| Message.Walkers (h,pid,w) ->
begin
if (status = Status.Running) then

View File

@ -234,7 +234,7 @@ let run ezfio_filename dataserver =
end
else if (polling.(1) = Some ZMQ.Poll.In) then
begin
Printf.printf "Forwarder subscribe\n%!";
Printf.eprintf "Forwarder subscribe\n%!";
ZMQ.Socket.recv ~block:false pub_socket
|> ZMQ.Socket.send sub_socket ;
end
@ -435,10 +435,53 @@ let run ezfio_filename dataserver =
|> ZMQ.Socket.send_all router_socket
in
let select_n_of ~n ~len l =
let a =
Array.of_list l
in
let s =
(Array.length a)/ len
in
let fetch i =
let rec loop accu = function
| -1 -> accu
| k -> loop ((Array.get a (i+k)) :: accu) (k-1)
in
loop [] (len-1)
in
let rec select accu = function
| 0 -> accu
| i -> let new_accu =
(fetch @@ Random.int s) :: accu
in
select new_accu (i-1)
in
select [] n
|> List.concat
in
(* Handles messages coming into the PULL socket. *)
let handle_pull () =
ZMQ.Socket.recv_all ~block:false pull_socket
|> ZMQ.Socket.send_all push_socket
let message =
ZMQ.Socket.recv_all ~block:false pull_socket
in
let new_message =
match message with
| "elec_coord":: hostname :: pid :: id :: n_str :: rest ->
let n =
Int.of_string n_str
in
let len =
n / !walk_num
in
if (n < 5*len) then
message
else
List.concat [ [ "elec_coord" ; hostname ; pid ; id ;
Int.to_string (5*len)] ; ( select_n_of ~n:5 ~len rest ) ]
| _ -> message
in
ZMQ.Socket.send_all push_socket new_message
in
(* Polling item to poll ROUTER and PULL sockets. *)

View File

@ -1,4 +1,5 @@
BEGIN_PROVIDER [ character*(8), current_PID ]
BEGIN_PROVIDER [ character*(8), current_PID ]
&BEGIN_PROVIDER [ integer, len_current_PID ]
implicit none
BEGIN_DOC
! Process ID
@ -6,6 +7,7 @@ BEGIN_PROVIDER [ character*(8), current_PID ]
integer :: getpid
write(current_PID,'(I8)') getpid()
current_PID = adjustl(trim(current_PID))
len_current_PID = len(trim(current_PID))
END_PROVIDER

View File

@ -18,11 +18,12 @@ subroutine zmq_register_worker(msg)
character*(64) :: buffer
integer :: size
rc = f77_zmq_msg_init_size(msg,64)
write(buffer,'(A)') trim(hostname)
buffer = adjustl(trim(buffer))
rc = f77_zmq_msg_copy_to_data(msg, buffer, len(buffer))
size = len(trim(hostname))
buffer = trim(hostname)
rc = f77_zmq_msg_init_size(msg,size)
rc = f77_zmq_msg_copy_to_data(msg, buffer, size)
rc = f77_zmq_msg_send(msg,zmq_to_dataserver_socket,ZMQ_SNDMORE)
if (rc == -1) then
call abrt(irp_here, 'Unable to send register message (1)')
@ -31,8 +32,8 @@ subroutine zmq_register_worker(msg)
call worker_log(irp_here, 'Registering')
rc = f77_zmq_msg_init_size(msg,8)
rc = f77_zmq_msg_copy_to_data(msg, current_PID, 8)
rc = f77_zmq_msg_init_size(msg,len_current_PID)
rc = f77_zmq_msg_copy_to_data(msg, current_PID, len_current_PID)
rc = f77_zmq_msg_send(msg,zmq_to_dataserver_socket,0)
if (rc == -1) then
call abrt(irp_here, 'Unable to send register message (2)')
@ -66,20 +67,21 @@ subroutine zmq_unregister_worker(msg)
endif
rc = f77_zmq_msg_close(msg)
character*(64) :: buffer
character*(64) :: buffer
integer :: size
rc = f77_zmq_msg_init_size(msg,64)
write(buffer,'(A)') trim(hostname)
buffer = adjustl(trim(buffer))
rc = f77_zmq_msg_copy_to_data(msg, buffer, len(buffer))
size = len(trim(hostname))
buffer = trim(hostname)
rc = f77_zmq_msg_init_size(msg,size)
rc = f77_zmq_msg_copy_to_data(msg, buffer, size)
rc = f77_zmq_msg_send(msg,zmq_to_dataserver_socket,ZMQ_SNDMORE)
if (rc == -1) then
call abrt(irp_here, 'Unable to send unregister message (2)')
endif
rc = f77_zmq_msg_close(msg)
rc = f77_zmq_msg_init_size(msg,8)
rc = f77_zmq_msg_copy_to_data(msg, current_PID, 8)
rc = f77_zmq_msg_init_size(msg,len_current_PID)
rc = f77_zmq_msg_copy_to_data(msg, current_PID, len_current_PID)
rc = f77_zmq_msg_send(msg,zmq_to_dataserver_socket,0)
if (rc == -1) then
call abrt(irp_here, 'Unable to send unregister message (3)')
@ -124,23 +126,25 @@ subroutine zmq_send_header(msg,header,block_id)
character*(64) :: buffer
rc = f77_zmq_msg_init_size(msg,64)
write(buffer,'(A)') trim(hostname)
buffer = adjustl(trim(buffer))
rc = f77_zmq_msg_copy_to_data(msg, buffer, len(buffer))
buffer = trim(hostname)
size = len(trim(hostname))
rc = f77_zmq_msg_init_size(msg,size)
rc = f77_zmq_msg_copy_to_data(msg, buffer, size)
rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE)
rc = f77_zmq_msg_close(msg)
call worker_log(irp_here, header)
rc = f77_zmq_msg_init_size(msg,8)
rc = f77_zmq_msg_copy_to_data(msg, current_PID, 8)
rc = f77_zmq_msg_init_size(msg,len_current_PID)
rc = f77_zmq_msg_copy_to_data(msg, current_PID, len_current_PID)
rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE)
rc = f77_zmq_msg_close(msg)
rc = f77_zmq_msg_init_size(msg,8)
write(buffer,'(I8)') block_id
buffer = adjustl(trim(buffer))
rc = f77_zmq_msg_copy_to_data(msg, buffer, 8)
size = len(trim(buffer))
rc = f77_zmq_msg_init_size(msg,size)
rc = f77_zmq_msg_copy_to_data(msg, buffer, size)
rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE)
rc = f77_zmq_msg_close(msg)
@ -156,21 +160,22 @@ subroutine zmq_send_scalar_prop(msg,weight,value)
END_DOC
integer(ZMQ_PTR) :: msg
double precision :: weight, value
integer :: rc,sze
integer :: rc,size
character*(32) :: buffer
write(buffer,'(E32.16)') weight
buffer = adjustl(trim(buffer))
sze = len(buffer)
rc = f77_zmq_msg_init_size(msg,len(buffer))
rc = f77_zmq_msg_copy_to_data(msg, buffer,sze)
size = len(trim(buffer))
rc = f77_zmq_msg_init_size(msg,len(trim(buffer)))
rc = f77_zmq_msg_copy_to_data(msg, buffer,size)
rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE)
rc = f77_zmq_msg_close(msg)
write(buffer,'(E32.16)') value
buffer = adjustl(trim(buffer))
rc = f77_zmq_msg_init_size(msg,len(buffer))
rc = f77_zmq_msg_copy_to_data(msg, buffer,len(buffer))
size = len(trim(buffer))
rc = f77_zmq_msg_init_size(msg,size)
rc = f77_zmq_msg_copy_to_data(msg, buffer,size)
rc = f77_zmq_msg_send(msg,zmq_socket_push,0)
rc = f77_zmq_msg_close(msg)
@ -192,7 +197,7 @@ subroutine zmq_send_array_prop(msg,weight,value,isize)
write(buffer,'(I8)') isize
buffer = adjustl(trim(buffer))
l = len(buffer)
l = len(trim(buffer))
rc = f77_zmq_msg_init_size(msg,l)
rc = f77_zmq_msg_copy_to_data(msg, buffer,l)
rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE)
@ -201,7 +206,7 @@ subroutine zmq_send_array_prop(msg,weight,value,isize)
write(buffer,'(E32.16)') weight
buffer = adjustl(trim(buffer))
l = len(buffer)
l = len(trim(buffer))
rc = f77_zmq_msg_init_size(msg,l)
rc = f77_zmq_msg_copy_to_data(msg, buffer,l)
rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE)
@ -211,7 +216,7 @@ subroutine zmq_send_array_prop(msg,weight,value,isize)
do i=1,isize
write(buffer,'(E32.16)') value(i)
buffer = adjustl(trim(buffer))
l = len(buffer)
l = len(trim(buffer))
sze += l
rc = f77_zmq_msg_init_size(msg,l)
rc = f77_zmq_msg_copy_to_data(msg, buffer,l)
@ -262,7 +267,7 @@ subroutine zmq_send_int(msg,value,isize)
write(buffer,'(I8)') isize
buffer = adjustl(trim(buffer))
l = len(buffer)
l = len(trim(buffer))
rc = f77_zmq_msg_init_size(msg,l)
rc = f77_zmq_msg_copy_to_data(msg, buffer,l)
rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE)
@ -272,7 +277,7 @@ subroutine zmq_send_int(msg,value,isize)
do i=1,isize
write(buffer,'(I16)') value(i)
buffer = adjustl(trim(buffer))
l = len(buffer)
l = len(trim(buffer))
sze += l
rc = f77_zmq_msg_init_size(msg,l)
rc = f77_zmq_msg_copy_to_data(msg, buffer,l)
@ -302,7 +307,7 @@ subroutine zmq_send_real(msg,value,isize)
write(buffer,'(I8)') isize
buffer = adjustl(trim(buffer))
l = len(buffer)
l = len(trim(buffer))
rc = f77_zmq_msg_init_size(msg,l)
rc = f77_zmq_msg_copy_to_data(msg, buffer,l)
rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE)
@ -312,7 +317,7 @@ subroutine zmq_send_real(msg,value,isize)
do i=1,isize
write(buffer,'(E32.16)') value(i)
buffer = adjustl(trim(buffer))
l = len(buffer)
l = len(trim(buffer))
sze += l
rc = f77_zmq_msg_init_size(msg,l)
rc = f77_zmq_msg_copy_to_data(msg, buffer,l)
@ -388,7 +393,7 @@ subroutine get_running(do_run)
else
do_run = t_Stopped
endif
call worker_log(irp_here,buffer)
! call worker_log(irp_here,buffer)
endif
end