What is the most basic building block for asynchronous message passing? I’ve found the answer in Concurrent Haskell: it is the MVar. Not many people are fluent in Haskell, especially when monads are involved, so MVars may sound foreign to most. Not to worry: I’ll translate them into Java (as I did with synchronous CML channels in my previous post).

An MVar is an object with two methods, put and take. The sender calls put with a message and continues (it doesn’t block, which is why this message-passing model is called “asynchronous”). The receiver, in another thread, calls take to remove the message from the MVar. If there is no message, the receiver blocks until there is one.

An MVar can store a maximum of one message, so it’s an error to call put when there is an unclaimed message inside it. An MVar can thus be in only one of two states: empty or full.

Here’s a simple-minded implementation of MVar written in Java. (A lock-free implementation is possible–and closer to how Haskell implements it–but it’s harder to reason about.)

public class MVar<T> {
    private T _obj;
    private boolean _full;

    public MVar(){
        _full = false;
    }
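    // put: asynchronous (non-blocking)
    // Precondition: MVar must be empty
    public synchronized void put(T obj) {
        // Enforce the precondition with an exception: assert statements
        // are skipped unless the JVM is started with -ea
        if (_full)
            throw new IllegalStateException("put called on a full MVar");
        _obj = obj;
        _full = true;
        notify(); // wake up one thread blocked in take()
    }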
    // take: if empty, blocks until full.
    // Removes the object and switches to empty
    public synchronized T take() throws InterruptedException {
        while (!_full)
            wait(); // may throw!

        T ret = _obj;
        _obj = null;
        _full = false;
        return ret;
    }
}
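
For comparison, here is roughly what the lock-free route might look like. This is only a sketch under my own assumptions: the class name LockFreeSlot and the methods tryPut and tryTake are made up for illustration, null is reserved to mean “empty” (so, unlike the MVar above, it can’t carry a null message), and neither method ever blocks. A real replacement for MVar would still need some way to park the receiver until a message arrives.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical lock-free one-slot cell; not the MVar class from this post.
// Assumption: null means "empty", so null messages are not allowed.
class LockFreeSlot<T> {
    private final AtomicReference<T> _slot = new AtomicReference<T>();

    // Stores obj if the slot is empty; returns false if it is full.
    public boolean tryPut(T obj) {
        if (obj == null)
            throw new NullPointerException("null is reserved for the empty state");
        return _slot.compareAndSet(null, obj);
    }

    // Removes and returns the message, or returns null if the slot is empty.
    public T tryTake() {
        return _slot.getAndSet(null);
    }
}
```

Both operations are a single atomic step on the reference, which is what makes the empty/full transition safe without a lock. The price is that the caller has to decide what to do when tryPut returns false or tryTake returns null.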

You might think that it would be difficult to implement anything useful with MVars. After all, it looks like a message queue of length one, which bombs when you try to put a second message in. Yet it is the simplest building block for more sophisticated structures. It is the atom of asynchronous message passing.
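
To make the two behaviors concrete, here is a small sketch of a hand-off between two threads, followed by the “bombing” second put. It carries its own compact copy of the MVar class whose put throws IllegalStateException when the MVar is full (my choice for this sketch, so that the check fires even when the JVM runs with assertions disabled). MVarDemo, handOff and secondPutFails are hypothetical names used only for illustration.

```java
// Compact copy of MVar. Assumption of this sketch: a put on a full
// MVar throws IllegalStateException.
class MVar<T> {
    private T _obj;
    private boolean _full = false;

    public synchronized void put(T obj) {
        if (_full)
            throw new IllegalStateException("MVar is full");
        _obj = obj;
        _full = true;
        notify();
    }
    public synchronized T take() throws InterruptedException {
        while (!_full)
            wait();
        T ret = _obj;
        _obj = null;
        _full = false;
        return ret;
    }
}

// Hypothetical demo class, not part of the original code.
class MVarDemo {
    // The receiver thread blocks in take() until the sender calls put().
    static String handOff(String msg) {
        final MVar<String> mv = new MVar<String>();
        final String[] received = new String[1];
        Thread receiver = new Thread(() -> {
            try {
                received[0] = mv.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        receiver.start();
        mv.put(msg); // returns immediately, receiver or no receiver
        try {
            receiver.join(); // after join, received[0] is visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    // Returns true if a second put on a full MVar is rejected.
    static boolean secondPutFails() {
        MVar<String> mv = new MVar<String>();
        mv.put("first");
        try {
            mv.put("second");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }
}
```

It doesn’t matter whether the receiver reaches take before or after the sender calls put: the while loop in take handles both orders.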

We’ve seen before how to implement asynchronous message passing using synchronous building blocks by spawning a thread. Now let’s see how we can implement a simple synchronous channel variable using asynchronous MVars. Remember, in a synchronous channel, if there is no receiver already waiting on the other side, a call to send (or write, in this example) will block.

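The basic channel variable, or a channel of length one, is called a CVar in Haskell, and it contains two MVars, one to store the message, and the other for the acknowledgment. The acknowledgment MVar doesn’t really have to store anything; we are only interested in its state: empty or full. The reader will acknowledge the receipt of the message by setting it to full. The writer, in turn, must empty the acknowledgment MVar before it stores a message, so a write blocks whenever the previous message hasn’t been read yet. (The acknowledgment MVar starts out full, which lets the very first write go through without waiting.)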

public class CVar<T> {
    private MVar<T> _data;
    private MVar<Object> _ack;
    
    public CVar(){
        _data = new MVar<T>();
        _ack = new MVar<Object>();
        _ack.put(null); // make _ack full
    }
    public void write(T obj) throws InterruptedException {
        _ack.take(); // make _ack empty
        _data.put(obj);
    }
    public T read() throws InterruptedException {
        T data = _data.take();
        _ack.put(null); // make _ack full
        return data;
    }
}
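
Here is a sketch of how the blocking plays out, assuming the MVar and CVar classes above (repeated in compact form so the example stands on its own; in this copy put throws IllegalStateException on a full MVar, which is my substitution for the asserts). CVarDemo and run are hypothetical names. The writer thread gets through its first write at once, because _ack starts out full, and then gets stuck in the second write until the main thread reads the first message.

```java
class MVar<T> {
    private T _obj;
    private boolean _full = false;

    public synchronized void put(T obj) {
        if (_full)
            throw new IllegalStateException("MVar is full");
        _obj = obj;
        _full = true;
        notify();
    }
    public synchronized T take() throws InterruptedException {
        while (!_full)
            wait();
        T ret = _obj;
        _obj = null;
        _full = false;
        return ret;
    }
}

class CVar<T> {
    private final MVar<T> _data = new MVar<T>();
    private final MVar<Object> _ack = new MVar<Object>();

    public CVar() {
        _ack.put(null); // make _ack full
    }
    public void write(T obj) throws InterruptedException {
        _ack.take(); // make _ack empty
        _data.put(obj);
    }
    public T read() throws InterruptedException {
        T data = _data.take();
        _ack.put(null); // make _ack full
        return data;
    }
}

// Hypothetical demo class, not part of the original code.
class CVarDemo {
    // Returns the two messages joined by a comma, or "early" if the
    // second write completed before the first message was read.
    static String run() {
        final CVar<String> cv = new CVar<String>();
        Thread writer = new Thread(() -> {
            try {
                cv.write("a"); // _ack starts full: returns at once
                cv.write("b"); // blocks until "a" has been read
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        writer.start();
        try {
            writer.join(100); // give the writer time to get stuck
            if (!writer.isAlive())
                return "early";
            String first = cv.read();  // acknowledges "a", releasing the writer
            String second = cv.read(); // waits for "b" if necessary
            writer.join();
            return first + "," + second;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

The 100 ms wait is only there to let the writer reach its second write. The writer cannot finish before the first read no matter how long we wait, because nobody else fills _ack.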

MVars can also be used to build a more useful asynchronous buffered channel that can store more than one message at a time. I will show you the construction, but it’s far from trivial. You might notice that it closely resembles a lock-free FIFO queue (although I chose to use locks to implement the MVar). As with all lock-free data structures, one has to be very careful when reasoning about its correctness. I’ll leave it as an exercise for the reader ;-) .

Item and Stream are mutually recursive data structures forming a linked list. An Item points to a Stream–the tail of the list. A Stream is an MVar containing an Item. You may look at a Stream as a sequence of alternating MVars and Items. An empty MVar may serve as a sentinel.

class Item<T> {
    private T _val;
    private Stream<T> _tail;
    
    public Item(T val, Stream<T> tail){
        _val = val;
        _tail = tail;
    }
    T value(){
        return _val;
    }
    Stream<T> tail(){
        return _tail;
    }
}

// It's just a typedef for an MVar that stores an Item
class Stream<T> extends MVar<Item<T>> {}

A Channel contains the Stream linked list stored in an MVar, which is called _read because the head of the list is where we read messages. The other MVar, _write, points to the end of the list (really an empty sentinel Stream). This is the write end of the list. The methods put and get (which are not synchronized!) perform some intricate manipulations characteristic of lock-free algorithms, making sure that at every step the queue is in a consistent state for concurrent access. Notice that put will only block if another put is in progress. Otherwise it’s asynchronous: there is no waiting for a receiver. A call to get, on the other hand, blocks for as long as the channel is empty.

public class Channel<T> {
    private MVar<Stream<T>> _read;
    private MVar<Stream<T>> _write;
    
    public Channel() {
        _read = new MVar<Stream<T>>();
        _write = new MVar<Stream<T>>();
        Stream<T> hole = new Stream<T>();
        _read.put(hole);
        _write.put(hole);
    }
    public void put(T val) throws InterruptedException {
        Stream<T> newHole = new Stream<T>();
        Stream<T> oldHole = _write.take();
        _write.put(newHole);
        oldHole.put(new Item<T>(val, newHole));
    }
    public T get() throws InterruptedException {
        Stream<T> cts = _read.take();
        Item<T> item = cts.take();
        _read.put(item.tail());
        return item.value();
    }
}
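
To see the buffering at work, here is a single-threaded sketch that queues three messages with no receiver in sight and then drains them in FIFO order. It repeats the classes above in compact form so that it stands on its own (with put throwing IllegalStateException on a full MVar, my substitution for the asserts). ChannelDemo and run are hypothetical names.

```java
class MVar<T> {
    private T _obj;
    private boolean _full = false;

    public synchronized void put(T obj) {
        if (_full)
            throw new IllegalStateException("MVar is full");
        _obj = obj;
        _full = true;
        notify();
    }
    public synchronized T take() throws InterruptedException {
        while (!_full)
            wait();
        T ret = _obj;
        _obj = null;
        _full = false;
        return ret;
    }
}

class Item<T> {
    private final T _val;
    private final Stream<T> _tail;

    Item(T val, Stream<T> tail) {
        _val = val;
        _tail = tail;
    }
    T value() { return _val; }
    Stream<T> tail() { return _tail; }
}

class Stream<T> extends MVar<Item<T>> {}

class Channel<T> {
    private final MVar<Stream<T>> _read = new MVar<Stream<T>>();
    private final MVar<Stream<T>> _write = new MVar<Stream<T>>();

    public Channel() {
        Stream<T> hole = new Stream<T>();
        _read.put(hole);
        _write.put(hole);
    }
    public void put(T val) throws InterruptedException {
        Stream<T> newHole = new Stream<T>();
        Stream<T> oldHole = _write.take();
        _write.put(newHole);
        oldHole.put(new Item<T>(val, newHole));
    }
    public T get() throws InterruptedException {
        Stream<T> cts = _read.take();
        Item<T> item = cts.take();
        _read.put(item.tail());
        return item.value();
    }
}

// Hypothetical demo class, not part of the original code.
class ChannelDemo {
    static String run() {
        Channel<Integer> ch = new Channel<Integer>();
        try {
            // None of these puts waits for a receiver
            ch.put(1);
            ch.put(2);
            ch.put(3);
            // Messages come out in the order they went in
            return ch.get() + "," + ch.get() + "," + ch.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

A fourth get in this demo would block forever, since the channel would be empty and there is no other thread to put anything in.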

In the next few installments I’m planning to talk a little about “choice” and then tackle the actor-based message-passing paradigm (Erlang and Scala).
