mirror of
https://github.com/QuantumPackage/qp2.git
synced 2024-11-07 05:53:37 +01:00
Removed internal reads in zmq
This commit is contained in:
parent
d7d3afacb1
commit
8102d43aff
@ -116,7 +116,7 @@ subroutine ao_two_e_integrals_in_map_slave(thread,iproc)
|
|||||||
exit
|
exit
|
||||||
endif
|
endif
|
||||||
if (task_id == 0) exit
|
if (task_id == 0) exit
|
||||||
read(task,*) j, l
|
call sscanf_dd(task, j, l)
|
||||||
integer, external :: task_done_to_taskserver
|
integer, external :: task_done_to_taskserver
|
||||||
call compute_ao_integrals_jl(j,l,n_integrals,buffer_i,buffer_value)
|
call compute_ao_integrals_jl(j,l,n_integrals,buffer_i,buffer_value)
|
||||||
if (task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id) == -1) then
|
if (task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id) == -1) then
|
||||||
|
@ -52,7 +52,7 @@ subroutine run_selection_slave(thread,iproc,energy)
|
|||||||
ctask = ctask - 1
|
ctask = ctask - 1
|
||||||
else
|
else
|
||||||
integer :: i_generator, N, subset, bsize
|
integer :: i_generator, N, subset, bsize
|
||||||
read(task,*) subset, i_generator, N
|
call sscanf_ddd(task, subset, i_generator, N)
|
||||||
if(buf%N == 0) then
|
if(buf%N == 0) then
|
||||||
! Only first time
|
! Only first time
|
||||||
call create_selection_buffer(N, N*2, buf)
|
call create_selection_buffer(N, N*2, buf)
|
||||||
|
@ -1,6 +1,40 @@
|
|||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <string.h>
|
||||||
|
|
||||||
void usleep_c(int s)
|
void usleep_c(int s)
|
||||||
{
|
{
|
||||||
usleep((useconds_t) s);
|
usleep((useconds_t) s);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void sscanf_ssds_c(const char* str, char* s1, char* s2, int* i, char* s3)
|
||||||
|
{
|
||||||
|
sscanf(str, "%s %s %d %s", s1, s2, i, s3);
|
||||||
|
s1[strlen(s1)] = ' ';
|
||||||
|
s2[strlen(s2)] = ' ';
|
||||||
|
s3[strlen(s3)] = ' ';
|
||||||
|
}
|
||||||
|
|
||||||
|
void sscanf_dd_c(const char* str, int* i1, int* i2)
|
||||||
|
{
|
||||||
|
sscanf(str, "%d %d", i1, i2);
|
||||||
|
}
|
||||||
|
|
||||||
|
void sscanf_ddd_c(const char* str, int* i1, int* i2, int* i3)
|
||||||
|
{
|
||||||
|
sscanf(str, "%d %d %d", i1, i2, i3);
|
||||||
|
}
|
||||||
|
|
||||||
|
void sscanf_ss_c(const char* str, char* s1, char* s2)
|
||||||
|
{
|
||||||
|
sscanf(str, "%s %s", s1, s2);
|
||||||
|
s1[strlen(s1)] = ' ';
|
||||||
|
s2[strlen(s2)] = ' ';
|
||||||
|
}
|
||||||
|
|
||||||
|
void sscanf_sd_c(const char* str, char* s1, int* i)
|
||||||
|
{
|
||||||
|
sscanf(str, "%s %d", s1, i);
|
||||||
|
s1[strlen(s1)] = ' ';
|
||||||
|
}
|
||||||
|
|
||||||
|
@ -8,14 +8,127 @@ module c_functions
|
|||||||
end subroutine usleep_c
|
end subroutine usleep_c
|
||||||
end interface
|
end interface
|
||||||
|
|
||||||
end module
|
interface
|
||||||
|
integer(c_int) function atoi_c(a) bind (C,name="atoi")
|
||||||
|
use iso_c_binding
|
||||||
|
character(kind=c_char), intent(in) :: a(*)
|
||||||
|
end function atoi_c
|
||||||
|
end interface
|
||||||
|
|
||||||
subroutine usleep(us)
|
interface
|
||||||
|
subroutine sscanf_ss_c(str,s1, s2) bind (C)
|
||||||
|
use iso_c_binding
|
||||||
|
character(kind=c_char), intent(in ) :: str(*)
|
||||||
|
character(kind=c_char), intent(out) :: s1(*),s2(*)
|
||||||
|
end subroutine sscanf_ss_c
|
||||||
|
end interface
|
||||||
|
|
||||||
|
interface
|
||||||
|
subroutine sscanf_ssds_c(str, s1, s2, i, s3) bind (C)
|
||||||
|
use iso_c_binding
|
||||||
|
character(kind=c_char), intent(in ) :: str(*)
|
||||||
|
character(kind=c_char), intent(out) :: s1(*),s2(*),s3(*)
|
||||||
|
integer(kind=c_int) , intent(out) :: i
|
||||||
|
end subroutine sscanf_ssds_c
|
||||||
|
end interface
|
||||||
|
|
||||||
|
interface
|
||||||
|
subroutine sscanf_dd_c(str, i1, i2) bind (C)
|
||||||
|
use iso_c_binding
|
||||||
|
character(kind=c_char), intent(in ) :: str(*)
|
||||||
|
integer(kind=c_int) , intent(out) :: i1, i2
|
||||||
|
end subroutine sscanf_dd_c
|
||||||
|
end interface
|
||||||
|
|
||||||
|
interface
|
||||||
|
subroutine sscanf_ddd_c(str, i1, i2, i3) bind (C)
|
||||||
|
use iso_c_binding
|
||||||
|
character(kind=c_char), intent(in ) :: str(*)
|
||||||
|
integer(kind=c_int) , intent(out) :: i1, i2, i3
|
||||||
|
end subroutine sscanf_ddd_c
|
||||||
|
end interface
|
||||||
|
|
||||||
|
interface
|
||||||
|
subroutine sscanf_sd_c(str,s1, i) bind (C)
|
||||||
|
use iso_c_binding
|
||||||
|
character(kind=c_char), intent(in ) :: str(*)
|
||||||
|
character(kind=c_char), intent(out) :: s1(*)
|
||||||
|
integer(kind=c_int) , intent(out) :: i
|
||||||
|
end subroutine sscanf_sd_c
|
||||||
|
end interface
|
||||||
|
|
||||||
|
contains
|
||||||
|
|
||||||
|
integer function atoi(a)
|
||||||
|
implicit none
|
||||||
|
character(len=*), intent(in) :: a
|
||||||
|
atoi = atoi_c(trim(a)//c_null_char)
|
||||||
|
end function atoi
|
||||||
|
|
||||||
|
end module c_functions
|
||||||
|
|
||||||
|
subroutine sscanf_ss(str, s1,s2)
|
||||||
use c_functions
|
use c_functions
|
||||||
use iso_c_binding
|
use iso_c_binding
|
||||||
implicit none
|
implicit none
|
||||||
|
character(*), intent(in) :: str
|
||||||
|
character(*), intent(out) :: s1,s2
|
||||||
|
s1 = ' '
|
||||||
|
s2 = ' '
|
||||||
|
call sscanf_ss_c(trim(str)//c_null_char, s1, s2)
|
||||||
|
end subroutine sscanf_ss
|
||||||
|
|
||||||
|
subroutine sscanf_sd(str, s1,i)
|
||||||
|
use c_functions
|
||||||
|
use iso_c_binding
|
||||||
|
implicit none
|
||||||
|
character(*), intent(in) :: str
|
||||||
|
character(*), intent(out) :: s1
|
||||||
|
integer, intent(out) :: i
|
||||||
|
s1 = ' '
|
||||||
|
call sscanf_sd_c(trim(str)//c_null_char, s1, i)
|
||||||
|
end subroutine sscanf_sd
|
||||||
|
|
||||||
|
subroutine sscanf_ssds(str, s1,s2,i,s3)
|
||||||
|
use c_functions
|
||||||
|
use iso_c_binding
|
||||||
|
implicit none
|
||||||
|
character(*), intent(in) :: str
|
||||||
|
character(*), intent(out) :: s1,s2,s3
|
||||||
|
integer, intent(out) :: i
|
||||||
|
s1 = ' '
|
||||||
|
s2 = ' '
|
||||||
|
s3 = ' '
|
||||||
|
call sscanf_ssds_c(trim(str)//c_null_char, s1, s2, i, s3)
|
||||||
|
end subroutine sscanf_ssds
|
||||||
|
|
||||||
|
subroutine sscanf_dd(str, i1,i2)
|
||||||
|
use c_functions
|
||||||
|
use iso_c_binding
|
||||||
|
implicit none
|
||||||
|
character(*), intent(in) :: str
|
||||||
|
integer, intent(out) :: i1, i2
|
||||||
|
call sscanf_dd_c(trim(str)//c_null_char, i1, i2)
|
||||||
|
end subroutine sscanf_dd
|
||||||
|
|
||||||
|
subroutine sscanf_ddd(str, i1,i2,i3)
|
||||||
|
use c_functions
|
||||||
|
use iso_c_binding
|
||||||
|
implicit none
|
||||||
|
character(*), intent(in) :: str
|
||||||
|
integer, intent(out) :: i1, i2, i3
|
||||||
|
call sscanf_ddd_c(trim(str)//c_null_char, i1, i2, i3)
|
||||||
|
end subroutine sscanf_ddd
|
||||||
|
|
||||||
|
|
||||||
|
subroutine usleep(us)
|
||||||
|
use iso_c_binding
|
||||||
|
use c_functions
|
||||||
|
implicit none
|
||||||
integer, intent(in) :: us
|
integer, intent(in) :: us
|
||||||
integer(c_int) :: u
|
integer(c_int) :: u
|
||||||
u = us
|
u = us
|
||||||
call usleep_c(u)
|
call usleep_c(u)
|
||||||
end
|
end subroutine usleep
|
||||||
|
|
||||||
|
|
||||||
|
@ -16,6 +16,7 @@ END_PROVIDER
|
|||||||
BEGIN_PROVIDER [ character*(128), qp_run_address ]
|
BEGIN_PROVIDER [ character*(128), qp_run_address ]
|
||||||
&BEGIN_PROVIDER [ integer, zmq_port_start ]
|
&BEGIN_PROVIDER [ integer, zmq_port_start ]
|
||||||
use f77_zmq
|
use f77_zmq
|
||||||
|
use c_functions
|
||||||
implicit none
|
implicit none
|
||||||
BEGIN_DOC
|
BEGIN_DOC
|
||||||
! Address of the qp_run socket
|
! Address of the qp_run socket
|
||||||
@ -32,14 +33,15 @@ END_PROVIDER
|
|||||||
do i=len(buffer),1,-1
|
do i=len(buffer),1,-1
|
||||||
if ( buffer(i:i) == ':') then
|
if ( buffer(i:i) == ':') then
|
||||||
qp_run_address = trim(buffer(1:i-1))
|
qp_run_address = trim(buffer(1:i-1))
|
||||||
read(buffer(i+1:), *, err=10,end=10) zmq_port_start
|
zmq_port_start = atoi(buffer(i+1:))
|
||||||
exit
|
exit
|
||||||
endif
|
endif
|
||||||
enddo
|
enddo
|
||||||
return
|
|
||||||
10 continue
|
if (zmq_port_start == 0) then
|
||||||
print *, irp_here, ': Error in read'
|
print *, irp_here, ': zmq_port_start is 0'
|
||||||
stop -1
|
stop -1
|
||||||
|
endif
|
||||||
END_PROVIDER
|
END_PROVIDER
|
||||||
|
|
||||||
BEGIN_PROVIDER [ character*(128), zmq_socket_pull_tcp_address ]
|
BEGIN_PROVIDER [ character*(128), zmq_socket_pull_tcp_address ]
|
||||||
@ -84,6 +86,7 @@ end
|
|||||||
|
|
||||||
subroutine switch_qp_run_to_master
|
subroutine switch_qp_run_to_master
|
||||||
use f77_zmq
|
use f77_zmq
|
||||||
|
use c_functions
|
||||||
implicit none
|
implicit none
|
||||||
BEGIN_DOC
|
BEGIN_DOC
|
||||||
! Address of the master qp_run socket
|
! Address of the master qp_run socket
|
||||||
@ -102,16 +105,17 @@ subroutine switch_qp_run_to_master
|
|||||||
do i=len(buffer),1,-1
|
do i=len(buffer),1,-1
|
||||||
if ( buffer(i:i) == ':') then
|
if ( buffer(i:i) == ':') then
|
||||||
qp_run_address = trim(buffer(1:i-1))
|
qp_run_address = trim(buffer(1:i-1))
|
||||||
read(buffer(i+1:), *, end=10, err=10) zmq_port_start
|
zmq_port_start = atoi(buffer(i+1:))
|
||||||
exit
|
exit
|
||||||
endif
|
endif
|
||||||
enddo
|
enddo
|
||||||
call reset_zmq_addresses
|
call reset_zmq_addresses
|
||||||
|
|
||||||
return
|
return
|
||||||
10 continue
|
if (zmq_port_start == 0) then
|
||||||
print *, irp_here, ': Error in read'
|
print *, irp_here, ': zmq_port_start is 0'
|
||||||
stop -1
|
stop -1
|
||||||
|
endif
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
@ -650,12 +654,17 @@ integer function connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
|
|||||||
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0)
|
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0)
|
||||||
message = trim(message(1:rc))
|
message = trim(message(1:rc))
|
||||||
if(message(1:5) == "error") then
|
if(message(1:5) == "error") then
|
||||||
go to 10
|
connect_to_taskserver = -1
|
||||||
|
return
|
||||||
end if
|
end if
|
||||||
read(message,*, end=10, err=10) reply, state, worker_id, address
|
|
||||||
|
call sscanf_ssds(message, reply, state, worker_id, address)
|
||||||
|
|
||||||
if (trim(reply) /= 'connect_reply') then
|
if (trim(reply) /= 'connect_reply') then
|
||||||
go to 10
|
connect_to_taskserver = -1
|
||||||
|
return
|
||||||
endif
|
endif
|
||||||
|
|
||||||
if (trim(state) /= zmq_state) then
|
if (trim(state) /= zmq_state) then
|
||||||
integer, external :: disconnect_from_taskserver_state
|
integer, external :: disconnect_from_taskserver_state
|
||||||
if (disconnect_from_taskserver_state(zmq_to_qp_run_socket, worker_id, state) == -1) then
|
if (disconnect_from_taskserver_state(zmq_to_qp_run_socket, worker_id, state) == -1) then
|
||||||
@ -663,13 +672,8 @@ integer function connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
|
|||||||
continue
|
continue
|
||||||
endif
|
endif
|
||||||
connect_to_taskserver = -1
|
connect_to_taskserver = -1
|
||||||
return
|
|
||||||
endif
|
endif
|
||||||
|
|
||||||
return
|
|
||||||
10 continue
|
|
||||||
! print *, irp_here//': '//trim(message)
|
|
||||||
connect_to_taskserver = -1
|
|
||||||
end
|
end
|
||||||
|
|
||||||
integer function disconnect_from_taskserver(zmq_to_qp_run_socket, worker_id)
|
integer function disconnect_from_taskserver(zmq_to_qp_run_socket, worker_id)
|
||||||
@ -698,7 +702,7 @@ integer function disconnect_from_taskserver_state(zmq_to_qp_run_socket, worker_i
|
|||||||
character*(512) :: message, reply
|
character*(512) :: message, reply
|
||||||
character*(128) :: state_tmp
|
character*(128) :: state_tmp
|
||||||
|
|
||||||
disconnect_from_taskserver_state = 0
|
disconnect_from_taskserver_state = -1
|
||||||
|
|
||||||
write(message,*) 'disconnect '//trim(state), worker_id
|
write(message,*) 'disconnect '//trim(state), worker_id
|
||||||
|
|
||||||
@ -718,21 +722,15 @@ integer function disconnect_from_taskserver_state(zmq_to_qp_run_socket, worker_i
|
|||||||
rc = min(510,rc)
|
rc = min(510,rc)
|
||||||
message = trim(message(1:rc))
|
message = trim(message(1:rc))
|
||||||
|
|
||||||
read(message,*, end=10, err=10) reply, state_tmp
|
call sscanf_ss(message, reply, state_tmp)
|
||||||
if ((trim(reply) == 'disconnect_reply').and.(trim(state_tmp) == trim(state))) then
|
|
||||||
return
|
if (trim(state_tmp) /= trim(state)) then
|
||||||
endif
|
|
||||||
if (trim(message) == 'error Wrong state') then
|
|
||||||
disconnect_from_taskserver_state = -1
|
|
||||||
return
|
|
||||||
else if (trim(message) == 'error No job is running') then
|
|
||||||
disconnect_from_taskserver_state = -1
|
|
||||||
return
|
return
|
||||||
endif
|
endif
|
||||||
|
|
||||||
return
|
if ((trim(reply) == 'disconnect_reply')) then
|
||||||
10 continue
|
disconnect_from_taskserver_state = 0
|
||||||
disconnect_from_taskserver_state = -1
|
endif
|
||||||
end
|
end
|
||||||
|
|
||||||
integer function add_task_to_taskserver(zmq_to_qp_run_socket,task)
|
integer function add_task_to_taskserver(zmq_to_qp_run_socket,task)
|
||||||
@ -898,7 +896,7 @@ integer function get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id
|
|||||||
|
|
||||||
character*(1024) :: message
|
character*(1024) :: message
|
||||||
character*(64) :: reply
|
character*(64) :: reply
|
||||||
integer :: rc, sze
|
integer :: rc, sze, i
|
||||||
|
|
||||||
get_task_from_taskserver = 0
|
get_task_from_taskserver = 0
|
||||||
|
|
||||||
@ -912,12 +910,15 @@ integer function get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id
|
|||||||
endif
|
endif
|
||||||
|
|
||||||
task_id = 0
|
task_id = 0
|
||||||
message = repeat(' ',1024)
|
message = ' '
|
||||||
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 1024, 0)
|
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 1024, 0)
|
||||||
rc = min(64,rc)
|
i = 1
|
||||||
read(message(1:rc),*, end=10, err=10) reply
|
do while (message(i:i) /= ' ')
|
||||||
if (trim(reply) == 'get_task_reply') then
|
i = i+1
|
||||||
read(message(1:rc),*, end=10, err=10) reply, task_id
|
enddo
|
||||||
|
reply = message(1:i-1)
|
||||||
|
if (reply == 'get_task_reply') then
|
||||||
|
call sscanf_sd(message, reply, task_id)
|
||||||
rc = 15
|
rc = 15
|
||||||
do while (rc < 1024 .and. message(rc:rc) == ' ')
|
do while (rc < 1024 .and. message(rc:rc) == ' ')
|
||||||
rc += 1
|
rc += 1
|
||||||
@ -937,15 +938,12 @@ integer function get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id
|
|||||||
get_task_from_taskserver = -1
|
get_task_from_taskserver = -1
|
||||||
return
|
return
|
||||||
endif
|
endif
|
||||||
return
|
|
||||||
|
|
||||||
10 continue
|
|
||||||
get_task_from_taskserver = -1
|
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
integer function get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task,n_tasks)
|
integer function get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task,n_tasks)
|
||||||
|
use c_functions
|
||||||
use f77_zmq
|
use f77_zmq
|
||||||
implicit none
|
implicit none
|
||||||
BEGIN_DOC
|
BEGIN_DOC
|
||||||
@ -1000,7 +998,7 @@ integer function get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_i
|
|||||||
return
|
return
|
||||||
endif
|
endif
|
||||||
rc = min(1024,rc)
|
rc = min(1024,rc)
|
||||||
read(message(1:rc),*, end=10, err=10) task_id(i)
|
task_id(i) = atoi(message(1:rc))
|
||||||
if (task_id(i) == 0) then
|
if (task_id(i) == 0) then
|
||||||
task(i) = 'terminate'
|
task(i) = 'terminate'
|
||||||
n_tasks = i
|
n_tasks = i
|
||||||
|
Loading…
Reference in New Issue
Block a user