Java 7 TransferQueue


Java 7 will include several improvements in the collections and concurrency libraries under JSR 166y. One addition is a new interface called TransferQueue and a corresponding implementation called LinkedTransferQueue.

TransferQueue extends BlockingQueue (which in turn extends the Queue interface added in Java 5) and adds some new methods. BlockingQueue expresses the concept of a queue in which a producer can block when adding an item until there is space available, or a consumer can block when removing an item until one exists.
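
To make the BlockingQueue behavior concrete, here is a minimal sketch using a bounded ArrayBlockingQueue with room for a single item. The class name BlockingQueueDemo is hypothetical, the lambda assumes Java 8 or later, and the short sleep exists only to let the producer reach its blocking put() for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueDemo {
    // Hypothetical demo: put() blocks on a full bounded queue until take() frees a slot.
    static List<String> run() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1); // capacity of one item
        List<String> log = Collections.synchronizedList(new ArrayList<>());

        queue.put("a"); // space is available, so put() returns immediately
        // The queue is now full: the non-blocking offer() fails instead of waiting.
        log.add("offer b accepted: " + queue.offer("b"));

        Thread producer = new Thread(() -> {
            try {
                queue.put("b"); // blocks until a consumer makes room
                log.add("put b returned");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        Thread.sleep(100); // timing-based: lets the producer reach its blocking put()
        log.add("took " + queue.take()); // frees the slot and unblocks the producer
        producer.join();
        log.add("took " + queue.take()); // take() would block here if the queue were empty
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

The relative order of "took a" and "put b returned" in the log can vary, since both threads append right after the slot is freed.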

TransferQueue takes this one step further: a producer can block until the item is actually received by a consumer, not just added to the queue. This new constraint is expressed in the key new method, transfer(). The name is very descriptive. Because the blocking lasts until the hand-off from one thread to another is complete, you are effectively transferring the item between threads (in a way that properly creates happens-before relationships in the Java Memory Model).
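
A minimal sketch of the hand-off, assuming Java 8 or later (for the lambda) and a hypothetical class name TransferDemo. The producer calls transfer() and records when it returns; the main thread checks that flag before and after it takes the item. Because transfer() cannot return until some consumer has received the item, the first check is false no matter how long the sleep is.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class TransferDemo {
    // Hypothetical demo: transfer() does not return until a consumer has taken the item.
    static String run() throws InterruptedException {
        TransferQueue<String> queue = new LinkedTransferQueue<>();
        AtomicBoolean transferReturned = new AtomicBoolean(false);

        Thread producer = new Thread(() -> {
            try {
                queue.transfer("job-1"); // blocks until a consumer receives "job-1"
                transferReturned.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        Thread.sleep(100); // nobody has taken the item yet, so transfer() is still blocked
        boolean returnedBeforeTake = transferReturned.get();

        String received = queue.take(); // completes the hand-off and releases the producer
        producer.join();
        return received + "," + returnedBeforeTake + "," + transferReturned.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // job-1,false,true
    }
}
```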

Several other methods are included as well: two forms of tryTransfer(), one non-blocking (it transfers only if that can be done immediately) and one that waits up to a timeout. Then there are two helper methods, hasWaitingConsumer() and getWaitingConsumerCount().
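
The following sketch exercises the other methods, assuming Java 8 or later; TryTransferDemo is a hypothetical name. Per the TransferQueue documentation, a failed tryTransfer() does not leave the element enqueued, and getWaitingConsumerCount() is only an estimate, though with a single idle consumer it should report 1.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicReference;

class TryTransferDemo {
    static String run() throws InterruptedException {
        TransferQueue<String> queue = new LinkedTransferQueue<>();
        AtomicReference<String> received = new AtomicReference<>();

        // No consumer is waiting: the immediate form returns false and does not enqueue "a".
        boolean immediate = queue.tryTransfer("a");
        // The timed form waits up to 50 ms; on timeout "b" is not left in the queue either.
        boolean timed = queue.tryTransfer("b", 50, TimeUnit.MILLISECONDS);

        Thread consumer = new Thread(() -> {
            try {
                received.set(queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        while (!queue.hasWaitingConsumer()) {
            Thread.sleep(10); // poll until the consumer is waiting in take()
        }
        int waiting = queue.getWaitingConsumerCount(); // an estimate; 1 consumer here
        boolean handedOff = queue.tryTransfer("c"); // a consumer is waiting, so this succeeds
        consumer.join();

        return immediate + "," + timed + "," + waiting + "," + handedOff + ","
                + received.get() + "," + queue.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // false,false,1,true,c,0
    }
}
```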

When I first read about TransferQueue, I was immediately reminded of the existing SynchronousQueue implementation, which provides a queue with size 0. That seemed inherently useless when I originally looked at it, but I’ve since found it to be one of the most useful queue implementations in the collections framework, specifically for this use case of handing off an item from one thread to another.
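
A small sketch of what "size 0" means in practice, assuming Java 8 or later and a hypothetical class name: offer() fails when no consumer is waiting, put() blocks until a take() meets it, and size() always reports 0 because the queue never holds elements.

```java
import java.util.concurrent.SynchronousQueue;

class SynchronousQueueDemo {
    static String run() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        // With no consumer waiting there is nowhere to put the item, so offer() fails.
        boolean offered = queue.offer("x");

        Thread producer = new Thread(() -> {
            try {
                queue.put("y"); // blocks until a consumer takes "y"
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        String taken = queue.take(); // meets the producer: a direct thread-to-thread hand-off
        producer.join();

        // size() is always 0: a SynchronousQueue has no internal capacity.
        return offered + "," + taken + "," + queue.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // false,y,0
    }
}
```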

TransferQueue is more generic and useful than SynchronousQueue, however, as it allows you to flexibly decide whether to use normal BlockingQueue semantics or a guaranteed hand-off. In the case where items are already in the queue, calling transfer will guarantee that all existing queue items will be processed before the transferred item. Doug Lea says that, capability-wise, LinkedTransferQueue is actually a superset of ConcurrentLinkedQueue, SynchronousQueue (in “fair” mode), and unbounded LinkedBlockingQueues. And it’s made better by allowing you to mix and match those features as well as take advantage of higher-performance implementation techniques.
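
Here is a sketch of mixing the two modes on one queue, assuming Java 8 or later and a hypothetical class name. Two items are added with an ordinary put(), which never blocks on this unbounded queue, and a third with transfer(). FIFO order means "t" is taken only after "a" and "b", so transfer() is still blocked after each of the first two takes. Note that transfer() returns once a consumer has received the item; it says nothing about when the consumer finishes processing it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class MixedModeDemo {
    static String run() throws InterruptedException {
        TransferQueue<String> queue = new LinkedTransferQueue<>();
        queue.put("a"); // ordinary unbounded-queue semantics: put() never blocks
        queue.put("b");

        AtomicBoolean transferReturned = new AtomicBoolean(false);
        Thread producer = new Thread(() -> {
            try {
                queue.transfer("t"); // lines up behind "a" and "b", then waits to be received
                transferReturned.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<String> order = new ArrayList<>();
        order.add(queue.take());                      // "a"
        boolean afterFirst = transferReturned.get();  // still blocked
        order.add(queue.take());                      // "b"
        boolean afterSecond = transferReturned.get(); // still blocked
        order.add(queue.take());                      // "t": completes the hand-off
        producer.join();
        return order + "," + afterFirst + "," + afterSecond + "," + transferReturned.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // [a, b, t],false,false,true
    }
}
```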

Joe Bowbeer helpfully provided a link to a paper by William Scherer, Doug Lea, and Michael Scott that lays out the LinkedTransferQueue algorithms and performance tests showing their improvements over the existing Java 5 alternatives. LinkedTransferQueue outperforms SynchronousQueue by a factor of 3 in unfair mode and 14 in fair mode. Because SynchronousQueue is at the heart of task hand-off in something like ThreadPoolExecutor, this can result in similar performance improvements there. Given the importance of executors in concurrent programming, you start to see why this new implementation matters.
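
To show where SynchronousQueue sits inside an executor, here is a sketch of a ThreadPoolExecutor configured like Executors.newCachedThreadPool() (core size 0, SynchronousQueue as the work queue) but capped at two threads; the cap and the class name are assumptions for illustration. Each execute() first tries to hand the task to an idle worker through the queue. If none is waiting, a new thread is started, and once the maximum is reached the task is rejected.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class HandoffPoolDemo {
    static String run() throws InterruptedException {
        // Same shape as Executors.newCachedThreadPool(), but with at most 2 threads.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 2, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        pool.execute(blocker); // no idle worker to hand off to, so a thread is started
        pool.execute(blocker); // the first worker is busy: a second thread is started
        boolean rejected = false;
        try {
            pool.execute(blocker); // no idle worker, and already at the maximum of 2
        } catch (RejectedExecutionException e) {
            rejected = true; // default AbortPolicy
        }
        int threads = pool.getPoolSize();

        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return threads + "," + rejected;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // 2,true
    }
}
```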

The Java 5 SynchronousQueue implementation uses dual queues (for waiting producers and waiting consumers) and protects both queues with a single lock. The LinkedTransferQueue implementation instead uses CAS operations to form a nonblocking implementation, which is the key to avoiding serialization bottlenecks. The paper lays out a lot more detail and data; if you’re interested in the details, it’s worth a read.
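
The paper describes the real algorithm, which is a dual queue and considerably more involved. As a much smaller illustration of the general CAS technique only, here is a Treiber-style lock-free stack built on AtomicReference.compareAndSet. It is not LinkedTransferQueue’s algorithm. Instead of taking a lock, each thread reads the current top, prepares its change, and retries if another thread changed the top in the meantime. The class and the pushConcurrently helper are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class TreiberStack<E> {
    private static final class Node<E> {
        final E item;
        Node<E> next;

        Node(E item) {
            this.item = item;
        }
    }

    private final AtomicReference<Node<E>> top = new AtomicReference<>();

    void push(E item) {
        Node<E> node = new Node<>(item);
        Node<E> current;
        do {
            current = top.get();
            node.next = current;
        } while (!top.compareAndSet(current, node)); // lost a race: re-read and retry
    }

    E pop() {
        Node<E> current;
        do {
            current = top.get();
            if (current == null) {
                return null; // empty stack
            }
        } while (!top.compareAndSet(current, current.next)); // retry if top moved
        return current.item;
    }

    // Hypothetical check: concurrent pushes from several threads lose no items.
    static int pushConcurrently(int threads, int perThread) throws InterruptedException {
        TreiberStack<Integer> stack = new TreiberStack<>();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    stack.push(i);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        int count = 0;
        while (stack.pop() != null) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(pushConcurrently(4, 10_000)); // 40000
    }
}
```

Under contention, the cost moves from threads parking on a lock to threads retrying failed CAS operations, which is the trade-off Kirk raises in the first comment.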

Comments

7 Responses to “Java 7 TransferQueue”
  1. Kirk says:

    Hi Alex,

    Funny this should show up. I’ve just been looking at this implementation. As impressive as the implementation looks, I’m concerned that it doesn’t appear to stripe either the reads or writes on the ends of the linked list. While I’m sure this isn’t a problem when one is working with a limited number of cores, I’m wondering if you see this as a potential limit to scalability when larger numbers of cores are at play? I’d be concerned about contention on the ends of the queues where many threads could be competing for an add which could result in large numbers of CAS failures. But then, maybe I’m missing something.

  2. Alex says:

    Well, I’m no guru. My understanding of the fork/join stuff is that it is more tuned for this use case of fine-grained parallelism of a large number of light-weight tasks, work-stealing, and all that. Sounds like a good question for Doug Lea in any case.

  3. Kirk says:

    Right, but with work-stealing/fork-join, the idea is to place the contended memory location on the left and the uncontended memory location on the right. Large chunks of work go to the left and small chunks to the right. Net result: the uncontended end will never suffer from CAS failure, while the contended end shouldn’t be visited that often. This implementation looks to be well tuned to support that use case, but at the possible expense of others.

  4. yongboy says:

    Hi, Alex:
    Thanks for your post. I like it. But I wondered whether the line “In the case where items are already in the queue, calling transfer will guarantee that all existing queue items will be processed before the transferred item” is right. Maybe it should be “after the transferred item”?

  5. Alex Miller says:

    @yongboy: I think it is correct as written. If you are putting a new item in the queue, then it cannot be consumed until all of the other items already in the queue have been consumed (as that’s the FIFO guarantee).

  6. yongboy says:

    Hi, Alex:
    Thanks for your response.
    I had misunderstood the transfer method; after testing it, I see you are right.
    With some items already in the queue, when the transfer method is called, the producer thread will wait until the other items have been consumed.

  7. Glen Newton says:

    I think you want to choose your words more carefully: “calling transfer will guarantee that all existing queue items will be processed before the transferred item.” If there are multiple threads consuming from the queue, then you want to say: “calling transfer will guarantee that all existing queue items will start being processed before the transferred item.” Due to the scheduling of threads, you cannot guarantee when the threads will be _finished_ processing items, only when they are _starting_ to process items…