November 2010



I gave this presentation on Nov 17 at the Northwest C++ Users Group. Here are the links to the two-part video:

  1. Part I
  2. Part II

There was an unresolved discussion about one of the examples–the one about template specialization conflicting with modular type checking. It turns out the example was correct (that is, an error would occur).

template<class T> struct vector {
    vector(int); // has constructor
};

//-- This one typechecks
template<LessThanComparable T>
void f(T x) { vector<T> v(100); ... }

//-- Specialization of vector for int
template<> struct vector<int> {
    // missing constructor!
};

int main() { f(4); } // error

The issue was whether the instantiation of f in main would see the specialization of the vector template for int even though the specialization occurs after the definition of f. Yes, it would, because vector<T> is a dependent name inside f. Dependent names are resolved in the context of template instantiation–here in the context of main, where the specialization is visible. Only non-dependent names are resolved in the context of template definition.
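The same lookup rule can be tried on a compiler without concepts support. The following is only a minimal sketch, not the example from the talk: `Tag` and `describe` are made-up names, and the specialization changes a return value instead of removing a constructor, so the effect shows up at run time rather than as a compile error.

```cpp
#include <string>

// Primary template.
template<class T> struct Tag {
    static std::string name() { return "primary"; }
};

// Tag<T> depends on T, so it is resolved when describe is instantiated,
// not here at the point of definition.
template<class T>
std::string describe(T) { return Tag<T>::name(); }

// Specialization declared after describe, but before describe<int>
// is first instantiated.
template<> struct Tag<int> {
    static std::string name() { return "specialization for int"; }
};
```

Calling `describe(4)` from a function defined after this code picks up the specialization, while `describe(4.0)` falls back to the primary template.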


I’ve been planning on writing about Google’s MapReduce algorithm for some time but I couldn’t find a good practical example. Then we had a Northwest C++ Users Group presentation by Steve Yegge and a followup discussion and beers, and I had a little epiphany. Steve was talking about, among other things, the build process. And that’s just a bunch of algorithms that are perfect for explaining MapReduce.

MapReduce is usually introduced in the context of distributed systems. It’s a system that spreads processing among multiple machines, often organized in huge server farms. But it can also be used on a single machine, to work with processes; or even in a single process, with threads. The beauty of it is that, if you write a build engine that uses MapReduce, you may scale it from a few threads to thousands of servers. It’s like writing a program that, rather than reading individual disk sectors, uses a file system. Such a program will magically work on a USB stick as well as on a distributed file system.

The other hot trend, besides scalability, is data mining. Huge software projects are like the Internet. Developers spend a lot of time browsing source files. We need tools that are more than just search engines; we need smart tools that understand computer languages. And data mining fits very well into the MapReduce paradigm. There is a fan-out of small independent tasks operating on single files, followed by a fan-in, during which data is combined. A perfect example of such an algorithm is mapping inter-file dependencies (mediated by include statements), something every build does to minimize rebuilding after a change.

I organized this blog to follow the old parable of three blind men and an elephant. The elephant in this case is the build. The first man touches the compile-link part of the build process and exclaims: “It’s MapReduce.” He has a well-developed sense of scale. The second one notices that not all files are rebuilt after a change and shouts: “It’s Whole Program Analysis.” He has an excellent data-mining ear. The third man, with a sequential nose, observes that not everything can be done in parallel and cries: “It’s a Topological Sort.”

So what is a build?

It’s MapReduce

Obviously, you can build a project on a single machine by performing a sequence of actions. I choose to treat this as a trivial case of a distributed process. Let’s start with the simplest thing: You have a bunch of C++ files and you want to compile them and link the resulting object files into an executable.

The separate compilation model tells us that the compilation of any source file (a Translation Unit, if you wish) is independent of the compilation of any other source file. Therefore, the compilation can (and I’d say, should) be run in parallel. You will not only make more cores busy, but also overlap I/O-intensive parts of compilation.

So the build engine would fan out a number of compilation tasks and, when they are all finished, combine the outputs (object files) and pass them to the linker. Let’s call the first part “mapping” and the second part “reducing.” That’s your MapReduce in a nutshell.

To be more specific, MapReduce abstracts this process so it may be reused in many different contexts. It’s a framework into which the client plugs in the parts that define what may be done in parallel and how to combine the results.

Here are the general steps:

  1. The client specifies input data. In our example this would be a list of files to be rebuilt.
  2. MapReduce partitions this list among multiple worker threads, processes, or machines.
  3. Each worker executes client-provided code–a function called map. In our example, we’d write map to run the compiler on a single file to produce the object file. This function “maps” a source file into an object file.
  4. MapReduce then waits for all workers to finish (possibly re-distributing the work if a server crashes). This barrier is the main synchronization point in the algorithm.
  5. MapReduce reshuffles the results and, again, distributes them to different workers for the final phase of the algorithm.
  6. Each worker executes client-provided code–a function called reduce. In our example we’d write reduce to execute a linker on a list of object files to produce the executable.
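The six steps above can be sketched for the single-process, multi-threaded case. This is an illustration under stated assumptions, not how any production MapReduce works: `mapReduce`, `KeyValue`, `mapFn`, and `reduceFn` are hypothetical names, every input pair gets its own `std::async` task, and there is no re-distribution of work after a failure.

```cpp
#include <functional>
#include <future>
#include <map>
#include <string>
#include <utility>
#include <vector>

typedef std::pair<std::string, std::string> KeyValue;
typedef std::vector<KeyValue> KeyValues;
typedef std::vector<std::string> Values;

// Hypothetical in-process MapReduce. The client plugs in mapFn and reduceFn.
std::map<std::string, std::string> mapReduce(
    KeyValues const & input,                                     // step 1
    std::function<KeyValues(std::string const &, std::string const &)> mapFn,
    std::function<std::string(std::string const &, Values const &)> reduceFn,
    std::launch policy = std::launch::async)
{
    // Steps 2-3: fan out one map task per input pair.
    std::vector<std::future<KeyValues>> mapped;
    for (auto const & kv : input)
        mapped.push_back(std::async(policy, mapFn, kv.first, kv.second));

    // Steps 4-5: get() blocks until its task is done, so this loop is the
    // barrier. The std::map groups the emitted data by key, in key order.
    std::map<std::string, Values> grouped;
    for (auto & task : mapped)
        for (auto const & kv : task.get())
            grouped[kv.first].push_back(kv.second);

    // Step 6: fan out one reduce task per key, then collect the results.
    std::map<std::string, std::future<std::string>> reduced;
    for (auto const & g : grouped)
        reduced[g.first] = std::async(policy, reduceFn, g.first, g.second);

    std::map<std::string, std::string> result;
    for (auto & r : reduced)
        result[r.first] = r.second.get();
    return result;
}
```

Passing `std::launch::deferred` as the last argument runs every task lazily on the calling thread, which can be handy when debugging the client's functions. On some toolchains the threaded version needs a flag such as `-pthread` at link time.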

For all this to work, the client must organize data in a particular form that is palatable to MapReduce. The input must be a list of (key, data) pairs.

Since keys and data may be of arbitrary type, why this separation? MapReduce needs keys to be able to partition the work. Each key uniquely identifies a task that may be done in parallel with other tasks. In our case the key would be a source file name (or, in general, a path). The keys are sorted by MapReduce and partitioned equitably among workers (often the client has the option to override the default partitioning algorithm or the compare function).
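To make "sorted and partitioned equitably" concrete, here is one possible default partitioner. It is a sketch of a plausible scheme, not a description of any real MapReduce implementation; `partitionKeys` is a hypothetical name, and the choice of contiguous chunks is my assumption.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <vector>

// Sort the keys, then hand them out in contiguous chunks whose sizes
// differ by at most one.
std::vector<std::vector<std::string>> partitionKeys(
    std::vector<std::string> keys, std::size_t workers)
{
    std::sort(keys.begin(), keys.end());
    std::vector<std::vector<std::string>> parts(workers);
    if (workers == 0) return parts;   // nobody to distribute the keys to

    std::size_t const base = keys.size() / workers;
    std::size_t const extra = keys.size() % workers;
    auto it = keys.begin();
    for (std::size_t w = 0; w < workers; ++w) {
        // The first `extra` workers take one key more than the rest.
        std::size_t const n = base + (w < extra ? 1 : 0);
        parts[w].assign(it, it + n);
        it += n;
    }
    return parts;
}
```

With five source files and two workers, the first worker gets the three alphabetically first files and the second gets the remaining two. A client-supplied partitioning or compare function would replace the `std::sort` call and the chunking loop.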

The map function, written by the client, must conform to a particular (generic) interface. It takes a key and the associated data. In our case map would take a file name (key) and some data to be defined later. It would compile the file to an object. Notice that map is myopic by nature. It concerns itself only with a small fraction of the whole process. Because it doesn’t have to know about the big picture (here, the build), its implementation is relatively easy (here, it just calls the compiler).

The output of map has to conform to certain standards too, so it can be shuffled by MapReduce and passed to the second client-defined function, reduce. The map function must emit new pairs of (key, data). The new keys and the new data may be totally unrelated to the original ones, both in meaning and type.

Back to our example, to make reduce nontrivial, let’s shoot for a slightly more general scenario: we want to build multiple targets at once. In Visual Studio, for instance, building the solution might involve building several projects, each producing a separate executable, library, or DLL. We’d make map emit pairs (target name, object name). (The target name will have to be passed to map as input data.)

Here’s my simplified implementation of map (a more complete toy example follows at the end of this post):

void map(std::string const & file, std::string const & target)
{
    std::string cmd = "cl /c ";
    cmd += file;
    execute(cmd);
    // output new (key, value) pair
    emit(target, ObjFileName(file));
}

When all workers are done, MapReduce gathers the emitted pairs and sorts them by the key so that it can accumulate data for each key into a separate list. It then redistributes the results among workers. As before, the keys define units of concurrency. Here we assume that each target may be linked independently (this is not always true: see “It’s a Topological Sort”).

The second-phase workers execute the client-defined reduce. The first argument to reduce is the new key, and the second is a list of data for this key.

In our example MapReduce would combine the outputs of all maps to build a list of object files corresponding to each build target. And that’s exactly what the linker needs. Here’s my toy implementation of reduce:

void reduce(std::string const & target, 
    std::list<std::string> const & objFiles)
{
    std::string cmd = "link /out ";
    cmd += ExeNames[target];
    std::for_each(objFiles.begin(), objFiles.end(), 
        [&](std::string const & obj) {
            cmd += " ";
            cmd += obj;
        });
    execute(cmd);
}

(I explain my use of lambdas in the Appendix.)

Of course this is a very simplified picture of just one part of the build process. Still, I suspect many a build environment uses the same overall idea, and maybe some of them even use MapReduce to implement it.

There are many distributed build environments in use today. The difference is that they either put distribution on top of non-distributed builds or write one monolithic application. That restricts their reusability and leads to duplication of functionality. Here, on the other hand, distribution is a layer below the build engine. The MapReduce implementation, whether on a single machine or over a server farm, knows nothing about building software projects, so it doesn’t duplicate any functionality and is perfectly reusable. It’s a matter of whether the build engine uses, say, a message-passing API or the more abstract MapReduce API.

It’s Whole Program Analysis

The separate compilation model means that the compiler has access to only one source file at a time. The build environment, on the other hand, has access to the whole program. So if there is any need for whole program analysis, that’s the natural place to put it.

Figuring out dependencies between program files is an example of whole program analysis. The build needs it because knowing the dependency graph allows it to recompile only the minimum number of files after one of them (for instance, a header file) changes. There is a program, makedepend, that finds such dependencies by looking at the #include statements.

Inverted Index

Finding dependencies is a lot like creation of an inverted index — a classic example of how to use MapReduce. The map function scans a given document and emits a stream of pairs, (word, document). The reduce function gets a word as the key and a list of documents in which this word occurs (possibly with some positional information). The bulk of the work is done inside MapReduce by sorting the results of map by the word.
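A bare-bones version of the inverted index might look like the sketch below. It is deliberately simplified: `Index` and `indexDocument` are hypothetical names, words are whatever is separated by whitespace, no positional information is kept, and the "emit" and the grouping by word are folded into a single insertion into a sorted map.

```cpp
#include <map>
#include <set>
#include <sstream>
#include <string>

// word -> set of documents in which the word occurs
typedef std::map<std::string, std::set<std::string>> Index;

// The map step for one document: every word yields a (word, document) pair.
// Inserting into the Index plays the role of MapReduce's sort-and-group.
void indexDocument(std::string const & document, std::string const & text,
                   Index & index)
{
    std::istringstream in(text);
    std::string word;
    while (in >> word)
        index[word].insert(document);
}
```

After indexing all documents, each entry of the `Index` is exactly what reduce would receive: a word and the documents that contain it.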

As an exercise, let’s see how a simple makedepend could be implemented using MapReduce. Suppose that we have a dedicated parser (or a smart preprocessor) that takes a source file and finds all the includes on which it depends. (In C++, the parser must do it recursively, going into includes of includes, etc. See item 7 in Walter Bright’s post.)

What the build process really needs is the answer to a different question: Which files must be recompiled if a given include file changes?

Here’s how we may answer this question using MapReduce. Our map function should take a source file name as key and run our parser over this file. (We also have to know what constants are predefined for a given build target, because of conditional compilation. We could pass them as data to map.)

What map would emit is a stream of pairs, (include file, current source file). Notice the inversion: For map, the name of the source file was the key; here the name of the include file is the key. Because of that trick, MapReduce will do the bulk of work for us. It will sort and group data by the new key. Thus the input to our reduce function will be the name of an include file and a list of all source files that depend on it. These are the files that will have to be recompiled if that header file changes.
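Here is how the inversion could look in code. Treat it as a sketch with loud assumptions: `mapIncludes` and `dependentsOf` are hypothetical names, the "parser" only recognizes lines that start with `#include "..."`, it does not recurse into includes of includes, it ignores conditional compilation, and the sources are passed in as strings rather than read from disk.

```cpp
#include <cstddef>
#include <map>
#include <set>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

typedef std::pair<std::string, std::string> Pair;

// map: scan one source file and emit (include file, source file) pairs.
std::vector<Pair> mapIncludes(std::string const & source,
                              std::string const & text)
{
    std::vector<Pair> emitted;
    std::istringstream in(text);
    std::string line;
    while (std::getline(in, line)) {
        if (line.compare(0, 8, "#include") != 0) continue;
        std::size_t const open = line.find('"');
        if (open == std::string::npos) continue;   // e.g. #include <vector>
        std::size_t const close = line.find('"', open + 1);
        if (close == std::string::npos) continue;
        emitted.push_back(
            Pair(line.substr(open + 1, close - open - 1), source));
    }
    return emitted;
}

// Shuffle and reduce: group the emitted pairs by include file.
// `sources` maps a source file name to its text.
std::map<std::string, std::set<std::string>> dependentsOf(
    std::map<std::string, std::string> const & sources)
{
    std::map<std::string, std::set<std::string>> result;
    for (auto const & s : sources)
        for (auto const & p : mapIncludes(s.first, s.second))
            result[p.first].insert(p.second);
    return result;
}
```

Looking up a header in the returned map gives the set of source files to recompile when that header changes.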

Here again, a higher level of abstraction–writing the dependency engine on top of MapReduce–provides better scalability and avoids code duplication. It also opens up new possibilities. Consider the building of a dependency graph as an example of data mining. In general, a build engine that works on top of MapReduce could easily dispatch all kinds of bots that extract local data from every file and combine them into a global index or database.

In particular, many IDEs attempt, with higher or lower degree of success, to build call graphs, definition/use graphs, inheritance graphs, etc.

Also, there are programs that can infer const-ness or, in Java, non-null-ness. In dynamic languages, type inference is very useful, and it requires whole-program analysis. Not to mention programs that can find potential data races or deadlocks. They all follow the same pattern and can easily be fit into a MapReduce build engine.

It’s a Topological Sort

The build process must also deal with other types of dependencies: when the result of one operation is a prerequisite for another operation. If you’ve ever built a compiler using flex and bison, you know that before you can run flex you need to run bison to produce the appropriate include files that contain, among others, definitions of tokens. These kinds of dependencies are usually explicitly written into makefiles, as in this example:

lang.tab.hpp lang.tab.cpp: lang.ypp
    bison lang.ypp
lex.yy.c: lang.lex lang.tab.hpp
    flex lang.lex

When you have results of some operations being prerequisites for other operations, you use the good old topological sort to organize your work. It so happens that topological sort may also be done using MapReduce (see, for instance, Ricky Ho’s blog).

I’ll explain it using a classic example: getting dressed in the morning. Our set consists of objects of clothing:

{jeans, lsock, rsock, lshoe, rshoe}

You can’t get dressed by putting on those objects in random order. Traditionally, you put on a sock before you put on a shoe. A sock must be on the list of prerequisites for a shoe.

In the preliminary stage of topological sort, we prepare a list of pairs, (object, prerequisite). In our case, the list consists of:

(lshoe, lsock)
(lshoe, jeans)
(rshoe, rsock)
(rshoe, jeans) 

Our map1 iterates over the list of prerequisites for a given object and emits inverted pairs: the prerequisite is the key, and the object is the data. In other words, it emits (prerequisite, dependent) pairs.

void map1(std::string const & object, std::string const & prereq)
{
    emit(prereq, object);
}

MapReduce sorts and reshuffles those pairs and then calls reduce1.

void reduce1(std::string const & object, StringList const & dependents)
{
    TheDependentsOf[object] = dependents;
}

Now we have, for each object, both the original list of prerequisites and the newly created list of dependents. For the sake of this exposition, I’ll store these lists in two global maps:

StringToList ThePrerequisitesOf;
StringToList TheDependentsOf;

In our case, the resulting dependents map will contain:

TheDependentsOf[jeans] = {lshoe, rshoe};
TheDependentsOf[lsock] = {lshoe};
TheDependentsOf[rsock] = {rshoe};

The second part of the algorithm is a loop in which MapReduce is called repeatedly until all objects are consumed (I mean, put on).

The map2 function takes an object of clothing as key and a pair of lists, its prerequisites and its dependents, as data. If the list of prerequisites is not empty, it does nothing–we can’t put on that object yet.

Otherwise it performs the action: “Put the object on.” In our case, jeans and socks have no prerequisites, so they are put on immediately. Once the object is put on, our map2 removes that object from the list of objects. It also iterates over the list of its dependents and emits inverted pairs, (dependent, current object). For instance, once the jeans are put on, two pairs are emitted:

(lshoe, jeans),
(rshoe, jeans)

because both shoes depended on the jeans.

Here’s the code:

void map2(std::string const & object, TwoLists const & preqsAndDepds)
{
    if (preqsAndDepds.first.empty())
    {
        // This object has no prerequisites. Put it on.
        std::string cmd = "put on ";
        execute(cmd + object);
        // Remove it from the global list
        TheObjects.erase(
            std::find(TheObjects.begin(), TheObjects.end(), object));
        // Emit pairs (object, its completed prerequisite)
        StringList const & depds = preqsAndDepds.second;
        std::for_each(depds.begin(), depds.end(),
            [&](std::string const & dep) {
                emit(dep, object);
            });
    }
}

The reduce2 function is then called with an object and the list of its completed prerequisites. In our case, the arguments will be:

lshoe, {jeans, lsock}
rshoe, {jeans, rsock}

The reduce2 function removes the completed prerequisites from the list of all prerequisites for a given object.

In our case, reduce2 will remove both jeans and the left sock from ThePrerequisitesOf[lshoe].

It will also remove the jeans and the right sock from ThePrerequisitesOf[rshoe].

void reduce2(std::string const & object, StringList const & complPrereqs)
{
    std::for_each(complPrereqs.begin(), complPrereqs.end(),
        [&](std::string const & completed) {
            StringList & lst = ThePrerequisitesOf[object];
            lst.erase(std::find(lst.begin(), lst.end(), completed));
        });
}

As a result, the shoes end up with no prerequisites, so they will be put on in the next (and last) iteration of MapReduce.
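The loop that drives these rounds is not shown in the snippets above, so here is a self-contained sketch of the whole procedure. It makes several assumptions of its own: `dressInRounds` and `Rounds` are hypothetical names, everything runs sequentially, sets replace the lists and globals, and the function returns what was put on in each round instead of executing commands.

```cpp
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

typedef std::pair<std::string, std::string> Pair;   // (object, prerequisite)
typedef std::vector<std::vector<std::string>> Rounds;

Rounds dressInRounds(std::set<std::string> objects,
                     std::vector<Pair> const & pairs)
{
    // Preliminary stage: the inversion performed by map1 and reduce1.
    std::map<std::string, std::set<std::string>> prereqsOf, dependentsOf;
    for (auto const & p : pairs) {
        prereqsOf[p.first].insert(p.second);
        dependentsOf[p.second].insert(p.first);
    }

    Rounds rounds;
    while (!objects.empty()) {
        // map2: objects with no outstanding prerequisites are put on.
        std::vector<std::string> ready;
        for (auto const & obj : objects)
            if (prereqsOf[obj].empty()) ready.push_back(obj);
        if (ready.empty()) break;   // a cycle: the rest can never be put on

        std::vector<Pair> emitted;  // (dependent, completed prerequisite)
        for (auto const & obj : ready) {
            objects.erase(obj);
            for (auto const & dep : dependentsOf[obj])
                emitted.push_back(Pair(dep, obj));
        }
        // reduce2: cross completed prerequisites off each dependent's list.
        for (auto const & e : emitted)
            prereqsOf[e.first].erase(e.second);

        rounds.push_back(ready);
    }
    return rounds;
}
```

For the clothing example this produces two rounds: the jeans and both socks first, then both shoes. Everything within one round could run in parallel; the rounds themselves are sequential.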

Limitations of MapReduce

MapReduce is one particular approach to data-driven parallelism. In general it’s not a good fit for problems that exhibit task-driven parallelism. But even within the data-driven domain, MapReduce has competitors, one of them being PGAS (Partitioned Global Address Space; see my post on The Future of Concurrent Programming). Let’s briefly compare the two approaches.

MapReduce is closer to the functional-programming, message-passing, data-copying paradigm. PGAS is a generalization of shared-memory paradigm. It creates the illusion of global address space spanning multiple processes or machines. Consequently, MapReduce doesn’t require explicit synchronization while PGAS often does. On the other hand, MapReduce incurs more data copying even if it runs on threads of the same process (I’ll try to quantify this in my next post).

MapReduce requires a particular form of input: a set of (key, data) pairs. The distribution of work is encoded in the input and is driven by the choice of keys–each key defining a minimum unit of concurrency. PGAS works on shared data structures, typically multi-dimensional arrays. Distribution of work is abstracted from data and from the algorithm–it is defined by distribution maps over data structures (this is also explained in my previous post). A distribution may be modified without changing any other part of the algorithm. With MapReduce that would require either the change of keys, or a client-defined partitioning function.

A lot of MapReduce applications end up sharing data one way or another. Even my little example assumes that files, in particular header files, are accessible from every location. This is usually made possible through some kind of distributed file system. Google, the owner of the patent on MapReduce (I just hope they won’t “do evil” by suing other companies or individuals who use it or, gasp!, blog about it), has its own GFS (Google File System) that’s used in concert with MapReduce.

Finally, it’s not clear what fraction of problems lend themselves to MapReduce. Scientific simulations, for instance, are usually easier to describe in terms of large matrices, and therefore fit the PGAS mold more naturally. I plan to write more on this topic in my next post.

Appendix: Bob the Builder

Just for fun, below is a short C++ program I wrote to illustrate the use of MapReduce in the build process. Since MapReduce has its roots in functional programming, I felt free to use the C++0x lambdas at my convenience. For instance, in this snippet:

[&](Pair const & p) { targetFiles[p.first].push_back(p.second); }

I define an anonymous function that takes a Pair (by const reference) and adds its second component to the list targetFiles[p.first].

Notice that targetFiles is defined outside of the scope of the lambda–it is captured from the environment. A function that captures the environment is called a closure. Here I told the compiler to capture the whole environment, by reference, using the notation [&].
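As a brief aside before the full program, the difference between capturing by reference and by value can be seen in isolation. The two helpers here, `joinByReference` and `joinByValue`, are made up for this illustration and are not part of the build example.

```cpp
#include <algorithm>
#include <string>
#include <vector>

// [&] captures by reference: the lambda appends to the enclosing
// function's own `line`.
std::string joinByReference(std::vector<std::string> const & words)
{
    std::string line;
    std::for_each(words.begin(), words.end(),
        [&](std::string const & w) { line += w; line += ' '; });
    return line;
}

// [=] captures by value: the lambda works on a private copy of `line`
// (mutable lets it modify that copy), so the original stays empty.
std::string joinByValue(std::vector<std::string> const & words)
{
    std::string line;
    std::for_each(words.begin(), words.end(),
        [=](std::string const & w) mutable { line += w; line += ' '; });
    return line;
}
```

Given the words foo.obj and bar.obj, the first function returns them joined with trailing spaces, and the second returns an empty string. That is why the accumulating lambdas in the toy program use [&].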

// A toy implementation of MapReduce. Sorry for using globals.

#include <iostream>
#include <string>
#include <list>
#include <map>
#include <algorithm>
#include <utility> // std::pair, std::make_pair

std::list<std::pair<std::string, std::string>> Emitted;

void emit(std::string const & key, std::string const & data)
{
   std::cout << "  emit: (" << key.c_str() << ", " 
      << data.c_str() << ")\n";
   Emitted.push_back(std::make_pair(key, data));
}

void execute(std::string const & cmd)
{
   std::cout << cmd.c_str() << std::endl;
}

typedef std::pair<std::string, std::string> Pair;
typedef std::list<Pair> InputList;
typedef std::list<std::string> StringList;

void MapReduce(InputList const & input, 
   void (*map)(std::string const & key, std::string const & data),
   void (*reduce)(std::string const & key, StringList const & lst))
{
   // Distribute the map part to workers (here: do them sequentially)
   std::for_each(input.begin(), input.end(), [&](Pair const & p) {
      map(p.first, p.second);
   });
   // (--Wait for all workers to finish--)
   // Reshuffle emitted key/value pairs
   std::map<std::string, StringList> targetFiles;
   std::for_each(Emitted.begin(), Emitted.end(), 
      [&](Pair const & p) {
         targetFiles[p.first].push_back(p.second);
      });
   // Distribute the reduce part to workers (here: do them sequentially)
   std::for_each(targetFiles.begin(), targetFiles.end(), 
      [&](std::pair<std::string, StringList> const & p) {
         reduce(p.first, p.second);
      });
}

std::string ObjFileName(std::string const & srcFileName)
{
   std::string oFile = srcFileName.substr(0, srcFileName.length() - 3);
   return oFile + "obj";
}

// Maps target names to executable names
std::map<std::string, std::string> ExeNames;

void map(std::string const & file, std::string const & target)
{
   std::string cmd = "cl /c ";
   cmd += file;
   execute(cmd);
   emit(target, ObjFileName(file));
}

void reduce(std::string const & target, StringList const & objFiles)
{
   std::string cmd = "link /out ";
   cmd += ExeNames[target];
   std::for_each(objFiles.begin(), objFiles.end(), 
      [&](std::string const & obj) {
         cmd += " ";
         cmd += obj;
      });
   execute(cmd);
}

int main()
{
   // There are two targets
   ExeNames["MyApp"] = "MyApp.exe";
   ExeNames["YourApp"] = "YourApp.exe";
   // The input is a list of key/value pairs
   // Key: source file name
   // Data: target name
   InputList input;
   input.push_back(std::make_pair("foo.cpp", "MyApp"));
   input.push_back(std::make_pair("bar.cpp", "MyApp"));
   input.push_back(std::make_pair("baz.cpp", "YourApp"));

   MapReduce(input, &map, &reduce);
}

Here’s the output:

cl /c foo.cpp
  emit: (MyApp, foo.obj)
cl /c bar.cpp
  emit: (MyApp, bar.obj)
cl /c baz.cpp
  emit: (YourApp, baz.obj)
link /out MyApp.exe foo.obj bar.obj
link /out YourApp.exe baz.obj