Task parallelism
A task scheduler is implemented in OCaml and runs in the qp_run program. The IRPF90 programs communicate with this scheduler to fetch new tasks to do. The flexibility of IRPF90 makes it possible to build micro-services that help a running program. For example, if the computation of the AOs takes too much time, it is possible to start on multiple compute nodes some tasks that will accelerate the running program.
The typical scheme is the following:
- The program (Fortran) asks qp_run to create a new queue for a state of the calculation
- The program adds multiple tasks to do to the queue
- The program starts a collector thread that waits for the results computed by the workers
- The program starts multiple worker threads that fetch tasks to do from the queue, compute the corresponding task, and send the result directly to the collector. Then, the queue is informed that the task has been done
- When the queue is empty and all workers have sent their results, the last worker receives from qp_run a control integer and sends it to the collector thread
- The collector thread checks that the control integer is correct: it can be, for instance, the number of AOs to compute, compared with the number of AOs actually computed
- The parallel section is terminated
The task scheduler
The task scheduler (implemented in the TaskServer.ml file) understands text messages and transforms them into typed messages. The list of understood messages can be found in the of_string function of the Message.ml file.
When qp_run starts, it tries to open a ZeroMQ REP socket on a default port, and tries again on different ports until a suitable port range is found. The port is bound using both the TCP protocol and the inproc protocol to enable both multi-threaded and distributed support.
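As an illustration, here is a minimal sketch of this port scan using the ocaml-zmq bindings (opam package zmq), simplified to a single port instead of a port range; the starting port, scan limit, and endpoint names are illustrative assumptions, not the actual qp_run values:

    (* Bind a REP socket to the first free TCP port, then to an inproc
       endpoint so that local threads can reach it too. The starting
       port (41279) and the upper bound are hypothetical. *)
    let bind_rep_socket context =
      let socket = Zmq.Socket.create context Zmq.Socket.rep in
      let rec try_bind port =
        if port > 41379 then failwith "no free port found"
        else
          try
            Zmq.Socket.bind socket (Printf.sprintf "tcp://*:%d" port);
            port
          with _ -> try_bind (port + 1)
      in
      let port = try_bind 41279 in
      Zmq.Socket.bind socket (Printf.sprintf "inproc://qp_run_%d" port);
      (socket, port)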
To initiate a parallel job, a Newjob message has to be sent to the scheduler, with a job name (state) that will be checked at every connection to the scheduler:
new_job <state> <push_address_tcp> <push_address_inproc>
The collector thread needs to open a PULL socket bound using both the TCP and inproc protocols, and those endpoints need to be given together with the Newjob message.
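For example, a sketch of how the collector's PULL socket could be bound and announced is shown below; the addresses and the ao_integrals state name are illustrative:

    (* The collector binds one PULL socket on both transports, and the main
       thread sends the two endpoints with the new_job message. *)
    let start_job context req_socket =
      let pull = Zmq.Socket.create context Zmq.Socket.pull in
      Zmq.Socket.bind pull "tcp://*:41280";
      Zmq.Socket.bind pull "inproc://collector";
      Zmq.Socket.send req_socket
        "new_job ao_integrals tcp://10.0.0.1:41280 inproc://collector";
      ignore (Zmq.Socket.recv req_socket);  (* wait for the scheduler's reply *)
      pull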
Now, a new Queuing_system instance is created. The Queuing_system contains:
- A list of tasks
- A list of connected clients (empty at initialization)
- The subset of tasks still queued
- The subset of tasks currently running, and on which client they run
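A minimal sketch of the state such an instance could hold is given below; the field names and types are hypothetical, not the ones used in the actual OCaml source:

    type task_id   = int
    type client_id = int

    (* Hypothetical shape of a Queuing_system instance *)
    type queuing_system = {
      tasks   : (task_id * string) list ;     (* all tasks, as strings           *)
      clients : client_id list ;              (* connected clients               *)
      queued  : task_id list ;                (* tasks waiting to be assigned    *)
      running : (task_id * client_id) list ;  (* running tasks and their clients *)
    }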
The format of tasks is up to the user: it is just a string. To add new tasks to the system, the AddTask message is sent:
add_task <state> <string>
: Adds a single task to the Queuing_system.
add_task <state> range <i> <j>
: Builds a list of tasks (l) where i<l<j.
add_task <state> triangle <i>
: Builds a list of tasks (l,i) where 1<l<i.
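To make the two generated forms concrete, here is a sketch of how they could expand into task strings, assuming inclusive bounds (i ≤ l ≤ j, and 1 ≤ l ≤ i); purely illustrative:

    (* Hypothetical expansion of the range and triangle forms *)
    let range_tasks i j =
      List.init (j - i + 1) (fun k -> string_of_int (i + k))

    let triangle_tasks i =
      List.init i (fun k -> Printf.sprintf "%d %d" (k + 1) i)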
When workers connect to the Queuing_system with a Connect message, they obtain as a reply the address of the PULL socket of the collector, as well as a new Client ID:
connect (tcp|inproc)
Now the workers connect a PUSH socket to the PULL endpoint of the collector (TCP or inproc). Workers are now ready to fetch new tasks using GetTask messages:
get_task <state> <client_id>
and the Queuing_system now knows which task runs on which client. The reply is the task, as a string, and the corresponding Task ID.
When the task is done, the worker pushes the results to the collector, and sends to qp_run a TaskDone message with the corresponding Task ID and its contribution to the control integer, which will be accumulated in the Queuing_system instance:
task_done <state> <client_id> <task_id> <control>
If the queue is empty, the reply to the GetTask message is a Terminate message, which informs the workers that they should terminate.
When a worker terminates, it sends a Disconnect message to the scheduler:
disconnect <state> <client_id>
If there are remaining running clients, the reply is 0. For the last client, the reply contains the control integer, and this allows the worker to inform the collector that all tasks are done and all workers are disconnected.
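Putting the worker side together, the following sketch follows the protocol described above with the ocaml-zmq bindings. The reply parsers (parse_connect_reply, returning the client ID and the collector's PULL address as strings; parse_task_reply, returning None for Terminate), the compute function, and the endpoint are hypothetical, the real message formats being defined in Message.ml:

    (* One worker: connect, fetch tasks until Terminate, then disconnect. *)
    let worker context endpoint state compute parse_connect_reply parse_task_reply =
      let req = Zmq.Socket.create context Zmq.Socket.req in
      Zmq.Socket.connect req endpoint;
      Zmq.Socket.send req "connect tcp";
      let client_id, pull_address = parse_connect_reply (Zmq.Socket.recv req) in
      let push = Zmq.Socket.create context Zmq.Socket.push in
      Zmq.Socket.connect push pull_address;
      let rec loop () =
        Zmq.Socket.send req (Printf.sprintf "get_task %s %s" state client_id);
        match parse_task_reply (Zmq.Socket.recv req) with
        | None -> ()                       (* Terminate: the queue is empty *)
        | Some (task_id, task) ->
            let result, control = compute task in
            Zmq.Socket.send push result;   (* result goes straight to the collector *)
            Zmq.Socket.send req
              (Printf.sprintf "task_done %s %s %s %d" state client_id task_id control);
            ignore (Zmq.Socket.recv req);
            loop ()
      in
      loop ();
      Zmq.Socket.send req (Printf.sprintf "disconnect %s %s" state client_id);
      let reply = Zmq.Socket.recv req in
      (* The last client's reply carries the control integer: forward it to the
         collector so it can check it and terminate. *)
      if reply <> "0" then Zmq.Socket.send push reply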
Once the collector thread has finished pulling all the data, it can terminate.
Now, the main thread can send an End_job message to the scheduler to inform it that the parallel task is done.
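Assuming the End_job message follows the same text form as the other messages (end_job <state>), the final exchange could look like this sketch, with an illustrative state name:

    (* Close the parallel section on the main thread *)
    let finish_job req_socket =
      Zmq.Socket.send req_socket "end_job ao_integrals";
      ignore (Zmq.Socket.recv req_socket)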
MPI layer
The ZeroMQ slave can be an MPI program. In that case, MPI is used to broadcast large arrays, such as the wave function, and the input data, which are read only by the MPI master. However, the communication between the ZeroMQ slave and the ZeroMQ master is still done via ZeroMQ.