10
0
mirror of https://github.com/QuantumPackage/qp2.git synced 2025-01-03 18:16:04 +01:00

Fixed segfaults

This commit is contained in:
Anthony Scemama 2021-03-20 20:57:32 +01:00
parent ad6419f3af
commit 32c2d2c80e
2 changed files with 90 additions and 40 deletions

View File

@ -171,7 +171,7 @@ subroutine run_pt2_slave_large(thread,iproc,energy)
integer :: rc, i
integer :: worker_id, ctask, ltask
character*(512) :: task
character(LEN=:), allocatable :: task
integer :: task_id(1)
integer(ZMQ_PTR),external :: new_zmq_to_qp_run_socket
@ -191,6 +191,7 @@ subroutine run_pt2_slave_large(thread,iproc,energy)
logical :: sending
PROVIDE global_selection_buffer global_selection_buffer_lock
allocate(character(LEN=512) :: task)
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
@ -264,6 +265,7 @@ subroutine run_pt2_slave_large(thread,iproc,energy)
call pt2_dealloc(pt2_data(1))
end do
deallocate(task)
call push_pt2_results_async_recv(zmq_socket_push,b%mini,sending)
integer, external :: disconnect_from_taskserver

View File

@ -492,12 +492,13 @@ subroutine new_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,name_in)
END_DOC
character*(*), intent(in) :: name_in
character*(512) :: message, name
character(LEN=:), allocatable :: message, name
integer :: rc, sze
integer(ZMQ_PTR),external :: new_zmq_to_qp_run_socket
integer(ZMQ_PTR),external :: new_zmq_pull_socket
integer(ZMQ_PTR), intent(out) :: zmq_to_qp_run_socket, zmq_socket_pull
integer, save :: icount=0
allocate(character(LEN=512) :: message, name)
icount = icount+1
call omp_set_lock(zmq_lock)
@ -532,6 +533,7 @@ subroutine new_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,name_in)
print *, 'Unable to start parallel job : '//name
stop 1
endif
deallocate(message, name)
end
@ -543,8 +545,9 @@ integer function zmq_set_running(zmq_to_qp_run_socket)
END_DOC
integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket
character*(512) :: message
character(LEN=:), allocatable :: message
integer :: rc, sze
allocate(character(LEN=512) :: message)
zmq_set_running = 0
message = 'set_running'
@ -552,14 +555,17 @@ integer function zmq_set_running(zmq_to_qp_run_socket)
rc = f77_zmq_send(zmq_to_qp_run_socket,message,sze,0)
if (rc /= sze) then
zmq_set_running = -1
deallocate(message)
return
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,message,510,0)
message = trim(message(1:rc))
if (message(1:2) /= 'ok') then
zmq_set_running = -1
deallocate(message)
return
endif
deallocate(message)
end
@ -573,9 +579,10 @@ subroutine end_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,name_in)
integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket, zmq_socket_pull
character*(*), intent(in) :: name_in
character*(512) :: message, name
character(LEN=:), allocatable :: message, name
integer :: i,rc, sze
integer, save :: icount=0
allocate(character(LEN=512) :: message, name)
icount = icount+1
write(name,'(A,I8.8)') trim(name_in)//'.', icount
@ -613,6 +620,7 @@ subroutine end_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,name_in)
print *, 'Unable to terminate ZMQ context'
stop 'error'
endif
deallocate(message,name)
end
integer function connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
@ -625,9 +633,9 @@ integer function connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
integer, intent(out) :: worker_id
integer, intent(in) :: thread
character*(512) :: message
character*(128) :: reply, state, address
character(LEN=:), allocatable :: message, reply, state, address
integer :: rc
allocate(character(LEN=512) :: message, reply, state, address)
!Success
connect_to_taskserver = 0
@ -642,6 +650,7 @@ integer function connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
rc = f77_zmq_send(zmq_to_qp_run_socket, "connect tcp", 11, 0)
if (rc /= 11) then
connect_to_taskserver = -1
deallocate(message, reply, state, address)
return
endif
endif
@ -663,12 +672,13 @@ integer function connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
continue
endif
connect_to_taskserver = -1
deallocate(message, reply, state, address)
return
endif
return
10 continue
! print *, irp_here//': '//trim(message)
deallocate(message, reply, state, address)
connect_to_taskserver = -1
end
@ -695,8 +705,8 @@ integer function disconnect_from_taskserver_state(zmq_to_qp_run_socket, worker_i
character*(128), intent(in) :: state
integer :: rc, sze
character*(512) :: message, reply
character*(128) :: state_tmp
character(LEN=:), allocatable :: message, reply, state_tmp
allocate(character(LEN=512) :: message, reply, state_tmp)
disconnect_from_taskserver_state = 0
@ -707,12 +717,14 @@ integer function disconnect_from_taskserver_state(zmq_to_qp_run_socket, worker_i
if (rc /= sze) then
disconnect_from_taskserver_state = -2
deallocate(message, reply, state_tmp)
return
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0)
if (rc <= 0) then
disconnect_from_taskserver_state = -3
deallocate(message, reply, state_tmp)
return
endif
rc = min(510,rc)
@ -720,18 +732,23 @@ integer function disconnect_from_taskserver_state(zmq_to_qp_run_socket, worker_i
read(message,*, end=10, err=10) reply, state_tmp
if ((trim(reply) == 'disconnect_reply').and.(trim(state_tmp) == trim(state))) then
deallocate(message, reply, state_tmp)
return
endif
if (trim(message) == 'error Wrong state') then
disconnect_from_taskserver_state = -1
deallocate(message, reply, state_tmp)
return
else if (trim(message) == 'error No job is running') then
disconnect_from_taskserver_state = -1
deallocate(message, reply, state_tmp)
return
endif
deallocate(message, reply, state_tmp)
return
10 continue
deallocate(message, reply, state_tmp)
disconnect_from_taskserver_state = -1
end
@ -780,12 +797,12 @@ integer function zmq_abort(zmq_to_qp_run_socket)
integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket
integer :: rc, sze, i
integer, parameter :: count_max=60
character*(512) :: message
character(LEN=:), allocatable :: message
allocate(character(LEN=512) :: message)
zmq_abort = 0
write(message,*) 'abort '
sze = len(trim(message))
do i=1,count_max
rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)
@ -809,6 +826,7 @@ integer function zmq_abort(zmq_to_qp_run_socket)
return
endif
deallocate(message)
end
integer function task_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id)
@ -821,7 +839,8 @@ integer function task_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_i
integer, intent(in) :: worker_id, task_id
integer :: rc, sze
character*(512) :: message
character(LEN=:), allocatable :: message
allocate(character(LEN=512) :: message)
task_done_to_taskserver = 0
@ -831,6 +850,7 @@ integer function task_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_i
rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)
if (rc /= sze) then
task_done_to_taskserver = -1
deallocate(message)
return
endif
@ -838,9 +858,11 @@ integer function task_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_i
if (trim(message(1:rc)) /= 'ok') then
print *, 'task_done_to_taskserver: '//trim(message(1:rc))
task_done_to_taskserver = -1
deallocate(message)
return
endif
deallocate(message)
end
integer function tasks_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id, n_tasks)
@ -853,26 +875,27 @@ integer function tasks_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_
integer, intent(in) :: n_tasks, worker_id, task_id(n_tasks)
integer :: rc, sze, k
character(LEN=:), allocatable :: message
character*(64) :: fmt
character(LEN=:), allocatable :: message, fmt
tasks_done_to_taskserver = 0
allocate(character(LEN=64+n_tasks*12) :: message)
write(fmt,*) '(A,X,A,I10,X,', n_tasks, '(I11,1X))'
allocate(character(LEN=64+n_tasks*12) :: message, fmt)
!$OMP CRITICAL
write(fmt,'(A,I5,A)') '(A,X,A,I10,X,', n_tasks, '(I11,1X))'
!$OMP END CRITICAL
write(message,*) 'task_done '//trim(zmq_state), worker_id, (task_id(k), k=1,n_tasks)
sze = len(trim(message))
rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)
if (rc == -1) then
tasks_done_to_taskserver = -1
deallocate(message)
deallocate(message,fmt)
return
endif
if (rc /= sze) then
tasks_done_to_taskserver = -1
deallocate(message)
deallocate(message,fmt)
return
endif
@ -881,7 +904,7 @@ integer function tasks_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_
print *, 'tasks_done_to_taskserver: '//trim(message(1:rc))
tasks_done_to_taskserver = -1
endif
deallocate(message)
deallocate(message,fmt)
end
@ -896,9 +919,10 @@ integer function get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id
integer, intent(out) :: task_id
character*(512), intent(out) :: task
character*(1024) :: message
character(LEN=:), allocatable :: message
character*(64) :: reply
integer :: rc, sze
allocate(character(LEN=1024) :: message)
get_task_from_taskserver = 0
@ -908,6 +932,7 @@ integer function get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id
rc = f77_zmq_send(zmq_to_qp_run_socket, message, sze, 0)
if (rc /= sze) then
get_task_from_taskserver = -1
deallocate(message)
return
endif
@ -939,13 +964,17 @@ integer function get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id
task = 'terminate'
else
get_task_from_taskserver = -1
deallocate(message)
return
endif
deallocate(message)
return
10 continue
deallocate(message)
get_task_from_taskserver = -1
allocate(character(LEN=1024) :: message)
end
@ -961,25 +990,27 @@ integer function get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_i
integer, intent(out) :: task_id(n_tasks)
character*(512), intent(out) :: task(n_tasks)
character*(1024) :: message
character*(64) :: reply
character(LEN=:), allocatable :: message
integer :: rc, sze, i
allocate(character(LEN=1024) :: message)
get_tasks_from_taskserver = 0
!$OMP CRITICAL
write(message,*) 'get_tasks '//trim(zmq_state), worker_id, n_tasks
!$OMP END CRITICAL
sze = len(trim(message))
rc = f77_zmq_send(zmq_to_qp_run_socket, message, sze, 0)
if (rc /= sze) then
get_tasks_from_taskserver = -1
deallocate(message)
return
endif
message = repeat(' ',1024)
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 1024, 0)
rc = min(1024,rc)
read(message(1:rc),*, end=10, err=10) reply
if (trim(message) == 'get_tasks_reply ok') then
continue
else if (trim(message) == 'terminate') then
@ -990,6 +1021,7 @@ integer function get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_i
task(1) = 'terminate'
else
get_tasks_from_taskserver = -1
deallocate(message)
return
endif
@ -998,7 +1030,10 @@ integer function get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_i
message = repeat(' ',512)
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 1024, 0)
rc = min(1024,rc)
read(message(1:rc),*, end=10, err=10) task_id(i)
!$OMP CRITICAL
! read(message(1:rc),*, end=10, err=10) task_id(i)
read(message(1:rc),*) task_id(i)
!$OMP END CRITICAL
if (task_id(i) == 0) then
task(i) = 'terminate'
n_tasks = i
@ -1014,10 +1049,12 @@ integer function get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_i
rc += 1
task(i) = message(rc:)
enddo
deallocate(message)
return
10 continue
get_tasks_from_taskserver = -1
deallocate(message)
return
end
@ -1057,7 +1094,8 @@ integer function zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,mo
integer, intent(in) :: task_id
integer, intent(out) :: more
integer :: rc
character*(512) :: message
character(LEN=:), allocatable :: message
allocate(character(LEN=512) :: message)
zmq_delete_task = 0
@ -1065,6 +1103,7 @@ integer function zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,mo
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(message),len(trim(message)),0)
if (rc /= len(trim(message))) then
zmq_delete_task = -1
deallocate(message)
return
endif
@ -1078,8 +1117,10 @@ integer function zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,mo
more = 0
else
zmq_delete_task = -1
deallocate(message)
return
endif
deallocate(message)
end
integer function zmq_delete_task_async_send(zmq_to_qp_run_socket,task_id,sending)
@ -1093,7 +1134,8 @@ integer function zmq_delete_task_async_send(zmq_to_qp_run_socket,task_id,sending
integer, intent(in) :: task_id
logical, intent(inout) :: sending
integer :: rc
character*(512) :: message
character(LEN=:), allocatable :: message
allocate(character(LEN=512) :: message)
if (sending) then
print *, irp_here, ': sending=true'
@ -1105,9 +1147,11 @@ integer function zmq_delete_task_async_send(zmq_to_qp_run_socket,task_id,sending
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(message),len(trim(message)),0)
if (rc /= len(trim(message))) then
zmq_delete_task_async_send = -1
deallocate(message)
return
endif
sending = .True.
deallocate(message)
end
@ -1122,10 +1166,12 @@ integer function zmq_delete_task_async_recv(zmq_to_qp_run_socket,more,sending)
integer, intent(out) :: more
logical, intent(inout) :: sending
integer :: rc
character*(512) :: message
character*(64) :: reply
character(LEN=:), allocatable :: message, reply
zmq_delete_task_async_recv = 0
if (.not.sending) return
allocate(character(LEN=512) :: message, reply)
sending = .False.
reply = ''
rc = f77_zmq_recv(zmq_to_qp_run_socket,reply,64,0)
@ -1136,8 +1182,8 @@ integer function zmq_delete_task_async_recv(zmq_to_qp_run_socket,more,sending)
else
print *, reply(1:rc)
zmq_delete_task_async_recv = -1
return
endif
deallocate(message,reply)
end
integer function zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more)
@ -1152,12 +1198,11 @@ integer function zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n
integer, intent(in) :: n_tasks, task_id(n_tasks)
integer, intent(out) :: more
integer :: rc, k
character*(64) :: fmt, reply
character(LEN=:), allocatable :: message
character(LEN=:), allocatable :: message, fmt, reply
zmq_delete_tasks = 0
allocate(character(LEN=64+n_tasks*12) :: message)
allocate(character(LEN=64+n_tasks*12) :: message, fmt, reply)
write(fmt,*) '(A,1X,A,1X,', n_tasks, '(I11,1X))'
write(message,*) 'del_task '//trim(zmq_state), (task_id(k), k=1,n_tasks)
@ -1166,10 +1211,9 @@ integer function zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(message),len(trim(message)),0)
if (rc /= len(trim(message))) then
zmq_delete_tasks = -1
deallocate(message)
deallocate(message,fmt,reply)
return
endif
deallocate(message)
reply = ''
rc = f77_zmq_recv(zmq_to_qp_run_socket,reply,64,0)
@ -1181,6 +1225,7 @@ integer function zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n
else
zmq_delete_tasks = -1
endif
deallocate(message,fmt,reply)
end
integer function zmq_delete_tasks_async_send(zmq_to_qp_run_socket,task_id,n_tasks,sending)
@ -1194,8 +1239,7 @@ integer function zmq_delete_tasks_async_send(zmq_to_qp_run_socket,task_id,n_task
integer, intent(in) :: n_tasks, task_id(n_tasks)
logical, intent(inout) :: sending
integer :: rc, k
character*(64) :: fmt, reply
character(LEN=:), allocatable :: message
character(LEN=:), allocatable :: fmt, message, reply
if (sending) then
print *, irp_here, ': sending is true'
@ -1204,19 +1248,21 @@ integer function zmq_delete_tasks_async_send(zmq_to_qp_run_socket,task_id,n_task
sending = .True.
zmq_delete_tasks_async_send = 0
allocate(character(LEN=64+n_tasks*12) :: message)
allocate(character(LEN=64+n_tasks*12) :: message, fmt, reply)
!$OMP CRITICAL
write(fmt,*) '(A,1X,A,1X,', n_tasks, '(I11,1X))'
write(message,*) 'del_task '//trim(zmq_state), (task_id(k), k=1,n_tasks)
!$OMP END CRITICAL
rc = f77_zmq_send(zmq_to_qp_run_socket,trim(message),len(trim(message)),0)
if (rc /= len(trim(message))) then
zmq_delete_tasks_async_send = -1
deallocate(message)
deallocate(message,fmt,reply)
return
endif
deallocate(message)
deallocate(message,fmt,reply)
end
@ -1232,7 +1278,8 @@ integer function zmq_delete_tasks_async_recv(zmq_to_qp_run_socket,more,sending)
integer, intent(out) :: more
logical, intent(inout) :: sending
integer :: rc
character*(64) :: reply
character(LEN=:), allocatable :: reply
allocate(character(LEN=64) :: reply)
zmq_delete_tasks_async_recv = 0
if (.not.sending) return
@ -1247,6 +1294,7 @@ integer function zmq_delete_tasks_async_recv(zmq_to_qp_run_socket,more,sending)
else
zmq_delete_tasks_async_recv = -1
endif
deallocate(reply)
sending = .False.
end