The Large Impact of MPI Collective Algorithms on an AWS EC2 HPC Cluster

I recently found that, on an EC2 HPC cluster, OpenMPI collectives like MPI_Bcast and MPI_Allreduce are 3~5 times slower than Intel-MPI, due to their underlying collective algorithms. This problem is reported in detail at https://github.com/aws/aws-parallelcluster/issues/1436. The take-away is that one should use Intel-MPI instead of OpenMPI to get optimal performance on EC2. The performance of MPI_Bcast is critical for I/O-intensive applications that need to broadcast data from the master process to the rest of the processes.

It is also possible to fine-tune OpenMPI to narrow the performance gap. For MPI_Bcast, the "knomial tree" algorithm in OpenMPI4 can give a 2~3x speed-up over the out-of-the-box OpenMPI3/OpenMPI4, getting closer to Intel-MPI without EFA. The algorithm can be selected by:

orterun --mca coll_tuned_use_dynamic_rules 1 --mca coll_tuned_bcast_algorithm 7 your_mpi_program
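
The same two MCA parameters can also be set through environment variables (OpenMPI's standard OMPI_MCA_<param> convention), which is handy inside job scripts. Below is a minimal sketch assuming the same algorithm number 7 as above; the ompi_info query mirrors the one used later in this post and shows how the algorithm numbers map to names:

$ export OMPI_MCA_coll_tuned_use_dynamic_rules=1
$ export OMPI_MCA_coll_tuned_bcast_algorithm=7   # 7 = knomial tree, same as the orterun flags above
$ orterun your_mpi_program

# double-check the available algorithm numbers (the "Valid values" help text lists them)
$ ompi_info --param coll tuned --level 9 | grep -A 3 bcast_algorithm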

Here are the complete code and instructions to benchmark all algorithms: https://github.com/JiaweiZhuang/aws-mpi-benchmark

Below is a comparison of all available MPI_Bcast algorithms (more details in the GitHub issue). The x-axis is clipped so that the very slow algorithms do not dominate the plot; see the numbers on each bar for the actual times.

/images/mpi_collectives/MPI_bcast.png

MPI over Multiple TCP Connections on EC2 C5n Instances

This is just a quick note regarding interesting MPI behaviors on EC2.

EC2 C5n instances provide an amazing 100 Gb/s bandwidth [1], much higher than the 56 Gb/s FDR InfiniBand network on Harvard's HPC cluster [2]. It turns out that actually getting the 100 Gb/s bandwidth takes a bit of tweaking. This post focuses solely on bandwidth. In terms of latency, Ethernet + TCP (20~30 us) can hardly compete with InfiniBand + RDMA (~1 us).

Tests are conducted on an AWS ParallelCluster with two c5n.18xlarge instances, as in my cloud-HPC guide.

1 TCP bandwidth test with iPerf

Before doing any MPI stuff, first use the general-purpose network testing tool iPerf/iPerf3. AWS provides an example of using iPerf on EC2 [3]. Note that iPerf2 and iPerf3 handle parallelism quite differently [4]: the --parallel/-P option in iPerf2 creates multiple threads (and thus can utilize multiple CPU cores), while the same option in iPerf3 opens multiple TCP connections but only one thread (and thus can only use a single CPU core). This can lead to quite different benchmark results at high concurrency (see the quick check at the end of Section 1.2).

1.1 Single-thread results with iPerf3

Start server:

$ ssh ip-172-31-2-150  # go to one compute node
$ sudo yum install iperf3
$ iperf3 -s  # let it keep running

Start client (in a separate shell):

$ ssh ip-172-31-11-54  # go to another compute node
$ sudo yum install iperf3

# Single TCP stream
# `-c` specifies the server hostname (EC2 private IP).
# Most parameters are kept at their defaults, which seem to perform well.
$ iperf3 -c ip-172-31-2-150 -t 4
...
[ ID] Interval           Transfer     Bandwidth       Retr
[  4]   0.00-4.00   sec  4.40 GBytes  9.46 Gbits/sec    0             sender
[  4]   0.00-4.00   sec  4.40 GBytes  9.46 Gbits/sec                  receiver
...

# Multiple TCP streams
$ iperf3 -c ip-172-31-2-150 -P 4 -i 1 -t 4
...
[SUM]   0.00-4.00   sec  11.8 GBytes  25.4 Gbits/sec    0             sender
[SUM]   0.00-4.00   sec  11.8 GBytes  25.3 Gbits/sec                  receiver
...

A single stream gets ~9.5 Gb/s, while -P 4 reaches ~25.4 Gb/s. Using more streams does not help further, as the workload starts to become CPU-limited (iPerf3 runs in a single thread).
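
To confirm that the bottleneck is the single iPerf3 thread rather than the network, one can watch the client process while a test is running. This is a quick hypothetical check (not part of the original benchmark), using standard Linux tools:

# in another shell on the client node, while the iperf3 test is running
$ sudo yum install sysstat              # provides pidstat
$ pidstat -u -p $(pgrep -x iperf3) 1
# the %CPU column should sit near 100%, i.e. one core fully busy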

1.2 Multi-thread results with iPerf2

On server:

$ sudo yum install iperf
$ iperf -s

On client:

$ sudo yum install iperf

# Single TCP stream
$ iperf -c ip-172-31-2-150 -t 4  # consistent with iPerf3 result
...
[ ID] Interval       Transfer     Bandwidth
[  3]  0.0- 4.0 sec  4.40 GBytes  9.45 Gbits/sec
...

# Multiple TCP streams
$ iperf -c ip-172-31-2-150 -P 36 -t 4  # much higher than with iPerf3
...
[SUM]  0.0- 4.0 sec  43.4 GBytes  93.0 Gbits/sec
...

Unlike iPerf3, iPerf2 is able to approach the theoretical 100 Gb/s by using all the available cores.
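
The threading difference between the two tools can also be seen directly: while a test is running, count the threads of each process. Again, this is a hypothetical check rather than part of the original post (NLWP is the thread count reported by ps):

# during an iperf2 run with -P 36
$ ps -o nlwp,cmd -p $(pgrep -x iperf)    # expect many threads
# during an iperf3 run with -P 4
$ ps -o nlwp,cmd -p $(pgrep -x iperf3)   # expect essentially a single worker thread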

2 MPI bandwidth test with OSU micro-benchmarks

Next, run the OSU Micro-Benchmarks, a well-known MPI benchmarking suite. Similar tests can be done with the Intel MPI Benchmarks.

Get OpenMPI v4.0.0, which allows a single pair of MPI processes to use multiple TCP connections [5].

$ spack install openmpi@4.0.0+pmi schedulers=slurm  # Need to fix https://github.com/spack/spack/pull/10853
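
Before moving on, it is worth a quick sanity check that the intended version got installed (using the same spack location pattern as later in this post):

$ $(spack location -i openmpi@4.0.0)/bin/ompi_info --version   # should report Open MPI v4.0.0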

Get OSU:

$ spack install osu-micro-benchmarks ^openmpi@4.0.0+pmi schedulers=slurm

Focus on point-to-point communication here:

# all the later commands are executed in this directory
$ cd $(spack location -i osu-micro-benchmarks)/libexec/osu-micro-benchmarks/mpi/pt2pt/
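
The two benchmarks used below, osu_bw (a single pair of processes) and osu_mbw_mr (multiple pairs), both live in this directory; a quick listing confirms they are built:

$ ls | grep -E 'osu_bw|osu_mbw_mr'
osu_bw
osu_mbw_mr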

2.1 Single-stream

osu_bw tests bandwidth between a single pair of MPI processes.

$ srun -N 2 -n 2 ./osu_bw
  OSU MPI Bandwidth Test v5.5
  Size      Bandwidth (MB/s)
1                       0.47
2                       0.95
4                       1.90
8                       3.78
16                      7.66
32                     15.17
64                     29.94
128                    53.69
256                   105.53
512                   202.98
1024                  376.49
2048                  626.50
4096                  904.27
8192                 1193.19
16384                1178.43
32768                1180.01
65536                1179.70
131072               1180.92
262144               1181.41
524288               1181.67
1048576              1181.62
2097152              1181.72
4194304              1180.56

This matches the single-stream result 9.5 Gb/s = 9.5/8 GB/s ~ 1200 MB/s from iPerf.

Note

1 gigabyte (GB) = 8 gigabits (Gb)

2.2 Multi-stream

osu_mbw_mr tests bandwidth between multiple pairs of MPI processes.

# Simply calling `srun` on `osu_mbw_mr` seems to hang forever. Not sure why.
$ # srun -N 2 --ntasks-per-node 36 ./osu_mbw_mr  # in principle it should work

# Doing it in two steps fixes the problem.
$ srun -N 2 --ntasks-per-node 72 --pty /bin/bash  # request interactive shell

# `osu_mbw_mr` requires the first half of MPI ranks to be on one node.
# Check it with the verbose output below. Slurm should have the correct placement by default.
$ $(spack location -i openmpi)/bin/orterun --tag-output hostname
...

# Actually running the benchmark
$ $(spack location -i openmpi)/bin/orterun ./osu_mbw_mr
  OSU MPI Multiple Bandwidth / Message Rate Test v5.5
  [ pairs: 72 ] [ window size: 64 ]
  Size                  MB/s        Messages/s
1                      17.94       17944422.39
2                      35.76       17878198.29
4                      71.85       17963002.53
8                     143.80       17974644.52
16                    283.00       17687790.85
32                    551.03       17219816.70
64                   1067.73       16683260.55
128                  2076.05       16219122.14
256                  3890.82       15198501.12
512                  6790.84       13263356.64
1024                10165.19        9926942.84
2048                11454.89        5593209.95
4096                11967.32        2921708.63
8192                12597.32        1537758.49
16384               12686.13         774299.68
32768               12765.72         389578.83
65536               12857.16         196184.66
131072              12829.56          97881.74
262144              12994.67          49570.75
524288              12988.97          24774.49
1048576             12983.20          12381.74
2097152             13011.67           6204.45
4194304             12910.31           3078.06

This matches the theoretical maximum bandwidth (100 Gb/s ~ 12500 MB/s).
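
As an extra sanity check (not part of the original run), the aggregate throughput can also be read straight from the NIC counters on either node while osu_mbw_mr is running:

# sample network device statistics once per second (sar is in the sysstat package)
$ sar -n DEV 1
# the rxkB/s / txkB/s columns of the main interface (eth0 or ens5, depending on the AMI)
# should approach ~12,500,000 kB/s at 100 Gb/s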

On an InfiniBand cluster there is typically little difference between single-stream and multi-stream bandwidth; this gap is something to keep in mind when working with TCP/Ethernet on EC2.

3 Tweaking TCP connections for OpenMPI

OpenMPI v4.0.0 allows one pair of MPI processes to use multiple TCP connections via the btl_tcp_links parameter [5].

$ export OMPI_MCA_btl_tcp_links=36  # https://www.open-mpi.org/faq/?category=tuning#setting-mca-params
$ ompi_info --param btl tcp --level 4 | grep btl_tcp_links  # double check
MCA btl tcp: parameter "btl_tcp_links" (current value: "36", data source: environment, level: 4 tuner/basic, type: unsigned_int)
$ srun -N 2 -n 2 ./osu_bw
  OSU MPI Bandwidth Test v5.5
  Size      Bandwidth (MB/s)
1                       0.46
2                       0.92
4                       1.88
8                       3.78
16                      7.60
32                     14.95
64                     30.34
128                    56.95
256                   113.52
512                   213.36
1024                  400.18
2048                  665.80
4096                  963.67
8192                 1187.67
16384                1180.56
32768                1179.53
65536                2349.06
131072               2379.48
262144               2589.47
524288               2805.73
1048576              2853.21
2097152              2882.12
4194304              2811.13

This matches the previous iPerf3 result (25 Gb/s ~ 3000 MB/s) for single-thread, multi-TCP bandwidth. It is hard for a single MPI pair to go much further, as the communication is now limited by a single thread/CPU core.

This tweak doesn't actually improve the performance of my real-world HPC code, which should already be using multiple MPI connections. The lesson learned is probably this: be careful when conducting micro-benchmarks. The out-of-the-box osu_bw can be misleading on EC2.

4 References

[1] New C5n Instances with 100 Gbps Networking: https://aws.amazon.com/blogs/aws/new-c5n-instances-with-100-gbps-networking/

[2] Odyssey Architecture: https://www.rc.fas.harvard.edu/resources/odyssey-architecture/

[3] "How do I benchmark network throughput between Amazon EC2 Linux instances in the same VPC?" https://aws.amazon.com/premiumsupport/knowledge-center/network-throughput-benchmark-linux-ec2/

[4] Discussions on multi-threaded iperf3: https://github.com/esnet/iperf/issues/289

[5] "Can I use multiple TCP connections to improve network performance?" https://www.open-mpi.org/faq/?category=tcp#tcp-multi-links

[Notebook] Dask and Xarray on AWS-HPC Cluster: Distributed Processing of Earth Data

This notebook continues the previous post by showing the actual code for distributed data processing.

In [1]:
%matplotlib inline
import xarray as xr
import matplotlib.pyplot as plt
import cartopy.crs as ccrs

from dask.diagnostics import ProgressBar
from dask_jobqueue import SLURMCluster
from distributed import Client, progress
In [2]:
import dask
import distributed
dask.__version__, distributed.__version__
Out[2]:
('1.1.3', '1.26.0')
In [3]:
%env HDF5_USE_FILE_LOCKING=FALSE
env: HDF5_USE_FILE_LOCKING=FALSE

Data exploration

Data are organized by year/month:

In [4]:
ls /fsx
2008/  2010/  2012/  2014/  2016/  2018/
2009/  2011/  2013/  2015/  2017/  QA/
In [5]:
ls /fsx/2008/
01/  02/  03/  04/  05/  06/  07/  08/  09/  10/  11/  12/
In [6]:
ls /fsx/2008/01/data  # one variable per file
air_pressure_at_mean_sea_level.nc*
air_temperature_at_2_metres_1hour_Maximum.nc*
air_temperature_at_2_metres_1hour_Minimum.nc*
air_temperature_at_2_metres.nc*
dew_point_temperature_at_2_metres.nc*
eastward_wind_at_100_metres.nc*
eastward_wind_at_10_metres.nc*
integral_wrt_time_of_surface_direct_downwelling_shortwave_flux_in_air_1hour_Accumulation.nc*
lwe_thickness_of_surface_snow_amount.nc*
northward_wind_at_100_metres.nc*
northward_wind_at_10_metres.nc*
precipitation_amount_1hour_Accumulation.nc*
sea_surface_temperature.nc*
snow_density.nc*
surface_air_pressure.nc*
In [7]:
# hourly data over a month
dr = xr.open_dataarray('/fsx/2008/01/data/sea_surface_temperature.nc')
dr
Out[7]:
<xarray.DataArray 'sea_surface_temperature' (time0: 744, lat: 640, lon: 1280)>
[609484800 values with dtype=float32]
Coordinates:
  * lon      (lon) float32 0.0 0.2812494 0.5624988 ... 359.43674 359.718
  * lat      (lat) float32 89.784874 89.5062 89.22588 ... -89.5062 -89.784874
  * time0    (time0) datetime64[ns] 2008-01-01T07:00:00 ... 2008-02-01T06:00:00
Attributes:
    standard_name:  sea_surface_temperature
    units:          K
    long_name:      Sea surface temperature
    nameECMWF:      Sea surface temperature
    nameCDM:        Sea_surface_temperature_surface
In [8]:
# Static plot of the first time slice
fig, ax = plt.subplots(1, 1, figsize=[12, 8], subplot_kw={'projection': ccrs.PlateCarree()})
dr[0].plot(ax=ax, transform=ccrs.PlateCarree(), cbar_kwargs={'shrink': 0.6})
ax.coastlines();

What happens to the values over land? This is easier to check with an interactive plot.

In [9]:
import geoviews as gv
import hvplot.xarray

fig_hv = dr[0].hvplot.quadmesh(
    x='lon', y='lat', rasterize=True, cmap='viridis', geo=True,
    crs=ccrs.PlateCarree(), projection=ccrs.PlateCarree(), project=True,
    width=800, height=400, 
) * gv.feature.coastline

# fig_hv