Setup¶
Much of this notebook requires an IPython.parallel
cluster to be running.
Outside the notebook, run
dacluster start -n4
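Once the cluster is up, you can confirm the engines are available before continuing. A minimal sketch, assuming the cluster uses the default IPython profile:
# Optional sanity check: ask the IPython.parallel client which
# engines responded; `dacluster start -n4` should give four.
from IPython.parallel import Client
client = Client()
print("engines:", client.ids)  # expect [0, 1, 2, 3]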
# some utility imports
from __future__ import print_function
from pprint import pprint
from matplotlib import pyplot as plt
# main imports
import numpy
import distarray
# reduce precision on printed array values
numpy.set_printoptions(precision=2)
# display figures inline
%matplotlib inline
Software Versions¶
print("numpy", numpy.__version__)
import matplotlib
print("matplotlib", matplotlib.__version__)
import h5py
print("h5py", h5py.__version__)
print("distarray", distarray.__version__)
Set a RandomState¶
Set a RandomState so random NumPy arrays don't change between runs.
from numpy.random import RandomState
prng = RandomState(1234567890)
NumPy Arrays¶
DistArray is built on NumPy and provides a NumPy-array-like interface. First, let's generate a NumPy array and examine some of its attributes.
# a 4-row 5-column NumPy array with random contents
nparr = prng.rand(4, 5)
nparr
# NumPy array attributes
print("type:", type(nparr))
print("dtype:", nparr.dtype)
print("ndim:", nparr.ndim)
print("shape:", nparr.shape)
print("itemsize:", nparr.itemsize)
print("nbytes:", nparr.nbytes)
DistArrays¶
We'll make our first DistArray
out of the NumPy array created above.
# First we need a `Context` object. More on this later.
# For now, think of this object like the `NumPy` module.
# `Context`s manage the worker engines for us.
from distarray.globalapi import Context
context = Context()
# Make a DistArray from a NumPy array.
# This will push sections of the original NumPy array out
# to the engines.
darr = context.fromarray(nparr)
darr
# Print the array section stored on each engine
for i, a in enumerate(darr.get_localarrays()):
print(i, a)
# DistArrays have similar attributes to NumPy arrays,
print("type:", type(darr))
print("dtype:", darr.dtype)
print("ndim:", darr.ndim)
print("shape:", darr.shape)
print("itemsize:", darr.itemsize)
print("nbytes:", darr.nbytes)
# and some additional attributes.
print("targets:", darr.targets)
print("context:", darr.context)
print("distribution:", darr.distribution)
Universal Functions (ufuncs)¶
# NumPy provides `ufuncs`, or Universal Functions, that operate
# elementwise over NumPy arrays.
numpy.sin(nparr)
# DistArray provides ufuncs as well, for `DistArray`s.
import distarray.globalapi as da
da.sin(darr)
# `toarray` makes a NumPy array out of a DistArray, pulling all of the
# pieces back to the client. We do this to display the contents of the
# DistArray.
da.sin(darr).toarray()
# A NumPy binary ufunc.
nparr + nparr
# The equivalent DistArray ufunc.
# Notice that a new DistArray is created without
# pulling data back to the client.
darr + darr
# Contents of the resulting DistArray.
(darr + darr).toarray()
Reductions¶
Functions like `sum`, `mean`, `min`, and `max` are known as reductions, since they take an array and produce a smaller array or a scalar. In NumPy and DistArray, some of these functions can be applied over a specific axis.
# NumPy sum
print("sum:", nparr.sum())
print("sum over an axis:", nparr.sum(axis=1))
# DistArray sum
print("sum:", darr.sum(), darr.sum().toarray())
print("sum over an axis:", darr.sum(axis=1), darr.sum(axis=1).toarray())
Indexing and Slicing¶
DistArrays support standard NumPy indexing and distributed slicing, including slices with a step. Slicing is currently only supported for Block (and undistributed) DistArrays.
# Our example array, as a reminder:
darr.toarray()
# The shapes of the local sections of our DistArray
darr.localshapes()
# Return the value of a single element
darr[0, 2]
# Take a column slice
darr_view = darr[:, 3]  # all rows, column at index 3
print(darr_view)
print(darr_view.toarray())
# Slices return a new DistArray that is a view on the
# original, just like in NumPy.
# Changes in the view change the original array.
darr_view[3] = -0.99
print("view:")
print(darr_view.toarray())
print("original:")
print(darr.toarray())
# More complex slices, with a step and a negative index.
print(darr[:, 2::2])
print(darr[:-1, 2::2].toarray())
# Incomplete indexing
# Grab the first row
darr[0]
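Assignment by index should also work as it does in NumPy; a sketch, assuming single-element assignment behaves like the view assignment shown above:
# Set a single element (assumed to mirror NumPy assignment)
darr[0, 2] = 0.5
print(darr.toarray())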
Distributions¶
Above, when we created a DistArray out of a NumPy array, we didn't specify how the elements should be distributed among our engines. `Distribution`s give you control over this, if you want it. In other words, `Distribution`s control which processes own which (global) indices.
# Let's look at the `Distribution` object that was created for us
# automatically by `fromarray`.
distribution = darr.distribution
# This is a 2D distribution: its 0th dimension is Block-distributed,
# and its 1st dimension isn't distributed.
pprint(distribution.maps)
# Plot this Distribution, color-coding which process each global index
# belongs to.
from distarray.plotting import plot_array_distribution
process_coords = [(0, 0), (1, 0), (2, 0), (3, 0)]
plot_array_distribution(darr, process_coords, cell_label=False, legend=True)
# Check out which sections of this array's 0th dimension are on
# each process.
distribution.maps[0].bounds
The Distribution above was created for us by `fromarray`, but DistArray lets us specify more complex distributions. Here, we specify that the 0th dimension has a Block distribution ('b') and the 1st dimension has a Cyclic distribution ('c'). DistArray supports Block, Cyclic, Block-Cyclic, Unstructured, and No-distribution dimensions. See the ScaLAPACK documentation for more information about distribution types.
from distarray.globalapi import Distribution
distribution = Distribution(context, shape=(64, 64), dist=('b', 'c'))
a = context.zeros(distribution, dtype='int32')
plot_array_distribution(a, process_coords, cell_label=False, legend=True)
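Other combinations follow the same recipe. For example, a sketch of a Cyclic 0th dimension with a non-distributed 1st dimension, assuming 'n' is the flag for a No-distribution dimension:
# Cyclic rows, non-distributed columns (assumes 'n' = No-dist)
distribution_cn = Distribution(context, shape=(64, 64), dist=('c', 'n'))
b = context.zeros(distribution_cn, dtype='int32')
plot_array_distribution(b, process_coords, cell_label=False, legend=True)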
Redistribution¶
Since `DistArray`s are distributed, the equivalent of NumPy's `reshape` (`distribute_as`) can be a more complex and costly operation. For convenience, you can supply either a `shape` or a full `Distribution` object. Only Block distributions (and No-dist) are currently redistributable.
darr
darr.toarray()
# simple reshaping
reshaped = darr.distribute_as((10, 2))
reshaped
reshaped.toarray()
# A more complex redistribution,
# changing shape, dist, and targets
dist = Distribution(context, shape=(5, 4),
                    dist=('b', 'b'), targets=(1, 3))
darr.distribute_as(dist)
Contexts¶
Context objects manage the setup of and communication to the worker processes for DistArray objects. They also act as the namespace to which DistArray creation functions are attached.
print("targets:", context.targets)
print("comm:", context.comm)
context.zeros((5, 3))
context.ones((20, 20))
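A `Context` can also be limited to a subset of the engines. A sketch, assuming the constructor accepts a `targets` argument like the one reported by `context.targets` above:
# A Context managing only two of the four engines
# (assumes Context takes a `targets` keyword)
context2 = Context(targets=[0, 1])
print("targets:", context2.targets)
context2.ones((4, 4))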
Parallel IO¶
DistArray has support for reading NumPy `.npy` files in parallel, for reading and writing `.dnpy` files in parallel (our own flat-file format), and reading and writing HDF5 files in parallel (if you have a parallel build of `h5py`).
# load .npy files in parallel
numpy.save("/tmp/outfile.npy", nparr)
distribution = Distribution(context, nparr.shape)
new_darr = context.load_npy("/tmp/outfile.npy", distribution)
new_darr
# save to .dnpy (a built-in flat-file format based on .npy)
context.save_dnpy("/tmp/outfile", darr)
# load from .dnpy
context.load_dnpy("/tmp/outfile")
# save DistArrays to .hdf5 files in parallel
context.save_hdf5("/tmp/outfile.hdf5", darr, mode='w')
# load DistArrays from .hdf5 files in parallel (using h5py)
context.load_hdf5("/tmp/outfile.hdf5", distribution)
Context.apply¶
Global view, local control. The `apply` method on a `Context` allows you to write functions that are applied locally (that is, on the engines) to each section of a DistArray. This allows you to push your computation close to your data, avoiding communication round-trips and possibly speeding up your computations.
def get_local_random():
    """Function to be applied locally."""
    import numpy
    return numpy.random.randint(10)
context.apply(get_local_random)
def get_local_var(darr):
    """Another local computation."""
    return darr.ndarray.var()
context.apply(get_local_var, args=(darr.key,))
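`apply` can also forward keyword arguments to the local function. A sketch, assuming a `kwargs` parameter alongside `args`:
def get_local_scaled_sum(la, scale=1.0):
    """A local computation that takes a keyword argument."""
    return scale * la.ndarray.sum()

# assumes `apply` accepts a `kwargs` mapping
context.apply(get_local_scaled_sum, args=(darr.key,), kwargs={'scale': 0.5})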
Context.register¶
`Context.register` is similar to `Context.apply`, but it allows you to register your function with a `Context` up front, and then call it repeatedly, with a nice syntax.
def local_demean(la):
    """Return the local array with the mean removed."""
    return la.ndarray - la.ndarray.mean()
context.register(local_demean)
context.local_demean(darr)
MPI-only Execution¶
Instead of using an IPython client (which uses ZeroMQ to communicate to the engines), you can run your DistArray code in MPI-only mode (using an extra MPI process for the client). This can be more performant.
# an example script to run in MPI-only mode
%cd julia_set
!python benchmark_julia.py -h
# Compile kernel.pyx
!python setup.py build_ext --inplace
# Run the benchmarking script with 5 MPI processes:
# 4 worker processes and 1 client process
!mpiexec -np 5 python benchmark_julia.py --kernel=cython -r1 1024
Distributed Array Protocol¶
Already have a library with its own distributed arrays? Use the Distributed Array Protocol to work with DistArray.
The Distributed Array Protocol (DAP) is a process-local protocol that allows two subscribers, called the "producer" and the "consumer" or the "exporter" and the "importer", to communicate the essential data and metadata necessary to share a distributed-memory array between them. This allows two independently developed components to access, modify, and update a distributed array without copying. The protocol formalizes the metadata and buffers involved in the transfer, allowing several distributed array projects to collaborate, facilitating interoperability. By not copying the underlying array data, the protocol allows for efficient sharing of array data.
http://distributed-array-protocol.readthedocs.org/en/rel-0.9.0/
def return_protocol_structure(la):
    return la.__distarray__()
context.apply(return_protocol_structure, (darr.key,))
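The result is one protocol structure per engine; per version 0.9.0 of the protocol linked above, each is a dictionary with '__version__', 'buffer', and 'dim_data' keys. A sketch, assuming `apply` returns one result per engine:
# Inspect the metadata exported by the first engine's section
structures = context.apply(return_protocol_structure, (darr.key,))
first = structures[0]
print(sorted(first.keys()))  # expect ['__version__', 'buffer', 'dim_data']
print(first['dim_data'])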
Acknowledgement and Disclaimer¶
This material is based upon work supported by the Department of Energy under Award Number DE-SC0007699.
This report was prepared as an account of work sponsored by an agency of the United States Government. Neither the United States Government nor any agency thereof, nor any of their employees, makes any warranty, express or implied, or assumes any legal liability or responsibility for the accuracy, completeness, or usefulness of any information, apparatus, product, or process disclosed, or represents that its use would not infringe privately owned rights. Reference herein to any specific commercial product, process, or service by trade name, trademark, manufacturer, or otherwise does not necessarily constitute or imply its endorsement, recommendation, or favoring by the United States Government or any agency thereof. The views and opinions of authors expressed herein do not necessarily state or reflect those of the United States Government or any agency thereof.