Problem
Suppose that there is some variable that is initialized to zero and monotonically increases over time. Only a single thread may increase the value. Many other threads need to wait until the variable reaches certain value. They should block until the value is reached and wake up once it is reached. We should guarantee that when for any value of the variable, once it reaches the value, there are no sleeping threads waiting for that value.
So the interface is pretty simple. There is one thread that periodically calls inc() method and many threads that may call waitFor(int value) method. A call to waitFor() will either return immediately if the value is already reached or will block until it is reached.
Solution
I would like to have a solution that doesn't use locks - at least explicit locks. It can use thread-safe data structures which, of course, use locks internally.
We also need to make sure that we can safely wakeup a waiter before it decides to wait and in this case, subsequent call to wait() will simply return immediately. We will use Semaphore for this purpose.
For the counter value we use volatile variable. It is a little bit cleaner to use Atomic (and the actual code uses it), but it seems that volatile is sufficient, so I am using it here for demo purposes.
Code
import net.jcip.annotations.ThreadSafe; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.Semaphore; /** * Waiting for counter to reach certain value. * The counter starts from zero and its value increases over time. * The class allows for multiple consumers waiting until the value of the * counter reaches some value interesting to them. * Consumers call {@link #waitFor(int)} which may either return * immediately if the counter reached the specified value, or block * until this value is reached. * <p> * All waiters should be waken up when the counter becomes equal or higher * then the value they are waiting for. * <p> * The counter is updated by a single updater that should only increase the * counter value. * The updater calls the {@link #inc()} method to update the counter * value and this should wake up all threads waiting for any value smaller or * equal to the new one. * <p> * The class is thread-safe. * It is designed for use by multiple waiter threads and a single * updater thread, but it will work correctly even in the presence of multiple * updater threads. */ @ThreadSafe public final class CounterWait { // Implementation notes. // // The implementation is based on: // // 1) Using an atomic counter value which guarantees consistency. // Since everyone needs only to know when the counter value reached the // certain value and the counter may only increase its value, // it is safe to update the counter by another thread after its value // was read. // // 2) Priority queue of waiters, sorted by their expected values. The smallest // value is always at the top of the queue. The priority queue itself // is thread-safe, so no locks are needed to protect access to it. // // Each waiter is implemented using a binary semaphore. // This solves the problem of a wakeup that happens before the sleep - // in this case the acquire() doesn't block and returns immediately. // // NOTE: We use PriorityBlockingQueue for waiters because it is thread-safe, // we are not using its blocking queue semantics. /** Counter value. May only increase. */ private volatile int currentId = 0; /** * Waiters sorted by the value of the counter they are waiting for. * Note that {@link PriorityBlockingQueue} is thread-safe. * We are not using this as a blocking queue, but as a synchronized * PriorityQueue. */ private final PriorityBlockingQueue<ValueEvent> waiters = new PriorityBlockingQueue<>(); /** * Update the counter value and wake up all threads waiting for this * value or any value below it. * <p> * The counter value should only increase. * An attempt to decrease the value is raising * {@link IllegalArgumentException}. * The usual case is to have a single updater thread, but we enforce this * by synchronizing the call. */ public synchronized void inc() { currentId++; // Wake up any threads waiting for a counter to reach this value. wakeup(currentId); } /** * Wait for specified counter value. * Returns immediately if the value is reached or blocks until the value * is reached. * Multiple threads can call the method concurrently. * * @param value requested counter value * @return current counter value that should be no smaller then the requested * value */ public int waitFor(int value) { // Fast path - counter value already reached, no need to block if (value <= currentId) { return currentId; } // Enqueue the waiter for this value ValueEvent eid = new ValueEvent(value); waiters.put(eid); // It is possible that between the fast path check and the time the // value event is enqueued, the counter value already reached the requested // value. In this case we return immediately. if (value <= currentId) { return currentId; } // At this point we may be sure that by the time the event was enqueued, // the counter was below the requested value. This means that update() // is guaranteed to wake us up when the counter reaches the requested value. // The wake up may actually happen before we start waiting, in this case // the event's blocking queue will be non-empty and the waitFor() below // will not block, so it is safe to wake up before the wait. // So sit tight and wait patiently. eid.waitFor(); return currentId; } /** * Wake up any threads waiting for a counter to reach specified value * Peek at the top of the queue. If the queue is empty or the top value * exceeds the current value, we are done. Otherwise wakeup the top thread, * remove the corresponding waiter and continue. * <p> * Note that the waiter may be removed under our nose by * {@link #waitFor(long)} method, but this is Ok - in this case * waiters.remove() will just return false. * * @param value current counter value */ private void wakeup(int value) { while (true) { // Get the top of the waiters queue or null if it is empty ValueEvent e = waiters.poll(); if (e == null) { // Queue is empty - return. return; } // No one to wake up, return event to the queue and exit if (e.getValue() > value) { waiters.add(e); return; } // Due for wake-up call e.wakeup(); } } /** * Return number of waiters. This is mostly useful for metrics/debugging * * @return number of sleeping waiters */ public int waitersCount() { return waiters.size(); } /** * Representation of the waiting event. * The waiting event consists of the expected value and a binary semaphore. * <p> * Each thread waiting for the given value, creates a ValueEvent and tries * to acquire a semaphore. This blocks until the semaphore is released. * <p> * ValueEvents are stored in priority queue sorted by value, so they should be * comparable by the value. */ private static class ValueEvent implements Comparable<ValueEvent> { /** Value waited for. */ private final int value; /** Binary semaphore to synchronize waiters */ private final Semaphore semaphore = new Semaphore(1); /** * Instantiates a new Value event. * * @param v the expected value */ ValueEvent(int v) { this.value = v; // Acquire the semaphore. Subsequent calls to waitFor() will block until // wakeup() releases the semaphore. semaphore.acquireUninterruptibly(); // Will not block } /** Wait until signaled. May return immediately if already signalled. */ void waitFor() { semaphore.acquireUninterruptibly(); } /** @return the value we are waiting for */ long getValue() { return value; } /** Wakeup the waiting thread. */ void wakeup() { semaphore.release(); } /** * Compare objects by value */ @Override public int compareTo(final ValueEvent o) { return value == o.value ? 0 : value < o.value ? -1 : 1; } /** * Use identity comparison of objects */ @Override public boolean equals(final Object o) { return (this == o); } @Override public String toString() { return String.valueOf(value); } } }