An Introduction to MPI in Python - Part 2
Welcome back to this introduction to MPI in Python. Last time, we started with an overview of basic terminology and MPI functions. Today, we'll be moving on to writing more complex functions with real-world use cases.
1. Sample Solutions from Part 1’s Challenge
Last time, I left you off with a challenge to implement three versions of a scalable `alltoall` function:

1. Implement a scalable `alltoall` function WITH a deadlock, using only `Send`/`Recv`, for sufficiently large messages.
2. Correct the deadlock from part 1 while still only using `Send`/`Recv`.
3. Using your solution to part 1, is it possible to resolve the deadlock by instead using `Isend`/`Recv`?
I'll cover some sample solutions here, but I first want to emphasize how easy it was to end up with a deadlock. This is one of the simplest scalable pieces of MPI code you can write, yet even it has the potential to deadlock under certain conditions. It's also worth noting the different trade-offs between the potential solutions. In part 2, I wanted you to use blocking functions, while in part 3 I wanted to test your understanding and intuition for non-blocking ones. One might think that non-blocking functions would generally prevent deadlocks from occurring (hence the name non-blocking), but these messages don't magically wait around to be read - they queue up at the receiver while it continues to operate. While this frees the sending processes to continue on, the receiving process could potentially run out of memory from all of that queueing, depending on the size of the payload.
In order to achieve a deadlock, I envisioned a system where all ranks try to send to all others and then all ranks try to receive from all others. It's a fairly natural way to think about the problem in stages, but in MPI you have to be wary of patterns like this. In the simplest case with two ranks, rank 0 and rank 1 both try to send a message to the other, but since neither has attempted to receive, neither can move on. While you might not see any errors for small payload sizes, as you increase `scaling_factor` you will begin to see deadlocks occurring in a perfectly reproducible fashion. I'll explain this behavior of different-sized payloads later, but the "solution" should look something like the following:
```python
def alltoall(comm, values):
    rank = comm.Get_rank()
    size = comm.Get_size()
    buf_size = len(values[0])
    # Enter Solution Here
    for dest, vals in enumerate(values):
        print(f'Sending {len(vals)} values from {rank} to {dest}')
        comm.Send(vals, dest=dest)
    new_values = []
    for i in range(size):
        buf = np.zeros(buf_size, dtype='l')
        comm.Recv(buf, source=i)
        new_values.append(buf)
    return new_values
```
Next up, I asked you to try to correct this issue. If the problem with the previous solution was that all processes were in the same stage (either send or receive) at the same time, then a potential fix is to have only one rank in the receive stage at a given time, with all other ranks sending to it before moving on to the next one.
```python
def alltoall(comm, values):
    rank = comm.Get_rank()
    size = comm.Get_size()
    buf_size = len(values[0])
    # Enter Solution Here
    new_values = []
    for dest, val in enumerate(values):
        if rank != dest:
            print(f'Sending {len(val)} values from {rank} to {dest}')
            comm.Send(val, dest=dest)
        else:
            for i in range(size):
                if rank == i:
                    new_values.append(val.copy())
                else:
                    buf = np.zeros(buf_size, dtype='l')
                    comm.Recv(buf, source=i)
                    new_values.append(buf)
    return new_values
```
Finally, we come to the last question - can you use `Isend`/`Recv` to solve this? The answer is yes! (But it's probably not for the reason that you think.) When faced with this question, I think many people would intuitively answer "yes" given that these are non-blocking functions - therefore, we should be able to use them without worrying about buffer sizes and do things in any order, right? And while that might be the right idea, in practice it can be a bit tricky to think through. Let's start by simply swapping out the functions (just replace `Send` with `Isend` without worrying about requests). If you run the code as is (with sufficiently large data), you'll either get a seg fault, indicating an illegal memory access, or fail the correctness assertion - weird, given that the code works just fine if you decrease the `scaling_factor`! This happens because, just as with `Send`, the data being sent is larger than the internal buffer space allotted to the sending process. As a result, when you go to receive the data with `Recv`, it's possible that the data no longer exists or has been deallocated. One way this can happen is if one process returns because it has received data from all other processes, while the others haven't yet finished receiving the data from it. The seg fault occurs on some other process, but the real source of the issue is that one process simply shot ahead. The next fix someone might reach for is to call `req.wait()`, and they'd be correct… but only if they do so in the correct place.
The answer here is actually pretty involved (especially if you were allowed to use `Irecv` for this problem), so let's review one more time the finer details of how these non-blocking functions work. `Isend` returns a request object immediately and moves on. Calling `req.wait()` after this carries only the somewhat vague guarantee that it returns once it is safe to interact with the send buffer and the send operation has begun. But what happens if you're sending a large message from one rank to another? If `req.wait()` returns immediately and the sending rank then returns from the function, we could potentially hit a seg fault, since the data is deallocated upon function return even though a separate process may still be reading it. In practice, most modern implementations, including Open MPI, handle this case with a stricter guarantee such that even deallocating the memory will not cause a data issue. And even if that weren't the case, you could always place a barrier to prevent a return that would trigger a seg fault due to deallocation.
So, to revisit this: how should we handle waiting after an `Isend`? If you wait immediately after it (effectively replicating the behavior of `Send`), you will end up in the same deadlock as in part 1 of the problem. Instead, you want to wait until after you have called all of your `Recv` functions, and then `Waitall` on all of your send requests to ensure that the data has actually been sent. Finally, one last detour before showing the different versions of this code - what if we had used `Irecv` instead? For similar reasons to the above, if you don't wait on the requests from each of these calls, you could end up returning from one process before the others have sent or received data from it. As a result, you need to include these requests in your `Waitall` call as well.
The first version just replaces `Send` with `Isend` without waiting on the requests; it avoids the deadlock but, as discussed, is unsafe for sufficiently large messages:

```python
def alltoall(comm, values):
    rank = comm.Get_rank()
    size = comm.Get_size()
    buf_size = len(values[0])
    # Enter Solution Here
    for dest, vals in enumerate(values):
        print(f'Sending {len(vals)} values from {rank} to {dest}')
        comm.Isend(vals, dest=dest)
    new_values = []
    for i in range(size):
        buf = np.zeros(buf_size, dtype='l')
        comm.Recv(buf, source=i)
        new_values.append(buf)
    return new_values
```
The corrected version collects the send requests and calls `Waitall` after all of the receives have completed:

```python
def alltoall(comm, values):
    rank = comm.Get_rank()
    size = comm.Get_size()
    buf_size = len(values[0])
    # Enter Solution Here
    requests = []
    for dest, vals in enumerate(values):
        print(f'Sending {len(vals)} values from {rank} to {dest}')
        req = comm.Isend(vals, dest=dest)
        requests.append(req)
    new_values = []
    for i in range(size):
        buf = np.zeros(buf_size, dtype='l')
        comm.Recv(buf, source=i)
        new_values.append(buf)
    MPI.Request.Waitall(requests)
    # comm.barrier()  # Not necessary for most modern MPI implementations
    return new_values
```
And the fully non-blocking version uses `Irecv` as well, including the receive requests in the same `Waitall`:

```python
def alltoall(comm, values):
    rank = comm.Get_rank()
    size = comm.Get_size()
    buf_size = len(values[0])
    # Enter Solution Here
    requests = []
    for dest, vals in enumerate(values):
        print(f'Sending {len(vals)} values from {rank} to {dest}')
        req = comm.Isend(vals, dest=dest)
        requests.append(req)
    new_values = []
    for i in range(size):
        buf = np.zeros(buf_size, dtype='l')
        req = comm.Irecv(buf, source=i)
        requests.append(req)
        new_values.append(buf)
    MPI.Request.Waitall(requests)
    # comm.barrier()  # Not necessary for most modern MPI implementations
    return new_values
```
MPI_Finalize
At the end of this, I just wanted to point out a function I've kept hidden from you called `MPI.Finalize()` that, among other things, you should generally call at the end of your MPI work before returning. In addition to completing any open non-blocking messages and effectively acting as a barrier, it cleans up files, buffers, and other bits of program state. Any MPI functions called after it have undefined behavior. I waited until now to introduce this crucial function since its use is usually abstracted away, and "do this because it's good practice" is always an unsatisfying answer to a question. Hopefully this section has highlighted not only the importance of `Finalize()` but also the importance of understanding how each of these functions behaves and how they differ. In future sections, we'll see even more complex functions whose nuance will be difficult to grasp without a strong understanding of these fundamentals.
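To make the lifecycle concrete, here's a minimal sketch (my own, not part of the exercises) of where an explicit `MPI.Finalize()` call fits. Note that `mpi4py` normally initializes MPI on import and finalizes it automatically at interpreter exit, so in many scripts you never call it directly; the explicit call below is purely illustrative.

```python
from mpi4py import MPI

comm = MPI.COMM_WORLD
print(f'Hello from rank {comm.Get_rank()} of {comm.Get_size()}')

# Make sure every rank has finished its communication before tearing MPI down.
comm.barrier()

# Explicitly shut MPI down; no MPI calls are allowed after this point.
# (mpi4py would otherwise do this for us automatically at interpreter exit.)
if not MPI.Is_finalized():
    MPI.Finalize()
```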
Internal Buffering
There's a very interesting situation that arises here if we try to send a very small payload. For all intents and purposes, the following code should theoretically result in a deadlock, but it doesn't unless we increase the amount of data being sent. Why is this, you might ask? MPI implementations maintain an internal buffer - when an outgoing message fits within it (without overfilling it), the sending process may continue as if the receiver had already acknowledged it was ready to receive the message. If you don't realize that this internal buffer exists and only test your implementation on small messages, you might have a deadlock in your code without even knowing it! `MPI_BUFFER_SIZE` is configurable in many MPI implementations, but that functionality isn't exposed through `mpi4py`. There is also a version of the send function that doesn't use the internal buffer at all, but we'll cover that further down.
```python
def alltoall(comm, values):
    rank = comm.Get_rank()
    size = comm.Get_size()
    buf_size = len(values[0])
    # Enter Solution Here
    for dest, vals in enumerate(values):
        print(f'Sending {len(vals)} values from {rank} to {dest}')
        comm.Send(vals, dest=dest)
    new_values = []
    for i in range(size):
        buf = np.zeros(buf_size, dtype='l')
        comm.Recv(buf, source=i)
        new_values.append(buf)
    return new_values

scaling_factor = 10000  # Changing this to something like 10 will not result in a deadlock (assuming you don't have too many nodes)
values = [np.array([rank * 10 + i] * scaling_factor, dtype='l') for i in range(size)]
new_values = alltoall(comm, values)
ground_truth = [[rank + i * 10] * scaling_factor for i in range(size)]
assert np.all([np.all(v1 == v2) for v1, v2 in zip(new_values, ground_truth)])
```
Buffers and Safety
For our next example, let's address another risk of non-blocking sends without waiting. Earlier, I made the claim that after `req.wait()` on a request from an `Isend`, it is safe to interact with the buffer. Is there really a risk in doing this before? Can we prove it? The simplest test case is likely sending a long message from rank 0 to rank 1 and changing the last value at various times. This makes for the ultimate test: it's the fastest update we can make, and it targets the last value to be sent. Additionally, we can try this with arrays of varying size (both those that fit within the internal buffer and those that do not) to check the output under a range of scenarios. The last value received is always `-1`, helping to prove our point from earlier. The `time.sleep` isn't strictly necessary, but it does ensure that the value can be changed before it's actually transmitted. Without the sleep, or with a smaller buffer, the transmission could have completed before the sender changed the value. No matter how large we make the array, though, it's impossible to change the value after `req.wait()` and have it affect the value received, giving us confidence in the results we pass along.
```python
import time
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
assert size == 2

buf_size = 100000
if rank == 0:
    send_buf = np.arange(buf_size, dtype='i')
    req = comm.Isend(send_buf, dest=1)
    send_buf[-1] = -1
    print('Last value changed to -1')
    req.wait()
    send_buf[-1] = -2
    print('Last value changed to -2')
else:
    time.sleep(1)
    recv_buf = np.zeros(buf_size, dtype='i')
    req = comm.Irecv(recv_buf, source=0)
    print('Receive waiting.')
    req.wait()
    print(f'Receive done. Last value is {recv_buf[-1]}')
```
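The snippet above hard-codes a single `buf_size`; to actually exercise the "varying sized arrays" part of the experiment, a small sweep like the following works. The specific sizes here are my own placeholders (not values from the original), and for very small, eagerly buffered messages the data may be copied out before the change lands, so the received value isn't guaranteed to be `-1` in those cases.

```python
import time
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
assert comm.Get_size() == 2

# Arbitrary sizes chosen to (hopefully) straddle the internal buffer limit;
# the actual threshold depends on your MPI implementation and transport.
for buf_size in (10, 1000, 100_000, 10_000_000):
    if rank == 0:
        send_buf = np.arange(buf_size, dtype='i')
        req = comm.Isend(send_buf, dest=1)
        send_buf[-1] = -1   # Change the last value before waiting...
        req.wait()
        send_buf[-1] = -2   # ...and again after the wait returns.
    else:
        time.sleep(1)       # Give rank 0 time to modify the buffer first.
        recv_buf = np.zeros(buf_size, dtype='i')
        comm.Recv(recv_buf, source=0)
        print(f'buf_size={buf_size}: last value received is {recv_buf[-1]}')
    comm.barrier()          # Keep the iterations from interleaving.
```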
Differentiating Between Send Types
I hinted at this earlier, but even for basic point-to-point functions (sends and receives), there are more variants still. Nuance matters here, so I've included a few samples below that should help show the difference between the four functions. I'd like to introduce the synchronous versions of `Send` and `Isend`, aptly called `Ssend` and `Issend`. (There are also non-numpy equivalents like `ssend` and `issend`, but for the sake of consistency, I'll continue to use the uppercase ones.) While it might seem like a misnomer, you can indeed have a "non-blocking, synchronous send"!
- `Send` - blocking send that returns when the receiver has started receiving the message or when the message has been written into the internal buffer
- `Ssend` - blocking send that returns when the receiver has finished receiving the message
- `Isend` - non-blocking send that returns immediately; waiting on the request returns when the receiver has started receiving the message or when the message has been written into the internal buffer
- `Issend` - non-blocking send that returns immediately; waiting on the request returns when the receiver has finished receiving the message
There are several different behaviors I want to hit on, starting with some timing tests. The test simply accumulates the time each process spends sending (and, for non-blocking operations, waiting) over a large number of trials, and tries altering some of the values similarly to the previous example. We run the following code with two configurations (a small message that fits in the internal buffer and a large message that doesn't) to show the timing differences under different circumstances.
```python
def foo(comm, func):
    comm.barrier()
    rank = comm.Get_rank()
    duration = 0
    wduration = 0
    l = np.zeros(buff_size) if rank == 0 else np.empty(buff_size)
    for _ in range(num_trials):
        start = time.time()
        if rank == 0:
            req = func(l, dest=1)
            duration += time.time() - start
            if req is not None:
                req.wait()  # Just for measuring wait time
            wduration += time.time() - start
        if rank == 1:
            comm.Recv(l, source=0)
    comm.barrier()
    if rank == 0:
        print(f'({func.__name__}) Rank {rank} accumulated duration = {duration}')
        if func.__name__.lower().startswith('i'):
            print(f'({func.__name__}) Rank {rank} accumulated duration (including wait time) = {wduration}')

foo(comm, comm.Send)
foo(comm, comm.Ssend)
foo(comm, comm.Isend)
foo(comm, comm.Issend)
```
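The function above assumes that `comm`, `buff_size`, and `num_trials` (along with the `time` and `numpy` imports) have already been set up. The exact values behind the tables below aren't specified, so a setup along these lines, with placeholder sizes and trial counts of my own choosing, is what you would run once per configuration:

```python
import time
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
assert comm.Get_size() == 2  # Only ranks 0 and 1 participate in the test.

# Placeholder configurations - not the exact values used for the tables below;
# tune them so the small message fits in your implementation's internal buffer
# and the large one does not.
buff_size, num_trials = 100, 100_000          # Short message, many trials
# buff_size, num_trials = 10_000_000, 100     # Long message, fewer trials
```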
And now we see the results. There is some noise in the timing tests, but there are a few key takeaways I want to highlight. First, `Send` is much faster than `Ssend` for short messages that fit in the internal buffer, but no faster for longer messages that do not. This highlights that `Send`'s behavior closely mirrors `Ssend`'s when it is unable to write to the buffer before returning. We see the same pattern when comparing `Isend` and `Issend` across the two tests. Next, as expected, the time spent in the non-blocking calls themselves is much lower than in the blocking ones, but once you include the wait times, they are approximately equal. The benefit is that you are (potentially) able to do other work during that period. Finally, it's worth noting that if you issue a non-blocking send and then immediately wait on it for small messages, you will see slightly worse performance than with a blocking send, due to the overhead of setting up an asynchronous send.
Short Message, Many Trials
| Operation | Duration | Duration (w/ Wait) |
|-----------|----------|--------------------|
| Send      | 10.11s   | N/A                |
| Ssend     | 16.36s   | N/A                |
| Isend     | 9.75s    | 13.20s             |
| Issend    | 10.49s   | 20.08s             |
Long Message, Fewer Trials
| Operation | Duration | Duration (w/ Wait) |
|-----------|----------|--------------------|
| Send      | 7.43s    | N/A                |
| Ssend     | 7.44s    | N/A                |
| Isend     | 1.35s    | 7.77s              |
| Issend    | 1.34s    | 7.76s              |
While there are use cases for these more nuanced functions, most point-to-point communication in MPI will just use the normal `Send` or `Isend`. Resource-constrained setups might prefer the synchronous versions to reduce reliance on internal buffering, while others might use them to help surface deadlock scenarios. For part 1 of the exercise, had you used `Ssend` instead of `Send`, you would have hit the deadlock regardless of the size of the message, since `Ssend` never buffers internally. In practice, though, the standard sends' flexibility and slightly weaker constraints still provide the same safeties as the synchronous versions (once a send completes, the buffer is yours to reuse) while having the potential to run faster thanks to fewer synchronization steps.
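To make that last point concrete, here's a minimal two-rank sketch (my own, not from the exercises) of the guaranteed deadlock: both ranks call `Ssend` before either posts a receive, so neither send can ever complete, no matter how small the message is. Swapping `Ssend` for `Send` here would succeed for small messages thanks to the internal buffer.

```python
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
assert comm.Get_size() == 2

other = 1 - rank
send_buf = np.full(1, rank, dtype='i')   # A single element is enough.
recv_buf = np.empty(1, dtype='i')

# Both ranks block here forever: Ssend cannot complete until the matching
# receive has been posted, and neither rank ever reaches its Recv call.
comm.Ssend(send_buf, dest=other)
comm.Recv(recv_buf, source=other)
print(f'Rank {rank} received {recv_buf[0]}')  # Never reached.
```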
Conclusion
As has been a common theme, every function has its use and there’s rarely an answer where “you should always do this”. It’s important to understand what these functions do even if you have a general rule of thumb, since when it comes to MPI programming, failing to understand nuances will slow down your code or introduce tricky bugs and deadlocks that take hours to resolve. No challenge for you this time, but get ready for more fun and nuance in the next one!