Semaphore 源码分析
Table of Contents
环境: JDK 8
使用示例
/*
 * Usage example: a pool of MAX_AVAILABLE (100) items guarded by a Semaphore
 * created with fair = true.
 *  - getItem() acquires one permit before calling getNextAvailableItem().
 *  - putItem() releases one permit only when markAsUnused() returns true,
 *    i.e. when the item was found in items[] and was marked as used.
 *  - getNextAvailableItem() / markAsUnused() are synchronized and keep the
 *    used[] flags in step with items[].
 * "items = ... whatever kinds of items being managed" is a placeholder, not
 * compilable Java.
 */
class Pool { private static final int MAX_AVAILABLE = 100; private final Semaphore available = new Semaphore(MAX_AVAILABLE, true); public Object getItem() throws InterruptedException { available.acquire(); return getNextAvailableItem(); } public void putItem(Object x) { if (markAsUnused(x)) available.release(); } // Not a particularly efficient data structure; just for demo protected Object[] items = ... whatever kinds of items being managed protected boolean[] used = new boolean[MAX_AVAILABLE]; protected synchronized Object getNextAvailableItem() { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (!used[i]) { used[i] = true; return items[i]; } } return null; // not reached } protected synchronized boolean markAsUnused(Object item) { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (item == items[i]) { if (used[i]) { used[i] = false; return true; } else return false; } } return false; } }
Semaphore 定义
/*
 * Skeleton of Semaphore (bodies elided with "..."): the class holds a single
 * Sync field, where Sync extends AbstractQueuedSynchronizer and has two
 * subclasses, NonfairSync and FairSync.
 *  - Semaphore(int permits) always uses NonfairSync.
 *  - Semaphore(int permits, boolean fair) uses FairSync when fair is true,
 *    otherwise NonfairSync.
 */
// Semaphore.java public class Semaphore implements java.io.Serializable { private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { ... } /* non-fair implementation */ static final class NonfairSync extends Sync {...} /* fair implementation */ static final class FairSync extends Sync {...} public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
NonfairSync
acquire()
/*
 * acquire() asks for exactly one permit through acquireSharedInterruptibly(1).
 * acquireSharedInterruptibly:
 *  1. throws InterruptedException if Thread.interrupted() reports a pending interrupt;
 *  2. calls tryAcquireShared(arg); a negative result sends the thread to
 *     doAcquireSharedInterruptibly(arg), otherwise the method simply returns.
 */
// Semaphore.java public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // AbstractQueuedSynchronizer.java public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) /* interruptible: bail out if an interrupt is already pending */ throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
tryAcquireShared(arg), 尝试减掉许可数量
/*
 * Three layers of tryAcquireShared:
 *  - the AbstractQueuedSynchronizer default throws UnsupportedOperationException;
 *  - NonfairSync overrides it and delegates to nonfairTryAcquireShared;
 *  - Sync.nonfairTryAcquireShared computes remaining = state - acquires in a
 *    retry loop and returns it either when it is negative (state untouched)
 *    or once compareAndSetState(available, remaining) succeeds.
 * A negative return value therefore means "not enough permits".
 */
// AbstractQueuedSynchronizer.java protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } // Semaphore.java NonfairSync protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } // Semaphore.java Sync final int nonfairTryAcquireShared(int acquires) { for (;;) { /* retry loop: exits when permits are insufficient or the CAS succeeds */ int available = getState(); int remaining = available - acquires; if (remaining < 0 || /* remaining < 0: not enough permits, return it and let the caller decide */ compareAndSetState(available, remaining)) /* remaining >= 0 and the new count was stored successfully */ return remaining; } }
doAcquireSharedInterruptibly(arg), tryAcquireShared 返回负数(许可不足)时, 入队并阻塞等待
/*
 * Slow path of a shared, interruptible acquire.
 * doAcquireSharedInterruptibly: enqueues a SHARED node via addWaiter, then loops:
 *  - when the predecessor is head, retries tryAcquireShared; on r >= 0 it calls
 *    setHeadAndPropagate(node, r), unlinks the old head and returns;
 *  - otherwise, if shouldParkAfterFailedAcquire allows it, parks; a true result
 *    from parkAndCheckInterrupt() makes it throw InterruptedException;
 *  - the finally block calls cancelAcquire(node) whenever the loop is left
 *    without having set failed = false.
 * setHeadAndPropagate: installs node as head and, when the propagate /
 * waitStatus checks pass and the next node is null or shared, calls
 * doReleaseShared().
 */
// AbstractQueuedSynchronizer.java private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); /* per the author: addWaiter guarantees the node is appended at the tail of the queue; already covered in the ReentrantLock analysis */ boolean failed = true; try { for (;;) { /* loop (parking in between) until the acquire succeeds or the thread is interrupted */ final Node p = node.predecessor(); if (p == head) { /* only attempt the acquire when the predecessor is head */ int r = tryAcquireShared(arg); /* remaining permits; r >= 0 means the permit was obtained */ if (r >= 0) { setHeadAndPropagate(node, r); /* make this node the head and propagate the shared state to successors */ p.next = null; // help GC failed = false; return; } } /* same as exclusive mode: park when it is allowed to park */ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /* install the new head */ /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); /* analyzed in the release() section */ } }
release()
/*
 * release() returns one permit through releaseShared(1).
 *  - releaseShared: when tryReleaseShared(arg) returns true it calls
 *    doReleaseShared() and returns true; otherwise it returns false.
 *  - The AbstractQueuedSynchronizer default tryReleaseShared throws
 *    UnsupportedOperationException; Sync overrides it with a retry loop that
 *    adds `releases` to the state via CAS and throws Error when the addition
 *    overflows (next < current).
 *  - doReleaseShared: loops on the current head; a SIGNAL head is CASed to 0
 *    and its successor unparked, a head with status 0 is CASed to PROPAGATE;
 *    the loop ends once head is unchanged after an iteration.
 */
// Semaphore.java public void release() { sync.releaseShared(1); } // AbstractQueuedSynchronizer.java public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } // Semaphore.java Sync protected final boolean tryReleaseShared(int releases) { for (;;) { /* keep retrying until the CAS succeeds or the count overflows */ int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } // AbstractQueuedSynchronizer.java private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { /* head is SIGNAL: the successor is woken only after the CAS to 0 succeeds */ if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && /* head is 0: CAS it to PROPAGATE so the shared release keeps propagating */ !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
FairSync
详细代码在 NonfairSync 小节内已经贴过了, 本小节只贴变动的代码
acquire()
/**
 * FairSync version of tryAcquireShared.
 *
 * Same retry loop as the non-fair version, except that every iteration first
 * checks hasQueuedPredecessors() and returns -1 when it reports true.
 *
 * @param acquires number of permits requested
 * @return -1 when hasQueuedPredecessors() is true; otherwise the remaining
 *         permit count, which is negative when there are not enough permits
 */
protected int tryAcquireShared(int acquires) {
    for (;;) {
        /* If other nodes are already queued, return a negative value so the
           caller goes straight to the queue instead of barging in. */
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
release()
跟 NonfairSync 的版本一样: tryReleaseShared 定义在父类 Sync 中, 公平模式和非公平模式共用同一份 release 代码.
小结
同样, Semaphore 也分公平模式和非公平模式, 实现这两种模式的代码也很简单: 两者唯一的区别是, 公平模式的 tryAcquireShared 会先调用 hasQueuedPredecessors() 检查是否已有节点在排队, 如果有则直接返回负数去排队, 不允许插队.
至此, Semaphore 的主要代码都已看完. 通过这个类, 我们也简单地了解了一下 AQS 类的共享模式.