10
0
mirror of https://github.com/LCPQ/quantum_package synced 2024-11-03 20:54:00 +01:00

Parallelism OK

This commit is contained in:
Anthony Scemama 2017-11-28 15:51:21 +01:00
parent c476aa1159
commit d680c85b8c
5 changed files with 75 additions and 57 deletions

View File

@ -188,6 +188,9 @@ let change_pub_state msg program_state rep_socket pair_socket =
program_state
let force_state =
Message.State.of_string "force"
let end_job msg program_state rep_socket pair_socket =
let failure () =
@ -202,7 +205,7 @@ let end_job msg program_state rep_socket pair_socket =
}
and wait n =
Printf.sprintf "waiting %d" n
Printf.sprintf "waiting for %d slaves..." n
|> Message.Error_msg.create
|> Message.Error_msg.to_string
|> ZMQ.Socket.send rep_socket ;
@ -213,7 +216,13 @@ let end_job msg program_state rep_socket pair_socket =
| None -> failure ()
| Some state ->
begin
if (msg.Message.Endjob_msg.state = state) then
if (state = force_state) then
begin
string_of_pub_state Waiting
|> ZMQ.Socket.send pair_socket ;
success ()
end
else if (msg.Message.Endjob_msg.state = state) then
begin
string_of_pub_state Waiting
|> ZMQ.Socket.send pair_socket ;
@ -223,7 +232,10 @@ let end_job msg program_state rep_socket pair_socket =
wait (Queuing_system.number_of_clients program_state.queue)
end
else
(
Printf.eprintf "STATE:%s%!" (Message.State.to_string state);
failure ()
)
end

View File

@ -70,6 +70,7 @@ let run slave exe ezfio_file =
(** Check input *)
if (not slave) then
begin
match (Sys.command ("qp_edit -c "^ezfio_file)) with
| 0 -> ()

View File

@ -36,7 +36,6 @@ subroutine davidson_run_slave(thread,iproc)
zmq_socket_push = new_zmq_push_socket(thread)
call connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
if(worker_id == -1) then
print *, 'Exited'
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
call end_zmq_push_socket(zmq_socket_push,thread)
return

View File

@ -175,41 +175,10 @@ subroutine zmq_get_psi(zmq_to_qp_run_socket, worker_id)
call zmq_get_N_states(zmq_to_qp_run_socket, worker_id)
call zmq_get_N_det(zmq_to_qp_run_socket, worker_id)
call zmq_get_psi_det_size(zmq_to_qp_run_socket, worker_id)
IRP_IF MPI
include 'mpif.h'
integer :: ierr
call MPI_BCAST (N_states, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
print *, irp_here//': Unable to broadcast N_states'
stop -1
endif
call MPI_BCAST (N_det, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
print *, irp_here//': Unable to broadcast N_det'
stop -1
endif
call MPI_BCAST (psi_det_size, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)
if (ierr /= MPI_SUCCESS) then
print *, irp_here//': Unable to broadcast psi_det_size'
stop -1
endif
IRP_ENDIF
TOUCH psi_det_size N_det N_states
if (mpi_master) then
call zmq_get_psi_det(zmq_to_qp_run_socket, worker_id)
call zmq_get_psi_coef(zmq_to_qp_run_socket, worker_id)
endif
IRP_IF MPI
call broadcast_chunks_bit_kind(psi_det,N_det*N_int*2)
call broadcast_chunks_double(psi_coef,N_states*N_det)
IRP_ENDIF
SOFT_TOUCH psi_det psi_coef
end
@ -247,6 +216,9 @@ subroutine zmq_get_psi_det(zmq_to_qp_run_socket, worker_id)
print *, irp_here, ': Error getting psi_det', rc8, N_int*2_8*N_det*bit_kind
stop 'error'
endif
IRP_IF MPI
call broadcast_chunks_bit_kind(psi_det,N_det*N_int*2)
IRP_ENDIF
end
@ -283,6 +255,10 @@ subroutine zmq_get_psi_coef(zmq_to_qp_run_socket, worker_id)
stop 'error'
endif
IRP_IF MPI
call broadcast_chunks_double(psi_coef,N_states*N_det)
IRP_ENDIF
end

View File

@ -32,10 +32,14 @@ END_PROVIDER
do i=len(buffer),1,-1
if ( buffer(i:i) == ':') then
qp_run_address = trim(buffer(1:i-1))
read(buffer(i+1:), *) zmq_port_start
read(buffer(i+1:), *, err=10,end=10) zmq_port_start
exit
endif
enddo
return
10 continue
print *, irp_here, ': Error in read'
stop -1
END_PROVIDER
BEGIN_PROVIDER [ character*(128), zmq_socket_pull_tcp_address ]
@ -98,12 +102,16 @@ subroutine switch_qp_run_to_master
do i=len(buffer),1,-1
if ( buffer(i:i) == ':') then
qp_run_address = trim(buffer(1:i-1))
read(buffer(i+1:), *) zmq_port_start
read(buffer(i+1:), *, end=10, err=10) zmq_port_start
exit
endif
enddo
call reset_zmq_addresses
return
10 continue
print *, irp_here, ': Error in read'
stop -1
end
@ -604,17 +612,20 @@ subroutine end_parallel_job(zmq_to_qp_run_socket,zmq_socket_pull,name_in)
stop 'Wrong end of job'
endif
do i=1,30
do i=6,1,-1
rc = f77_zmq_send(zmq_to_qp_run_socket, 'end_job '//trim(zmq_state),8+len(trim(zmq_state)),0)
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 512, 0)
if (trim(message(1:13)) == 'error waiting') then
print *, trim(message(6:rc))
call sleep(1)
cycle
else if (message(1:2) == 'ok') then
exit
endif
end do
if (i==0) then
rc = f77_zmq_send(zmq_to_qp_run_socket, 'end_job force',13,0)
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 512, 0)
endif
zmq_state = 'No_state'
call end_zmq_to_qp_run_socket(zmq_to_qp_run_socket)
call end_zmq_pull_socket(zmq_socket_pull)
@ -659,18 +670,18 @@ subroutine connect_to_taskserver(zmq_to_qp_run_socket,worker_id,thread)
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0)
message = trim(message(1:rc))
if(message(1:5) == "error") then
print *, trim(message(1:rc))
worker_id = -1
return
end if
read(message,*) reply, state, worker_id, address
if ( (trim(reply) /= 'connect_reply') .and. &
(trim(state) /= trim(zmq_state)) ) then
print *, 'Reply: ', trim(reply)
print *, 'State: ', trim(state), '/', trim(zmq_state)
print *, 'Address: ', trim(address)
read(message,*, end=10, err=10) reply, state, worker_id, address
if (trim(reply) /= 'connect_reply') then
print *, trim(message)
stop -1
endif
return
10 continue
print *, irp_here, ': Error in read'
stop
end
subroutine disconnect_from_taskserver(zmq_to_qp_run_socket, &
@ -703,10 +714,13 @@ subroutine disconnect_from_taskserver(zmq_to_qp_run_socket, &
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 510, 0)
message = trim(message(1:rc))
read(message,*) reply, state
read(message,*, end=10, err=10) reply, state
if ((trim(reply) == 'disconnect_reply').and.(trim(state) == trim(zmq_state))) then
return
endif
if (trim(message) == 'error Wrong state') then
return
endif
if (trim(message) == 'error No job is running') then
return
endif
@ -715,6 +729,10 @@ subroutine disconnect_from_taskserver(zmq_to_qp_run_socket, &
print *, trim(message)
stop -1
return
10 continue
print *, irp_here, ': Error in read'
stop
end
subroutine add_task_to_taskserver(zmq_to_qp_run_socket,task)
@ -916,9 +934,9 @@ subroutine get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task)
message = repeat(' ',512)
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 1024, 0)
rc = min(1024,rc)
read(message(1:rc),*) reply
read(message(1:rc),*, end=10, err=10) reply
if (trim(reply) == 'get_task_reply') then
read(message(1:rc),*) reply, task_id
read(message(1:rc),*, end=10, err=10) reply, task_id
rc = 15
do while (message(rc:rc) == ' ')
rc += 1
@ -934,11 +952,18 @@ subroutine get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task)
else if (trim(message) == 'error No job is running') then
task_id = 0
task = 'terminate'
else if (trim(message) == 'error Wrong state') then
task_id = 0
task = 'terminate'
else
print *, 'Unable to get the next task'
print *, trim(message)
stop -1
endif
return
10 continue
print *, irp_here, ': Error in read'
stop
end
@ -973,7 +998,7 @@ subroutine get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task
message = repeat(' ',1024)
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 1024, 0)
rc = min(1024,rc)
read(message(1:rc),*) reply
read(message(1:rc),*, end=10, err=10) reply
if (trim(message) == 'get_tasks_reply ok') then
continue
else if (trim(message) == 'terminate') then
@ -993,7 +1018,7 @@ subroutine get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task
message = repeat(' ',512)
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 1024, 0)
rc = min(1024,rc)
read(message(1:rc),*) task_id(i)
read(message(1:rc),*, end=10, err=10) task_id(i)
if (task_id(i) == 0) then
task(i) = 'terminate'
n_tasks = i
@ -1010,6 +1035,11 @@ subroutine get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task
task(i) = message(rc:)
enddo
return
10 continue
print *, irp_here, ': Error in read'
stop
end