diff --git a/plugins/Full_CI_ZMQ/fci_zmq.irp.f b/plugins/Full_CI_ZMQ/fci_zmq.irp.f index 0deb0ca4..e2188633 100644 --- a/plugins/Full_CI_ZMQ/fci_zmq.irp.f +++ b/plugins/Full_CI_ZMQ/fci_zmq.irp.f @@ -102,8 +102,7 @@ program fci_zmq if (N_det == N_det_max) then threshold_davidson = threshold_davidson_in - SOFT_TOUCH threshold_davidson - endif + end if call diagonalize_CI call save_wavefunction call ezfio_set_full_ci_zmq_energy(CI_energy(1)) @@ -111,7 +110,6 @@ program fci_zmq if (N_det < N_det_max) then threshold_davidson = threshold_davidson_in - SOFT_TOUCH threshold_davidson call diagonalize_CI call save_wavefunction call ezfio_set_full_ci_zmq_energy(CI_energy(1)) @@ -119,19 +117,20 @@ program fci_zmq if(do_pt2_end)then print*,'Last iteration only to compute the PT2' - !threshold_selectors = max(threshold_selectors,threshold_selectors_pt2) - !threshold_generators = max(threshold_generators,threshold_generators_pt2) - !TOUCH threshold_selectors threshold_generators - threshold_selectors = 1.d0 - threshold_generators = 1d0 E_CI_before(1:N_states) = CI_energy(1:N_states) double precision :: relative_error relative_error=1.d-3 pt2 = 0.d0 if (N_states == 1) then + threshold_selectors = 1.d0 + threshold_generators = 1d0 + SOFT_TOUCH threshold_selectors threshold_generators print *, 'Stochastic PT2' call ZMQ_pt2(E_CI_before(1), pt2,relative_error) ! Stochastic PT2 else + threshold_selectors = max(threshold_selectors,threshold_selectors_pt2) + threshold_generators = max(threshold_generators,threshold_generators_pt2) + SOFT_TOUCH threshold_selectors threshold_generators print *, 'Deterministic PT2' call ZMQ_selection(0, pt2) ! Deterministic PT2 endif diff --git a/promela/collector.pml b/promela/collector.pml new file mode 100644 index 00000000..85aae99c --- /dev/null +++ b/promela/collector.pml @@ -0,0 +1,21 @@ +proctype collector(byte state) { + + byte task; + req_message msg; + rep_message reply; + bit loop = 1; + xr pull_socket; + + do + :: (loop == 0) -> break + :: else -> + pull_socket ? task; + /* Handle result */ + send_req(DELTASK, task); + assert (reply.m == OK); + loop = reply.value; + od; + +} + + diff --git a/promela/fortran.pml b/promela/fortran.pml new file mode 100644 index 00000000..011eea75 --- /dev/null +++ b/promela/fortran.pml @@ -0,0 +1,48 @@ +active proctype fortran() { + req_message msg; + rep_message reply; + byte state; + byte count, wait; + + + /* New parallel job */ + state=1; + send_req( NEWJOB, state ); + assert (reply.m == OK); + + send_req( PUTPSI, state ); + assert (reply.m == PUTPSI_REPLY); + + /* Add tasks */ + count = 0; + do + :: (count == NTASKS) -> break; + :: else -> + count++; + send_req( ADDTASK, count ); + assert (reply.m == OK); + od + + wait = _nr_pr; + /* Run collector */ + run collector(state); + + /* Run slaves */ + count = 0; + do + :: (count == NPROC) -> break; + :: else -> count++; run slave(); + od + + /* Wait for collector and slaves to finish */ + (_nr_pr == wait); + + send_req( ENDJOB, state ); + assert (reply.m == OK); + state = reply.value; + + send_req( TERMINATE, 0); + assert (reply.m == OK); + +} + diff --git a/promela/model.pml b/promela/model.pml new file mode 100644 index 00000000..f55769d5 --- /dev/null +++ b/promela/model.pml @@ -0,0 +1,35 @@ +#define NPROC 3 +#define BUFSIZE 2 +#define NTASKS 4 + +mtype = { NONE, OK, WRONG_STATE, TERMINATE, GETPSI, PUTPSI, NEWJOB, ENDJOB, SETRUNNING, + SETWAITING, SETSTOPPED, CONNECT, DISCONNECT, ADDTASK, DELTASK, TASKDONE, GETTASK, + PSI, TASK, PUTPSI_REPLY, WAITING, RUNNING, STOPPED + } + +#define send_req( MESSAGE, VALUE ) atomic { msg.m=MESSAGE ; msg.value=VALUE ; msg.state=state; } ; rep_socket ! msg; msg.reply ? reply + +/* Request/Reply pattern */ + +typedef rep_message { + mtype m = NONE; + byte value = 0; +} + +typedef req_message { + mtype m = NONE; + byte state = 0; + byte value = 0; + chan reply = [BUFSIZE] of { rep_message }; +} + +/* Channels */ + +chan rep_socket = [NPROC] of { req_message }; +chan pull_socket = [NPROC] of { byte }; + + +#include "task_server.pml" +#include "fortran.pml" +#include "collector.pml" +#include "slave.pml" diff --git a/promela/slave.pml b/promela/slave.pml new file mode 100644 index 00000000..e6bfbe2b --- /dev/null +++ b/promela/slave.pml @@ -0,0 +1,29 @@ +proctype slave() { + req_message msg; + rep_message reply; + byte task; + byte state; + + send_req(CONNECT, 0); + assert (reply.m == OK); + state = reply.value; + + send_req(GETPSI, 0); + assert (reply.m == PSI); + + task=255; + do + :: (task == 0) -> break; + :: else -> + send_req( GETTASK, 0); + if + :: (reply.m == NONE) -> task = 0; + :: (reply.m == TASK) -> + task = reply.value; + /* Compute task */ + send_req( TASKDONE, task); + assert (reply.m == OK); + pull_socket ! task; + fi + od +} diff --git a/promela/task_server.pml b/promela/task_server.pml new file mode 100644 index 00000000..6f464628 --- /dev/null +++ b/promela/task_server.pml @@ -0,0 +1,138 @@ +/* State of the task server */ +typedef state_t { + chan queue = [NTASKS+2] of { byte }; + byte state = 0; + bit address_tcp = 0; + bit address_inproc = 0; + bit psi = 0; + bit running = 0; + byte ntasks; + byte nclients = 0; +} + + +active proctype task_server() { + + xr rep_socket; + state_t state; + req_message msg; + rep_message reply; + byte task; + + state.running = 1; + do + :: ( state.running + state.nclients == 0 ) -> break + :: else -> + rep_socket ? msg; + printf("req: "); printm(msg.m); printf("\t%d\n",msg.value); + + if + :: ( msg.m == TERMINATE ) -> + atomic { + assert (state.state == 0); + assert (msg.state == state.state); + state.running = 0; + reply.m = OK; + } + + :: ( msg.m == CONNECT ) -> + atomic { + assert (state.state != 0); + state.nclients++; + reply.m = OK; + reply.value = state.state; + } + +/* + :: ( msg.m == DISCONNECT ) -> + atomic { + assert (state.state != 0); + assert (msg.state == state.state); + state.nclients--; + reply.m = OK; + } +*/ + + :: ( msg.m == PUTPSI ) -> + atomic { + assert (state.state != 0); + assert (msg.state == state.state); + assert (state.psi == 0); + state.psi = 1; + reply.m = PUTPSI_REPLY; + } + + :: ( msg.m == GETPSI ) -> + atomic { + assert (state.state != 0); + assert (msg.state == state.state); + assert (state.psi == 1); + reply.m = PSI; + } + + :: ( msg.m == NEWJOB ) -> + atomic { + assert (state.state == 0); + state.state = msg.value; + reply.m = OK; + reply.value = state.state; + } + + :: ( msg.m == ENDJOB ) -> + atomic { + assert (state.state != 0); + assert (msg.state == state.state); + state.state = 0; + reply.m = OK; + } + + :: ( msg.m == TASKDONE ) -> + atomic { + assert (state.state != 0); + assert (state.ntasks > 0); + assert (msg.state == state.state); + reply.m = OK; + } + + :: ( msg.m == GETTASK ) -> + assert (state.state != 0); + assert (state.nclients > 0); + assert (msg.state == state.state); + if + :: ( state.queue ?[task] ) -> + reply.m = TASK; + state.queue ? reply.value + :: else -> + atomic { + reply.m = NONE; + reply.value = 0; + state.nclients--; + } + fi; + + :: ( msg.m == DELTASK ) -> + assert (state.state != 0); + assert (msg.state == state.state); + state.ntasks--; + if + :: (state.ntasks > 0) -> reply.value = 1; + :: else -> reply.value = 0; + fi; + reply.m = OK; + + :: ( msg.m == ADDTASK ) -> + assert (state.state != 0); + assert (msg.state == state.state); + atomic { + state.ntasks++; + reply.m = OK; + } + state.queue ! msg.value; + + fi; + msg.reply ! reply; + printf("rep: "); printm(reply.m); printf("\t%d\n",reply.value); + + od; +} + diff --git a/src/ZMQ/utils.irp.f b/src/ZMQ/utils.irp.f index e61cf92a..5bd2fe6c 100644 --- a/src/ZMQ/utils.irp.f +++ b/src/ZMQ/utils.irp.f @@ -140,15 +140,15 @@ function new_zmq_to_qp_run_socket() stop 'Unable to create zmq req socket' endif - rc = f77_zmq_setsockopt(new_zmq_to_qp_run_socket, ZMQ_SNDTIMEO, 120000, 4) - if (rc /= 0) then - stop 'Unable to set send timeout in new_zmq_to_qp_run_socket' - endif - - rc = f77_zmq_setsockopt(new_zmq_to_qp_run_socket, ZMQ_RCVTIMEO, 120000, 4) - if (rc /= 0) then - stop 'Unable to set recv timeout in new_zmq_to_qp_run_socket' - endif +! rc = f77_zmq_setsockopt(new_zmq_to_qp_run_socket, ZMQ_SNDTIMEO, 120000, 4) +! if (rc /= 0) then +! stop 'Unable to set send timeout in new_zmq_to_qp_run_socket' +! endif +! +! rc = f77_zmq_setsockopt(new_zmq_to_qp_run_socket, ZMQ_RCVTIMEO, 120000, 4) +! if (rc /= 0) then +! stop 'Unable to set recv timeout in new_zmq_to_qp_run_socket' +! endif rc = f77_zmq_connect(new_zmq_to_qp_run_socket, trim(qp_run_address)//':'//trim(zmq_port(0))) if (rc /= 0) then @@ -180,25 +180,25 @@ function new_zmq_pair_socket(bind) endif - rc = f77_zmq_setsockopt(new_zmq_pair_socket, ZMQ_SNDHWM, 1, 4) - if (rc /= 0) then - stop 'f77_zmq_setsockopt(new_zmq_pair_socket, ZMQ_SNDHWM, 1, 4)' - endif - - rc = f77_zmq_setsockopt(new_zmq_pair_socket, ZMQ_RCVHWM, 1, 4) - if (rc /= 0) then - stop 'f77_zmq_setsockopt(new_zmq_pair_socket, ZMQ_RCVHWM, 1, 4)' - endif +! rc = f77_zmq_setsockopt(new_zmq_pair_socket, ZMQ_SNDHWM, 1, 4) +! if (rc /= 0) then +! stop 'f77_zmq_setsockopt(new_zmq_pair_socket, ZMQ_SNDHWM, 1, 4)' +! endif +! +! rc = f77_zmq_setsockopt(new_zmq_pair_socket, ZMQ_RCVHWM, 1, 4) +! if (rc /= 0) then +! stop 'f77_zmq_setsockopt(new_zmq_pair_socket, ZMQ_RCVHWM, 1, 4)' +! endif rc = f77_zmq_setsockopt(new_zmq_pair_socket, ZMQ_IMMEDIATE, 1, 4) if (rc /= 0) then stop 'f77_zmq_setsockopt(new_zmq_pair_socket, ZMQ_IMMEDIATE, 1, 4)' endif - rc = f77_zmq_setsockopt(new_zmq_pair_socket, ZMQ_LINGER, 600000, 4) - if (rc /= 0) then - stop 'f77_zmq_setsockopt(new_zmq_pair_socket, ZMQ_LINGER, 60000, 4)' - endif +! rc = f77_zmq_setsockopt(new_zmq_pair_socket, ZMQ_LINGER, 600000, 4) +! if (rc /= 0) then +! stop 'f77_zmq_setsockopt(new_zmq_pair_socket, ZMQ_LINGER, 60000, 4)' +! endif if (bind) then rc = f77_zmq_bind(new_zmq_pair_socket,zmq_socket_pair_inproc_address) @@ -239,20 +239,20 @@ function new_zmq_pull_socket() stop 'Unable to create zmq pull socket' endif - rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_LINGER,300000,4) - if (rc /= 0) then - stop 'Unable to set ZMQ_LINGER on pull socket' - endif - - rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_RCVBUF,100000000,4) - if (rc /= 0) then - stop 'Unable to set ZMQ_RCVBUF on pull socket' - endif - - rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_RCVHWM,1,4) - if (rc /= 0) then - stop 'Unable to set ZMQ_RCVHWM on pull socket' - endif +! rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_LINGER,300000,4) +! if (rc /= 0) then +! stop 'Unable to set ZMQ_LINGER on pull socket' +! endif +! +! rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_RCVBUF,100000000,4) +! if (rc /= 0) then +! stop 'Unable to set ZMQ_RCVBUF on pull socket' +! endif +! +! rc = f77_zmq_setsockopt(new_zmq_pull_socket,ZMQ_RCVHWM,1,4) +! if (rc /= 0) then +! stop 'Unable to set ZMQ_RCVHWM on pull socket' +! endif integer :: icount @@ -316,30 +316,30 @@ function new_zmq_push_socket(thread) stop 'Unable to create zmq push socket' endif - rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_LINGER,300000,4) - if (rc /= 0) then - stop 'Unable to set ZMQ_LINGER on push socket' - endif - - rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_SNDHWM,1,4) - if (rc /= 0) then - stop 'Unable to set ZMQ_SNDHWM on push socket' - endif - - rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_SNDBUF,100000000,4) - if (rc /= 0) then - stop 'Unable to set ZMQ_RCVBUF on push socket' - endif +! rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_LINGER,300000,4) +! if (rc /= 0) then +! stop 'Unable to set ZMQ_LINGER on push socket' +! endif +! +! rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_SNDHWM,1,4) +! if (rc /= 0) then +! stop 'Unable to set ZMQ_SNDHWM on push socket' +! endif +! +! rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_SNDBUF,100000000,4) +! if (rc /= 0) then +! stop 'Unable to set ZMQ_RCVBUF on push socket' +! endif rc = f77_zmq_setsockopt(new_zmq_push_socket,ZMQ_IMMEDIATE,1,4) if (rc /= 0) then stop 'Unable to set ZMQ_IMMEDIATE on push socket' endif - rc = f77_zmq_setsockopt(new_zmq_push_socket, ZMQ_SNDTIMEO, 100000, 4) - if (rc /= 0) then - stop 'Unable to set send timout in new_zmq_push_socket' - endif +! rc = f77_zmq_setsockopt(new_zmq_push_socket, ZMQ_SNDTIMEO, 100000, 4) +! if (rc /= 0) then +! stop 'Unable to set send timout in new_zmq_push_socket' +! endif if (thread == 1) then rc = f77_zmq_connect(new_zmq_push_socket, zmq_socket_push_inproc_address) @@ -373,10 +373,10 @@ function new_zmq_sub_socket() stop 'Unable to create zmq sub socket' endif - rc = f77_zmq_setsockopt(new_zmq_sub_socket,ZMQ_RCVTIMEO,10000,4) - if (rc /= 0) then - stop 'Unable to set timeout in new_zmq_sub_socket' - endif +! rc = f77_zmq_setsockopt(new_zmq_sub_socket,ZMQ_RCVTIMEO,10000,4) +! if (rc /= 0) then +! stop 'Unable to set timeout in new_zmq_sub_socket' +! endif rc = f77_zmq_setsockopt(new_zmq_sub_socket,ZMQ_CONFLATE,1,4) if (rc /= 0) then @@ -445,10 +445,10 @@ subroutine end_zmq_pull_socket(zmq_socket_pull) integer :: rc character*(8), external :: zmq_port - rc = f77_zmq_setsockopt(zmq_socket_pull,ZMQ_LINGER,0,4) - if (rc /= 0) then - stop 'Unable to set ZMQ_LINGER on pull socket' - endif +! rc = f77_zmq_setsockopt(zmq_socket_pull,ZMQ_LINGER,0,4) +! if (rc /= 0) then +! stop 'Unable to set ZMQ_LINGER on pull socket' +! endif call omp_set_lock(zmq_lock) rc = f77_zmq_close(zmq_socket_pull) @@ -472,10 +472,10 @@ subroutine end_zmq_push_socket(zmq_socket_push,thread) integer :: rc character*(8), external :: zmq_port - rc = f77_zmq_setsockopt(zmq_socket_push,ZMQ_LINGER,300000,4) - if (rc /= 0) then - stop 'Unable to set ZMQ_LINGER on push socket' - endif +! rc = f77_zmq_setsockopt(zmq_socket_push,ZMQ_LINGER,300000,4) +! if (rc /= 0) then +! stop 'Unable to set ZMQ_LINGER on push socket' +! endif call omp_set_lock(zmq_lock) rc = f77_zmq_close(zmq_socket_push) @@ -859,10 +859,10 @@ subroutine end_zmq_to_qp_run_socket(zmq_to_qp_run_socket) character*(8), external :: zmq_port integer :: rc - rc = f77_zmq_setsockopt(zmq_to_qp_run_socket,ZMQ_LINGER,1000,4) - if (rc /= 0) then - stop 'Unable to set ZMQ_LINGER on zmq_to_qp_run_socket' - endif +! rc = f77_zmq_setsockopt(zmq_to_qp_run_socket,ZMQ_LINGER,1000,4) +! if (rc /= 0) then +! stop 'Unable to set ZMQ_LINGER on zmq_to_qp_run_socket' +! endif rc = f77_zmq_close(zmq_to_qp_run_socket) if (rc /= 0) then @@ -901,11 +901,11 @@ subroutine zmq_delete_task(zmq_to_qp_run_socket,zmq_socket_pull,task_id,more) more = 1 else if (reply(16:19) == 'done') then more = 0 - rc = f77_zmq_setsockopt(zmq_socket_pull, ZMQ_RCVTIMEO, 1000, 4) - if (rc /= 0) then - print *, 'f77_zmq_setsockopt(zmq_socket_pull, ZMQ_RCVTIMEO, 3000, 4)' - stop 'error' - endif +! rc = f77_zmq_setsockopt(zmq_socket_pull, ZMQ_RCVTIMEO, 1000, 4) +! if (rc /= 0) then +! print *, 'f77_zmq_setsockopt(zmq_socket_pull, ZMQ_RCVTIMEO, 3000, 4)' +! stop 'error' +! endif else print *, reply print *, 'f77_zmq_recv(zmq_to_qp_run_socket,reply,64,0)'