Greg McCord

An Introduction to MPI in Python - Part 2

Created on March 19, 2025   ·   20 min read

2025   ·   python   mpi   ·   code

  • 1. Sample Solutions from Part 1’s Challenge
  • MPI_Finalize
  • Internal Buffering
  • Buffers and Safety
  • Differentiating between send types
    • Short Message, Many Trials
    • Long Message, Fewer Trials
  • Conclusion

Welcome back to this introduction to MPI in Python. Last time, we started with an overview of basic terminology and MPI functions. Today, we’ll be moving on to writing more complex functions with real-world use cases.

1. Sample Solutions from Part 1’s Challenge

Last time, I left you with a challenge to implement three versions of a scalable alltoall function:

  1. Implement a scalable alltoall function WITH a deadlock while only using Send/Recv for sufficiently large messages.
  2. Correct the deadlock from part 1 while still only using Send/Recv.
  3. Using your solution to part 1, is it possible to resolve the deadlock by instead using Isend/Recv?
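Before looking at the MPI versions, it can help to pin down what any correct alltoall has to produce. The following is a minimal serial sketch in plain Python (no MPI involved); the name alltoall_reference and the nested-list layout are my own assumptions for illustration, with chunks of length 1 mirroring the rank * 10 + i test data used later in this post.

```python
def alltoall_reference(all_values):
    """Serial model of alltoall.

    all_values[src][dest] is the chunk that rank `src` wants to send to
    rank `dest`. The result is indexed as result[dest][src], i.e. what
    rank `dest` ends up holding from each sender.
    """
    size = len(all_values)
    return [[all_values[src][dest] for src in range(size)]
            for dest in range(size)]


size = 3
# Rank r prepares one chunk per destination d, containing r * 10 + d.
all_values = [[[rank * 10 + dest] for dest in range(size)]
              for rank in range(size)]
received = alltoall_reference(all_values)

for rank in range(size):
    print(f'rank {rank} receives {received[rank]}')
```

In other words, alltoall is a transpose of the "who sends what to whom" table: rank r ends up with the chunk i * 10 + r from every rank i, which is exactly what the correctness assertion later in the post checks.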

I’ll cover some sample solutions here, but I want to emphasize how easy it was to create a deadlock. This is one of the simplest scalable pieces of MPI code you can write, yet even it can deadlock under certain conditions. It’s also worth noting the trade-offs between the potential solutions. In part 2, I wanted you to use blocking functions, while in part 3 I wanted to test your understanding of and intuition for non-blocking ones. One would think that non-blocking functions would generally prevent deadlocks (hence the name), but these messages don’t magically wait around to be read - they queue up while the receiver continues to operate. While this frees the sending processes to continue, the receiving process could run out of memory from this queueing, depending on the size of the payload.

In order to achieve a deadlock, I envisioned a system where all ranks first try to send to all others and then all ranks try to receive from all others. Thinking about the problem in stages like this is natural, but in MPI you have to be wary of the circular waits it can create. In the simplest case with two ranks, rank 0 and rank 1 are both trying to send a message to the other, but since neither has started to receive, neither can move on. While you might not see any errors for small message sizes, as you increase scaling_factor you will begin to see deadlocks occur in a perfectly reproducible fashion. I’ll explain why the payload size matters later, but the “solution” should look something like the following:

def alltoall(comm, values):
    rank = comm.Get_rank()
    size = comm.Get_size()

    buf_size = len(values[0])

    # Enter Solution Here
    for dest,vals in enumerate(values):
        print(f'Sending {len(vals)} values from {rank} to {dest}')
        comm.Send(vals, dest=dest)

    new_values = []
    for i in range(size):
        buf = np.zeros(buf_size, dtype='l')
        comm.Recv(buf, source=i)
        new_values.append(buf)

    return new_values

Next up, I asked you to correct this issue. The previous version deadlocks because all processes are in the same stage (either send or receive) at the same time, so a potential solution is to have only one rank in the receive stage at a given time and have all other ranks send to it before moving on to the next one.

def alltoall(comm, values):
    rank = comm.Get_rank()
    size = comm.Get_size()

    buf_size = len(values[0])

    # Enter Solution Here
    new_values = []
    for dest,val in enumerate(values):
        if rank != dest:
            print(f'Sending {len(val)} values from {rank} to {dest}')
            comm.Send(val, dest=dest)
        else:
            for i in range(size):
                if rank == i:
                    new_values.append(val.copy())
                else:
                    buf = np.zeros(buf_size, dtype='l')
                    comm.Recv(buf, source=i)
                    new_values.append(buf)

    return new_values
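As a sanity check on this schedule, here is a toy model in plain Python threads rather than MPI. It assumes the worst case, where every send blocks until the receiver has posted the matching receive (no buffering at all). RendezvousLink, staged_alltoall and run are hypothetical names for this sketch only, and a timeout stands in for "this would have deadlocked".

```python
import queue
import threading


class RendezvousLink:
    """Toy one-message link between a pair of ranks.

    send() blocks until the receiver has called recv(), which mimics a
    send that gets no help from internal buffering.
    """

    def __init__(self):
        self.posted = threading.Event()
        self.slot = queue.Queue(maxsize=1)

    def send(self, payload, timeout):
        if not self.posted.wait(timeout):
            raise TimeoutError('receiver never posted a matching recv')
        self.slot.put(payload)

    def recv(self, timeout):
        self.posted.set()
        return self.slot.get(timeout=timeout)


def staged_alltoall(rank, size, values, links, timeout):
    """Same schedule as the solution above: one receiving rank per stage."""
    new_values = []
    for dest in range(size):
        if rank != dest:
            links[rank, dest].send(values[dest], timeout)
        else:
            for src in range(size):
                if src == rank:
                    new_values.append(values[rank])
                else:
                    new_values.append(links[src, rank].recv(timeout))
    return new_values


def run(size, timeout=2.0):
    links = {(s, d): RendezvousLink()
             for s in range(size) for d in range(size) if s != d}
    results = [None] * size

    def worker(rank):
        values = [rank * 10 + d for d in range(size)]
        try:
            results[rank] = staged_alltoall(rank, size, values, links, timeout)
        except (TimeoutError, queue.Empty):
            results[rank] = None  # treated as a deadlock in this toy model

    threads = [threading.Thread(target=worker, args=(r,)) for r in range(size)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


results = run(size=4)
print(results)
```

Every rank finishes with the expected values even though no send is allowed to complete early, which is why this schedule does not depend on the message size. The cost is that the stages are serialized: only one rank is receiving at any given time.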

Finally, we come to the last question - can you use Isend/Recv to solve this? The answer is yes! (But it’s probably not for the reason that you think.) When faced with this question, I think many people would intuitively say “yes” given that these are non-blocking functions - therefore, we should be able to stop worrying about the size of buffers and do things in any order, right? And while that might be the right idea, in practice it can be a bit tricky to think through. Let’s start by simply swapping out the functions (just replace Send with Isend without worrying about requests). If you run the code as is (with sufficiently large data), you’ll either get a seg fault, indicating an illegal memory access, or fail the correctness assertion - weird given that the code works just fine if you decrease the scaling_factor! This issue occurs because, similar to Send, the data being sent is larger than the internal buffer space allotted for the sending process. As a result, when you go to receive the data with Recv, it’s possible that the data no longer exists there or has been deallocated. One way this can happen is if you have many ranks and one of them returns because it has received the data from all other ranks while the other ranks haven’t finished receiving the data from it. The seg fault occurs on some other rank, but the real source of the issue is that one rank just shot ahead. The next way that someone might think to resolve this issue is to call req.wait(), and you’d be correct… but only if you do so in the correct place.

The answer here is actually pretty involved (especially if you were allowed to use Irecv for this problem), so let’s review one more time the finer details of how these non-blocking functions work. Isend returns a request object immediately and moves on. Calling req.wait() afterwards carries only the somewhat vague specification that “it returns when it is safe to interact with the buffer and that the send operation has begun”. So what happens if you are sending a large message from one rank to another? If req.wait() returns immediately and the sending rank then returns from the function, we could hit a seg fault, since that data is deallocated on return while a separate process is still reading it. However, most modern implementations, including Open MPI, handle this case with a stricter guarantee, such that even deallocating the memory will not cause a data issue. Even if this weren’t the case, you could always place a barrier before returning to prevent a seg fault due to deallocation.

So, to revisit the question: where should we wait after the Isend? If you do so immediately afterwards (effectively replicating the behavior of Send), you will end up in another deadlock like in part 1 of the problem. Instead, you want to wait until after you have called all of your Recv functions and then call Waitall on all of your send requests to ensure that the data has been sent correctly. Finally, one last detour before showing the different versions of this code: what if we had used Irecv instead of Recv? For similar reasons, if you don’t wait on the requests from these calls, you could end up returning from one process before others have sent data to or received data from it. As a result, you need to include these requests in your Waitall call as well.

Seg Fault:

def alltoall(comm, values):
    rank = comm.Get_rank()
    size = comm.Get_size()

    buf_size = len(values[0])

    # Enter Solution Here
    for dest,vals in enumerate(values):
        print(f'Sending {len(vals)} values from {rank} to {dest}')
        comm.Isend(vals, dest=dest)

    new_values = []
    for i in range(size):
        buf = np.zeros(buf_size, dtype='l')
        comm.Recv(buf, source=i)
        new_values.append(buf)

    return new_values

Correct:

def alltoall(comm, values):
    rank = comm.Get_rank()
    size = comm.Get_size()

    buf_size = len(values[0])

    # Enter Solution Here
    requests = []
    for dest,vals in enumerate(values):
        print(f'Sending {len(vals)} values from {rank} to {dest}')
        req = comm.Isend(vals, dest=dest)
        requests.append(req)

    new_values = []
    for i in range(size):
        buf = np.zeros(buf_size, dtype='l')
        comm.Recv(buf, source=i)
        new_values.append(buf)

    MPI.Request.Waitall(requests)

    # comm.barrier() # Not necessary for most modern MPI implementations

    return new_values

Irecv:

def alltoall(comm, values):
    rank = comm.Get_rank()
    size = comm.Get_size()

    buf_size = len(values[0])

    # Enter Solution Here
    requests = []
    for dest,vals in enumerate(values):
        print(f'Sending {len(vals)} values from {rank} to {dest}')
        req = comm.Isend(vals, dest=dest)
        requests.append(req)

    new_values = []
    for i in range(size):
        buf = np.zeros(buf_size, dtype='l')
        req = comm.Irecv(buf, source=i)
        requests.append(req)
        new_values.append(buf)

    MPI.Request.Waitall(requests)

    # comm.barrier() # Not necessary for most modern MPI implementations

    return new_values

MPI_Finalize

At the end of this, I just wanted to point out a function I’ve kept hidden from you called MPI.Finalize() that you should generally make sure is called at the end of your MPI work before exiting. It is a collective call that cleans up files, buffers, and other bits of state in the program, and in practice it often behaves like a barrier. The MPI standard expects each process to have completed its own pending communication before calling it, so it is safer to wait on your open non-blocking requests yourself than to rely on Finalize to do it for you. Almost any MPI function called after this will have undefined behavior. I waited until now to introduce this crucial function since most of its use is often abstracted away (by default, mpi4py calls it for you when the interpreter exits) and having a “do this because it’s good practice” response is always an unsatisfying way to answer a question. Hopefully this section has highlighted not only the importance of Finalize() but also the importance of understanding how each of these different functions behaves and how they differ. In future sections, we’ll see even more complex functions whose nuance will be difficult to grasp without a strong understanding of these fundamentals.

Internal Buffering

There’s a very interesting situation that arises if we try to send a very small payload. By all rights, the following code should result in a deadlock, but it doesn’t unless we increase the amount of data being sent. Why is this, you might ask? MPI implementations typically maintain an internal buffer - when an outgoing message fits in this buffer, it is copied there and the sending process may continue as if the receiver had already acknowledged that it was ready to receive the message. If you don’t realize that there’s such a thing as an internal buffer and only test your implementation on small messages, then you might have a deadlock in your code without even knowing it! MPI_BUFFER_SIZE is configurable in many MPI implementations, but this functionality isn’t exposed through mpi4py. There is a version of the send function that doesn’t rely on the internal buffer at all, but we’ll cover that further down.

def alltoall(comm, values):
    rank = comm.Get_rank()
    size = comm.Get_size()

    buf_size = len(values[0])

    # Enter Solution Here
    for dest,vals in enumerate(values):
        print(f'Sending {len(vals)} values from {rank} to {dest}')
        comm.Send(vals, dest=dest)

    new_values = []
    for i in range(size):
        buf = np.zeros(buf_size, dtype='l')
        comm.Recv(buf, source=i)
        new_values.append(buf)

    return new_values

scaling_factor = 10000 # Changing this to something like 10 will not result in a deadlock (assuming you don't have too many nodes)
values = [np.array([rank * 10 + i] * scaling_factor, dtype='l') for i in range(size)]
new_values = alltoall(comm, values)
ground_truth = [[rank + i * 10] * scaling_factor for i in range(size)]
assert np.all([np.all(v1 == v2) for v1,v2 in zip(new_values, ground_truth)])
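To make the size dependence concrete without needing an MPI installation, here is a toy model in plain Python threads. EAGER_LIMIT, ToyLink and exchange_completes are hypothetical names, and the threshold value is made up for illustration; real MPI implementations choose their own limits and are considerably more involved. The model only captures the one rule described above: a small message is buffered and the send returns right away, while a large one has to wait for the receiver.

```python
import queue
import threading

EAGER_LIMIT = 1024  # hypothetical threshold (in items) for this toy model only


class ToyLink:
    """Toy one-directional link modelling a standard-mode send."""

    def __init__(self, eager_limit):
        self.eager_limit = eager_limit
        self.posted = threading.Event()  # set once the receiver calls recv()
        self.mailbox = queue.Queue()     # stands in for the internal buffer

    def send(self, payload, timeout):
        if len(payload) > self.eager_limit:
            # Too large to buffer: block until the receiver is ready.
            if not self.posted.wait(timeout):
                raise TimeoutError('receiver never posted a matching recv')
        self.mailbox.put(payload)

    def recv(self, timeout):
        self.posted.set()
        return self.mailbox.get(timeout=timeout)


def exchange_completes(message_len, timeout=1.0):
    """Both ranks send first and receive second, as in the code above.

    Returns True if both ranks finish, False if the timeout (our stand-in
    for a deadlock) is hit.
    """
    links = {(0, 1): ToyLink(EAGER_LIMIT), (1, 0): ToyLink(EAGER_LIMIT)}
    finished = [False, False]

    def worker(rank):
        other = 1 - rank
        try:
            links[rank, other].send([rank] * message_len, timeout)
            links[other, rank].recv(timeout)
            finished[rank] = True
        except (TimeoutError, queue.Empty):
            pass  # leave finished[rank] as False

    threads = [threading.Thread(target=worker, args=(r,)) for r in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return all(finished)


small_ok = exchange_completes(10)
large_ok = exchange_completes(10_000)
print(f'small message completes: {small_ok}')  # True
print(f'large message completes: {large_ok}')  # False
```

The code path is identical in both calls; only the message length changes. That is exactly why testing with small payloads alone can hide this kind of bug.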

Buffers and Safety

For our next example, let’s address another risk: touching the buffer of a non-blocking send without waiting. Earlier, I claimed that after req.wait() on a request from an Isend, it is safe to interact with the buffer. Is there really a risk in doing so before? Can we prove it? The simplest test case is likely sending a long message from rank 0 to rank 1 and changing the last value at various times. This is the ultimate test, since it is the fastest update we can make and the last value to be sent. Additionally, we can try this with arrays of varying sizes (including those that fit within the internal buffer and those that do not) in order to test the output under a range of scenarios. With the code below, the last value received is always -1, which supports our earlier point: a change made before req.wait() can reach the receiver. The time.sleep isn’t necessary, but it does ensure that the value is changed before it is sent. If there weren’t a sleep, or if the buffer were smaller, the transmission could have completed before the sender changed the value. No matter how large we make the array, though, changing the value after req.wait() never affects the value received, giving us confidence in the results we pass along.

import time
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

assert size == 2

buf_size = 100000

if rank == 0:
    send_buf = np.arange(buf_size, dtype='i')
    req = comm.Isend(send_buf, dest=1)
    send_buf[-1] = -1
    print('Last value changed to -1')
    req.wait()
    send_buf[-1] = -2
    print('Last value changed to -2')
else:
    time.sleep(1)
    recv_buf = np.zeros(buf_size, dtype='i')
    req = comm.Irecv(recv_buf, source=0)
    print('Receive waiting.')
    req.wait()
    print(f'Receive done. Last value is {recv_buf[-1]}')

Differentiating between send types

I hinted at this earlier, but even basic point-to-point functions (sends and receives) come in more variants. Nuance matters here, so I’ve included a few samples below that should help show the difference between the four functions. I’d like to introduce the synchronous versions of Send and Isend, aptly called Ssend and Issend. (There are also non-numpy equivalents, ssend and issend, but for the sake of consistency I’ll continue to use the uppercase versions.) While it might sound like a contradiction, you can indeed have a “non-blocking, synchronous send”!

  • Send - blocking send that returns once the send buffer is safe to reuse, either because the message has been written into the internal buffer or because it has been handed off to a matching receive
  • Ssend - blocking send that returns only once the receiver has posted a matching receive and started receiving the message; it never completes on internal buffering alone
  • Isend - non-blocking send that returns immediately; waiting on the request returns under the same condition as Send
  • Issend - non-blocking send that returns immediately; waiting on the request returns under the same condition as Ssend

There are several different behaviors that I want to hit on first with some timing tests. The test simply accumulates the time each process spends (including waiting for non-blocking operations) over a large number of trials and tries altering some of the values similar to the previous example. We run the following code with two configurations (a small message that fits in the internal buffer and a large message that doesn’t) to show the timing differences under different circumstances.

# buff_size and num_trials are module-level settings that differ between the two configurations
def foo(comm, func):
    comm.barrier()
    rank = comm.Get_rank()

    duration = 0   # time spent inside the send call itself
    wduration = 0  # time including the wait on the request (non-blocking sends only)
    data = np.zeros(buff_size) if rank == 0 else np.empty(buff_size)
    for _ in range(num_trials):
        start = time.time()
        if rank == 0:
            req = func(data, dest=1)
            duration += time.time() - start
            if req is not None:
                req.wait() # Just for measuring wait time
                wduration += time.time() - start
        if rank == 1:
            comm.Recv(data, source=0)
        comm.barrier()

    if rank == 0:
        print(f'({func.__name__}) Rank {rank} accumulated duration = {duration}')
        if func.__name__.lower().startswith('i'):
            print(f'({func.__name__}) Rank {rank} accumulated duration (including wait time) = {wduration}')

foo(comm, comm.Send)
foo(comm, comm.Ssend)
foo(comm, comm.Isend)
foo(comm, comm.Issend)

And now we see the results. There is some noise in the timing tests, but there are a few key takeaways I want to highlight. First, Send is much faster than Ssend for short messages that fit in the internal buffer but no faster for longer messages that do not. This highlights that Send’s behavior closely mirrors Ssend’s when it is unable to write to the buffer before returning. We see the same pattern when comparing Isend and Issend (including wait time) across the two tests. Next, as expected, the time spent in the non-blocking calls themselves is much lower than in the blocking ones for long messages, but if you include the wait times, then they are approximately equal. The benefit there is that you are (potentially) able to do different work during this period. Finally, it’s worth noting that if you do a non-blocking send and then immediately wait on small messages, you will have slightly worse performance than with a blocking send due to the overhead of setting up an asynchronous send.

Short Message, Many Trials

Operation   Duration   Duration (w/ Wait)
---------   --------   ------------------
Send        10.11s     N/A
Ssend       16.36s     N/A
Isend       9.75s      13.20s
Issend      10.49s     20.08s


Long Message, Fewer Trials

Operation   Duration   Duration (w/ Wait)
---------   --------   ------------------
Send        7.43s      N/A
Ssend       7.44s      N/A
Isend       1.35s      7.77s
Issend      1.34s      7.76s
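To put rough numbers on those takeaways, the ratios can be worked out directly from the two tables. This is just arithmetic on the figures reported above; the dictionary layout and the helper names ssend_over_send and overlap_window are my own for this sketch.

```python
# Accumulated durations in seconds, copied from the two tables above.
# Each entry is (time in the call itself, time including the wait or None).
short_msg = {'Send': (10.11, None), 'Ssend': (16.36, None),
             'Isend': (9.75, 13.20), 'Issend': (10.49, 20.08)}
long_msg = {'Send': (7.43, None), 'Ssend': (7.44, None),
            'Isend': (1.35, 7.77), 'Issend': (1.34, 7.76)}


def ssend_over_send(table):
    """How many times slower Ssend was than Send."""
    return table['Ssend'][0] / table['Send'][0]


def overlap_window(table, op):
    """Time spent waiting on the request, i.e. time available for other work."""
    call_time, total_time = table[op]
    return total_time - call_time


print(f"Ssend / Send, short messages: {ssend_over_send(short_msg):.2f}")  # 1.62
print(f"Ssend / Send, long messages:  {ssend_over_send(long_msg):.2f}")   # 1.00
print(f"Isend overlap window, long messages: "
      f"{overlap_window(long_msg, 'Isend'):.2f}s")                        # 6.42s
```

So for short messages Ssend took roughly 1.6 times as long as Send, for long messages the two were indistinguishable, and for long messages most of the Isend time (about 6.4 of 7.8 seconds) was spent waiting, which is the window in which other work could be done.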


While there are use cases for these more nuanced functions, most point-to-point communications in MPI will just use the normal send or isend. Resource-constrained setups might prefer the synchronous versions to decrease the reliance on internal buffering, while others might use them to help expose deadlock scenarios. For part 1 of the exercise, had you used ssend instead of send, you would have achieved a deadlock regardless of the size of the message being sent, since ssend doesn’t internally buffer. In practice though, the slightly weaker constraints of the standard versions still provide the same safety guarantees as the synchronous versions while having the potential to run faster thanks to fewer synchronization steps.

Conclusion

As has been a common theme, every function has its use and there’s rarely an answer where “you should always do this”. It’s important to understand what these functions do even if you have a general rule of thumb, since when it comes to MPI programming, failing to understand nuances will slow down your code or introduce tricky bugs and deadlocks that take hours to resolve. No challenge for you this time, but get ready for more fun and nuance in the next one!

