Parallelism



I’ve been planning to write about Google’s MapReduce algorithm for some time, but I couldn’t find a good practical example. Then we had a Northwest C++ Users Group presentation by Steve Yegge, followed by a discussion over beers, and I had a little epiphany. Steve was talking about, among other things, the build process. And that’s just a bunch of algorithms that are perfect for explaining MapReduce.

MapReduce is usually introduced in the context of distributed systems. It’s a system that spreads processing among multiple machines, often organized in huge server farms. But it can also be used on a single machine, to work with processes; or even in a single process, with threads. The beauty of it is that, if you write a build engine that uses MapReduce, you may scale it from a few threads to thousands of servers. It’s like writing a program that, rather than reading individual disk sectors, uses a file system. Such a program will magically work on a USB stick as well as on a distributed file system.

The other hot trend, besides scalability, is data mining. Huge software projects are like the Internet. Developers spend a lot of time browsing source files. We need tools that are more than just search engines; we need smart tools that understand computer languages. And data mining fits very well into the MapReduce paradigm. There is a fan-out of small independent tasks operating on single files, followed by a fan-in, during which data is combined. A perfect example of such an algorithm is mapping inter-file dependencies (mediated by include statements), something every build does to minimize rebuilding after a change.

I organized this blog to follow the old parable of three blind men and an elephant. The elephant in this case is the build. The first man touches the compile-link part of the build process and exclaims: “It’s MapReduce.” He has a well-developed sense of scale. The second one notices that not all files are rebuilt after a change and shouts: “It’s Whole Program Analysis.” He has an excellent data-mining ear. The third man, with a sequential nose, observes that not everything can be done in parallel and cries: “It’s a Topological Sort.”

So what is a build?

It’s MapReduce

Obviously, you can build a project on a single machine by performing a sequence of actions. I choose to treat this as a trivial case of a distributed process. Let’s start with the simplest thing: You have a bunch of C++ files and you want to compile them and link the resulting object files into an executable.

The separate compilation model tells us that the compilation of any source file (a Translation Unit, if you wish) is independent of the compilation of any other source file. Therefore, the compilation can (and I’d say, should) be run in parallel. You will not only make more cores busy, but also overlap I/O-intensive parts of compilation.

So the build engine would fan out a number of compilation tasks and, when they are all finished, combine the outputs (object files) and pass them to the linker. Let’s call the first part “mapping” and the second part “reducing.” That’s your MapReduce in a nutshell.
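On a single machine, the fan-out and fan-in could be sketched with std::async. This is only an illustration under stated assumptions: compileOne is a hypothetical stand-in that derives the object file name instead of invoking a real compiler, and compileAll returns the list that would be handed to the linker rather than running one.

```cpp
#include <cassert>
#include <future>
#include <string>
#include <vector>

// Hypothetical stand-in for running the compiler on one translation unit.
// It only derives the object file name (foo.cpp -> foo.obj).
std::string compileOne(std::string const & src)
{
    std::string::size_type dot = src.rfind('.');
    assert(dot != std::string::npos);
    return src.substr(0, dot) + ".obj";
}

// Fan out one task per source file, wait for all of them, then fan in.
std::vector<std::string> compileAll(std::vector<std::string> const & sources)
{
    std::vector<std::future<std::string>> tasks;
    for (std::string const & src : sources)
        tasks.push_back(std::async(std::launch::async, compileOne, src));

    // Barrier: get() blocks until the corresponding task has finished.
    std::vector<std::string> objects;
    for (std::future<std::string> & t : tasks)
        objects.push_back(t.get());
    return objects;   // this list is what the linker would receive
}
```

The results are collected in the order the tasks were launched, so the output is deterministic even though the tasks themselves may finish in any order.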

To be more specific, MapReduce abstracts this process so it may be reused in many different contexts. It’s a framework into which the client plugs in the parts that define what may be done in parallel and how to combine the results.

Here are the general steps:

  1. The client specifies input data. In our example this would be a list of files to be rebuilt.
  2. MapReduce partitions this list among multiple worker threads, processes, or machines.
  3. Each worker executes client-provided code–a function called map. In our example, we’d write map to run the compiler on a single file to produce the object file. This function “maps” a source file into an object file.
  4. MapReduce then waits for all workers to finish (possibly re-distributing the work if a server crashes). This barrier is the main synchronization point in the algorithm.
  5. MapReduce reshuffles the results and, again, distributes them to different workers for the final phase of the algorithm.
  6. Each worker executes client-provided code–a function called reduce. In our example we’d write reduce to execute a linker on a list of object files to produce the executable.

For all this to work, the client must organize data in a particular form that is palatable to MapReduce. The input must be a list of (key, data) pairs.

Since keys and data may be of arbitrary type, why this separation? MapReduce needs keys to be able to partition the work. Each key uniquely identifies a task that may be done in parallel with other tasks. In our case the key would be a source file name (or, in general, a path). The keys are sorted by MapReduce and partitioned equitably among workers (often the client has the option to override the default partitioning algorithm or the compare function).
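To make the partitioning step concrete, here is a minimal sketch of one possible default: sort the keys, then hand out contiguous chunks whose sizes differ by at most one. The name partitionKeys and the chunking rule are my assumptions for illustration; a real MapReduce implementation may partition differently (for instance, by hashing the keys).

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

typedef std::vector<std::string> Keys;

// Sort the keys and deal them out in contiguous, nearly equal chunks.
// The first (keys.size() % nWorkers) workers get one extra key.
std::vector<Keys> partitionKeys(Keys keys, std::size_t nWorkers)
{
    assert(nWorkers > 0);
    std::sort(keys.begin(), keys.end());
    std::vector<Keys> chunks(nWorkers);
    std::size_t const base = keys.size() / nWorkers;
    std::size_t const extra = keys.size() % nWorkers;
    std::size_t next = 0;
    for (std::size_t w = 0; w < nWorkers; ++w)
    {
        std::size_t const count = base + (w < extra ? 1 : 0);
        chunks[w].assign(keys.begin() + next, keys.begin() + next + count);
        next += count;
    }
    return chunks;
}
```

With five source files and two workers, the first worker would get three files and the second would get two.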

The map function, written by the client, must conform to a particular (generic) interface. It takes a key and the associated data. In our case map would take a file name (key) and some data to be defined later. It would compile the file to an object. Notice that map is myopic by nature. It concerns itself only with a small fraction of the whole process. Because it doesn’t have to know about the big picture (here, the build), its implementation is relatively easy (here, it just calls the compiler).

The output of map has to conform to certain standards too, so it can be shuffled by MapReduce and passed to the second client-defined function, reduce. The map function must emit new pairs of (key, data). The new keys and the new data may be totally unrelated to the original ones, both in meaning and type.

Back to our example: to make reduce nontrivial, let’s shoot for a slightly more general scenario in which we want to build multiple targets at once. In Visual Studio, for instance, building the solution might involve building several projects, each producing a separate executable, library, or DLL. We’d make map emit pairs (target name, object name). (The target name will have to be passed to map as input data.)

Here’s my simplified implementation of map (a more complete toy example follows at the end of this post):

void map(std::string const & file, std::string const & target)
{
    std::string cmd = "cl /c ";
    cmd += file;
    execute(cmd);
    // output new (key, value) pair
    emit(target, ObjFileName(file));
}

When all workers are done, MapReduce gathers the emitted pairs and sorts them by the key so that it can accumulate data for each key into a separate list. It then redistributes the results among workers. As before, the keys define units of concurrency. Here we assume that each target may be linked independently (this is not always true: see “It’s a Topological Sort”).

The second-phase workers execute the client-defined reduce. The first argument to reduce is the new key, and the second is a list of data for this key.

In our example MapReduce would combine the outputs of all maps to build a list of object files corresponding to each build target. And that’s exactly what the linker needs. Here’s my toy implementation of reduce:

void reduce(std::string const & target, 
    std::list<std::string> const & objFiles)
{
    std::string cmd = "link /out ";
    cmd += ExeNames[target];
    std::for_each(objFiles.begin(), objFiles.end(), 
        [&](std::string const & obj) {
            cmd += " ";
            cmd += obj;
        });
    execute(cmd);
}

(I explain my use of lambdas in the Appendix.)

Of course this is a very simplified picture of just one part of the build process. Still, I suspect many a build environment uses the same overall idea, and maybe some of them even use MapReduce to implement it.

There are many distributed build environments in use today. The difference is that they either put distribution on top of non-distributed builds or write one monolithic application. That restricts their reusability and leads to duplication of functionality. Here, on the other hand, distribution is a layer below the build engine. The MapReduce implementation, whether on a single machine or over a server farm, knows nothing about building software projects, so it doesn’t duplicate any functionality and is perfectly reusable. It’s a matter of whether the build engine uses, say, a message-passing API or the more abstract MapReduce API.

It’s Whole Program Analysis

The separate compilation model means that the compiler has access to only one source file at a time. The build environment, on the other hand, has access to the whole program. So if there is any need for whole program analysis, that’s the natural place to put it.

Figuring out dependencies between program files is an example of whole program analysis. The build needs it because knowing the dependency graph allows it to recompile only the minimum number of files after one of them (for instance, a header file) changes. There is a program, makedepend, that finds such dependencies by looking at the #include statements.

Inverted Index

Finding dependencies is a lot like creation of an inverted index — a classic example of how to use MapReduce. The map function scans a given document and emits a stream of pairs, (word, document). The reduce function gets a word as the key and a list of documents in which this word occurs (possibly with some positional information). The bulk of the work is done inside MapReduce by sorting the results of map by the word.

As an exercise, let’s see how a simple makedepend could be implemented using MapReduce. Suppose that we have a dedicated parser (or a smart preprocessor) that takes a source file and finds all the includes on which it depends. (In C++, the parser must do it recursively, going into includes of includes, etc. See item 7 in Walter Bright’s post.)

What the build process really needs is the answer to a different question: Which files must be recompiled if a given include file changes?

Here’s how we may answer this question using MapReduce. Our map function should take a source file name as key and run our parser over this file. (We also have to know what constants are predefined for a given build target, because of conditional compilation. We could pass them as data to map.)

What map would emit is a stream of pairs, (include file, current source file). Notice the inversion: For map, the name of the source file was the key; here the name of the include file is the key. Because of that trick, MapReduce will do the bulk of work for us. It will sort and group data by the new key. Thus the input to our reduce function will be the name of an include file and a list of all source files that depend on it. These are the files that will have to be recompiled if that header file changes.
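Here is a minimal sketch of that inversion. It rests on loud simplifications: mapIncludes (a hypothetical name) takes the source text as a string, recognizes only direct, quoted #include lines that start in the first column, and ignores conditional compilation and includes of includes. A real dependency scanner would need the recursive parser described above.

```cpp
#include <cassert>
#include <list>
#include <map>
#include <sstream>
#include <string>
#include <utility>

typedef std::pair<std::string, std::string> Pair;
typedef std::list<std::string> StringList;

// "map": scan one source text and emit (include file, source file) pairs.
// Simplification: only direct, quoted includes are recognized.
void mapIncludes(std::string const & srcName, std::string const & text,
                 std::list<Pair> & emitted)
{
    assert(!srcName.empty());
    std::istringstream in(text);
    std::string line;
    std::string const tag = "#include \"";
    while (std::getline(in, line))
    {
        if (line.compare(0, tag.size(), tag) != 0)
            continue;   // not a quoted include
        std::string::size_type end = line.find('"', tag.size());
        if (end != std::string::npos)
            emitted.push_back(
                Pair(line.substr(tag.size(), end - tag.size()), srcName));
    }
}

// Shuffle: group by the new key. Each entry is the input to one "reduce".
std::map<std::string, StringList> groupByInclude(
    std::list<Pair> const & emitted)
{
    std::map<std::string, StringList> dependents;
    for (Pair const & p : emitted)
        dependents[p.first].push_back(p.second);
    return dependents;
}
```

Looking up a header in the resulting map gives the list of source files to recompile when that header changes.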

Here again, a higher level of abstraction–writing the dependency engine on top of MapReduce–provides better scalability and avoids code duplication. It also opens up new possibilities. Consider the building of a dependency graph as an example of data mining. In general, a build engine that works on top of MapReduce could easily dispatch all kinds of bots that extract local data from every file and combine them into a global index or database.

In particular, many IDEs attempt, with varying degrees of success, to build call graphs, definition/use graphs, inheritance graphs, etc.

Also, there are programs that can infer const-ness or, in Java, non-null-ness. In dynamic languages, type inference is very useful, and it requires whole-program analysis. Not to mention programs that can find potential data races or deadlocks. They all follow the same pattern and can easily be fit into a MapReduce build engine.

It’s a Topological Sort

The build process must also deal with other types of dependencies: when the result of one operation is a prerequisite for another operation. If you’ve ever built a compiler using flex and bison, you know that before you can run flex you need to run bison to produce the appropriate include files that contain, among others, definitions of tokens. These kinds of dependencies are usually explicitly written into makefiles, as in this example:

lang.tab.hpp lang.tab.cpp: lang.ypp
    bison lang.ypp
lex.yy.c: lang.lex lang.tab.hpp
    flex lang.lex

When you have results of some operations being prerequisites for other operations, you use the good old topological sort to organize your work. It so happens that topological sort may also be done using MapReduce (see, for instance, Ricky Ho’s blog).

I’ll explain it using a classic example: getting dressed in the morning. Our set consists of objects of clothing:

{jeans, lsock, rsock, lshoe, rshoe}

You can’t get dressed by putting on those objects in random order. Traditionally, you put on a sock before you put on a shoe. A sock must be on the list of prerequisites for a shoe.

In the preliminary stage of topological sort, we prepare a list of pairs, (object, prerequisite). In our case, the list consists of:

(lshoe, lsock)
(lshoe, jeans)
(rshoe, rsock)
(rshoe, jeans) 

Our map1 iterates over the list of prerequisites for a given object and emits inverted pairs: the prerequisite is the key, and the object is the data. In other words, it emits (prerequisite, dependent) pairs.

void map1(std::string const & object, std::string const & prereq)
{
    emit(prereq, object);
}

MapReduce sorts and reshuffles those pairs and then calls reduce1.

void reduce1(std::string const & object, StringList const & dependents)
{
    TheDependentsOf[object] = dependents;
}

Now we have, for each object, both the original list of prerequisites and the newly created list of dependents. For the sake of this exposition, I’ll store these lists in two global maps:

StringToList ThePrerequisitesOf;
StringToList TheDependentsOf;

In our case, the resulting dependents map will contain:

TheDependentsOf[jeans] = {lshoe, rshoe};
TheDependentsOf[lsock] = {lshoe};
TheDependentsOf[rsock] = {rshoe};

The second part of the algorithm is a loop in which MapReduce is called repeatedly until all objects are consumed (I mean, put on).

The map2 function takes an object of clothing as key and a pair of lists, its prerequisites and its dependents, as data. If the list of prerequisites is not empty, it does nothing–we can’t put on that object yet.

Otherwise it performs the action: “Put the object on.” In our case, jeans and socks have no prerequisites, so they are put on immediately. Once the object is put on, our map2 removes that object from the list of objects. It also iterates over the list of its dependents and emits inverted pairs, (dependent, current object). For instance, once the jeans are put on, two pairs are emitted:

(lshoe, jeans),
(rshoe, jeans)

because both shoes depended on the jeans.

Here’s the code:

void map2(std::string const & object, TwoLists const & preqsAndDepds)
{
    if (preqsAndDepds.first.empty())
    {
        // This object has no prerequisites. Put it on.
        std::string cmd = "put on ";
        execute(cmd + object);
        // Remove it from the global list
        TheObjects.erase(
            std::find(TheObjects.begin(), TheObjects.end(), object));
        // Emit pairs (dependent, its completed prerequisite)
        StringList const & depds = preqsAndDepds.second;
        std::for_each(depds.begin(), depds.end(),
            [&](std::string const & dep) {
                emit(dep, object);
            });
    }
}

The reduce2 function is then called with an object and the list of its completed prerequisites. In our case, the arguments will be:

lshoe, {jeans, lsock}
rshoe, {jeans, rsock}

The reduce2 function removes the completed prerequisites from the list of all prerequisites for a given object.

In our case, reduce2 will remove both jeans and the left sock from ThePrerequisitesOf[lshoe].

It will also remove the jeans and the right sock from ThePrerequisitesOf[rshoe].

void reduce2(std::string const & object, StringList const & complPrereqs)
{
    std::for_each(complPrereqs.begin(), complPrereqs.end(),
        [&](std::string const & completed) {
            StringList & lst = ThePrerequisitesOf[object];
            lst.erase(std::find(lst.begin(), lst.end(), completed));
        });
}

As a result, the shoes end up with no prerequisites, so they will be put on in the next (and last) iteration of MapReduce.
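The whole loop can be condensed into a sequential, self-contained sketch. This is not the MapReduce-based version described above: dressInRounds is a hypothetical helper that performs the work of map1/reduce1 and map2/reduce2 inline, and it assumes the prerequisite pairs are unique and contain no cycles. It returns the objects grouped by iteration; the objects within one group are independent of each other.

```cpp
#include <algorithm>
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

typedef std::vector<std::string> StringList;
typedef std::pair<std::string, std::string> Pair;   // (object, prerequisite)

// Returns the objects grouped by round. Objects within a round have no
// pending prerequisites and could be processed in parallel.
std::vector<StringList> dressInRounds(StringList objects,
                                      std::vector<Pair> const & prereqPairs)
{
    std::map<std::string, StringList> prereqsOf, dependentsOf;
    for (Pair const & p : prereqPairs)
    {
        prereqsOf[p.first].push_back(p.second);      // original direction
        dependentsOf[p.second].push_back(p.first);   // inverted, as in map1
    }
    std::vector<StringList> rounds;
    while (!objects.empty())
    {
        // As in map2: pick every object whose prerequisite list is empty.
        StringList ready;
        for (std::string const & obj : objects)
            if (prereqsOf[obj].empty())
                ready.push_back(obj);
        assert(!ready.empty() && "cyclic prerequisites");
        // As in reduce2: strike completed prerequisites off the dependents.
        for (std::string const & done : ready)
        {
            objects.erase(std::find(objects.begin(), objects.end(), done));
            for (std::string const & dep : dependentsOf[done])
            {
                StringList & lst = prereqsOf[dep];
                lst.erase(std::find(lst.begin(), lst.end(), done));
            }
        }
        rounds.push_back(ready);
    }
    return rounds;
}
```

For the clothing example, the first round contains the jeans and both socks, and the second round contains both shoes.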

Limitations of MapReduce

MapReduce is one particular approach to data-driven parallelism. In general it’s not a good fit for problems that exhibit task-driven parallelism. But even within the data-driven domain, MapReduce has competitors, one of them being PGAS (Partitioned Global Address Space; see my post on The Future of Concurrent Programming). Let’s briefly compare the two approaches.

MapReduce is closer to the functional-programming, message-passing, data-copying paradigm. PGAS is a generalization of the shared-memory paradigm. It creates the illusion of a global address space spanning multiple processes or machines. Consequently, MapReduce doesn’t require explicit synchronization while PGAS often does. On the other hand, MapReduce incurs more data copying even if it runs on threads of the same process (I’ll try to quantify this in my next post).

MapReduce requires a particular form of input — a set of (key, data) pairs. The distribution of work is encoded in the input and is driven by the choice of keys–each key defining a minimum unit of concurrency. PGAS works on shared data structures, typically multi-dimensional arrays. Distribution of work is abstracted from data and from the algorithm– it is defined by distribution maps over data structures (this is also explained in my previous post). A distribution may be modified without changing any other part of the algorithm. With MapReduce that would require either the change of keys, or a client-defined partitioning function.

A lot of MapReduce applications end up sharing data one way or another. Even my little example assumes that files, in particular header files, are accessible from every location. This is usually made possible through some kind of distributed file system. Google, the owner of the patent on MapReduce (I just hope they won’t “do evil” by suing other companies or individuals who use it or, gasp!, blog about it), has its own GFS (Google File System) that’s used in concert with MapReduce.

Finally, it’s not clear what fraction of problems lend themselves to MapReduce. Scientific simulations, for instance, are usually easier to describe in terms of large matrices, and therefore fit the PGAS mold more naturally. I plan to write more on this topic in my next post.

Appendix: Bob the Builder

Just for fun, below is a short C++ program I wrote to illustrate the use of MapReduce in the build process. Since MapReduce has its roots in functional programming, I felt free to use the C++0x lambdas at my convenience. For instance, in this snippet:

[&](Pair const & p) { targetFiles[p.first].push_back(p.second); }

I define an anonymous function that takes a Pair (by const reference) and adds its second component to the list targetFiles[p.first].

Notice that targetFiles is defined outside of the scope of the lambda–it is captured from the environment. A function that captures the environment is called a closure. Here I told the compiler to capture the whole environment, by reference, using the notation [&].

// A toy implementation of MapReduce. Sorry for using globals.

#include <iostream>
#include <string>
#include <list>
#include <map>
#include <utility>
#include <algorithm>

std::list<std::pair<std::string, std::string>> Emitted;

void emit(std::string const & key, std::string const & data)
{
   std::cout << "  emit: (" << key.c_str() << ", " 
      << data.c_str() << ")\n";
   Emitted.push_back(std::make_pair(key, data));
}

void execute(std::string const & cmd)
{
   std::cout << cmd.c_str() << std::endl;
}

typedef std::pair<std::string, std::string> Pair;
typedef std::list<Pair> InputList;
typedef std::list<std::string> StringList;

void MapReduce(InputList const & input, 
   void (*map)(std::string const & key, std::string const & data),
   void (*reduce)(std::string const & key, StringList const & lst))
{
   // Start from a clean slate so that MapReduce may be called repeatedly
   Emitted.clear();
   // Distribute the map part to workers (here: do them sequentially)
   std::for_each(input.begin(), input.end(), [&](Pair const & p) {
      map(p.first, p.second);
   });
   // (--Wait for all workers to finish--)
   // Reshuffle emitted key/value pairs
   std::map<std::string, StringList> targetFiles;
   std::for_each(Emitted.begin(), Emitted.end(), 
      [&](Pair const & p) {
         targetFiles[p.first].push_back(p.second);
      });
   // Distribute the reduce part to workers (here: do them sequentially)
   std::for_each(targetFiles.begin(), targetFiles.end(), 
      [&](std::pair<std::string const, StringList> const & p) {
         reduce(p.first, p.second);
      });
}

std::string ObjFileName(std::string const & srcFileName)
{
   std::string oFile = srcFileName.substr(0, srcFileName.length() - 3);
   return oFile + "obj";
}

// Maps target names to executable names
std::map<std::string, std::string> ExeNames;

void map(std::string const & file, std::string const & target)
{
   std::string cmd = "cl /c ";
   cmd += file;
   execute(cmd);
   emit(target, ObjFileName(file));
}

void reduce(std::string const & target, StringList const & objFiles)
{
   std::string cmd = "link /out ";
   cmd += ExeNames[target];
   std::for_each(objFiles.begin(), objFiles.end(), 
      [&](std::string const & obj) {
         cmd += " ";
         cmd += obj;
      });
   execute(cmd);
}

int main()
{
   // There are two targets
   ExeNames["MyApp"] = "MyApp.exe";
   ExeNames["YourApp"] = "YourApp.exe";
   // The input is a list of key/value pairs
   // Key: source file name
   // Data: target name
   InputList input;
   input.push_back(std::make_pair("foo.cpp", "MyApp"));
   input.push_back(std::make_pair("bar.cpp", "MyApp"));
   input.push_back(std::make_pair("baz.cpp", "YourApp"));

   MapReduce(input, &map, &reduce);
}

Here’s the output:

cl /c foo.cpp
  emit: (MyApp, foo.obj)
cl /c bar.cpp
  emit: (MyApp, bar.obj)
cl /c baz.cpp
  emit: (YourApp, baz.obj)
link /out MyApp.exe foo.obj bar.obj
link /out YourApp.exe baz.obj

Concurrent programming has been around for quite some time (almost half a century), but it was mostly accessible to the highest ranks of the programming priesthood. This changed when, in the 2000s, concurrency entered prime time, prodded by the ubiquity of multicore processors. The industry can no longer afford to pay for hand-crafted, hand-debugged concurrent solutions. The search is on for programming paradigms that lead to high productivity and reliability.

Recently DARPA created its HPCS, High Productivity Computing Systems, program (notice the stress on productivity), and is funding research that has led to, among other things, the development of new programming languages that support concurrency. I had a peek at those languages and saw some very interesting developments which I’d like to share with you. But first let me give you some perspective on the current state of concurrency.

Latency vs. Parallel Performance

We are living in the world of multicores and our biggest challenge is to make use of parallelism to speed up the execution of programs. As Herb Sutter famously observed, we can no longer count on the speed of processors increasing exponentially. If we want our programs to run faster we have to find ways to take advantage of the exponential increase in the number of available processors.

Even before multicores, concurrency had been widely used to reduce latencies and to take advantage of the parallelism of some peripherals (e.g., disks). Low latency is particularly important in server architectures, where you can’t wait for the completion of one request before you pick up the next one.

Initially, the same approach that was used to reduce latency–creating threads and using message-passing to communicate between them–was used to improve parallel performance. It worked, but it turned out to be extremely tedious as a programming paradigm.

There will always be a niche for client/server architectures, which are traditionally based on direct use of threads and message passing. But in this post I will limit myself to strategies for maximizing program performance using parallelism. These strategies are becoming exponentially more important with time.

Threads? Who Needs Threads?

There are two extremes in multithreading. One is to make each thread execute different code–they all have different thread functions (MPMD–Multiple Programs Multiple Data, or even MPSD–Multiple Programs Single Data). This approach doesn’t scale very well with the number of cores–after all there is a fixed number of thread functions in one program no matter how many cores it’s running on.

The other extreme is to spawn many threads that essentially do the same thing (SPMD, Single Program Multiple Data). On the very extreme we have SIMD (Single Instruction Multiple Data), the approach taken by, e.g., graphics accelerators. There, multiple cores execute the same instructions in lockstep. The trick is to start them with slightly different initial states and data sets (the MD part), so they all do independently useful work. With SPMD, you don’t have to write lots of different thread functions and you have a better chance at scaling with the number of cores.

There are two kinds of SPMD. When you start with a large data set and split it into smaller chunks and create separate threads to process each chunk, it’s called data-driven parallelism. If, on the other hand, you have a loop whose iterations don’t depend on each other (no data dependencies), and you create multiple threads to process individual iterations (or groups thereof), it’s called control-driven parallelism.
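As a minimal illustration of data-driven parallelism in plain C++, the sketch below splits a vector into contiguous chunks and sums each chunk on its own thread. The function name parallelSum and the one-thread-per-chunk policy are assumptions made for the example; this is exactly the kind of hand-written plumbing that a language or library with built-in support would hide.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <numeric>
#include <thread>
#include <vector>

// Sum a vector by splitting it into nChunks contiguous chunks, one thread
// per chunk. Each thread writes only its own slot of partial, so no
// locking is needed.
long parallelSum(std::vector<int> const & data, std::size_t nChunks)
{
    assert(nChunks > 0);
    std::vector<long> partial(nChunks, 0);
    std::vector<std::thread> workers;
    std::size_t const chunk = (data.size() + nChunks - 1) / nChunks;
    for (std::size_t c = 0; c < nChunks; ++c)
    {
        std::size_t const begin = std::min(c * chunk, data.size());
        std::size_t const end = std::min(begin + chunk, data.size());
        workers.push_back(std::thread([&data, &partial, c, begin, end] {
            partial[c] = std::accumulate(data.begin() + begin,
                                         data.begin() + end, 0L);
        }));
    }
    // Wait for all chunks, then combine the partial results.
    for (std::thread & w : workers)
        w.join();
    return std::accumulate(partial.begin(), partial.end(), 0L);
}
```

The same threads run the same code on different slices of the data, which is the SPMD pattern; only the begin and end indices differ between them.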

These types of parallelism become more and more important because they allow programs to be written independent of the number of available cores. And, more importantly, they can be partially automated (see my blog on semi-implicit parallelism).

In the past, to explore parallelism, you had to create your own thread pools, assign tasks to them, balance the loads, etc., by hand. Having this capacity built into the language (or a library) takes your mind off the gory details. You gain productivity not by manipulating threads but by identifying the potential for parallelism and letting the compiler and the runtime take care of the details.

In task-driven parallelism, the programmer is free to pick arbitrary granularity for potential parallelizability, rather than being forced into the large grain of system threads. As always, one more degree of indirection solves the problem. The runtime may choose to multiplex tasks between threads and implement work-stealing queues, as is done in Haskell, TPL, or TBB. Programming with tasks rather than threads is also less obtrusive, especially if it has direct support in the language.

Shared Memory or Message Passing?

Threads share memory, so it’s natural to use shared memory for communication between threads. It’s fast but, unfortunately, plagued with data races. To avoid races, access to shared (mutable) memory must be synchronized. Synchronization is usually done using locks (critical sections, semaphores, etc.). It’s up to the programmer to come up with locking protocols that eliminate races and don’t lead to deadlocks. This, unfortunately, is hard, very hard. It’s definitely not a high-productivity paradigm.

One way to improve the situation is to hide the locking protocols inside message queues and switch to message passing (MP). The programmer no longer worries about data races or deadlocks. As a bonus, MP scales naturally to distributed, multi-computer, programming. Erlang is the prime example of a programming language that uses MP exclusively as its concurrency paradigm. So why isn’t everybody happy with just passing messages?

Unfortunately not all problems lend themselves easily to MP solutions–in particular, data driven parallelism is notoriously hard to express using message passing. And then there is the elephant in the room–inversion of control.

We think linearly and we write (and read) programs linearly–line by line. The more non-linear the program gets, the harder it is to design, code, and maintain it (gotos are notorious for delinearizing programs). We can pretty easily deal with some of the simpler MP protocols–like the synchronous one. You send a message and wait for the response. In fact object oriented programming is based on such a protocol–in the Smalltalk parlance you don’t “call” a method, you “send a message” to an object. Things get a little harder when you send an asynchronous message, then do some other work, and finally “force” the answer (that’s how futures work); although that’s still manageable. But if you send a message to one thread and set up a handler for the result in another, or have one big receive or select statement at the top to process heterogeneous messages from different sources, you are heading towards the land of spaghetti. If you’re not careful, your program turns into a collection of handlers that keep firing at random times. You are no longer controlling the flow of execution; it’s the flow of messages that’s controlling you. Again, programmer productivity suffers. (Some research shows that the total effort to write an MPI application is significantly higher than that required to write a shared-memory version of it.)
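The “send asynchronously, do other work, then force the answer” pattern maps fairly directly onto std::future in C++. The sketch below is only an illustration: sumWhileWorking and its sideWork parameter are hypothetical names, and the “message” is simply a computation started with std::async.

```cpp
#include <cassert>
#include <future>
#include <numeric>
#include <vector>

// "Send" a request asynchronously, do unrelated work, then force the answer.
long sumWhileWorking(std::vector<int> const & data, int & sideWork)
{
    // The asynchronous "message": start computing the sum on another thread.
    std::future<long> answer = std::async(std::launch::async, [&data] {
        return std::accumulate(data.begin(), data.end(), 0L);
    });
    assert(answer.valid());
    // Meanwhile, the caller does something else.
    sideWork = static_cast<int>(data.size());
    // Forcing the future blocks until the result is available.
    return answer.get();
}
```

The code still reads top to bottom, which is why this style remains manageable; the control flow only becomes tangled once the result is handled somewhere other than where the request was made.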

Back to shared memory, this time without locks, but with transactional support. STM, or Software Transactional Memory, is a relatively new paradigm that’s been successfully implemented in Haskell, where the type system makes it virtually fool-proof. So far, implementations of STM in other languages haven’t been as successful, mostly because of problems with performance, isolation, and I/O. But that might change in the future–at least that’s the hope.

What is great about STM is the ease of use and reliability. You access shared memory, either for reading or writing, within atomic blocks. All code inside an atomic block executes as if there were no other threads, so there’s no need for locking. And, unlike lock-based programming, STM scales well.

There is a classic STM example in which, within a single atomic block, money is transferred between two bank accounts that are concurrently accessed by other threads. This is very hard to do with traditional locks, since you must lock both accounts before you do the transfer. That not only forces you to expose those locks but also puts you at risk of deadlock.
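Standard C++ has no STM, so the sketch below shows the lock-based side of the comparison instead. The Account type and transfer function are hypothetical. Notice that each account’s mutex has to be visible to transfer, and that the two locks are taken together with std::lock, which uses a deadlock-avoidance algorithm; locking them one after the other could deadlock when two threads transfer in opposite directions.

```cpp
#include <cassert>
#include <mutex>

struct Account
{
    std::mutex lock;    // must be visible to transfer(): the lock is exposed
    long balance;
    explicit Account(long initial) : balance(initial) {}
};

// Move amount from one account to another while holding both locks.
// Returns false, changing nothing, if the funds are insufficient.
bool transfer(Account & from, Account & to, long amount)
{
    assert(&from != &to);
    // Acquire both mutexes without risking deadlock, then hand them
    // to guards that release them on scope exit.
    std::lock(from.lock, to.lock);
    std::lock_guard<std::mutex> g1(from.lock, std::adopt_lock);
    std::lock_guard<std::mutex> g2(to.lock, std::adopt_lock);
    if (from.balance < amount)
        return false;
    from.balance -= amount;
    to.balance += amount;
    return true;
}
```

With STM, the body of transfer would sit inside a single atomic block and neither the mutexes nor the locking order would appear in the code at all.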

As far as programmer productivity goes, STM is a keeper.

Three HPCS Languages

Yesterday’s supercomputers are today’s desktops and tomorrow’s phones. So it makes sense to look at the leading edge in supercomputer research for hints about the future. As I mentioned in the introduction, there is a well-funded DARPA program, HPCS, to develop concurrent systems. There were three companies in stage 2 of this program, and each of them decided to develop a new language:

  Cray: Chapel
  IBM: X10
  Sun: Fortress

(Sun didn’t get to the third stage, but Fortress is still under development.)

I looked at all three languages and was surprised at how similar they were. Three independent teams came up with very similar solutions–that can’t be a coincidence. For instance, all three adopted the shared address space abstraction. Mind you, those languages are supposed to cover a large area: from single-computer multicore programming (in some cases even on an SIMD graphics processor) to distributed programming over huge server farms. You’d think they’d be using message passing, which scales reasonably well between the two extremes. And indeed they do, but without exposing it to the programmer. Message passing is a hidden implementation detail. It’s considered too low-level to bother the programmer with.

Running on PGAS

All three HPCS languages support the shared address space abstraction through some form of PGAS, Partitioned Global Address Space. PGAS provides a unified way of addressing memory across machines in a cluster of computers. The global address space is partitioned into various locales (places in X10 and regions in Fortress) corresponding to local address spaces of processes and computers. If a thread tries to access a memory location within its own locale, the access is direct and shared between threads of the same locale. If, on the other hand, it tries to access a location in a different locale, messages are exchanged behind the scenes. Those could be inter-process messages on the same machine, or network messages between computers. Obviously, there are big differences in performance between local and remote accesses. Still, they may look the same from the programmer’s perspective. It’s just one happy address space.

By now you must be thinking: “What? I have no control over performance?” That would be a bad idea indeed. Don’t worry, the control is there, either explicit (locales are addressable) or in the form of locality awareness (affinity of code and data) or through distributing your data in data-driven parallelism.

Let’s talk about data parallelism. Suppose you have to process a big 2-D array and have 8 machines in your cluster. You would probably split the array into 8 chunks and spread them between the 8 locales. Each locale would take care of its chunk (and possibly some marginal areas of overlap with other chunks). If you’re expecting your program to also run in other cluster configurations, you’d need more intelligent partitioning logic. In Chapel, there is a whole embedded language for partitioning domains (index sets) and specifying their distribution among locales.
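The "more intelligent partitioning logic" can be as simple as computing chunk boundaries from the number of locales instead of hard-coding 8. The following Python sketch shows one common scheme, in which chunk sizes differ by at most one element; the function name block_bounds is hypothetical and the scheme is an assumption, not what Chapel's domain maps necessarily do.

```python
def block_bounds(length, parts):
    """Split range(length) into `parts` contiguous chunks whose sizes differ by at most one."""
    base, extra = divmod(length, parts)
    bounds, start = [], 0
    for p in range(parts):
        size = base + (1 if p < extra else 0)   # the first `extra` chunks get one more element
        bounds.append(range(start, start + size))
        start += size
    return bounds


even = block_bounds(32, 8)     # eight chunks of four elements each
uneven = block_bounds(10, 4)   # chunk sizes 3, 3, 2, 2
```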

To make things more concrete, let’s say you want to distribute a (not so big) 4×8 matrix among currently available locales by splitting it into blocks and mapping each block to a locale. First you want to define a distribution–the prescription of how to distribute a block of data between locales. Here’s the relevant code in Chapel:

const Dist = new dmap(new Block(boundingBox=[1..4, 1..8]));

A Block distribution is created with a bounding rectangle of dimension 4×8. This block is passed as an argument to the constructor of a domain map. If, for instance, the program is run on 8 locales, the block will be mapped into eight 2×2 regions, each assigned to a different locale. Libraries are supposed to provide many different distributions–block distribution being the simplest and the most useful of them.
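The mapping from an index to its owning locale can be sketched in a few lines of Python. The function block_owner is hypothetical, and the arrangement of the 8 locales into a 2×4 grid numbered row by row is an assumption made so that each locale ends up with a 2×2 region; the actual Chapel runtime picks its own locale layout.

```python
def block_owner(i, j, rows=4, cols=8, grid=(2, 4)):
    """Return the id of the locale owning 1-based index (i, j) under a block distribution."""
    grid_rows, grid_cols = grid                 # assumed 2x4 arrangement of 8 locales
    block_r = (i - 1) * grid_rows // rows       # which row of blocks the index falls in
    block_c = (j - 1) * grid_cols // cols       # which column of blocks
    return block_r * grid_cols + block_c


# Owner of every index in the 4x8 bounding box, one list per matrix row.
layout = [[block_owner(i, j) for j in range(1, 9)] for i in range(1, 5)]
# layout[0] and layout[1] are [0, 0, 1, 1, 2, 2, 3, 3]
# layout[2] and layout[3] are [4, 4, 5, 5, 6, 6, 7, 7]
```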

When you apply the above map to a domain, you get a mapped domain:

var Dom: domain(2) dmapped Dist = [1..4, 1..8];

Here the variable Dom is a 2-D domain (a set of indices) mapped using the distribution Dist that was defined above. Compare this with a regular local domain–a set of indices (i, j), where i is between 1 and 4 and j is between 1 and 8 (both inclusive):

var Dom: domain(2) = [1..4, 1..8];

Domains are used, for instance, for iteration. When you iterate over an unmapped domain, all calculations are done within the current locale (possibly in parallel, depending on the type of iteration). But if you do the same over a mapped domain, the calculations will automatically migrate to different locales.

This model of programming is characterized by a very important property: separation of algorithm from implementation. You separately describe implementation details, such as the distribution of your data structure between threads and machines; but the actual calculation is coded as if it were local and performed over monolithic data structures. That’s a tremendous simplification and a clear productivity gain.
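The separation can be imitated in Python, with the caveat that this is only a sketch of the idea: the helper forall and its optional owner parameter are hypothetical, and worker threads stand in for locales. The algorithm (the body function) is written once; whether the work stays in one place or is grouped per owner is decided by a separate argument.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


def forall(indices, body, owner=None):
    """Apply body to every index; if owner is given, group the work per owning locale."""
    if owner is None:
        # Unmapped domain: everything is computed in the current "locale".
        return {idx: body(idx) for idx in indices}

    groups = defaultdict(list)
    for idx in indices:
        groups[owner(idx)].append(idx)

    def run_group(group):
        # One worker stands in for one locale and touches only the indices it owns.
        return {idx: body(idx) for idx in group}

    results = {}
    with ThreadPoolExecutor(max_workers=max(1, len(groups))) as pool:
        for partial in pool.map(run_group, groups.values()):
            results.update(partial)
    return results


domain = [(i, j) for i in range(1, 5) for j in range(1, 9)]
body = lambda ij: ij[0] * 10 + ij[1]                          # the algorithm
owner = lambda ij: (ij[0] - 1) // 2 * 4 + (ij[1] - 1) // 2    # the distribution

local_result = forall(domain, body)
mapped_result = forall(domain, body, owner)    # same values, computed per owner
```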

Task-Driven Parallelism

The processing of very large (potentially multi-dimensional) arrays is very useful, especially in scientific modeling and graphics. But there are also many opportunities for parallelism in control-driven programs. All three HPCS languages chose fine-grained tasks (activities in X10, implicit threads in Fortress), not threads, as their unit of parallel execution. A task may be mapped to a thread by the runtime system but, in general, this is not a strict requirement. Bunches of small tasks may be bundled for execution within a single system thread. Fortress went the furthest in making parallelism implicit–even the for loop is by default parallel.

From the programmer’s perspective, task-driven parallelism doesn’t expose threads (there is no need for a fork statement or other ways of spawning threads). You simply start a potentially parallel computation. In Fortress you use a for loop or put separate computations in a tuple (tuples are evaluated in parallel, by default). In Chapel, you use the forall statement for loops or begin to start a task. In X10 you use async to mark parallelizable code.

What that means is that you don’t have to worry about how many threads to spawn, how to manage a thread pool, or how to balance the load. The system will spawn the threads for you, depending on the number of available cores, and it will distribute tasks between them and take care of load balancing, etc. In many cases it will do better than a hand-crafted thread-based solution. And in all cases the code will be simpler and easier to maintain.
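Mainstream libraries offer a rough analogue of this style. The Python sketch below uses the standard concurrent.futures module; the wrapper parallel_map is a hypothetical name. It illustrates the programming model only: in CPython, threads do not speed up CPU-bound work, so no performance claim is implied.

```python
from concurrent.futures import ThreadPoolExecutor


def parallel_map(task, items):
    """Run task(item) for each item as an independent task."""
    # No thread count is given: the executor chooses a pool size on its own
    # and hands queued tasks to whichever worker is free.
    with ThreadPoolExecutor() as pool:
        return list(pool.map(task, items))   # results come back in input order


squares = parallel_map(lambda x: x * x, range(10))
```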

Global View vs. Fragmented View

If you were to implement a parallel computation using traditional methods, for instance MPI (Message Passing Interface), instead of allocating a single array you’d allocate multiple chunks. Instead of writing an algorithm to operate on this array you’d write an algorithm that operates on chunks, with a lot of code managing boundary cases and communication. Similarly, to parallelize a loop you’d have to partially unroll it and, again, take care of such details as the uneven tail, etc. These approaches result in a fragmented view of the problem.

What HPCS languages offer is global view programming. You write your program in terms of data structures and control flows that are not chopped up into pieces according to where they will be executed. The global view approach results in clearer programs that are easier to write and maintain.
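The contrast shows up even in a tiny example. The Python sketch below computes a three-point average over a 1-D array in both styles; the function names are hypothetical, and the "communication" in the fragmented version is simulated by reading the neighbouring chunk's boundary value directly rather than by a real MPI exchange.

```python
def smooth_global(data):
    """Global view: one loop over the whole array."""
    n = len(data)
    return [
        (data[max(i - 1, 0)] + data[i] + data[min(i + 1, n - 1)]) / 3
        for i in range(n)
    ]


def smooth_fragmented(data, parts):
    """Fragmented view: chunks are processed separately, with explicit halo cells."""
    n = len(data)
    base, extra = divmod(n, parts)
    out, start = [], 0
    for p in range(parts):
        size = base + (1 if p < extra else 0)          # uneven tail handling
        chunk = data[start:start + size]
        if chunk:
            # Simulated communication: one boundary value from each neighbouring chunk.
            left = data[start - 1] if start > 0 else chunk[0]
            right = data[start + size] if start + size < n else chunk[-1]
            padded = [left] + chunk + [right]
            out.extend((padded[k - 1] + padded[k] + padded[k + 1]) / 3
                       for k in range(1, size + 1))
        start += size
    return out
```

Both functions return the same values; the second one simply spends most of its lines on bookkeeping that has nothing to do with averaging.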

Synchronization

No Synchronization?

A lot of data-driven algorithms don’t require much synchronization. Many scientific simulations use read-only arrays for input, and make only localized writes to output arrays.

Consider for instance how you’d implement the famous Game of Life. You’d probably use a read-only array for the previous snapshot and a writable array for the currently evaluated state. Both arrays would be partitioned in the same way between locales. The main loop would go over all array elements and concurrently calculate each one’s new state based on the previous state of its nearest neighbors. Notice that while the neighbors are sometimes read concurrently by multiple threads, the output is always stored once. The only synchronization needed is a thread barrier at the end of each cycle.
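Here is a minimal Python sketch of that scheme. It assumes cells outside the grid are dead and uses one task per row; the names next_row and life_step are hypothetical, and a thread pool stands in for the partitioning between locales. The previous grid is only read, each new row is written by exactly one task, and collecting all results plays the role of the barrier.

```python
from concurrent.futures import ThreadPoolExecutor


def next_row(prev, r):
    """Compute row r of the next generation; prev is treated as read-only."""
    rows, cols = len(prev), len(prev[0])
    row = []
    for c in range(cols):
        # Count live neighbours; cells outside the grid count as dead.
        alive = sum(
            prev[rr][cc]
            for rr in range(max(r - 1, 0), min(r + 2, rows))
            for cc in range(max(c - 1, 0), min(c + 2, cols))
            if (rr, cc) != (r, c)
        )
        row.append(1 if alive == 3 or (prev[r][c] and alive == 2) else 0)
    return row


def life_step(prev):
    """One generation: rows are computed concurrently, each output row written once."""
    with ThreadPoolExecutor() as pool:
        # list() waits for every row to finish: the barrier at the end of the cycle.
        return list(pool.map(lambda r: next_row(prev, r), range(len(prev))))


blinker = [[0, 0, 0],
           [1, 1, 1],
           [0, 0, 0]]
flipped = life_step(blinker)    # the horizontal bar becomes a vertical one
```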

The current approach to synchronization in HPCS languages is the biggest disappointment to me. Data races are still possible and, since parallelism is often implicit, harder to spot.

The biggest positive surprise was that all three endorsed transactional memory, at least syntactically, through the use of atomic statements. They didn’t solve the subtleties of isolation, so safety is not guaranteed (if you promise not to access the same data outside of atomic transactions, the transactions are isolated from each other, but that’s all).
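The caveat about isolation can be illustrated with a deliberately crude stand-in. The Python sketch below emulates an atomic block with a single global lock; this is not transactional memory (there is no optimistic execution and no rollback), and the names atomic and transfer are hypothetical. It only models the guarantee visible to the programmer: blocks are isolated from each other, while code that touches the same data outside of a block gets no protection.

```python
import threading
from contextlib import contextmanager

_atomic_lock = threading.RLock()    # one global lock, re-entrant so blocks can nest


@contextmanager
def atomic():
    """Crude stand-in for an atomic statement: serializes all atomic blocks."""
    with _atomic_lock:
        yield


def transfer(accounts, src, dst, amount):
    # Both updates happen inside one atomic block, so no other atomic block
    # can observe the money "in flight".
    with atomic():
        accounts[src] -= amount
        accounts[dst] += amount


accounts = {"a": 1000, "b": 1000}
workers = [
    threading.Thread(target=lambda: [transfer(accounts, "a", "b", 1) for _ in range(200)])
    for _ in range(4)
]
for w in workers:
    w.start()
for w in workers:
    w.join()
```

Nothing stops another thread from writing accounts["a"] directly, without atomic(); that access would race with the transfers, which is exactly the gap described above.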

The combination of STM and PGAS in Chapel necessitates the use of distributed STM, an area of active research (see, for instance, Software Transactional Memory for Large Scale Clusters).

In Chapel, you not only have access to atomic statements (which are still in the implementation phase) and barriers, but also to low level synchronization primitives such as sync and single variables–somewhat similar to locks and condition variables. The reasoning is that Chapel wants to provide multi-resolution concurrency support. The low level primitives let you implement concurrency in the traditional style, which might come in handy, for instance, in MPMD situations. The high level primitives enable global view programming that boosts programmer productivity.
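The comparison to locks and condition variables can be made literal. The Python sketch below builds a single-slot variable with full/empty state on top of threading.Condition: a write waits until the slot is empty and fills it, a read waits until it is full and empties it. The class SyncVar is a hypothetical model loosely inspired by Chapel's sync variables, not a description of their implementation.

```python
import threading


class SyncVar:
    """Single-slot variable with full/empty state."""

    def __init__(self):
        self._cond = threading.Condition()
        self._full = False
        self._value = None

    def write(self, value):
        with self._cond:
            self._cond.wait_for(lambda: not self._full)   # block until empty
            self._value, self._full = value, True         # fill the slot
            self._cond.notify_all()

    def read(self):
        with self._cond:
            self._cond.wait_for(lambda: self._full)       # block until full
            value, self._value, self._full = self._value, None, False
            self._cond.notify_all()                       # wake a waiting writer
            return value


slot = SyncVar()
producer = threading.Thread(target=lambda: [slot.write(n) for n in range(5)])
producer.start()
received = [slot.read() for _ in range(5)]    # each read pairs with exactly one write
producer.join()
```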

However, no matter what synchronization mechanisms are used (including STM), if the language doesn’t enforce their use, programmers end up with data races–the bane of concurrent programming. The time spent debugging racy programs may significantly cut into, or even nullify, potential productivity gains. Fortress is the only language that attempted to keep track of which data is shared (and, therefore, requires synchronization), and which is local. None of the HPCS languages tried to tie sharing and synchronization to the type system in the way it is done, for instance, in the D programming language (see also my posts about race-free multithreading).

Conclusions

Here’s the tongue-in-cheek summary of the trends which, if you believe that the HPCS effort provides a glimpse of the future, will soon be entering the mainstream:

  1. Threads are out (demoted to latency controlling status), tasks (and semi-implicit parallelism) are in.
  2. Message passing is out (demoted to implementation detail), shared address space is in.
  3. Locks are out (demoted to low-level status), transactional memory is in.

I think we’ve been seeing the twilight of thread-based parallelism for some time (see my previous post on Parallel Programming with Hints). It’s just not the way to fully exploit hardware concurrency. Traditionally, if you wanted to increase the performance of your program on multicore machines, you had to go into the low-level business of managing thread pools, splitting your work between processors, etc. This is now officially considered the assembly language of concurrency and has no place in high level programming languages.

Message passing’s major flaw is the inversion of control–it is a moral equivalent of gotos in unstructured programming (it’s about time somebody said that message passing is considered harmful). MP still has its applications and, used in moderation, can be quite handy; but PGAS offers a much more straightforward programming model–its essence being the separation of implementation from algorithm. The Platonic ideal would be for the language to figure out the best parallel implementation for a particular algorithm. Since this is still a dream, the next best thing is getting rid of the interleaving of the two in the same piece of code.

Software transactional memory has been around for more than a decade now and, despite some negative experiences, is still alive and kicking. It’s by far the best paradigm for synchronizing shared memory access. It’s unobtrusive, deadlock-free, and scalable. If we could only get better performance out of it, it would be ideal. The hope though is in moving some of the burden of TM to hardware.

Sadly, the ease of writing parallel programs using those new paradigms does not go hand in hand with improving program correctness. My worry is that chasing concurrency bugs will eat into productivity gains.

What are the chances that we’ll be writing programs for desktops in Chapel, X10, or Fortress? Probably slim. Chances are good, though, that the paradigms used in those languages will continue showing up in existing and future languages. You may have already seen task-driven libraries sneaking into the mainstream (e.g., the .NET TPL). There is a PGAS extension of C called UPC (Unified Parallel C) and there are dialects of several languages like Java, C, C#, Scala, etc., that experiment with STM. Expect more in the future.

Acknowledgments

Special thanks go to Brad Chamberlain of the Cray Chapel team for his help and comments. As usual, a heated exchange with the Seattle Gang, especially Andrei Alexandrescu, led to numerous improvements in this post.

Bibliography

  1. DARPA’s High Productivity Computing Systems
  2. Cray Chapel
  3. IBM X10
  4. Sun Fortress
  5. Message Passing Interface specification
  6. Unified Parallel C, PGAS in C
  7. Microsoft Task Parallel Library
  8. Intel Threading Building Blocks
  9. HPC Programmer Productivity: A Case Study of Novice HPC Programmers
  10. Software Transactional Memory for Large Scale Clusters
  11. Scalable Software Transactional Memory for Global Address Space Architectures
  12. A Brief Retrospective on Transactional Memory
  13. Designing an Effective Hybrid Transactional Memory System
  14. Concurrency in the D Programming Language
