What’s the lowest-level primitive that can be used to build a concurrent message-passing system? As I discussed in my previous post, there are two message-passing paradigms: synchronous and asynchronous.

Let’s start with the atom of synchronous message passing. In its purest form it is implemented in Concurrent ML (CML) in the form of a channel. A channel has two methods, send and recv. A thread that calls send blocks until another thread calls recv on the same channel. Similarly, a thread that calls recv will block if there is no blocked send to rendezvous with.

CML channels are typed, i.e., a channel for passing integers has a different type than a channel for passing characters.

Since not everybody is familiar with ML, I decided to implement an equivalent of CML channels in Java. I considered C++ and D, but the Java implementation is the simplest. In C++ I would have to use a C++0x compiler, which I don’t have; in the D programming language, condition variables are not as easy to use as in Java (although that might change in the future). This implementation works only for point-to-point communication: it does not support multiple senders or multiple receivers. [Following readers’ comments I modified the implementation to take into account so-called spurious wakeups, that is, returns from wait that were not caused by notify.]

public class Channel<T> {
    private boolean _dataReady;
    private boolean _received;
    private T _msg;

    Channel(){
        _dataReady = false;
        _received = false;
    }
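    // Blocks until a receiver has taken the message (the rendezvous).
    public synchronized void send(T msg) throws InterruptedException {
        // Wait until any earlier message has been taken.
        // The loop re-checks the flag to guard against spurious wakeups.
        while (_dataReady)
            wait();
        _msg = msg;
        _dataReady = true;
        notify(); // wake a receiver blocked in recv
        // Wait for the receiver to acknowledge the message.
        while (!_received)
            wait();
        _received = false;
    }
    // Blocks until a sender has deposited a message.
    public synchronized T recv() throws InterruptedException {
        while (!_dataReady)
            wait();
        T msg = _msg;
        _dataReady = false;
        _received = true;
        notify(); // wake the sender blocked in send
        return msg;
    }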
}

This is probably not the simplest implementation, but it illustrates the principle. Notice that internally the message is passed through shared memory (the data member _msg). This is standard for intra-process, and often inter-process, message passing.
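For comparison, the Java standard library has a class with very similar rendezvous behavior: java.util.concurrent.SynchronousQueue, whose put blocks until another thread calls take, and vice versa. The sketch below is only an illustration of the same idea, not a drop-in replacement for the Channel above; the class name RendezvousSketch and the method exchange are made up for this example.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class; only SynchronousQueue comes from the standard library.
final class RendezvousSketch {
    // Passes msg from a sender thread to the calling thread and returns it.
    static String exchange(String msg) {
        SynchronousQueue<String> ch = new SynchronousQueue<>();
        Thread sender = new Thread(() -> {
            try {
                ch.put(msg); // blocks until a receiver takes the message
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sender.start();
        try {
            String received = ch.take(); // blocks until a sender offers a message
            sender.join();
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(exchange("Hello via SynchronousQueue"));
    }
}
```

Unlike the Channel above, SynchronousQueue also handles multiple senders and receivers.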

Synchronous channels can be used as building blocks in the implementation of asynchronous message passing. The asynchronous sender simply creates a worker thread that calls send and possibly blocks. The calling thread is free to proceed immediately. The efficiency of this sort of implementation depends heavily on how cheap thread creation is (although worker threads can be cached in thread pools). Any language runtime that uses native threads (including Java) is handicapped in this respect. Interestingly enough, Erlang and Haskell, which use the asynchronous model, have cheap threads; the synchronous CML, paradoxically, uses native threads.
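Here is a minimal sketch of that construction. To keep it self-contained I use java.util.concurrent.SynchronousQueue as a stand-in for a synchronous channel (put and take play the roles of send and recv); the names AsyncSender, sendAsync, and roundTrip are hypothetical, and a real implementation would probably draw workers from a thread pool rather than create a thread per message.

```java
import java.util.concurrent.SynchronousQueue;

final class AsyncSendSketch {
    // Sends msg asynchronously, then receives it on the calling thread.
    static String roundTrip(String msg) {
        SynchronousQueue<String> ch = new SynchronousQueue<>();
        new AsyncSender<String>(ch).sendAsync(msg); // does not block the caller
        try {
            return ch.take(); // rendezvous with the worker thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("Hello from a worker thread"));
    }
}

// Hypothetical helper: a non-blocking send built on a synchronous channel.
final class AsyncSender<T> {
    private final SynchronousQueue<T> ch;

    AsyncSender(SynchronousQueue<T> ch) {
        this.ch = ch;
    }

    // Returns immediately; the worker blocks in put() until a receiver takes msg.
    Thread sendAsync(T msg) {
        Thread worker = new Thread(() -> {
            try {
                ch.put(msg);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        return worker;
    }
}
```

Note that with one worker thread per message, two consecutive asynchronous sends are not guaranteed to arrive in the order they were issued.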

In the next installment, I’ll describe the atom of asynchronous message passing that can be found in Haskell (don’t worry, I’ll translate it into Java).

For completeness, here’s the program I used to test my channels:

public class RecvThread extends Thread {
    private Channel<String> _ch;
    long _waitTime;

    public RecvThread(Channel<String> ch, long waitTime){
        _ch = ch;
        _waitTime = waitTime;
    }
    public void run()
    {
        try {
            Thread.sleep(_waitTime);
            String msg = _ch.recv();
            System.out.println("Message received: " + msg);
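        } catch (InterruptedException e) {
            // Restore the interrupt status instead of silently swallowing it.
            Thread.currentThread().interrupt();
        }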
    }
}

public class TestChannel {
    public static void main(String[] args) {
        try {
            testReceiver();
            testSender();
        } catch (InterruptedException e) {
            System.out.println("Interrupted Exception");
        }
    }
    public static void testReceiver() throws InterruptedException 
    {
        Channel<String> ch = new Channel<String>();
        // send will happen first
        RecvThread t = new RecvThread(ch, 1000);
        t.start();
        ch.send("Hello before rendezvous");
    }
    public static void testSender() throws InterruptedException 
    {
        Channel<String> ch = new Channel<String>();
        RecvThread t = new RecvThread(ch, 0);
        t.start();
        Thread.sleep(1000);
        ch.send("Hello after sleep");
    }
}

In the first test, the sender blocks; in the second, the receiver blocks.
