## 前言
并发 JUC 包提供了很多工具类,比如之前说的 CountDownLatch,CyclicBarrier ,今天说说这个 Semaphore——信号量,关于他的使用请查看往期文章并发编程之 线程协作工具类 ,今天的任务就是从源码层面分析一下他的原理。
源码分析 如果先不看源码,根据以往我们看过的 CountDownLatch CyclicBarrier 的源码经验来看,Semaphore 会怎么设计呢?
首先,他要实现多个线程线程同时访问一个资源,类似于共享锁,并且,要控制进入资源的线程的数量。
如果根据 JDK 现有的资源,我们是否可以使用 AQS 的 state 变量来控制呢?类似 CountDownLatch 一样,有几个线程我们就为这个 state 变量设置为几,当 state 达到了阈值,其他线程就不能获取锁了,就需要等待。当 Semaphore 调用 release 方法的时候,就释放锁,将 state 减一,并唤醒 AQS 上的线程。
以上,就是我们的猜想,那我们看看 JDK 是不是和我们想的一样。
首先看看 Semaphore 的 UML 结构:
内部有 3 个类,继承了 AQS。一个公平锁,一个非公平锁,这点和 ReentrantLock 一摸一样。
看看他的构造器:
1 2 3 4 5 6 public Semaphore (int permits) { sync = new NonfairSync (permits); } public Semaphore (int permits, boolean fair) { sync = fair ? new FairSync (permits) : new NonfairSync (permits); }
两个构造器,两个参数,一个是许可线程数量,一个是是否公平锁,默认非公平。
而 Semaphore 有 2 个重要的方法,也是我们经常使用的 2 个方法:
1 2 3 semaphore.acquire(); // doSomeing..... semaphore.release();
acquire 和 release 方法,我们今天重点看这两个方法的源码,一窥 Semaphore 的全貌。
acquire 方法源码分析 代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public void acquire () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); }
1 2 3 4 protected int tryAcquireShared (int acquires) { return nonfairTryAcquireShared(acquires); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 final int nonfairTryAcquireShared (int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
这里的释放就是对 state 变量减一(或者更多)的。
返回了剩余的 state 大小。
当返回值小于 0 的时候,说明获取锁失败了,那么就需要进入 AQS 的等待队列了。代码入下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 private void doAcquireSharedInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException (); } } finally { if (failed) cancelAcquire(node); } }
总的逻辑就是:
创建一个分享类型的 node 节点包装当前线程追加到 AQS 队列的尾部。
如果这个节点的上一个节点是 head ,就是尝试获取锁,获取锁的方法就是子类重写的方法。如果获取成功了,就将刚刚的那个节点设置成 head。
如果没抢到锁,就阻塞等待。
release 方法源码分析 该方法用于释放锁,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public void release () { sync.releaseShared(1 ); } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } protected final boolean tryReleaseShared (int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) throw new Error ("Maximum permit count exceeded" ); if (compareAndSetState(current, next)) return true ; } }
这里释放锁的逻辑写在了抽象类 Sync 中。逻辑简单,就是对 state 变量做加法。
在加法成功后,执行 doReleaseShared
方法,这个方法是 AQS 的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }
该方法的主要作用就是从 AQS 的 head 节点开始唤醒线程,注意,这里唤醒是 head 节点的下一个节点,需要和 doAcquireSharedInterruptibly
方法对应,因为 doAcquireSharedInterruptibly
方法唤醒的当前节点的上一个节点,也就是 head 节点。
至此,释放 state 变量,唤醒 AQS 头节点结束。
总结 总结一下 Semaphore 的原理吧。
总的来说,Semaphore 就是一个共享锁,通过设置 state 变量来实现对这个变量的共享。当调用 acquire 方法的时候,state 变量就减去一,当调用 release 方法的时候,state 变量就加一。当 state 变量为 0 的时候,别的线程就不能进入代码块了,就会在 AQS 中阻塞等待。