diff --git a/parallelism_scemama.org b/parallelism_scemama.org index dcf03a7..a559ed2 100644 --- a/parallelism_scemama.org +++ b/parallelism_scemama.org @@ -17,6 +17,7 @@ #+LaTeX_HEADER: \definecolor{darkblue}{rgb}{0.,0.2,0.7} #+LaTeX_HEADER: \definecolor{darkred}{rgb}{0.6,0.1,0.1} #+LaTeX_HEADER: \definecolor{darkpink}{rgb}{0.7,0.0,0.7} +#+PROPERTY: header-args :exports code #+EXPORT_EXCLUDE_TAGS: noexport #+startup: beamer @@ -645,7 +646,7 @@ user 0m3.359s sys 0m3.172s #+end_src -* Inter-process communication +* Inter-process communication :noexport: ** Processes /vs/ threads @@ -862,9 +863,10 @@ if __name__ == "__main__": #+ATTR_LATEX: :height 0.6\textheight [[./pi_convergence.png]] -** Computation of $\pi$ with pipes in Python +** Computation of \pi with pipes in Python #+begin_src python :tangle Codes/pi_python.py +#!/usr/bin/env python import os, sys from random import random, seed from math import sqrt @@ -885,7 +887,7 @@ def compute_pi(): return 4.* float(result)/float(NMAX) # Estimate of pi #+end_src -** Computation of $\pi$ with pipes in Python +** Computation of \pi with pipes in Python #+begin_src python :tangle Codes/pi_python.py def main(): @@ -911,7 +913,7 @@ def main(): sys.exit(0) #+end_src -** Computation of $\pi$ with pipes in Python +** Computation of \pi with pipes in Python #+begin_src python :tangle Codes/pi_python.py data = [] @@ -926,7 +928,7 @@ def main(): else: variance = 0. error = sqrt(variance)/sqrt(N) # Compute error - print(f"%{average} +/- %{error} %{N}") + print("%f +/- %f %d"%(average, error, N)) if N > 2 and error < error_threshold: # Stopping condition for i in range(NPROC): # Kill all children @@ -937,8 +939,372 @@ def main(): if __name__ == '__main__': main() #+end_src +** Computation of \pi with pipes in Python + + #+begin_src text +$ python pi_python.py +3.141974 +/- 0.000000 1 +3.142569 +/- 0.000000 2 +3.142168 +/- 0.000528 3 +3.141938 +/- 0.000439 4 +3.141947 +/- 0.000340 5 +[...] 
+3.141625 +/- 0.000107 33 +3.141614 +/- 0.000104 34 +3.141617 +/- 0.000101 35 +3.141606 +/- 0.000099 36 + #+end_src + +** Sockets + + Sockets are analogous to pipes, but they allow both ends of the pipe to be + on different machines connected by a network interface. + An Internet socket is characterized by a unique combination of: + + - A transport protocol: TCP, UDP, raw IP, ... + - A local socket address: Local IP address and port number, for example 192.168.2.2:22 + - A remote socket address: Optional (TCP) + +** Sockets + + #+ATTR_LATEX: :height 0.9\textheight + [[./socket.png]] + +** Sockets: server code in Python + + #+begin_src python :tangle Codes/socket_server.py +#!/usr/bin/env python + +import sys, os, socket, datetime +now = datetime.datetime.now + +def main(): + HOSTNAME = socket.gethostname() + PORT = 11279 + print(now(), "I am the server : %s:%d"%(HOSTNAME,PORT)) + + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Create an INET socket + s.bind( (HOSTNAME, PORT) ) # Bind the socket to the address and port + s.listen(5) # Wait for incoming connections + conn, addr = s.accept() # Accept connection + print(now(), "Connected by", addr) + + #+end_src + +** Sockets: server code in Python + + #+begin_src python :tangle Codes/socket_server.py + data = "" + while True: # Buffered read of the socket + message = conn.recv(8).decode('utf-8') + print(now(), "Buffer : "+message) + data += message + if len(message) < 8: break + print(now(), "Received data : ", data) + + print(now(), "Sending thank you...") + conn.send("Thank you".encode()) + conn.close() + +if __name__ == "__main__": + main() + #+end_src + +** Sockets: client code in Python + + #+begin_src python :tangle Codes/socket_client.py +#!/usr/bin/env python +import sys, os, socket, datetime +now = datetime.datetime.now + +def main(): + HOSTNAME = sys.argv[1] # Get host name from command line + PORT = int(sys.argv[2]) # Get port from command line + print(now(), "The target server is : 
%s:%d"%(HOSTNAME,PORT)) + + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Create an INET socket + s.connect( (HOSTNAME, PORT) ) # Connect the socket to the address and port + # of the server + message = "Hello, world!!!!!!!" + print(now(), "Sending : "+message) + s.send(message.encode()) # Send the data + + data = s.recv(1024) # Read the reply of the server + s.close() + print(now(), 'Received: ', data.decode('utf-8')) + +if __name__ == "__main__": main() + #+end_src + +** Sockets: Execution + + #+begin_src text +$ python Codes/socket_client.py lpqdh82 11279 +2021-11-20 21:20:32.258632 The target server is : lpqdh82:11279 +2021-11-20 21:20:32.258959 Sending : Hello, world!!!!!!! +2021-11-20 21:20:32.259042 Connected by ('127.0.0.1', 36798) +2021-11-20 21:20:32.259068 Buffer : Hello, w +2021-11-20 21:20:32.259076 Buffer : orld!!!! +2021-11-20 21:20:32.259083 Buffer : !!! +2021-11-20 21:20:32.259088 Received data : Hello, world!!!!!!! +2021-11-20 21:20:32.259093 Sending thank you... +2021-11-20 21:20:32.259133 Received: Thank you + #+end_src + + Note that the client and server can be written in different languages. 
+ +** Server for \pi with sockets in Python + + #+begin_src python :tangle Codes/pi_server_python.py +#!/usr/bin/env python +import sys, os, socket +from math import sqrt +HOSTNAME = "localhost" +PORT = 1666 +error_threshold = 1.e-4 # Stopping criterion + +def main(): + data = [] + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Create an INET socket + s.bind( (HOSTNAME, PORT) ) # Bind the socket to the address and port + while True: + s.listen(5) # Wait for incoming connections + conn, addr = s.accept() # Accept connection + X = "" + while True: # Buffered read of the socket + message = conn.recv(128) + X += message.decode('utf-8') + if len(message) < 128: break + data.append( float(X) ) + N = len(data) + #+end_src + +** Server for \pi with sockets in Python + + #+begin_src python :tangle Codes/pi_server_python.py + average = sum(data)/N # Compute average + if N > 2: # Compute variance + l = [ (x-average)*(x-average) for x in data ] + variance = sum(l)/(N-1.) + else: + variance = 0. + error = sqrt(variance)/sqrt(N) # Compute error + + print('%f +/- %f'%(average,error)) + + # Stopping condition + if N > 2 and error < error_threshold: + conn.send("STOP".encode()) + break + else: + conn.send("OK".encode()) + conn.close() + +if __name__ == "__main__": + main() + #+end_src + +** Client for \pi with sockets in Python + + #+begin_src python :tangle Codes/pi_client_python.py +#!/usr/bin/env python +import os, sys, socket +from random import random, seed +from math import sqrt +HOSTNAME = "localhost" +PORT = 1666 +NMAX = 10000000 # Nb of MC steps/process +error_threshold = 1.0e-4 # Stopping criterion +NPROC=4 # Use 4 processes + +def compute_pi(): + """Local Monte Carlo calculation of pi""" + seed(None) # Initialize random number generator + + result = 0. 
+ for i in range(NMAX): # Loop NMAX times + x,y = random(), random() # Draw 2 random numbers x and y + if x*x + y*y <= 1.: # Check if (x,y) is in the circle + result += 1 + return 4.* float(result)/float(NMAX) # Estimate of pi + #+end_src + +** Client for \pi with sockets in Python + + #+begin_src python :tangle Codes/pi_client_python.py +def main(): + + while True: + X = compute_pi() + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Create an INET socket + try: # Connect the socket to the address and port of the server + s.connect( (HOSTNAME, PORT) ) + except socket.error: + break + message = str(X) + s.send(message.encode()) # Send the data + reply = s.recv(128).decode('utf-8') # Read the reply of the server + s.close() + + if reply == "STOP": break + +if __name__ == '__main__': + main() + + #+end_src + +** Execution of \pi with sockets in Python + + #+begin_src text +$ python pi_server_python.py & +> for i in {1..8} ; do +> python pi_client_python.py & +> done ; wait + +3.142136 +/- 0.000000 +3.141783 +/- 0.000000 +3.141992 +/- 0.000291 +3.141804 +/- 0.000279 +[...] 
+3.141687 +/- 0.000104 +3.141666 +/- 0.000102 +3.141651 +/- 0.000098 + + #+end_src + + * Message Passing Interface (MPI) +** Message Passing Interface + + - Application Programming Interface for inter-process communication + - Takes advantage of HPC hardware: + - TCP/IP: 50 $\mu \text{s}$ latency + - Remote Direct Memory Access (RDMA): <2 $\mu \text{s}$ + (low-latency network) + - Portable + - Each vendor has its own implementation adapted to the hardware + - Standard in HPC + - Initially designed for fixed number of processes: + - No problem for the discovery of peers + - Fast collective communications + - Single Program Multiple Data (SPMD) paradigm + +** Communicators + + - Group of processes that can communicate together + - Each process has an ID in the communicator: no need for IP + addresses and port numbers + - ~MPI_COMM_WORLD~: Global communicator, default + - ~size~: number of processes in the communicator + - ~rank~: ID of the process in the communicator + +** Point-to-point communication + +*** Python + - Send: + ~comm.send(data, dest, tag)~ + - Receive: + ~comm.recv(source, tag)~ +*** Fortran + - Send: + ~MPI_SEND(buffer, count, datatype, destination, tag, communicator, ierror)~ + - Receive: + ~MPI_RECV(buffer, count, datatype, source, tag, communicator, + status, ierror)~ +** Point-to-point communication (Python) + + #+begin_src python :tangle Codes/mpi_rank.py +from mpi4py import MPI + +def main(): + comm = MPI.COMM_WORLD + rank = comm.Get_rank() + size = comm.Get_size() + + if rank == 0: + data = 42 + print("Before: Rank: %d Size: %d Data: %d"%(rank, size, data)) + comm.send(data, dest=1, tag=11) + print("After : Rank: %d Size: %d Data: %d"%(rank, size, data)) + elif rank == 1: + data = 0 + print("Before: Rank: %d Size: %d Data: %d"%(rank, size, data)) + data = comm.recv(source=0, tag=11) + print("After : Rank: %d Size: %d Data: %d"%(rank, size, data)) + +if __name__ == "__main__": main() + #+end_src + +** Point-to-point communication (Python) + + 
#+begin_src text +$ mpiexec -n 4 python mpi_rank.py +Before: Rank: 0 Size: 4 Data: 42 +Before: Rank: 1 Size: 4 Data: 0 +After : Rank: 0 Size: 4 Data: 42 +After : Rank: 1 Size: 4 Data: 42 + #+end_src + + In Fortran, compile using =mpif90= and execute using =mpiexec= (or =mpirun=). + +** Point-to-point communication (Fortran) + + #+begin_src fortran :tangle Codes/mpi_rank.f90 +program test_rank + use mpi + implicit none + integer :: rank, size, data, ierr, status(mpi_status_size) + + call MPI_INIT(ierr) ! Initialize library (required) + if (ierr /= MPI_SUCCESS) then + call MPI_ABORT(MPI_COMM_WORLD, 1, ierr) + end if + + call MPI_COMM_SIZE(MPI_COMM_WORLD, size, ierr) + if (ierr /= MPI_SUCCESS) then + call MPI_ABORT(MPI_COMM_WORLD, 2, ierr) + end if + + call MPI_COMM_RANK(MPI_COMM_WORLD, rank, ierr) + if (ierr /= MPI_SUCCESS) then + call MPI_ABORT(MPI_COMM_WORLD, 3, ierr) + end if + #+end_src + +** Point-to-point communication (Fortran) + + #+begin_src fortran :tangle Codes/mpi_rank.f90 + if (rank == 0) then + data = 42 + print *, "Before: Rank:", rank, "Size:", size, "Data: ", data + call MPI_SEND(data, 1, MPI_INTEGER, 1, 11, MPI_COMM_WORLD, ierr) + print *, "After : Rank:", rank, "Size:", size, "Data: ", data + + else if (rank == 1) then + data = 0 + print *, "Before: Rank:", rank, "Size:", size, "Data: ", data + call MPI_RECV(data, 1, MPI_INTEGER, 0, 11, MPI_COMM_WORLD, & + status, ierr) + print *, "After : Rank:", rank, "Size:", size, "Data: ", data + + end if + call MPI_FINALIZE(ierr) ! De-initialize library (required) +end program + #+end_src + +** Collective communications + +*** One-to-all + - Broadcast: send same data to all + - Scatter: distribute an array +*** All-to-one + - Reduction: Sum/product/... 
of data coming from all ranks + - Gather: collect a distributed array +*** All-to-all + - Reduction and broadcast + +** Deadlocks * OpenMP * Exercises @@ -1246,3 +1612,4 @@ digraph G { #+RESULTS: : /home/scemama/MEGA/TEX/Cours/TCCM/TCCM2022/Parallelism/parallelism_scemama.pdf + diff --git a/socket.png b/socket.png new file mode 100644 index 0000000..ea26cc3 Binary files /dev/null and b/socket.png differ