Crash MPI

The following are basically my notes from going through https://www.hlrs.de/training/self-study-materials. This tutorial is highly recommended, as it contains plenty of informative code exercises!

Overview

Sequential vs Parallel

Sequential - One memory, one processor; all the work is done in linear order

Parallel - Many memories, many processors; at different levels, processors may share memory or communicate via MPI (Socket - Node - Cluster)

SPMD

single (sub-)program, multiple data

MPMD (multiple program, multiple data) can be implemented with ranks.

Rank in MPI

the rank of each process determines its behavior / role in the program

First MPI example

from mpi4py import MPI

# application-related data
n = None
result = None

comm_world = MPI.COMM_WORLD
# MPI-related data
my_rank = comm_world.Get_rank()   # or my_rank = MPI.COMM_WORLD.Get_rank()
num_procs = comm_world.Get_size() # or ditto ...

if (my_rank == 0):
    # reading the application data "n" from stdin only by process 0:
    n = int(input("Enter the number of elements (n): "))

# broadcasting the content of variable "n" in process 0
# into variables "n" in all other processes:
n = comm_world.bcast(n, root=0)

# doing some application work in each process, e.g.:
result = 1.0 * my_rank * n
print(f"I am process {my_rank} out of {num_procs} handling the {my_rank}ith part of n={n} elements, result={result}")

if (my_rank != 0):
    # sending some results from all processes (except 0) to process 0:
    comm_world.send(result, dest=0, tag=99)
else:
    # receiving all these messages and, e.g., printing them
    print(f"I'm proc 0: My own result is {result}")
    for rank in range(1, num_procs):
        result = comm_world.recv(source=rank, tag=99)
        print(f"I'm proc 0: received result of process {rank} is {result}")

Execute with mpirun -np 4 python first-example.py; the output is

Enter the number of elements (n): 100
I am process 1 out of 4 handling the 1ith part of n=100 elements, result=100.0
I am process 2 out of 4 handling the 2ith part of n=100 elements, result=200.0
I am process 3 out of 4 handling the 3ith part of n=100 elements, result=300.0
I am process 0 out of 4 handling the 0ith part of n=100 elements, result=0.0
I'm proc 0: My own result is 0.0
I'm proc 0: received result of process 1 is 100.0
I'm proc 0: received result of process 2 is 200.0
I'm proc 0: received result of process 3 is 300.0

Bottom lines

  1. The output of each process appears in the well-defined sequence of its own program
  2. The output from different processes can be intermixed in any sequence
  3. A sub-program needs to be connected to a message passing system
  4. The total program (i.e., all sub-programs of the program) must be started with the MPI startup tool

Messages among processors

  • process id (rank)
  • location (sender / receiver)
  • data type (sender / receiver)
  • data size (sender / receiver)
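
As a minimal sketch of where these items show up in an mpi4py call (the variable name data, the array contents, and the tag value 42 are made up for illustration; run with at least 2 processes):

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
data = np.zeros(10, dtype='d')   # data type and size come from the buffer description

if comm.Get_rank() == 0:
    # dest = rank of the receiver, tag = extra label carried with the message
    comm.Send((data, MPI.DOUBLE), dest=1, tag=42)
elif comm.Get_rank() == 1:
    # source = rank of the sender; the tag must match the sender's tag
    comm.Recv((data, MPI.DOUBLE), source=0, tag=42)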

P2P Communication

From one processor to another;

Two types: synchronous / asynchronous (buffered)

Synchronous send

Both sender and receiver work at the same time; the sender gets a confirmation when the transfer has finished;

This operation blocks the sender until the receive is posted; the receive operation blocks until the message has been sent

Asynchronous send

The sender puts the data into a buffer and knows nothing about whether it has been received

Blocking / Nonblocking

Some operations may block until another process acts;

Non-blocking: returns immediately, allowing the sub-program to continue its work

All nonblocking procedures must have a matching wait (or test) procedure.
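
A minimal non-blocking sketch (variable names are mine; assumes at least 2 processes): Isend / Irecv return request objects immediately, and the matching Wait completes them.

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
sbuf = np.full(5, rank, dtype='i')
rbuf = np.empty(5, dtype='i')

if rank == 0:
    req = comm.Isend((sbuf, MPI.INT), dest=1, tag=0)   # returns immediately
    # ... other work can overlap with the transfer here ...
    req.Wait()          # sbuf may only be reused after the wait completes
elif rank == 1:
    req = comm.Irecv((rbuf, MPI.INT), source=0, tag=0)
    req.Wait()          # rbuf is only valid after the wait completes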

Amdahl’s Law

The non-parallelizable fraction $f$ of a program limits the achievable speedup to at most $1/f$, no matter how many processes are used.
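
In its standard form, with $N$ processes and serial fraction $f$ of the runtime, Amdahl's law reads

$$S(N) = \frac{1}{f + \frac{1-f}{N}} \;\le\; \frac{1}{f}$$

e.g. with $f = 0.05$ the speedup can never exceed 20, even on arbitrarily many processes.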

Process Model and Language Bindings

Headers

from mpi4py import MPI

Functions

comm_world = MPI.COMM_WORLD

# direct communication via numpy arrays (buffer protocol)
comm_world.Send((snd_buf, ...), ...)
comm_world.Recv((rcv_buf, ...), ...)

# object serialization (pickled)
comm_world.send(snd_buf, ...)
rcv_buf = comm_world.recv(...)

Initialize

from mpi4py import MPI
# MPI.Init()
# not needed: it is called automatically during the import

Finalize

# MPI.Finalize()
# not needed: it is called automatically at the end of the program

No MPI calls are allowed after this (including re-initialization)

Start MPI program

With either mpirun or mpiexec; the latter is recommended because it is defined by the MPI standard.

MPI_COMM_WORLD

This is a pre-defined handle covering all processes; each one is assigned a rank (0, 1, …, np - 1)

Handles

Handles are the identification of MPI objects;

# predefined constants
MPI.COMM_WORLD

# value returned by MPI routines
sub_comm = MPI.COMM_WORLD.Split(...)
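
A minimal sketch of obtaining a new handle from Split (the even/odd color choice is just for illustration; run with a few processes):

from mpi4py import MPI

comm = MPI.COMM_WORLD
world_rank = comm.Get_rank()

# processes with the same color end up in the same sub-communicator;
# key determines the rank order inside the new communicator
sub_comm = comm.Split(color=world_rank % 2, key=world_rank)
print(f"world rank {world_rank} -> sub rank {sub_comm.Get_rank()} of {sub_comm.Get_size()}")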

Rank

Ranks are the identification of processes

rank = comm.Get_rank()

Size

Total number of processes in the communicator (= max(rank) + 1)

size = comm.Get_size()

It is parallel! Awesome

We cannot guarantee the order of the output, unless the processes synchronize before printing.

Messages and Point-to-Point Communication

Messages in Python

There are two ways of messaging in Python

# Fast way, only for numpy arrays (buffer protocol)
comm_world.Send()
comm_world.Recv()

# Slow way, pickled
comm_world.send()
comm_world.recv()

Sending

comm.Send(buf, int dest, int tag=0)
comm.send(obj, int dest, int tag=0)

buf is an ndarray (or anything that implements the Python buffer protocol), while obj can be any picklable object;

dest is the rank of the destination process;

tag is an int carrying additional info from the sender (usually used to distinguish different kinds of messages)
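
A minimal sketch of the lowercase, pickle-based variant (the dict contents and tag value are made up; assumes at least 2 processes):

from mpi4py import MPI

comm = MPI.COMM_WORLD
if comm.Get_rank() == 0:
    obj = {"n": 100, "label": "chunk-0"}     # any picklable Python object works
    comm.send(obj, dest=1, tag=7)
elif comm.Get_rank() == 1:
    obj = comm.recv(source=0, tag=7)         # tag must match the sender's tag
    print(obj)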

Various sending

ssend for synchronous send - Completes iff the receive has started

bsend for buffered / async send - Always completes; the user has to provide (attach) the buffer explicitly

send for standard send - Either buffered or synchronous, uses an internal buffer

rsend for ready send - May only be started if the matching receive has already been posted

Which is better?

Synchronous sends require waiting: high latency, but best bandwidth (no extra copy)

Asynchronous (buffered) sends: low latency, but worse bandwidth (the data is copied through a buffer)

Receiving

comm.Recv(buf, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)
obj = comm.recv(int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)

buf is the receive buffer;

source is the rank of the source process;

tag is still tag (tag must match!);

For Python, you need to match Send-Recv and send-recv

One receive for all sends

recv can accept all kinds of send - Completes when a message has arrived
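
A minimal sketch of this point (my own variable names; at least 2 processes): a synchronous Ssend on one side is matched by an ordinary Recv on the other, since the receive does not depend on the send mode.

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
buf = np.arange(4, dtype='d')

if comm.Get_rank() == 0:
    comm.Ssend((buf, MPI.DOUBLE), dest=1, tag=3)    # completes only once the receive has started
elif comm.Get_rank() == 1:
    comm.Recv((buf, MPI.DOUBLE), source=0, tag=3)   # a plain receive matches any send mode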

On Python's print function

The following is a ping-pong example:

from mpi4py import MPI

buffer = [ None ]

comm_world = MPI.COMM_WORLD
my_rank = comm_world.Get_rank()

if (my_rank == 0):
    print(f"I am {my_rank} before send ping")
    comm_world.send(buffer, dest=1, tag=17)
    buf = comm_world.recv(source=1, tag=17)
    print(f"I am {my_rank} after recv pong")

elif (my_rank == 1):
    buf = comm_world.recv(source=0, tag=17)
    print(f"I am {my_rank} after recv ping")
    print(f"I am {my_rank} before send pong")
    comm_world.send(buf, dest=0, tag=17)

My expected output is

I am 0 before send ping
I am 1 after recv ping
I am 1 before send pong
I am 0 after recv pong

which would demonstrate the order of the send/recv operations; however, the actual output is

I am 1 after  recv ping
I am 1 before send pong
I am 0 before send ping
I am 0 after recv pong

This is actually due to how print buffers its output rather than to MPI: if we add time info to the output, we get

I am 1 after  recv ping 1680684130.3630078
I am 1 before send pong 1680684130.363044
I am 0 before send ping 1680684130.362919
I am 0 after recv pong 1680684130.3631082

which means the real order is #0 send -> #1 recv -> #1 send -> #0 recv. Each process's stdout buffer is only flushed at the end of the run, so the printed order does not reflect the execution order.
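
One way to make the true ordering visible is to flush every line immediately and attach a timestamp; a minimal sketch (not the tutorial's code):

import time
from mpi4py import MPI

my_rank = MPI.COMM_WORLD.Get_rank()
# flush=True pushes the line out right away instead of waiting for the buffer / end of program;
# the timestamp still disambiguates lines that mpirun forwards out of order
print(f"I am {my_rank} at {time.time()}", flush=True)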

Deadlocks

If synchronous send/recv is used, a deadlock forms when two (or more) processes wait for each other.
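
A minimal sketch of such a deadlock and one common fix (assumes exactly 2 processes; variable names are mine): if both ranks issue a synchronous Ssend first, neither ever reaches its Recv; a combined Sendrecv avoids this.

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
other = 1 - rank                      # partner rank, assuming exactly 2 processes
sbuf = np.full(1, rank, dtype='i')
rbuf = np.empty(1, dtype='i')

# DEADLOCK: each Ssend waits for the partner's Recv, which is never reached
# comm.Ssend((sbuf, MPI.INT), dest=other, tag=0)
# comm.Recv((rbuf, MPI.INT), source=other, tag=0)

# one common fix: let MPI handle send and receive together
comm.Sendrecv(sendbuf=(sbuf, MPI.INT), dest=other, sendtag=0,
              recvbuf=(rbuf, MPI.INT), source=other, recvtag=0)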

Status

The status object returned with a received message contains metadata about the communication (source, tag, message size)

status.Get_source()
status.Get_tag()
status.Get_count()
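
A minimal sketch of filling and querying a Status object together with wildcard receives (the message text and tag are made up; at least 2 processes):

from mpi4py import MPI

comm = MPI.COMM_WORLD
if comm.Get_rank() == 0:
    comm.send("hello", dest=1, tag=5)
elif comm.Get_rank() == 1:
    status = MPI.Status()
    msg = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
    # who sent it, with which tag, and how many bytes were received
    print(msg, status.Get_source(), status.Get_tag(), status.Get_count())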

Message Order Preservation

One link, one order: messages sent from the same sender to the same receiver (on the same communicator) are received in the order in which they were sent.
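
A minimal sketch of what this guarantees (my own message strings; at least 2 processes): two messages sent over the same link arrive in sending order, even with wildcard tags.

from mpi4py import MPI

comm = MPI.COMM_WORLD
if comm.Get_rank() == 0:
    comm.send("first", dest=1, tag=1)
    comm.send("second", dest=1, tag=2)
elif comm.Get_rank() == 1:
    a = comm.recv(source=0, tag=MPI.ANY_TAG)   # always "first": messages on one link do not overtake
    b = comm.recv(source=0, tag=MPI.ANY_TAG)   # always "second"
    print(a, b)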