#+TITLE: Fundamentals of parallelization
#+DATE: 23-24/11/2021
#+AUTHOR: Anthony Scemama
#+LaTeX_HEADER: \institute{Lab. Chimie et Physique Quantiques, IRSAMC, UPS/CNRS, Toulouse (France)}
#+LATEX_CLASS: beamer
#+LaTeX_CLASS_OPTIONS:[aspectratio=169]
#+BEAMER_THEME: trex
#+LaTeX_HEADER: \usepackage{minted}
#+LaTeX_HEADER: \usemintedstyle{emacs}
#+LaTeX_HEADER: \usepackage[utf8]{inputenc}
#+LaTeX_HEADER: \usepackage[T1]{fontenc}
#+LaTeX_HEADER: \usepackage{hyperref}
#+LaTeX_HEADER: \usepackage{mathtools}
#+LaTeX_HEADER: \usepackage{physics}
#+LaTeX_HEADER: \definecolor{darkgreen}{rgb}{0.,0.6,0.}
#+LaTeX_HEADER: \definecolor{darkblue}{rgb}{0.,0.2,0.7}
#+LaTeX_HEADER: \definecolor{darkred}{rgb}{0.6,0.1,0.1}
#+LaTeX_HEADER: \definecolor{darkpink}{rgb}{0.7,0.0,0.7}
#+PROPERTY: header-args :exports code
#+EXPORT_EXCLUDE_TAGS: noexport
#+startup: beamer
#+options: H:2
* Program NOEXPORT :noexport:
14:00-14:30 : Supercomputers
14:30-15:30 : Parallelism
15:45-17:30 : MPI / OpenMP
09:00-10:30 : Presentation IRP + QP
For IRPF90, I can give a fairly general presentation.
10:30-12:00 : IRPF90 hands-on session
I already have a tutorial for writing a molecular dynamics program with a
Lennard-Jones potential; I think it will be easier since there is not much
time for the hands-on sessions.
13:45-17:00 : Parallelism hands-on session
- compute pi with a Monte Carlo algorithm using MPI (master-worker), first in Python then in Fortran/C.
- parallelize a matrix product with OpenMP in Fortran/C
- compute an integral over R^3 on a grid of points with MPI in Fortran/C.
* Supercomputers :noexport:
** Computers
#+LATEX: \begin{columns}
#+LATEX: \begin{column}{0.7\textwidth}
#+LATEX: \begin{exampleblock}{Today (order of magnitude)}
- 1 *socket* (x86 CPU @ 2.2-3.3 GHz, *4 cores*, hyperthreading)
- \sim 4-16 GB RAM
- \sim 500GB SSD
- Graphics card : ATI Radeon, Nvidia GeForce
- Gigabit ethernet
- USB, Webcam, Sound card, etc
- \sim 500 euros
#+LATEX: \end{exampleblock}
#+LATEX: \end{column}
#+LATEX: \begin{column}{0.3\textwidth}
#+ATTR_LATEX: :width \textwidth
[[./desktop-inspiron-MT-3650-pdp-module-1.jpg]]
#+LATEX: \end{column}
#+LATEX: \end{columns}
** Computer designed for computation
#+LATEX: \begin{columns}
#+LATEX: \begin{column}{0.6\textwidth}
#+LATEX: \begin{exampleblock}{Today (order of magnitude)}
- 2 sockets (x86 CPU @ 2.2 GHz, 32 cores/socket, hyperthreading)
- 64-128 GB RAM
- Multiple SSDs (RAID0)
- Gigabit ethernet
- Possibly an Accelerator (Nvidia Volta/Ampere)
- \sim 5k euros
#+LATEX: \end{exampleblock}
#+LATEX: \end{column}
#+LATEX: \begin{column}{0.4\textwidth}
#+ATTR_LATEX: :width \textwidth
[[./z840_gallery_img4_tcm245_2164103_tcm245_1871309_tcm245-2164103.jpg]]
#+LATEX: \end{column}
#+LATEX: \end{columns}
** Cluster
#+LATEX: \begin{columns}
#+LATEX: \begin{column}{0.6\textwidth}
#+LATEX: \begin{exampleblock}{}
- Many computers designed for computation
- Compact (1-2U in rack) / machine
- Network switches
- Login server
- Batch queuing system (SLURM / PBS / SGE / LSF)
- Cheap Cooling system
- Requires a lot of electrical power (~10kW/rack)
- Possibly a Low-latency / High-bandwidth network (Infiniband or 10Gb ethernet)
- >50k euros
#+LATEX: \end{exampleblock}
#+LATEX: \end{column}
#+LATEX: \begin{column}{0.4\textwidth}
#+ATTR_LATEX: :width \textwidth
[[./img_20160510_152246_resize.jpg]]
#+LATEX: \end{column}
#+LATEX: \end{columns}
** Supercomputer
#+LATEX: \begin{columns}
#+LATEX: \begin{column}{0.6\textwidth}
#+LATEX: \begin{exampleblock}{}
- Many computers designed for computation
- Very compact (<1U in rack) / machine
- Low-latency / High-bandwidth network (Infiniband or 10Gb ethernet)
- Network switches
- Parallel filesystem for scratch space (Lustre / BeeGFS / GPFS)
- Multiple login servers
- Batch queuing system (SLURM / PBS / SGE / LSF)
- Highly efficient cooling system (water)
- Requires a lot of electrical power (>100kW)
#+LATEX: \end{exampleblock}
#+LATEX: \end{column}
#+LATEX: \begin{column}{0.4\textwidth}
#+ATTR_LATEX: :width \textwidth
[[./Eos.png]]
#+LATEX: \end{column}
#+LATEX: \end{columns}
** Definitions
- Top500 :: Rank of the 500 fastest supercomputers
- Flop :: Floating point operation
- Flops :: Flop/s, Number of Floating point operations per second
- RPeak :: Peak performance, max possible number of Flops
- RMax :: Real performance measured on the Linpack benchmark (dense linear system solve)
- SP :: Single precision (32-bit floats)
- DP :: Double precision (64-bit floats)
- FPU :: Floating Point Unit
- FMA :: Fused multiply-add ($a\times x+b$ in 1 instruction)
** Quantifying performance
#+LATEX: \begin{exampleblock}{Example}
*RPeak* of the Intel Xeon Gold 6140 processor:
- 18 cores
- 2.3 GHz
- 2 FPUs/core
- 8 FMA (DP)/FPU/cycle, and each FMA counts as 2 Flops
$18 \times 2.3 \times 10^9 \times 2 \times 8 \times 2 = 1.3$ TFlops (DP)
#+LATEX: \end{exampleblock}
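A quick sanity check of the arithmetic above, as a one-line Python sketch:
#+begin_src python
# RPeak = cores x frequency x FPUs/core x FMA/FPU/cycle x 2 Flops per FMA
print(18 * 2.3e9 * 2 * 8 * 2 / 1e12, "TFlops (DP)")  # -> ~1.32 TFlops
#+end_src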
- Number of hours :: 730/month, 8760/year
- Units :: Kilo (K), Mega (M), Giga (G), Tera (T), Peta (P), Exa (E), ...
** Top500 (1996)
#+ATTR_LATEX: :height 0.9\textheight
[[./top500_95.png]]
** Top500 (2021)
#+ATTR_LATEX: :height 0.9\textheight
[[./top500_21.png]]
https://www.top500.org/lists/top500/2021/11/
** Curie thin nodes (TGCC, France)
Ranked 9th in 2012, 77 184 cores, 1.7 PFlops, 2.1 MW
#+ATTR_LATEX: :height 0.8\textheight
[[./tgcc.jpg]]
** Mare Nostrum (BSC, Spain)
Ranked 13th in 2017, 153 216 cores, 6.5 PFlops, 1.6 MW
#+ATTR_LATEX: :height 0.8\textheight
[[./marenostrum.jpg]]
** Architecture
#+ATTR_LATEX: :height 0.9\textheight
[[./hierarchy.pdf]]
** Chassis (Front)
#+ATTR_LATEX: :height 0.9\textheight
[[./chassis.jpg]]
** Chassis (Back)
#+ATTR_LATEX: :height 0.9\textheight
[[./chassis_back.jpg]]
** Compute Node
#+ATTR_LATEX: :height 0.9\textheight
[[./blade.jpg]]
** Socket
#+ATTR_LATEX: :height 0.9\textheight
[[./socket.jpg]]
** Core
#+ATTR_LATEX: :height 0.9\textheight
[[./Nehalem.jpg]]
** Why such an architecture?
*** Moore's "Law"
- /The number of transistors doubles every two years/.
- Often interpreted as /the computational power doubles every two years/.
- An exponential law cannot hold forever $\Longrightarrow$ it will eventually fail.
- 7~nm semiconductors in 2021: the atomic limit is approaching.
** Why such an architecture?
*** Main problem: energy
Dissipated power: $D = F \times V^2 \times C$
- $F$ : Frequency (\sim 3 GHz)
- $V$ : Voltage
- $C$ : Capacitance, related to the size of the semiconductors (nm)
For the processor to be stable, the voltage needs to scale linearly with the
frequency, so $D = \mathcal{O}(F^3)$ $\Longrightarrow$ the
frequency has to be kept around 3 GHz.
1. To double the number of Flops, double the number of processors.
2. Use accelerators (GPUs, FPGAs, TPUs, ...)
\pause
*** Consequence
This requires re-thinking the way programs are written $\Longrightarrow$
Parallel programming is *unavoidable*.
** Today's main problems
1. Energy, physical limits
2. Interconnect technology
3. Computational power increases faster than memory capacity:
   the amount of memory per CPU core shrinks
4. Latencies can't be reduced much more : moving data becomes very expensive
5. File systems are *extremely* slow
6. Supercomputers are well tuned for benchmarks (dense linear algebra), but not
that much for general scientific applications
7. Fault-tolerance
** Practical solution
*** GPUs
Use simpler (less capable) cores, but many more of them
$\Longrightarrow$ better Flops/Watt ratio
*** But...
- Transferring data from main memory to the GPU is as expensive as a network communication
- More computational power and less RAM than a CPU node
- Not adapted to all algorithms
- Requires re-thinking and rewriting of codes: loss of years of debugging
- Software stack is not as mature as the CPU stack $\Longrightarrow$
needs hacking to get performance
* Fundamentals of parallelization :noexport:
** Introduction
*** Definitions
- Concurrency :: Running multiple computations at the same time.
- Parallelism :: Running multiple computations *on different execution units*.
*** Multiple levels
| *Distributed* | Multiple machines |
| *Shared memory* | Single machine, multiple CPU cores |
| *Hybrid* | With accelerators (GPUs, ...) |
| *Instruction-level* | Superscalar processors |
| *Bit-level* | Vectorization |
All levels of parallelism can be exploited in the same code,
but not every problem can be parallelized at all levels.
** Amdahl's law
*** Definition
If P is the proportion of a program that can be made parallel, the maximum speedup
that can be achieved is:
#+LATEX: \begin{columns}
#+LATEX: \begin{column}{0.1\textwidth}
\[
S_{\max} = \frac{1}{(1-P)+P/n}
\]
#+LATEX: \end{column}
#+LATEX: \begin{column}{0.8\textwidth}
- $S$ :: Speedup
- $P$ :: Proportion of a program that can be made parallel
- $n$ :: Number of cores
#+LATEX: \end{column}
#+LATEX: \end{columns}
*** Example
#+LATEX: \begin{columns}
#+LATEX: \begin{column}{0.4\textwidth}
#+ATTR_LATEX: :width 0.8 \textwidth
[[./Amdahl2.pdf]]
#+LATEX: \end{column}
#+LATEX: \begin{column}{0.6\textwidth}
- Max speedup : 100$\times$
- Perfect scaling requires *all* of the program to be parallelized (difficult)
#+LATEX: \end{column}
#+LATEX: \end{columns}
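** Amdahl's law *Numerical illustration (Python)*
A minimal sketch (added for illustration) that evaluates the formula above for
$P=0.99$; the speedup saturates at $1/(1-P) = 100\times$, the value quoted on
the previous slide:
#+begin_src python
# Amdahl's law: S_max = 1 / ((1-P) + P/n)
def speedup(P, n):
    return 1. / ((1. - P) + P / n)

P = 0.99                      # 99% of the program can be made parallel
for n in (1, 10, 100, 1000, 10**6):
    print(n, speedup(P, n))   # tends to 1/(1-P) = 100 as n grows
#+end_src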
** Outline
1. Human parallel machine
2. Compute a PES with GNU parallel (Bash)
3. Compute $\pi$ with a Monte Carlo algorithm (Python, sockets)
4. Compute $\pi$ with a deterministic algorithm (Python, MPI)
5. Matrix multiplication (Fortran, OpenMP)
** Experiment
*** The Human Parallel Machine
- Sort a deck of cards
- Do it as fast as you can
- I did it alone in 5'25'' (325 seconds)
- Each one of you is a /human compute node/
- How fast can all of you sort the same deck?
** Disappointing result
There were many more of you than me, but you did not go many times faster!
$\Longrightarrow$ Try the Merge sort
** Merge sort
#+ATTR_LATEX: :height 0.9\textheight
[[file:merge.png]]
** Merge sort
#+ATTR_LATEX: :height 0.9\textheight
[[file:merge_parallel.png]]
** Better, but still not perfect
- Moving the cards around the room takes time (communication)
- Sorting the sub-piles is super-fast (computation)
The algorithm is *bound by communication*: difficult to scale
#+LATEX: \begin{exampleblock}{If the assignment was}
- Consider the function $f(a,b,x) = a\,x^5 - \frac{b\,x^3}{a}$
- Each card has a value for $a$, $b$ and $x$
- Evaluate $f(a,b,x)$ for each card and write the result on the card
- Sort the results
#+LATEX: \end{exampleblock}
Same communication pattern, but more computational effort $\Longrightarrow$ better scaling.
#+LATEX: \begin{alertblock}{Take-home messages}
- Parallel workloads may require different algorithms than scalar workloads
- Difficulty is /data movement/ (communication), not /computation/
#+LATEX: \end{alertblock}
* Embarrassingly parallel computations :noexport:
** Example: Potential energy surface
We want to create the CCSD(T) potential energy surface of the water molecule.
#+LATEX: \begin{columns}
#+LATEX: \begin{column}{0.3\textwidth}
[[./h2o.png]]
#+LATEX: \end{column}
#+LATEX: \begin{column}{0.6\textwidth}
[[./pes.png]]
#+LATEX: \end{column}
#+LATEX: \end{columns}
** Example: Potential energy surface
*** Constraints
- We want to compute $25 \times 25 \times 25 = 15~625$ points
- We are allowed to use 100 CPU cores simultaneously
- We like to use GAU$$IAN to calculate the CCSD(T) energy
*** But:
- The grid points are completely independent
- Any CPU core can calculate any point
** Example: Potential energy surface
*** Optimal solution: /work stealing/
- One grid point is $E(r_1,r_2,\theta)$
- Build the list of all the arguments $(r_1,r_2,\theta)$: ~[ (0.8,0.8,70.), ..., (1.1,1.1,140.) ]~ (the *queue*)
- Each CPU core, when idle, pops the head of the queue
and computes $E(r_1,r_2,\theta)$
- All the results are stored in a single file
- The results are sorted for plotting (see the sketch on the next slide)
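** Example: Potential energy surface *Work-stealing sketch (Python)*
A minimal sketch of this work-stealing pattern with Python's =multiprocessing.Pool=;
=compute_energy= is a hypothetical placeholder for the real CCSD(T) calculation:
#+begin_src python
from multiprocessing import Pool

def compute_energy(args):              # placeholder for the real calculation
    r1, r2, theta = args
    return (r1, r2, theta, -76.0 - 0.01*(r1 + r2) - 0.001*theta)

def main():
    # The queue of arguments (here a small 3x3x3 grid)
    grid = [(r1, r2, theta) for r1 in (0.9, 1.0, 1.1)
                            for r2 in (0.9, 1.0, 1.1)
                            for theta in (100., 104., 108.)]
    with Pool(processes=4) as pool:    # 4 workers pop points from the queue
        results = pool.imap_unordered(compute_energy, grid)
        for point in sorted(results):  # sort the results for plotting
            print(point)

if __name__ == "__main__":
    main()
#+end_src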
** GNU parallel
GNU Parallel executes Linux commands in parallel and can guarantee that
the output is the same as if the commands were executed sequentially.
#+begin_src bash
$ parallel echo ::: A B C
A
B
C
#+end_src
is equivalent to:
#+begin_src bash
$ echo A ; echo B ; echo C
#+end_src
Multiple input sources can be given:
#+begin_src bash
$ parallel echo ::: A B ::: C D
A C
A D
B C
B D
#+end_src
** GNU parallel
If no command is given after =parallel=, the arguments are treated as commands:
#+begin_src bash
$ parallel ::: pwd hostname "echo $TMPDIR"
/home/scemama
lpqdh82.ups-tlse.fr
/tmp
#+end_src
Jobs can be run on remote servers:
#+begin_src bash
$ parallel ::: "echo hello" hostname
lpqdh82.ups-tlse.fr
hello
$ parallel -S lpqlx139.ups-tlse.fr ::: "echo hello" hostname
hello
lpqlx139.ups-tlse.fr
#+end_src
** GNU parallel
Files can be transferred to the remote hosts:
#+begin_src bash
$ echo Hello > input
$ parallel -S lpqlx139.ups-tlse.fr cat ::: input
cat: input: No such file or directory
$ echo Hello > input
$ parallel -S lpqlx139.ups-tlse.fr --transfer --cleanup cat ::: input
Hello
#+end_src
** GNU Parallel: example
Convert thousands of images from =.gif= to =.jpg=
#+begin_src bash
$ ls
img1000.gif img241.gif img394.gif img546.gif img699.gif img850.gif
img1001.gif img242.gif img395.gif img547.gif img69.gif img851.gif
[...]
img23.gif img392.gif img544.gif img697.gif img849.gif
img240.gif img393.gif img545.gif img698.gif img84.gif
#+end_src
To convert one =.gif= into =.jpg= format:
#+begin_src bash
$ time convert img1.gif img1.jpg
real 0m0.008s
user 0m0.000s
sys 0m0.000s
#+end_src
** GNU Parallel: example
*** Sequential execution
#+begin_src bash
$ time for i in {1..1011}
> do
> convert img${i}.gif img${i}.jpg
> done
real 0m7.936s
user 0m0.210s
sys 0m0.270s
#+end_src
*** Parallel execution on a quad-core:
#+begin_src bash
$ time parallel convert {.}.gif {.}.jpg ::: *.gif
real 0m2.051s
user 0m1.000s
sys 0m0.540s
#+end_src
** Computing the CCSD(T) surface *1. Fetch the energy in an output file*
Running a CCSD(T) calculation with GAU$$IAN gives the energy
somewhere in the output:
#+begin_example
CCSD(T)= -0.76329294074D+02
#+end_example
To get only the energy in the output, we can use the following command:
#+begin_src bash
$ g09 < input | grep "^ CCSD(T)=" | cut -d "=" -f 2
-0.76329294074D+02
#+end_src
** Computing the CCSD(T) surface *2. Script =run_h2o.sh= that takes $r_1$, $r_2$ and $\theta$ as arguments*
#+LATEX: \begin{columns}
#+LATEX: \begin{column}{0.6\textwidth}
#+begin_src bash
#!/bin/bash
r1=$1 ; r2=$2 ; angle=$3
# Create Gaussian input file, pipe it to Gaussian,
# and get the CCSD(T) energy
cat << EOF | g09 | grep "^ CCSD(T)=" | cut -d "=" -f 2
# CCSD(T)/cc-pVTZ
Water molecule r1=${r1} r2=${r2} angle=${angle}
0 1
h
o 1 ${r1}
h 2 ${r2} 1 ${angle}
EOF
#+end_src
#+LATEX: \end{column}
#+LATEX: \begin{column}{0.4\textwidth}
Example:
#+begin_src text
$ ./run_h2o.sh 1.08 1.08 104.
-0.76310788178D+02
$ ./run_h2o.sh 0.98 1.0 100.
-0.76330291742D+02
#+end_src
#+LATEX: \end{column}
#+LATEX: \end{columns}
** Computing the CCSD(T) surface *3. Files containing the parameters*
#+LATEX: \begin{columns}
#+LATEX: \begin{column}{0.5\textwidth}
#+begin_src text
$ cat r1_file
0.75
0.80
0.85
0.90
0.95
1.00
#+end_src
#+LATEX: \end{column}
#+LATEX: \begin{column}{0.5\textwidth}
#+begin_src text
$ cat angle_file
100.
101.
102.
103.
104.
105.
106.
#+end_src
#+LATEX: \end{column}
#+LATEX: \end{columns}
#+LATEX: \begin{columns}
#+LATEX: \begin{column}{0.5\textwidth}
#+begin_src text
$ cat nodefile
2//usr/bin/ssh compute-0-10.local
2//usr/bin/ssh compute-0-6.local
16//usr/bin/ssh compute-0-12.local
16//usr/bin/ssh compute-0-5.local
16//usr/bin/ssh compute-0-7.local
6//usr/bin/ssh compute-0-1.local
2//usr/bin/ssh compute-0-13.local
4//usr/bin/ssh compute-0-8.local
#+end_src
#+LATEX: \end{column}
#+LATEX: \begin{column}{0.5\textwidth}
=nodefile= contains the names of the machines and the number of CPUs.
#+LATEX: \end{column}
#+LATEX: \end{columns}
** Computing the CCSD(T) surface *4. Run with GNU parallel*
On a single CPU:
#+begin_src text
$ time parallel -a r1_file -a r1_file -a angle_file \
--keep-order --tag -j 1 $PWD/run_h2o.sh
0.75 0.75 100. -0.76185942070D+02
0.75 0.75 101. -0.76186697072D+02
0.75 0.75 102. -0.76187387594D+02
[...]
0.80 1.00 106. -0.76294078963D+02
0.85 0.75 100. -0.76243282762D+02
0.85 0.75 101. -0.76243869316D+02
[...]
1.00 1.00 105. -0.76329165017D+02
1.00 1.00 106. -0.76328988177D+02
real 15m5.293s
user 11m25.679s
sys 2m20.194s
#+end_src
** Computing the CCSD(T) surface *4. Run with GNU parallel*
On 64 CPUs: $39\times$ faster!
#+begin_src text
$ time parallel -a r1_file -a r1_file -a angle_file \
--keep-order --tag --sshloginfile nodefile $PWD/run_h2o.sh
0.75 0.75 100. -0.76185942070D+02
0.75 0.75 101. -0.76186697072D+02
0.75 0.75 102. -0.76187387594D+02
[...]
0.80 1.00 106. -0.76294078963D+02
0.85 0.75 100. -0.76243282762D+02
0.85 0.75 101. -0.76243869316D+02
[...]
1.00 1.00 105. -0.76329165017D+02
1.00 1.00 106. -0.76328988177D+02
real 0m23.848s
user 0m3.359s
sys 0m3.172s
#+end_src
* Inter-process communication :noexport:
** Processes /vs/ threads
*** Process
- Has its own memory address space
- Context switching between processes is slow
- Processes interact only through system-provided communication mechanisms
- Fork: creates a copy of the current process
- Exec: switches to running another binary executable
- Spawn: =Fork=, =exec= the child and =wait= for its termination (see the sketch on the next slide)
*** Thread
- Exist as subsets of a process
- Context switching between threads is fast
- Share the same memory address space : interact via shared memory
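** Processes /vs/ threads *fork, exec, spawn in Python (sketch)*
A minimal illustration (added, not part of the course code) of the three mechanisms
listed on the previous slide:
#+begin_src python
import os, subprocess

pid = os.fork()                          # Fork: duplicate the current process
if pid == 0:                             # ... in the child:
    os.execvp("hostname", ["hostname"])  # Exec: replace the child by another binary
else:                                    # ... in the parent:
    os.waitpid(pid, 0)                   # wait for the child to terminate
    # Spawn = fork + exec + wait, in a single call:
    subprocess.run(["echo", "spawned and waited"])
#+end_src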
** Inter-process communication
- Processes exchange data via read/write on /File descriptors/
- These file descriptors can point to:
- Files on disk: Simplest choice
- Named pipes: the same program works as with regular files
- Pipes: Same behavior as files
- Network sockets
** Named pipes
A named pipe is a *virtual* file which is read by a process and
written by other processes. It allows processes to communicate using
standard I/O operations:
#+ATTR_LATEX: :height 0.7\textheight
[[./pipe.png]]
** Example
#+LATEX: \begin{columns}
#+LATEX: \begin{column}{0.5\textwidth}
#+ATTR_LATEX: :height 0.8\textheight
[[./pipe_example.png]]
#+LATEX: \end{column}
#+LATEX: \begin{column}{0.5\textwidth}
- Unzip =input.gz=
- Sort the unzipped file
- Zip the result into =output.gz=
Same as
#+begin_src bash
$ gunzip --to-stdout input.gz \
| sort \
| gzip > output.gz
#+end_src
#+LATEX: \end{column}
#+LATEX: \end{columns}
** Example: Program 1
#+LATEX: \begin{columns}
#+LATEX: \begin{column}{0.5\textwidth}
#+begin_src bash
#!/bin/bash
# Create two pipes using the mkfifo command
mkfifo /tmp/pipe /tmp/pipe2
# Unzip the input file and write the result
# in the 1st pipe
echo "Run gunzip"
gunzip --to-stdout input.gz > /tmp/pipe
# Zip what comes from the second pipe
echo "Run gzip"
gzip < /tmp/pipe2 > output.gz
# Clear the pipes in the filesystem
rm /tmp/pipe /tmp/pipe2
#+end_src
#+LATEX: \end{column}
#+LATEX: \begin{column}{0.5\textwidth}
#+begin_src bash
#!/bin/bash
# Read the 1st pipe, sort the result and write
# in the 2nd pipe
echo "Run sort"
sort < /tmp/pipe > /tmp/pipe2
#+end_src
#+LATEX: \begin{exampleblock}{Execution}
#+begin_src text
$ ./p1.sh &
Run gunzip
$ ./p2.sh
Run sort
Run gzip
[1]+ Done
./p1.sh
#+end_src
#+LATEX: \end{exampleblock}
#+LATEX: \end{column}
#+LATEX: \end{columns}
** Pipe
#+LATEX: \begin{columns}
#+LATEX: \begin{column}{0.6\textwidth}
*Fork* : Copy the current process
Pipes don't have an entry on the file system, and are opened/closed in the programs.
1. Create the pipe
2. Fork in parent/child processes
3. Exchange data between parent and child
#+LATEX: \end{column}
#+LATEX: \begin{column}{0.4\textwidth}
[[./fork.png]]
#+LATEX: \end{column}
#+LATEX: \end{columns}
** Pipe
#+LATEX: \begin{columns}
#+LATEX: \begin{column}{0.6\textwidth}
#+begin_src python :tangle Codes/fork.py
#!/usr/bin/env python
import sys,os
def main():
print("Process ID: %d" % (os.getpid()))
r, w = os.pipe()
new_pid = os.fork()
if new_pid != 0: # This is the parent process
print("I am the parent, my PID is %d"%(os.getpid()))
print("and the PID of my child is %d"%(new_pid))
# Close write and open read file descriptors
os.close(w)
r = os.fdopen(r,'r')
# Read data from the child
print("Reading from the child")
s = r.read()
print("Read '%s' from the child"%(s))
r.close() ; os.wait()
#+end_src
#+LATEX: \end{column}
#+LATEX: \begin{column}{0.4\textwidth}
[[./fork.png]]
#+LATEX: \end{column}
#+LATEX: \end{columns}
** Pipe
#+LATEX: \begin{columns}
#+LATEX: \begin{column}{0.6\textwidth}
#+begin_src python :tangle Codes/fork.py
else: # This is the child process
print(" I am the child, my PID is %d"%(os.getpid()))
# Close read and open write file descriptors
os.close(r)
w = os.fdopen(w,'w')
# Send 'Hello' to the parent
print(" Sending 'Hello' to the parent")
w.write( "Hello!" )
w.close()
print(" Sent 'Hello'")
if __name__ == "__main__":
main()
#+end_src
#+LATEX: \end{column}
#+LATEX: \begin{column}{0.4\textwidth}
[[./fork.png]]
#+LATEX: \end{column}
#+LATEX: \end{columns}
** Computation of $\pi$
#+LATEX: \begin{columns}
#+LATEX: \begin{column}{0.4\textwidth}
#+ATTR_LATEX: :width 0.9\textwidth
[[./pi.png]]
#+LATEX: \end{column}
#+LATEX: \begin{column}{0.6\textwidth}
- The surface of the circle is $\pi r^2$ $\Longrightarrow$ For a unit circle, the
surface is $\pi$
- The function in the red square is $y = \sqrt{1-x^2}$ (the circle is $\sqrt{x^2 + y^2} = 1$)
- The surface in grey corresponds to \[ \int_0^1 \sqrt{1-x^2} dx = \frac{\pi}{4} \]
#+LATEX: \end{column}
#+LATEX: \end{columns}
** Monte Carlo computation of $\pi$
#+LATEX: \begin{columns}
#+LATEX: \begin{column}{0.4\textwidth}
#+ATTR_LATEX: :width 0.9\textwidth
[[./pi_mc.png]]
#+LATEX: \end{column}
#+LATEX: \begin{column}{0.6\textwidth}
- Points $(x,y)$ are drawn randomly in the unit square
- Count how many times the points are inside the circle
\[
\frac{N_{\text{in}}}{N_{\text{in}}+N_{\text{out}}} = \frac{\pi}{4}
\]
#+LATEX: \end{column}
#+LATEX: \end{columns}
** Optimal algorithm
- Each core $1 \le i \le M$ computes its own average $X_i$
- All $M$ results are independent $\Longrightarrow$
Gaussian-distributed random variables (central-limit theorem)
#+ATTR_LATEX: :height 0.6\textheight
[[./pi_convergence.png]]
** Computation of \pi with pipes in Python
#+begin_src python :tangle Codes/pi_python.py
#!/usr/bin/env python
import os, sys
from random import random, seed
from math import sqrt
NMAX = 10000000 # Nb of MC steps/process
error_threshold = 1.0e-4 # Stopping criterion
NPROC=4 # Use 4 processes
def compute_pi():
"""Local Monte Carlo calculation of pi"""
seed(None) # Initialize random number generator
result = 0.
for i in range(NMAX): # Loop NMAX times
x,y = random(), random() # Draw 2 random numbers x and y
if x*x + y*y <= 1.: # Check if (x,y) is in the circle
result += 1
return 4.* float(result)/float(NMAX) # Estimate of pi
#+end_src
** Computation of \pi with pipes in Python
#+begin_src python :tangle Codes/pi_python.py
def main():
r = [None]*NPROC # Reading edges of the pipes
pid = [None]*NPROC # Running processes
for i in range(NPROC):
r[i], w = os.pipe() # Create the pipe
pid[i] = os.fork() # Fork and save the PIDs
if pid[i] != 0: # This is the parent process
os.close(w)
r[i] = os.fdopen(r[i],'r')
else: # This is the child process
os.close(r[i])
w = os.fdopen(w,'w')
while True: # Compute pi on this process
X = compute_pi()
try:
w.write("%f\n"%(X)) # Write the result in the pipe
w.flush()
except IOError: # Child process exits here
sys.exit(0)
#+end_src
** Computation of \pi with pipes in Python
#+begin_src python :tangle Codes/pi_python.py
data = []
while True:
for i in range(NPROC): # Read in the pipe of each process
data.append( float(r[i].readline()) )
N = len(data)
average = sum(data)/N # Compute average
if N > 2: # Compute variance
l = [ (x-average)*(x-average) for x in data ]
variance = sum(l)/(N-1.)
else:
variance = 0.
error = sqrt(variance)/sqrt(N) # Compute error
print("%f +/- %f %d"%(average, error, N))
if N > 2 and error < error_threshold: # Stopping condition
for i in range(NPROC): # Kill all children
try: os.kill(pid[i],9)
except: pass
sys.exit(0)
if __name__ == '__main__': main()
#+end_src
** Computation of \pi with pipes in Python
#+begin_src text
$ python pi_python.py
3.141974 +/- 0.000000 1
3.142569 +/- 0.000000 2
3.142168 +/- 0.000528 3
3.141938 +/- 0.000439 4
3.141947 +/- 0.000340 5
[...]
3.141625 +/- 0.000107 33
3.141614 +/- 0.000104 34
3.141617 +/- 0.000101 35
3.141606 +/- 0.000099 36
#+end_src
** Sockets
Sockets are analogous to pipes, but they allow both ends of the pipe to be
on different machines connected by a network interface.
An Internet socket is characterized by a unique combination of:
- A transport protocol: TCP, UDP, raw IP, ...
- A local socket address: Local IP address and port number, for example 192.168.2.2:22
- A remote socket address: Optional (TCP)
** Sockets
#+ATTR_LATEX: :height 0.9\textheight
[[./socket.png]]
** Sockets: server code in Python
#+begin_src python :tangle Codes/socket_server.py
#!/usr/bin/env python
import sys, os, socket, datetime
now = datetime.datetime.now
def main():
HOSTNAME = socket.gethostname()
PORT = 11279
print(now(), "I am the server : %s:%d"%(HOSTNAME,PORT))
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Create an INET socket
s.bind( (HOSTNAME, PORT) ) # Bind the socket to the address and port
s.listen(5) # Wait for incoming connections
conn, addr = s.accept() # Accept connection
print(now(), "Connected by", addr)
#+end_src
** Sockets: server code in Python
#+begin_src python :tangle Codes/socket_server.py
data = ""
while True: # Buffered read of the socket
message = conn.recv(8).decode('utf-8')
print(now(), "Buffer : "+message)
data += message
if len(message) < 8: break
print(now(), "Received data : ", data)
print(now(), "Sending thank you...")
conn.send("Thank you".encode())
conn.close()
if __name__ == "__main__":
main()
#+end_src
** Sockets: client code in Python
#+begin_src python :tangle Codes/socket_client.py
#!/usr/bin/env python
import sys, os, socket, datetime
now = datetime.datetime.now
def main():
HOSTNAME = sys.argv[1] # Get host name from command line
PORT = int(sys.argv[2]) # Get port from command line
print(now(), "The target server is : %s:%d"%(HOSTNAME,PORT))
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Create an INET socket
s.connect( (HOSTNAME, PORT) ) # Connect the socket to the address and port
# of the server
message = "Hello, world!!!!!!!"
print(now(), "Sending : "+message)
s.send(message.encode()) # Send the data
data = s.recv(1024) # Read the reply of the server
s.close()
print(now(), 'Received: ', data.decode('utf-8'))
if __name__ == "__main__": main()
#+end_src
** Sockets: Execution
#+begin_src text
$ python Codes/socket_client.py lpqdh82 11279
2021-11-20 21:20:32.258632 The target server is : lpqdh82:11279
2021-11-20 21:20:32.258959 Sending : Hello, world!!!!!!!
2021-11-20 21:20:32.259042 Connected by ('127.0.0.1', 36798)
2021-11-20 21:20:32.259068 Buffer : Hello, w
2021-11-20 21:20:32.259076 Buffer : orld!!!!
2021-11-20 21:20:32.259083 Buffer : !!!
2021-11-20 21:20:32.259088 Received data : Hello, world!!!!!!!
2021-11-20 21:20:32.259093 Sending thank you...
2021-11-20 21:20:32.259133 Received: Thank you
#+end_src
Note that the client and server can be written in different languages.
** Server for \pi with sockets in Python
#+NAME:py_server
#+begin_src python :tangle Codes/pi_server_python.py
#!/usr/bin/env python
import sys, os, socket
from math import sqrt
PORT = 1666
error_threshold = 1.e-4 # Stopping criterion
def main():
data = []
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Create an INET socket
s.bind( (socket.gethostname(), PORT) ) # Bind the socket to the address and port
while True:
s.listen(5) # Wait for incoming connections
conn, addr = s.accept() # Accept connection
X = ""
while True: # Buffered read of the socket
message = conn.recv(128)
X += message.decode('utf-8')
if len(message) < 128: break
data.append( float(X) )
N = len(data)
#+end_src
** Server for \pi with sockets in Python
#+NAME:py_server2
#+begin_src python :tangle Codes/pi_server_python.py
average = sum(data)/N # Compute average
if N > 2: # Compute variance
l = [ (x-average)*(x-average) for x in data ]
variance = sum(l)/(N-1.)
else:
variance = 0.
error = sqrt(variance)/sqrt(N) # Compute error
print('%f +/- %f'%(average,error))
# Stopping condition
if N > 2 and error < error_threshold:
conn.send("STOP".encode())
break
else:
conn.send("OK".encode())
conn.close()
if __name__ == "__main__":
main()
#+end_src
** Client for \pi with sockets in Python
#+begin_src python :tangle Codes/pi_client_python.py
#!/usr/bin/env python
import os, sys, socket
from random import random, seed
from math import sqrt
HOSTNAME = "localhost"
PORT = 1666
NMAX = 10000000 # Nb of MC steps/process
error_threshold = 1.0e-4 # Stopping criterion
NPROC=4 # Use 4 processes
def compute_pi():
"""Local Monte Carlo calculation of pi"""
seed(None) # Initialize random number generator
result = 0.
for i in range(NMAX): # Loop NMAX times
x,y = random(), random() # Draw 2 random numbers x and y
if x*x + y*y <= 1.: # Check if (x,y) is in the circle
result += 1
return 4.* float(result)/float(NMAX) # Estimate of pi
#+end_src
** Client for \pi with sockets in Python
#+begin_src python :tangle Codes/pi_client_python.py
def main():
while True:
X = compute_pi()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Create an INET socket
try: # Connect the socket to the address and port of the server
            s.connect( (HOSTNAME, PORT) )
except socket.error:
break
message = str(X)
s.send(message.encode()) # Send the data
reply = s.recv(128).decode('utf-8') # Read the reply of the server
s.close()
if reply == "STOP": break
if __name__ == '__main__':
main()
#+end_src
** Execution of \pi with sockets in Python
#+begin_src text
$ python pi_server_python.py &
> for i in {1..8} ; do
> python pi_client_python.py &
> done ; wait
3.142136 +/- 0.000000
3.141783 +/- 0.000000
3.141992 +/- 0.000291
3.141804 +/- 0.000279
[...]
3.141687 +/- 0.000104
3.141666 +/- 0.000102
3.141651 +/- 0.000098
#+end_src
* Message Passing Interface (MPI) :noexport:
** Message Passing Interface
- Application Programming Interface for inter-process communication
- Takes advantage of HPC hardware:
- TCP/IP: 50 $\mu \text{s}$ latency
- Remote Direct Memory Access (RDMA): <2 $\mu \text{s}$
(low-latency network)
- Portable
- Each vendor has its own implementation adapted to the hardware
- Standard in HPC
- Initially designed for a fixed number of processes:
- No problem for the discovery of peers
- Fast collective communications
- Single Program Multiple Data (SPMD) paradigm
** Communicators
- Group of processes that can communicate together
- Each process has an ID in the communicator: no need for IP
  addresses and port numbers
- ~MPI_COMM_WORLD~: Global communicator, default
- ~size~: number of processes in the communicator
- ~rank~: ID of the process in the communicator
** Point-to-point communication
*** Python
- Send: ~comm.send(data, dest, tag)~
- Receive: ~comm.recv(source, tag)~
*** Fortran
- Send: ~MPI_SEND(buffer, count, datatype, destination, tag, communicator, ierror)~
- Receive:
~MPI_RECV(buffer, count, datatype, source, tag, communicator,
status, ierror)~
** Synchronous or asynchronous?
- ~MPI_Ssend~ :: Blocking synchronous send. Returns upon notification
that the message has been received (safe).
- ~MPI_Isend~ :: Non-blocking send. Returns immediately (dangerous).
- ~MPI_Send~ :: Returns when the send buffer is ready for
use. Non-deterministic: depends on MPI implementation, size of
the data, number of ranks, ... (dangerous)
#+LATEX: \begin{alertblock}{Important}
- While writing and debugging the program:
- /Always/ use ~MPI_Ssend~
- Use ~MPI_Isend~ only when ~MPI_Ssend~ is irrelevant
- Once the code is well checked
- You can replace ~MPI_Ssend~ by ~MPI_Send~ as an optimization.
- If ~MPI_Send~ is still too slow, consider rewriting for ~MPI_Isend~
#+LATEX: \end{alertblock}
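** Synchronous or asynchronous? *mpi4py sketch*
A minimal mpi4py sketch (added for illustration, not part of the course code)
contrasting the blocking synchronous send with the non-blocking send followed
by an explicit wait:
#+begin_src python
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    comm.ssend([1, 2, 3], dest=1, tag=0)        # blocking synchronous send
    req = comm.isend([4, 5, 6], dest=1, tag=1)  # non-blocking: returns immediately
    req.wait()                                  # reuse the data only after wait()
elif rank == 1:
    print(comm.recv(source=0, tag=0))
    print(comm.recv(source=0, tag=1))
#+end_src
Run for example with ~mpiexec -n 2 python the_file.py~ (file name arbitrary).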
** Point-to-point communication (Python)
#+begin_src python :tangle Codes/mpi_rank.py
from mpi4py import MPI
def main():
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
if rank == 0:
data = 42
print("Before: Rank: %d Size: %d Data: %d"%(rank, size, data))
comm.ssend(data, dest=1, tag=11)
print("After : Rank: %d Size: %d Data: %d"%(rank, size, data))
elif rank == 1:
data = 0
print("Before: Rank: %d Size: %d Data: %d"%(rank, size, data))
data = comm.recv(source=0, tag=11)
print("After : Rank: %d Size: %d Data: %d"%(rank, size, data))
if __name__ == "__main__": main()
#+end_src
** Point-to-point communication (Python)
#+begin_src text
$ mpiexec -n 4 python mpi_rank.py
Before: Rank: 0 Size: 4 Data: 42
Before: Rank: 1 Size: 4 Data: 0
After : Rank: 0 Size: 4 Data: 42
After : Rank: 1 Size: 4 Data: 42
#+end_src
In Fortran, compile using =mpif90= and execute using =mpiexec= (or =mpirun=).
** Point-to-point communication (Fortran)
#+begin_src fortran :tangle Codes/mpi_rank.f90
program test_rank
use mpi
implicit none
integer :: rank, size, data, ierr, status(mpi_status_size)
call MPI_INIT(ierr) ! Initialize library (required)
if (ierr /= MPI_SUCCESS) then
call MPI_ABORT(MPI_COMM_WORLD, 1, ierr)
end if
call MPI_COMM_SIZE(MPI_COMM_WORLD, size, ierr)
if (ierr /= MPI_SUCCESS) then
call MPI_ABORT(MPI_COMM_WORLD, 2, ierr)
end if
call MPI_COMM_RANK(MPI_COMM_WORLD, rank, ierr)
if (ierr /= MPI_SUCCESS) then
call MPI_ABORT(MPI_COMM_WORLD, 3, ierr)
end if
#+end_src
** Point-to-point communication (Fortran)
#+begin_src fortran :tangle Codes/mpi_rank.f90
if (rank == 0) then
data = 42
print *, "Before: Rank:", rank, "Size:", size, "Data: ", data
call MPI_SSEND(data, 1, MPI_INTEGER, 1, 11, MPI_COMM_WORLD, ierr)
print *, "After : Rank:", rank, "Size:", size, "Data: ", data
else if (rank == 1) then
data = 0
print *, "Before: Rank:", rank, "Size:", size, "Data: ", data
call MPI_RECV(data, 1, MPI_INTEGER, 0, 11, MPI_COMM_WORLD, &
status, ierr)
print *, "After : Rank:", rank, "Size:", size, "Data: ", data
end if
call MPI_FINALIZE(ierr) ! De-initialize library (required)
end program
#+end_src
** Deadlocks
#+LATEX: \begin{columns}
#+LATEX: \begin{column}{0.3\textwidth}
#+ATTR_LATEX: :height 0.8 \textheight
[[./deadlock.png]]
#+LATEX: \end{column}
#+LATEX: \begin{column}{0.7\textwidth}
#+LATEX: \begin{exampleblock}{Deadlock}
Each process waits for a message coming from another process
#+LATEX: \end{exampleblock}
Example: round-robin
#+LATEX: \end{column}
#+LATEX: \end{columns}
** Deadlock example
#+begin_src fortran :tangle Codes/mpi_deadlock.f90
program deadlock
use mpi
implicit none
integer :: rank, size, value, source, destination, ierr
integer, parameter :: tag=100
call MPI_INIT(ierr)
call MPI_COMM_RANK(MPI_COMM_WORLD, rank, ierr)
call MPI_COMM_SIZE(MPI_COMM_WORLD, size, ierr)
source = mod(size+rank-1, size)
destination = mod(rank+1 , size)
call MPI_SSEND(rank+10, 1, MPI_INTEGER, destination, tag, MPI_COMM_WORLD, ierr)
call MPI_RECV(value, 1, MPI_INTEGER, source, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE, ierr)
print *, rank, 'received', value, 'from', source
call MPI_FINALIZE(ierr)
end program
#+end_src
** Send-receive
The ~MPI_Sendrecv~ function sends and receives a message /simultaneously/.
It can avoid deadlocks.
#+begin_example
MPI_SENDRECV(SENDBUF, SENDCOUNT, SENDTYPE, DEST, SENDTAG,
RECVBUF, RECVCOUNT, RECVTYPE, SOURCE, RECVTAG,
COMM, STATUS, IERROR)
<type> SENDBUF(*), RECVBUF(*)
INTEGER :: SENDCOUNT, SENDTYPE, DEST, SENDTAG
INTEGER :: RECVCOUNT, RECVTYPE, SOURCE, RECVTAG, COMM
INTEGER :: STATUS(MPI_STATUS_SIZE), IERROR
#+end_example
** Send-receive
#+begin_src fortran :tangle Codes/mpi_sendrecv.f90
program sendrecv
use mpi
implicit none
integer :: rank, size, value, source, destination, ierr
integer, parameter :: tag=100
call MPI_INIT(ierr)
call MPI_COMM_RANK(MPI_COMM_WORLD, rank, ierr)
call MPI_COMM_SIZE(MPI_COMM_WORLD, size, ierr)
source = mod(size+rank-1, size)
destination = mod(rank+1 , size)
call MPI_SENDRECV(rank+10, 1, MPI_INTEGER, destination, tag, value, &
1, MPI_INTEGER, source, tag, MPI_COMM_WORLD, &
MPI_STATUS_IGNORE, ierr)
print *, rank, 'received', value, 'from', source
call MPI_FINALIZE(ierr)
end program
#+end_src
** Collective communications
*** One-to-all
- ~MPI_Bcast~ :: Broadcast the same data to all
- ~MPI_Scatter~ :: distribute an array
*** All-to-one
- ~MPI_Reduce~ :: Sum/product/... of data coming from all ranks
- ~MPI_Gather~ :: collect a distributed array
*** All-to-all
- ~MPI_Barrier~ :: Global synchronization
- ~MPI_Allreduce~ :: Reduce and broadcast the result
- ~MPI_Allgather~ :: Gather and broadcast the result
- ~MPI_Alltoall~ :: Every rank scatters its own data to all the other ranks
A short mpi4py illustration is given on the next slide.
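** Collective communications *mpi4py sketch*
A minimal mpi4py sketch (added for illustration, not part of the course code)
of the collectives listed on the previous slide:
#+begin_src python
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

data  = comm.bcast(42 if rank == 0 else None, root=0)  # same value on every rank
chunk = comm.scatter(list(range(size)) if rank == 0 else None, root=0)
total = comm.reduce(rank, op=MPI.SUM, root=0)          # sum collected on rank 0
everywhere = comm.allreduce(rank, op=MPI.SUM)          # sum available on every rank
comm.Barrier()                                         # global synchronization
print(rank, data, chunk, total, everywhere)
#+end_src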
** Computation of \pi with MPI
\[
\pi = 4 \int_{0}^{1} \sqrt{1-x^2} dx
\sim 4 \sum_{i=1}^M \sqrt{1-x_i^2}\, \Delta x
\]
\[
\; \text{with} \;
0 \le x_i \le 1 \;\text{and}\; \Delta x = x_{i+1} - x_i
\]
- Define a grid of $M$ points
- Split the grid on $N$ processes
- Each process computes part of the sum
- The partial sums are reduced on the master process
** Computation of \pi with MPI (Python)
#+begin_src python :tangle Codes/mpi_pi.py
#!/usr/bin/env python
from mpi4py import MPI
import sys
from math import sqrt
def main():
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
M = int(sys.argv[1]) # Total Number of grid points
M_local = (M-1) // size + 1 # Number of grid points to compute locally
istart = rank * M_local # Beginning of interval
iend = min(istart + M_local, M) # End of interval
dx = 1./float(M) # Delta x
sum = 0.
for i in range(istart, iend):
x = (i+0.5)*dx
sum += sqrt(1.-x*x)
#+end_src
** Computation of \pi with MPI (Python)
#+begin_src python :tangle Codes/mpi_pi.py
result = 4. * dx * sum
pi = comm.reduce(result, MPI.SUM)
print("%10f -> %10f : %10f"%(istart*dx, iend*dx, result))
if rank == 0:
print("Result = ", pi)
if __name__ == "__main__": main()
#+end_src
#+LATEX: \begin{columns}
#+LATEX: \begin{column}{0.6\textwidth}
#+begin_src text
$ mpiexec -n 4 python mpi_pi.py 100000000
0.000000 -> 0.250000 : 0.989483
Result = 3.1415926535902408
0.250000 -> 0.500000 : 0.923740
0.500000 -> 0.750000 : 0.775058
0.750000 -> 1.000000 : 0.453312
#+end_src
#+LATEX: \end{column}
#+LATEX: \begin{column}{0.3\textwidth}
#+ATTR_LATEX: :width \textwidth
[[./pi_v1.png]]
#+LATEX: \end{column}
#+LATEX: \end{columns}
** Alternate Computation of \pi with MPI (Python)
#+begin_src python :tangle Codes/mpi_pi_v2.py
#!/usr/bin/env python
from mpi4py import MPI
import sys
from math import sqrt
def main():
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
M = int(sys.argv[1]) # Total Number of grid points
dx = 1./float(M) # Delta x
sum = 0.
for i in range(rank, M, size):
x = (i+0.5)*dx
sum += sqrt(1.-x*x)
#+end_src
** Alternate Computation of \pi with MPI (Python)
#+begin_src python :tangle Codes/mpi_pi_v2.py
result = 4. * dx * sum
pi = comm.reduce(result, MPI.SUM)
print(rank, result)
if rank == 0:
print("Result = ", pi)
if __name__ == "__main__": main()
#+end_src
#+LATEX: \begin{columns}
#+LATEX: \begin{column}{0.6\textwidth}
#+begin_src text
$ mpiexec -n 4 python mpi_pi_v2.py 100000000
0 0.7853981783959749
Result = 3.1415926535901777
2 0.7853981583983196
3 0.7853981483981102
1 0.7853981683977732
#+end_src
#+LATEX: \end{column}
#+LATEX: \begin{column}{0.3\textwidth}
#+ATTR_LATEX: :width \textwidth
[[./pi_v2.png]]
#+LATEX: \end{column}
#+LATEX: \end{columns}
* Multi-threading :noexport:
** Processes /vs/ threads
*** Process
- Has its own memory address space
- Context switching between processes is slow
- Processes interact only through system-provided communication mechanisms
- Fork: creates a copy of the current process
- Exec: switches to running another binary executable
- Spawn: =Fork=, then =exec= the child
*** Thread
- Exist as subsets of a process
- Context switching between threads is fast
- Share the same memory address space : interact via shared memory
** Threads
#+LATEX: \begin{columns}
#+LATEX: \begin{column}{0.5\textwidth}
#+ATTR_LATEX: :height 0.5\textheight
[[./smp.png]]
#+LATEX: \end{column}
#+LATEX: \begin{column}{0.5\textwidth}
- Concurrent programming
- Graphical user interfaces (progress bars, ...)
- Asynchronous I/O
- Standard library: POSIX threads (=pthreads=)
#+LATEX: \end{column}
#+LATEX: \end{columns}
*** Communication time
- Low-latency network : \sim 1.2 microsecond
- Random memory access : \sim 0.1 microsecond
** Threads example (Python)
#+begin_src python :tangle Codes/thread_python.py
#!/usr/bin/env python
import threading
import time
class test:
def __init__(self, Nthreads):
self.Nthreads = Nthreads
self.data = [ i for i in range(Nthreads) ]
def run_thread(self, j):
self.data[j] = 0
time.sleep(j)
self.data[j] = j
#+end_src
** Threads example (Python)
#+begin_src python :tangle Codes/thread_python.py
def run(self):
thread = [ None ] * self.Nthreads
t0 = time.time()
print(self.data)
for i in range(self.Nthreads):
thread[i] = threading.Thread( target=self.run_thread, args=(i,) )
thread[i].start()
for i in range(self.Nthreads):
thread[i].join()
print(time.time()-t0, "seconds. ", self.data)
if __name__ == '__main__':
t = test(4)
t.run()
#+end_src
-------------------
#+begin_src text
$ python thread_python.py
[0, 1, 2, 3]
0.0009775161743164062 seconds. [0, 0, 0, 0]
1.0018701553344727 seconds. [0, 1, 0, 0]
2.003377676010132 seconds. [0, 1, 2, 0]
3.004056930541992 seconds. [0, 1, 2, 3]
#+end_src
** Computation of \pi with threads in Python
#+begin_src python :tangle Codes/pi_thread_python.py
#!/usr/bin/env python
import os, sys, threading
from random import random, seed
from math import sqrt
NMAX = 10000000 # Nb of MC steps/process
error_threshold = 1.0e-4 # Stopping criterion
class pi_calculator:
def __init__(self, Nthreads):
self.Nthreads= Nthreads
self.results = []
self.lock = threading.Lock()
def compute_pi(self):
result = 0.
for i in range(NMAX): # Loop NMAX times
x,y = random(), random() # Draw 2 random numbers x and y
if x*x + y*y <= 1.: # Check if (x,y) is in the circle
result += 1
with self.lock:
self.results.append(4.* float(result)/float(NMAX))
#+end_src
** Computation of \pi with threads in Python
#+begin_src python :tangle Codes/pi_thread_python.py
def run(self):
thread = [None] * self.Nthreads
for i in range(self.Nthreads):
thread[i] = threading.Thread( target=self.compute_pi, args=() )
thread[i].start()
print("All threads started")
while True:
for i in range(self.Nthreads):
thread[i].join()
N = len(self.results)
average = sum(self.results)/N # Compute average
if N > 2: # Compute variance
l = [ (x-average)*(x-average) for x in self.results ]
variance = sum(l)/(N-1.)
else:
variance = 0.
error = sqrt(variance)/sqrt(N) # Compute error
print("%f +/- %f %d"%(average, error, N))
#+end_src
** Computation of \pi with threads in Python
#+begin_src python :tangle Codes/pi_thread_python.py
if N > 2 and error < error_threshold: # Stopping condition
return
for i in range(self.Nthreads):
thread[i] = threading.Thread( target=self.compute_pi, args=() )
thread[i].start()
if __name__ == '__main__':
calc = pi_calculator(4)
calc.run()
#+end_src
Note: this is inefficient in Python because of the Global Interpreter Lock
(GIL), but you get the idea. A process-based variant is sketched on the next slide.
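** Computation of \pi with multiprocessing in Python
A sketch (added for illustration) of the same calculation with processes instead of
threads, so the workers run on separate cores and are not serialized by the GIL:
#+begin_src python
from multiprocessing import Pool
from random import random
from math import sqrt

NMAX = 1000000                                   # Nb of MC steps/process

def compute_pi(_):
    hits = sum(1 for _ in range(NMAX) if random()**2 + random()**2 <= 1.)
    return 4. * hits / NMAX

if __name__ == '__main__':
    with Pool(4) as pool:                        # 4 worker processes
        data = pool.map(compute_pi, range(8))    # 8 independent estimates
    N = len(data)
    average = sum(data) / N                      # Compute average
    error = sqrt(sum((x - average)**2 for x in data) / (N - 1.)) / sqrt(N)
    print("%f +/- %f  %d" % (average, error, N))
#+end_src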
* OpenMP
** OpenMP
- OpenMP is an extension of programming languages that enables the use of
  multi-threading to parallelize the code using directives.
- The OpenMP library may be implemented using =pthreads=
- Extensions in OpenMP 5.0 to offload code execution to GPUs
- The same source code can be executed with/without OpenMP
#+begin_src fortran
!$OMP PARALLEL DEFAULT(SHARED) PRIVATE(i)
!$OMP DO
do i=1,n
A(i) = B(i) + C(i)
end do
!$OMP END DO
!$OMP END PARALLEL
#+end_src
** OpenMP directives (Fortran)
- ~!$OMP PARALLEL~ starts a new multi-threaded section.
Everything inside this block is executed by /all/ the threads
- ~!$OMP DO~ tells the compiler to split the loop among the different
threads (by changing the loop boundaries for instance)
- ~!$OMP END DO~ marks the end of the parallel loop.
It contains an implicit synchronization.
After this line, all the threads have finished executing the loop.
- ~!$OMP END PARALLEL~ marks the end of the parallel section. Contains also an implicit barrier.
- ~DEFAULT(SHARED)~ : all the variables (A,B,C) are in shared memory by default
- ~PRIVATE(i)~ : the variable i is private to every thread
** OpenMP directives (Fortran)
- ~!$OMP CRITICAL ... !$OMP END CRITICAL~ : all the statements in this block are protected by a lock
- ~!$OMP TASK ... !$OMP END TASK~ : define a new task to execute
- ~!$OMP BARRIER~ : synchronization barrier
- ~!$OMP SINGLE ... !$OMP END SINGLE~ : all the statements in this block are executed by a single thread
- ~!$OMP MASTER ... !$OMP END MASTER~ : all the statements in this block are executed by the master thread
** OpenMP
*** Functions
- ~omp_get_thread_num()~ : returns the ID of the current running
  thread (analogous to the MPI rank)
- ~omp_get_num_threads()~ : returns the total number of running
  threads (analogous to the MPI size)
- ~OMP_NUM_THREADS~ : Environment variable (shell) that fixes the
number of threads to run
*** Important
- Multiple threads *can read* at the same memory address
- Multiple threads *must not write* at the same address
** Matrix multiplication
\[ C_{ij} = \sum_{k=1}^N A_{ik} B_{kj} \]
#+begin_src fortran
do j=1,N
do i=1,N
C(i,j) = 0.d0
do k=1,N
C(i,j) = C(i,j) + A(i,k) * B(k,j)
end do
end do
end do
#+end_src
** Matrix multiplication with OpenMP
\[ C_{ij} = \sum_{k=1}^N A_{ik} B_{kj} \]
#+LATEX: \begin{columns}
#+LATEX: \begin{column}{0.5\textwidth}
#+begin_src fortran
!$OMP PARALLEL DO DEFAULT(SHARED) PRIVATE (i,j,k)
do j=1,N
do i=1,N
C(i,j) = 0.d0
do k=1,N
C(i,j) = C(i,j) + A(i,k) * B(k,j)
end do
end do
end do
!$OMP END PARALLEL DO
#+end_src
#+LATEX: \end{column}
#+LATEX: \begin{column}{0.4\textwidth}
- Loop is parallelized over ~j~
- Writing in ~C(i,j)~ is OK
#+LATEX: \end{column}
#+LATEX: \end{columns}
** Matrix multiplication with OpenMP
\[ C_{ij} = \sum_{k=1}^N A_{ik} B_{kj} \]
#+LATEX: \begin{columns}
#+LATEX: \begin{column}{0.5\textwidth}
#+begin_src fortran
!$OMP PARALLEL DEFAULT(SHARED) PRIVATE (i,j,k)
!$OMP DO COLLAPSE(2)
do j=1,N
do i=1,N
C(i,j) = 0.d0
do k=1,N
C(i,j) = C(i,j) + A(i,k) * B(k,j)
end do
end do
end do
!$OMP END DO
!$OMP END PARALLEL
#+end_src
#+LATEX: \end{column}
#+LATEX: \begin{column}{0.4\textwidth}
- Loop is parallelized over pairs ~(i,j)~
- Writing in ~C(i,j)~ is OK
#+LATEX: \end{column}
#+LATEX: \end{columns}
* Exercises
** Exercise 1: Monte Carlo
1. Write a Fortran src_fortran{double precision function compute_pi()} that computes
   $\pi$ with the Monte Carlo algorithm, using $M$ samples ($M$ fixed inside the function)
2. Call it using this main program:
#+begin_src fortran :tangle Exercises/Ex1/main.f90
program pi_mc
implicit none
integer :: M
logical :: iterate
double precision :: sample
double precision, external :: compute_pi
call random_seed() ! Initialize random number generator
open(unit=11, file='fortran_out.fifo', status='old', action='write', &
access='stream',form='formatted')
iterate = .True.
do while (iterate) ! Compute pi over N samples until 'iterate=.False.'
sample = compute_pi()
write(11,*) sample
end do
end program pi_mc
#+end_src
** Exercise 1: Monte Carlo
3. Write a Python server =pi_server.py= that receives samples of \pi in a socket
and compute the running average of \pi. Its address and port
number are written in a file =server.cfg=.
4. Write a Python script =pi_bridge.py= that reads samples of \pi in a named pipe =fortran_out.fifo= and sends the samples to the server.
   This script can read the address and port number of the
server from the file =server.cfg=.
5. When the convergence criterion is reached, the server informs
the bridges that they can stop.
*** Running a simulation on multiple nodes
- Run a single server
- Run one bridge per compute node using the =mpiexec= command
- Run one Fortran process per core using the =mpiexec= command
** Exercise 2: Divide and conquer for matrix multiplication
* Solutions :noexport:
** Exercise 1: Monte Carlo
*** Compute_pi.f90
#+begin_src fortran :tangle Exercises/Ex1/compute_pi.f90
double precision function compute_pi()
implicit none
integer, parameter :: M=100000000
double precision :: x, y, n_in
integer :: i
n_in = 0.d0
do i=1, M
call random_number(x)
call random_number(y)
if (x*x + y*y <= 1.d0) then
n_in = n_in+1.d0
end if
end do
compute_pi = 4.d0*n_in/dble(M)
end function compute_pi
#+end_src
*** Compilation
#+begin_src bash
gfortran compute_pi.f90 main.f90 -o pi.x
#+end_src
*** Python bridge
#+begin_src python :tangle Exercises/Ex1/pi_bridge.py
#!/usr/bin/env python
import os, sys, socket
server_config = "server.cfg" # Configuration file
pipe_in = "fortran_out.fifo" # Named pipe for input
def main():
# Read hostname and port number from config file
with open(server_config, 'r') as f:
HOSTNAME = f.readline().strip()
PORT = int( f.readline() )
# Open named pipe for reading the output of the Fortran executables
p_in = open(pipe_in, 'r')
print("Bridge connects to", HOSTNAME, PORT)
while True:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Create an INET socket
try: # Connect the socket to the address and port of the server
s.connect( (HOSTNAME, PORT) )
except socket.error:
print("Server inactive")
break
X = p_in.readline()
message = str(X)
s.send(message.encode()) # Send the data
reply = s.recv(128).decode('utf-8') # Read the reply of the server
if reply == "STOP": break
if __name__ == '__main__':
main()
#+end_src
*** Python server
#+begin_src python :tangle Exercises/Ex1/pi_server.py
#!/usr/bin/env python
import sys, os, socket
from math import sqrt
error_threshold = 1.e-5 # Stopping criterion
PORT = 1666 # Port number
server_config = "server.cfg" # Config file for the server
def main():
HOSTNAME = "localhost" #socket.gethostname()
with open(server_config, 'w') as f: # Write hostname to config file
f.write(HOSTNAME+"\n")
f.write(str(PORT)+"\n")
print("Server is", HOSTNAME, PORT)
data = []
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Create an INET socket
s.bind( (HOSTNAME, PORT) ) # Bind the socket to the address and port
while True:
s.listen(5) # Wait for incoming connections
conn, addr = s.accept() # Accept connection
X = ""
while True: # Buffered read of the socket
message = conn.recv(128)
X += message.decode('utf-8')
if len(message) < 128: break
data.append( float(X) )
N = len(data)
average = sum(data)/N # Compute average
if N > 2: # Compute variance
l = [ (x-average)*(x-average) for x in data ]
variance = sum(l)/(N-1.)
else:
variance = 0.
error = sqrt(variance)/sqrt(N) # Compute error
print('%f +/- %f'%(average,error))
# Stopping condition
if N > 2 and error < error_threshold:
conn.send("STOP".encode())
break
else:
conn.send("OK".encode())
conn.close()
if __name__ == "__main__":
main()
#+end_src
*** Script
#+begin_src bash :tangle Exercises/Ex1/run.sh
#!/bin/bash
mkfifo fortran_out.fifo
python pi_server.py &
mpiexec --pernode python pi_bridge.py &
mpiexec ./pi.x
rm fortran_out.fifo
#+end_src
** Divide and conquer for matrix multiplication
* Figures :noexport:
#+BEGIN_SRC dot :output file :file merge.png
digraph G {
graph [layout=dot rankdir=TB]
node [shape=record]
subgraph clusterA {
label="A"
node37641025 [label="3|7|6|4|1|0|2|5"];
{rank=same ; node3764 [label="3|7|6|4"] ; node1025 [label="1|0|2|5"]}
{rank=same ; node37 [label="3|7"] ; node64 [label="6|4"]}
{rank=same ; node10 [label="1|0"] ; node25 [label="2|5"]}
{rank=same ; node3 [label="3"] ; node7 [label="7"]}
{rank=same ; node6 [label="6"] ; node4 [label="4"]}
{rank=same ; node1 [label="1"] ; node0 [label="0"]}
{rank=same ; node2 [label="2"] ; node5 [label="5"]}
{rank=same ; node37_ [label="3|7"] ; node46_ [label="4|6"]}
{rank=same ; node01_ [label="0|1"] ; node25_ [label="2|5"]}
{rank=same ; node3467_ [label="3|4|6|7"] ; node0125_ [label="0|1|2|5"]}
node01234567_ [label="0|1|2|3|4|5|6|7"];
}
node37641025 -> node3764;
node3764 -> node37;
node37 -> node3;
node37 -> node7;
node3764 -> node64;
node64 -> node6;
node64 -> node4;
node37641025 -> node1025;
node1025 -> node10;
node10 -> node1;
node10 -> node0;
node1025 -> node25;
node25 -> node2;
node25 -> node5;
node3 -> node37_;
node7 -> node37_;
node6 -> node46_;
node4 -> node46_;
node1 -> node01_;
node0 -> node01_;
node2 -> node25_;
node5 -> node25_;
node37_ -> node3467_;
node46_ -> node3467_;
node01_ -> node0125_;
node25_ -> node0125_;
node0125_ -> node01234567_;
node3467_ -> node01234567_;
}
#+END_SRC
#+RESULTS:
#+BEGIN_SRC dot :output file :file merge_parallel.png
digraph G {
graph [layout=dot rankdir=TB]
node [shape=record]
node37641025 [label="3|7|6|4|1|0|2|5"];
node3764 [label="3|7|6|4"] ; node1025 [label="1|0|2|5"]
node37 [label="3|7"] ; node64 [label="6|4"]
node10 [label="1|0"] ; node25 [label="2|5"]
node3 [label="3"] ; node7 [label="7"]
node6 [label="6"] ; node4 [label="4"]
node1 [label="1"] ; node0 [label="0"]
node2 [label="2"] ; node5 [label="5"]
node37_ [label="3|7"] ; node46_ [label="4|6"]
node01_ [label="0|1"] ; node25_ [label="2|5"]
node3467_ [label="3|4|6|7"] ; node0125_ [label="0|1|2|5"]
node01234567_ [label="0|1|2|3|4|5|6|7"];
subgraph clusterA {
label="A"
node37641025;
node3764;
node37;
node3;
node7;
node37_;
node3467_;
node01234567_;
}
subgraph clusterB {
label="B";
node1025;
node10;
node1;
node0;
node01_;
node0125_;
}
subgraph clusterC {
label="C";
node64;
node6;
node4;
node46_;
}
subgraph clusterD {
label="D";
node25;
node2;
node5;
node25_;
}
node37641025 -> node3764;
node3764 -> node37;
node37 -> node3;
node37 -> node7;
node3764 -> node64;
node64 -> node6;
node64 -> node4;
node37641025 -> node1025;
node1025 -> node10;
node10 -> node1;
node10 -> node0;
node1025 -> node25;
node25 -> node2;
node25 -> node5;
node3 -> node37_;
node7 -> node37_;
node6 -> node46_;
node4 -> node46_;
node1 -> node01_;
node0 -> node01_;
node2 -> node25_;
node5 -> node25_;
node37_ -> node3467_;
node46_ -> node3467_;
node01_ -> node0125_;
node25_ -> node0125_;
node0125_ -> node01234567_;
node3467_ -> node01234567_;
}
#+END_SRC
#+RESULTS:
#+BEGIN_SRC dot :output file :file deadlock.png
digraph G {
graph [layout=dot]
A -> B ;
B -> C ;
C -> D ;
D -> A ;
}
#+END_SRC
#+RESULTS:
[[file:deadlock.png]]
* Exam questions :noexport:
** Question 1
Consider a matrix multiplication. Which of the following versions can /not/ be
executed safely in parallel?
a)
#+begin_src fortran
!$OMP PARALLEL DO PRIVATE(i,j,k) SHARED(A,B,C)
do j=1,n
do i=1,n
do k=1,n
C(i,j) = C(i,j) + A(i,k) * B(k,j)
end do
end do
end do
!$OMP END PARALLEL DO
#+end_src
b)
#+begin_src fortran
!$OMP PARALLEL PRIVATE(i,j,k) SHARED(A,B,C)
!$OMP DO
do j=1,n
do i=1,n
do k=1,n
C(i,j) = C(i,j) + A(i,k) * B(k,j)
end do
end do
end do
!$OMP END DO
!$OMP END PARALLEL
#+end_src
c)
#+begin_src fortran
!$OMP PARALLEL PRIVATE(i,j,k) SHARED(A,B,C)
do j=1,n
!$OMP DO
do i=1,n
do k=1,n
C(i,j) = C(i,j) + A(i,k) * B(k,j)
end do
end do
!$OMP END DO
end do
!$OMP END PARALLEL
#+end_src
d)
#+begin_src fortran
!$OMP PARALLEL PRIVATE(i,j,k) SHARED(A,B,C)
do j=1,n
do i=1,n
!$OMP DO
do k=1,n
C(i,j) = C(i,j) + A(i,k) * B(k,j)
end do
!$OMP END DO
end do
end do
!$OMP END PARALLEL
#+end_src
** Question 2
** Question 3
** Question 4
* Export :noexport:
#+BEGIN_SRC elisp :output none
(setq org-latex-listings 'minted)
(setq org-latex-custom-lang-environments nil)
(setq org-latex-minted-options
'(
("fontsize" "\\scriptsize")
("linenos" "")))
(setq org-latex-to-pdf-process
'("pdflatex -shell-escape -interaction nonstopmode -output-directory %o %f"
"pdflatex -shell-escape -interaction nonstopmode -output-directory %o %f"
"pdflatex -shell-escape -interaction nonstopmode -output-directory %o %f"))
(org-beamer-export-to-pdf)
#+END_SRC
#+RESULTS:
: /home/scemama/MEGA/TEX/Cours/TCCM/TCCM2022/Parallelism/parallelism_scemama.pdf