6 Parallelism

Task parallelism

A task scheduler is implemented in OCaml and runs inside the qp_run program. The IRPF90 programs communicate with this scheduler to fetch new tasks to execute. The flexibility of IRPF90 makes it possible to build micro-services that help a running program. For example, if the computation of the AOs takes too much time, tasks can be started on multiple compute nodes to accelerate the running program.

The typical scheme is the following:

  1. The program (Fortran) asks qp_run to create a new queue for a state of the calculation

  2. The program adds the tasks to be done to the queue

  3. The program starts a collector thread that waits for the results computed by the workers

  4. The program starts multiple worker threads that fetch tasks from the queue, perform the corresponding work, and send the result directly to the collector. Then, the queue is informed that the task is done

  5. When the queue is empty and all workers have sent their results, the last worker receives from qp_run a control integer, and sends it to the collector thread

  6. The collector thread checks that the control integer is correct: for instance, it can compare the number of AOs to compute with the number of AOs actually computed.

  7. The parallel section is terminated

The task scheduler

The task scheduler (implemented in the TaskServer.ml file) understands text messages and transforms them into typed messages. The list of understood messages can be found in the of_string function of the Message.ml file.
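To give an idea of the mechanism, here is a reduced sketch of such a parser; the real variant type and of_string function live in Message.ml and cover many more messages:

    (* Reduced sketch: text messages parsed into a typed variant. *)
    (* Only three of the messages are shown here.                 *)
    type message =
      | Newjob  of string * string * string  (* state, tcp, inproc *)
      | AddTask of string * string           (* state, task string *)
      | GetTask of string * int              (* state, client ID   *)

    let of_string s =
      match String.split_on_char ' ' s with
      | [ "new_job"; state; tcp; inproc ] -> Newjob (state, tcp, inproc)
      | [ "get_task"; state; client ]     -> GetTask (state, int_of_string client)
      | "add_task" :: state :: rest       -> AddTask (state, String.concat " " rest)
      | _ -> invalid_arg "Message.of_string"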

When qp_run starts, it tries to open a ZeroMQ REP socket on a default port, and tries again on different ports until a suitable port range is found. The port is bound using both the TCP protocol and the inproc protocol, to support both multi-threaded and distributed execution.
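A minimal sketch of this port scan, assuming the OCaml zmq bindings (module Zmq); the starting port, the range width and the inproc endpoint name are illustrative, not the actual TaskServer.ml code:

    (* Bind a REP socket on the first free port, on both transports. *)
    let bind_rep ctx base_port =
      let socket = Zmq.Socket.create ctx Zmq.Socket.rep in
      let rec attempt port =
        if port >= base_port + 100 then failwith "no free port found"
        else
          try
            (* tcp for distributed workers, inproc for qp_run threads *)
            Zmq.Socket.bind socket (Printf.sprintf "tcp://*:%d" port);
            Zmq.Socket.bind socket (Printf.sprintf "inproc://qp_run.%d" port);
            (socket, port)
          with _ -> attempt (port + 1)
      in
      attempt base_port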

To initiate a parallel job, a Newjob message has to be sent to the scheduler, with a job name (state) that will be checked at every connection to the scheduler.

new_job <state> <push_address_tcp> <push_address_inproc>

The collector thread needs to open a PULL socket bound using both the TCP and inproc protocols, and these endpoints have to be given together with the Newjob message.
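These two steps could look as follows; the port number, the endpoint names and the acknowledgement are assumptions made for illustration:

    (* The collector binds its PULL socket on both transports, then *)
    (* the main thread registers the job with the scheduler.        *)
    let start_job ctx rep_endpoint state =
      let pull = Zmq.Socket.create ctx Zmq.Socket.pull in
      let tcp = Printf.sprintf "tcp://%s:41280" (Unix.gethostname ()) in
      Zmq.Socket.bind pull "tcp://*:41280";
      Zmq.Socket.bind pull "inproc://collector";
      let req = Zmq.Socket.create ctx Zmq.Socket.req in
      Zmq.Socket.connect req rep_endpoint;
      Zmq.Socket.send req
        (Printf.sprintf "new_job %s %s inproc://collector" state tcp);
      ignore (Zmq.Socket.recv req);   (* wait for the scheduler's reply *)
      (pull, req)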

Now, a new Queuing_system instance is created. The Queuing_system contains (see the sketch after this list):

  • A list of tasks
  • A list of connected clients (empty at the initialization)
  • The subset of tasks still queued
  • The subset of tasks currently running, and on which client they run
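As a rough sketch, this state could be held in a record like the one below; the field names and container types are guesses, not the actual Queuing_system definition:

    type task_id   = int
    type client_id = int

    (* Guessed shape of the queue state; see the OCaml sources for *)
    (* the real definition.                                        *)
    type queuing_system = {
      tasks   : (task_id, string) Hashtbl.t ;    (* all tasks, by ID   *)
      clients : client_id list ;                 (* connected workers  *)
      queued  : task_id Queue.t ;                (* not yet handed out *)
      running : (task_id, client_id) Hashtbl.t ; (* task -> its client *)
    }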

The format of the tasks is up to the user: a task is just a string. To add new tasks to the system, the AddTask message is sent, as in the sketch after this list:

  • add_task <state> <string> : Adds a single task to the Queuing_system
  • add_task <state> range <i> <j> : Builds a list of tasks (l) where i < l < j
  • add_task <state> triangle <i> : Builds a list of tasks (l,i) where 1 < l < i
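For example, the main thread could queue tasks over its REQ socket like this; the task strings and bounds are made up:

    (* Each request must be followed by a recv: REQ/REP is lock-step. *)
    let add_tasks req state =
      let send_sync msg =
        Zmq.Socket.send req msg;
        ignore (Zmq.Socket.recv req)
      in
      send_sync (Printf.sprintf "add_task %s compute_ao_block_57" state);
      send_sync (Printf.sprintf "add_task %s range 1 100" state);
      send_sync (Printf.sprintf "add_task %s triangle 30" state)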

When workers connect to the Queuing_system with a Connect message, they obtain as a reply the address of the PULL socket of the collector, as well as a new Client ID:

connect (tcp|inproc)
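A worker-side sketch; the real workers are IRPF90 programs, and OCaml is used here only for readability. The exact layout of the reply (state, Client ID, PUSH address) is an assumption based on the description above:

    let connect_worker ctx rep_endpoint =
      let req = Zmq.Socket.create ctx Zmq.Socket.req in
      Zmq.Socket.connect req rep_endpoint;
      Zmq.Socket.send req "connect tcp";   (* or "connect inproc" *)
      match String.split_on_char ' ' (Zmq.Socket.recv req) with
      | [ state; client_id; push_address ] ->
          (* Connect a PUSH socket to the collector's PULL endpoint. *)
          let push = Zmq.Socket.create ctx Zmq.Socket.push in
          Zmq.Socket.connect push push_address;
          (req, push, state, client_id)
      | _ -> failwith "unexpected connect reply"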

Now the workers connect a PUSH socket to the PULL endpoint of the collector (TCP or inproc). The workers are then ready to fetch new tasks using GetTask messages:

get_task <state> <client_id>

and the Queuing_system now knows which task runs on which client. The reply is the task, as a string, and the corresponding Task ID.

When the task is done, the worker pushes the results to the collector, and sends to qp_run a TaskDone message with the corresponding Task ID and its contribution to the control integer, which is accumulated in the Queuing_system instance:

task_done <state> <client_id> <task_id> <control>

If the queue is empty, the reply to the GetTask message is a Terminate message, which informs the worker that it should terminate.
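Putting the last three messages together, a worker loop could look like the following sketch; the reply keywords ("terminate" in particular), the reply layout and the compute callback are assumptions:

    (* Fetch a task, do the work, push the result to the collector, *)
    (* acknowledge it to the scheduler, and loop until Terminate.   *)
    let rec work req push state client_id compute =
      Zmq.Socket.send req (Printf.sprintf "get_task %s %s" state client_id);
      match String.split_on_char ' ' (Zmq.Socket.recv req) with
      | "terminate" :: _ -> ()   (* queue is empty: stop fetching *)
      | task_id :: task_words ->
          let result, control = compute (String.concat " " task_words) in
          Zmq.Socket.send push result;   (* straight to the collector *)
          Zmq.Socket.send req
            (Printf.sprintf "task_done %s %s %s %d"
               state client_id task_id control);
          ignore (Zmq.Socket.recv req);
          work req push state client_id compute
      | [] -> failwith "empty reply"

Note that the results go directly from the worker's PUSH socket to the collector's PULL socket, so the task data never transits through the scheduler's REP socket.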

When a worker terminates, it sends a Disconnect message to the scheduler:

disconnect <state> <client_id>

If there are remaining running clients, the reply is 0. For the last client, the reply contains the control integer, and this allows the worker to inform the collector that all tasks are done and all workers are disconnected.
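A sketch of this last step, assuming the reply carries the bare integer and that a non-zero value marks the last client:

    let disconnect req push state client_id =
      Zmq.Socket.send req (Printf.sprintf "disconnect %s %s" state client_id);
      let control = int_of_string (Zmq.Socket.recv req) in
      if control <> 0 then
        (* Last client: forward the control integer to the collector *)
        Zmq.Socket.send push (string_of_int control)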

Once the collector thread has finished pulling all the data, it can terminate.

Now, the main thread can send an End_job message to the scheduler to inform it that the parallel job is done.
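For instance, assuming the wire format is end_job <state>:

    let end_job req state =
      Zmq.Socket.send req (Printf.sprintf "end_job %s" state);
      ignore (Zmq.Socket.recv req)   (* scheduler acknowledges *)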

MPI layer

The ZeroMQ slave can be an MPI program. In that case, MPI is used to broadcast large arrays, such as the wave function and the input data, which are read only by the MPI master. The communication between the ZeroMQ slave and the ZeroMQ master is still done via ZeroMQ.