From 32c2d2c80e40314a16de998cf36f7e11f734ea58 Mon Sep 17 00:00:00 2001 From: Anthony Scemama Date: Sat, 20 Mar 2021 20:57:32 +0100 Subject: [PATCH] Fixed segfaults --- src/cipsi/run_pt2_slave.irp.f | 4 +- src/zmq/utils.irp.f | 126 +++++++++++++++++++++++----------- 2 files changed, 90 insertions(+), 40 deletions(-) diff --git a/src/cipsi/run_pt2_slave.irp.f b/src/cipsi/run_pt2_slave.irp.f index d3f4d45d..4fdcb0b1 100644 --- a/src/cipsi/run_pt2_slave.irp.f +++ b/src/cipsi/run_pt2_slave.irp.f @@ -171,7 +171,7 @@ subroutine run_pt2_slave_large(thread,iproc,energy) integer :: rc, i integer :: worker_id, ctask, ltask - character*(512) :: task + character(LEN=:), allocatable :: task integer :: task_id(1) integer(ZMQ_PTR),external :: new_zmq_to_qp_run_socket @@ -191,6 +191,7 @@ subroutine run_pt2_slave_large(thread,iproc,energy) logical :: sending PROVIDE global_selection_buffer global_selection_buffer_lock + allocate(character(LEN=512) :: task) zmq_to_qp_run_socket = new_zmq_to_qp_run_socket() @@ -264,6 +265,7 @@ subroutine run_pt2_slave_large(thread,iproc,energy) call pt2_dealloc(pt2_data(1)) end do + deallocate(task) call push_pt2_results_async_recv(zmq_socket_push,b%mini,sending) integer, external :: disconnect_from_taskserver diff --git a/src/zmq/utils.irp.f b/src/zmq/utils.irp.f index 2fa43b29..898fbbb5 100644 --- a/src/zmq/utils.irp.f +++ b/src/zmq/utils.irp.f @@ -492,12 +492,13 @@ subroutine new_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,name_in) END_DOC character*(*), intent(in) :: name_in - character*(512) :: message, name + character(LEN=:), allocatable :: message, name integer :: rc, sze integer(ZMQ_PTR),external :: new_zmq_to_qp_run_socket integer(ZMQ_PTR),external :: new_zmq_pull_socket integer(ZMQ_PTR), intent(out) :: zmq_to_qp_run_socket, zmq_socket_pull integer, save :: icount=0 + allocate(character(LEN=512) :: message, name) icount = icount+1 call omp_set_lock(zmq_lock) @@ -532,6 +533,7 @@ subroutine new_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,name_in) print *, 'Unable to start parallel job : '//name stop 1 endif + deallocate(message, name) end @@ -543,8 +545,9 @@ integer function zmq_set_running(zmq_to_qp_run_socket) END_DOC integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket - character*(512) :: message + character(LEN=:), allocatable :: message integer :: rc, sze + allocate(character(LEN=512) :: message) zmq_set_running = 0 message = 'set_running' @@ -552,14 +555,17 @@ integer function zmq_set_running(zmq_to_qp_run_socket) rc = f77_zmq_send(zmq_to_qp_run_socket,message,sze,0) if (rc /= sze) then zmq_set_running = -1 + deallocate(message) return endif rc = f77_zmq_recv(zmq_to_qp_run_socket,message,510,0) message = trim(message(1:rc)) if (message(1:2) /= 'ok') then zmq_set_running = -1 + deallocate(message) return endif + deallocate(message) end @@ -573,9 +579,10 @@ subroutine end_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,name_in) integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket, zmq_socket_pull character*(*), intent(in) :: name_in - character*(512) :: message, name + character(LEN=:), allocatable :: message, name integer :: i,rc, sze integer, save :: icount=0 + allocate(character(LEN=512) :: message, name) icount = icount+1 write(name,'(A,I8.8)') trim(name_in)//'.', icount @@ -613,6 +620,7 @@ subroutine end_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,name_in) print *, 'Unable to terminate ZMQ context' stop 'error' endif + deallocate(message,name) end integer function connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) @@ -625,9 +633,9 @@ integer function connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) integer, intent(out) :: worker_id integer, intent(in) :: thread - character*(512) :: message - character*(128) :: reply, state, address + character(LEN=:), allocatable :: message, reply, state, address integer :: rc + allocate(character(LEN=512) :: message, reply, state, address) !Success connect_to_taskserver = 0 @@ -642,6 +650,7 @@ integer function connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) rc = f77_zmq_send(zmq_to_qp_run_socket, "connect tcp", 11, 0) if (rc /= 11) then connect_to_taskserver = -1 + deallocate(message, reply, state, address) return endif endif @@ -663,12 +672,13 @@ integer function connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread) continue endif connect_to_taskserver = -1 + deallocate(message, reply, state, address) return endif return 10 continue -! print *, irp_here//': '//trim(message) + deallocate(message, reply, state, address) connect_to_taskserver = -1 end @@ -695,8 +705,8 @@ integer function disconnect_from_taskserver_state(zmq_to_qp_run_socket, worker_i character*(128), intent(in) :: state integer :: rc, sze - character*(512) :: message, reply - character*(128) :: state_tmp + character(LEN=:), allocatable :: message, reply, state_tmp + allocate(character(LEN=512) :: message, reply, state_tmp) disconnect_from_taskserver_state = 0 @@ -707,12 +717,14 @@ integer function disconnect_from_taskserver_state(zmq_to_qp_run_socket, worker_i if (rc /= sze) then disconnect_from_taskserver_state = -2 + deallocate(message, reply, state_tmp) return endif rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0) if (rc <= 0) then disconnect_from_taskserver_state = -3 + deallocate(message, reply, state_tmp) return endif rc = min(510,rc) @@ -720,18 +732,23 @@ integer function disconnect_from_taskserver_state(zmq_to_qp_run_socket, worker_i read(message,*, end=10, err=10) reply, state_tmp if ((trim(reply) == 'disconnect_reply').and.(trim(state_tmp) == trim(state))) then + deallocate(message, reply, state_tmp) return endif if (trim(message) == 'error Wrong state') then disconnect_from_taskserver_state = -1 + deallocate(message, reply, state_tmp) return else if (trim(message) == 'error No job is running') then disconnect_from_taskserver_state = -1 + deallocate(message, reply, state_tmp) return endif + deallocate(message, reply, state_tmp) return 10 continue + deallocate(message, reply, state_tmp) disconnect_from_taskserver_state = -1 end @@ -780,12 +797,12 @@ integer function zmq_abort(zmq_to_qp_run_socket) integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket integer :: rc, sze, i integer, parameter :: count_max=60 - character*(512) :: message + character(LEN=:), allocatable :: message + allocate(character(LEN=512) :: message) zmq_abort = 0 write(message,*) 'abort ' - sze = len(trim(message)) do i=1,count_max rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0) @@ -809,6 +826,7 @@ integer function zmq_abort(zmq_to_qp_run_socket) return endif + deallocate(message) end integer function task_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id) @@ -821,7 +839,8 @@ integer function task_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_i integer, intent(in) :: worker_id, task_id integer :: rc, sze - character*(512) :: message + character(LEN=:), allocatable :: message + allocate(character(LEN=512) :: message) task_done_to_taskserver = 0 @@ -831,6 +850,7 @@ integer function task_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_i rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0) if (rc /= sze) then task_done_to_taskserver = -1 + deallocate(message) return endif @@ -838,9 +858,11 @@ integer function task_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_i if (trim(message(1:rc)) /= 'ok') then print *, 'task_done_to_taskserver: '//trim(message(1:rc)) task_done_to_taskserver = -1 + deallocate(message) return endif + deallocate(message) end integer function tasks_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_id, n_tasks) @@ -853,26 +875,27 @@ integer function tasks_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_ integer, intent(in) :: n_tasks, worker_id, task_id(n_tasks) integer :: rc, sze, k - character(LEN=:), allocatable :: message - character*(64) :: fmt + character(LEN=:), allocatable :: message, fmt tasks_done_to_taskserver = 0 - allocate(character(LEN=64+n_tasks*12) :: message) - write(fmt,*) '(A,X,A,I10,X,', n_tasks, '(I11,1X))' + allocate(character(LEN=64+n_tasks*12) :: message, fmt) + !$OMP CRITICAL + write(fmt,'(A,I5,A)') '(A,X,A,I10,X,', n_tasks, '(I11,1X))' + !$OMP END CRITICAL write(message,*) 'task_done '//trim(zmq_state), worker_id, (task_id(k), k=1,n_tasks) sze = len(trim(message)) rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0) if (rc == -1) then tasks_done_to_taskserver = -1 - deallocate(message) + deallocate(message,fmt) return endif if (rc /= sze) then tasks_done_to_taskserver = -1 - deallocate(message) + deallocate(message,fmt) return endif @@ -881,7 +904,7 @@ integer function tasks_done_to_taskserver(zmq_to_qp_run_socket, worker_id, task_ print *, 'tasks_done_to_taskserver: '//trim(message(1:rc)) tasks_done_to_taskserver = -1 endif - deallocate(message) + deallocate(message,fmt) end @@ -896,9 +919,10 @@ integer function get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id integer, intent(out) :: task_id character*(512), intent(out) :: task - character*(1024) :: message + character(LEN=:), allocatable :: message character*(64) :: reply integer :: rc, sze + allocate(character(LEN=1024) :: message) get_task_from_taskserver = 0 @@ -908,6 +932,7 @@ integer function get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id rc = f77_zmq_send(zmq_to_qp_run_socket, message, sze, 0) if (rc /= sze) then get_task_from_taskserver = -1 + deallocate(message) return endif @@ -939,13 +964,17 @@ integer function get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id task = 'terminate' else get_task_from_taskserver = -1 + deallocate(message) return endif + deallocate(message) return 10 continue + deallocate(message) get_task_from_taskserver = -1 + allocate(character(LEN=1024) :: message) end @@ -961,25 +990,27 @@ integer function get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_i integer, intent(out) :: task_id(n_tasks) character*(512), intent(out) :: task(n_tasks) - character*(1024) :: message - character*(64) :: reply + character(LEN=:), allocatable :: message integer :: rc, sze, i + allocate(character(LEN=1024) :: message) get_tasks_from_taskserver = 0 + !$OMP CRITICAL write(message,*) 'get_tasks '//trim(zmq_state), worker_id, n_tasks + !$OMP END CRITICAL sze = len(trim(message)) rc = f77_zmq_send(zmq_to_qp_run_socket, message, sze, 0) if (rc /= sze) then get_tasks_from_taskserver = -1 + deallocate(message) return endif message = repeat(' ',1024) rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 1024, 0) rc = min(1024,rc) - read(message(1:rc),*, end=10, err=10) reply if (trim(message) == 'get_tasks_reply ok') then continue else if (trim(message) == 'terminate') then @@ -990,6 +1021,7 @@ integer function get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_i task(1) = 'terminate' else get_tasks_from_taskserver = -1 + deallocate(message) return endif @@ -998,7 +1030,10 @@ integer function get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_i message = repeat(' ',512) rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 1024, 0) rc = min(1024,rc) - read(message(1:rc),*, end=10, err=10) task_id(i) + !$OMP CRITICAL +! read(message(1:rc),*, end=10, err=10) task_id(i) + read(message(1:rc),*) task_id(i) + !$OMP END CRITICAL if (task_id(i) == 0) then task(i) = 'terminate' n_tasks = i @@ -1014,10 +1049,12 @@ integer function get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_i rc += 1 task(i) = message(rc:) enddo + deallocate(message) return 10 continue get_tasks_from_taskserver = -1 + deallocate(message) return end @@ -1057,7 +1094,8 @@ integer function zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,mo integer, intent(in) :: task_id integer, intent(out) :: more integer :: rc - character*(512) :: message + character(LEN=:), allocatable :: message + allocate(character(LEN=512) :: message) zmq_delete_task = 0 @@ -1065,6 +1103,7 @@ integer function zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,mo rc = f77_zmq_send(zmq_to_qp_run_socket,trim(message),len(trim(message)),0) if (rc /= len(trim(message))) then zmq_delete_task = -1 + deallocate(message) return endif @@ -1078,8 +1117,10 @@ integer function zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,mo more = 0 else zmq_delete_task = -1 + deallocate(message) return endif + deallocate(message) end integer function zmq_delete_task_async_send(zmq_to_qp_run_socket,task_id,sending) @@ -1093,7 +1134,8 @@ integer function zmq_delete_task_async_send(zmq_to_qp_run_socket,task_id,sending integer, intent(in) :: task_id logical, intent(inout) :: sending integer :: rc - character*(512) :: message + character(LEN=:), allocatable :: message + allocate(character(LEN=512) :: message) if (sending) then print *, irp_here, ': sending=true' @@ -1105,9 +1147,11 @@ integer function zmq_delete_task_async_send(zmq_to_qp_run_socket,task_id,sending rc = f77_zmq_send(zmq_to_qp_run_socket,trim(message),len(trim(message)),0) if (rc /= len(trim(message))) then zmq_delete_task_async_send = -1 + deallocate(message) return endif sending = .True. + deallocate(message) end @@ -1122,10 +1166,12 @@ integer function zmq_delete_task_async_recv(zmq_to_qp_run_socket,more,sending) integer, intent(out) :: more logical, intent(inout) :: sending integer :: rc - character*(512) :: message - character*(64) :: reply + character(LEN=:), allocatable :: message, reply + zmq_delete_task_async_recv = 0 if (.not.sending) return + + allocate(character(LEN=512) :: message, reply) sending = .False. reply = '' rc = f77_zmq_recv(zmq_to_qp_run_socket,reply,64,0) @@ -1136,8 +1182,8 @@ integer function zmq_delete_task_async_recv(zmq_to_qp_run_socket,more,sending) else print *, reply(1:rc) zmq_delete_task_async_recv = -1 - return endif + deallocate(message,reply) end integer function zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n_tasks,more) @@ -1152,12 +1198,11 @@ integer function zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n integer, intent(in) :: n_tasks, task_id(n_tasks) integer, intent(out) :: more integer :: rc, k - character*(64) :: fmt, reply - character(LEN=:), allocatable :: message + character(LEN=:), allocatable :: message, fmt, reply zmq_delete_tasks = 0 - allocate(character(LEN=64+n_tasks*12) :: message) + allocate(character(LEN=64+n_tasks*12) :: message, fmt, reply) write(fmt,*) '(A,1X,A,1X,', n_tasks, '(I11,1X))' write(message,*) 'del_task '//trim(zmq_state), (task_id(k), k=1,n_tasks) @@ -1166,10 +1211,9 @@ integer function zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n rc = f77_zmq_send(zmq_to_qp_run_socket,trim(message),len(trim(message)),0) if (rc /= len(trim(message))) then zmq_delete_tasks = -1 - deallocate(message) + deallocate(message,fmt,reply) return endif - deallocate(message) reply = '' rc = f77_zmq_recv(zmq_to_qp_run_socket,reply,64,0) @@ -1181,6 +1225,7 @@ integer function zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,n else zmq_delete_tasks = -1 endif + deallocate(message,fmt,reply) end integer function zmq_delete_tasks_async_send(zmq_to_qp_run_socket,task_id,n_tasks,sending) @@ -1194,8 +1239,7 @@ integer function zmq_delete_tasks_async_send(zmq_to_qp_run_socket,task_id,n_task integer, intent(in) :: n_tasks, task_id(n_tasks) logical, intent(inout) :: sending integer :: rc, k - character*(64) :: fmt, reply - character(LEN=:), allocatable :: message + character(LEN=:), allocatable :: fmt, message, reply if (sending) then print *, irp_here, ': sending is true' @@ -1204,19 +1248,21 @@ integer function zmq_delete_tasks_async_send(zmq_to_qp_run_socket,task_id,n_task sending = .True. zmq_delete_tasks_async_send = 0 - allocate(character(LEN=64+n_tasks*12) :: message) + allocate(character(LEN=64+n_tasks*12) :: message, fmt, reply) + !$OMP CRITICAL write(fmt,*) '(A,1X,A,1X,', n_tasks, '(I11,1X))' write(message,*) 'del_task '//trim(zmq_state), (task_id(k), k=1,n_tasks) + !$OMP END CRITICAL rc = f77_zmq_send(zmq_to_qp_run_socket,trim(message),len(trim(message)),0) if (rc /= len(trim(message))) then zmq_delete_tasks_async_send = -1 - deallocate(message) + deallocate(message,fmt,reply) return endif - deallocate(message) + deallocate(message,fmt,reply) end @@ -1232,7 +1278,8 @@ integer function zmq_delete_tasks_async_recv(zmq_to_qp_run_socket,more,sending) integer, intent(out) :: more logical, intent(inout) :: sending integer :: rc - character*(64) :: reply + character(LEN=:), allocatable :: reply + allocate(character(LEN=64) :: reply) zmq_delete_tasks_async_recv = 0 if (.not.sending) return @@ -1247,6 +1294,7 @@ integer function zmq_delete_tasks_async_recv(zmq_to_qp_run_socket,more,sending) else zmq_delete_tasks_async_recv = -1 endif + deallocate(reply) sending = .False. end