10
0
mirror of https://github.com/LCPQ/quantum_package synced 2024-11-03 20:54:00 +01:00

Massively parallel selection

This commit is contained in:
Yann Garniron 2016-08-01 16:05:40 +02:00
parent b3df66cd89
commit 8da0509c5e
10 changed files with 206 additions and 164 deletions

View File

@ -541,6 +541,9 @@ type t =
| Terminate of Terminate_msg.t
| Ok of Ok_msg.t
| Error of Error_msg.t
| SetStopped
| SetWaiting
| SetRunning
let of_string s =
@ -577,10 +580,11 @@ let of_string s =
| "put_psi" :: client_id :: n_state :: n_det :: psi_det_size :: [] ->
PutPsi (PutPsi_msg.create ~client_id ~n_state ~n_det ~psi_det_size ~n_det_generators:None
~n_det_selectors:None ~psi_det:None ~psi_coef:None ~energy:None)
| "ok" :: [] ->
Ok (Ok_msg.create ())
| "error" :: rest ->
Error (Error_msg.create (String.concat ~sep:" " rest))
| "ok" :: [] -> Ok (Ok_msg.create ())
| "error" :: rest -> Error (Error_msg.create (String.concat ~sep:" " rest))
| "set_stopped" :: [] -> SetStopped
| "set_running" :: [] -> SetRunning
| "set_waiting" :: [] -> SetWaiting
| _ -> failwith "Message not understood"
@ -605,6 +609,9 @@ let to_string = function
| Error x -> Error_msg.to_string x
| PutPsi x -> PutPsi_msg.to_string x
| GetPsiReply x -> GetPsiReply_msg.to_string x
| SetStopped -> "set_stopped"
| SetRunning -> "set_running"
| SetWaiting -> "set_waiting"
let to_string_list = function

View File

@ -160,10 +160,30 @@ let new_job msg program_state rep_socket pair_socket =
}
in
reply_ok rep_socket;
string_of_pub_state (Running (Message.State.to_string state))
string_of_pub_state Waiting
|> ZMQ.Socket.send pair_socket ;
result
let change_pub_state msg program_state rep_socket pair_socket =
let msg =
match msg with
| `Waiting -> Waiting
| `Stopped -> Stopped
| `Running ->
begin
let state =
match program_state.state with
| Some x -> x
| None -> failwith "Trying to change pub state while no job is ready"
in
Running (Message.State.to_string state)
end
in
reply_ok rep_socket;
string_of_pub_state msg
|> ZMQ.Socket.send pair_socket ;
program_state
let end_job msg program_state rep_socket pair_socket =
@ -531,6 +551,9 @@ let get_psi msg program_state rep_socket =
let terminate program_state rep_socket =
reply_ok rep_socket;
{ program_state with
psi = None;
address_tcp = None;
address_inproc = None;
running = false
}
@ -685,6 +708,9 @@ let run ~port =
| None , Message.Newjob x -> new_job x program_state rep_socket pair_socket
| _ , Message.Newjob _ -> error "A job is already running" program_state rep_socket
| Some _, Message.Endjob x -> end_job x program_state rep_socket pair_socket
| Some _, Message.SetRunning -> change_pub_state `Running program_state rep_socket pair_socket
| _, Message.SetWaiting -> change_pub_state `Waiting program_state rep_socket pair_socket
| _, Message.SetStopped -> change_pub_state `Stopped program_state rep_socket pair_socket
| None , _ -> error "No job is running" program_state rep_socket
| Some _, Message.Connect x -> connect x program_state rep_socket
| Some _, Message.Disconnect x -> disconnect x program_state rep_socket

View File

@ -58,6 +58,4 @@ subroutine run_wf
i = omp_get_thread_num()
call H_apply_FCI_PT2_slave_tcp(i)
!$OMP END PARALLEL
end

View File

@ -1,5 +1,3 @@
program fci_zmq
implicit none
integer :: i,k
@ -7,9 +5,7 @@ program fci_zmq
double precision, allocatable :: pt2(:), norm_pert(:), H_pert_diag(:)
integer :: N_st, degree
integer :: it, mit(0:6)
mit = (/1, 246, 1600, 17528, 112067, 519459, 2685970/)
it = 0
integer(bit_kind) :: chk
N_st = N_states
allocate (pt2(N_st), norm_pert(N_st),H_pert_diag(N_st))
@ -39,20 +35,12 @@ program fci_zmq
integer :: n_det_before
print*,'Beginning the selection ...'
E_CI_before = CI_energy
do while (N_det < N_det_max.and.maxval(abs(pt2(1:N_st))) > pt2_max)
n_det_before = N_det
! call H_apply_FCI(pt2, norm_pert, H_pert_diag, N_st)
it += 1
if(it > 6) stop
call ZMQ_selection(mit(it) - mit(it-1), pt2) ! max(1000-N_det, N_det), pt2)
call ZMQ_selection(max(1024-N_det, N_det), pt2)
!do i=1, N_det
!if(popcnt(psi_det(1,1,i)) + popcnt(psi_det(2,1,i)) /= 23) stop "ZZ1" -2099.2504682049275
!if(popcnt(psi_det(1,2,i)) + popcnt(psi_det(2,2,i)) /= 23) stop "ZZ2"
! do k=1,i-1
! if(detEq(psi_det(1,1,i), psi_det(1,1,k), N_int)) stop "ATRRGRZER"
! end do
!end do
PROVIDE psi_coef
PROVIDE psi_det
PROVIDE psi_det_sorted
@ -65,6 +53,14 @@ program fci_zmq
endif
call diagonalize_CI
call save_wavefunction
! chk = 0_8
! do i=1, N_det
! do k=1, N_int
! chk = xor(psi_det(k,1,i), chk)
! chk = xor(psi_det(k,2,i), chk)
! end do
! end do
! print *, "CHK ", chk
print *, 'N_det = ', N_det
print *, 'N_states = ', N_states
@ -128,18 +124,20 @@ subroutine ZMQ_selection(N, pt2)
integer :: i
integer, external :: omp_get_thread_num
double precision, intent(out) :: pt2(N_states)
!call flip_generators()
call new_parallel_job(zmq_to_qp_run_socket,'selection')
provide nproc
provide ci_electronic_energy
call new_parallel_job(zmq_to_qp_run_socket,"selection")
call zmq_put_psi(zmq_to_qp_run_socket,1,ci_electronic_energy,size(ci_electronic_energy))
call zmq_set_running(zmq_to_qp_run_socket)
call create_selection_buffer(N, N*2, b)
do i= N_det_generators, 1, -1
write(task,*) i, N
call add_task_to_taskserver(zmq_to_qp_run_socket,task)
end do
provide nproc
provide ci_electronic_energy
!$OMP PARALLEL DEFAULT(none) SHARED(b, pt2) PRIVATE(i) NUM_THREADS(nproc+1)
!$OMP PARALLEL DEFAULT(none) SHARED(b, pt2) PRIVATE(i) NUM_THREADS(nproc+1) shared(ci_electronic_energy_is_built, n_det_generators_is_built, n_states_is_built, n_int_is_built, nproc_is_built)
i = omp_get_thread_num()
if (i==0) then
call selection_collector(b, pt2)
@ -148,125 +146,15 @@ subroutine ZMQ_selection(N, pt2)
endif
!$OMP END PARALLEL
call end_parallel_job(zmq_to_qp_run_socket, 'selection')
!call flip_generators()
call fill_H_apply_buffer_no_selection(b%cur,b%det,N_int,0) !!! PAS DE ROBIN
call copy_H_apply_buffer_to_wf()
end subroutine
subroutine selection_dressing_slave_tcp(i)
implicit none
integer, intent(in) :: i
call selection_slave(0,i)
end
subroutine selection_dressing_slave_inproc(i)
implicit none
integer, intent(in) :: i
call selection_slave(1,i)
call selection_slaved(1,i)
end
! subroutine ZMQ_selection()
! use f77_zmq
! implicit none
! BEGIN_DOC
! ! Massively parallel Full-CI
! END_DOC
!
! integer :: i,ithread
! integer(ZMQ_PTR) :: zmq_socket_push
! integer(ZMQ_PTR), external :: new_zmq_push_socket
! zmq_context = f77_zmq_ctx_new ()
! PROVIDE H_apply_buffer_allocated
!
! PROVIDE ci_electronic_energy
! PROVIDE nproc
! !$OMP PARALLEL PRIVATE(i,ithread,zmq_socket_push) num_threads(nproc+1)
! ithread = omp_get_thread_num()
! if (ithread == 0) then
! call receive_selected_determinants()
! else
! zmq_socket_push = new_zmq_push_socket(1)
!
! do i=ithread,N_det_generators,nproc
! print *, i, "/", N_det_generators
! call select_connected(i, max(100, N_det), ci_electronic_energy,zmq_socket_push)
! enddo
!
! if (ithread == 1) then
! integer :: rc
! rc = f77_zmq_send(zmq_socket_push,0,1,0)
! if (rc /= 1) then
! stop 'Error sending termination signal'
! endif
! endif
! call end_zmq_push_socket(zmq_socket_push, 1)
! endif
! !$OMP END PARALLEL
! call copy_H_apply_buffer_to_wf()
! end
! program Full_CI_ZMQ
! use f77_zmq
! implicit none
! BEGIN_DOC
! ! Massively parallel Full-CI
! END_DOC
!
! integer :: i,ithread
!
! integer(ZMQ_PTR) :: zmq_socket_push
! integer(ZMQ_PTR), external :: new_zmq_push_socket
! zmq_context = f77_zmq_ctx_new ()
! PROVIDE H_apply_buffer_allocated
!
! do while (N_det < N_det_max)
!
! PROVIDE ci_electronic_energy
! PROVIDE nproc
! !$OMP PARALLEL PRIVATE(i,ithread,zmq_socket_push) num_threads(nproc+1)
! ithread = omp_get_thread_num()
! if (ithread == 0) then
! call receive_selected_determinants()
! else
! zmq_socket_push = new_zmq_push_socket(0)
!
! do i=ithread,N_det_generators,nproc
! print *, i , "/", N_det_generators
! call select_connected(i, 1.d-7, ci_electronic_energy,zmq_socket_push)
! enddo
! print *, "END .... "
!
! if (ithread == 1) then
! integer :: rc
! rc = f77_zmq_send(zmq_socket_push,0,1,0)
! if (rc /= 1) then
! stop 'Error sending termination signal'
! endif
! endif
! call end_zmq_push_socket(zmq_socket_push, 0)
! endif
! !$OMP END PARALLEL
! call copy_H_apply_buffer_to_wf()
! call diagonalize_CI()
! call save_wavefunction()
! end do
!
! end

View File

@ -13,7 +13,7 @@ BEGIN_PROVIDER [ double precision, integral8, (mo_tot_num, mo_tot_num, mo_tot_n
END_PROVIDER
subroutine selection_slave(thread,iproc)
subroutine selection_slaved(thread,iproc)
use f77_zmq
use selection_types
implicit none
@ -37,7 +37,13 @@ subroutine selection_slave(thread,iproc)
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
zmq_socket_push = new_zmq_push_socket(thread)
call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
if(worker_id == -1) then
print *, "WORKER -1"
!call disconnect_from_taskserver(zmq_to_qp_run_socket,zmq_socket_push,worker_id)
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
call end_zmq_push_socket(zmq_socket_push,thread)
return
end if
buf%N = 0
ctask = 1
pt2 = 0d0
@ -53,7 +59,9 @@ subroutine selection_slave(thread,iproc)
else
if(N /= buf%N) stop "N changed... wtf man??"
end if
call select_connected(i_generator,ci_electronic_energy,pt2,buf) !! ci_electronic_energy ??
!print *, "psi_selectors_coef ", psi_selectors_coef(N_det_selectors-5:N_det_selectors, 1)
!call debug_det(psi_selectors(1,1,N_det_selectors), N_int)
call select_connected(i_generator,ci_electronic_energy,pt2,buf)
end if
if(done) ctask = ctask - 1
@ -160,7 +168,6 @@ subroutine select_connected(i_generator,E0,pt2,b)
integer(bit_kind) :: hole_mask(N_int,2), particle_mask(N_int,2)
double precision :: fock_diag_tmp(2,mo_tot_num+1)
call build_fock_tmp(fock_diag_tmp,psi_det_generators(1,1,i_generator),N_int)
do l=1,N_generators_bitmask

View File

@ -0,0 +1,83 @@
program selection_slave
implicit none
BEGIN_DOC
! Helper program to compute the PT2 in distributed mode.
END_DOC
read_wf = .False.
SOFT_TOUCH read_wf
call provide_everything
call switch_qp_run_to_master
call run_wf
end
subroutine provide_everything
PROVIDE H_apply_buffer_allocated mo_bielec_integrals_in_map psi_det_generators psi_coef_generators psi_det_sorted_bit psi_selectors n_det_generators n_states generators_bitmask zmq_context
! PROVIDE ci_electronic_energy mo_tot_num N_int
end
subroutine run_wf
use f77_zmq
implicit none
integer(ZMQ_PTR), external :: new_zmq_to_qp_run_socket
integer(ZMQ_PTR) :: zmq_to_qp_run_socket
double precision :: energy(N_states_diag)
character*(64) :: state
integer :: oki
oki = 0
call provide_everything
zmq_context = f77_zmq_ctx_new ()
zmq_to_qp_run_socket = new_zmq_to_qp_run_socket()
do
call wait_for_state("selection", zmq_state)
if(trim(zmq_state) /= "selection") exit
if(oki < 0) then
oki += 1
cycle
end if
oki = 0
print *, 'Getting wave function'
call zmq_get_psi(zmq_to_qp_run_socket,1,energy,size(energy))
integer :: j,k
do j=1,N_states_diag
do k=1,N_det
CI_eigenvectors(k,j) = psi_coef(k,j)
enddo
call get_s2_u0(psi_det,CI_eigenvectors(1,j),N_det,size(CI_eigenvectors,1),CI_eigenvectors_s2(j))
enddo
if (.True.) then
do k=1,size(ci_electronic_energy)
ci_electronic_energy(k) = energy(k)
enddo
SOFT_TOUCH ci_electronic_energy CI_eigenvectors_s2 CI_eigenvectors
endif
call write_double(6,ci_energy,'Energy')
!zmq_state = 'selection'
integer :: rc, i
print *, 'Selection slave running'
!$OMP PARALLEL PRIVATE(i)
i = omp_get_thread_num()
call selection_dressing_slave_tcp(i)
!$OMP END PARALLEL
end do
end
subroutine selection_dressing_slave_tcp(i)
implicit none
integer, intent(in) :: i
call selection_slaved(0,i)
end

View File

@ -79,7 +79,7 @@ subroutine zmq_get_psi(zmq_to_qp_run_socket, worker_id, energy, size_energy)
integer :: N_states_read, N_det_read, psi_det_size_read
integer :: N_det_selectors_read, N_det_generators_read
read(msg(14:rc),*) rc, N_states_read, N_det_read, psi_det_size_read, &
N_det_selectors_read, N_det_generators_read
N_det_generators_read, N_det_selectors_read
if (rc /= worker_id) then
print *, 'Wrong worker ID'
stop 'error'

View File

@ -373,6 +373,8 @@ BEGIN_PROVIDER [ logical, ao_bielec_integrals_in_map ]
call add_task_to_taskserver(zmq_to_qp_run_socket,task)
enddo
call zmq_set_running(zmq_to_qp_run_socket)
PROVIDE nproc
!$OMP PARALLEL DEFAULT(private) num_threads(nproc+1)
i = omp_get_thread_num()

View File

@ -143,11 +143,6 @@ function new_zmq_to_qp_run_socket()
stop 'Unable to create zmq req socket'
endif
rc = f77_zmq_connect(new_zmq_to_qp_run_socket, trim(qp_run_address)//':'//trim(zmq_port(0)))
if (rc /= 0) then
stop 'Unable to connect new_zmq_to_qp_run_socket'
endif
rc = f77_zmq_setsockopt(new_zmq_to_qp_run_socket, ZMQ_SNDTIMEO, 120000, 4)
if (rc /= 0) then
stop 'Unable to set send timout in new_zmq_to_qp_run_socket'
@ -158,6 +153,11 @@ function new_zmq_to_qp_run_socket()
stop 'Unable to set recv timout in new_zmq_to_qp_run_socket'
endif
rc = f77_zmq_connect(new_zmq_to_qp_run_socket, trim(qp_run_address)//':'//trim(zmq_port(0)))
if (rc /= 0) then
stop 'Unable to connect new_zmq_to_qp_run_socket'
endif
end
@ -182,18 +182,6 @@ function new_zmq_pair_socket(bind)
stop 'Unable to create zmq pair socket'
endif
if (bind) then
rc = f77_zmq_bind(new_zmq_pair_socket,zmq_socket_pair_inproc_address)
if (rc /= 0) then
print *, 'f77_zmq_bind(new_zmq_pair_socket, zmq_socket_pair_inproc_address)'
stop 'error'
endif
else
rc = f77_zmq_connect(new_zmq_pair_socket,zmq_socket_pair_inproc_address)
if (rc /= 0) then
stop 'Unable to connect new_zmq_pair_socket'
endif
endif
rc = f77_zmq_setsockopt(new_zmq_pair_socket, ZMQ_SNDHWM, 1, 4)
if (rc /= 0) then
@ -215,6 +203,19 @@ function new_zmq_pair_socket(bind)
stop 'f77_zmq_setsockopt(new_zmq_pair_socket, ZMQ_LINGER, 60000, 4)'
endif
if (bind) then
rc = f77_zmq_bind(new_zmq_pair_socket,zmq_socket_pair_inproc_address)
if (rc /= 0) then
print *, 'f77_zmq_bind(new_zmq_pair_socket, zmq_socket_pair_inproc_address)'
stop 'error'
endif
else
rc = f77_zmq_connect(new_zmq_pair_socket,zmq_socket_pair_inproc_address)
if (rc /= 0) then
stop 'Unable to connect new_zmq_pair_socket'
endif
endif
end
@ -535,6 +536,34 @@ subroutine new_parallel_job(zmq_to_qp_run_socket,name_in)
end
subroutine zmq_set_running(zmq_to_qp_run_socket)
use f77_zmq
implicit none
BEGIN_DOC
! Set the job to Running in QP-run
END_DOC
integer(ZMQ_PTR), intent(in) :: zmq_to_qp_run_socket
character*(512) :: message
integer :: rc, sze
message = 'set_running'
sze = len(trim(message))
rc = f77_zmq_send(zmq_to_qp_run_socket,message,sze,0)
if (rc /= sze) then
print *, irp_here, ':f77_zmq_send(zmq_to_qp_run_socket,message,sze,0)'
stop 'error'
endif
rc = f77_zmq_recv(zmq_to_qp_run_socket,message,510,0)
message = trim(message(1:rc))
if (message(1:2) /= 'ok') then
print *, 'Unable to set qp_run to Running'
stop 1
endif
end
subroutine end_parallel_job(zmq_to_qp_run_socket,name_in)
use f77_zmq
@ -584,7 +613,6 @@ subroutine connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
character*(512) :: message
character*(128) :: reply, state, address
integer :: rc
if (thread == 1) then
rc = f77_zmq_send(zmq_to_qp_run_socket, "connect inproc", 14, 0)
if (rc /= 14) then
@ -601,6 +629,10 @@ subroutine connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0)
message = trim(message(1:rc))
if(message(1:5) == "error") then
worker_id = -1
return
end if
read(message,*) reply, state, worker_id, address
if ( (trim(reply) /= 'connect_reply') .and. &
(trim(state) /= trim(zmq_state)) ) then
@ -609,7 +641,6 @@ subroutine connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
print *, 'Address: ', trim(address)
stop -1
endif
end
subroutine disconnect_from_taskserver(zmq_to_qp_run_socket, &
@ -842,7 +873,7 @@ subroutine wait_for_state(state_wait,state)
zmq_socket_sub = new_zmq_sub_socket()
state = "Waiting"
do while (state /= state_wait .and. state /= "Stopped")
do while (trim(state) /= trim(state_wait) .and. trim(state) /= "Stopped")
rc = f77_zmq_recv( zmq_socket_sub, state, 64, 0)
if (rc > 0) then
state = trim(state(1:rc))