mirror of
https://github.com/LCPQ/quantum_package
synced 2025-01-03 10:05:57 +01:00
273 lines
5.7 KiB
Promela
273 lines
5.7 KiB
Promela
/* Model size parameters. */

/* Number of slave processes started by master(); also the capacity of
   rep_socket, pull_socket and pair_socket. */
#define NPROC 1

/* Capacity of the private reply channel carried by every req_message. */
#define BUFSIZE 2

/* Number of tasks master() submits; task_queue and pub_socket hold NTASKS+2. */
#define NTASKS 3

/* Symbolic names used as request kinds, reply kinds and published states.
   The declaration order is kept unchanged because it fixes the numeric value
   of each name. WRONG_STATE, SETRUNNING, SETWAITING and SETSTOPPED are not
   referenced by any process in the visible part of this model. */
mtype = { NONE, OK, WRONG_STATE, TERMINATE, GETPSI, PUTPSI, NEWJOB, ENDJOB, SETRUNNING,
          SETWAITING, SETSTOPPED, CONNECT, DISCONNECT, ADDTASK, DELTASK, TASKDONE, GETTASK,
          PSI, TASK, PUTPSI_REPLY, WAITING, RUNNING, STOPPED
}

/* Reply that qp_run() writes on the reply channel of the request it served. */
typedef rep_message {
  mtype m = NONE;   /* reply kind (OK, TASK, NONE, PSI, PUTPSI_REPLY, ...) */
  byte value = 0;   /* payload; its meaning depends on the request answered */
}

/* Request sent to qp_run() over rep_socket. Each request carries the channel
   on which the sender waits for the answer. */
typedef req_message {
  mtype m = NONE;                              /* request kind */
  byte state = 0;                              /* job id the sender believes is active */
  byte value = 0;                              /* payload (job id for NEWJOB, task id for ADDTASK, ...) */
  chan reply = [BUFSIZE] of { rep_message };   /* where qp_run() puts the answer */
}

/* send_req(MESSAGE, VALUE): send one request on rep_socket and block until the
   answer arrives on the request's own reply channel.
   This is a textual macro: the expanding process must have in scope
   `req_message msg`, `rep_message reply` and a byte `state`.
   It expands to several statements and has no trailing separator, so the
   caller supplies the final `;`. */
#define send_req( MESSAGE, VALUE ) \
  msg.m=MESSAGE ; msg.value=VALUE ; msg.state=state; \
  rep_socket ! msg; \
  msg.reply ? reply

/* Global channels. The names suggest ZeroMQ-style sockets -- presumably the
   model mirrors such a design; confirm against the modeled implementation. */

/* Requests from master(), slave() and collector() to qp_run(). */
chan rep_socket = [NPROC] of { req_message };

/* Ids of finished tasks, from slave() to collector(). */
chan pull_socket = [NPROC] of { byte };

/* State changes, from qp_run() to pub_thread(). Every send and receive on this
   channel uses a single mtype value, so the channel is declared with that
   message type (it was previously declared as carrying req_message). */
chan pair_socket = [NPROC] of { mtype };

/* Ids of pending tasks, filled on ADDTASK and drained on GETTASK. */
chan task_queue = [NTASKS+2] of { byte };

/* Not used by any process in the visible part of this model. */
chan pub_socket = [NTASKS+2] of { mtype };

/* 1 while qp_run() is serving requests, 0 before start-up and after shutdown. */
bit socket_up = 0;

/* Last state received by pub_thread(). */
mtype global_state; /* Sent by pub */

/*
 * Task server.
 *
 * Raises socket_up, then serves one request at a time from rep_socket until a
 * TERMINATE request clears `running`. Each request is checked with assertions
 * against the server's view (active job id, connected clients, pending tasks)
 * and answered on the request's own reply channel. Several requests also push
 * a state (WAITING, RUNNING or STOPPED) to pub_thread() through pair_socket.
 * On exit the server pushes STOPPED and lowers socket_up.
 *
 * The request dispatch has no `else` branch: a request kind that is not listed
 * blocks the server.
 */
active proctype qp_run() {

  bit  psi      = 0;   /* 1 once a PUTPSI has been accepted */
  bit  running  = 0;   /* main-loop flag, cleared by TERMINATE */
  byte state    = 0;   /* id of the active job; 0 means no job */
  byte ntasks   = 0;   /* tasks added and not yet deleted */
  byte nclients = 0;   /* clients connected and not yet disconnected */
  byte task;           /* scratch variable used only to poll task_queue */
  req_message msg;
  rep_message reply;   /* never reset: fields a handler does not set keep
                          the value written by an earlier request */

  socket_up = 1;
  running = 1;
  do
  /* :: ( (running == 0) && (nclients == 0) && (ntasks == 0) ) -> break */
  :: ( running == 0 ) -> break
  :: else ->

    rep_socket ? msg;
    printf("req: "); printm(msg.m); printf("\t%d\n",msg.value);

    if
    :: ( msg.m == TERMINATE ) ->
      assert (state != 0);
      assert (msg.state == state);
      running = 0;
      reply.m = OK

    :: ( msg.m == PUTPSI ) ->
      assert (state != 0);
      assert (msg.state == state);
      assert (psi == 0);
      psi = 1;
      reply.m = PUTPSI_REPLY

    :: ( msg.m == GETPSI ) ->
      assert (state != 0);
      assert (msg.state == state);
      assert (psi == 1);
      reply.m = PSI

    :: ( msg.m == NEWJOB ) ->
      assert (state == 0);
      state = msg.value;
      pair_socket ! WAITING;
      reply.m = OK;
      reply.value = state

    :: ( msg.m == ENDJOB ) ->
      assert (state != 0);
      assert (msg.state == state);
      state = 0;
      pair_socket ! WAITING;
      reply.m = OK

    :: ( msg.m == ADDTASK ) ->
      assert (state != 0);
      assert (msg.state == state);
      task_queue ! msg.value;
      ntasks++;
      reply.m = OK

    :: ( msg.m == GETTASK ) ->
      assert (nclients > 0);
      assert (state != 0);
      assert (msg.state == state);
      if
      :: ( task_queue ?[task] ) ->
        /* A task is pending: hand its id to the client. */
        pair_socket ! WAITING;
        reply.m = TASK;
        task_queue ? reply.value
      :: else ->
        /* Queue is empty: answer NONE with the sentinel value 255. */
        pair_socket ! RUNNING;
        reply.m = NONE;
        reply.value = 255
      fi

    :: ( msg.m == TASKDONE) ->
      assert (state != 0);
      assert (msg.state == state);
      assert (nclients > 0);
      assert (ntasks > 0);
      reply.m = OK

    :: ( msg.m == DELTASK ) ->
      assert (state != 0);
      assert (msg.state == state);
      /* ntasks is a byte: decrementing 0 would silently wrap to 255. */
      assert (ntasks > 0);
      ntasks--;
      /* reply.value tells the collector whether tasks remain. */
      if
      :: (ntasks > 0) -> reply.value = 1
      :: else -> reply.value = 0
      fi;
      reply.m = OK

    :: ( msg.m == CONNECT ) ->
      assert ( state != 0 );
      nclients++;
      reply.m = OK;
      reply.value = state

    :: ( msg.m == DISCONNECT ) ->
      assert ( msg.state == state );
      /* nclients is a byte: decrementing 0 would silently wrap to 255. */
      assert ( nclients > 0 );
      nclients--;
      reply.m = OK

    :: ( msg.m == STOPPED ) ->
      pair_socket ! STOPPED;
      reply.m = OK

    :: ( msg.m == WAITING ) ->
      pair_socket ! WAITING;
      reply.m = OK

    :: ( msg.m == RUNNING ) ->
      assert ( state != 0 );
      pair_socket ! RUNNING;
      reply.m = OK

    fi;
    msg.reply ! reply
  od;
  pair_socket ! STOPPED;
  socket_up = 0

}

/*
 * Job driver.
 *
 * Starts pub_thread(), opens job 1 with NEWJOB, submits tasks numbered
 * 1..NTASKS with ADDTASK, then starts one collector for the job and NPROC
 * slaves. Every request is expected to be answered with OK.
 */
active proctype master() {

  req_message msg;
  rep_message reply;
  byte state = 0;   /* job id; read by the send_req macro */
  byte count;

  run pub_thread();

  /* New parallel job */
  state=1;
  send_req( NEWJOB, state );
  assert (reply.m == OK);

  /* Add tasks */
  count = 0;
  do
  :: (count == NTASKS) -> break
  :: else ->
    count++;
    send_req( ADDTASK, count );
    assert (reply.m == OK)
  od;

  /* Run collector */
  run collector(state);

  /* Run slaves */
  count = 0;
  do
  :: (count == NPROC) -> break
  :: else -> count++; run slave()
  od

}

/*
 * Worker.
 *
 * Connects to qp_run(), then repeatedly asks for a task. For each task it
 * reports TASKDONE and pushes the task id to the collector through
 * pull_socket. It stops asking when the server answers NONE, then disconnects.
 *
 * The CONNECT exchange is written out by hand instead of using send_req so
 * that socket_up can be checked before sending and again before waiting for
 * the reply; if the server is down at either point the slave jumps to `exit`.
 */
proctype slave() {

  req_message msg;
  rep_message reply;
  byte task;
  byte state;   /* job id learned from the CONNECT reply */

  msg.m=CONNECT;
  msg.state = 0;

  if
  :: (!socket_up) -> goto exit
  :: else -> skip
  fi;
  rep_socket ! msg;

  if
  :: (!socket_up) -> goto exit
  :: else -> skip
  fi;
  msg.reply ? reply;

  state = reply.value;

  /* 255 is the "no more work" marker; any other start value enters the loop. */
  task = 1;
  do
  :: (task == 255) -> break
  :: else ->
    send_req( GETTASK, 0);
    if
    :: (reply.m == NONE) ->
      task = 255
    :: (reply.m == TASK) ->
      /* Compute task */
      task = reply.value;
      send_req( TASKDONE, task);
      assert (reply.m == OK);
      pull_socket ! task
    fi
  od;
  send_req( DISCONNECT, 0);
  assert (reply.m == OK);

exit: skip

}

/*
 * Result collector for job `state`.
 *
 * Takes finished task ids from pull_socket and sends DELTASK for each one.
 * The value of the DELTASK reply (1 while tasks remain, 0 otherwise) decides
 * whether to keep looping. After the last task it sends TERMINATE.
 */
proctype collector(byte state) {
  byte task;
  req_message msg;
  rep_message reply;
  bit loop = 1;
  do
  :: (loop == 0) -> break
  :: else ->
    pull_socket ? task;
    /* Handle result */
    send_req(DELTASK, task);
    assert (reply.m == OK);
    loop = reply.value
  od;
  send_req( TERMINATE, 0);
  assert (reply.m == OK)
}

/*
 * State publisher.
 *
 * Copies every state received on pair_socket into global_state and stops once
 * the state it received is STOPPED.
 */
proctype pub_thread() {
  mtype state = WAITING;
  do
  :: (state == STOPPED) -> break
  :: (pair_socket ? [state]) ->
    /* The poll above only tests that a message is available; this
       receive actually removes it from the channel. */
    pair_socket ? state;
    global_state = state
  od
}