Starting MPI
This commit is contained in:
parent
69ca8512da
commit
7037335bff
@ -17,6 +17,7 @@
|
||||
#+LaTeX_HEADER: \definecolor{darkblue}{rgb}{0.,0.2,0.7}
|
||||
#+LaTeX_HEADER: \definecolor{darkred}{rgb}{0.6,0.1,0.1}
|
||||
#+LaTeX_HEADER: \definecolor{darkpink}{rgb}{0.7,0.0,0.7}
|
||||
#+PROPERTY: header-args :exports code
|
||||
#+EXPORT_EXCLUDE_TAGS: noexport
|
||||
|
||||
#+startup: beamer
|
||||
@ -645,7 +646,7 @@ user 0m3.359s
|
||||
sys 0m3.172s
|
||||
#+end_src
|
||||
|
||||
* Inter-process communication
|
||||
* Inter-process communication :noexport:
|
||||
|
||||
** Processes /vs/ threads
|
||||
|
||||
@ -862,9 +863,10 @@ if __name__ == "__main__":
|
||||
#+ATTR_LATEX: :height 0.6\textheight
|
||||
[[./pi_convergence.png]]
|
||||
|
||||
** Computation of $\pi$ with pipes in Python
|
||||
** Computation of \pi with pipes in Python
|
||||
|
||||
#+begin_src python :tangle Codes/pi_python.py
|
||||
#!/usr/bin/env python
|
||||
import os, sys
|
||||
from random import random, seed
|
||||
from math import sqrt
|
||||
@ -885,7 +887,7 @@ def compute_pi():
|
||||
return 4.* float(result)/float(NMAX) # Estimate of pi
|
||||
#+end_src
|
||||
|
||||
** Computation of $\pi$ with pipes in Python
|
||||
** Computation of \pi with pipes in Python
|
||||
|
||||
#+begin_src python :tangle Codes/pi_python.py
|
||||
def main():
|
||||
@ -911,7 +913,7 @@ def main():
|
||||
sys.exit(0)
|
||||
#+end_src
|
||||
|
||||
** Computation of $\pi$ with pipes in Python
|
||||
** Computation of \pi with pipes in Python
|
||||
|
||||
#+begin_src python :tangle Codes/pi_python.py
|
||||
data = []
|
||||
@ -926,7 +928,7 @@ def main():
|
||||
else:
|
||||
variance = 0.
|
||||
error = sqrt(variance)/sqrt(N) # Compute error
|
||||
print(f"%{average} +/- %{error} %{N}")
|
||||
print("%f +/- %f %d"%(average, error, N))
|
||||
|
||||
if N > 2 and error < error_threshold: # Stopping condition
|
||||
for i in range(NPROC): # Kill all children
|
||||
@ -937,8 +939,372 @@ def main():
|
||||
if __name__ == '__main__': main()
|
||||
#+end_src
|
||||
|
||||
** Computation of \pi with pipes in Python
|
||||
|
||||
#+begin_src text
|
||||
$ python pi_python.py
|
||||
3.141974 +/- 0.000000 1
|
||||
3.142569 +/- 0.000000 2
|
||||
3.142168 +/- 0.000528 3
|
||||
3.141938 +/- 0.000439 4
|
||||
3.141947 +/- 0.000340 5
|
||||
[...]
|
||||
3.141625 +/- 0.000107 33
|
||||
3.141614 +/- 0.000104 34
|
||||
3.141617 +/- 0.000101 35
|
||||
3.141606 +/- 0.000099 36
|
||||
#+end_src
|
||||
|
||||
** Sockets
|
||||
|
||||
Sockets are analogous to pipes, but they allow both ends of the pipe to be
|
||||
on different machines connected by a network interface.
|
||||
An Internet socket is characterized by a unique combination of:
|
||||
|
||||
- A transport protocol: TCP, UDP, raw IP, ...
|
||||
- A local socket address: Local IP address and port number, for example 192.168.2.2:22
|
||||
- A remote socket address: Optional (TCP)
|
||||
|
||||
** Sockets
|
||||
|
||||
#+ATTR_LATEX: :height 0.9\textheight
|
||||
[[./socket.png]]
|
||||
|
||||
** Sockets: server code in Python
|
||||
|
||||
#+begin_src python :tangle Codes/socket_server.py
|
||||
#!/usr/bin/env python
|
||||
|
||||
import sys, os, socket, datetime
|
||||
now = datetime.datetime.now
|
||||
|
||||
def main():
|
||||
HOSTNAME = socket.gethostname()
|
||||
PORT = 11279
|
||||
print(now(), "I am the server : %s:%d"%(HOSTNAME,PORT))
|
||||
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Create an INET socket
|
||||
s.bind( (HOSTNAME, PORT) ) # Bind the socket to the address and port
|
||||
s.listen(5) # Wait for incoming connections
|
||||
conn, addr = s.accept() # Accept connection
|
||||
print(now(), "Connected by", addr)
|
||||
|
||||
#+end_src
|
||||
|
||||
** Sockets: server code in Python
|
||||
|
||||
#+begin_src python :tangle Codes/socket_server.py
|
||||
data = ""
|
||||
while True: # Buffered read of the socket
|
||||
message = conn.recv(8).decode('utf-8')
|
||||
print(now(), "Buffer : "+message)
|
||||
data += message
|
||||
if len(message) < 8: break
|
||||
print(now(), "Received data : ", data)
|
||||
|
||||
print(now(), "Sending thank you...")
|
||||
conn.send("Thank you".encode())
|
||||
conn.close()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
#+end_src
|
||||
|
||||
** Sockets: client code in Python
|
||||
|
||||
#+begin_src python :tangle Codes/socket_client.py
|
||||
#!/usr/bin/env python
|
||||
import sys, os, socket, datetime
|
||||
now = datetime.datetime.now
|
||||
|
||||
def main():
|
||||
HOSTNAME = sys.argv[1] # Get host name from command line
|
||||
PORT = int(sys.argv[2]) # Get port from command line
|
||||
print(now(), "The target server is : %s:%d"%(HOSTNAME,PORT))
|
||||
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Create an INET socket
|
||||
s.connect( (HOSTNAME, PORT) ) # Connect the socket to the address and port
|
||||
# of the server
|
||||
message = "Hello, world!!!!!!!"
|
||||
print(now(), "Sending : "+message)
|
||||
s.send(message.encode()) # Send the data
|
||||
|
||||
data = s.recv(1024) # Read the reply of the server
|
||||
s.close()
|
||||
print(now(), 'Received: ', data.decode('utf-8'))
|
||||
|
||||
if __name__ == "__main__": main()
|
||||
#+end_src
|
||||
|
||||
** Sockets: Execution
|
||||
|
||||
#+begin_src text
|
||||
$ python Codes/socket_client.py lpqdh82 11279
|
||||
2021-11-20 21:20:32.258632 The target server is : lpqdh82:11279
|
||||
2021-11-20 21:20:32.258959 Sending : Hello, world!!!!!!!
|
||||
2021-11-20 21:20:32.259042 Connected by ('127.0.0.1', 36798)
|
||||
2021-11-20 21:20:32.259068 Buffer : Hello, w
|
||||
2021-11-20 21:20:32.259076 Buffer : orld!!!!
|
||||
2021-11-20 21:20:32.259083 Buffer : !!!
|
||||
2021-11-20 21:20:32.259088 Received data : Hello, world!!!!!!!
|
||||
2021-11-20 21:20:32.259093 Sending thank you...
|
||||
2021-11-20 21:20:32.259133 Received: Thank you
|
||||
#+end_src
|
||||
|
||||
Note that the client and server can be written in different languages.
|
||||
|
||||
** Server for \pi with sockets in Python
|
||||
|
||||
#+begin_src python :tangle Codes/pi_server_python.py
|
||||
#!/usr/bin/env python
|
||||
import sys, os, socket
|
||||
from math import sqrt
|
||||
HOSTNAME = "localhost"
|
||||
PORT = 1666
|
||||
error_threshold = 1.e-4 # Stopping criterion
|
||||
|
||||
def main():
|
||||
data = []
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Create an INET socket
|
||||
s.bind( (HOSTNAME, PORT) ) # Bind the socket to the address and port
|
||||
while True:
|
||||
s.listen(5) # Wait for incoming connections
|
||||
conn, addr = s.accept() # Accept connection
|
||||
X = ""
|
||||
while True: # Buffered read of the socket
|
||||
message = conn.recv(128)
|
||||
X += message.decode('utf-8')
|
||||
if len(message) < 128: break
|
||||
data.append( float(X) )
|
||||
N = len(data)
|
||||
#+end_src
|
||||
|
||||
** Server for \pi with sockets in Python
|
||||
|
||||
#+begin_src python :tangle Codes/pi_server_python.py
|
||||
average = sum(data)/N # Compute average
|
||||
if N > 2: # Compute variance
|
||||
l = [ (x-average)*(x-average) for x in data ]
|
||||
variance = sum(l)/(N-1.)
|
||||
else:
|
||||
variance = 0.
|
||||
error = sqrt(variance)/sqrt(N) # Compute error
|
||||
|
||||
print('%f +/- %f'%(average,error))
|
||||
|
||||
# Stopping condition
|
||||
if N > 2 and error < error_threshold:
|
||||
conn.send("STOP".encode())
|
||||
break
|
||||
else:
|
||||
conn.send("OK".encode())
|
||||
conn.close()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
#+end_src
|
||||
|
||||
** Client for \pi with sockets in Python
|
||||
|
||||
#+begin_src python :tangle Codes/pi_client_python.py
|
||||
#!/usr/bin/env python
|
||||
import os, sys, socket
|
||||
from random import random, seed
|
||||
from math import sqrt
|
||||
HOSTNAME = "localhost"
|
||||
PORT = 1666
|
||||
NMAX = 10000000 # Nb of MC steps/process
|
||||
error_threshold = 1.0e-4 # Stopping criterion
|
||||
NPROC=4 # Use 4 processes
|
||||
|
||||
def compute_pi():
|
||||
"""Local Monte Carlo calculation of pi"""
|
||||
seed(None) # Initialize random number generator
|
||||
|
||||
result = 0.
|
||||
for i in range(NMAX): # Loop NMAX times
|
||||
x,y = random(), random() # Draw 2 random numbers x and y
|
||||
if x*x + y*y <= 1.: # Check if (x,y) is in the circle
|
||||
result += 1
|
||||
return 4.* float(result)/float(NMAX) # Estimate of pi
|
||||
#+end_src
|
||||
|
||||
** Client for \pi with sockets in Python
|
||||
|
||||
#+begin_src python :tangle Codes/pi_client_python.py
|
||||
def main():
|
||||
|
||||
while True:
|
||||
X = compute_pi()
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Create an INET socket
|
||||
try: # Connect the socket to the address and port of the server
|
||||
s.connect( (HOSTNAME, PORT) )
|
||||
except socket.error:
|
||||
break
|
||||
message = str(X)
|
||||
s.send(message.encode()) # Send the data
|
||||
reply = s.recv(128).decode('utf-8') # Read the reply of the server
|
||||
s.close()
|
||||
|
||||
if reply == "STOP": break
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
#+end_src
|
||||
|
||||
** Execution of \pi with sockets in Python
|
||||
|
||||
#+begin_src text
|
||||
$ python pi_server_python.py &
|
||||
> for i in {1..8} ; do
|
||||
> python pi_client_python.py &
|
||||
> done ; wait
|
||||
|
||||
3.142136 +/- 0.000000
|
||||
3.141783 +/- 0.000000
|
||||
3.141992 +/- 0.000291
|
||||
3.141804 +/- 0.000279
|
||||
[...]
|
||||
3.141687 +/- 0.000104
|
||||
3.141666 +/- 0.000102
|
||||
3.141651 +/- 0.000098
|
||||
|
||||
#+end_src
|
||||
|
||||
|
||||
* Message Passing Interface (MPI)
|
||||
|
||||
** Message Passing Interface
|
||||
|
||||
- Application Programming Interface for inter-process communication
|
||||
- Takes advantage of HPC hardware:
|
||||
- TCP/IP: 50 $\mu \text{s}$ latency
|
||||
- Remote Direct Memory Access (RDMA): <2 $\mu \text{s}$
|
||||
(low-latency network)
|
||||
- Portable
|
||||
- Each vendor has its own implementation adapted to the hardware
|
||||
- Standard in HPC
|
||||
- Initially designed for fixed number of processes:
|
||||
- No problem for the discovery of peers
|
||||
- Fast collective communications
|
||||
- Single Program Multiple Data (SPMD) paradigm
|
||||
|
||||
** Communicators
|
||||
|
||||
- Group of processes that can communicate together
|
||||
- Each process has an ID in the communicator: no need for IP
|
||||
adresses and port numbers
|
||||
- ~MPI_COMM_WORLD~: Global communicator, default
|
||||
- ~size~: number of processes in the communicator
|
||||
- ~rank~: ID of the process in the communicator
|
||||
|
||||
** Point-to-point communication
|
||||
|
||||
*** Python
|
||||
- Send:
|
||||
~comm.send(data, dest, tag)~
|
||||
- Receive:
|
||||
~comm.recv(source, tag)~
|
||||
*** Fortran
|
||||
- Send:
|
||||
~MPI_SEND(buffer, count, datatype, destination, tag, communicator, ierror)~
|
||||
- Receive:
|
||||
~MPI_RECV(buffer, count, datatype, source, tag, communicator,
|
||||
status, ierror)~
|
||||
** Point-to-point communication (Python)
|
||||
|
||||
#+begin_src python :tangle Codes/mpi_rank.py
|
||||
from mpi4py import MPI
|
||||
|
||||
def main():
|
||||
comm = MPI.COMM_WORLD
|
||||
rank = comm.Get_rank()
|
||||
size = comm.Get_size()
|
||||
|
||||
if rank == 0:
|
||||
data = 42
|
||||
print("Before: Rank: %d Size: %d Data: %d"%(rank, size, data))
|
||||
comm.send(data, dest=1, tag=11)
|
||||
print("After : Rank: %d Size: %d Data: %d"%(rank, size, data))
|
||||
elif rank == 1:
|
||||
data = 0
|
||||
print("Before: Rank: %d Size: %d Data: %d"%(rank, size, data))
|
||||
data = comm.recv(source=0, tag=11)
|
||||
print("After : Rank: %d Size: %d Data: %d"%(rank, size, data))
|
||||
|
||||
if __name__ == "__main__": main()
|
||||
#+end_src
|
||||
|
||||
** Point-to-point communication (Python)
|
||||
|
||||
#+begin_src text
|
||||
$ mpiexec -n 4 python mpi_rank.py
|
||||
Before: Rank: 0 Size: 4 Data: 42
|
||||
Before: Rank: 1 Size: 4 Data: 0
|
||||
After : Rank: 0 Size: 4 Data: 42
|
||||
After : Rank: 1 Size: 4 Data: 42
|
||||
#+end_src
|
||||
|
||||
In Fortran, compile using =mpif90= and execute using =mpiexec= (or =mpirun=).
|
||||
|
||||
** Point-to-point communication (Fortran)
|
||||
|
||||
#+begin_src fortran :tangle Codes/mpi_rank.f90
|
||||
program test_rank
|
||||
use mpi
|
||||
implicit none
|
||||
integer :: rank, size, data, ierr, status(mpi_status_size)
|
||||
|
||||
call MPI_INIT(ierr) ! Initialize library (required)
|
||||
if (ierr /= MPI_SUCCESS) then
|
||||
call MPI_ABORT(MPI_COMM_WORLD, 1, ierr)
|
||||
end if
|
||||
|
||||
call MPI_COMM_SIZE(MPI_COMM_WORLD, size, ierr)
|
||||
if (ierr /= MPI_SUCCESS) then
|
||||
call MPI_ABORT(MPI_COMM_WORLD, 2, ierr)
|
||||
end if
|
||||
|
||||
call MPI_COMM_RANK(MPI_COMM_WORLD, rank, ierr)
|
||||
if (ierr /= MPI_SUCCESS) then
|
||||
call MPI_ABORT(MPI_COMM_WORLD, 3, ierr)
|
||||
end if
|
||||
#+end_src
|
||||
|
||||
** Point-to-point communication (Fortran)
|
||||
|
||||
#+begin_src fortran :tangle Codes/mpi_rank.f90
|
||||
if (rank == 0) then
|
||||
data = 42
|
||||
print *, "Before: Rank:", rank, "Size:", size, "Data: ", data
|
||||
call MPI_SEND(data, 1, MPI_INTEGER, 1, 11, MPI_COMM_WORLD, ierr)
|
||||
print *, "After : Rank:", rank, "Size:", size, "Data: ", data
|
||||
|
||||
else if (rank == 1) then
|
||||
data = 0
|
||||
print *, "Before: Rank:", rank, "Size:", size, "Data: ", data
|
||||
call MPI_RECV(data, 1, MPI_INTEGER, 0, 11, MPI_COMM_WORLD, &
|
||||
status, ierr)
|
||||
print *, "After : Rank:", rank, "Size:", size, "Data: ", data
|
||||
|
||||
end if
|
||||
call MPI_FINALIZE(ierr) ! De-initialize library (required)
|
||||
end program
|
||||
#+end_src
|
||||
|
||||
** Collective communications
|
||||
|
||||
*** One-to-all
|
||||
- Broadcast: send same data to all
|
||||
- Scatter: distribute an array
|
||||
*** All-to-one
|
||||
- Reduction: Sum/product/... of data coming from all ranks
|
||||
- Gather: collect a distributed array
|
||||
*** All-to-all
|
||||
- Reduction and broadcast
|
||||
|
||||
** Deadlocks
|
||||
* OpenMP
|
||||
|
||||
* Exercises
|
||||
@ -1246,3 +1612,4 @@ digraph G {
|
||||
|
||||
#+RESULTS:
|
||||
: /home/scemama/MEGA/TEX/Cours/TCCM/TCCM2022/Parallelism/parallelism_scemama.pdf
|
||||
|
||||
|
BIN
socket.png
Normal file
BIN
socket.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 28 KiB |
Loading…
Reference in New Issue
Block a user