#define NPROC 1 #define BUFSIZE 2 #define NTASKS 3 mtype = { NONE, OK, WRONG_STATE, TERMINATE, GETPSI, PUTPSI, NEWJOB, ENDJOB, SETRUNNING, SETWAITING, SETSTOPPED, CONNECT, DISCONNECT, ADDTASK, DELTASK, TASKDONE, GETTASK, PSI, TASK, PUTPSI_REPLY, WAITING, RUNNING, STOPPED } typedef rep_message { mtype m = NONE; byte value = 0; } typedef req_message { mtype m = NONE; byte state = 0; byte value = 0; chan reply = [BUFSIZE] of { rep_message }; } #define send_req( MESSAGE, VALUE ) msg.m=MESSAGE ; msg.value=VALUE ; msg.state=state; rep_socket ! msg; msg.reply ? reply chan rep_socket = [NPROC] of { req_message }; chan pull_socket = [NPROC] of { byte }; chan pair_socket = [NPROC] of { req_message }; chan task_queue = [NTASKS+2] of { byte }; chan pub_socket = [NTASKS+2] of { mtype }; bit socket_up = 0; mtype global_state; /* Sent by pub */ active proctype qp_run() { bit psi = 0; bit address_tcp = 0; bit address_inproc = 0; bit running = 0; byte status = 0; byte state = 0; byte ntasks = 0; req_message msg; rep_message reply; byte nclients = 0; byte task; socket_up = 1; running = 1; do // :: ( (running == 0) && (nclients == 0) && (ntasks == 0) ) -> break :: ( running == 0 ) -> break :: else -> rep_socket ? msg; printf("req: "); printm(msg.m); printf("\t%d\n",msg.value); if :: ( msg.m == TERMINATE ) -> assert (state != 0); assert (msg.state == state); running = 0; reply.m = OK; :: ( msg.m == PUTPSI ) -> assert (state != 0); assert (msg.state == state); assert (psi == 0); psi = 1; reply.m = PUTPSI_REPLY; :: ( msg.m == GETPSI ) -> assert (state != 0); assert (msg.state == state); assert (psi == 1); reply.m = PSI; :: ( msg.m == NEWJOB ) -> assert (state == 0); state = msg.value; pair_socket ! WAITING; reply.m = OK; reply.value = state; :: ( msg.m == ENDJOB ) -> assert (state != 0); assert (msg.state == state); state = 0; pair_socket ! WAITING; reply.m = OK; :: ( msg.m == ADDTASK ) -> assert (state != 0); assert (msg.state == state); task_queue ! msg.value; ntasks++; reply.m = OK; :: ( msg.m == GETTASK ) -> assert (nclients > 0); assert (state != 0); assert (msg.state == state); if :: ( task_queue ?[task] ) -> pair_socket ! WAITING; reply.m = TASK; task_queue ? reply.value :: else -> pair_socket ! RUNNING; reply.m = NONE; reply.value = 255; fi; :: ( msg.m == TASKDONE) -> assert (state != 0); assert (msg.state == state); assert (nclients > 0); assert (ntasks > 0); reply.m = OK; :: ( msg.m == DELTASK ) -> assert (state != 0); assert (msg.state == state); ntasks--; if :: (ntasks > 0) -> reply.value = 1; :: else -> reply.value = 0; fi; reply.m = OK; :: ( msg.m == CONNECT ) -> assert ( state != 0 ) nclients++; reply.m = OK; reply.value = state; :: ( msg.m == DISCONNECT ) -> assert ( msg.state == state ) nclients--; reply.m = OK; :: ( msg.m == STOPPED ) -> pair_socket ! STOPPED; reply.m = OK; :: ( msg.m == WAITING ) -> pair_socket ! WAITING; reply.m = OK; :: ( msg.m == RUNNING ) -> assert ( state != 0 ); pair_socket ! RUNNING; reply.m = OK; fi msg.reply ! reply od pair_socket ! STOPPED; socket_up = 0; } active proctype master() { req_message msg; rep_message reply; byte state = 0; byte count; run pub_thread(); /* New parallel job */ state=1; send_req( NEWJOB, state ); assert (reply.m == OK); /* Add tasks */ count = 0; do :: (count == NTASKS) -> break; :: else -> count++; send_req( ADDTASK, count ); assert (reply.m == OK); od /* Run collector */ run collector(state); /* Run slaves */ count = 0; do :: (count == NPROC) -> break; :: else -> count++; run slave(); od } proctype slave() { req_message msg; rep_message reply; byte task; byte state; msg.m=CONNECT; msg.state = 0; if :: (!socket_up) -> goto exit; :: else -> skip; fi rep_socket ! msg; if :: (!socket_up) -> goto exit; :: else -> skip; fi msg.reply ? reply; state = reply.value; task = 1; do :: (task == 255) -> break; :: else -> send_req( GETTASK, 0); if :: (reply.m == NONE) -> task = 255; :: (reply.m == TASK) -> /* Compute task */ task = reply.value; send_req( TASKDONE, task); assert (reply.m == OK); pull_socket ! task; fi od send_req( DISCONNECT, 0); assert (reply.m == OK); exit: skip; } proctype collector(byte state) { byte task; req_message msg; rep_message reply; bit loop = 1; do :: (loop == 0) -> break :: else -> pull_socket ? task; /* Handle result */ send_req(DELTASK, task); assert (reply.m == OK); loop = reply.value; od send_req( TERMINATE, 0); assert (reply.m == OK); } proctype pub_thread() { mtype state = WAITING; do :: (state == STOPPED) -> break; :: (pair_socket ? [state]) -> pair_socket ? state; global_state = state; od }