From ff05b132599a17134b1a75df0bd77541d4f5ff43 Mon Sep 17 00:00:00 2001 From: Anthony Scemama Date: Tue, 28 Feb 2017 14:46:32 +0100 Subject: [PATCH] Update tasks --- ocaml/TaskServer.ml | 22 ++++++++++++++++------ plugins/Full_CI_ZMQ/zmq_selection.irp.f | 8 +++++--- src/Davidson/davidson_parallel.irp.f | 5 ++--- src/ZMQ/utils.irp.f | 14 ++++++++++---- 4 files changed, 33 insertions(+), 16 deletions(-) diff --git a/ocaml/TaskServer.ml b/ocaml/TaskServer.ml index 7013b671..6bfdc50e 100644 --- a/ocaml/TaskServer.ml +++ b/ocaml/TaskServer.ml @@ -341,10 +341,15 @@ let add_task msg program_state rep_socket = in let result = + let new_queue, new_bar = + List.fold ~f:(fun (queue, bar) task -> + Queuing_system.add_task ~task queue, + increment_progress_bar bar) + ~init:(program_state.queue, program_state.progress_bar) tasks + in { program_state with - queue = List.fold ~f:(fun queue task -> Queuing_system.add_task ~task queue) - ~init:program_state.queue tasks ; - progress_bar = increment_progress_bar program_state.progress_bar ; + queue = new_queue; + progress_bar = new_bar } in reply_ok rep_socket; @@ -418,11 +423,16 @@ let task_done msg program_state rep_socket = program_state and success () = + let new_queue, new_bar = + List.fold ~f:(fun (queue, bar) task_id -> + Queuing_system.end_task ~task_id ~client_id queue, + increment_progress_bar bar) + ~init:(program_state.queue, program_state.progress_bar) task_ids + in let result = { program_state with - queue = List.fold ~f:(fun queue task_id -> Queuing_system.end_task ~task_id - ~client_id queue) ~init:program_state.queue task_ids ; - progress_bar = increment_progress_bar program_state.progress_bar ; + queue = new_queue; + progress_bar = new_bar } in reply_ok rep_socket; diff --git a/plugins/Full_CI_ZMQ/zmq_selection.irp.f b/plugins/Full_CI_ZMQ/zmq_selection.irp.f index 838af9ef..a5f38691 100644 --- a/plugins/Full_CI_ZMQ/zmq_selection.irp.f +++ b/plugins/Full_CI_ZMQ/zmq_selection.irp.f @@ -4,7 +4,6 @@ subroutine ZMQ_selection(N_in, pt2) implicit none - character*(512) :: task integer(ZMQ_PTR) :: zmq_to_qp_run_socket integer, intent(in) :: N_in type(selection_buffer) :: b @@ -28,14 +27,17 @@ subroutine ZMQ_selection(N_in, pt2) integer :: i_generator, i_generator_start, i_generator_max, step ! step = int(max(1.,10*elec_num/mo_tot_num) + character(len=:), allocatable :: task + allocate(character(len=32*N_det_generators) :: task) step = int(5000000.d0 / dble(N_int * N_states * elec_num * elec_num * mo_tot_num * mo_tot_num )) step = max(1,step) do i= 1, N_det_generators,step i_generator_start = i i_generator_max = min(i+step-1,N_det_generators) - write(task,*) i_generator_start, i_generator_max, 1, N - call add_task_to_taskserver(zmq_to_qp_run_socket,task) + write(task(32*(i-1)+1:32*i),'(I9,X,I9,X,''1'',X,I9,''|'')') i_generator_start, i_generator_max, N end do + call add_task_to_taskserver(zmq_to_qp_run_socket,task) + deallocate(task) !$OMP PARALLEL DEFAULT(shared) SHARED(b, pt2) PRIVATE(i) NUM_THREADS(nproc+1) i = omp_get_thread_num() diff --git a/src/Davidson/davidson_parallel.irp.f b/src/Davidson/davidson_parallel.irp.f index 04b0cc52..5387ff5b 100644 --- a/src/Davidson/davidson_parallel.irp.f +++ b/src/Davidson/davidson_parallel.irp.f @@ -184,8 +184,7 @@ subroutine davidson_add_task(zmq_to_qp_run_socket, blockb, blockb2, istep) integer ,intent(in) :: blockb, blockb2, istep character*(512) :: task - - write(task,*) blockb, blockb2, istep + write(task,'(3(I9,X))') blockb, blockb2, istep call add_task_to_taskserver(zmq_to_qp_run_socket, task) end subroutine @@ -267,7 +266,7 @@ subroutine davidson_slave_work(zmq_to_qp_run_socket, zmq_socket_push, worker_id) do call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task) if(task_id == 0) exit - read (task,*) blockb, blockb2, istep + read (task,'(3(I9,X))') blockb, blockb2, istep bs = shortcut_(blockb+1,1) - shortcut_(blockb, 1) do i=blockb, shortcut_(0,2), shortcut_(0,1) do j=i, min(i, shortcut_(0,2)) diff --git a/src/ZMQ/utils.irp.f b/src/ZMQ/utils.irp.f index 9e28aff5..aff2707a 100644 --- a/src/ZMQ/utils.irp.f +++ b/src/ZMQ/utils.irp.f @@ -684,10 +684,12 @@ subroutine add_task_to_taskserver(zmq_to_qp_run_socket,task) character*(*), intent(in) :: task integer :: rc, sze - character*(512) :: message + character(len=:), allocatable :: message + + sze = len(trim(task))+12+len(trim(zmq_state)) + allocate(character(len=sze) :: message) write(message,*) 'add_task '//trim(zmq_state)//' '//trim(task) - sze = len(trim(message)) rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0) if (rc /= sze) then print *, rc, sze @@ -701,6 +703,7 @@ subroutine add_task_to_taskserver(zmq_to_qp_run_socket,task) print *, 'Unable to add the next task' stop -1 endif + deallocate(message) end @@ -714,16 +717,19 @@ subroutine add_task_to_taskserver_send(zmq_to_qp_run_socket,task) character*(*), intent(in) :: task integer :: rc, sze - character*(512) :: message + character(len=:), allocatable :: message + + sze = len(trim(task))+12+len(trim(zmq_state)) + allocate(character(len=sze) :: message) write(message,*) 'add_task '//trim(zmq_state)//' '//trim(task) - sze = len(trim(message)) rc = f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0) if (rc /= sze) then print *, rc, sze print *, irp_here,': f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)' stop 'error' endif + deallocate(message) end