#+TITLE: Fundamentals of parallelization
|
|
#+DATE: 23-24/11/2021
|
|
#+AUTHOR: Anthony Scemama
|
|
|
|
#+LaTeX_HEADER: \institute{Lab. Chimie et Physique Quantiques, IRSAMC, UPS/CNRS, Toulouse (France)}
|
|
#+LATEX_CLASS: beamer
|
|
#+LaTeX_CLASS_OPTIONS:[aspectratio=169]
|
|
#+BEAMER_THEME: trex
|
|
#+LaTeX_HEADER: \usepackage{minted}
|
|
#+LaTeX_HEADER: \usemintedstyle{emacs}
|
|
#+LaTeX_HEADER: \usepackage[utf8]{inputenc}
|
|
#+LaTeX_HEADER: \usepackage[T1]{fontenc}
|
|
#+LaTeX_HEADER: \usepackage{hyperref}
|
|
#+LaTeX_HEADER: \usepackage{mathtools}
|
|
#+LaTeX_HEADER: \usepackage{physics}
|
|
#+LaTeX_HEADER: \definecolor{darkgreen}{rgb}{0.,0.6,0.}
|
|
#+LaTeX_HEADER: \definecolor{darkblue}{rgb}{0.,0.2,0.7}
|
|
#+LaTeX_HEADER: \definecolor{darkred}{rgb}{0.6,0.1,0.1}
|
|
#+LaTeX_HEADER: \definecolor{darkpink}{rgb}{0.7,0.0,0.7}
|
|
#+PROPERTY: header-args :exports code
|
|
#+EXPORT_EXCLUDE_TAGS: noexport
|
|
|
|
#+startup: beamer
|
|
#+options: H:2
|
|
|
|
* Program NOEXPORT :noexport:
|
|
|
|
14:00-14:30 : Supercomputers
|
|
14:30-15:30 : Parallelism
|
|
15:45-17:30 : MPI / OpenMP
|
|
|
|
09:00-10:30 : Presentation IRP + QP

For IRPF90, I can give a fairly general presentation.

10:30-12:00 : IRPF90 hands-on session

I already have a tutorial for writing a molecular dynamics program with a
Lennard-Jones potential; I think this will be easier since there is not much
time for the hands-on sessions.


13:45-17:00 : Parallelism hands-on session

- compute pi with a Monte Carlo algorithm with MPI (master-worker), first in Python and then in Fortran/C.

- parallelize a matrix product with OpenMP in Fortran/C

- compute an integral in R^3 on a grid of points with MPI in Fortran/C.
|
|
|
|
* Supercomputers :noexport:
|
|
|
|
** Computers
|
|
|
|
#+LATEX: \begin{columns}
|
|
#+LATEX: \begin{column}{0.7\textwidth}
|
|
#+LATEX: \begin{exampleblock}{Today (order of magnitude)}
|
|
|
|
- 1 *socket* (x86 CPU @ 2.2-3.3 GHz, *4 cores*, hyperthreading)
|
|
- \sim 4-16 GB RAM
|
|
- \sim 500GB SSD
|
|
- Graphics card : ATI Radeon, Nvidia GeForce
|
|
- Gigabit ethernet
|
|
- USB, Webcam, Sound card, etc
|
|
- \sim 500 euros
|
|
#+LATEX: \end{exampleblock}
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \begin{column}{0.3\textwidth}
|
|
#+ATTR_LATEX: :width \textwidth
|
|
|
|
[[./desktop-inspiron-MT-3650-pdp-module-1.jpg]]
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \end{columns}
|
|
|
|
** Computer designed for computation
|
|
|
|
#+LATEX: \begin{columns}
|
|
#+LATEX: \begin{column}{0.6\textwidth}
|
|
#+LATEX: \begin{exampleblock}{Today (order of magnitude)}
|
|
|
|
- 2 sockets (x86 CPU @ 2.2 GHz, 32 cores/socket, hyperthreading)
|
|
- 64-128 GB RAM
|
|
- Multiple SSD HDDs (RAID0)
|
|
- Gigabit ethernet
|
|
- Possibly an Accelerator (Nvidia Volta/Ampere)
|
|
- \sim 5k euros
|
|
|
|
#+LATEX: \end{exampleblock}
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \begin{column}{0.4\textwidth}
|
|
#+ATTR_LATEX: :width \textwidth
|
|
|
|
[[./z840_gallery_img4_tcm245_2164103_tcm245_1871309_tcm245-2164103.jpg]]
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \end{columns}
|
|
|
|
** Cluster
|
|
|
|
#+LATEX: \begin{columns}
|
|
#+LATEX: \begin{column}{0.6\textwidth}
|
|
#+LATEX: \begin{exampleblock}{}
|
|
- Many computers designed for computation
|
|
- Compact (1-2U in rack) / machine
|
|
- Network switches
|
|
- Login server
|
|
- Batch queuing system (SLURM / PBS / SGE / LSF)
|
|
- Cheap Cooling system
|
|
- Requires a lot of electrical power (~10kW/rack)
|
|
- Possibly a Low-latency / High-bandwidth network (Infiniband or 10Gb ethernet)
|
|
- >50k euros
|
|
#+LATEX: \end{exampleblock}
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \begin{column}{0.4\textwidth}
|
|
#+ATTR_LATEX: :width \textwidth
|
|
|
|
[[./img_20160510_152246_resize.jpg]]
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \end{columns}
|
|
|
|
** Supercomputer
|
|
|
|
#+LATEX: \begin{columns}
|
|
#+LATEX: \begin{column}{0.6\textwidth}
|
|
#+LATEX: \begin{exampleblock}{}
|
|
- Many computers designed for computation
|
|
- Very compact (<1U in rack) / machine
|
|
- Low-latency / High-bandwidth network (Infiniband or 10Gb ethernet)
|
|
- Network switches
|
|
- Parallel filesystem for scratch space (Lustre / BeeGFS / GPFS)
|
|
- Multiple login servers
|
|
- Batch queuing system (SLURM / PBS / SGE / LSF)
|
|
- Highly efficient cooling system (water)
|
|
- Requires a lot of electrical power (>100kW)
|
|
#+LATEX: \end{exampleblock}
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \begin{column}{0.4\textwidth}
|
|
#+ATTR_LATEX: :width \textwidth
|
|
|
|
[[./Eos.png]]
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \end{columns}
|
|
|
|
** Definitions
|
|
- Top500 :: Rank of the 500 fastest supercomputers
|
|
- Flop :: Floating point operation
|
|
- Flops :: Flop/s, Number of Floating point operations per second
|
|
- RPeak :: Peak performance, max possible number of Flops
|
|
- RMax :: Real performance on the Linpack benchmark (dense eigenproblem)
|
|
- SP :: Single precision (32-bit floats)
|
|
- DP :: Double precision (64-bit floats)
|
|
- FPU :: Floating Point Unit
|
|
- FMA :: Fused multiply-add ($a\times x+b$ in 1 instruction)
|
|
|
|
** Quantifying performance
|
|
|
|
#+LATEX: \begin{exampleblock}{Example}
*RPeak* of the Intel Xeon Gold 6140 processor:
|
|
- 18 cores
|
|
- 2.3 GHz
|
|
- 2 FPUs
|
|
- 8 FMA (DP)/FPU/cycle
|
|
|
|
$18 \times 2.3\times 10^9 \times 2 \times 8 \times 2 \approx 1.3$ TFlops (DP)
|
|
#+LATEX: \end{exampleblock}
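
The same product as a quick Python check (numbers taken from the example above):

#+begin_src python
cores, freq = 18, 2.3e9               # 18 cores at 2.3 GHz
fpus, fma, flops_per_fma = 2, 8, 2    # 2 FPUs, 8 DP FMA/FPU/cycle, 2 flops per FMA
rpeak = cores * freq * fpus * fma * flops_per_fma
print(rpeak / 1e12, "TFlops (DP)")    # ~1.3 TFlops
#+end_src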
|
|
|
|
- Number of hours :: 730/month, 8760/year
|
|
- Units :: Kilo (K), Mega (M), Giga (G), Tera (T), Peta (P), Exa (E), ...
|
|
|
|
** Top500 (1996)
|
|
#+ATTR_LATEX: :height 0.9\textheight
|
|
[[./top500_95.png]]
|
|
|
|
** Top500 (2021)
|
|
#+ATTR_LATEX: :height 0.9\textheight
|
|
[[./top500_21.png]]
|
|
|
|
https://www.top500.org/lists/top500/2021/11/
|
|
** Curie thin nodes (TGCC, France)
|
|
Ranked 9th in 2012, 77 184 cores, 1.7 PFlops, 2.1 MW
|
|
#+ATTR_LATEX: :height 0.8\textheight
|
|
[[./tgcc.jpg]]
|
|
|
|
** Mare Nostrum (BSC, Spain)
|
|
Ranked 13th in 2017, 153 216 cores, 6.5 PFlops, 1.6 MW
|
|
#+ATTR_LATEX: :height 0.8\textheight
|
|
[[./marenostrum.jpg]]
|
|
|
|
** Architecture
|
|
#+ATTR_LATEX: :height 0.9\textheight
|
|
[[./hierarchy.pdf]]
|
|
** Chassis (Front)
|
|
#+ATTR_LATEX: :height 0.9\textheight
|
|
[[./chassis.jpg]]
|
|
** Chassis (Back)
|
|
#+ATTR_LATEX: :height 0.9\textheight
|
|
[[./chassis_back.jpg]]
|
|
** Compute Node
|
|
#+ATTR_LATEX: :height 0.9\textheight
|
|
[[./blade.jpg]]
|
|
** Socket
|
|
#+ATTR_LATEX: :height 0.9\textheight
|
|
[[./socket.jpg]]
|
|
** Core
|
|
#+ATTR_LATEX: :height 0.9\textheight
|
|
[[./Nehalem.jpg]]
|
|
** Why such an architecture?
|
|
*** Moore's "Law"
|
|
- /The number of transistors doubles every two years/.
|
|
- Often interpreted as /the computational power doubles every two years/.
|
|
- Exponential law $\Longrightarrow$ will fail.
|
|
- 7~nm semiconductors in 2021: the atomic limit is approaching.
|
|
|
|
** Why such an architecture?
|
|
*** Main problem: energy
|
|
Dissipation : $D = F \times V^2 \times C$
|
|
|
|
- $F$ : Frequency (\sim 3 GHz)
|
|
- $V$ : voltage
|
|
- $C$ : constant related to the size of semiconductors (nm)
|
|
|
|
For the processor to be stable, the voltage needs to scale linearly with the
frequency, so $D = \mathcal{O}(F^3)$ $\Longrightarrow$ the
frequency has to be kept around 3 GHz.
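
As an illustration (assuming $V \propto F$, as above), doubling the frequency
multiplies the dissipation by eight:
\[
\frac{D(2F)}{D(F)} = \frac{2F \times (2V)^2 \times C}{F \times V^2 \times C} = 8
\]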
|
|
|
|
1. Double the number of Flops = double the number of processors.
|
|
2. Use accelerators (GPUs, FPGA, TPUs, ...)
|
|
\pause
|
|
*** Consequence
|
|
This requires to re-think programming $\Longrightarrow$
|
|
Parallel programming is *unavoidable*.
|
|
|
|
** Today's main problems
|
|
|
|
1. Energy, physical limits
|
|
2. Interconnect technology
|
|
3. Increase of computational power is faster than memory capacity :
|
|
reduction of memory per CPU core
|
|
4. Latencies can't be reduced much more : moving data becomes very expensive
|
|
5. File systems are *extremely* slow
|
|
6. Supercomputers are well tuned for benchmarks (dense linear algebra), but not
|
|
that much for general scientific applications
|
|
7. Fault-tolerance
|
|
|
|
** Practical solution
|
|
|
|
*** GPUs
|
|
Use less intelligent CPUs, but in a much larger number
|
|
$\Longrightarrow$ Better flops/watt rate
|
|
|
|
*** But...
|
|
- Transfer from main memory to GPU as expensive as a network communication
|
|
- More computational power and less RAM than a CPU node
|
|
- Not adapted to all algorithms
|
|
- Requires re-thinking and rewriting of codes: loss of years of debugging
|
|
- Software stack is not as mature as the CPU stack $\Longrightarrow$
|
|
needs hacking to get performance
|
|
|
|
* Fundamentals of parallelization :noexport:
|
|
|
|
** Introduction
|
|
|
|
*** Definitions
|
|
- Concurrency :: Running multiple computations at the same time.
|
|
- Parallelism :: Running multiple computations *on different execution units*.
|
|
|
|
*** Multiple levels
|
|
|
|
| *Distributed* | Multiple machines |
|
|
| *Shared memory* | Single machine, multiple CPU cores |
|
|
| *Hybrid* | With accelerators (GPUs, ...) |
|
|
| *Instruction-level* | Superscalar processors |
|
|
| *Bit-level* | Vectorization |
|
|
|
|
All levels of parallelism can be exploited in the same code,
|
|
but not every problem is parallelizable at all levels.
|
|
|
|
** Amdahl's law
|
|
|
|
*** Definition
|
|
If P is the proportion of a program that can be made parallel, the maximum speedup
|
|
that can be achieved is:
|
|
#+LATEX: \begin{columns}
|
|
#+LATEX: \begin{column}{0.1\textwidth}
|
|
\[
|
|
S_{\max} = \frac{1}{(1-P)+P/n}
|
|
\]
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \begin{column}{0.8\textwidth}
|
|
- $S$ :: Speedup

- $P$ :: Proportion of a program that can be made parallel

- $n$ :: Number of cores
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \end{columns}
|
|
|
|
*** Example
|
|
|
|
#+LATEX: \begin{columns}
|
|
#+LATEX: \begin{column}{0.4\textwidth}
|
|
#+ATTR_LATEX: :width 0.8 \textwidth
|
|
[[./Amdahl2.pdf]]
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \begin{column}{0.6\textwidth}
|
|
- Max speedup : 100$\times$
|
|
- Perfect scaling needs *all* the program to be parallelized (difficult)
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \end{columns}
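
A small sketch evaluating Amdahl's law for the case plotted above (assuming
$P = 0.99$, which gives the $100\times$ ceiling):

#+begin_src python
def amdahl(p, n):
    """Maximum speedup for a parallel fraction p on n cores."""
    return 1. / ((1. - p) + p / n)

for n in (1, 10, 100, 1000, 10000):
    print(n, amdahl(0.99, n))
# The speedup saturates at 1/(1-P) = 100, no matter how many cores are used.
#+end_src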
|
|
|
|
** Outline
|
|
1. Human parallel machine
|
|
2. Compute a PES with GNU parallel (Bash)
|
|
3. Compute $\pi$ with a Monte Carlo algorithm (Python, sockets)
|
|
4. Compute $\pi$ with a deterministic algorithm (Python, MPI)
|
|
5. Matrix multiplication (Fortran, OpenMP)
|
|
|
|
** Experiment
|
|
*** The Human Parallel Machine
|
|
- Sort a deck of cards
|
|
- Do it as fast as you can
|
|
- I did it alone in 5'25'' (325 seconds)
|
|
- Each one of you is a /human compute node/
|
|
- How fast can all of you sort the same deck?
|
|
** Disappointing result
|
|
|
|
There were many more of you than me, but you didn't sort it many times faster!
|
|
|
|
$\Longrightarrow$ Try the Merge sort
|
|
|
|
** Merge sort
|
|
#+ATTR_LATEX: :height 0.9\textheight
|
|
[[file:merge.png]]
|
|
** Merge sort
|
|
#+ATTR_LATEX: :height 0.9\textheight
|
|
[[file:merge_parallel.png]]
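
A minimal (sequential) sketch of the merge sort shown above; in the parallel
version, the two recursive calls are what get distributed to different people:

#+begin_src python
def merge(left, right):
    """Merge two sorted lists into one sorted list."""
    out = []
    while left and right:
        out.append(left.pop(0) if left[0] <= right[0] else right.pop(0))
    return out + left + right

def merge_sort(cards):
    if len(cards) <= 1:                       # a single card is already sorted
        return cards
    mid = len(cards) // 2
    return merge(merge_sort(cards[:mid]),     # in parallel, each half goes to a different "node"
                 merge_sort(cards[mid:]))

print(merge_sort([3, 7, 6, 4, 1, 0, 2, 5]))   # the example deck from the figure
#+end_src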
|
|
** Better, but still not perfect
|
|
|
|
- Moving the cards around the room takes time (communication)
|
|
- Sorting the sub-piles is super-fast (computation)
|
|
|
|
Algorithm is *bounded by communication* : Difficult to scale
|
|
|
|
#+LATEX: \begin{exampleblock}{If the assignment was}
|
|
- Consider the function $f(a,b,x) = a\,x^5 - \frac{b\,x^3}{a}$
|
|
- Each card has a value for $a$, $b$ and $x$
|
|
- Evaluate $f(a,b,x)$ for each card and write the result on the card
|
|
- Sort the results
|
|
#+LATEX: \end{exampleblock}
|
|
|
|
Same communication pattern, but more computational effort $\Longrightarrow$ better scaling.
|
|
|
|
#+LATEX: \begin{alertblock}{Take-home messages}
|
|
- Parallel workloads may require different algorithms than scalar workloads
|
|
- Difficulty is /data movement/ (communication), not /computation/
|
|
#+LATEX: \end{alertblock}
|
|
|
|
* Embarrassingly parallel computations :noexport:
|
|
|
|
** Example: Potential energy surface
|
|
We want to create the CCSD(T) potential energy surface of the water molecule.
|
|
|
|
#+LATEX: \begin{columns}
|
|
#+LATEX: \begin{column}{0.3\textwidth}
|
|
[[./h2o.png]]
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \begin{column}{0.6\textwidth}
|
|
[[./pes.png]]
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \end{columns}
|
|
|
|
** Example: Potential energy surface
|
|
*** Constraints
|
|
|
|
- We want to compute $25 \times 25 \times 25 = 15~625$ points
|
|
- We are allowed to use 100 CPU cores simultaneously
|
|
- We like to use GAU$$IAN to calculate the CCSD(T) energy
|
|
|
|
*** But:
|
|
|
|
- The grid points are completely independent
|
|
- Any CPU core can calculate any point
|
|
|
|
** Example: Potential energy surface
|
|
*** Optimal solution: /work stealing/
|
|
|
|
- One grid point is $E(r_1,r_2,\theta)$
|
|
- Build the list of all the arguments $(r_1,r_2,\theta)$: ~[ (0.8,0.8,70.), ..., (1.1,1.1,140.) ]~ (the *queue*)
|
|
- Each CPU core, when idle, pops out the head of the queue
|
|
and computes $E(r_1,r_2,\theta)$
|
|
- All the results are stored in a single file
|
|
- The results are sorted for plotting
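
A minimal sketch of this pattern in Python with a process pool (the grid below
and the ~energy~ function are placeholders for the real calculation):

#+begin_src python
from multiprocessing import Pool
from itertools import product

def energy(args):
    """Placeholder: a real implementation would run the CCSD(T) calculation here."""
    r1, r2, theta = args
    return (r1, r2, theta, 0.0)

if __name__ == '__main__':
    # The queue of all (r1, r2, theta) arguments
    queue = list(product([0.8, 0.9, 1.0, 1.1],
                         [0.8, 0.9, 1.0, 1.1],
                         [70., 100., 140.]))
    with Pool(100) as pool:                       # idle workers pop the next task from the queue
        results = pool.map(energy, queue, chunksize=1)
    for line in sorted(results):                  # sort the results for plotting
        print(line)
#+end_src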
|
|
|
|
** GNU parallel
|
|
|
|
GNU Parallel executes Linux commands in parallel and can guarantee that
|
|
the output is the same as if the commands were executed sequentially.
|
|
|
|
#+begin_src bash
|
|
$ parallel echo ::: A B C
|
|
A
|
|
B
|
|
C
|
|
#+end_src
|
|
|
|
is equivalent to:
|
|
|
|
#+begin_src bash
|
|
$ echo A ; echo B ; echo C
|
|
#+end_src
|
|
|
|
Multiple input sources can be given:
|
|
|
|
#+begin_src bash
|
|
$ parallel echo ::: A B ::: C D
|
|
A C
|
|
A D
|
|
B C
|
|
B D
|
|
#+end_src
|
|
|
|
** GNU parallel
|
|
|
|
If no command is given after =parallel=, the arguments are treated as commands:
|
|
|
|
#+begin_src bash
|
|
$ parallel ::: pwd hostname "echo $TMPDIR"
|
|
/home/scemama
|
|
lpqdh82.ups-tlse.fr
|
|
/tmp
|
|
#+end_src
|
|
|
|
Jobs can be run on remote servers:
|
|
|
|
#+begin_src bash
|
|
$ parallel ::: "echo hello" hostname
|
|
lpqdh82.ups-tlse.fr
|
|
hello
|
|
$ parallel -S lpqlx139.ups-tlse.fr ::: "echo hello" hostname
|
|
hello
|
|
lpqlx139.ups-tlse.fr
|
|
#+end_src
|
|
|
|
** GNU parallel
|
|
|
|
Files can be transferred to the remote hosts:
|
|
|
|
#+begin_src bash
|
|
$ echo Hello > input
|
|
$ parallel -S lpqlx139.ups-tlse.fr cat ::: input
|
|
cat: input: No such file or directory
|
|
|
|
$ echo Hello > input
|
|
$ parallel -S lpqlx139.ups-tlse.fr --transfer --cleanup cat ::: input
|
|
Hello
|
|
#+end_src
|
|
|
|
** GNU Parallel: example
|
|
|
|
Convert thousands of images from =.gif= to =.jpg=
|
|
|
|
#+begin_src bash
|
|
$ ls
|
|
img1000.gif img241.gif img394.gif img546.gif img699.gif img850.gif
|
|
img1001.gif img242.gif img395.gif img547.gif img69.gif img851.gif
|
|
[...]
|
|
img23.gif img392.gif img544.gif img697.gif img849.gif
|
|
img240.gif img393.gif img545.gif img698.gif img84.gif
|
|
#+end_src
|
|
|
|
To convert one =.gif= into =.jpg= format:
|
|
|
|
#+begin_src bash
|
|
$ time convert img1.gif img1.jpg
|
|
real 0m0.008s
|
|
user 0m0.000s
|
|
sys 0m0.000s
|
|
#+end_src
|
|
|
|
** GNU Parallel: example
|
|
|
|
*** Sequential execution
|
|
|
|
#+begin_src bash
|
|
$ time for i in {1..1011}
|
|
> do
|
|
> convert img${i}.gif img${i}.jpg
|
|
> done
|
|
real 0m7.936s
|
|
user 0m0.210s
|
|
sys 0m0.270s
|
|
#+end_src
|
|
|
|
*** Parallel execution on a quad-core:
|
|
|
|
#+begin_src bash
|
|
$ time parallel convert {.}.gif {.}.jpg ::: *.gif
|
|
real 0m2.051s
|
|
user 0m1.000s
|
|
sys 0m0.540s
|
|
#+end_src
|
|
|
|
** Computing the CCSD(T) surface *1. Fetch the energy in an output file*
|
|
|
|
Running a CCSD(T) calculation with GAU$$IAN gives the energy
|
|
somewhere in the output:
|
|
#+begin_example
|
|
CCSD(T)= -0.76329294074D+02
|
|
#+end_example
|
|
|
|
To get only the energy in the output, we can use the following command:
|
|
|
|
#+begin_src bash
|
|
$ g09 < input | grep "^ CCSD(T)=" | cut -d "=" -f 2
|
|
-0.76329294074D+02
|
|
#+end_src
|
|
|
|
** Computing the CCSD(T) surface *2. Script =run_h2o.sh= that takes $r_1$, $r_2$ and $\theta$ as arguments*
|
|
|
|
#+LATEX: \begin{columns}
|
|
#+LATEX: \begin{column}{0.6\textwidth}
|
|
#+begin_src bash
|
|
#!/bin/bash
|
|
|
|
r1=$1 ; r2=$2 ; angle=$3
|
|
|
|
# Create Gaussian input file, pipe it to Gaussian,
|
|
# and get the CCSD(T) energy
|
|
cat << EOF | g09 | grep "^ CCSD(T)=" | cut -d "=" -f 2
|
|
# CCSD(T)/cc-pVTZ
|
|
|
|
Water molecule r1=${r1} r2=${r2} angle=${angle}
|
|
|
|
0 1
|
|
h
|
|
o 1 ${r1}
|
|
h 2 ${r2} 1 ${angle}
|
|
|
|
|
|
EOF
|
|
|
|
#+end_src
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \begin{column}{0.4\textwidth}
|
|
Example:
|
|
|
|
#+begin_src text
|
|
$ ./run_h2o.sh 1.08 1.08 104.
|
|
-0.76310788178D+02
|
|
|
|
$ ./run_h2o.sh 0.98 1.0 100.
|
|
-0.76330291742D+02
|
|
#+end_src
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \end{columns}
|
|
|
|
** Computing the CCSD(T) surface *3. Files containing the parameters*
|
|
|
|
#+LATEX: \begin{columns}
|
|
#+LATEX: \begin{column}{0.5\textwidth}
|
|
#+begin_src text
|
|
$ cat r1_file
|
|
0.75
|
|
0.80
|
|
0.85
|
|
0.90
|
|
0.95
|
|
1.00
|
|
#+end_src
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \begin{column}{0.5\textwidth}
|
|
#+begin_src text
|
|
$ cat angle_file
|
|
100.
|
|
101.
|
|
102.
|
|
103.
|
|
104.
|
|
105.
|
|
106.
|
|
#+end_src
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \end{columns}
|
|
|
|
#+LATEX: \begin{columns}
|
|
#+LATEX: \begin{column}{0.5\textwidth}
|
|
#+begin_src text
|
|
$ cat nodefile
|
|
2//usr/bin/ssh compute-0-10.local
|
|
2//usr/bin/ssh compute-0-6.local
|
|
16//usr/bin/ssh compute-0-12.local
|
|
16//usr/bin/ssh compute-0-5.local
|
|
16//usr/bin/ssh compute-0-7.local
|
|
6//usr/bin/ssh compute-0-1.local
|
|
2//usr/bin/ssh compute-0-13.local
|
|
4//usr/bin/ssh compute-0-8.local
|
|
#+end_src
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \begin{column}{0.5\textwidth} =nodefile= contains the names of the machines and the number of CPUs.
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \end{columns}
|
|
|
|
** Computing the CCSD(T) surface *4. Run with GNU parallel*
|
|
|
|
On a single CPU:
|
|
#+begin_src text
|
|
$ time parallel -a r1_file -a r1_file -a angle_file \
|
|
--keep-order --tag -j 1 $PWD/run_h2o.sh
|
|
0.75 0.75 100. -0.76185942070D+02
|
|
0.75 0.75 101. -0.76186697072D+02
|
|
0.75 0.75 102. -0.76187387594D+02
|
|
[...]
|
|
0.80 1.00 106. -0.76294078963D+02
|
|
0.85 0.75 100. -0.76243282762D+02
|
|
0.85 0.75 101. -0.76243869316D+02
|
|
[...]
|
|
1.00 1.00 105. -0.76329165017D+02
|
|
1.00 1.00 106. -0.76328988177D+02
|
|
|
|
real 15m5.293s
|
|
user 11m25.679s
|
|
sys 2m20.194s
|
|
#+end_src
|
|
|
|
** Computing the CCSD(T) surface *4. Run with GNU parallel*
|
|
|
|
On 64 CPUs: $39\times$ faster!
|
|
|
|
#+begin_src text
|
|
$ time parallel -a r1_file -a r1_file -a angle_file \
|
|
--keep-order --tag --sshloginfile nodefile $PWD/run_h2o.sh
|
|
0.75 0.75 100. -0.76185942070D+02
|
|
0.75 0.75 101. -0.76186697072D+02
|
|
0.75 0.75 102. -0.76187387594D+02
|
|
[...]
|
|
0.80 1.00 106. -0.76294078963D+02
|
|
0.85 0.75 100. -0.76243282762D+02
|
|
0.85 0.75 101. -0.76243869316D+02
|
|
[...]
|
|
1.00 1.00 105. -0.76329165017D+02
|
|
1.00 1.00 106. -0.76328988177D+02
|
|
|
|
real 0m23.848s
|
|
user 0m3.359s
|
|
sys 0m3.172s
|
|
#+end_src
|
|
|
|
* Inter-process communication :noexport:
|
|
|
|
** Processes /vs/ threads
|
|
|
|
*** Process
|
|
- Has its own memory address space
|
|
- Context switching between processes is slow
|
|
- Processes interact only through system-provided communication mechanisms
|
|
- Fork: creates a copy of the current process
|
|
- Exec: switches to running another binary executable
|
|
- Spawn: =Fork=, =exec= the child and =wait= for its termination
|
|
*** Thread
|
|
- Exist as subsets of a process
|
|
- Context switching between threads is fast
|
|
- Share the same memory address space : interact via shared memory
|
|
|
|
** Inter-process communication
|
|
|
|
- Processes exchange data via read/write on /File descriptors/
|
|
- These file descriptors can point to:
|
|
- Files on disk: Simplest choice
|
|
- Named Pipes: Same program as with files
|
|
- Pipes: Same behavior as files
|
|
- Network sockets
|
|
|
|
** Named pipes
|
|
A named pipe is a *virtual* file which is read by a process and
|
|
written by other processes. It allows processes to communicate using
|
|
standard I/O operations:
|
|
#+ATTR_LATEX: :height 0.7\textheight
|
|
[[./pipe.png]]
|
|
|
|
** Example
|
|
#+LATEX: \begin{columns}
|
|
#+LATEX: \begin{column}{0.5\textwidth}
|
|
#+ATTR_LATEX: :height 0.8\textheight
|
|
[[./pipe_example.png]]
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \begin{column}{0.5\textwidth}
|
|
- Unzip =input.gz=
|
|
- Sort the unzipped file
|
|
- Zip the result into =output.gz=
|
|
|
|
Same as
|
|
#+begin_src bash
|
|
$ gunzip --to-stdout input.gz \
|
|
| sort \
|
|
| gzip > output.gz
|
|
#+end_src
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \end{columns}
|
|
|
|
** Example: Program 1
|
|
|
|
#+LATEX: \begin{columns}
|
|
#+LATEX: \begin{column}{0.5\textwidth}
|
|
#+begin_src bash
|
|
#!/bin/bash
|
|
|
|
# Create two pipes using the mkfifo command
|
|
mkfifo /tmp/pipe /tmp/pipe2
|
|
|
|
# Unzip the input file and write the result
|
|
# in the 1st pipe
|
|
echo "Run gunzip"
|
|
gunzip --to-stdout input.gz > /tmp/pipe
|
|
|
|
# Zip what comes from the second pipe
|
|
echo "Run gzip"
|
|
gzip < /tmp/pipe2 > output.gz
|
|
|
|
# Clear the pipes in the filesystem
|
|
rm /tmp/pipe /tmp/pipe2
|
|
#+end_src
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \begin{column}{0.5\textwidth}
|
|
#+begin_src bash
|
|
#!/bin/bash
|
|
|
|
# Read the 1st pipe, sort the result and write
|
|
# in the 2nd pipe
|
|
echo "Run sort"
|
|
sort < /tmp/pipe > /tmp/pipe2
|
|
#+end_src
|
|
|
|
#+LATEX: \begin{exampleblock}{Execution}
|
|
#+begin_src text
|
|
$ ./p1.sh &
|
|
Run gunzip
|
|
$ ./p2.sh
|
|
Run sort
|
|
Run gzip
|
|
[1]+ Done
|
|
./p1.sh
|
|
#+end_src
|
|
#+LATEX: \end{exampleblock}
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \end{columns}
|
|
|
|
** Pipe
|
|
#+LATEX: \begin{columns}
|
|
#+LATEX: \begin{column}{0.6\textwidth} *Fork* : Copy the current process
|
|
|
|
Pipes don't have an entry on the file system, and are opened/closed in the programs.
|
|
1. Create the pipe
|
|
2. Fork in parent/child processes
|
|
3. Exchange data between parent and child
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \begin{column}{0.4\textwidth}
|
|
[[./fork.png]]
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \end{columns}
|
|
|
|
** Pipe
|
|
#+LATEX: \begin{columns}
|
|
#+LATEX: \begin{column}{0.6\textwidth}
|
|
#+begin_src python :tangle Codes/fork.py
|
|
#!/usr/bin/env python
|
|
import sys,os
|
|
|
|
def main():
|
|
print("Process ID: %d" % (os.getpid()))
|
|
r, w = os.pipe()
|
|
new_pid = os.fork()
|
|
|
|
if new_pid != 0: # This is the parent process
|
|
print("I am the parent, my PID is %d"%(os.getpid()))
|
|
print("and the PID of my child is %d"%(new_pid))
|
|
|
|
# Close write and open read file descriptors
|
|
os.close(w)
|
|
r = os.fdopen(r,'r')
|
|
|
|
# Read data from the child
|
|
print("Reading from the child")
|
|
s = r.read()
|
|
print("Read '%s' from the child"%(s))
|
|
r.close() ; os.wait()
|
|
#+end_src
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \begin{column}{0.4\textwidth}
|
|
[[./fork.png]]
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \end{columns}
|
|
|
|
** Pipe
|
|
#+LATEX: \begin{columns}
|
|
#+LATEX: \begin{column}{0.6\textwidth}
|
|
#+begin_src python :tangle Codes/fork.py
|
|
else: # This is the child process
|
|
print(" I am the child, my PID is %d"%(os.getpid()))
|
|
|
|
# Close read and open write file descriptors
|
|
os.close(r)
|
|
w = os.fdopen(w,'w')
|
|
|
|
# Send 'Hello' to the parent
|
|
print(" Sending 'Hello' to the parent")
|
|
w.write( "Hello!" )
|
|
w.close()
|
|
|
|
print(" Sent 'Hello'")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
#+end_src
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \begin{column}{0.4\textwidth}
|
|
[[./fork.png]]
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \end{columns}
|
|
|
|
** Computation of $\pi$
|
|
|
|
#+LATEX: \begin{columns}
|
|
#+LATEX: \begin{column}{0.4\textwidth}
|
|
#+ATTR_LATEX: :width 0.9\textwidth
|
|
[[./pi.png]]
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \begin{column}{0.6\textwidth}
|
|
- The area of the circle is $\pi r^2$ $\Longrightarrow$ For a unit circle, the
area is $\pi$
|
|
- The function in the red square is $y = \sqrt{1-x^2}$ (the circle is $\sqrt{x^2 + y^2} = 1$)
|
|
- The area in grey corresponds to \[ \int_0^1 \sqrt{1-x^2}\, dx = \frac{\pi}{4} \]
|
|
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \end{columns}
|
|
|
|
** Monte Carlo computation of $\pi$
|
|
|
|
#+LATEX: \begin{columns}
|
|
#+LATEX: \begin{column}{0.4\textwidth}
|
|
#+ATTR_LATEX: :width 0.9\textwidth
|
|
[[./pi_mc.png]]
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \begin{column}{0.6\textwidth}
|
|
- Points $(x,y)$ are drawn randomly in the unit square
|
|
- Count how many times the points are inside the circle
|
|
|
|
\[
|
|
\frac{N_{\text{in}}}{N_{\text{in}}+N_{\text{out}}} = \frac{\pi}{4}
|
|
\]
|
|
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \end{columns}
|
|
|
|
** Optimal algorithm
|
|
- Each core $1 \le i \le M$ computes its own average $X_i$
|
|
- All $M$ results are independent $\Longrightarrow$
|
|
Gaussian-distributed random variables (central-limit theorem)
|
|
|
|
#+ATTR_LATEX: :height 0.6\textheight
|
|
[[./pi_convergence.png]]
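
Assuming the $X_i$ are independent estimates, the running average and its
statistical error (the quantities printed by the codes below) are:
\[
\bar{X} = \frac{1}{M} \sum_{i=1}^M X_i, \qquad
\sigma^2 = \frac{1}{M-1} \sum_{i=1}^M \left(X_i - \bar{X}\right)^2, \qquad
\epsilon = \frac{\sigma}{\sqrt{M}}
\]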
|
|
|
|
** Computation of \pi with pipes in Python
|
|
|
|
#+begin_src python :tangle Codes/pi_python.py
|
|
#!/usr/bin/env python
|
|
import os, sys
|
|
from random import random, seed
|
|
from math import sqrt
|
|
|
|
NMAX = 10000000 # Nb of MC steps/process
|
|
error_threshold = 1.0e-4 # Stopping criterion
|
|
NPROC=4 # Use 4 processes
|
|
|
|
def compute_pi():
|
|
"""Local Monte Carlo calculation of pi"""
|
|
seed(None) # Initialize random number generator
|
|
|
|
result = 0.
|
|
for i in range(NMAX): # Loop NMAX times
|
|
x,y = random(), random() # Draw 2 random numbers x and y
|
|
if x*x + y*y <= 1.: # Check if (x,y) is in the circle
|
|
result += 1
|
|
return 4.* float(result)/float(NMAX) # Estimate of pi
|
|
#+end_src
|
|
|
|
** Computation of \pi with pipes in Python
|
|
|
|
#+begin_src python :tangle Codes/pi_python.py
|
|
def main():
|
|
r = [None]*NPROC # Reading edges of the pipes
|
|
pid = [None]*NPROC # Running processes
|
|
|
|
for i in range(NPROC):
|
|
r[i], w = os.pipe() # Create the pipe
|
|
pid[i] = os.fork() # Fork and save the PIDs
|
|
|
|
if pid[i] != 0: # This is the parent process
|
|
os.close(w)
|
|
r[i] = os.fdopen(r[i],'r')
|
|
else: # This is the child process
|
|
os.close(r[i])
|
|
w = os.fdopen(w,'w')
|
|
while True: # Compute pi on this process
|
|
X = compute_pi()
|
|
try:
|
|
w.write("%f\n"%(X)) # Write the result in the pipe
|
|
w.flush()
|
|
except IOError: # Child process exits here
|
|
sys.exit(0)
|
|
#+end_src
|
|
|
|
** Computation of \pi with pipes in Python
|
|
|
|
#+begin_src python :tangle Codes/pi_python.py
|
|
data = []
|
|
while True:
|
|
for i in range(NPROC): # Read in the pipe of each process
|
|
data.append( float(r[i].readline()) )
|
|
N = len(data)
|
|
average = sum(data)/N # Compute average
|
|
if N > 2: # Compute variance
|
|
l = [ (x-average)*(x-average) for x in data ]
|
|
variance = sum(l)/(N-1.)
|
|
else:
|
|
variance = 0.
|
|
error = sqrt(variance)/sqrt(N) # Compute error
|
|
print("%f +/- %f %d"%(average, error, N))
|
|
|
|
if N > 2 and error < error_threshold: # Stopping condition
|
|
for i in range(NPROC): # Kill all children
|
|
try: os.kill(pid[i],9)
|
|
except: pass
|
|
sys.exit(0)
|
|
|
|
if __name__ == '__main__': main()
|
|
#+end_src
|
|
|
|
** Computation of \pi with pipes in Python
|
|
|
|
#+begin_src text
|
|
$ python pi_python.py
|
|
3.141974 +/- 0.000000 1
|
|
3.142569 +/- 0.000000 2
|
|
3.142168 +/- 0.000528 3
|
|
3.141938 +/- 0.000439 4
|
|
3.141947 +/- 0.000340 5
|
|
[...]
|
|
3.141625 +/- 0.000107 33
|
|
3.141614 +/- 0.000104 34
|
|
3.141617 +/- 0.000101 35
|
|
3.141606 +/- 0.000099 36
|
|
#+end_src
|
|
|
|
** Sockets
|
|
|
|
Sockets are analogous to pipes, but they allow both ends of the pipe to be
|
|
on different machines connected by a network interface.
|
|
An Internet socket is characterized by a unique combination of:
|
|
|
|
- A transport protocol: TCP, UDP, raw IP, ...
|
|
- A local socket address: Local IP address and port number, for example 192.168.2.2:22
|
|
- A remote socket address: Optional (TCP)
|
|
|
|
** Sockets
|
|
|
|
#+ATTR_LATEX: :height 0.9\textheight
|
|
[[./socket.png]]
|
|
|
|
** Sockets: server code in Python
|
|
|
|
#+begin_src python :tangle Codes/socket_server.py
|
|
#!/usr/bin/env python
|
|
|
|
import sys, os, socket, datetime
|
|
now = datetime.datetime.now
|
|
|
|
def main():
|
|
HOSTNAME = socket.gethostname()
|
|
PORT = 11279
|
|
print(now(), "I am the server : %s:%d"%(HOSTNAME,PORT))
|
|
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Create an INET socket
|
|
s.bind( (HOSTNAME, PORT) ) # Bind the socket to the address and port
|
|
s.listen(5) # Wait for incoming connections
|
|
conn, addr = s.accept() # Accept connection
|
|
print(now(), "Connected by", addr)
|
|
|
|
#+end_src
|
|
|
|
** Sockets: server code in Python
|
|
|
|
#+begin_src python :tangle Codes/socket_server.py
|
|
data = ""
|
|
while True: # Buffered read of the socket
|
|
message = conn.recv(8).decode('utf-8')
|
|
print(now(), "Buffer : "+message)
|
|
data += message
|
|
if len(message) < 8: break
|
|
print(now(), "Received data : ", data)
|
|
|
|
print(now(), "Sending thank you...")
|
|
conn.send("Thank you".encode())
|
|
conn.close()
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
#+end_src
|
|
|
|
** Sockets: client code in Python
|
|
|
|
#+begin_src python :tangle Codes/socket_client.py
|
|
#!/usr/bin/env python
|
|
import sys, os, socket, datetime
|
|
now = datetime.datetime.now
|
|
|
|
def main():
|
|
HOSTNAME = sys.argv[1] # Get host name from command line
|
|
PORT = int(sys.argv[2]) # Get port from command line
|
|
print(now(), "The target server is : %s:%d"%(HOSTNAME,PORT))
|
|
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Create an INET socket
|
|
s.connect( (HOSTNAME, PORT) ) # Connect the socket to the address and port
|
|
# of the server
|
|
message = "Hello, world!!!!!!!"
|
|
print(now(), "Sending : "+message)
|
|
s.send(message.encode()) # Send the data
|
|
|
|
data = s.recv(1024) # Read the reply of the server
|
|
s.close()
|
|
print(now(), 'Received: ', data.decode('utf-8'))
|
|
|
|
if __name__ == "__main__": main()
|
|
#+end_src
|
|
|
|
** Sockets: Execution
|
|
|
|
#+begin_src text
|
|
$ python Codes/socket_client.py lpqdh82 11279
|
|
2021-11-20 21:20:32.258632 The target server is : lpqdh82:11279
|
|
2021-11-20 21:20:32.258959 Sending : Hello, world!!!!!!!
|
|
2021-11-20 21:20:32.259042 Connected by ('127.0.0.1', 36798)
|
|
2021-11-20 21:20:32.259068 Buffer : Hello, w
|
|
2021-11-20 21:20:32.259076 Buffer : orld!!!!
|
|
2021-11-20 21:20:32.259083 Buffer : !!!
|
|
2021-11-20 21:20:32.259088 Received data : Hello, world!!!!!!!
|
|
2021-11-20 21:20:32.259093 Sending thank you...
|
|
2021-11-20 21:20:32.259133 Received: Thank you
|
|
#+end_src
|
|
|
|
Note that the client and server can be written in different languages.
|
|
|
|
** Server for \pi with sockets in Python
|
|
|
|
#+NAME:py_server
|
|
#+begin_src python :tangle Codes/pi_server_python.py
|
|
#!/usr/bin/env python
|
|
import sys, os, socket
|
|
from math import sqrt
|
|
PORT = 1666
|
|
error_threshold = 1.e-4 # Stopping criterion
|
|
|
|
def main():
|
|
data = []
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Create an INET socket
|
|
s.bind( (socket.gethostname(), PORT) ) # Bind the socket to the address and port
|
|
while True:
|
|
s.listen(5) # Wait for incoming connections
|
|
conn, addr = s.accept() # Accept connection
|
|
X = ""
|
|
while True: # Buffered read of the socket
|
|
message = conn.recv(128)
|
|
X += message.decode('utf-8')
|
|
if len(message) < 128: break
|
|
data.append( float(X) )
|
|
N = len(data)
|
|
#+end_src
|
|
|
|
** Server for \pi with sockets in Python
|
|
|
|
#+NAME:py_server2
|
|
#+begin_src python :tangle Codes/pi_server_python.py
|
|
average = sum(data)/N # Compute average
|
|
if N > 2: # Compute variance
|
|
l = [ (x-average)*(x-average) for x in data ]
|
|
variance = sum(l)/(N-1.)
|
|
else:
|
|
variance = 0.
|
|
error = sqrt(variance)/sqrt(N) # Compute error
|
|
|
|
print('%f +/- %f'%(average,error))
|
|
|
|
# Stopping condition
|
|
if N > 2 and error < error_threshold:
|
|
conn.send("STOP".encode())
|
|
break
|
|
else:
|
|
conn.send("OK".encode())
|
|
conn.close()
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
#+end_src
|
|
|
|
** Client for \pi with sockets in Python
|
|
|
|
#+begin_src python :tangle Codes/pi_client_python.py
|
|
#!/usr/bin/env python
|
|
import os, sys, socket
|
|
from random import random, seed
|
|
from math import sqrt
|
|
HOSTNAME = "localhost"
|
|
PORT = 1666
|
|
NMAX = 10000000 # Nb of MC steps/process
|
|
error_threshold = 1.0e-4 # Stopping criterion
|
|
NPROC=4 # Use 4 processes
|
|
|
|
def compute_pi():
|
|
"""Local Monte Carlo calculation of pi"""
|
|
seed(None) # Initialize random number generator
|
|
|
|
result = 0.
|
|
for i in range(NMAX): # Loop NMAX times
|
|
x,y = random(), random() # Draw 2 random numbers x and y
|
|
if x*x + y*y <= 1.: # Check if (x,y) is in the circle
|
|
result += 1
|
|
return 4.* float(result)/float(NMAX) # Estimate of pi
|
|
#+end_src
|
|
|
|
** Client for \pi with sockets in Python
|
|
|
|
#+begin_src python :tangle Codes/pi_client_python.py
|
|
def main():
|
|
|
|
while True:
|
|
X = compute_pi()
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Create an INET socket
|
|
try: # Connect the socket to the address and port of the server
|
|
s.connect( (HOSTNAME, PORT) )
|
|
except socket.error:
|
|
break
|
|
message = str(X)
|
|
s.send(message.encode()) # Send the data
|
|
reply = s.recv(128).decode('utf-8') # Read the reply of the server
|
|
s.close()
|
|
|
|
if reply == "STOP": break
|
|
|
|
if __name__ == '__main__':
|
|
main()
|
|
|
|
#+end_src
|
|
|
|
** Execution of \pi with sockets in Python
|
|
|
|
#+begin_src text
|
|
$ python pi_server_python.py &
|
|
> for i in {1..8} ; do
|
|
> python pi_client_python.py &
|
|
> done ; wait
|
|
|
|
3.142136 +/- 0.000000
|
|
3.141783 +/- 0.000000
|
|
3.141992 +/- 0.000291
|
|
3.141804 +/- 0.000279
|
|
[...]
|
|
3.141687 +/- 0.000104
|
|
3.141666 +/- 0.000102
|
|
3.141651 +/- 0.000098
|
|
|
|
#+end_src
|
|
|
|
* Message Passing Interface (MPI) :noexport:
|
|
|
|
** Message Passing Interface
|
|
|
|
- Application Programming Interface for inter-process communication
|
|
- Takes advantage of HPC hardware:
|
|
- TCP/IP: 50 $\mu \text{s}$ latency
|
|
- Remote Direct Memory Access (RDMA): <2 $\mu \text{s}$
|
|
(low-latency network)
|
|
- Portable
|
|
- Each vendor has its own implementation adapted to the hardware
|
|
- Standard in HPC
|
|
- Initially designed for a fixed number of processes:
|
|
- No problem for the discovery of peers
|
|
- Fast collective communications
|
|
- Single Program Multiple Data (SPMD) paradigm
|
|
|
|
** Communicators
|
|
|
|
- Group of processes that can communicate together
|
|
- Each process has an ID in the communicator: no need for IP
|
|
addresses and port numbers
|
|
- ~MPI_COMM_WORLD~: Global communicator, default
|
|
- ~size~: number of processes in the communicator
|
|
- ~rank~: ID of the process in the communicator
|
|
|
|
** Point-to-point communication
|
|
|
|
*** Python
|
|
- Send: ~comm.send(data, dest, tag)~
|
|
- Receive: ~comm.recv(source, tag)~
|
|
*** Fortran
|
|
- Send: ~MPI_SEND(buffer, count, datatype, destination, tag, communicator, ierror)~
|
|
- Receive:
|
|
~MPI_RECV(buffer, count, datatype, source, tag, communicator,
|
|
status, ierror)~
|
|
|
|
** Synchronous or asynchronous?
|
|
|
|
- ~MPI_Ssend~ :: Blocking synchronous send. Returns upon notification
|
|
that the message has been received (safe).
|
|
- ~MPI_Isend~ :: Non-blocking send. Returns immediately (dangerous).
|
|
- ~MPI_Send~ :: Returns when the send buffer is ready for
|
|
use. Non-deterministic: depends on MPI implementation, size of
|
|
the data, number of ranks, ... (dangerous)
|
|
|
|
#+LATEX: \begin{alertblock}{Important}
|
|
- Writing the program and Debugging:
|
|
- /Always/ use ~MPI_Ssend~
|
|
- Use ~MPI_Isend~ only when ~MPI_Ssend~ is irrelevant
|
|
- Once the code is well checked
|
|
- You can replace ~MPI_Ssend~ by ~MPI_Send~ as an optimization.
|
|
- If ~MPI_Send~ is still too slow, consider rewriting for ~MPI_Isend~
|
|
#+LATEX: \end{alertblock}
|
|
|
|
** Point-to-point communication (Python)
|
|
|
|
#+begin_src python :tangle Codes/mpi_rank.py
|
|
from mpi4py import MPI
|
|
|
|
def main():
|
|
comm = MPI.COMM_WORLD
|
|
rank = comm.Get_rank()
|
|
size = comm.Get_size()
|
|
|
|
if rank == 0:
|
|
data = 42
|
|
print("Before: Rank: %d Size: %d Data: %d"%(rank, size, data))
|
|
comm.ssend(data, dest=1, tag=11)
|
|
print("After : Rank: %d Size: %d Data: %d"%(rank, size, data))
|
|
elif rank == 1:
|
|
data = 0
|
|
print("Before: Rank: %d Size: %d Data: %d"%(rank, size, data))
|
|
data = comm.recv(source=0, tag=11)
|
|
print("After : Rank: %d Size: %d Data: %d"%(rank, size, data))
|
|
|
|
if __name__ == "__main__": main()
|
|
#+end_src
|
|
|
|
** Point-to-point communication (Python)
|
|
|
|
#+begin_src text
|
|
$ mpiexec -n 4 python mpi_rank.py
|
|
Before: Rank: 0 Size: 4 Data: 42
|
|
Before: Rank: 1 Size: 4 Data: 0
|
|
After : Rank: 0 Size: 4 Data: 42
|
|
After : Rank: 1 Size: 4 Data: 42
|
|
#+end_src
|
|
|
|
In Fortran, compile using =mpif90= and execute using =mpiexec= (or =mpirun=).
|
|
|
|
** Point-to-point communication (Fortran)
|
|
|
|
#+begin_src fortran :tangle Codes/mpi_rank.f90
|
|
program test_rank
|
|
use mpi
|
|
implicit none
|
|
integer :: rank, size, data, ierr, status(mpi_status_size)
|
|
|
|
call MPI_INIT(ierr) ! Initialize library (required)
|
|
if (ierr /= MPI_SUCCESS) then
|
|
call MPI_ABORT(MPI_COMM_WORLD, 1, ierr)
|
|
end if
|
|
|
|
call MPI_COMM_SIZE(MPI_COMM_WORLD, size, ierr)
|
|
if (ierr /= MPI_SUCCESS) then
|
|
call MPI_ABORT(MPI_COMM_WORLD, 2, ierr)
|
|
end if
|
|
|
|
call MPI_COMM_RANK(MPI_COMM_WORLD, rank, ierr)
|
|
if (ierr /= MPI_SUCCESS) then
|
|
call MPI_ABORT(MPI_COMM_WORLD, 3, ierr)
|
|
end if
|
|
#+end_src
|
|
|
|
** Point-to-point communication (Fortran)
|
|
|
|
#+begin_src fortran :tangle Codes/mpi_rank.f90
|
|
if (rank == 0) then
|
|
data = 42
|
|
print *, "Before: Rank:", rank, "Size:", size, "Data: ", data
|
|
call MPI_SSEND(data, 1, MPI_INTEGER, 1, 11, MPI_COMM_WORLD, ierr)
|
|
print *, "After : Rank:", rank, "Size:", size, "Data: ", data
|
|
|
|
else if (rank == 1) then
|
|
data = 0
|
|
print *, "Before: Rank:", rank, "Size:", size, "Data: ", data
|
|
call MPI_RECV(data, 1, MPI_INTEGER, 0, 11, MPI_COMM_WORLD, &
|
|
status, ierr)
|
|
print *, "After : Rank:", rank, "Size:", size, "Data: ", data
|
|
|
|
end if
|
|
call MPI_FINALIZE(ierr) ! De-initialize library (required)
|
|
end program
|
|
#+end_src
|
|
|
|
** Deadlocks
|
|
#+LATEX: \begin{columns}
|
|
#+LATEX: \begin{column}{0.3\textwidth}
|
|
#+ATTR_LATEX: :height 0.8 \textheight
|
|
[[./deadlock.png]]
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \begin{column}{0.7\textwidth}
|
|
|
|
#+LATEX: \begin{exampleblock}{Deadlock}
|
|
Each process waits for a message coming from another process
|
|
#+LATEX: \end{exampleblock}
|
|
|
|
Example: round-robin
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \end{columns}
|
|
|
|
** Deadlock example
|
|
|
|
#+begin_src fortran :tangle Codes/mpi_deadlock.f90
|
|
program deadlock
|
|
use mpi
|
|
implicit none
|
|
integer :: rank, size, value, source, destination, ierr
|
|
integer, parameter :: tag=100
|
|
|
|
call MPI_INIT(ierr)
|
|
call MPI_COMM_RANK(MPI_COMM_WORLD, rank, ierr)
|
|
call MPI_COMM_SIZE(MPI_COMM_WORLD, size, ierr)
|
|
|
|
source = mod(size+rank-1, size)
|
|
destination = mod(rank+1 , size)
|
|
|
|
call MPI_SSEND(rank+10, 1, MPI_INTEGER, destination, tag, MPI_COMM_WORLD, ierr)
|
|
call MPI_RECV(value, 1, MPI_INTEGER, source, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE, ierr)
|
|
|
|
print *, rank, 'received', value, 'from', source
|
|
call MPI_FINALIZE(ierr)
|
|
end program
|
|
#+end_src
|
|
|
|
** Send-receive
|
|
|
|
The ~MPI_Sendrecv~ function sends and receives a message /simultaneously/.
|
|
It can avoid deadlocks.
|
|
|
|
#+begin_example
|
|
MPI_SENDRECV(SENDBUF, SENDCOUNT, SENDTYPE, DEST, SENDTAG,
|
|
RECVBUF, RECVCOUNT, RECVTYPE, SOURCE, RECVTAG,
|
|
COMM, STATUS, IERROR)
|
|
|
|
<type> SENDBUF(*), RECVBUF(*)
|
|
INTEGER :: SENDCOUNT, SENDTYPE, DEST, SENDTAG
|
|
INTEGER :: RECVCOUNT, RECVTYPE, SOURCE, RECVTAG, COMM
|
|
INTEGER :: STATUS(MPI_STATUS_SIZE), IERROR
|
|
#+end_example
|
|
|
|
** Send-receive
|
|
|
|
#+begin_src fortran :tangle Codes/mpi_sendrecv.f90
|
|
program sendrecv
|
|
use mpi
|
|
implicit none
|
|
integer :: rank, size, value, source, destination, ierr
|
|
integer, parameter :: tag=100
|
|
|
|
call MPI_INIT(ierr)
|
|
call MPI_COMM_RANK(MPI_COMM_WORLD, rank, ierr)
|
|
call MPI_COMM_SIZE(MPI_COMM_WORLD, size, ierr)
|
|
|
|
source = mod(size+rank-1, size)
|
|
destination = mod(rank+1 , size)
|
|
|
|
call MPI_SENDRECV(rank+10, 1, MPI_INTEGER, destination, tag, value, &
|
|
1, MPI_INTEGER, source, tag, MPI_COMM_WORLD, &
|
|
MPI_STATUS_IGNORE, ierr)
|
|
|
|
print *, rank, 'received', value, 'from', source
|
|
|
|
call MPI_FINALIZE(ierr)
|
|
end program
|
|
#+end_src
|
|
|
|
** Collective communications
|
|
|
|
*** One-to-all
|
|
- ~MPI_Bcast~ :: Broadcast the same data to all
|
|
- ~MPI_Scatter~ :: distribute an array
|
|
*** All-to-one
|
|
- ~MPI_Reduce~ :: Sum/product/... of data coming from all ranks
|
|
- ~Gather~ :: collect a distributed array
|
|
*** All-to-all
|
|
- ~MPI_Barrier~ :: Global synchronization
|
|
- ~MPI_AllReduce~ :: Reduce and broadcast the result
|
|
- ~MPI_AllGather~ :: Gather and broadcast the result
|
|
- ~MPI_Alltoall~ :: Gather and scatter the result
|
|
|
|
** Computation of \pi with MPI
|
|
|
|
\[
|
|
\pi = 4 \int_{0}^{1} \sqrt{1-x^2} dx
|
|
\sim 4 \sum_{i=1}^M \sqrt{1-x_i^2}\, \Delta x
|
|
\]
|
|
\[
|
|
\; \text{with} \;
|
|
0 \le x_i \le 1 \;\text{and}\; \Delta x = x_{i+1} - x_i
|
|
\]
|
|
|
|
- Define a grid of $M$ points
|
|
- Split the grid on $N$ processes
|
|
- Each process computes part of the sum
|
|
- The partial sums are reduced on the master process
|
|
|
|
** Computation of \pi with MPI (Python)
|
|
|
|
#+begin_src python :tangle Codes/mpi_pi.py
|
|
#!/usr/bin/env python
|
|
from mpi4py import MPI
|
|
import sys
|
|
from math import sqrt
|
|
|
|
def main():
|
|
comm = MPI.COMM_WORLD
|
|
rank = comm.Get_rank()
|
|
size = comm.Get_size()
|
|
|
|
M = int(sys.argv[1]) # Total Number of grid points
|
|
M_local = (M-1) // size + 1 # Number of grid points to compute locally
|
|
istart = rank * M_local # Beginning of interval
|
|
iend = min(istart + M_local, M) # End of interval
|
|
dx = 1./float(M) # Delta x
|
|
|
|
sum = 0.
|
|
for i in range(istart, iend):
|
|
x = (i+0.5)*dx
|
|
sum += sqrt(1.-x*x)
|
|
#+end_src
|
|
|
|
** Computation of \pi with MPI (Python)
|
|
|
|
#+begin_src python :tangle Codes/mpi_pi.py
|
|
result = 4. * dx * sum
|
|
pi = comm.reduce(result, MPI.SUM)
|
|
print("%10f -> %10f : %10f"%(istart*dx, iend*dx, result))
|
|
if rank == 0:
|
|
print("Result = ", pi)
|
|
|
|
if __name__ == "__main__": main()
|
|
#+end_src
|
|
|
|
#+LATEX: \begin{columns}
|
|
#+LATEX: \begin{column}{0.6\textwidth}
|
|
#+begin_src text
|
|
$ mpiexec -n 4 python mpi_pi.py 100000000
|
|
0.000000 -> 0.250000 : 0.989483
|
|
Result = 3.1415926535902408
|
|
0.250000 -> 0.500000 : 0.923740
|
|
0.500000 -> 0.750000 : 0.775058
|
|
0.750000 -> 1.000000 : 0.453312
|
|
#+end_src
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \begin{column}{0.3\textwidth}
|
|
#+ATTR_LATEX: :width \textwidth
|
|
[[./pi_v1.png]]
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \end{columns}
|
|
|
|
** Alternate Computation of \pi with MPI (Python)
|
|
|
|
#+begin_src python :tangle Codes/mpi_pi_v2.py
|
|
#!/usr/bin/env python
|
|
from mpi4py import MPI
|
|
import sys
|
|
from math import sqrt
|
|
|
|
def main():
|
|
comm = MPI.COMM_WORLD
|
|
rank = comm.Get_rank()
|
|
size = comm.Get_size()
|
|
|
|
M = int(sys.argv[1]) # Total Number of grid points
|
|
dx = 1./float(M) # Delta x
|
|
|
|
sum = 0.
|
|
for i in range(rank, M, size):
|
|
x = (i+0.5)*dx
|
|
sum += sqrt(1.-x*x)
|
|
#+end_src
|
|
|
|
** Computation of \pi with MPI (Python)
|
|
|
|
#+begin_src python :tangle Codes/mpi_pi_v2.py
|
|
result = 4. * dx * sum
|
|
pi = comm.reduce(result, MPI.SUM)
|
|
print(rank, result)
|
|
if rank == 0:
|
|
print("Result = ", pi)
|
|
|
|
if __name__ == "__main__": main()
|
|
#+end_src
|
|
|
|
#+LATEX: \begin{columns}
|
|
#+LATEX: \begin{column}{0.6\textwidth}
|
|
#+begin_src text
|
|
$ mpiexec -n 4 python mpi_pi_v2.py 100000000
|
|
0 0.7853981783959749
|
|
Result = 3.1415926535901777
|
|
2 0.7853981583983196
|
|
3 0.7853981483981102
|
|
1 0.7853981683977732
|
|
#+end_src
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \begin{column}{0.3\textwidth}
|
|
#+ATTR_LATEX: :width \textwidth
|
|
[[./pi_v2.png]]
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \end{columns}
|
|
|
|
* Multi-threading :noexport:
|
|
** Processes /vs/ threads
|
|
|
|
*** Process
|
|
- Has its own memory address space
|
|
- Context switching between processes is slow
|
|
- Processes interact only through system-provided communication mechanisms
|
|
- Fork: creates a copy of the current process
|
|
- Exec: switches to running another binary executable
|
|
- Spawn: =Fork=, then =exec= the child
|
|
*** Thread
|
|
- Exist as subsets of a process
|
|
- Context switching between threads is fast
|
|
- Share the same memory address space : interact via shared memory
|
|
|
|
** Threads
|
|
#+LATEX: \begin{columns}
|
|
#+LATEX: \begin{column}{0.5\textwidth}
|
|
#+ATTR_LATEX: :height 0.5\textheight
|
|
[[./smp.png]]
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \begin{column}{0.5\textwidth}
|
|
- Concurrent programming
|
|
- Graphical user interfaces (progress bars, ...)
|
|
- Asynchronous I/O
|
|
- Standard library: POSIX threads (=pthreads=)
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \end{columns}
|
|
|
|
*** Communication time
|
|
- Low-latency network : \sim 1.2 microseconds
|
|
- Random memory access : \sim 0.1 microsecond
|
|
|
|
** Threads example (Python)
|
|
|
|
#+begin_src python :tangle Codes/thread_python.py
|
|
#!/usr/bin/env python
|
|
import threading
|
|
import time
|
|
|
|
class test:
|
|
def __init__(self, Nthreads):
|
|
self.Nthreads = Nthreads
|
|
self.data = [ i for i in range(Nthreads) ]
|
|
|
|
def run_thread(self, j):
|
|
self.data[j] = 0
|
|
time.sleep(j)
|
|
self.data[j] = j
|
|
#+end_src
|
|
|
|
** Threads example (Python)
|
|
|
|
#+begin_src python :tangle Codes/thread_python.py
|
|
def run(self):
|
|
thread = [ None ] * self.Nthreads
|
|
t0 = time.time()
|
|
print(self.data)
|
|
for i in range(self.Nthreads):
|
|
thread[i] = threading.Thread( target=self.run_thread, args=(i,) )
|
|
thread[i].start()
|
|
for i in range(self.Nthreads):
|
|
thread[i].join()
|
|
print(time.time()-t0, "seconds. ", self.data)
|
|
|
|
if __name__ == '__main__':
|
|
t = test(4)
|
|
t.run()
|
|
|
|
#+end_src
|
|
-------------------
|
|
#+begin_src text
|
|
$ python thread_python.py
|
|
[0, 1, 2, 3]
|
|
0.0009775161743164062 seconds. [0, 0, 0, 0]
|
|
1.0018701553344727 seconds. [0, 1, 0, 0]
|
|
2.003377676010132 seconds. [0, 1, 2, 0]
|
|
3.004056930541992 seconds. [0, 1, 2, 3]
|
|
#+end_src
|
|
|
|
** Computation of \pi with threads in Python
|
|
|
|
#+begin_src python :tangle Codes/pi_thread_python.py
|
|
#!/usr/bin/env python
|
|
import os, sys, threading
|
|
from random import random, seed
|
|
from math import sqrt
|
|
|
|
NMAX = 10000000 # Nb of MC steps/process
|
|
error_threshold = 1.0e-4 # Stopping criterion
|
|
|
|
class pi_calculator:
|
|
def __init__(self, Nthreads):
|
|
self.Nthreads= Nthreads
|
|
self.results = []
|
|
self.lock = threading.Lock()
|
|
|
|
def compute_pi(self):
|
|
result = 0.
|
|
for i in range(NMAX): # Loop NMAX times
|
|
x,y = random(), random() # Draw 2 random numbers x and y
|
|
if x*x + y*y <= 1.: # Check if (x,y) is in the circle
|
|
result += 1
|
|
with self.lock:
|
|
self.results.append(4.* float(result)/float(NMAX))
|
|
#+end_src
|
|
|
|
** Computation of \pi with threads in Python
|
|
|
|
#+begin_src python :tangle Codes/pi_thread_python.py
|
|
def run(self):
|
|
thread = [None] * self.Nthreads
|
|
for i in range(self.Nthreads):
|
|
thread[i] = threading.Thread( target=self.compute_pi, args=() )
|
|
thread[i].start()
|
|
print("All threads started")
|
|
|
|
while True:
|
|
for i in range(self.Nthreads):
|
|
thread[i].join()
|
|
N = len(self.results)
|
|
average = sum(self.results)/N # Compute average
|
|
if N > 2: # Compute variance
|
|
l = [ (x-average)*(x-average) for x in self.results ]
|
|
variance = sum(l)/(N-1.)
|
|
else:
|
|
variance = 0.
|
|
error = sqrt(variance)/sqrt(N) # Compute error
|
|
print("%f +/- %f %d"%(average, error, N))
|
|
#+end_src
|
|
|
|
** Computation of \pi with threads in Python
|
|
|
|
#+begin_src python :tangle Codes/pi_thread_python.py
|
|
if N > 2 and error < error_threshold: # Stopping condition
|
|
return
|
|
|
|
for i in range(self.Nthreads):
|
|
thread[i] = threading.Thread( target=self.compute_pi, args=() )
|
|
thread[i].start()
|
|
|
|
if __name__ == '__main__':
|
|
calc = pi_calculator(4)
|
|
calc.run()
|
|
#+end_src
|
|
|
|
Note: Inefficient in Python because of the Global Interpreter Lock
|
|
(GIL), but you get the idea.
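
A minimal sketch of a GIL-free variant of the same idea, using processes
(~multiprocessing~) instead of threads; the kernel is the same Monte Carlo
loop as above, reduced here for brevity:

#+begin_src python
#!/usr/bin/env python
from multiprocessing import Pool
from random import random, seed

NMAX = 1000000                               # Nb of MC steps per estimate

def compute_pi(_):
    """One Monte Carlo estimate of pi (dummy argument for Pool.map)."""
    seed(None)                               # re-seed in each worker process
    n_in = 0
    for _ in range(NMAX):
        x, y = random(), random()
        if x*x + y*y <= 1.:
            n_in += 1
    return 4. * n_in / NMAX

if __name__ == '__main__':
    with Pool(4) as pool:                    # worker processes: the GIL is no longer a bottleneck
        estimates = pool.map(compute_pi, range(8))
    print(sum(estimates) / len(estimates))
#+end_src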
|
|
|
|
* OpenMP
|
|
|
|
** OpenMP
|
|
- OpenMP is an extension of programming languages that enables the use of
|
|
multi-threading to parallelize the code using directives.
|
|
- The OpenMP library may be implemented using =pthreads=
|
|
- Extensions in OpenMP 5.0 to offload code execution to GPUs
|
|
- The same source code can be executed with/without OpenMP
|
|
|
|
#+begin_src fortran
|
|
!$OMP PARALLEL DEFAULT(SHARED) PRIVATE(i)
|
|
!$OMP DO
|
|
do i=1,n
|
|
A(i) = B(i) + C(i)
|
|
end do
|
|
!$OMP END DO
|
|
!$OMP END PARALLEL
|
|
#+end_src
|
|
|
|
** OpenMP directives (Fortran)
|
|
|
|
- ~!$OMP PARALLEL~ starts a new multi-threaded section.
|
|
Everything inside this block is executed by /all/ the threads
|
|
- ~!$OMP DO~ tells the compiler to split the loop among the different
|
|
threads (by changing the loop boundaries for instance)
|
|
- ~!$OMP END DO~ marks the end of the parallel loop.
|
|
It contains an implicit synchronization.
|
|
After this line, all the threads have finished executing the loop.
|
|
- ~!$OMP END PARALLEL~ marks the end of the parallel section. Contains also an implicit barrier.
|
|
- ~DEFAULT(SHARED)~ : all the variables (A,B,C) are in shared memory by default
|
|
- ~PRIVATE(i)~ : the variable i is private to every thread
|
|
|
|
** OpenMP directives (Fortran)
|
|
|
|
- ~!$OMP CRITICAL ... !$OMP END CRITICAL~ : all the statements in this block are protected by a lock
|
|
- ~!$OMP TASK ... !$OMP END TASK~ : define a new task to execute
|
|
- ~!$OMP BARRIER~ : synchronization barrier
|
|
- ~!$OMP SINGLE ... !$OMP END SINGLE~ : all the statements in this block are executed by a single thread
|
|
- ~!$OMP MASTER ... !$OMP END MASTER~ : all the statements in this block are executed by the master thread
|
|
|
|
** OpenMP
|
|
|
|
*** Functions
|
|
|
|
- ~omp_get_thread_num()~ : returns the ID of the current running
|
|
thread (like ~MPI_Rank~)
|
|
- ~omp_get_num_threads()~ : returns the total number of running
|
|
threads (like ~MPI_Size~)
|
|
- ~OMP_NUM_THREADS~ : Environment variable (shell) that sets the
|
|
number of threads to run
|
|
|
|
*** Important
|
|
|
|
- Multiple threads *can read* at the same memory address
|
|
- Multiple threads **must not write** at the same address
|
|
|
|
** Matrix multiplication
|
|
|
|
\[ C_{ij} = \sum_{k=1}^N A_{ik} B_{kj} \]
|
|
|
|
#+begin_src fortran
|
|
do j=1,N
|
|
do i=1,N
|
|
C(i,j) = 0.d0
|
|
do k=1,N
|
|
C(i,j) = C(i,j) + A(i,k) * B(k,j)
|
|
end do
|
|
end do
|
|
end do
|
|
#+end_src
|
|
|
|
** Matrix multiplication with OpenMP
|
|
|
|
\[ C_{ij} = \sum_{k=1}^N A_{ik} B_{kj} \]
|
|
|
|
#+LATEX: \begin{columns}
|
|
#+LATEX: \begin{column}{0.5\textwidth}
|
|
#+begin_src fortran
|
|
!$OMP PARALLEL DO DEFAULT(SHARED) PRIVATE (i,j,k)
|
|
do j=1,N
|
|
do i=1,N
|
|
C(i,j) = 0.d0
|
|
do k=1,N
|
|
C(i,j) = C(i,j) + A(i,k) * B(k,j)
|
|
end do
|
|
end do
|
|
end do
|
|
!$OMP END PARALLEL DO
|
|
#+end_src
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \begin{column}{0.4\textwidth}
|
|
- Loop is parallelized over ~j~
|
|
- Writing in ~C(i,j)~ is OK
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \end{columns}
|
|
|
|
** Matrix multiplication with OpenMP
|
|
|
|
\[ C_{ij} = \sum_{k=1}^N A_{ik} B_{kj} \]
|
|
|
|
#+LATEX: \begin{columns}
|
|
#+LATEX: \begin{column}{0.5\textwidth}
|
|
#+begin_src fortran
|
|
!$OMP PARALLEL DEFAULT(SHARED) PRIVATE (i,j,k)
|
|
!$OMP DO COLLAPSE(2)
|
|
do j=1,N
|
|
do i=1,N
|
|
C(i,j) = 0.d0
|
|
do k=1,N
|
|
C(i,j) = C(i,j) + A(i,k) * B(k,j)
|
|
end do
|
|
end do
|
|
end do
|
|
!$OMP END DO
|
|
!$OMP END PARALLEL
|
|
#+end_src
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \begin{column}{0.4\textwidth}
|
|
- Loop is parallelized over pairs ~(i,j)~
|
|
- Writing in ~C(i,j)~ is OK
|
|
#+LATEX: \end{column}
|
|
#+LATEX: \end{columns}
|
|
|
|
|
|
* Exercises
|
|
|
|
** Exercise 1: Monte Carlo
|
|
|
|
1. Write a Fortran src_fortran{double precision function compute_pi(M)} that computes
|
|
$\pi$ with the Monte Carlo algorithm using $M$ samples
|
|
2. Call it using this main program:
|
|
#+begin_src fortran :tangle Exercises/Ex1/main.f90
|
|
program pi_mc
|
|
implicit none
|
|
integer :: M
|
|
logical :: iterate
|
|
double precision :: sample
|
|
double precision, external :: compute_pi
|
|
call random_seed() ! Initialize random number generator
|
|
open(unit=11, file='fortran_out.fifo', status='old', action='write', &
|
|
access='stream',form='formatted')
|
|
iterate = .True.
|
|
do while (iterate) ! Compute pi over N samples until 'iterate=.False.'
|
|
sample = compute_pi()
|
|
write(11,*) sample
|
|
end do
|
|
end program pi_mc
|
|
#+end_src
|
|
|
|
** Exercise 1: Monte Carlo
|
|
|
|
3. Write a Python server =pi_server.py= that receives samples of \pi in a socket
|
|
and computes the running average of \pi. Its address and port
|
|
number are written in a file =server.cfg=.
|
|
4. Write a Python script =pi_bridge.py= that reads samples of \pi in a named pipe =fortran_out.fifo= and sends the samples to the server.
|
|
This script can read the address and port number of the
|
|
server from the file =server.cfg=.
|
|
5. When the convergence criterion is reached, the server informs
|
|
the bridges that they can stop.
|
|
|
|
*** Running a simulation on multiple nodes
|
|
- Run a single server
|
|
- Run one bridge per compute node using the =mpiexec= command
|
|
- Run one Fortran process per core using the =mpiexec= command
|
|
|
|
** Exercise 2: Divide and conquer for matrix multiplication
|
|
|
|
* Solutions :noexport:
|
|
|
|
** Exercise 1: Monte Carlo
|
|
|
|
*** Compute_pi.f90
|
|
#+begin_src fortran :tangle Exercises/Ex1/compute_pi.f90
|
|
double precision function compute_pi()
|
|
implicit none
|
|
integer, parameter :: M=100000000
|
|
double precision :: x, y, n_in
|
|
integer :: i
|
|
|
|
n_in = 0.d0
|
|
do i=1, M
|
|
call random_number(x)
|
|
call random_number(y)
|
|
if (x*x + y*y <= 1.d0) then
|
|
n_in = n_in+1.d0
|
|
end if
|
|
end do
|
|
compute_pi = 4.d0*n_in/dble(M)
|
|
|
|
end function compute_pi
|
|
#+end_src
|
|
*** Compilation
|
|
#+begin_src bash
|
|
gfortran compute_pi.f90 main.f90 -o pi.x
|
|
#+end_src
|
|
*** Python bridge
|
|
|
|
#+begin_src python :tangle Exercises/Ex1/pi_bridge.py
|
|
#!/usr/bin/env python
|
|
import os, sys, socket
|
|
|
|
server_config = "server.cfg" # Configuration file
|
|
pipe_in = "fortran_out.fifo" # Named pipe for input
|
|
|
|
def main():
|
|
|
|
# Read hostname and port number from config file
|
|
with open(server_config, 'r') as f:
|
|
HOSTNAME = f.readline().strip()
|
|
PORT = int( f.readline() )
|
|
|
|
# Open named pipe for reading the output of the Fortran executables
|
|
p_in = open(pipe_in, 'r')
|
|
|
|
print("Bridge connects to", HOSTNAME, PORT)
|
|
while True:
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Create an INET socket
|
|
try: # Connect the socket to the address and port of the server
|
|
s.connect( (HOSTNAME, PORT) )
|
|
except socket.error:
|
|
print("Server inactive")
|
|
break
|
|
X = p_in.readline()
|
|
message = str(X)
|
|
s.send(message.encode()) # Send the data
|
|
reply = s.recv(128).decode('utf-8') # Read the reply of the server
|
|
|
|
if reply == "STOP": break
|
|
|
|
if __name__ == '__main__':
|
|
main()
|
|
|
|
#+end_src

*** Python server

#+begin_src python :tangle Exercises/Ex1/pi_server.py
#!/usr/bin/env python
import sys, os, socket
from math import sqrt

error_threshold = 1.e-5         # Stopping criterion
PORT            = 1666          # Port number
server_config   = "server.cfg"  # Config file for the server

def main():
    HOSTNAME = "localhost"  #socket.gethostname()
    with open(server_config, 'w') as f:  # Write hostname to config file
        f.write(HOSTNAME+"\n")
        f.write(str(PORT)+"\n")
    print("Server is", HOSTNAME, PORT)

    data = []
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # Create an INET socket
    s.bind( (HOSTNAME, PORT) )  # Bind the socket to the address and port

    while True:
        s.listen(5)              # Wait for incoming connections
        conn, addr = s.accept()  # Accept connection
        X = ""
        while True:              # Buffered read of the socket
            message = conn.recv(128)
            X += message.decode('utf-8')
            if len(message) < 128: break
        data.append( float(X) )
        N = len(data)
        average = sum(data)/N    # Compute average
        if N > 2:                # Compute variance
            l = [ (x-average)*(x-average) for x in data ]
            variance = sum(l)/(N-1.)
        else:
            variance = 0.
        error = sqrt(variance)/sqrt(N)  # Compute error

        print('%f +/- %f'%(average,error))

        # Stopping condition
        if N > 2 and error < error_threshold:
            conn.send("STOP".encode())
            break
        else:
            conn.send("OK".encode())
        conn.close()

if __name__ == "__main__":
    main()
#+end_src
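
The stopping criterion is the standard error of the mean over the $N$ batch
estimates received so far: $\sigma / \sqrt{N}$, with
$\sigma^2 = \frac{1}{N-1} \sum_i (x_i - \bar{x})^2$.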

*** Script

#+begin_src bash :tangle Exercises/Ex1/run.sh
#!/bin/bash

mkfifo fortran_out.fifo
python pi_server.py &
sleep 1       # Give the server time to write server.cfg
mpiexec --pernode python pi_bridge.py &
mpiexec ./pi.x
rm fortran_out.fifo
#+end_src
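
Termination chain: when the convergence criterion is met, the server replies
=STOP= and exits; the remaining bridges then fail to connect and exit as well;
once no process reads the named pipe, the Fortran writers receive =SIGPIPE= on
their next write and terminate (standard Unix behaviour, assuming the default
signal disposition).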
** Divide and conquer for matrix multiplication
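
A minimal sketch of one possible approach (not necessarily the intended
solution): update $C \leftarrow C + AB$ by splitting the matrices into
quadrants and giving each quadrant of $C$ to an independent OpenMP task.
The subroutine name =dnc_matmul= and the cut-off of 128 are arbitrary
choices, and square matrices whose size is a power of two are assumed.

#+begin_src fortran
! Sketch: C := C + A*B by recursive splitting into quadrants.
! Each task updates a distinct quadrant of C, so the tasks are independent.
! Assumes n is a power of two (every split gives two equal halves).
recursive subroutine dnc_matmul(A, B, C, n)
  implicit none
  integer, intent(in)             :: n
  double precision, intent(in)    :: A(n,n), B(n,n)
  double precision, intent(inout) :: C(n,n)
  integer :: h

  if (n <= 128) then               ! Small blocks: multiply directly
     C = C + matmul(A, B)
     return
  end if

  h = n/2
  !$OMP TASK SHARED(A,B,C) FIRSTPRIVATE(h,n)
  call dnc_matmul(A(1:h  ,1:h  ), B(1:h  ,1:h  ), C(1:h  ,1:h  ), h)  ! C11 += A11*B11
  call dnc_matmul(A(1:h  ,h+1:n), B(h+1:n,1:h  ), C(1:h  ,1:h  ), h)  ! C11 += A12*B21
  !$OMP END TASK
  !$OMP TASK SHARED(A,B,C) FIRSTPRIVATE(h,n)
  call dnc_matmul(A(1:h  ,1:h  ), B(1:h  ,h+1:n), C(1:h  ,h+1:n), h)  ! C12 += A11*B12
  call dnc_matmul(A(1:h  ,h+1:n), B(h+1:n,h+1:n), C(1:h  ,h+1:n), h)  ! C12 += A12*B22
  !$OMP END TASK
  !$OMP TASK SHARED(A,B,C) FIRSTPRIVATE(h,n)
  call dnc_matmul(A(h+1:n,1:h  ), B(1:h  ,1:h  ), C(h+1:n,1:h  ), h)  ! C21 += A21*B11
  call dnc_matmul(A(h+1:n,h+1:n), B(h+1:n,1:h  ), C(h+1:n,1:h  ), h)  ! C21 += A22*B21
  !$OMP END TASK
  !$OMP TASK SHARED(A,B,C) FIRSTPRIVATE(h,n)
  call dnc_matmul(A(h+1:n,1:h  ), B(1:h  ,h+1:n), C(h+1:n,h+1:n), h)  ! C22 += A21*B12
  call dnc_matmul(A(h+1:n,h+1:n), B(h+1:n,h+1:n), C(h+1:n,h+1:n), h)  ! C22 += A22*B22
  !$OMP END TASK
  !$OMP TASKWAIT

end subroutine dnc_matmul
#+end_src

The tasks must be created inside a parallel region, with a single thread
generating them:

#+begin_src fortran
!$OMP PARALLEL
!$OMP SINGLE
call dnc_matmul(A, B, C, n)   ! One thread generates the tasks...
!$OMP END SINGLE              ! ...all threads execute them
!$OMP END PARALLEL
#+end_src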

* Figures :noexport:

#+BEGIN_SRC dot :output file :file merge.png
digraph G {
  graph [layout=dot rankdir=TB]
  node [shape=record]

  subgraph clusterA {
    label="A"
    node37641025 [label="3|7|6|4|1|0|2|5"];
    {rank=same ; node3764 [label="3|7|6|4"] ; node1025 [label="1|0|2|5"]}
    {rank=same ; node37 [label="3|7"] ; node64 [label="6|4"]}
    {rank=same ; node10 [label="1|0"] ; node25 [label="2|5"]}
    {rank=same ; node3 [label="3"] ; node7 [label="7"]}
    {rank=same ; node6 [label="6"] ; node4 [label="4"]}
    {rank=same ; node1 [label="1"] ; node0 [label="0"]}
    {rank=same ; node2 [label="2"] ; node5 [label="5"]}
    {rank=same ; node37_ [label="3|7"] ; node46_ [label="4|6"]}
    {rank=same ; node01_ [label="0|1"] ; node25_ [label="2|5"]}
    {rank=same ; node3467_ [label="3|4|6|7"] ; node0125_ [label="0|1|2|5"]}
    node01234567_ [label="0|1|2|3|4|5|6|7"];
  }

  node37641025 -> node3764;
  node3764 -> node37;
  node37 -> node3;
  node37 -> node7;
  node3764 -> node64;
  node64 -> node6;
  node64 -> node4;
  node37641025 -> node1025;
  node1025 -> node10;
  node10 -> node1;
  node10 -> node0;
  node1025 -> node25;
  node25 -> node2;
  node25 -> node5;

  node3 -> node37_;
  node7 -> node37_;
  node6 -> node46_;
  node4 -> node46_;
  node1 -> node01_;
  node0 -> node01_;
  node2 -> node25_;
  node5 -> node25_;
  node37_ -> node3467_;
  node46_ -> node3467_;
  node01_ -> node0125_;
  node25_ -> node0125_;
  node0125_ -> node01234567_;
  node3467_ -> node01234567_;
}
#+END_SRC

#+RESULTS:
#+BEGIN_SRC dot :output file :file merge_parallel.png
digraph G {
  graph [layout=dot rankdir=TB]
  node [shape=record]

  node37641025 [label="3|7|6|4|1|0|2|5"];
  node3764 [label="3|7|6|4"] ; node1025 [label="1|0|2|5"]
  node37 [label="3|7"] ; node64 [label="6|4"]
  node10 [label="1|0"] ; node25 [label="2|5"]
  node3 [label="3"] ; node7 [label="7"]
  node6 [label="6"] ; node4 [label="4"]
  node1 [label="1"] ; node0 [label="0"]
  node2 [label="2"] ; node5 [label="5"]
  node37_ [label="3|7"] ; node46_ [label="4|6"]
  node01_ [label="0|1"] ; node25_ [label="2|5"]
  node3467_ [label="3|4|6|7"] ; node0125_ [label="0|1|2|5"]
  node01234567_ [label="0|1|2|3|4|5|6|7"];

  subgraph clusterA {
    label="A"
    node37641025;
    node3764;
    node37;
    node3;
    node7;
    node37_;
    node3467_;
    node01234567_;
  }

  subgraph clusterB {
    label="B";
    node1025;
    node10;
    node1;
    node0;
    node01_;
    node0125_;
  }

  subgraph clusterC {
    label="C";
    node64;
    node6;
    node4;
    node46_;
  }

  subgraph clusterD {
    label="D";
    node25;
    node2;
    node5;
    node25_;
  }

  node37641025 -> node3764;
  node3764 -> node37;
  node37 -> node3;
  node37 -> node7;
  node3764 -> node64;
  node64 -> node6;
  node64 -> node4;
  node37641025 -> node1025;
  node1025 -> node10;
  node10 -> node1;
  node10 -> node0;
  node1025 -> node25;
  node25 -> node2;
  node25 -> node5;

  node3 -> node37_;
  node7 -> node37_;
  node6 -> node46_;
  node4 -> node46_;
  node1 -> node01_;
  node0 -> node01_;
  node2 -> node25_;
  node5 -> node25_;
  node37_ -> node3467_;
  node46_ -> node3467_;
  node01_ -> node0125_;
  node25_ -> node0125_;
  node0125_ -> node01234567_;
  node3467_ -> node01234567_;
}
#+END_SRC

#+RESULTS:
#+BEGIN_SRC dot :output file :file deadlock.png
digraph G {
  graph [layout=dot]
  A -> B ;
  B -> C ;
  C -> D ;
  D -> A ;
}
#+END_SRC

#+RESULTS:
[[file:deadlock.png]]
* Exam questions :noexport:

** Question 1
Consider a matrix multiplication. Which one can /not/ be executed safely
in parallel?

a)
#+begin_src fortran
!$OMP PARALLEL DO PRIVATE(i,j,k) SHARED(A,B,C)
do j=1,n
  do i=1,n
    do k=1,n
      C(i,j) = C(i,j) + A(i,k) * B(k,j)
    end do
  end do
end do
!$OMP END PARALLEL DO
#+end_src

b)
#+begin_src fortran
!$OMP PARALLEL PRIVATE(i,j,k) SHARED(A,B,C)
!$OMP DO
do j=1,n
  do i=1,n
    do k=1,n
      C(i,j) = C(i,j) + A(i,k) * B(k,j)
    end do
  end do
end do
!$OMP END DO
!$OMP END PARALLEL
#+end_src

c)
#+begin_src fortran
!$OMP PARALLEL PRIVATE(i,j,k) SHARED(A,B,C)
do j=1,n
  !$OMP DO
  do i=1,n
    do k=1,n
      C(i,j) = C(i,j) + A(i,k) * B(k,j)
    end do
  end do
  !$OMP END DO
end do
!$OMP END PARALLEL
#+end_src

d)
#+begin_src fortran
!$OMP PARALLEL PRIVATE(i,j,k) SHARED(A,B,C)
do j=1,n
  do i=1,n
    !$OMP DO
    do k=1,n
      C(i,j) = C(i,j) + A(i,k) * B(k,j)
    end do
    !$OMP END DO
  end do
end do
!$OMP END PARALLEL
#+end_src
** Question 2
** Question 3
** Question 4
* Export :noexport:
#+BEGIN_SRC elisp :output none
(setq org-latex-listings 'minted)
(setq org-latex-custom-lang-environments nil)
(setq org-latex-minted-options
      '(
        ("fontsize" "\\scriptsize")
        ("linenos" "")))
(setq org-latex-to-pdf-process
      '("pdflatex -shell-escape -interaction nonstopmode -output-directory %o %f"
        "pdflatex -shell-escape -interaction nonstopmode -output-directory %o %f"
        "pdflatex -shell-escape -interaction nonstopmode -output-directory %o %f"))
(org-beamer-export-to-pdf)
#+END_SRC

#+RESULTS:
: /home/scemama/MEGA/TEX/Cours/TCCM/TCCM2022/Parallelism/parallelism_scemama.pdf