diff --git a/plugins/dress_zmq/dress_stoch_routines.irp.f b/plugins/dress_zmq/dress_stoch_routines.irp.f
index ad58aa5c..d26a4d8c 100644
--- a/plugins/dress_zmq/dress_stoch_routines.irp.f
+++ b/plugins/dress_zmq/dress_stoch_routines.irp.f
@@ -232,8 +232,10 @@ subroutine dress_collector(zmq_socket_pull, E, relative_error, delta, delta_s2,
 
 
     if(cur_cp == -1) then
+      !print *, "TASK DEL", task_id
       call dress_pulled(ind, int_buf, double_buf, det_buf, N_buf)
       if (zmq_delete_tasks(zmq_to_qp_run_socket,zmq_socket_pull,task_id,1,more) == -1) then
+        print *, "TASK ID", task_id  ! report which task could not be deleted before stopping
         stop 'Unable to delete tasks'
       endif
       !if(more == 0) stop 'loop = .false.' !!!!!!!!!!!!!!!!
diff --git a/plugins/dress_zmq/run_dress_slave.irp.f b/plugins/dress_zmq/run_dress_slave.irp.f
index 248b7d34..84d9af6c 100644
--- a/plugins/dress_zmq/run_dress_slave.irp.f
+++ b/plugins/dress_zmq/run_dress_slave.irp.f
@@ -75,126 +75,113 @@ subroutine run_dress_slave(thread,iproce,energy)
 
   integer :: iproc, cur_cp, done_for(0:N_cp)
   integer, allocatable :: tasks(:)
-  logical :: loop, donedone
-  integer :: res_task(Nproc), res_gen(Nproc), res_sub(Nproc)
-  res_gen = 0
-
-  donedone = .false.
-  allocate(tasks(0:N_det))
+  integer :: lastCp(Nproc)  ! per-thread done_cp_at_det value of the last task that thread finished
+  integer :: lastSent, lastSendable  ! highest checkpoint already pushed / highest one that may be pushed now
+  logical :: send  ! set when the current thread is the one that must push a checkpoint
+  lastCp = 0
+  lastSent = 0
+  send = .false.
 
   done_for = 0
 
-  do cur_cp=0, N_cp
-    if(donedone) exit
-    print *, "DOING CP", cur_cp
-    tasks(0) = 0
-    !$OMP PARALLEL DEFAULT(SHARED) &
-    !$OMP PRIVATE(int_buf, double_buf, det_buf, delta_ij_loc, task, task_id) &
-    !$OMP PRIVATE(toothMwen, fracted, fac) &
-    !$OMP PRIVATE(loop, i_generator, subset, iproc, N_buf)
-    iproc = omp_get_thread_num()+1
-    loop = .true.
-    allocate(int_buf(N_dress_int_buffer))
-    allocate(double_buf(N_dress_double_buffer))
-    allocate(det_buf(N_int, 2, N_dress_det_buffer))
-    allocate(delta_ij_loc(N_states,N_det,2))
-    do while(loop)
-      if(res_gen(iproc) == 0) then
-        !$OMP CRITICAL
-        call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task)
-        !$OMP END CRITICAL
-        task = task//" 0"
-        if(task_id == 0) then
-          donedone = .true.
-          print *, "DONEDONE"
-          exit !! LAST MESSAGE ???
-        end if
-        read (task,*) subset, i_generator
-      else
-        subset = res_sub(iproc)
-        i_generator = res_gen(iproc)
-        task_id = res_task(iproc)
-        res_gen(iproc) = 0
-      end if
-
-      !if(done_cp_at_det(i_generator) > cur_cp) loop = .false.
-      if(done_cp_at_det(i_generator) > cur_cp) then
-        res_gen(iproc) = i_generator
-        res_task(iproc) = task_id
-        res_sub(iproc) = subset
-        exit
-      end if
-
-      !$OMP ATOMIC
-      done_for(done_cp_at_det(i_generator)) += 1
-
-      delta_ij_loc(:,:,:) = 0d0
-      call generator_start(i_generator, iproc)
-      call alpha_callback(delta_ij_loc, i_generator, subset, iproc)
-      call generator_done(i_generator, int_buf, double_buf, det_buf, N_buf, iproc)
-
-      !if(.false.) then
-      !$OMP CRITICAL
-      do i=1,N_cp
-        fac = cps(i_generator, i) * dress_weight_inv(i_generator) * comb_step
-        if(fac == 0d0) cycle
-        cp(:,:,i,1) += (delta_ij_loc(:,:,1) * fac)
-        cp(:,:,i,2) += (delta_ij_loc(:,:,2) * fac)
-      end do
-
-
-      toothMwen = tooth_of_det(i_generator)
-      fracted = (toothMwen /= 0)
-      if(fracted) fracted = (i_generator == first_det_of_teeth(toothMwen))
-      if(fracted) then
-        delta_det(:,:,toothMwen-1, 1) += delta_ij_loc(:,:,1) * (1d0-fractage(toothMwen))
-        delta_det(:,:,toothMwen-1, 2) += delta_ij_loc(:,:,2) * (1d0-fractage(toothMwen))
-        delta_det(:,:,toothMwen , 1) += delta_ij_loc(:,:,1) * (fractage(toothMwen))
-        delta_det(:,:,toothMwen , 2) += delta_ij_loc(:,:,2) * (fractage(toothMwen))
-      else
-        delta_det(:,:,toothMwen , 1) += delta_ij_loc(:,:,1)
-        delta_det(:,:,toothMwen , 2) += delta_ij_loc(:,:,2)
-      end if
-
-
-      !$OMP END CRITICAL
-      !end if
-
-      !$OMP CRITICAL
-      call push_dress_results(zmq_socket_push, i_generator, -1, delta_ij_loc, int_buf, double_buf, det_buf, N_buf, task_id)
-      call task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id)
-      !$OMP END CRITICAL
-      tasks(0) += 1
-      tasks(tasks(0)) = task_id
-
+  !$OMP PARALLEL DEFAULT(SHARED) &
+  !$OMP PRIVATE(int_buf, double_buf, det_buf, delta_ij_loc, task, task_id) &
+  !$OMP PRIVATE(toothMwen, fracted, fac) &
+  !$OMP PRIVATE(send, i_generator, subset, iproc, N_buf)
+  iproc = omp_get_thread_num()+1
+  allocate(int_buf(N_dress_int_buffer))
+  allocate(double_buf(N_dress_double_buffer))
+  allocate(det_buf(N_int, 2, N_dress_det_buffer))
+  allocate(delta_ij_loc(N_states,N_det,2))
+  do  ! task loop: left only when get_task_from_taskserver yields task_id == 0
+    !$OMP CRITICAL (SENDAGE)
+    call get_task_from_taskserver(zmq_to_qp_run_socket,worker_id, task_id, task)
+    !$OMP END CRITICAL (SENDAGE)
+    task = task//" 0"
+    if(task_id == 0) then
+      print *, "DONEDONE"
+      exit !! LAST MESSAGE ???
+    end if
+    read (task,*) subset, i_generator
+
+
+    if(done_cp_at_det(i_generator) < lastCp(iproc)) stop 'loop = .false.'
+    !$OMP CRITICAL
+    send = .false.
+    lastSendable = N_cp*2  ! start above any value the min() below is expected to see
+    do i=1,Nproc
+      lastSendable = min(lastCp(i), lastSendable)  ! scan every thread; indexing with iproc would only ever inspect the calling thread
     end do
-    print *, "SLAVE", iproc, "waits"
-    deallocate(int_buf,double_buf,det_buf,delta_ij_loc)
-    !$OMP END PARALLEL
-
-    allocate(delta_ij_loc(N_states,N_det,2))
-    allocate(int_buf(1), double_buf(1), det_buf(1,1,1))
-    N_buf = (/0,1,0/)
-
-    delta_ij_loc = 0d0
-
-    if(cur_cp > 0) then
+    lastSendable -= 1  ! one below the smallest lastCp entry
+    if(lastSendable > lastSent) then
+      lastSent = lastSendable
+      send = .true.
+    end if
+    !$OMP END CRITICAL
+
+    if(send) then
+      !$OMP CRITICAL
+      N_buf = (/0,1,0/)
+
+      delta_ij_loc = 0d0
+      cur_cp = lastSent  ! NOTE(review): cur_cp is not in the PRIVATE list, so it is shared under DEFAULT(SHARED) and is read again after this CRITICAL section ends - confirm no other thread can overwrite it first
+      if(cur_cp < 1) stop "cur_cp < 1"
       do i=1,cur_cp
         delta_ij_loc(:,:,:) += cp(:,:,i,:)
-        !delta_s2(:,:) += cp(:,:,i,2)
       end do
       delta_ij_loc(:,:,:) = delta_ij_loc(:,:,:) / cps_N(cur_cp)
       do i=cp_first_tooth(cur_cp)-1,0,-1
-        delta_ij_loc(:,:,:) = delta_ij_loc(:,:,:) +delta_det(:,:,i,:)
+        delta_ij_loc(:,:,:) = delta_ij_loc(:,:,:) +delta_det(:,:,i,:)
       end do
+      !$OMP END CRITICAL
+      !$OMP CRITICAL (SENDAGE)
+      call push_dress_results(zmq_socket_push, done_for(cur_cp), cur_cp, delta_ij_loc, int_buf, double_buf, det_buf, N_buf, -1)
+      !$OMP END CRITICAL (SENDAGE)
     end if
-    call sleep(1)
-    call push_dress_results(zmq_socket_push, done_for(cur_cp), cur_cp, delta_ij_loc, int_buf, double_buf, det_buf, N_buf, -1)
-    !do i=1,tasks(0)
-    ! call task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,tasks(i))
-    !end do
-    deallocate(delta_ij_loc, int_buf, double_buf, det_buf)
+
+
+    !$OMP ATOMIC
+    done_for(done_cp_at_det(i_generator)) += 1
+
+    delta_ij_loc(:,:,:) = 0d0
+    call generator_start(i_generator, iproc)
+    call alpha_callback(delta_ij_loc, i_generator, subset, iproc)
+    call generator_done(i_generator, int_buf, double_buf, det_buf, N_buf, iproc)
+
+    !if(.false.) then
+    !$OMP CRITICAL
+    do i=1,N_cp
+      fac = cps(i_generator, i) * dress_weight_inv(i_generator) * comb_step
+      if(fac == 0d0) cycle
+      cp(:,:,i,1) += (delta_ij_loc(:,:,1) * fac)
+      cp(:,:,i,2) += (delta_ij_loc(:,:,2) * fac)
+    end do
+
+
+    toothMwen = tooth_of_det(i_generator)
+    fracted = (toothMwen /= 0)
+    if(fracted) fracted = (i_generator == first_det_of_teeth(toothMwen))
+    if(fracted) then
+      delta_det(:,:,toothMwen-1, 1) += delta_ij_loc(:,:,1) * (1d0-fractage(toothMwen))
+      delta_det(:,:,toothMwen-1, 2) += delta_ij_loc(:,:,2) * (1d0-fractage(toothMwen))
+      delta_det(:,:,toothMwen , 1) += delta_ij_loc(:,:,1) * (fractage(toothMwen))
+      delta_det(:,:,toothMwen , 2) += delta_ij_loc(:,:,2) * (fractage(toothMwen))
+    else
+      delta_det(:,:,toothMwen , 1) += delta_ij_loc(:,:,1)
+      delta_det(:,:,toothMwen , 2) += delta_ij_loc(:,:,2)
+    end if
+
+
+    !$OMP END CRITICAL
+    !end if
+
+    !$OMP CRITICAL (SENDAGE)
+    call push_dress_results(zmq_socket_push, i_generator, -1, delta_ij_loc, int_buf, double_buf, det_buf, N_buf, task_id)
+    call task_done_to_taskserver(zmq_to_qp_run_socket,worker_id,task_id)
+    !$OMP END CRITICAL (SENDAGE)
+    lastCp(iproc) = done_cp_at_det(i_generator)  ! remember done_cp_at_det of the task this thread just finished
   end do
+  !$OMP END PARALLEL
 
   call sleep(10)
   call disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id)