/*
 * Promela model of a data server and its worker ("qmc") processes.
 *
 * Processes:
 *   qmcchem                  - launcher: starts the dataserver, waits until it
 *                              answers TEST with OK, then starts NPROC workers.
 *   dataserver               - initialisation stub: opens the "socket", answers
 *                              the first request with NONE, spawns two threads.
 *   dataserver_status_thread - periodically publishes the status and drives
 *                              the RUNNING -> STOPPING -> STOPPED transitions.
 *   dataserver_main_thread   - drains pull messages and answers requests.
 *   qmc                      - worker: registers, fetches input, pushes results
 *                              while the published status is RUNNING, then
 *                              unregisters.
 */

#define NPROC    2      /* number of qmc worker processes; also channel depth */
#define BUFSIZE  4      /* depth of each per-request reply channel            */
#define not_here false

mtype = { NONE, TERMINATE, OK, TEST, ERROR, PROPERTY, WALKERS, EZFIO,
          GETWALKERS, REGISTER, EZFIO_REPLY, UNREGISTER,
          STOPPING, STOPPED, QUEUED, RUNNING };

/* Request message: carries its own reply channel so the server can answer
 * the specific sender. */
typedef message_req {
  mtype m = NONE;
  byte value = 0;
  chan reply = [BUFSIZE] of { mtype };
}

/* One-way (fire and forget) message pushed by the workers. */
typedef message_pull {
  mtype m = NONE;
  byte value = 0;
}

chan dataserver_pull = [NPROC] of { message_pull };
chan dataserver_req  = [NPROC] of { message_req };

/* Status as last published by the status thread; this is what workers read. */
mtype dataserver_status_pub;

bit http_address = 0;          /* set to 1 once the dataserver is reachable */
bit killall_qmc = 0;
bit killall_dataserver = 0;

/* Authoritative server status and number of currently registered workers. */
mtype dataserver_status = QUEUED;
byte dataserver_status_n_connected = 0;


/* qmcchem process */
active proctype qmcchem() {
  mtype reply = NONE;
  byte dataserver_pid;
  byte i;
  message_req msg;

  dataserver_pid = run dataserver();

  /* Wait until ZMQ socket is open */
  (http_address == 1);

  /* Keep sending TEST until the server answers OK.  The initialisation stub
   * answers the first request with NONE, the main thread answers OK. */
  do
  :: (reply == OK) -> break
  :: (reply == NONE) ->
       msg.m = TEST;
       dataserver_req ! msg;
       msg.reply ? reply;
       assert (reply == OK || reply == NONE)
  od;
  printf("Dataserver is ready.\n");

  /* Start the QMC processes */
  printf("qmcchem: Starting qmc processes.\n");
  atomic {
    i = 0;
    do
    :: (i < NPROC) -> run qmc(); i++
    :: else -> break
    od;
  }
  printf("qmcchem: qmc processes started.\n");
}


/* dataserver process */
proctype dataserver() {
  message_req msg;

  /* Simulate initialization */
  http_address = 1;
  dataserver_req ? msg;
  msg.reply ! NONE;

  /* Status thread */
  run dataserver_status_thread();
  run dataserver_main_thread();
}


#define delay     5     /* status-thread iterations between two status checks */
#define stop_time 100   /* status-thread iterations before a stop is requested */

/*
 * Publishes dataserver_status into dataserver_status_pub on every iteration
 * and, every `delay` iterations, applies the status transitions:
 *   RUNNING  -> STOPPING  once stop_time is reached and the client count is
 *                         unchanged since it was last sampled;
 *   STOPPING -> STOPPED   once the client count has dropped to zero.
 * Terminates when the status is STOPPED.
 */
proctype dataserver_status_thread() {
  byte count = 0;         /* iterations since the last status check        */
  byte n_connected = 0;   /* last sampled non-zero client count            */
  byte time = 0;          /* saturating iteration counter, capped at stop_time */

  dataserver_status_pub = dataserver_status;
  do
  :: (dataserver_status == STOPPED) -> break
  :: else ->
       time = (time < stop_time -> time+1 : time);
       count++;
       if
       :: (count != delay) -> skip
       :: else ->
            count = 0;
            if
            :: (dataserver_status == RUNNING &&
                n_connected == dataserver_status_n_connected &&
                time >= stop_time) ->
                 dataserver_status = STOPPING;
                 printf("Stop time reached : STOPPING\n")
            :: (dataserver_status == STOPPING &&
                n_connected != dataserver_status_n_connected &&
                dataserver_status_n_connected == 0) ->
                 dataserver_status = STOPPED;
                 printf("No more connected clients : STOPPED\n")
            :: (n_connected != dataserver_status_n_connected &&
                dataserver_status_n_connected > 0) ->
                 n_connected = dataserver_status_n_connected
            :: else -> skip
            fi
       fi;
       dataserver_status_pub = dataserver_status
  od;
  printf ("End of dataserver_status_thread\n");
}


/*
 * Serves the two server channels.  Each iteration first drains every pending
 * pull message, then answers at most one pending request on the reply channel
 * carried by that request.  Terminates when the status is STOPPED and both
 * channels are empty.
 */
proctype dataserver_main_thread() {
  mtype reply;
  message_req msg;
  message_pull pull;

  dataserver_status = QUEUED;

  /* Inform main process that the qmc processes can start (blocking recv) */
  dataserver_req ? msg;
  assert (msg.m == TEST);
  msg.reply ! OK;

  do
  :: (dataserver_status == STOPPED && (!dataserver_pull ?[pull]) && (!dataserver_req ?[msg])) -> break
  :: else ->
       /* Drain all pending pull messages. */
       do
       :: (dataserver_pull ?[pull]) ->
            dataserver_pull ? pull;
            printf("pull: "); printm(pull.m); printf("\n");
            if
            :: (pull.m == ERROR) -> skip
            :: (pull.m == WALKERS) -> skip
            :: (pull.m == PROPERTY) -> skip
            fi
       :: else -> break
       od;

       /* Answer one pending request, if any. */
       if
       :: (dataserver_req ?[msg]) ->
            dataserver_req ? msg;
            printf("req  : "); printm(msg.m); printf("\n");
            if
            :: (msg.m == TEST) -> reply = OK
            :: (msg.m == EZFIO) -> reply = EZFIO_REPLY
            :: (msg.m == GETWALKERS) -> reply = WALKERS
            :: (msg.m == REGISTER && dataserver_status == QUEUED ) ->
                 dataserver_status_n_connected++;
                 dataserver_status = RUNNING;
                 reply = OK;
                 printf("Status changed to RUNNING\n")
            :: (msg.m == REGISTER && dataserver_status == RUNNING ) ->
                 dataserver_status_n_connected++;
                 reply = OK
            :: (msg.m == REGISTER &&
                (dataserver_status == STOPPED || dataserver_status == STOPPING) ) ->
                 /* Rejected clients are not counted: a qmc process that
                  * receives ERROR exits without sending UNREGISTER, so
                  * counting it here would keep the count above zero forever. */
                 reply = ERROR;
                 printf("dataserver_req: register failed \n")
            :: (msg.m == UNREGISTER) ->
                 dataserver_status_n_connected--;
                 reply = OK;
                 if
                 :: (dataserver_status_n_connected == 0) ->
                      dataserver_status = STOPPED;
                      printf("Status changed to STOPPED\n")
                 :: else -> skip
                 fi
            :: else ->
                 /* Unrecognised request: answer ERROR instead of re-sending
                  * whatever reply the previous request produced. */
                 reply = ERROR
            fi;
            msg.reply ! reply
       :: else -> skip
       fi
  od
}


/* qmc processes */
/*
 * Worker life cycle: REGISTER, fetch input (EZFIO, GETWALKERS), wait for the
 * published status to be RUNNING, fetch EZFIO again, then push PROPERTY and
 * WALKERS messages for as long as the published status is RUNNING, and
 * finally UNREGISTER.  Any ERROR reply aborts the worker via the exit label.
 */
proctype qmc() {
  mtype status;
  mtype reply;
  message_req msg;
  message_pull pull;

  /* Init */
  status = dataserver_status_pub;

  msg.m = REGISTER;
  dataserver_req ! msg;
  /* The "end" label prefix tells SPIN that blocking here is a valid end state. */
end:
  msg.reply ? reply;
  if
  :: (reply == ERROR) -> goto exit
  :: else -> assert (reply == OK)
  fi;

  msg.m = EZFIO;
  dataserver_req ! msg;
  msg.reply ? reply;
  if
  :: (reply == ERROR) -> goto exit
  :: else -> assert (reply == EZFIO_REPLY)
  fi;

  msg.m = GETWALKERS;
  dataserver_req ! msg;
  msg.reply ? reply;
  if
  :: (reply == ERROR) -> goto exit
  :: else -> assert (reply == WALKERS)
  fi;

  /* Equilibration */
  /* NOTE(review): this blocks for as long as the published status is not
   * RUNNING; confirm that a worker stuck here after the server has left
   * RUNNING is an intended outcome of the model. */
  (dataserver_status_pub == RUNNING);

  msg.m = EZFIO;
  dataserver_req ! msg;
  msg.reply ? reply;
  if
  :: (reply == ERROR) -> goto exit
  :: else -> assert (reply == EZFIO_REPLY)
  fi;

  status = dataserver_status_pub;

  /* Cycles */
  do
  :: (status != RUNNING) -> break
  :: else ->
       pull.m = PROPERTY; pull.value = 0;
       dataserver_pull ! pull;
       pull.m = PROPERTY; pull.value = 1;
       dataserver_pull ! pull;
       pull.m = WALKERS;
       dataserver_pull ! pull;
       status = dataserver_status_pub
  od;

  /* Termination */
  msg.m = UNREGISTER;
  dataserver_req ! msg;
  msg.reply ? reply;
  assert (reply == OK);

exit:
  skip
}