What’s the lowest-level primitive that can be used to build a concurrent message-passing system? As I discussed in my previous post, there are two message-passing paradigms: synchronous and asynchronous.
Let’s start with the atom of synchronous message passing. In its purest form it is implemented in Concurrent ML (CML) in the form of a channel. A channel has two methods, send and recv. A thread that calls send blocks until another thread calls recv on the same channel. Similarly, a thread that calls recv will block if there is no blocked send to rendezvous with.
CML channels are typed, i.e., a channel for passing integers has a different type than a channel for passing characters.
Since not everybody is familiar with ML, I decided to implement an equivalent of CML channels in Java. I considered C++ and D, but the Java implementation is the simplest. In C++ I would have to use a C++0x compiler, which I don’t have; in the D programming language, condition variables are not as easy to use as in Java (although that might change in the future). This implementation works only for point-to-point communication: it does not support multiple senders or receivers. [Following readers’ comments, I modified the implementation to take into account so-called spurious wakeups, i.e., exits from wait not caused by notify.]
    public class Channel<T> {
        private boolean _dataReady;
        private boolean _received;
        T _msg;

        Channel() {
            _dataReady = false;
            _received = false;
        }

        public synchronized void send(T msg) throws InterruptedException {
            while (_dataReady)
                wait();
            _msg = msg;
            _dataReady = true;
            notify();
            while (!_received)
                wait();
            _received = false;
        }

        public synchronized T recv() throws InterruptedException {
            while (!_dataReady)
                wait();
            T msg = _msg;
            _dataReady = false;
            _received = true;
            notify();
            return msg;
        }
    }
This is probably not the simplest implementation, but it illustrates the principle. Notice that internally the message is passed through shared memory (the data member _msg). This is standard for intra-process, and often inter-process, message passing.
Synchronous channels can be used as building blocks in the implementation of asynchronous message passing. The asynchronous sender simply creates a worker thread that calls send and possibly blocks. The calling thread is free to proceed immediately. The efficiency of this sort of implementation depends heavily on how cheap thread creation is (although worker threads can be cached in thread pools). Any language runtime that uses native threads (including Java) is handicapped in this respect. Interestingly enough, Erlang and Haskell, which use the asynchronous model, have cheap threads; the synchronous CML, paradoxically, uses native threads.
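Here is a minimal sketch of that idea, not a definitive implementation. To keep it self-contained it uses java.util.concurrent.SynchronousQueue as a stand-in for the synchronous channel (put blocks until another thread calls take); the class AsyncSender and its methods sendAsync and demo are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical helper (not from the original post): an asynchronous send
// layered on top of a synchronous, rendezvous-style channel.
class AsyncSender<T> {
    // Stand-in for the synchronous channel: put() blocks until take().
    private final SynchronousQueue<T> channel;

    AsyncSender(SynchronousQueue<T> channel) {
        this.channel = channel;
    }

    // Returns immediately; a freshly created worker thread performs the
    // blocking send on the caller's behalf.
    Thread sendAsync(T msg) {
        Thread worker = new Thread(() -> {
            try {
                channel.put(msg); // blocks until a receiver arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        return worker;
    }

    // Small demonstration: the caller is not blocked by sendAsync and
    // later picks the message up itself.
    static String demo() {
        SynchronousQueue<String> ch = new SynchronousQueue<>();
        AsyncSender<String> sender = new AsyncSender<>(ch);
        sender.sendAsync("hello");
        try {
            return ch.take(); // rendezvous with the worker thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("Message received: " + demo());
    }
}
```

Each call creates one thread, which is exactly the cost discussed above. Note also that several outstanding asynchronous sends may be delivered in any order, because nothing orders the worker threads relative to each other.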
In the next installment, I’ll describe the atom of asynchronous message passing that can be found in Haskell (don’t worry, I’ll translate it into Java).
For completeness, here’s the program I used to test my channels:
    // RecvThread.java
    public class RecvThread extends Thread {
        private Channel<String> _ch;
        long _waitTime;

        public RecvThread(Channel<String> ch, long waitTime) {
            _ch = ch;
            _waitTime = waitTime;
        }

        public void run() {
            try {
                Thread.sleep(_waitTime);
                String msg = _ch.recv();
                System.out.println("Message received: " + msg);
            } catch (InterruptedException e) {}
        }
    }

    // TestChannel.java
    public class TestChannel {
        public static void main(String[] args) {
            try {
                testReceiver();
                testSender();
            } catch (InterruptedException e) {
                System.out.println("Interrupted Exception");
            }
        }

        public static void testReceiver() throws InterruptedException {
            Channel<String> ch = new Channel<String>();
            // send will happen first
            RecvThread t = new RecvThread(ch, 1000);
            t.start();
            ch.send("Hello before rendezvous");
        }

        public static void testSender() throws InterruptedException {
            Channel<String> ch = new Channel<String>();
            RecvThread t = new RecvThread(ch, 0);
            t.start();
            Thread.sleep(1000);
            ch.send("Hello after sleep");
        }
    }
In the first test, the sender blocks; in the second, the receiver blocks.
February 15, 2009 at 11:21 am
You should use wait() in a while loop, to guard against spurious wake ups.
February 15, 2009 at 11:38 am
The Plan 9 threading library is based on this model (though they inherit their terminology from CSP). Russ Cox had a version ported to Unix as part of plan9port at , documentation at .
There’s also a (very slightly) simpler co-routine version of the library, libtask, at .
The CSP model can be used to mix coroutines and multiprogramming, and makes it easy to reason about.
February 24, 2009 at 8:34 pm
Bruno is right: you need to use wait() in a while loop, always. The reason is that the Java API does not guarantee that a thread that returns from wait returned because another thread called notify. It could return for another reason. From http://java.sun.com/javase/6/docs/api/java/lang/Object.html#wait(long):
A thread can also wake up without being notified, interrupted, or timing out, a so-called spurious wakeup. While this will rarely occur in practice, applications must guard against it by testing for the condition that should have caused the thread to be awakened, and continuing to wait if the condition is not satisfied. In other words, waits should always occur in loops
On Linux, the reason this can happen is signals + the futex system call, which underlies the implementation of pthread condition variables/Java monitors:
http://vladimir_prus.blogspot.com/2005/07/spurious-wakeups.html
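For readers unfamiliar with the idiom, the guarded wait described in the quoted Javadoc can be sketched as follows. The class Gate and its members are hypothetical, made up purely to illustrate the pattern; only wait, notifyAll and synchronized are real Java.

```java
// Hypothetical one-shot gate illustrating the guarded-wait idiom.
class Gate {
    private boolean isOpen = false; // the condition being waited for

    synchronized void await() throws InterruptedException {
        // Re-test the condition after every return from wait(): the
        // thread may have woken up without a matching notify.
        while (!isOpen) {
            wait();
        }
    }

    synchronized void open() {
        isOpen = true;
        notifyAll(); // wake every thread blocked in await()
    }

    // Small demonstration: one thread opens the gate, the caller waits.
    static boolean demo() {
        Gate gate = new Gate();
        Thread opener = new Thread(gate::open);
        opener.start();
        try {
            gate.await();
            opener.join();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Gate passed: " + demo());
    }
}
```

With an if in place of the while, a spurious wakeup would let await return while isOpen is still false.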
February 24, 2009 at 10:21 pm
To Bruno and Evan: You’re right! I’m not an experienced Java programmer so this little gotcha escaped my attention. I’m going to fix it for future readers.
February 26, 2009 at 12:48 pm
[…] February 26, 2009 Message Passing Atoms–MVars Posted by Bartosz Milewski under Concurrency, Haskell, Java, Programming What is the most basic building block for asynchronous message passing? I’ve found the answer in Concurrent Haskell. Not many people are fluent in Haskell, especially when monads are involved, so MVars may sound foreign to most. Not to worry–I’ll translate them into Java (as I did with synchronous CML channels in my previous post). […]
March 13, 2009 at 10:59 am
Using Object.wait() is not cool anymore, you should use java.util.concurrent.locks.ReentrantLock =).
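As a rough sketch of what that suggestion might look like, here is the same two-flag handshake written with ReentrantLock and a Condition instead of synchronized, wait and notify. The class name LockChannel and the demo method are assumptions for illustration, and the sketch keeps the original's restriction to a single sender and a single receiver.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical variant of the post's Channel built on an explicit lock.
// Like the original, it is meant for one sender and one receiver only.
class LockChannel<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition changed = lock.newCondition();
    private boolean dataReady = false;
    private boolean received = false;
    private T msg;

    void send(T m) throws InterruptedException {
        lock.lock();
        try {
            while (dataReady) changed.await();  // previous message pending
            msg = m;
            dataReady = true;
            changed.signalAll();
            while (!received) changed.await();  // block until it is taken
            received = false;
        } finally {
            lock.unlock();
        }
    }

    T recv() throws InterruptedException {
        lock.lock();
        try {
            while (!dataReady) changed.await(); // block until a send arrives
            T m = msg;
            dataReady = false;
            received = true;
            changed.signalAll();
            return m;
        } finally {
            lock.unlock();
        }
    }

    // Small demonstration: a receiver thread takes what the caller sends.
    static String demo() {
        LockChannel<String> ch = new LockChannel<>();
        String[] box = new String[1];
        Thread receiver = new Thread(() -> {
            try {
                box[0] = ch.recv();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        receiver.start();
        try {
            ch.send("hello");
            receiver.join(); // join makes the write to box[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return box[0];
    }

    public static void main(String[] args) {
        System.out.println("Message received: " + demo());
    }
}
```

The await calls still sit in while loops: Condition.await is documented to allow spurious wakeups just as Object.wait is.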
And speaking about message passing between threads:
http://java.sun.com/javase/6/docs/api/java/util/concurrent/Exchanger.html
and
http://java.sun.com/javase/6/docs/api/java/util/concurrent/SynchronousQueue.html
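A brief, hedged illustration of how these two library classes behave as rendezvous primitives. The class RendezvousDemo and its method names are made up for this sketch; put, take and exchange are the real library calls.

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class; only the java.util.concurrent calls are real API.
class RendezvousDemo {
    // One-way handoff: put() blocks until another thread calls take().
    static String viaSynchronousQueue() {
        SynchronousQueue<String> ch = new SynchronousQueue<>();
        Thread sender = new Thread(() -> {
            try {
                ch.put("Hello via SynchronousQueue");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sender.start();
        try {
            String msg = ch.take();
            sender.join();
            return msg;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Two-way swap: exchange() blocks until the partner thread arrives,
    // then each side receives the value offered by the other.
    static String viaExchanger() {
        Exchanger<String> ex = new Exchanger<>();
        Thread peer = new Thread(() -> {
            try {
                ex.exchange("pong");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        peer.start();
        try {
            String got = ex.exchange("ping");
            peer.join();
            return got;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(viaSynchronousQueue());
        System.out.println(viaExchanger());
    }
}
```

SynchronousQueue is the closer match to the channel in the post, with put playing the role of send and take the role of recv; Exchanger passes a value in both directions at the rendezvous.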
March 24, 2015 at 11:28 pm
I know it’s been six years since this was posted but I have to point out that this implementation is not correct. Consider the following scenario (s/r == F/T stands for _sending/_receiving == false/true)
If asserts are turned off the scenario may go on as follows:
Now both threads are waiting — deadlock!
March 25, 2015 at 4:11 pm
@Artur: You’re right, what I implemented was a one-shot channel that cannot send multiple messages without some kind of separate acknowledgment channel. A more usable channel requires three flags. I have updated the code in the blog. Here’s the previous version you commented upon, for reference:
March 25, 2015 at 10:43 pm
This works provided you reset the flags: send() should set _isEmpty and _received to false, recv() should set _dataReady to false.
Then it becomes evident that _isEmpty is equivalent to !_dataReady and we may eliminate one of them.
March 26, 2015 at 9:10 am
@Artur, you’re right. I fixed it. I swear I saw a three semaphore solution somewhere, and thought that was the minimum.
The bottom line is, you shouldn’t trust any implementation you see on the internet (including mine) without a proof of correctness. Casual testing is no substitute for a proof. Caveat emptor!