Erlang



In my previous blog posts I described C++ implementations of two basic functional data structures: a persistent list and a persistent red-black tree. I made an argument that persistent data structures are good for concurrency because of their immutability. In this post I will explain in much more detail the role of immutability in concurrent programming and argue that functional data structures make immutability scalable and composable.

Concurrency in 5 Minutes

To understand the role of functional data structures in concurrent programming we first have to understand concurrent programming. Okay, so maybe one blog post is not enough, but I’ll try my best at mercilessly slashing through the complexities and intricacies of concurrency while brutally ignoring all the details and subtleties.

The archetype for all concurrency is message passing. Without some form of message passing you have no communication between processes, threads, tasks, or whatever your units of execution are. The two parts of “message passing” loosely correspond to data (message) and action (passing). So there is the fiddling with data by one thread, some kind of handover between threads, and then the fiddling with data by another thread. The handover process requires synchronization.

There are two fundamental problems with this picture: Fiddling without proper synchronization leads to data races, and too much synchronization leads to deadlocks.

Communicating Processes

Let’s start with a simpler world and assume that our concurrent participants share no memory — in that case they are called processes. And indeed it might be physically impossible to share memory between isolated units because of distances or hardware protection. In that case messages are just pieces of data that are magically transported between processes. You just put them (serialize, marshal) in a special buffer and tell the system to transmit them to someone else, who then picks them up from the system.

So the problem reduces to the proper synchronization protocols. The theory behind such systems is the good old CSP (Communicating Sequential Processes) from the 1970s. It has subsequently been extended to the Actor Model and has been very successful in Erlang. There are no data races in Erlang because of the isolation of processes, and no traditional deadlocks because there are no locks (although you can have distributed deadlocks when processes are blocked on receiving messages from each other).

The fact that Erlang’s concurrency is process-based doesn’t mean that it’s heavy-weight. The Erlang runtime is quite able to spawn thousands of light-weight user-level processes that, at the implementation level, may share the same address space. Isolation is enforced by the language rather than by the operating system. Banning direct sharing of memory is the key to Erlang’s success as the language for concurrent programming.

So why don’t we stop right there? Because shared memory is so much faster. It’s not a big deal if your messages are integers, but imagine passing a video buffer from one process to another. If you share the same address space (that is, you are passing data between threads rather than processes) why not just pass a pointer to it?

Shared Memory

Shared memory is like a canvas where threads collaborate in painting images, except that they stand on the opposite sides of the canvas and use guns rather than brushes. The only way they can avoid killing each other is if they shout “duck!” before opening fire. This is why I like to think of shared-memory concurrency as the extension of message passing. Even though the “message” is not physically moved, the right to access it must be passed between threads. The message itself can be of arbitrary complexity: it could be a single word of memory or a hash table with thousands of entries.

It’s very important to realize that this transfer of access rights is necessary at every level, starting with a simple write into a single memory location. The writing thread has to send a message “I have written” and the reading thread has to acknowledge it: “I have read.” In standard portable C++ this message exchange might look something like this:

std::atomic<bool> x{false};
// thread one
x.store(true, std::memory_order_release);
// thread two
x.load(std::memory_order_acquire);

You rarely have to deal with such low-level code because it’s abstracted into higher-level libraries. You would, for instance, use locks for transferring access. A thread that acquires a lock gains unique access to a data structure that’s protected by it. It can freely modify it knowing that nobody else can see it. It’s the release of the lock variable that makes all those modifications visible to other threads. This release (e.g., mutex::unlock) is then matched with the subsequent acquire (e.g., mutex::lock) by another thread. In reality, the locking protocol is more complicated, but at its core it is based on the same principle as message passing, with unlock corresponding to a message send (or, more generally, a broadcast), and lock to a message receive.
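
To make the correspondence concrete, here is a minimal sketch of the same handover expressed with a mutex (the function names are mine, for illustration only):

#include <mutex>
#include <vector>

std::vector<int> data; // shared structure protected by mtx
std::mutex mtx;

// thread one: modify the structure, then "send" by unlocking
void producer()
{
    std::lock_guard<std::mutex> guard(mtx);
    data.push_back(42);
} // the unlock at the end of scope acts as the message send

// thread two: "receive" by locking, then read freely
void consumer()
{
    std::lock_guard<std::mutex> guard(mtx);
    int last = data.empty() ? 0 : data.back();
    (void) last;
} // all writes made by producer before its unlock are visible here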

The point is, there is no sharing of memory without communication.

Immutable Data

The first rule of synchronization is:

The only time you don’t need synchronization is when the shared data is immutable.

We would like to use as much immutability in implementing concurrency as possible. Not only is code that doesn’t require synchronization faster, it’s also easier to write, maintain, and reason about. The only problem is that:

No object is born immutable.

Immutable objects never change, but all data, immutable or not, must be initialized before being read. And initialization means mutation. Static global data is initialized before entering main, so we don’t have to worry about it, but everything else goes through a construction phase.

First, we have to answer the question: At what point after initialization is data considered immutable?

Here’s what needs to happen: A thread has to somehow construct the data that is destined to be immutable. Depending on the structure of that data, this could be a very simple or a very complex process. Then the state of that data has to be frozen — no more changes are allowed. But still, before the data can be read by another thread, a synchronization event has to take place. Otherwise the other thread might see partially constructed data. This problem has been extensively discussed in articles about the singleton pattern, so I won’t go into more detail here.

One such synchronization event is the creation of the receiving thread. All data that had been frozen before the new thread was created is seen as immutable by that thread. That’s why it’s okay to pass immutable data as an argument to a thread function.
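
For instance (a sketch using std::thread; the Config type is made up for illustration):

#include <iostream>
#include <memory>
#include <string>
#include <thread>

struct Config
{
    std::string name;
    int level;
};

int main()
{
    // Construct and freeze the data before the thread exists.
    auto cfg = std::make_shared<const Config>(Config{"worker", 3});

    // Thread creation is the synchronization event: everything
    // written before this point is visible inside the new thread.
    std::thread th([cfg] {
        std::cout << cfg->name << " " << cfg->level << std::endl;
    });
    th.join();
}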

Another such event is message passing. It is always safe to pass a pointer to immutable data to another thread. The handover always involves the release/acquire protocol (as illustrated in the example above).

All memory writes that happened in the first thread before it released the message become visible to the acquiring thread after it received it.

The act of message passing establishes the “happens-before” relationship for all memory writes prior to it, and all memory reads after it. Again, these low-level details are rarely visible to the programmer, since they are hidden in libraries (channels, mailboxes, message queues, etc.). I’m pointing them out only because there is no protection in the language against the user inadvertently taking affairs into their own hands and messing things up. So creating an immutable object and passing a pointer to it to another thread through whatever message passing mechanism is safe. I also like to think of thread creation as a message passing event — the payload being the arguments to the thread function.

The beauty of this protocol is that, once the handover is done, the second (and the third, and the fourth, and so on…) thread can read the whole immutable data structure over and over again without any need for synchronization. The same is not true for shared mutable data structures! For such structures every read has to be synchronized at a non-trivial performance cost.

However, it can’t be stressed enough that this is just a protocol and any deviation from it may be fatal. There is no language mechanism in C++ that can enforce this protocol.

Clusters

As I argued before, access rights to shared memory have to be tightly controlled. The problem is that shared memory is not partitioned nicely into separate areas, each with its own army, police, and border controls. Even though we understand that an object is frozen after construction and ready to be examined by other threads without synchronization, we have to ask ourselves the question: Where exactly does this object begin and end in memory? And how do we know that nobody else claims writing privileges to any of its parts? After all, in C++ it’s pointers all the way. This is one of the biggest problems faced by imperative programmers trying to harness concurrency — who’s pointing where?

For instance, what does it mean to get access to an immutable linked list? Obviously, it’s not enough that the head of the list never changes, every single element of the list must be immutable as well. In fact, any memory that can be transitively accessed from the head of the list must be immutable. Only then can you safely forgo synchronization when accessing the list, as you would in a single-threaded program. This transitive closure of memory accessible starting from a given pointer is often called a cluster. So when you’re constructing an immutable object, you have to be able to freeze the whole cluster before you can pass it to other threads.

But that’s not all! You must also guarantee that there are no mutable pointers outside of the cluster pointing to any part of it. Such pointers could be inadvertently used to modify the data other threads believe to be immutable.
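
To make the danger concrete, here is a hypothetical class that looks immutable but whose cluster is punctured by an outside pointer:

#include <vector>

// A "snapshot" that secretly shares mutable state with its creator.
class Snapshot
{
public:
    explicit Snapshot(std::vector<int> const & data) : _data(&data) {}
    int first() const { return _data->front(); }
private:
    std::vector<int> const * _data; // points into memory we don't own
};

void leak()
{
    std::vector<int> v{1, 2, 3};
    Snapshot snap(v); // snap looks frozen...
    v[0] = 42;        // ...but v, a mutable handle outside the cluster,
                      // silently changes what snap sees
}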

That means the construction of an immutable object is a very delicate operation. You not only have to make sure you don’t leak any pointers, but you have to inspect every component you use in building your object for potential leaks — you either have to trust all your subcontractors or inspect their code under the microscope. This clearly is no way to build software! We need something that is scalable and composable. Enter…

Functional Data Structures

Functional data structures let you construct new immutable objects by composing existing immutable objects.

Remember, an immutable object is a complete cluster with no pointers sticking out of it, and no mutable pointers poking into it. A sum of such objects is still an immutable cluster. As long as the constructor of a functional data structure doesn’t violate the immutability of its arguments and does not leak mutable pointers to the memory it is allocating itself, the result will be another immutable object.

Of course, it would be nice if immutability were enforced by the type system, as it is in the D language. In C++ we have to replace the type system with discipline, but still, it helps to know exactly what the terms of the immutability contract are. For instance, make sure you pass only (const) references to other immutable objects to the constructor of an immutable object.

Let’s now review the example of the persistent binary tree from my previous post to see how it follows the principles I described above. In particular, let me show you that every Tree forms an immutable cluster, as long as user data is stored in it by value (or is likewise immutable).

The proof proceeds by structural induction, but it’s easy to understand. An empty tree forms an immutable cluster trivially. A non-empty tree is created by combining two other trees. We can assume, by the inductive hypothesis, that both of them form immutable clusters:

Tree(Tree const & lft, T val, Tree const & rgt)

In particular, there are no external mutating pointers to lft, rgt, or to any of their nodes.

Inside the constructor we allocate a fresh node and pass it the three arguments:

Tree(Tree const & lft, T val, Tree const & rgt)
      : _root(std::make_shared<const Node>(lft._root, val, rgt._root))
{}

Here _root is a private member of the Tree:

std::shared_ptr<const Node> _root;

and Node is a private struct defined inside Tree:

struct Node
{
   Node(std::shared_ptr<const Node> const & lft
       , T val
       , std::shared_ptr<const Node> const & rgt)
   : _lft(lft), _val(val), _rgt(rgt)
   {}

   std::shared_ptr<const Node> _lft;
   T _val;
   std::shared_ptr<const Node> _rgt;
};

Notice that the only reference to the newly allocated Node is stored in _root through a const pointer and is never leaked. Moreover, there are no methods of the tree that either modify or expose any part of the tree to modification. Therefore the newly constructed Tree forms an immutable cluster. (With the usual caveat that you don’t try to bypass the C++ type system or use other dirty tricks).

As I discussed before, there is some bookkeeping related to reference counting in C++, which is however totally transparent to the user of functional data structures.
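
To close the loop, here’s a sketch of the intended usage. I’m using a stripped-down, unbalanced version of the Tree just to keep the example self-contained (the original post adds red-black balancing); the thread code shows the handover:

#include <memory>
#include <thread>

template <class T>
class Tree
{
    struct Node
    {
        Node(std::shared_ptr<const Node> const & lft, T val,
             std::shared_ptr<const Node> const & rgt)
            : _lft(lft), _val(val), _rgt(rgt)
        {}
        std::shared_ptr<const Node> _lft;
        T _val;
        std::shared_ptr<const Node> _rgt;
    };
    explicit Tree(std::shared_ptr<const Node> const & root) : _root(root) {}
    std::shared_ptr<const Node> _root;
public:
    Tree() {}
    Tree(Tree const & lft, T val, Tree const & rgt)
        : _root(std::make_shared<const Node>(lft._root, val, rgt._root))
    {}
    bool isEmpty() const { return !_root; }
    T root() const { return _root->_val; }
    Tree left() const { return Tree(_root->_lft); }
    Tree right() const { return Tree(_root->_rgt); }
    Tree insert(T x) const
    {
        if (isEmpty())
            return Tree(Tree(), x, Tree());
        if (x < root())
            return Tree(left().insert(x), root(), right());
        if (root() < x)
            return Tree(left(), root(), right().insert(x));
        return *this; // already present
    }
};

int main()
{
    Tree<int> t;
    t = t.insert(3).insert(1).insert(4); // each insert builds a new cluster
    std::thread th([t] {
        // t is an immutable snapshot: read it freely, no locks needed
        bool empty = t.isEmpty();
        (void) empty;
    });
    t = t.insert(5); // builds yet another tree; th's snapshot is untouched
    th.join();
}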

Conclusion

Immutable data structures play an important role in concurrency but there’s more to them than meets the eye. In this post I tried to demonstrate how to use them safely and productively. In particular, functional data structures provide a scalable and composable framework for working with immutable objects.

Of course not all problems of concurrency can be solved with immutability and not all immutable objects can be easily created from other immutable objects. The classic example is a doubly-linked list: you can’t add a new element to it without modifying pointers in it. But there is a surprising variety of composable immutable data structures that can be used in C++ without breaking the rules. I will continue describing them in my future blog posts.


Is the Actor model just another name for message passing between threads? In other words, can you consider a Java Thread object with a message queue an Actor? Or is there more to the Actor model? Bartosz investigates.

I’ll start with listing various properties that define the Actor Model. I will discuss implementation options in several languages.

Concurrency

Actors are objects that execute concurrently. Well, sort of. Erlang, for instance, is not an object-oriented language, so we can’t really talk about “objects”. An actor in Erlang is represented by a thing called a Process ID (Pid). But that’s nitpicking. The second part of the statement is more interesting. Strictly speaking, an actor may execute concurrently but at times it will not. For instance, in Scala, actor code may be executed by the calling thread.

Caveats aside, it’s convenient to think of actors as objects with a thread inside.

Message Passing

Actors communicate through message passing. Actors don’t communicate using shared memory (or at least pretend not to). The only way data may be passed between actors is through messages.

Erlang has a primitive send operation denoted by the exclamation mark. To send a message Msg to the process (actor) Pid you write:

Pid ! Msg

The message is copied to the address space of the receiver, so there is no sharing.

If you were to imitate this mechanism in Java, you would create a Thread object with a mailbox (a concurrent message queue), with no public methods other than put and get for passing messages. Enforcing copy semantics in Java is impossible so, strictly speaking, mailboxes should only store built-in types. Note that passing a Java String is okay, since strings are immutable.
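
Since the other examples in this series are in C++, here’s a minimal sketch of such a single-type mailbox, a blocking concurrent queue with only put and get (the Java version would be analogous):

#include <condition_variable>
#include <mutex>
#include <queue>

template <class T>
class Mailbox
{
public:
    void put(T msg)
    {
        {
            std::lock_guard<std::mutex> lock(_mtx);
            _q.push(std::move(msg));
        }
        _cond.notify_one();
    }
    T get() // blocks until a message is available
    {
        std::unique_lock<std::mutex> lock(_mtx);
        _cond.wait(lock, [this] { return !_q.empty(); });
        T msg = std::move(_q.front());
        _q.pop();
        return msg;
    }
private:
    std::queue<T> _q;
    std::mutex _mtx;
    std::condition_variable _cond;
};

An actor would own one such mailbox and loop on get.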

-Typed messages

Here’s the first conundrum: in Java, as in any statically typed language, messages have to be typed. If you want to process more than one type of message, it’s not enough to have just one mailbox per actor. In Erlang, which is dynamically typed, one canonical mailbox per actor suffices. In Java, mailboxes have to be abstracted from actors. So an actor may have one mailbox for accepting strings, another for integers, etc. You build actors from those smaller blocks.

But having multiple mailboxes creates another problem: How to block, waiting for messages from more than one mailbox at a time without breaking the encapsulation? And when one of the mailboxes fires, how to retrieve the correct type of a message from the appropriate mailbox? I’ll describe a few approaches.

-Pattern matching

Scala, which is also a statically typed language, uses the power of functional programming to solve the typed messages problem. The receive statement uses pattern matching, which can match different types. It looks like a switch statement whose case labels are patterns. A pattern may specify the type it expects. You may send a string, or an integer, or a more complex data structure to an actor. A single receive statement inside the actor code may match any of those.

receive {
    case s: String => println("string: "+ s)
    case i: Int => println("integer: "+ i)
    case m => println("unknown: "+ m)
}

In Scala the type of a variable is specified after the colon, so s:String declares the variable s of the type String. The last case is a catch-all.

This is a very elegant solution to a difficult problem of marrying object-oriented programming to functional programming–a task at which Scala excels.

-Casting

Of course, we always have the option of escaping the type system. A mailbox could be just a queue of Objects. When a message is received, the actor could try casting it to each of the expected types in turn or use reflection to find out the type of the message. Here’s what Martin Odersky, the creator of Scala, has to say about it:

The most direct (some would say: crudest) form of decomposition uses the type-test and type-cast instructions available in Java and many other languages.

In the paper he co-authored with Emir and Williams (Matching Objects With Patterns) he gives the following evaluation of this method:

Evaluation: Type-tests and type-casts require zero overhead for the class hierarchy. The pattern matching itself is very verbose, for both shallow and deep patterns. In particular, every match appears as both a type-test and a subsequent type-cast. The scheme raises also the issue that type-casts are potentially unsafe because they can raise ClassCastExceptions. Type-tests and type-casts completely expose representation. They have mixed characteristics with respect to extensibility. On the one hand, one can add new variants without changing the framework (because there is nothing to be done in the framework itself). On the other hand, one cannot invent new patterns over existing variants that use the same syntax as the type-tests and type-casts.

The best one could do in C++ or D is to write generic code that hides casting from the client. Such generic code could use continuations to process messages after they’ve been cast. A continuation is a function that you pass to another function to be executed after that function completes (strictly speaking, a real continuation never returns, so I’m using this word loosely). The above example could be rewritten in C++ as:

#include <iostream>
#include <string>

void onString(std::string const & s) {
    std::cout << "string: " << s << std::endl;
}
void onInt(int i) {
    std::cout << "integer: " << i << std::endl;
}

receive<std::string, int> (&onString, &onInt);

where receive is a variadic template (available in C++0x). It would do the dynamic casting and call the appropriate function to process the result. The syntax is awkward and less flexible than that of Scala, but it works.
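
As a sketch of how such a receive could be written today (this uses C++17’s std::any and fold expressions, which postdate the original C++0x discussion; the mailbox and all names are hypothetical):

#include <any>
#include <iostream>
#include <queue>
#include <string>

std::queue<std::any> mailbox; // hypothetical type-erased mailbox
                              // (single-threaded sketch: a real one
                              // would synchronize access)

template <class T, class F>
bool tryHandle(std::any const & msg, F f)
{
    if (auto p = std::any_cast<T>(&msg)) { f(*p); return true; }
    return false;
}

// Pops one message (precondition: mailbox not empty) and dispatches
// it to the handler whose type matches.
template <class... Ts, class... Fs>
void receive(Fs... handlers)
{
    std::any msg = std::move(mailbox.front());
    mailbox.pop();
    // Try each expected type in turn; Ts and Fs line up pairwise.
    bool handled = (tryHandle<Ts>(msg, handlers) || ...);
    if (!handled) std::cout << "unknown message" << std::endl;
}

With this in place, receive<std::string, int>(&onString, &onInt) pops one message and routes it to onString or onInt according to its runtime type.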

The use of lambdas might make things a bit clearer. Here’s an example in D using lambdas (function literals), courtesy of Sean Kelly and Jason House:

receive(
    (string s){ writefln("string: %s", s); },
    (int i){ writefln("integer: %s", i); }
);

Interestingly enough, Scala’s receive is a library function with the pattern matching block playing the role of a continuation. Scala has syntactic sugar to make lambdas look like curly-braced blocks of code. Actually, each case statement is interpreted by Scala as a partial function–a function that is not defined for all values (or types) of arguments. The pattern matching part of case becomes the isDefinedAt method of this partial function object, and the code after that becomes its apply method. Of course, partial functions could also be implemented in C++ or D, but with a lot of superfluous awkwardness–lambda notation doesn’t help when partial functions are involved.

-Isolation

Finally, there is the problem of isolation. A message-passing system must be protected from data sharing. As long as the message is a primitive type and is passed by value (or an immutable type passed by reference), there’s no problem. But when you pass a mutable Object as a message, in reality you are passing a reference (a handle) to it. Suddenly your message is shared and may be accessed by more than one thread at a time. You either need additional synchronization outside of the Actor model or risk data races. Languages that are not strictly functional, including Scala, have to deal with this problem. They usually pass this responsibility, conveniently, to the programmer.

-Kilim

Java is not a good language to implement the Actor model. You can extend Java though, and there is one such extension worth mentioning called Kilim by Sriram Srinivasan and Alan Mycroft from Cambridge, UK. Messages in Kilim are restricted to objects with no internal aliasing, which have move semantics. The pre-processor (weaver) checks the structure of messages and generates appropriate Java code for passing them around. I tried to figure out how Kilim deals with waiting on multiple mailboxes, but there isn’t enough documentation available on the Web. The authors mention using the select statement, but never provide any details or examples.

Correction: Sriram was kind enough to provide an example of the use of select:

int n = Mailbox.select(mb0, mb1, .., timeout);

The return value is the index of the mailbox that has a message waiting, or -1 in case of a timeout. Being able to wait on many mailboxes at once in this way is what keeps the message passing model composable.
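
Kilim’s actual implementation isn’t documented in detail, but here is a rough C++ sketch of the idea: one lock and condition variable shared by a group of queues (my own construction; all mailboxes carry the same message type and the timeout is omitted, for brevity):

#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <vector>

template <class T>
class MailboxGroup
{
public:
    explicit MailboxGroup(std::size_t n) : _queues(n) {}
    void put(std::size_t i, T msg)
    {
        {
            std::lock_guard<std::mutex> lock(_mtx);
            _queues[i].push(std::move(msg));
        }
        _cond.notify_all();
    }
    // Blocks until some mailbox has a message; returns its index.
    std::size_t select()
    {
        std::unique_lock<std::mutex> lock(_mtx);
        for (;;) {
            for (std::size_t i = 0; i != _queues.size(); ++i)
                if (!_queues[i].empty())
                    return i;
            _cond.wait(lock);
        }
    }
    T get(std::size_t i)
    {
        std::lock_guard<std::mutex> lock(_mtx);
        T msg = std::move(_queues[i].front());
        _queues[i].pop();
        return msg;
    }
private:
    std::vector<std::queue<T>> _queues;
    std::mutex _mtx;
    std::condition_variable _cond;
};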

Dynamic Networks

Everything I described so far is common to CSP (Communicating Sequential Processes) and the Actor model. Here’s what makes actors more general:

Connections between actors are dynamic. Unlike processes in CSP, actors may establish communication channels dynamically. They may pass messages containing references to actors (or mailboxes). They can then send messages to those actors. Here’s a Scala example:

receive {
    case (name: String, actor: Actor) =>
        actor ! lookup(name)
}

The original message is a tuple combining a string and an actor object. The receiver sends the result of lookup(name) to the actor it has just learned about. Thus a new communication channel between the receiver and the unknown actor can be established at runtime. (In Kilim the same is possible by passing mailboxes via messages.)

Actors in D

The D programming language with my proposed race-free type system could dramatically improve the safety of message passing. A race-free type system distinguishes between various types of sharing and enforces synchronization when necessary. For instance, since an Actor would be shared between threads, it would have to be declared shared. All objects inside a shared actor, including the mailbox, would automatically inherit the shared property. A shared message queue inside the mailbox could only store value types, unique types with move semantics, or reference types that are either immutable or are monitors (provide their own synchronization). These are exactly the types of messages that may be safely passed between actors. Notice that this is more than is allowed in Erlang (value types only) or Kilim (unique types only), but doesn’t include “dangerous” types that even Scala accepts (not to mention Java or C++).

I will discuss message queues in the next installment.


Recently I’ve been looking into message passing as a model for concurrency. It turns out that there are two warring camps in the message passing community. One believes that synchronous message passing is more fundamental, the other believes that asynchronous message passing is more basic. You might think that the discussion is moot, since one can be emulated by the other, but it’s not as simple as that.

Let me first explain the concepts. In message passing you have a sender and a receiver–they usually run in separate threads. The sender sends a message and the receiver receives it. All issues of memory sharing and concurrent access are hidden inside the communication channel. The client does no low level synchronization actions like locking. Which is a good thing–no low level races or deadlocks!

The receiver usually has a choice of synchronous or asynchronous access. It can block until a message is available, or it can peek to see if there is a message waiting. Usually both interfaces are available.

The major choice of paradigms is on the sender’s side.

  • In the synchronous model, the sender blocks until the receiver picks up the message. The two have to rendezvous.
  • In the asynchronous model, the sender drops the message and continues with its own business.

The synchronous model is used, among others, in CSP (Communicating Sequential Processes) and CML (Concurrent ML). The asynchronous one is used in Concurrent Haskell and in actor-based languages like Erlang or Scala.
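
The contrast is easiest to see in code. Here is a C++ sketch of the sender’s side in both models (the channel classes and all names are mine):

#include <condition_variable>
#include <mutex>
#include <queue>

// Synchronous (rendezvous) channel: send blocks until the receiver
// has actually taken the message. Simplified: assumes one sender
// at a time.
template <class T>
class SyncChannel
{
public:
    void send(T msg)
    {
        std::unique_lock<std::mutex> lock(_mtx);
        _cond.wait(lock, [this] { return !_full; }); // wait for a free slot
        _slot = std::move(msg);
        _full = true;
        _cond.notify_all();
        _cond.wait(lock, [this] { return !_full; }); // wait for pickup
    }
    T receive()
    {
        std::unique_lock<std::mutex> lock(_mtx);
        _cond.wait(lock, [this] { return _full; });
        T msg = std::move(_slot);
        _full = false;
        _cond.notify_all(); // release the blocked sender
        return msg;
    }
private:
    std::mutex _mtx;
    std::condition_variable _cond;
    T _slot;
    bool _full = false;
};

// Asynchronous channel: send drops the message in a queue and returns.
template <class T>
class AsyncChannel
{
public:
    void send(T msg)
    {
        std::lock_guard<std::mutex> lock(_mtx);
        _q.push(std::move(msg));
    } // returns immediately; no rendezvous
    // receive would block on a condition variable, as in a mailbox
private:
    std::mutex _mtx;
    std::queue<T> _q;
};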

Here’s the main argument of the synchronous camp: After calling send the sender has the guarantee that the message has been received by the receiver. The code after send may safely make this assumption. If you’re a believer in RPC (Remote Procedure Call) you’ll love this. Sending a message is just like making a function call, only the work is done in a separate thread.

To which the asynchronous camp answers: Yeah, but what about distribution? In a distributed system, the receiver may be in a different process on a different machine. There may be many reasons why the message doesn’t arrive at the recipient (the recipient is dead? a bulldozer cut the network cable?). In that case the sender will hang forever. The code after send may still assume safe delivery, but it will never be executed.

There is another, more subtle problem with synchronous message passing between machines–a network protocol might deliver messages in a different order than they were sent. If the receiver’s code expects a certain sequence of messages, it might block forever if the messages are permuted.

All these problems may be overcome by queuing messages, building sophisticated protocols, and spawning helper threads. But is it really worth the trouble?

Yes, says the synchronous camp, if you want to formally prove the correctness of your programs. With synchronous message passing you can use the theoretical machinery of C.A.R. Hoare’s CSP. That’s an important advantage if you are writing your Ph.D. thesis, but maybe less so if you’re maintaining millions of lines of Erlang code.

For my next installment I’m already working on translating Haskell’s MVars to Java. This will be fun!



With the multicore explosion in the making, are we going to be running hundreds of thousands of threads in a single program? Erlang programmers would emphatically say, yes! C++ and Java programmers would probably say no.

Why this discrepancy?

The thread model based on heavy-duty OS threads and mutexes has its limitations. You can ask server writers, or Google for “thread per connection” to convince yourself. Servers use thread pools exactly because of that.

Thread pools are an admission of defeat for the thread model. Having to pool threads tells you that:

  • Thread creation is not fast enough
  • Threads’ consumption of resources is substantial, so it makes sense to keep their numbers down

Granted, these are technical problems that might be overcome in future by improvements in operating systems.

The more fundamental problem with threads has its root in memory sharing. It seems like sharing offers great advantage in terms of performance, but sharing requires locking. It’s a well known fact that locking doesn’t scale (or compose). Between races and deadlocks, it’s also extremely hard to get right.

Here’s what Erlang does

Erlang gives up on sharing!

Threads that don’t share memory are called processes. We tend to think of processes as heavyweight beasts implemented by operating systems. That’s because one needs the operating system to strictly enforce the no-sharing policy (the isolation of address spaces). Only the OS can manage separate address spaces and the passing of data between them.

But that’s not the only way. The isolation might instead be enforced by the language. Erlang is a functional language with strict copy semantics and with no pointers or references. Erlang processes communicate by message passing. Of course, behind the scenes, messages are passed through shared memory, thus avoiding a large performance hit of inter-process communication. But that’s invisible to the client.

Erlang rolls out its own threads!

The Erlang interpreter provides lightweight processes (so lightweight that there’s a benchmark running 20 million Erlang processes).

And there is a bonus: Erlang code that uses lightweight processes will also work with heavyweight processes and in a distributed environment.

Why don’t we all switch to Erlang?

As far as I know there are two main reasons:

  • It’s a weird language. Don’t get me wrong, I love functional programming for its mathematical beauty, terseness, and elegance. But I had to rewire my brain to be able to write pure functional programs. The functional paradigm is as alien to our everyday experience as quantum mechanics. (CS grad students: you’re not typical programmers.)
  • Messages have to be copied. You can’t deep-copy a large data structure without some performance degradation, and not all copying can be optimized away (it requires behind-the-scenes alias analysis). This is why mainstream languages (and I will even include Scala in this category) don’t abandon sharing. Instead they rely on the programmer’s discipline or try to control aliasing.

Conclusions

Native threads are expensive. Interpreted languages, like Erlang, can afford to implement their own lightweight threads and schedulers. I don’t see that happening in compiled languages. The hope is that operating systems will improve their implementations of threads–I hear Linux’s NPTL already is a big improvement in this area. In the meantime, we have to rely on thread pools.

The shared-memory concurrency model is the reason why multithreaded programming is so difficult and error-prone. Separating address spaces simplifies programming, but at a performance cost. I believe that some kind of compromise is possible. A message-passing or an actor model can work with sharing, as long as aliasing is under control.

Inspiration for this post

After my last post on thin lock implementation, I was asked why I used such a small number, 1024, for the maximum number of threads. It was actually a number I found in the D compiler runtime. A thin lock could easily work with a much larger number of threads. In fact I’m going to substantially increase the number of bits for thread ID at the expense of the recursion count. But this question got me thinking about the scalability of threading in general.

Fibers

Fibers (as well as Java’s green threads) are not an alternative to heavyweight threads. Fibers can’t run in parallel, so they have no way to use multiple processors. They are more of a flow-of-control construct, often used interchangeably with coroutines.
