2015-12-20 00:54:56 +01:00
|
|
|
|
|
|
|
! Functions
|
|
|
|
! =========
|
|
|
|
|
|
|
|
subroutine zmq_register_worker(msg)
|
|
|
|
use f77_zmq
|
|
|
|
implicit none
|
|
|
|
BEGIN_DOC
|
|
|
|
! Register a new worker to the forwarder
|
|
|
|
END_DOC
|
|
|
|
integer(ZMQ_PTR) :: msg
|
|
|
|
integer :: i,rc
|
|
|
|
|
|
|
|
rc = f77_zmq_msg_init_size(msg,8)
|
|
|
|
rc = f77_zmq_msg_copy_to_data(msg, 'register',8)
|
|
|
|
rc = f77_zmq_msg_send(msg,zmq_to_dataserver_socket,ZMQ_SNDMORE)
|
|
|
|
rc = f77_zmq_msg_close(msg)
|
|
|
|
|
|
|
|
|
|
|
|
character*(64) :: buffer
|
2016-04-05 00:48:37 +02:00
|
|
|
integer :: size
|
2015-12-20 00:54:56 +01:00
|
|
|
|
2016-04-05 00:48:37 +02:00
|
|
|
size = len(trim(hostname))
|
|
|
|
buffer = trim(hostname)
|
|
|
|
rc = f77_zmq_msg_init_size(msg,size)
|
|
|
|
rc = f77_zmq_msg_copy_to_data(msg, buffer, size)
|
2015-12-20 00:54:56 +01:00
|
|
|
rc = f77_zmq_msg_send(msg,zmq_to_dataserver_socket,ZMQ_SNDMORE)
|
|
|
|
if (rc == -1) then
|
|
|
|
call abrt(irp_here, 'Unable to send register message (1)')
|
|
|
|
endif
|
|
|
|
rc = f77_zmq_msg_close(msg)
|
|
|
|
|
|
|
|
call worker_log(irp_here, 'Registering')
|
|
|
|
|
2016-04-05 00:48:37 +02:00
|
|
|
rc = f77_zmq_msg_init_size(msg,len_current_PID)
|
|
|
|
rc = f77_zmq_msg_copy_to_data(msg, current_PID, len_current_PID)
|
2015-12-20 00:54:56 +01:00
|
|
|
rc = f77_zmq_msg_send(msg,zmq_to_dataserver_socket,0)
|
|
|
|
if (rc == -1) then
|
|
|
|
call abrt(irp_here, 'Unable to send register message (2)')
|
|
|
|
endif
|
|
|
|
rc = f77_zmq_msg_close(msg)
|
|
|
|
|
|
|
|
rc = f77_zmq_recv(zmq_to_dataserver_socket, buffer, 32, 0)
|
|
|
|
if (buffer(1:2)/='OK') then
|
|
|
|
call abrt(irp_here, 'Register failed '//trim(http_server))
|
|
|
|
endif
|
|
|
|
call worker_log(irp_here, 'Registered')
|
|
|
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
subroutine zmq_unregister_worker(msg)
|
|
|
|
use f77_zmq
|
|
|
|
implicit none
|
|
|
|
BEGIN_DOC
|
|
|
|
! Unregister a new worker to the forwarder
|
|
|
|
END_DOC
|
|
|
|
integer(ZMQ_PTR) :: msg
|
|
|
|
integer :: i,rc
|
|
|
|
|
|
|
|
call worker_log(irp_here, 'Unregistering')
|
|
|
|
rc = f77_zmq_msg_init_size(msg,10)
|
|
|
|
rc = f77_zmq_msg_copy_to_data(msg, 'unregister',10)
|
|
|
|
rc = f77_zmq_msg_send(msg,zmq_to_dataserver_socket,ZMQ_SNDMORE)
|
|
|
|
if (rc == -1) then
|
|
|
|
call abrt(irp_here, 'Unable to send unregister message (1)')
|
|
|
|
endif
|
|
|
|
rc = f77_zmq_msg_close(msg)
|
|
|
|
|
2016-04-05 00:48:37 +02:00
|
|
|
character*(64) :: buffer
|
|
|
|
integer :: size
|
2015-12-20 00:54:56 +01:00
|
|
|
|
2016-04-05 00:48:37 +02:00
|
|
|
size = len(trim(hostname))
|
|
|
|
buffer = trim(hostname)
|
|
|
|
rc = f77_zmq_msg_init_size(msg,size)
|
|
|
|
rc = f77_zmq_msg_copy_to_data(msg, buffer, size)
|
2015-12-20 00:54:56 +01:00
|
|
|
rc = f77_zmq_msg_send(msg,zmq_to_dataserver_socket,ZMQ_SNDMORE)
|
|
|
|
if (rc == -1) then
|
|
|
|
call abrt(irp_here, 'Unable to send unregister message (2)')
|
|
|
|
endif
|
|
|
|
rc = f77_zmq_msg_close(msg)
|
|
|
|
|
2016-04-05 00:48:37 +02:00
|
|
|
rc = f77_zmq_msg_init_size(msg,len_current_PID)
|
|
|
|
rc = f77_zmq_msg_copy_to_data(msg, current_PID, len_current_PID)
|
2015-12-20 00:54:56 +01:00
|
|
|
rc = f77_zmq_msg_send(msg,zmq_to_dataserver_socket,0)
|
|
|
|
if (rc == -1) then
|
|
|
|
call abrt(irp_here, 'Unable to send unregister message (3)')
|
|
|
|
endif
|
|
|
|
rc = f77_zmq_msg_close(msg)
|
|
|
|
|
|
|
|
! Timeout 15 seconds
|
|
|
|
rc = -1
|
|
|
|
do i=1,15
|
|
|
|
rc = f77_zmq_recv(zmq_to_dataserver_socket, buffer, 32, ZMQ_NOBLOCK)
|
|
|
|
if (rc == 2) then
|
|
|
|
call worker_log(irp_here, 'Unregistered')
|
|
|
|
return
|
|
|
|
endif
|
|
|
|
call worker_log(irp_here, 'Unregister failed. Retrying')
|
|
|
|
call sleep(1)
|
|
|
|
enddo
|
|
|
|
call abrt(irp_here, 'Unregister failed')
|
|
|
|
|
|
|
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
subroutine zmq_send_header(msg,header,block_id)
|
|
|
|
use f77_zmq
|
|
|
|
implicit none
|
|
|
|
BEGIN_DOC
|
|
|
|
! Receive the header of the multi-part message
|
|
|
|
END_DOC
|
|
|
|
integer(ZMQ_PTR), intent(in) :: msg
|
|
|
|
character*(*), intent(in) :: header
|
|
|
|
integer, intent(in) :: block_id
|
|
|
|
integer :: rc, size
|
|
|
|
character*(16) :: pid_str
|
|
|
|
|
|
|
|
size = len(trim(header))
|
|
|
|
rc = f77_zmq_msg_init_size(msg,size)
|
|
|
|
rc = f77_zmq_msg_copy_to_data(msg, header,size)
|
|
|
|
rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE)
|
|
|
|
rc = f77_zmq_msg_close(msg)
|
|
|
|
|
|
|
|
character*(64) :: buffer
|
|
|
|
|
2016-04-05 00:48:37 +02:00
|
|
|
buffer = trim(hostname)
|
|
|
|
size = len(trim(hostname))
|
|
|
|
rc = f77_zmq_msg_init_size(msg,size)
|
|
|
|
rc = f77_zmq_msg_copy_to_data(msg, buffer, size)
|
2015-12-20 00:54:56 +01:00
|
|
|
rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE)
|
|
|
|
rc = f77_zmq_msg_close(msg)
|
|
|
|
|
|
|
|
call worker_log(irp_here, header)
|
2016-04-05 00:48:37 +02:00
|
|
|
rc = f77_zmq_msg_init_size(msg,len_current_PID)
|
|
|
|
rc = f77_zmq_msg_copy_to_data(msg, current_PID, len_current_PID)
|
2015-12-20 00:54:56 +01:00
|
|
|
rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE)
|
|
|
|
rc = f77_zmq_msg_close(msg)
|
|
|
|
|
2016-04-05 00:48:37 +02:00
|
|
|
|
2015-12-20 00:54:56 +01:00
|
|
|
write(buffer,'(I8)') block_id
|
|
|
|
buffer = adjustl(trim(buffer))
|
2016-04-05 00:48:37 +02:00
|
|
|
size = len(trim(buffer))
|
|
|
|
rc = f77_zmq_msg_init_size(msg,size)
|
|
|
|
rc = f77_zmq_msg_copy_to_data(msg, buffer, size)
|
2015-12-20 00:54:56 +01:00
|
|
|
rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE)
|
|
|
|
rc = f77_zmq_msg_close(msg)
|
|
|
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
subroutine zmq_send_scalar_prop(msg,weight,value)
|
|
|
|
use f77_zmq
|
|
|
|
implicit none
|
|
|
|
BEGIN_DOC
|
|
|
|
! Send a double precision average over the trajectory
|
|
|
|
END_DOC
|
|
|
|
integer(ZMQ_PTR) :: msg
|
|
|
|
double precision :: weight, value
|
2016-04-05 00:48:37 +02:00
|
|
|
integer :: rc,size
|
2015-12-20 00:54:56 +01:00
|
|
|
character*(32) :: buffer
|
|
|
|
|
|
|
|
write(buffer,'(E32.16)') weight
|
|
|
|
buffer = adjustl(trim(buffer))
|
2016-04-05 00:48:37 +02:00
|
|
|
size = len(trim(buffer))
|
|
|
|
rc = f77_zmq_msg_init_size(msg,len(trim(buffer)))
|
|
|
|
rc = f77_zmq_msg_copy_to_data(msg, buffer,size)
|
2015-12-20 00:54:56 +01:00
|
|
|
rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE)
|
|
|
|
rc = f77_zmq_msg_close(msg)
|
|
|
|
|
|
|
|
write(buffer,'(E32.16)') value
|
|
|
|
buffer = adjustl(trim(buffer))
|
2016-04-05 00:48:37 +02:00
|
|
|
size = len(trim(buffer))
|
|
|
|
rc = f77_zmq_msg_init_size(msg,size)
|
|
|
|
rc = f77_zmq_msg_copy_to_data(msg, buffer,size)
|
2015-12-20 00:54:56 +01:00
|
|
|
rc = f77_zmq_msg_send(msg,zmq_socket_push,0)
|
|
|
|
rc = f77_zmq_msg_close(msg)
|
|
|
|
|
|
|
|
call worker_log(irp_here,'')
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
subroutine zmq_send_array_prop(msg,weight,value,isize)
|
|
|
|
use f77_zmq
|
|
|
|
implicit none
|
|
|
|
BEGIN_DOC
|
|
|
|
! Send a double precision average over the trajectory
|
|
|
|
END_DOC
|
|
|
|
integer(ZMQ_PTR) :: msg
|
|
|
|
integer :: isize
|
|
|
|
double precision :: weight, value(isize)
|
|
|
|
integer :: rc,i,l, sze
|
|
|
|
character*(32) :: buffer
|
|
|
|
|
|
|
|
write(buffer,'(I8)') isize
|
|
|
|
buffer = adjustl(trim(buffer))
|
2016-04-05 00:48:37 +02:00
|
|
|
l = len(trim(buffer))
|
2015-12-20 00:54:56 +01:00
|
|
|
rc = f77_zmq_msg_init_size(msg,l)
|
|
|
|
rc = f77_zmq_msg_copy_to_data(msg, buffer,l)
|
|
|
|
rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE)
|
|
|
|
rc = f77_zmq_msg_close(msg)
|
|
|
|
sze = l
|
|
|
|
|
|
|
|
write(buffer,'(E32.16)') weight
|
|
|
|
buffer = adjustl(trim(buffer))
|
2016-04-05 00:48:37 +02:00
|
|
|
l = len(trim(buffer))
|
2015-12-20 00:54:56 +01:00
|
|
|
rc = f77_zmq_msg_init_size(msg,l)
|
|
|
|
rc = f77_zmq_msg_copy_to_data(msg, buffer,l)
|
|
|
|
rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE)
|
|
|
|
rc = f77_zmq_msg_close(msg)
|
|
|
|
sze += l
|
|
|
|
|
|
|
|
do i=1,isize
|
|
|
|
write(buffer,'(E32.16)') value(i)
|
|
|
|
buffer = adjustl(trim(buffer))
|
2016-04-05 00:48:37 +02:00
|
|
|
l = len(trim(buffer))
|
2015-12-20 00:54:56 +01:00
|
|
|
sze += l
|
|
|
|
rc = f77_zmq_msg_init_size(msg,l)
|
|
|
|
rc = f77_zmq_msg_copy_to_data(msg, buffer,l)
|
|
|
|
if (i < isize) then
|
|
|
|
rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE)
|
|
|
|
else
|
|
|
|
rc = f77_zmq_msg_send(msg,zmq_socket_push,0)
|
|
|
|
endif
|
|
|
|
rc = f77_zmq_msg_close(msg)
|
|
|
|
enddo
|
|
|
|
|
|
|
|
call worker_log(irp_here,'')
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
subroutine zmq_send_info(msg,message)
|
|
|
|
implicit none
|
|
|
|
BEGIN_DOC
|
|
|
|
! Send an info message to the forwarder
|
|
|
|
END_DOC
|
|
|
|
integer(ZMQ_PTR) :: msg
|
|
|
|
character*(64) :: message
|
|
|
|
integer :: rc
|
|
|
|
integer :: isize
|
|
|
|
|
|
|
|
isize = len(trim(message))
|
|
|
|
rc = f77_zmq_msg_init_size(msg,isize)
|
|
|
|
rc = f77_zmq_msg_copy_to_data(msg, trim(message),isize)
|
|
|
|
rc = f77_zmq_msg_send(msg,zmq_socket_push,0)
|
|
|
|
rc = f77_zmq_msg_close(msg)
|
|
|
|
call worker_log(irp_here, message)
|
|
|
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
subroutine zmq_send_int(msg,value,isize)
|
|
|
|
use f77_zmq
|
|
|
|
implicit none
|
|
|
|
BEGIN_DOC
|
|
|
|
! Send an integer array of size n to the forwarder
|
|
|
|
END_DOC
|
|
|
|
integer(ZMQ_PTR) :: msg
|
|
|
|
integer :: isize
|
|
|
|
integer :: value(isize)
|
|
|
|
integer :: rc,i,l, sze
|
|
|
|
character*(32) :: buffer
|
|
|
|
|
|
|
|
write(buffer,'(I8)') isize
|
|
|
|
buffer = adjustl(trim(buffer))
|
2016-04-05 00:48:37 +02:00
|
|
|
l = len(trim(buffer))
|
2015-12-20 00:54:56 +01:00
|
|
|
rc = f77_zmq_msg_init_size(msg,l)
|
|
|
|
rc = f77_zmq_msg_copy_to_data(msg, buffer,l)
|
|
|
|
rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE)
|
|
|
|
rc = f77_zmq_msg_close(msg)
|
|
|
|
sze = l
|
|
|
|
|
|
|
|
do i=1,isize
|
|
|
|
write(buffer,'(I16)') value(i)
|
|
|
|
buffer = adjustl(trim(buffer))
|
2016-04-05 00:48:37 +02:00
|
|
|
l = len(trim(buffer))
|
2015-12-20 00:54:56 +01:00
|
|
|
sze += l
|
|
|
|
rc = f77_zmq_msg_init_size(msg,l)
|
|
|
|
rc = f77_zmq_msg_copy_to_data(msg, buffer,l)
|
|
|
|
if (i < isize) then
|
|
|
|
rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE)
|
|
|
|
else
|
|
|
|
rc = f77_zmq_msg_send(msg,zmq_socket_push,0)
|
|
|
|
endif
|
|
|
|
rc = f77_zmq_msg_close(msg)
|
|
|
|
enddo
|
|
|
|
|
|
|
|
call worker_log(irp_here,'')
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
subroutine zmq_send_real(msg,value,isize)
|
|
|
|
use f77_zmq
|
|
|
|
implicit none
|
|
|
|
BEGIN_DOC
|
|
|
|
! Send a real array of size n to the forwarder
|
|
|
|
END_DOC
|
|
|
|
integer(ZMQ_PTR) :: msg
|
|
|
|
integer :: isize
|
|
|
|
real :: value(isize)
|
|
|
|
integer :: rc,i,l, sze
|
|
|
|
character*(32) :: buffer
|
|
|
|
|
|
|
|
write(buffer,'(I8)') isize
|
|
|
|
buffer = adjustl(trim(buffer))
|
2016-04-05 00:48:37 +02:00
|
|
|
l = len(trim(buffer))
|
2015-12-20 00:54:56 +01:00
|
|
|
rc = f77_zmq_msg_init_size(msg,l)
|
|
|
|
rc = f77_zmq_msg_copy_to_data(msg, buffer,l)
|
|
|
|
rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE)
|
|
|
|
rc = f77_zmq_msg_close(msg)
|
|
|
|
sze = l
|
|
|
|
|
|
|
|
do i=1,isize
|
|
|
|
write(buffer,'(E32.16)') value(i)
|
|
|
|
buffer = adjustl(trim(buffer))
|
2016-04-05 00:48:37 +02:00
|
|
|
l = len(trim(buffer))
|
2015-12-20 00:54:56 +01:00
|
|
|
sze += l
|
|
|
|
rc = f77_zmq_msg_init_size(msg,l)
|
|
|
|
rc = f77_zmq_msg_copy_to_data(msg, buffer,l)
|
|
|
|
if (i < isize) then
|
|
|
|
rc = f77_zmq_msg_send(msg,zmq_socket_push,ZMQ_SNDMORE)
|
|
|
|
else
|
|
|
|
rc = f77_zmq_msg_send(msg,zmq_socket_push,0)
|
|
|
|
endif
|
|
|
|
rc = f77_zmq_msg_close(msg)
|
|
|
|
enddo
|
|
|
|
|
|
|
|
call worker_log(irp_here,'')
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
subroutine get_running(do_run)
|
|
|
|
use f77_zmq
|
|
|
|
implicit none
|
|
|
|
include '../types.F'
|
|
|
|
BEGIN_DOC
|
|
|
|
! Fetches the 'do_run' information
|
|
|
|
END_DOC
|
|
|
|
integer :: do_run
|
|
|
|
integer :: rc, timeout
|
|
|
|
character*(16) :: buffer
|
|
|
|
integer(ZMQ_PTR), save :: pollitem = 0_ZMQ_PTR
|
|
|
|
|
|
|
|
if (.not.is_worker) then
|
|
|
|
do_run = t_Running
|
|
|
|
return
|
|
|
|
else
|
|
|
|
|
|
|
|
timeout = 5 ! seconds
|
|
|
|
|
|
|
|
! Polling items
|
|
|
|
! -------------
|
|
|
|
|
|
|
|
if (pollitem == 0_ZMQ_PTR) then
|
|
|
|
pollitem = f77_zmq_pollitem_new(1)
|
|
|
|
rc = f77_zmq_pollitem_set_socket(pollitem,1,zmq_socket_running)
|
|
|
|
rc = f77_zmq_pollitem_set_events(pollitem,1,ZMQ_POLLIN)
|
|
|
|
endif
|
|
|
|
|
|
|
|
! Check for disconnected forwarder after timeout
|
|
|
|
! ----------------------------------------------
|
|
|
|
|
|
|
|
buffer = 'Stopped'
|
|
|
|
do while (timeout > 0)
|
|
|
|
rc = f77_zmq_poll(pollitem, 1, 100_8)
|
|
|
|
if (iand(f77_zmq_pollitem_revents(pollitem,1), ZMQ_POLLIN) /= 0) then
|
|
|
|
exit
|
|
|
|
endif
|
|
|
|
timeout = timeout-1
|
|
|
|
call sleep(1)
|
|
|
|
enddo
|
|
|
|
|
|
|
|
! Empty the queue to get only the last value
|
|
|
|
! ------------------------------------------
|
|
|
|
|
|
|
|
do
|
|
|
|
rc = f77_zmq_poll(pollitem, 1, 0)
|
|
|
|
if (iand(f77_zmq_pollitem_revents(pollitem,1), ZMQ_POLLIN) == 0) then
|
|
|
|
exit
|
|
|
|
endif
|
|
|
|
rc = f77_zmq_recv(zmq_socket_running, buffer, 16, 0)
|
|
|
|
enddo
|
|
|
|
|
|
|
|
if (buffer == 'Running') then
|
|
|
|
do_run = t_Running
|
|
|
|
else if (buffer == 'Queued') then
|
|
|
|
do_run = t_Running
|
|
|
|
else
|
|
|
|
do_run = t_Stopped
|
|
|
|
endif
|
2016-04-05 00:48:37 +02:00
|
|
|
! call worker_log(irp_here,buffer)
|
2015-12-20 00:54:56 +01:00
|
|
|
|
|
|
|
endif
|
|
|
|
end
|