## 前言
Condition 是 Lock 的伴侣,至于如何使用,我们之前也写了一些文章来说,例如 使用 ReentrantLock 和 Condition 实现一个阻塞队列,并发编程之 Java 三把锁, 在这两篇文章中,我们都详细介绍了他们的使用。今天我们就来深入看看源码实现。
构造方法
Condition
接口有 2 个实现类,一个是 AbstractQueuedSynchronizer.ConditionObject
,还有一个是 AbstractQueuedLongSynchronizer.ConditionObject
,都是 AQS 的内部类,该类结构如下:
几个公开的方法:
- await()
- await(long time, TimeUnit unit)
- awaitNanos(long nanosTimeout)
- awaitUninterruptibly()
- awaitUntil(Date deadline)
- signal()
- signalAll()
今天我们重点关注 2 个方法,也是最常用的 2 个方法: await 和 signal。
await 方法
先贴一波代码加注释:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
|
总结一下这个方法的逻辑:
- 在 Condition 中, 维护着一个队列,每当执行 await 方法,都会根据当前线程创建一个节点,并添加到尾部.
- 然后释放锁,并唤醒阻塞在锁的 AQS 队列中的一个线程.
- 然后,将自己阻塞.
- 在被别的线程唤醒后, 将刚刚这个节点放到 AQS 队列中.接下来就是那个节点的事情了,比如抢锁.
- 紧接着就会尝试抢锁.接下来的逻辑就和普通的锁一样了,抢不到就阻塞,抢到了就继续执行.
看看详细的源码实现,Condition 是如何添加节点的?addConditionWaiter 方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| private Node addConditionWaiter() { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
|
创建一个当前线程的节点,追加到最后一个节点中. 当然,其中还有一个 unlinkCancelledWaiters 方法的调用,当最后一个节点失效了,就需要清理 Condition 队列中无效的节点,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
|
那么又是如何释放锁,并唤醒 AQS 队列中的一个节点上的线程的呢?fullyRelease 方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
|
而 release 方法中,又是如何操作的呢?代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
|
release 方法主要调用了 tryRelease 方法,该方法就是释放锁的。tryRelease 代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
|
好了,我们大概知道了 Condition 是如何释放锁的了,那么又是如何将自己阻塞的呢?在将自己阻塞之前,需要调用 isOnSyncQueue 方法判断,代码如下:
1 2 3 4 5 6 7 8 9 10 11
| final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) return true; return findNodeFromTail(node); }
|
实际上,第一次总是会返回 fasle,从而进入 while 块调用 park 方法,阻塞自己,至此,Condition 成功的释放了所在的 Lock 锁,并将自己阻塞。
虽然阻塞了,但总有人会调用 signal 方法唤醒他,唤醒之后走下面的 if 逻辑,也就是 checkInterruptWhileWaiting 方法,看名字是当等待的时候检查中断状态,代码如下:
1 2 3 4 5
| private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
|
当线程中断了,就需要根据调用 transferAfterCancelledWait 方法的返回值来返回不同的常量,该方法内部逻辑是怎么样的呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| final boolean transferAfterCancelledWait(Node node) { if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } while (!isOnSyncQueue(node)) Thread.yield(); return false; }
|
其实是尝试将自己放入到队列中。如果无法放入,就自旋等待 signal 方法放入。
回到 await 方法,继续向下走,执行 3 个 if 块,第一个 if 块,尝试拿锁,为什么?因为这个时候,这个线程已经被唤醒了,而且他在 AQS 的队列中,那么,他就需要在醒的时候,去拿锁。acquireQueued 方法是拿锁的逻辑,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
|
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
|
注意,如果这里拿不到锁,就会在 parkAndCheckInterrupt 方法中阻塞。这里和正常的 AQS 中的队列节点是一摸一样的,没有特殊。
这里有个 tryAcquire 方法需要注意一下: 这个就是尝试拿锁的逻辑所在。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
|
主要逻辑是设置 state 变量,将锁的持有线程变成自己。这些是在没有比自己等待时间长的线程的情况下发生的。意思是,优先哪些等待时间久的线程拿锁。当然,这里还有一些重入的逻辑。
后面的两个 if 块就简单了,如果 Condition 中还有节点,那么就尝试清理无效的节点,调用的是 unlinkCancelledWaiters 方法,这个方法,我们在上面分析过了,就不再重复分析了。
最后,判断是否中断,执行 reportInterruptAfterWait 方法,这个方法可能会抛出异常,也可能会对当前线程打一个中断标记。
代码如下:
1 2 3 4 5 6 7 8 9 10 11
| private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); } static void selfInterrupt() { Thread.currentThread().interrupt(); }
|
好,关于 await 方法就分析完了,可以看到,楼主贴了很多的注释,事实上,都是为了以后能更好的复习,也方便感兴趣的同学在看源码的时候结合我的注释一起分析。
这个方法的总结都在开始说了,就不再重复总结了。后面,我们会结合 signal 方法一起总结。
再来看看 signal 方法实现。
signal 方法
代码如下:
1 2 3 4 5 6 7 8 9
| public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
|
很明显,唤醒策略是从头部开始的。
看看 doSignal(first) 方法实现:
1 2 3 4 5 6 7 8 9 10 11 12
| private void doSignal(Node first) { do { if ((firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
|
重点在于 transferForSignal 方法,该方法肯定做了唤醒操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false;
Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
|
果然,看到了 unpark 操作,该方法先是 CAS 修改了节点状态,如果成功,就将这个节点放到 AQS 队列中,然后唤醒这个节点上的线程。此时,那个节点就会在 await 方法中苏醒,并在执行 checkInterruptWhileWaiting 方法后开始尝试获取锁。
总结
总结一下 Condition 执行 await 和 signal 的过程吧。
首先,线程如果想执行 await 方法,必须拿到锁,在 AQS 里面,抢到锁的一般都是 head,然后 head 失效,从队列中删除。
在当前线程(也就是 AQS 的 head)拿到锁后,调用了 await 方法,第一步创建一个 Node 节点,放到 Condition 自己的队列尾部,并唤醒 AQS 队列中的某个(head)节点,然后阻塞自己,等待被 signal 唤醒。
当有其他线程调用了 signal 方法,就会唤醒 Condition 队列中的 first 节点,然后将这个节点放进 AQS 队列的尾部。
阻塞在 await 方法的线程苏醒后,他已经从 Condition 队列总转移到 AQS 队列中了,这个时候,他就是一个正常的 AQS 节点,就会尝试抢锁。并清除 Condition 队列中无效的节点。
下面这张图说明了这几个步骤。
好啦,关于 Condition 就说到这里啦,总的来说,就是在 Condition 中新增一个休眠队列来实现的。只要调用 await 方法,就会休眠,进入 Condition 队列,调用 signal 方法,就会从 Condition 队列中取出一个线程并插入到 AQS 队列中,然后唤醒,让这个线程自己去抢锁。