February 2009



What is the most basic building block for asynchronous message passing? I’ve found the answer in Concurrent Haskell. Not many people are fluent in Haskell, especially when monads are involved, so MVars may sound foreign to most. Not to worry–I’ll translate them into Java (as I did with synchronous CML channels in my previous post).

An MVar is an object with two methods, put and take. The sender calls put with a message and continues (it doesn’t block–that’s why this message-passing model is called “asynchronous”). The receiver, in another thread, calls take to remove the message from MVar. If there is no message, the receiver blocks until there is one.

An MVar can store a maximum of one message, so it’s an error to call put when there is an unclaimed message inside it. An MVar can thus be only in one of two states: empty or full.

Here’s a simple-minded implementation of MVar written in Java. (A lock-free implementation is possible–and closer to how Haskell implements it–but it’s harder to reason about.)

public class MVar<T> {
    private T _obj;
    private boolean _full;

    public MVar(){
        _full = false;
    }
    // put: asynchronous (non-blocking)
    // Precondition: MVar must be empty
    public synchronized void put(T obj) {
        assert !_full;
        assert _obj == null;
        _obj = obj;
        _full = true;
        notify();
    }
    // take: if empty, blocks until full.
    // Removes the object and switches to empty
    public synchronized T take() throws InterruptedException {
        while (!_full)
            wait(); // may throw!

        T ret = _obj;
        _obj = null;
        _full = false;
        return ret;
    }
}

You might think that it would be difficult to implement anything useful with MVars. After all it looks like a message queue of length one, which bombs when you try to put a second message in. Yet it is the simplest building block for more sophisticated structures. It is the atom of asynchronous message passing.

We’ve seen before how to implement asynchronous message passing using synchronous building blocks by spawning a thread. Now let’s see how we can implement a simple synchronous channel variable using asynchronous MVars. Remember, in a synchronous channel, if there is no receiver already waiting on the other side, a call to send (or write, in this example) will block.

The basic channel variable, or a channel of length one, is called a CVar in Haskell, and it contains two MVars, one to store the message, and the other for the acknowledgment. The acknowledgment MVar doesn’t really have to store anything–we are only interested in its state: empty or full. The reader will acknowledge the receipt of the message by setting it to full.

public class CVar<T> {
    private MVar<T> _data;
    private MVar<Object> _ack;
    
    public CVar(){
        _data = new MVar<T>();
        _ack = new MVar<Object>();
        _ack.put(null); // make _ack full
    }
    public void write(T obj) throws InterruptedException {
        _ack.take(); // make _ack empty
        _data.put(obj);
    }
    public T read() throws InterruptedException {
        T data = _data.take();
        _ack.put(null); // make _ack full
        return data;
    }
}

MVars can also be used to build a more useful asynchronous buffered channel that can store more than one message at a time. I will show you the construction, but it’s far from trivial. You might notice that it resembles a lot a lock-free FIFO queue (although I chose to use locks to implement the MVar). As with all lock-free data structures, one has to be very careful when reasoning about their correctness. I’ll leave it as an exercise to the reader 😉 .

Item and Stream are mutually recursive data structures forming a linked list. An Item points to a Stream–the tail of the list. A Stream is an MVar containing an Item. You may look at a Stream as a sequence of alternating MVars and Items. An empty MVar may serve as a sentinel.

class Item<T> {
    private T _val;
    private Stream<T> _tail;
    
    public Item(T val, Stream<T> tail){
        _val = val;
        _tail = tail;
    }
    T value(){
        return _val;
    }
    Stream<T> tail(){
        return _tail;
    }
}

// It's just a typedef for an MVar that stores an Item
class Stream<T> extends MVar<Item<T>> {}

A Channel contains the Stream linked list stored in an MVar, which is called _read because the head of the list is where we read messages. The other MVar points to the end of the list (really an empty sentinel Stream). This is the write end of the list. The methods put and get (which are not synchronized!) perform some intricate manipulations characteristic of lock-free algorithms, making sure that at every step the queue is in a consistent state for concurrent access. Notice that put will only block if another put is in progress. Otherwise it’s asynchronous–there is no waiting for a receiver.

public class Channel<T> {
    private MVar<Stream<T>> _read;
    private MVar<Stream<T>> _write;
    
    public Channel() {
        _read = new MVar<Stream<T>>();
        _write = new MVar<Stream<T>>();
        Stream<T> hole = new Stream<T>();
        _read.put(hole);
        _write.put(hole);
    }
    public void put(T val)throws InterruptedException {
        Stream<T> newHole = new Stream<T>();
        Stream<T> oldHole = _write.take();
        _write.put(newHole);
        oldHole.put(new Item<T>(val, newHole));
    }
    public T get()throws InterruptedException {
        Stream<T> cts = _read.take();
        Item<T> item = cts.take();
        _read.put(item.tail());
        return item.value();
    }
}

In the next few installments I’m planning to talk a little about “choice” and then tackle the actor-based message-passing paradigm (Erlang and Scala).


What’s the lowest level primitive that can be used to build a concurrent message passing system? As I discussed in my previous post, there are two message passing paradigms, synchronous and asynchronous.

Let’s start with the atom of synchronous message passing. In its purest form it is implemented in Concurrent ML (CML) in the form of a channel. A channel has two methods, send and recv. A thread that calls send blocks until another thread calls recv on the same channel. Similarly, a thread that calls recv will block if there is no blocked send to rendezvous with.

CML channels are typed, i.e., a channel for passing integers has a different type than a channel for passing characters.

Since not everybody is familiar with ML, I decided to implement an equivalent of CML channels in Java. I considered C++ and D, but the Java implementation is the simplest. In C++ I would have to use a C++0x compiler, which I don’t have; in the D programming language, condition variables are not as easy to use as in Java (although that might change in the future). This implementation works only for point-to-point communications–it does not support multiple senders or receivers. [Following readers’ comments I modified the implementation to take into account the so-called spurious wakeups–exits from wait not caused by notify.]

public class Channel<T> {
    private boolean _dataReady;
    private boolean _received;
    T _msg;

    Channel(){
        _dataReady = false;
        _received = false;
    }
    public synchronized void send(T msg) throws InterruptedException {
    	while (_dataReady)
    		wait();
        _msg = msg;
        _dataReady = true;
        notify();
        while (!_received)
            wait();
        _received = false;
    }
    public synchronized T recv() throws InterruptedException {
        while (!_dataReady)
            wait();
        T msg = _msg;
        _dataReady = false;
        _received = true;
        notify();
        return msg;
    }
}

This is probably not the simplest implementation, but it illustrates the principle. Notice that internally the message is passed through shared memory (the data member _msg). This is standard for intra-process, and often inter-process, message passing.

Synchronous channels can be used as building blocks in the implementation of asynchronous message passing. The asynchronous sender simply creates a worker thread that calls send and possibly blocks. The calling thread is free to proceed immediately. The efficiency of this sort of implementation depends heavily on how cheap thread creation is (although worker threads can be cached in thread pools). Any language runtime that uses native threads (including Java) is handicapped in this respect. Interestingly enough, Erlang and Haskell, which use the asynchronous model, have cheap threads; the synchronous CML, paradoxically, uses native threads.

In the next installment, I’ll describe the atom of asynchronous message passing that can be found in Haskell (don’t worry, I’ll translate it into Java).

For completeness, here’s the program I used to test my channels:

public class RecvThread extends Thread {
    private Channel<String> _ch;
    long _waitTime;

    public RecvThread(Channel<String> ch, long waitTime){
        _ch = ch;
        _waitTime = waitTime;
    }
    public void run()
    {
        try {
            Thread.sleep(_waitTime);
            String msg = _ch.recv();
            System.out.println("Message received: " + msg);
        } catch (InterruptedException e) {}
    }
}

public class TestChannel {
    public static void main(String[] args) {
        try {
            testReceiver();
            testSender();
        } catch (InterruptedException e) {
            System.out.println("Interrupted Exception");
        }
    }
    public static void testReceiver() throws InterruptedException 
    {
        Channel<String> ch = new Channel<String>();
        // send will happen first
        RecvThread t = new RecvThread(ch, 1000);
        t.start();
        ch.send("Hello before rendezvous");
    }
    public static void testSender() throws InterruptedException 
    {
        Channel<String> ch = new Channel<String>();
        RecvThread t = new RecvThread(ch, 0);
        t.start();
        Thread.sleep(1000);
        ch.send("Hello after sleep");
    }
}

In the first test, the sender blocks; in the second, the receiver blocks.


Recently I’ve been looking into message passing as a model for concurrency. It turns out that there are two warring camps in the message passing community. One believes that synchronous message passing is more fundamental, the other believes that asynchronous message passing is more basic. You might think that the discussion is moot, since one can be emulated by the other, but it’s not as simple as that.

Let me first explain the concepts. In message passing you have a sender and a receiver–they usually run in separate threads. The sender sends a message and the receiver receives it. All issues of memory sharing and concurrent access are hidden inside the communication channel. The client does no low level synchronization actions like locking. Which is a good thing–no low level races or deadlocks!

The receiver usually has a choice of synchronous or asynchronous access. It can block until a message is available, or it can peek to see if there is a message waiting. Usually both interfaces are available.

The major choice of paradigms is on the sender’s side.

  • In the synchronous model, the sender blocks until the receiver picks up the message. The two have to rendezvous.
  • In the asynchronous model, the sender drops the message and continues with its own business.

The synchronous model is used, among others, in CSP (Communicating Sequential Processes) and CML (Concurrent ML). The asynchronous one is used in Concurrent Haskell and in actor-based languages like Erlang or Scala.

Here’s the main argument of the synchronous camp: After calling send the sender has the guarantee that the message has been received by the receiver. The code after send may safely make this assumption. If you’re a believer in RPC (Remote Procedure Call) you’ll love this. Sending a message is just like making a function call, only the work is done in a separate thread.

To which the asynchronous camp answers: Yeah, but what about distribution? In a distributed system, the receiver may be in a different process on a different machine. There may be many reasons why the message doesn’t arrive at the recipient (the recipient is dead? a bulldozer cut the network cable?). In that case the sender will hang forever. The code after send may still assume safe delivery, but it will never be executed.

There is another, more subtle problem with synchronous message passing between machines–a network protocol might deliver messages in different order than they were sent. If the receiver’s code expects a certain sequence of messages, it might block forever if the messages are permuted.

All these problems may be overcome by queuing messages, building sophisticated protocols, and spawning helper threads. But is it really worth the trouble?

Yes, say the synchronous camp, if you want to formally prove the correctness of your programs. With synchronous message passing you can use the theoretical machinery of C.A.R Hoare’s CSP. That’s an important advantage if you are writing your Ph.D. thesis, but maybe less so if you’re maintaining millions of lines of Erlang code.

For my next installment I’m already working on translating Haskell’s MVar’s to Java. This will be fun!

You may vote on this article on reddit.


An immutable object never changes. You can bet your program on it. As I explained in my previous post, the same is not true for const objects (or readonly objects, in dialects of Java). They may be changed through mutable aliases. An immutable object has no mutable aliases. Ever!

Small print: this guarantee is predicated on the programmer not overriding the type system with casts and other escape mechanisms.

To my knowledge, immutability is currently available in the D programming language and in a Java dialect called IGJ (Immutability Generic Java). It is the default in functional languages.

The closest you can get to immutability in C++ is by using const as a storage class:

const double pi = 3.141592;
const char ERRORMSG[] = "Your bank went belly up.";

Here, the value of pi or ERRORMSG is guaranteed to never change. Global or static immutable values can be used at compile time (for instance as labels in a switch statement).

Creating Immutable Objects

Defining immutable numbers, arrays, or POD structs is pretty straightforward. But what about more complex objects that don’t have static initializers? How do you create an immutable linked list? In functional languages, lists are treated as built-in types; like arrays in general purpose languages. They can be statically initialized. But in C++ or Java the creation of a list involves memory allocations and reference manipulation. Since, by definition, we can’t manipulate an immutable list, it seems like we can never create one!

How about relaxing the constraints a little to allow mutation inside the constructor of an immutable object. Here’s a hypothetical example of a list in D (the current D compiler doesn’t fully implement immutability, so my examples are written in pseudo-D):

class IntList
{
public:
  // default constructor
  this() {} // _head is default-initialized to null
  // one element constructor
  this(int i) {
    _head = new IntLink(i); // mutates _head
  }
private:
  IntLink _head;
}
// will this work?
immutable IntList oneList= new immutable IntList(1);

There is a significant problem with this solution. If this is not considered immutable inside the constructor then there is no guarantee that a mutable reference to this or any of its subobjects won’t escape its scope. Consider this example:

IntLink globalLink; // mutable!

IntList.this(int i) {
  _head = new IntLink(i);
  globalLink = _head; // escape!
}

immutable IntList immList= new immutable IntList(1);
globalLink.setValue(2); // mutates immList!

Here, an immutable object immList has been mutated through an alias globalLink. We can’t allow this!

It’s true that a compiler could perform escape analysis on the constructor of IntList, provided it has access to its source code; which might not always be true when it’s compiling the statement that creates the immutable object. After all, class IntList might be implemented in a third-party library.

In the absence of source code, the only other possibility is to include immutability information in the type signature of the constructor. When an immutable object is created, the compiler would use an immutable constructor, and it would fail if one didn’t exist. Conversely, an immutable constructor would not compile if it allowed a mutable reference to escape. This bad code would not compile:

IntList.this(int i) immutable {
  _head = new IntLink(i);
  globalLink = _head; // error!
}

Of course, no mutable methods may be called from inside an immutable constructor–they couldn’t guarantee the non-escape of mutable aliases.

This solution works, even if it’s not perfect. It often leads to code duplication (the immutable constructor being identical to the mutable one, as in the IntList example). Moreover, it prevents some forms of refactoring. Even though, inside an immutable constructor, you may initialize an object’s fields, you can’t delegate this task to a (perforce immutable) method of the object.

Assignment is not the same as Mutation

The key to immutable construction is the observation that, when constructing an immutable object, it’s okay to assign the object’s fields but not to mutate them. During construction only such “shallow” mutation should be possible.

In my example, the assignment to _head is okay, but the mutation of the IntLink object attached to it should be prohibited. Indeed, I don’t need to mutate the head link once it’s constructed. Of course the construction of an immutable IntLink follows the same rules. Here’s the relevant code:

class IntLink
{
  this(int i) immutable {
    // _next is default-initialized to null
    _val = i; // field assignment
  }
  int _val;
  IntLink _next;
}

With this understanding, it’s actually possible to minimize code duplication. To this end, IGJ introduces a new type modifier, AssignFields. A constructor or a method that performs no other mutation but field assignment may be declared AssignFields. Since AssignFields methods and AssignFields constructors can also be used in mutable contexts, they don’t have to be duplicated. Expanding on the above example:

class IntLink
{
  this(int i) assignfields {
    // _next is default-initialized to null
    SetValue(i); // Ok: it's an assignfields method
  }
  void SetValue(int i) assignfields {
    _val = i; // field assignment
  }
  int _val;
  IntLink _next;
}

As you can see, I was even able to refactor the part of the constructor that does the assignment to _val. I can now use the same constructor in both, mutable and immutable, contexts. The SetValue method can only be called in a mutable or assignfields context.

immutable IntLink iLink = new immutable IntLink(1);
IntLink mLink = new IntLink(2);
mLink.SetValue(3);

Subtyping relationships

It is okay to pass an immutable object to a function that expects a const object. After all, such a function will not mutate the object. If a reference to the object escapes the function, it can only be a const reference. And again, const reference cannot be used to mutate the object, so we’re fine.

The compiler will allow this subsumption if we establish that immutable is a subtype of const (more precisely, for any type T, immutable T is a subtype of const T). This is very similar to the compiler allowing the passing a derived class object to a function that accepts a base class objects–the Liskov substitution principle.

The full subtyping hierarchy between various mutability annotations is as follows:

  • immutable is a subtype of const: You can pass an immutable object to a function taking a const argument.
  • assignfields is a subtype of const: You can call a const method from an assignfields method (subsumption of this).
  • mutable is a subtype of assignfields. You can call an assignfields method on a mutable object (as in mLink.SetValue()).

Because of transitivity, mutable is also a subtype of const. You can pass a mutable object to a function that takes a const argument (you do it in C++ without much thinking).

subtyping relationships

Conclusion

There is some advantage to introducing yet another type modifier, assignfields, to smooth out the process of constructing immutable objects. On the other hand, is it worth additional complexity? How often does one construct non-trivial immutable objects? If it’s not a common programming pattern then maybe we can live with current restrictions and some code duplication. We still have very little experience in using immutable in D, so we might have to wait and see.

Recent news: A video of my talk to the Vancouver C++ Users group was just posted. The black shadow in front of the bright screen talking about memory fences is yours truly.

If you found this post interesting, please register your vote on reddit, so that more people can find it.