MapReduce−MPI Library Users Manual
http://www.sandia.gov/~sjplimp/mapreduce.html
Sandia National Laboratories, Copyright (2009) Sandia Corporation
This software and manual are distributed under the modified Berkeley Software Distribution (BSD) License.
Table of Contents

MapReduce-MPI Library
    Background
    What is a MapReduce?
    Getting Started
    Writing a MapReduce program
    C++ Interface to the Library
        Instantiate a MapReduce object
        Copy a MapReduce object
        Destroy a MapReduce object
        MapReduce aggregate() method
        MapReduce clone() method
        MapReduce collapse() method
        MapReduce collate() method
        MapReduce compress() method
        MapReduce convert() method
        MapReduce gather() method
        MapReduce map() method
        MapReduce reduce() method
        MapReduce scrunch() method
        MapReduce sort_keys() method
        MapReduce sort_values() method
        MapReduce sort_multivalues() method
        MapReduce kv_stats() method
        MapReduce kmv_stats() method
        KeyValue add() method
        Settings and defaults
    C interface to the Library
    Python interface to the Library
    Technical Details
        Length and byte-alignment of keys and values
        Memory requirements for KeyValue and KeyMultiValue objects
        Hash functions
        Callback functions
        Python overhead
        Error messages
    Examples
        Word frequency example
        R-MAT matrices example
        Citations
MapReduce−MPI Library
This document describes the 13 April 2009 version of the MapReduce-MPI (MR-MPI) library, which implements
the MapReduce operation popularized by Google. The library is designed for parallel execution on
distributed-memory platforms, but will also operate on a single processor. The library is written in C++, is
callable from high-level languages (C++, C, Fortran, Python, or other scripting languages), and requires no
additional software except linking with MPI (a message-passing library) if you wish to perform MapReduce
operations in parallel.
Similar to the original Google design, a user performs a MapReduce by writing a small program that invokes the
library. The user typically provides two application−specific functions, a "map" and a "reduce", that are called by
the library when a MapReduce operation is executed. "Map" and "reduce" are serial functions, meaning they are
invoked independently on individual processors on portions of your data when performing a MapReduce
operation in parallel.
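As a rough illustration, a driving program written against the C++ interface has approximately the shape sketched
below. This is only a sketch: the callback names mymap and myreduce, the empty callback bodies, and the task
count are placeholders, and the exact method signatures are documented in the C++ Interface sections later in
this manual.

#include "mpi.h"
#include "mapreduce.h"
#include "keyvalue.h"
using namespace MAPREDUCE_NS;

// user-supplied "map": generates key/value pairs for one task
void mymap(int itask, KeyValue *kv, void *ptr) {
  // e.g. read a file or a chunk of data and call kv->add() for each pair
}

// user-supplied "reduce": processes one key and all of its values
void myreduce(char *key, int keybytes, char *multivalue,
              int nvalues, int *valuebytes, KeyValue *kv, void *ptr) {
  // e.g. emit a new key/value pair summarizing the nvalues values
}

int main(int narg, char **args)
{
  MPI_Init(&narg,&args);
  MapReduce *mr = new MapReduce(MPI_COMM_WORLD);
  int ntasks = 100;              // placeholder number of map tasks
  mr->map(ntasks,&mymap,NULL);   // mymap() runs on portions of the tasks, in parallel
  mr->collate(NULL);             // library reorganizes the pairs by key across processors
  mr->reduce(&myreduce,NULL);    // myreduce() runs once per unique key
  delete mr;
  MPI_Finalize();
}

The map() and reduce() calls are the only places where user code runs; the collate() step in between is the
communication operation handled entirely by the library.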
The goal of this library is to provide a simple and portable interface for users to create their own MapReduce
programs, which can then be run on any desktop or large parallel machine using MPI. See the Background section
for features and limitations of this implementation.
Source code for the library is freely available for download from the MR-MPI web site and is licensed under the
modified Berkeley Software Distribution (BSD) License. This basically means it can be used by anyone for any
purpose. See the LICENSE file provided with the distribution for more details.
The distribution includes a few examples of simple programs that illustrate the use of MapReduce.
The authors of this library are Steve Plimpton and Karen Devine at Sandia National Laboratories, who can be
contacted via email: sjplimp at sandia.gov and kddevin at sandia.gov.
• Background
• What is a MapReduce?
• Getting Started
• Writing a MapReduce program
• C++ Interface to the Library
  ♦ Instantiate a MapReduce object
  ♦ Copy a MapReduce object
  ♦ Destroy a MapReduce object
  ♦ MapReduce::aggregate()
  ♦ MapReduce::clone()
  ♦ MapReduce::collapse()
  ♦ MapReduce::collate()
  ♦ MapReduce::compress()
  ♦ MapReduce::convert()
  ♦ MapReduce::gather()
  ♦ MapReduce::map()
  ♦ MapReduce::reduce()
  ♦ MapReduce::scrunch()
  ♦ MapReduce::sort_keys()
  ♦ MapReduce::sort_values()
  ♦ MapReduce::sort_multivalues()
  ♦ MapReduce::kv_stats()
  ♦ MapReduce::kmv_stats()
  ♦ KeyValue::add()
  ♦ Settings and defaults
• C interface to the Library
• Python interface to the Library
• Technical Details
• Examples
  ♦ Word frequency
  ♦ R-MAT matrices
Background
MapReduce is the programming paradigm popularized by Google researchers Dean and Ghemawat. Their
motivation was to enable analysis programs to be rapidly developed and deployed within Google to operate on the
massive data sets residing on their large distributed clusters. Their paper introduced a novel way of thinking about
certain kinds of large−scale computations as "map" operations followed by "reduces". The power of the paradigm
is that when cast in this way, a traditionally serial algorithm now becomes two highly parallel application−specific
operations (requiring no communication) sandwiched around an intermediate operation that requires parallel
communication, but which can be encapsulated in a library since the operation is independent of the application.
The Google implementation of MapReduce was a C++ library with communication between networked machines
via remote procedure calls. It allows for fault tolerance when large numbers of machines are used, and can use
disks as out-of-core memory to process huge data sets. Thousands of MapReduce programs have since been
written by Google researchers and are part of the daily compute tasks run by the company.
While I had heard about MapReduce, I didn't appreciate its power for scientific computing on a monolithic
distributed-memory parallel machine until reading an SC08 paper by Tu et al. of the D.E. Shaw company. They
showed how to think about tasks such as the post-processing of simulation output as MapReduce operations. In
this context it can be useful for computations that would normally be thought of as serial, such as reading in a
large data set and scanning it for events of a desired kind. As before, the computation can be formulated as a
highly parallel "map" followed by a "reduce". The encapsulated parallel operation in the middle requires
all-to-all communication to reorganize the data, a familiar MPI operation.
Tu et al.'s implementation of MapReduce was in parallel Python with communication between processors via
MPI, again allowing disks to be used for out-of-core operations, since their Linux cluster has one disk per
processing node.
This MapReduce-MPI (MR-MPI) library is a very simple and lightweight implementation of the basic
MapReduce functionality, borrowing ideas from both the Dean and Ghemawat and Tu et al. papers. It has the
following features:
• C++ library using MPI for inter-processor communication. This allows precise control over the memory
allocated during a large-scale MapReduce.
• C++, C, and Python interfaces provided. A C++ interface means that one or more MapReduce objects
can be instantiated and invoked by the user's program. A C interface means that the library can also be
called from C or other high-level languages such as Fortran. A Python interface means the library can be
called from a Python script, allowing you to write serial map() and reduce() functions in Python. If your
machine can run Python in parallel, you can also run a parallel MapReduce in that manner.
• Small, portable. The entire library is a few thousand lines of C++ code in a handful of C++ files which
can be built on any machine with a C++ compiler. For parallel operation, you link with MPI, a standard
message-passing library available on all distributed-memory machines. For serial operation, a dummy
MPI library, which is provided, can be substituted. The Python wrapper can be installed on any machine
with a version of Python that includes the ctypes module, typically Python 2.5 or later.
This library also has the following limitations, which may be overcome in future releases:
• No fault tolerance. Current MPI implementations do not enable easy detection of a dead processor. So,
like most MPI programs, a MapReduce operation will hang or crash if a processor goes away.
• No out-of-core operations. Most of the large parallel machines at Sandia do not have one disk per
processor or node. Rather, they have a few I/O nodes shared by 1000s of processors. This makes
out-of-core processing via disk access by all processors less effective and less portable. While these
machines do have huge aggregate memory, it does mean the library is limited to processing data sets that
will fit in that memory. This can be a limitation, particularly when the intermediate data set (between the
map and reduce operations) is large.
Finally, I call attention to recent work by Alexander Gray and colleagues at Georgia Tech. They show that various
kinds of scientific computations, such as N-body forces via multipole expansions, k-means clustering, and
machine learning algorithms, can be formulated as MapReduce operations. Thus there is an expanding set of
data-intensive or compute-intensive problems that may be amenable to solution using a MapReduce library such
as this one.
What is a MapReduce?
The canonical example of a MapReduce operation, described in both the Dean and Ghemawat and Tu et al.
papers, is counting the frequency of words in a collection of text files. Imagine a large corpus of text comprising
Gbytes or Tbytes of data. To count how often each word appears, the following serial algorithm, written in
Python, would work:
import sys

dict = {}
for file in sys.argv[1:]:
  text = open(file,'r').read()
  words = text.split()
  for word in words:
    if word not in dict: dict[word] = 1
    else: dict[word] += 1
unique = dict.keys()
for word in unique:
  print dict[word],word
Dict is a "dictionary" or associative array, which is a collection of key/value pairs where the keys are unique. In
this case, the key is a word and its value is the number of times it appears in any text file. The program loops over
files and splits the contents into words (separated by whitespace). For each word, it either adds it to the dictionary
or increments its associated value. Finally, the resulting dictionary of unique words and their counts is printed.
The drawback of this implementation is that it is inherently serial. The files are read one by one. More
importantly, the dictionary data structure is updated one word at a time.
A MapReduce formulation of the same task is as follows:
array = []
for file in sys.argv[1:]:
  array += map(file)
newarray = collate(array)
unique = []
for entry in newarray:
  unique += reduce(entry)
for entry in unique:
  print entry[1],entry[0]
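To connect this pseudocode back to the library, here is a sketch of how the per-file "map" and per-word "reduce"
steps might be written as C++ callbacks for MR-MPI. The function names (fileread, count), the fixed-size word
buffer, and passing the filename list through the ptr argument are illustrative assumptions rather than the library's
prescribed approach; the word frequency program distributed with the library (see the Examples section) is the
authoritative version.

#include <stdio.h>
#include <string.h>
#include "keyvalue.h"
using namespace MAPREDUCE_NS;

// "map" callback: called once per file; emits (word,NULL) for every
// whitespace-separated word in that file
void fileread(int itask, KeyValue *kv, void *ptr)
{
  char **files = (char **) ptr;            // assumed: ptr holds the filename list
  FILE *fp = fopen(files[itask],"r");
  if (fp == NULL) return;
  char word[256];
  while (fscanf(fp,"%255s",word) == 1)
    kv->add(word,strlen(word)+1,NULL,0);   // key = word, empty value
  fclose(fp);
}

// "reduce" callback: called once per unique word after collate();
// the number of values gathered for the word is its count
void count(char *key, int keybytes, char *multivalue,
           int nvalues, int *valuebytes, KeyValue *kv, void *ptr)
{
  kv->add(key,keybytes,(char *) &nvalues,sizeof(int));
}

A driving program would then call map() with one task per file (passing the filename list through ptr), collate()
to bring all pairs for the same word together, and reduce() with the counting callback, mirroring the
map/collate/reduce structure of the pseudocode above.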