I’ve been planning on writing about Google’s MapReduce algorithm for some time, but I couldn’t find a good practical example. Then we had a Northwest C++ Users Group presentation by Steve Yegge, followed by a discussion over beers, and I had a little epiphany. Steve was talking about, among other things, the build process. And that’s just a bunch of algorithms that are perfect for explaining MapReduce.
MapReduce is usually introduced in the context of distributed systems. It’s a system that spreads processing among multiple machines, often organized in huge server farms. But it can also be used on a single machine, to work with processes; or even in a single process, with threads. The beauty of it is that, if you write a build engine that uses MapReduce, you may scale it from a few threads to thousands of servers. It’s like writing a program that, rather than reading individual disk sectors, uses a file system. Such a program will magically work on a USB stick as well as on a distributed file system.
The other hot trend, besides scalability, is data mining. Huge software projects are like the Internet. Developers spend a lot of time browsing source files. We need tools that are more than just search engines; we need smart tools that understand computer languages. And data mining fits very well into the MapReduce paradigm. There is a fan-out of small independent tasks operating on single files, followed by a fan-in, during which data is combined. A perfect example of such an algorithm is mapping inter-file dependencies (mediated by include statements), something every build does to minimize rebuilding after a change.
I organized this blog to follow the old parable of three blind men and an elephant. The elephant in this case is the build. The first man touches the compile-link part of the build process and exclaims: “It’s MapReduce.” He has a well-developed sense of scale. The second one notices that not all files are rebuilt after a change and shouts: “It’s Whole Program Analysis.” He has an excellent data-mining ear. The third man, with a sequential nose, observes that not everything can be done in parallel and cries: “It’s a Topological Sort.”
So what is a build?
It’s MapReduce
Obviously, you can build a project on a single machine by performing a sequence of actions. I choose to treat this as a trivial case of a distributed process. Let’s start with the simplest thing: You have a bunch of C++ files and you want to compile them and link the resulting object files into an executable.
The separate compilation model tells us that the compilation of any source file (a Translation Unit, if you wish) is independent of the compilation of any other source file. Therefore, the compilation can (and I’d say, should) be run in parallel. You will not only make more cores busy, but also overlap I/O-intensive parts of compilation.
So the build engine would fan out a number of compilation tasks and, when they are all finished, combine the outputs (object files) and pass them to the linker. Let’s call the first part “mapping” and the second part “reducing.” That’s your MapReduce in a nutshell.
To be more specific, MapReduce abstracts this process so it may be reused in many different contexts. It’s a framework into which the client plugs in the parts that define what may be done in parallel and how to combine the results.
Here are the general steps:
- The client specifies input data. In our example this would be a list of files to be rebuilt.
- MapReduce partitions this list among multiple worker threads, processes, or machines.
- Each worker executes client-provided code: a function called map. In our example, we’d write map to run the compiler on a single file to produce the object file. This function “maps” a source file into an object file.
- MapReduce then waits for all workers to finish (possibly re-distributing the work if a server crashes). This barrier is the main synchronization point in the algorithm.
- MapReduce reshuffles the results and, again, distributes them to different workers for the final phase of the algorithm.
- Each worker executes client-provided code: a function called reduce. In our example we’d write reduce to execute a linker on a list of object files to produce the executable.
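The steps above can be sketched with standard C++ threads. This is only a minimal illustration under my own assumptions, not how a production MapReduce is built: the driver name mapReduce is hypothetical, the map function returns the pairs it emits rather than calling a global emit, and compileOne and linkTarget are stand-ins that build strings instead of running a compiler or a linker.

```cpp
#include <future>
#include <map>
#include <string>
#include <utility>
#include <vector>

typedef std::pair<std::string, std::string> Pair;
typedef std::vector<Pair> PairList;
typedef std::vector<std::string> StringList;

// Hypothetical driver: fan out the map calls, wait for all of them,
// group the emitted pairs by key, then call reduce once per key.
template<class MapFn, class ReduceFn>
std::map<std::string, std::string>
mapReduce(PairList const & input, MapFn mapFn, ReduceFn reduceFn)
{
    // Fan-out: one asynchronous task per (key, data) pair
    std::vector<std::future<PairList>> tasks;
    for (Pair const & p : input)
        tasks.push_back(std::async(std::launch::async, mapFn, p.first, p.second));

    // Barrier and shuffle: get() blocks until a task has finished
    std::map<std::string, StringList> groups;
    for (std::future<PairList> & task : tasks)
        for (Pair const & emitted : task.get())
            groups[emitted.first].push_back(emitted.second);

    // Fan-in: reduce each key (sequentially here, to keep the sketch short)
    std::map<std::string, std::string> results;
    for (auto const & group : groups)
        results[group.first] = reduceFn(group.first, group.second);
    return results;
}

// Stand-in for the compile step: emits (target, object file name)
PairList compileOne(std::string const & file, std::string const & target)
{
    return PairList(1, Pair(target, file + ".obj"));
}

// Stand-in for the link step: returns the command line it would run
std::string linkTarget(std::string const & target, StringList const & objFiles)
{
    std::string cmd = "link " + target;
    for (std::string const & obj : objFiles)
        cmd += " " + obj;
    return cmd;
}
```

Calling mapReduce(input, compileOne, linkTarget) on a list of (source file, target) pairs returns one command line per target. Because the futures are collected in input order, the object files for each target keep the order of the input list.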
For all this to work, the client must organize data in a particular form that is palatable to MapReduce. The input must be a list of (key, data) pairs.
Since keys and data may be of arbitrary type, why this separation? MapReduce needs keys to be able to partition the work. Each key uniquely identifies a task that may be done in parallel with other tasks. In our case the key would be a source file name (or, in general, a path). The keys are sorted by MapReduce and partitioned equitably among workers (often the client has the option to override the default partitioning algorithm or the compare function).
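To make the partitioning step concrete, here is a small sketch that sorts the keys and deals them out round-robin. The function name partitionKeys and the round-robin policy are my assumptions for illustration; a real implementation might hash the keys or split them into contiguous ranges instead.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <vector>

typedef std::vector<std::string> StringList;

// Sort the keys, then deal them out to nWorkers shares, one at a time.
// Assumes nWorkers > 0.
std::vector<StringList> partitionKeys(StringList keys, std::size_t nWorkers)
{
    std::sort(keys.begin(), keys.end());
    std::vector<StringList> shares(nWorkers);
    for (std::size_t i = 0; i < keys.size(); ++i)
        shares[i % nWorkers].push_back(keys[i]);
    return shares;
}
```

With the keys c.cpp, a.cpp, b.cpp and two workers, the first worker gets a.cpp and c.cpp and the second gets b.cpp. The share sizes never differ by more than one, which is one simple reading of “equitably.”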
The map function, written by the client, must conform to a particular (generic) interface. It takes a key and the associated data. In our case map would take a file name (key) and some data to be defined later. It would compile the file to an object. Notice that map is myopic by nature. It concerns itself only with a small fraction of the whole process. Because it doesn’t have to know about the big picture (here, the build), its implementation is relatively easy (here, it just calls the compiler).
The output of map has to conform to certain standards too, so it can be shuffled by MapReduce and passed to the second client-defined function, reduce. The map function must emit new pairs of (key, data). The new keys and the new data may be totally unrelated to the original ones, both in meaning and type.
Back to our example, to make reduce nontrivial, let’s shoot for a slightly more general scenario: we want to build multiple targets at once. In Visual Studio, for instance, building the solution might involve building several projects, each producing a separate executable, library, or DLL. We’d make map emit pairs (target name, object name). (The target name will have to be passed to map as input data.)
Here’s my simplified implementation of map (a more complete toy example follows at the end of this post):
void map(std::string const & file, std::string const & target)
{
    std::string cmd = "cl /c ";
    cmd += file;
    execute(cmd);
    // output new (key, value) pair
    emit(target, ObjFileName(file));
}
When all workers are done, MapReduce gathers the emitted pairs and sorts them by the key so that it can accumulate data for each key into a separate list. It then redistributes the results among workers. As before, the keys define units of concurrency. Here we assume that each target may be linked independently (this is not always true: see “It’s a Topological Sort”).
The second-phase workers execute the client-defined reduce. The first argument to reduce is the new key, and the second is a list of data for this key.
In our example MapReduce would combine the outputs of all maps to build a list of object files corresponding to each build target. And that’s exactly what the linker needs. Here’s my toy implementation of reduce:
void reduce(std::string const & target, std::list<std::string> const & objFiles)
{
    std::string cmd = "link /out ";
    cmd += ExeNames[target];
    std::for_each(objFiles.begin(), objFiles.end(),
        [&](std::string const & obj) {
            cmd += " ";
            cmd += obj;
        });
    execute(cmd);
}
(I explain my use of lambdas in the Appendix.)
Of course this is a very simplified picture of just one part of the build process. Still, I suspect many a build environment uses the same overall idea, and maybe some of them even use MapReduce to implement it.
There are many distributed build environments in use today. The difference is that they either put distribution on top of non-distributed builds or write one monolithic application. That restricts their reusability and leads to duplication of functionality. Here, on the other hand, distribution is a layer below the build engine. The MapReduce implementation, whether on a single machine or over a server farm, knows nothing about building software projects, so it doesn’t duplicate any functionality and is perfectly reusable. It’s a matter of whether the build engine uses, say, a message-passing API or the more abstract MapReduce API.
It’s Whole Program Analysis
The separate compilation model means that the compiler has access to only one source file at a time. The build environment, on the other hand, has access to the whole program. So if there is any need for whole program analysis, that’s the natural place to put it.
Figuring out dependencies between program files is an example of whole program analysis. The build needs it because knowing the dependency graph allows it to recompile only the minimum number of files after one of them (for instance, a header file) changes. There is a program, makedepend, that finds such dependencies by looking at the #include statements.
Inverted Index
Finding dependencies is a lot like creating an inverted index, a classic example of how to use MapReduce. The map function scans a given document and emits a stream of pairs, (word, document). The reduce function gets a word as the key and a list of documents in which this word occurs (possibly with some positional information). The bulk of the work is done inside MapReduce by sorting the results of map by the word.
As an exercise, let’s see how a simple makedepend could be implemented using MapReduce. Suppose that we have a dedicated parser (or a smart preprocessor) that takes a source file and finds all the includes on which it depends. (In C++, the parser must do it recursively, going into includes of includes, etc. See item 7 in Walter Bright’s post.)
What the build process really needs is the answer to a different question: Which files must be recompiled if a given include file changes?
Here’s how we may answer this question using MapReduce. Our map function should take a source file name as key and run our parser over this file. (We also have to know what constants are predefined for a given build target, because of conditional compilation. We could pass them as data to map.)
What map would emit is a stream of pairs, (include file, current source file). Notice the inversion: For map, the name of the source file was the key; here the name of the include file is the key. Because of that trick, MapReduce will do the bulk of work for us. It will sort and group data by the new key. Thus the input to our reduce function will be the name of an include file and a list of all source files that depend on it. These are the files that will have to be recompiled if that header file changes.
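Here is a rough sketch of that inversion. It rests on several simplifying assumptions of mine: the “parser” only looks at lines that begin with #include (no recursion into headers, no conditional compilation, no "# include" with a space), the file text is passed in as data instead of being read from disk, and the names mapIncludes and buildDependents are hypothetical. The grouping that MapReduce would do during the shuffle is done here with a std::map.

```cpp
#include <cstddef>
#include <map>
#include <set>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

typedef std::pair<std::string, std::string> Pair;

// map: key = source file name, data = the file's text.
// Returns the emitted (include file, source file) pairs.
std::vector<Pair> mapIncludes(std::string const & source, std::string const & text)
{
    std::vector<Pair> emitted;
    std::istringstream in(text);
    std::string line;
    while (std::getline(in, line)) {
        // Skip leading whitespace and test for the directive
        std::size_t pos = line.find_first_not_of(" \t");
        if (pos == std::string::npos || line.compare(pos, 8, "#include") != 0)
            continue;
        // Find the opening quote or angle bracket and its matching close
        std::size_t open = line.find_first_of("\"<", pos + 8);
        if (open == std::string::npos)
            continue;
        char closing = (line[open] == '<') ? '>' : '"';
        std::size_t close = line.find(closing, open + 1);
        if (close == std::string::npos)
            continue;
        emitted.push_back(Pair(line.substr(open + 1, close - open - 1), source));
    }
    return emitted;
}

// Shuffle and reduce in one step: for every include file, collect
// the set of source files that include it directly.
std::map<std::string, std::set<std::string>>
buildDependents(std::vector<Pair> const & sources)
{
    std::map<std::string, std::set<std::string>> dependents;
    for (Pair const & src : sources)
        for (Pair const & e : mapIncludes(src.first, src.second))
            dependents[e.first].insert(e.second);
    return dependents;
}
```

Looking up a header name in the result gives the source files that would need recompiling if that header changed, at least for direct includes.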
Here again, a higher level of abstraction–writing the dependency engine on top of MapReduce–provides better scalability and avoids code duplication. It also opens up new possibilities. Consider the building of a dependency graph as an example of data mining. In general, a build engine that works on top of MapReduce could easily dispatch all kinds of bots that extract local data from every file and combine them into a global index or database.
In particular, many IDEs attempt, with a higher or lower degree of success, to build call graphs, definition/use graphs, inheritance graphs, etc.
Also, there are programs that can infer const-ness or, in Java, non-null-ness. In dynamic languages, type inference is very useful, and it requires whole-program analysis. Not to mention programs that can find potential data races or deadlocks. They all follow the same pattern and can easily be fit into a MapReduce build engine.
It’s a Topological Sort
The build process must also deal with other types of dependencies: when the result of one operation is a prerequisite for another operation. If you’ve ever built a compiler using flex and bison, you know that before you can run flex you need to run bison to produce the appropriate include files that contain, among other things, definitions of tokens. These kinds of dependencies are usually explicitly written into makefiles, as in this example:
lang.tab.hpp lang.tab.cpp: lang.ypp
	bison lang.ypp
lex.yy.c: lang.lex lang.tab.hpp
	flex lang.lex
When you have results of some operations being prerequisites for other operations, you use the good old topological sort to organize your work. It so happens that topological sort may also be done using MapReduce (see, for instance, Ricky Ho’s blog).
I’ll explain it using a classic example: getting dressed in the morning. Our set consists of objects of clothing:
{jeans, lsock, rsock, lshoe, rshoe}
You can’t get dressed by putting on those objects in random order. Traditionally, you put on a sock before you put on a shoe. A sock must be on the list of prerequisites for a shoe.
In the preliminary stage of topological sort, we prepare a list of pairs, (object, prerequisite). In our case, the list consists of:
(lshoe, lsock) (lshoe, jeans) (rshoe, rsock) (rshoe, jeans)
Our map1 is called for each (object, prerequisite) pair and emits the inverted pair: the prerequisite is the key, and the object is the data. In other words, seen from the prerequisite’s side, it emits (object, dependent) pairs.
void map1(std::string const & object, std::string const & prereq)
{
    emit(prereq, object);
}
MapReduce sorts and reshuffles those pairs and then calls reduce1.
void reduce1(std::string const & object, StringList const & dependents)
{
    TheDependentsOf[object] = dependents;
}
Now we have, for each object, both the original list of prerequisites and the newly created list of dependents. For the sake of this exposition, I’ll store these lists in two global maps:
StringToList ThePrerequisitesOf;
StringToList TheDependentsOf;
In our case, the resulting dependents map will contain:
TheDependentsOf[jeans] = {lshoe, rshoe};
TheDependentsOf[lsock] = {lshoe};
TheDependentsOf[rsock] = {rshoe};
The second part of the algorithm is a loop in which MapReduce is called repeatedly until all objects are consumed (I mean, put on).
The map2 function takes an object of clothing as key and a pair of lists, its prerequisites and its dependents, as data. If the list of prerequisites is not empty, it does nothing: we can’t put on that object yet.
Otherwise it performs the action: “Put the object on.” In our case, jeans and socks have no prerequisites, so they are put on immediately. Once the object is put on, our map2 removes that object from the list of objects. It also iterates over the list of its dependents and emits inverted pairs, (dependent, current object). For instance, once the jeans are put on, two pairs are emitted:
(lshoe, jeans), (rshoe, jeans)
because both shoes depended on the jeans.
Here’s the code:
void map2(std::string const & object, TwoLists const & preqsAndDepds)
{
    if (preqsAndDepds.first.empty())
    {
        // This object has no prerequisites. Put it on.
        std::string cmd = "put on ";
        execute(cmd + object);
        // Remove it from the global list
        TheObjects.erase(
            std::find(TheObjects.begin(), TheObjects.end(), object));
        // Emit pairs (dependent, its completed prerequisite)
        StringList const & depds = preqsAndDepds.second;
        std::for_each(depds.begin(), depds.end(),
            [&](std::string const & dep) {
                emit(dep, object);
            });
    }
}
The reduce2 function is then called with an object and the list of its completed prerequisites. In our case, the arguments will be:
lshoe, {jeans, lsock}
rshoe, {jeans, rsock}
The reduce2 function removes the completed prerequisites from the list of all prerequisites for a given object.
In our case, reduce2 will remove both the jeans and the left sock from ThePrerequisitesOf[lshoe]. It will also remove the jeans and the right sock from ThePrerequisitesOf[rshoe].
void reduce2(std::string const & object, StringList const & complPrereqs)
{
    std::for_each(complPrereqs.begin(), complPrereqs.end(),
        [&](std::string const & completed) {
            StringList & lst = ThePrerequisitesOf[object];
            lst.erase(std::find(lst.begin(), lst.end(), completed));
        });
}
As a result, the shoes end up with no prerequisites, so they will be put on in the next (and last) iteration of MapReduce.
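The whole loop can be condensed into one sequential sketch that records which objects are put on in each round. This is my own compression of the two phases, not a replacement for the MapReduce version: the function name dressInRounds is hypothetical, StringList is a std::vector here, the data is passed by value instead of living in globals, and the check for a cycle is an addition the text above does not discuss.

```cpp
#include <algorithm>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

typedef std::vector<std::string> StringList;
typedef std::map<std::string, StringList> StringToList;

// Returns one list per MapReduce iteration: the objects put on in that round.
std::vector<StringList> dressInRounds(StringList objects, StringToList prerequisitesOf)
{
    // First phase (map1/reduce1): invert prerequisites into dependents
    StringToList dependentsOf;
    for (auto const & entry : prerequisitesOf)
        for (std::string const & prereq : entry.second)
            dependentsOf[prereq].push_back(entry.first);

    std::vector<StringList> rounds;
    while (!objects.empty()) {
        // map2: objects without prerequisites are put on; each one
        // "emits" (dependent, completed prerequisite) pairs
        StringList ready, waiting;
        StringToList completedFor;
        for (std::string const & obj : objects) {
            if (prerequisitesOf[obj].empty()) {
                ready.push_back(obj);
                for (std::string const & dep : dependentsOf[obj])
                    completedFor[dep].push_back(obj);
            } else {
                waiting.push_back(obj);
            }
        }
        if (ready.empty())
            throw std::runtime_error("circular dependency");

        // reduce2: strike the completed prerequisites off each dependent's list
        for (auto const & entry : completedFor) {
            StringList & lst = prerequisitesOf[entry.first];
            for (std::string const & done : entry.second)
                lst.erase(std::remove(lst.begin(), lst.end(), done), lst.end());
        }
        rounds.push_back(ready);
        objects = waiting;
    }
    return rounds;
}
```

For the clothing example this yields two rounds: first the jeans and both socks, then both shoes. The number of rounds equals the length of the longest chain of prerequisites.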
Limitations of MapReduce
MapReduce is one particular approach to data-driven parallelism. In general it’s not a good fit for problems that exhibit task-driven parallelism. But even within the data-driven domain, MapReduce has competitors, one of them being PGAS (Partitioned Global Address Space; see my post on The Future of Concurrent Programming). Let’s briefly compare the two approaches.
MapReduce is closer to the functional-programming, message-passing, data-copying paradigm. PGAS is a generalization of the shared-memory paradigm. It creates the illusion of a global address space spanning multiple processes or machines. Consequently, MapReduce doesn’t require explicit synchronization while PGAS often does. On the other hand, MapReduce incurs more data copying even if it runs on threads of the same process (I’ll try to quantify this in my next post).
MapReduce requires a particular form of input: a set of (key, data) pairs. The distribution of work is encoded in the input and is driven by the choice of keys, each key defining a minimum unit of concurrency. PGAS works on shared data structures, typically multi-dimensional arrays. Distribution of work is abstracted from data and from the algorithm; it is defined by distribution maps over data structures (this is also explained in my previous post). A distribution may be modified without changing any other part of the algorithm. With MapReduce that would require either a change of keys or a client-defined partitioning function.
A lot of MapReduce applications end up sharing data one way or another. Even my little example assumes that files, in particular header files, are accessible from every location. This is usually made possible through some kind of distributed file system. Google, the owner of the patent on MapReduce (I just hope they won’t “do evil” by suing other companies or individuals who use it or, gasp!, blog about it), has its own GFS (Google File System) that’s used in concert with MapReduce.
Finally, it’s not clear what fraction of problems lend themselves to MapReduce. Scientific simulations, for instance, are usually easier to describe in terms of large matrices, and therefore fit the PGAS mold more naturally. I plan to write more on this topic in my next post.
Appendix: Bob the Builder
Just for fun, below is a short C++ program I wrote to illustrate the use of MapReduce in the build process. Since MapReduce has its roots in functional programming, I felt free to use the C++0x lambdas at my convenience. For instance, in this snippet:
[&](Pair const & p) { targetFiles[p.first].push_back(p.second); }
I define an anonymous function that takes a Pair (by const reference) and adds its second component to the list targetFiles[p.first].
Notice that targetFiles is defined outside of the scope of the lambda; it is captured from the environment. A function that captures the environment is called a closure. Here I told the compiler to capture the whole environment, by reference, using the notation [&].
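To see why the capture mode matters, here is a small sketch contrasting [&] with [=]. The two function names are made up for this illustration.

```cpp
#include <algorithm>
#include <string>
#include <vector>

// [&] captures `joined` by reference, so the lambda appends to the
// variable declared in the enclosing function.
std::string joinByReference(std::vector<std::string> const & words)
{
    std::string joined;
    std::for_each(words.begin(), words.end(),
        [&](std::string const & w) { joined += " "; joined += w; });
    return joined;
}

// [=] captures a copy of `joined`. `mutable` lets the lambda modify
// its private copy, but the outer variable is never touched.
std::string joinByValue(std::vector<std::string> const & words)
{
    std::string joined;
    std::for_each(words.begin(), words.end(),
        [=](std::string const & w) mutable { joined += " "; joined += w; });
    return joined; // still empty
}
```

The first function returns the words separated by spaces; the second returns an empty string, because all the appending happened to the copy held inside the closure. That’s why the reshuffling lambda in my toy MapReduce needs [&]: it has to modify targetFiles itself.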
// A toy implementation of MapReduce. Sorry for using globals.
#include <iostream>
#include <string>
#include <list>
#include <map>
#include <utility>
#include <algorithm>

typedef std::pair<std::string, std::string> Pair;
typedef std::list<Pair> InputList;
typedef std::list<std::string> StringList;

// Pairs emitted by map, waiting to be reshuffled
std::list<Pair> Emitted;

void emit(std::string const & key, std::string const & data)
{
    std::cout << " emit: (" << key << ", " << data << ")\n";
    Emitted.push_back(std::make_pair(key, data));
}

// Pretend to run a command: just print it
void execute(std::string const & cmd)
{
    std::cout << cmd << std::endl;
}

void MapReduce(InputList const & input,
    void (*map)(std::string const & key, std::string const & data),
    void (*reduce)(std::string const & key, StringList const & lst))
{
    // Distribute the map part to workers (here: do them sequentially)
    std::for_each(input.begin(), input.end(),
        [&](Pair const & p) {
            map(p.first, p.second);
        });
    // (--Wait for all workers to finish--)
    // Reshuffle emitted key/value pairs
    std::map<std::string, StringList> targetFiles;
    std::for_each(Emitted.begin(), Emitted.end(),
        [&](Pair const & p) {
            targetFiles[p.first].push_back(p.second);
        });
    // Distribute the reduce part to workers (here: do them sequentially)
    std::for_each(targetFiles.begin(), targetFiles.end(),
        [&](std::pair<std::string const, StringList> const & p) {
            reduce(p.first, p.second);
        });
}

// Replaces a three-letter extension (such as "cpp") with "obj"
std::string ObjFileName(std::string const & srcFileName)
{
    std::string oFile = srcFileName.substr(0, srcFileName.length() - 3);
    return oFile + "obj";
}

// Maps target names to executable names
std::map<std::string, std::string> ExeNames;

void map(std::string const & file, std::string const & target)
{
    std::string cmd = "cl /c ";
    cmd += file;
    execute(cmd);
    emit(target, ObjFileName(file));
}

void reduce(std::string const & target, StringList const & objFiles)
{
    std::string cmd = "link /out ";
    cmd += ExeNames[target];
    std::for_each(objFiles.begin(), objFiles.end(),
        [&](std::string const & obj) {
            cmd += " ";
            cmd += obj;
        });
    execute(cmd);
}

int main()
{
    // There are two targets
    ExeNames["MyApp"] = "MyApp.exe";
    ExeNames["YourApp"] = "YourApp.exe";
    // The input is a list of key/value pairs
    // Key: source file name
    // Data: target name
    InputList input;
    input.push_back(std::make_pair("foo.cpp", "MyApp"));
    input.push_back(std::make_pair("bar.cpp", "MyApp"));
    input.push_back(std::make_pair("baz.cpp", "YourApp"));
    MapReduce(input, &map, &reduce);
}
Here’s the output:
cl /c foo.cpp
 emit: (MyApp, foo.obj)
cl /c bar.cpp
 emit: (MyApp, bar.obj)
cl /c baz.cpp
 emit: (YourApp, baz.obj)
link /out MyApp.exe foo.obj bar.obj
link /out YourApp.exe baz.obj
November 4, 2010 at 1:24 pm
Won’t the worst case for the second phase of the topological sort algorithm require N M/R runs? It seems bounded by the maximum shortest path between nodes (i.e. the distance from the “first” to the “last” sorted node). And with M/R, this means you’ll probably be occupying multiple machines that effectively sit idle.
This seems to be an example of where MapReduce is a poor choice, at least for cases where number of required iterations is high (or where the number of tasks that can be completed in each iteration varies a lot).
November 4, 2010 at 7:28 pm
The MapReduce implementation of topological sort is not much different from the standard implementation–in particular, you go through the same number of iterations. So the tradeoff is mostly in terms of MapReduce communication overhead, vs. the amount of parallelization that can be explored.
If the DAG is shallow but it branches a lot, you’re likely to get speedup. On the other hand, if the whole graph fits in memory, you’re probably better off doing the sort locally (which is what make does). Whether you use a local version of MapReduce, or the standard algorithm probably makes little difference in that case.
August 9, 2012 at 4:02 pm
[…] MapReduce and the Build Process https://bartoszmilewski.com/2010/11/03/mapreduce-and-the-build-process/ […]