Greg McCord

An Introduction to MPI in Python - Part 3

Created on March 20, 2025   ·   15 min read

2025   ·   python   mpi   ·   code

  • Overview
  • I/O and MPI
    • 1. The Problem
    • The Solution..?
    • Definitely the Solution…
    • The Crux of the Problem
    • The True Solution
  • MPI-IO and Concurrent Writes
  • Conclusion

Overview

Today, we’ll dive deeper into MPI with another instance of “the devil is in the details”: distributed I/O within the MPI framework. As usual, while we are using MPI as the lens through which to tackle parallel computing, these difficulties crop up just as frequently in other schemes, even where you might not expect them. This will also be the end of this mini-series, “An Introduction to MPI in Python”, but now that we have a handle on how to approach parallel computing problems, I highly recommend checking out my next series on putting this into practice. In the next series, we’ll be implementing statistics functions on distributed datasets using all of the skills and functions introduced in this series, so it will be a natural next step in the learning process!

I/O and MPI

1. The Problem

If I gave you the following code, it probably wouldn’t take much convincing to see that the output is going to be almost entirely nondeterministic. About the only thing we can say for certain is that the messages from any single rank, each of the form Rank {rank} - {i}, will be output in ascending order with respect to i. However, we can make no such assertions about the relative positions of any two messages from different ranks. It’s possible that all messages from rank 0 will be output before any from rank 1, all from rank 1 before any from rank 0, or an interleaving of messages from both! We’re truly left to the whims of the underlying scheduler!

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

assert size == 2

for i in range(10):
    print(f'Rank {rank} - {i}')

Rank 1 - 0
Rank 1 - 1
Rank 1 - 2
Rank 1 - 3
Rank 1 - 4
Rank 1 - 5
Rank 1 - 6
Rank 1 - 7
Rank 1 - 8
Rank 1 - 9
Rank 0 - 0
Rank 0 - 1
Rank 0 - 2
...
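
The one guarantee described above can be written down as a check. The following is a minimal sketch and not part of the original program: the helper name `per_rank_order_ok` is hypothetical, and it only understands lines of the form printed above. It confirms that each rank’s counter ascends while saying nothing about how the ranks interleave.

```python
import re

LINE = re.compile(r'Rank (\d+) - (\d+)')

def per_rank_order_ok(lines):
    """Return True if, for every rank, its messages appear in ascending order of i."""
    last_seen = {}  # rank -> last i printed by that rank
    for line in lines:
        match = LINE.fullmatch(line.strip())
        if match is None:
            continue  # ignore anything that is not one of our messages
        rank, i = int(match.group(1)), int(match.group(2))
        if i <= last_seen.get(rank, -1):
            return False
        last_seen[rank] = i
    return True

# Two interleavings that are both legal outcomes of the program above.
all_rank1_first = ['Rank 1 - 0', 'Rank 1 - 1', 'Rank 0 - 0', 'Rank 0 - 1']
interleaved = ['Rank 0 - 0', 'Rank 1 - 0', 'Rank 1 - 1', 'Rank 0 - 1']
# This one could never be produced: rank 0 prints 1 before 0.
impossible = ['Rank 0 - 1', 'Rank 0 - 0']

print(per_rank_order_ok(all_rank1_first))
print(per_rank_order_ok(interleaved))
print(per_rank_order_ok(impossible))
```

Both legal interleavings pass the check even though they look nothing alike, which is exactly why the raw output tells us so little about timing across ranks.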

The Solution..?

Let’s imagine, however, that we really wanted our output to be synchronized, or that our process was already heavily synchronized in stages. It’s not difficult to imagine changing this toy code to synchronize at the end of each iteration, so that both ranks finish iteration i before either starts iteration i + 1, in order to solve all of our problems, but wait… Is that actually the output we’d get?

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

assert size == 2

for i in range(10):
    print(f'Rank {rank} - {i}')
    comm.barrier()

Rank 0 - 0
Rank 0 - 1
Rank 1 - 0
Rank 1 - 1
Rank 1 - 2
Rank 1 - 2
Rank 1 - 3
Rank 0 - 2
...

Definitely the Solution…

Hopefully we’re scratching our heads a bit here since this seems bizarre - isn’t the point of a barrier to synchronize processes? (Before I raise too many existential questions, the answer to that is 100% yes!) Why, then, is the printed output desynchronized? The astute among you might remember that printed output (at least in most languages, Python and C++ included) is buffered, and that calling “print” doesn’t necessarily mean that the data will get flushed immediately. Since each process manages its own buffer independently, this isn’t too big of a deal to address. With one more change, we’ve got the following:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

assert size == 2

for i in range(10):
    print(f'Rank {rank} - {i}', flush=True)
    comm.barrier()

Rank 0 - 0
Rank 0 - 1
Rank 1 - 0
Rank 1 - 1
Rank 1 - 2
Rank 1 - 2
Rank 1 - 3
Rank 0 - 2
...
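
To see the Python-level buffering on its own, without MPI in the picture, here is a small sketch using only the standard library. The `terminal` object is a stand-in I’ve made up for wherever the bytes finally land; the point is simply that a plain print leaves nothing there until a flush happens.

```python
import io

# `terminal` is a stand-in for whatever finally receives the bytes.
terminal = io.BytesIO()
stream = io.TextIOWrapper(io.BufferedWriter(terminal), encoding='utf-8', newline='\n')

print('Rank 0 - 0', file=stream)              # sits in the buffer
seen_before_flush = terminal.getvalue()

print('Rank 0 - 1', file=stream, flush=True)  # pushes both lines through
seen_after_flush = terminal.getvalue()

print(seen_before_flush)
print(seen_after_flush)
```

Flushing empties this one layer of buffering. As the output above shows, that alone was not enough to order the lines on the terminal.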

The Crux of the Problem

If you’re thoroughly flustered, then join the club - MPI has these idiosyncrasies that can be very difficult to deal with when you want proper synchronization, even just of logging. To fully understand what’s happening here, it’s important to realize that there are multiple levels of buffering when writing from a parallelized source to a single terminal screen (even if all of your processes are on a single machine). First, there can be latency just sending the output string from your process to the I/O process (quite likely if your MPI environment spans multiple machines). Second, there are OS-level buffers that sit beneath the Python I/O buffers that we just flushed. Finally, some MPI implementations add another level of I/O forwarding. While all three of these could have an impact and we can’t tell exactly which is at play in a given situation, it’s clear that in all cases, the wall-clock order of our print statements across the individual processes will not necessarily be preserved by the time they reach our terminal.
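
A toy simulation can make this concrete. Everything in the sketch below is invented for illustration: the `Message` class, the emission times, and the delays are assumptions, not measurements of any real MPI setup. It only shows that if one rank’s output spends longer in transit than the gap between barriers, messages from a later iteration can reach the terminal before messages from an earlier one.

```python
from dataclasses import dataclass

@dataclass
class Message:
    text: str
    emitted_at: float  # simulated time at which the rank called print
    delay: float       # simulated time spent in buffers and forwarding

    @property
    def arrives_at(self):
        return self.emitted_at + self.delay

# Iterations are one simulated second apart (think: separated by a barrier).
# Rank 1's output is assumed to take longer to reach the terminal.
messages = [
    Message('Rank 0 - 0', emitted_at=0.00, delay=0.05),
    Message('Rank 1 - 0', emitted_at=0.01, delay=1.50),
    Message('Rank 0 - 1', emitted_at=1.00, delay=0.05),
    Message('Rank 1 - 1', emitted_at=1.01, delay=1.50),
]

emit_order = [m.text for m in sorted(messages, key=lambda m: m.emitted_at)]
arrival_order = [m.text for m in sorted(messages, key=lambda m: m.arrives_at)]

print(emit_order)
print(arrival_order)
```

In the simulated arrival order, Rank 0 - 1 shows up before Rank 1 - 0 even though every rank emitted its iteration 0 message first.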

The True Solution

There are 3 common approaches that MPI programmers use for recording output:

  1. Use MPI-IO for synchronized file output.
  2. Sync all relevant logging data to rank 0 (with appropriate timestamps or barriers in place) and handle all I/O operations there.
  3. Write to a separate file per process (worth mentioning but not a solution to the question I posed above).

I’ve detailed sample solutions for options 1 and 2 below:

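MPI-IO:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

assert size == 2

# Collective open: every rank in comm opens the same file.
fh = MPI.File.Open(comm, 'output.txt', MPI.MODE_CREATE | MPI.MODE_WRONLY)

for i in range(10):
    # Write at the file pointer shared by all ranks.
    fh.Write_shared(f'Rank {rank} - {i}\n'.encode())
    # Keep both ranks on the same iteration before moving on.
    comm.barrier()

fh.Close()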
    
Rank 0:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

assert size == 2

for i in range(10):
    if rank == 0:
        # Rank 0 prints its own message, then the one received from rank 1.
        print(f'Rank {rank} - {i}')
        msg = comm.recv(source=1)
        print(msg)
    else:
        # Rank 1 never prints; it sends its message to rank 0 instead.
        msg = f'Rank {rank} - {i}'
        comm.send(msg, dest=0)
    comm.barrier()
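
Option 3 from the list above has no sample, so here is a minimal sketch of what it could look like, combined with the timestamp idea mentioned in option 2. It runs without MPI: the two ranks are simulated by calling a helper twice, the file naming scheme and the helper names (`log_path`, `write_log`, `merge_logs`) are hypothetical, and the timestamps are made-up numbers. Note that timestamps taken on different machines are only comparable if their clocks are synchronized.

```python
import heapq
import os
import tempfile

def log_path(directory, rank):
    """Hypothetical naming scheme: one log file per rank."""
    return os.path.join(directory, f'log.rank{rank}.txt')

def write_log(directory, rank, records):
    """Write (timestamp, message) records for one rank to its own file."""
    with open(log_path(directory, rank), 'w') as fh:
        for timestamp, message in records:
            fh.write(f'{timestamp:.6f}\t{message}\n')

def merge_logs(directory, n_ranks):
    """Merge the per-rank files into one list ordered by timestamp."""
    def read(rank):
        with open(log_path(directory, rank)) as fh:
            for line in fh:
                timestamp, message = line.rstrip('\n').split('\t', 1)
                yield float(timestamp), message
    # Each file is already sorted by time, so a k-way merge is enough.
    return [msg for _, msg in heapq.merge(*(read(r) for r in range(n_ranks)))]

with tempfile.TemporaryDirectory() as tmp:
    # In a real run, each rank would call write_log once with its own rank
    # (from comm.Get_rank()) and its own timestamps (e.g. from MPI.Wtime()).
    write_log(tmp, 0, [(0.0, 'Rank 0 - 0'), (1.0, 'Rank 0 - 1')])
    write_log(tmp, 1, [(0.5, 'Rank 1 - 0'), (1.5, 'Rank 1 - 1')])
    merged = merge_logs(tmp, 2)

print(merged)
```

The appeal of this design is that no rank ever waits on another while logging; the ordering work is deferred to a post-processing step.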
    

In general, you should try to avoid these types of constraints when designing MPI code. One of the biggest benefits of using MPI is a high degree of parallelism, and the more syncs you require, the more you will throttle your code. There are some great use cases for MPI-IO that make some of this workflow more practical, so why don’t we take a further look at that now?

MPI-IO and Concurrent Writes

One of the biggest limitations with I/O is concurrent writes, but MPI-IO locking enables concurrent reads and writes as long as each process is writing to a different section of the file. For a practical example, let’s consider operations on a massive matrix. Maybe you have each process operating on a single row, and you need to write the output to the same file. Recall that efficient matrix implementations store a matrix as a 1-D, contiguous block of data and only virtually present it as a matrix, so row r of a matrix with n_cols columns starts at element r * n_cols - what we’re doing here is conceptually no different!

import numpy as np

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

n_cols = 10
dtype = np.dtype('d')
outputFile = 'output.txt'

fh = MPI.File.Open(comm, outputFile, MPI.MODE_CREATE | MPI.MODE_WRONLY)

data = np.arange(rank * n_cols, (rank + 1) * n_cols, dtype=dtype)
fh.Write_at(rank * n_cols * dtype.itemsize, data)

fh.Close()

comm.barrier()

if rank == 0:
    arr = np.fromfile(outputFile, dtype=dtype, count=n_cols * size).reshape(size, -1)
    print(arr.dtype)
    print(arr)
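
The offset arithmetic passed to Write_at can be tried out without MPI. The sketch below is an illustration under my own assumptions: it uses plain `seek` and `write` from the standard library in place of MPI-IO, a single process plays every “rank” in turn, and the file name `matrix.bin` and helper `write_row` are hypothetical. Rows are deliberately written out of order to show that the byte offsets alone decide where each row lands.

```python
import os
import struct
import tempfile

n_cols = 4
n_rows = 3                  # plays the role of `size`
item = struct.Struct('<d')  # one little-endian float64, 8 bytes

def write_row(path, row, values):
    """Write one row at byte offset row * n_cols * itemsize."""
    with open(path, 'r+b') as fh:
        fh.seek(row * n_cols * item.size)
        fh.write(b''.join(item.pack(v) for v in values))

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'matrix.bin')
    open(path, 'wb').close()  # create an empty file

    # Rows are written out of order on purpose; the offsets keep them apart.
    for row in (2, 0, 1):
        write_row(path, row, [float(row * n_cols + c) for c in range(n_cols)])

    with open(path, 'rb') as fh:
        flat = [item.unpack(fh.read(item.size))[0] for _ in range(n_rows * n_cols)]

# Reinterpret the flat, contiguous data as a matrix.
matrix = [flat[r * n_cols:(r + 1) * n_cols] for r in range(n_rows)]
print(matrix)
```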

The main consideration is whether we use Write_at or Write_at_all. While the code will appear to function identically, let’s take a look at the following example over many iterations and compare the times.

import numpy as np

from mpi4py import MPI

comm = MPI.COMM_WORLD

def foo(comm, test):
    rank = comm.Get_rank()
    size = comm.Get_size()

    n_cols = 1000
    dtype = np.dtype('i')
    outputFile = 'output.txt'
    iter_size = n_cols * size

    start_time = MPI.Wtime()

    fh = MPI.File.Open(comm, outputFile, MPI.MODE_CREATE | MPI.MODE_WRONLY)

    for i in range(100000):
        data = np.arange(rank * n_cols + i * iter_size, (rank + 1) * n_cols + i * iter_size, dtype=dtype)
        if test == 'all':
            fh.Write_at_all(rank * n_cols * dtype.itemsize, data)
        else:
            fh.Write_at(rank * n_cols * dtype.itemsize, data)

    fh.Close()

    end_time = MPI.Wtime()

    if rank == 0:
        print(f'Total time = {end_time - start_time}')

foo(comm, 'all')
foo(comm, None)

For this trial on my machine with 8 cores, Write_at_all takes 3 times as long as Write_at, and this ranking holds regardless of the number of columns or number of iterations used. The reason is that Write_at_all (and every Write_*_all operation, for that matter) is a collective operation that requires synchronization. On a single machine with a limited number of cores and fast I/O, we’re not going to see any benefit from this collective operation and instead just slow ourselves down. In larger MPI environments, however, we can leverage the synchronization very well for small updates. In these cases, the synchronization can potentially turn n independent I/O operations into far fewer by operating on contiguous batches of data at once. We could also see a similar performance uplift in environments with slower I/O, as long as the cost of the sync isn’t too bad. For most use cases though, if you’re writing to distinct sections of the file, it’s best to use Write_at by default and test timings of Write_at_all if you’re in a larger environment.
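
To give a feel for what “turning n operations into fewer” means, here is a purely conceptual sketch. It is not how any particular MPI implementation performs collective I/O; the `coalesce` function and the request format are assumptions made up for illustration. Given a set of pending writes, it merges those that touch back-to-back byte ranges into a single larger write.

```python
def coalesce(requests):
    """Merge (offset, data) writes that touch back-to-back byte ranges."""
    merged = []
    for offset, data in sorted(requests, key=lambda r: r[0]):
        if merged and merged[-1][0] + len(merged[-1][1]) == offset:
            # This write starts exactly where the previous one ends.
            prev_offset, prev_data = merged[-1]
            merged[-1] = (prev_offset, prev_data + data)
        else:
            merged.append((offset, data))
    return merged

# Four "ranks" each want to write 4 bytes at rank * 4, plus one distant write.
requests = [(8, b'CCCC'), (0, b'AAAA'), (12, b'DDDD'), (4, b'BBBB'), (100, b'ZZZZ')]
batched = coalesce(requests)

print(len(requests), '->', len(batched))
print(batched)
```

Gathering the requests in one place is what requires the synchronization, which is why the trade-off only pays off when each individual I/O operation is expensive relative to the sync.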

Conclusion

Thank you for following all the way through! At the end of this, I really just want to emphasize a couple of crucial points that I fear might get lost in the weeds. First, our initial solution seemed like it should have worked - why wouldn’t placing a barrier help synchronize things? While I/O might be a unique instance of this occurring, the fact remains that someone could have made an innocent mistake and coded what, for all intents and purposes, should have been a correct solution. However, a simple test showed us that that couldn’t be the case. We could very easily have been working on a much larger project, inserted these print statements and kept coding, trusting our output without even realizing that there were bugs that could desynchronize our printed output. This leads neatly into my second point: these kinds of risks and nuances pop up all of the time in computer science (but especially when working on parallel computing tasks). If you don’t have a good grasp of the standards or libraries that you’re using, you will make mistakes that can make things very challenging to debug later on. These I/O challenges and syncing problems that we’ve explored in this series are broadly applicable to parallel computing at large, so although we’re working in MPI currently, I hope you keep these caveats and tricks in mind if you work on GPU code in the future.

I want to stress even more so the need to have the curiosity to dive deeper into situations when learning complex paradigms or when faced with a problem that you aren’t certain about. Don’t be afraid to write the code to either prove or refute your hypotheses as you take on new challenges. This has been a really fun blog series for me to write - I love diving deep into problems, and I hope you’ve enjoyed the journey with me. In the next series we’ll be taking a look at implementing some real-world statistical functions (some of which are challenging to implement even without having to worry about parallelism) using MPI, and there will be many more fun and challenging problems ahead! I hope to see you there!




© Copyright 2025 Greg McCord.