ReentrantLock 源码分析
Table of Contents
环境: jdk8
使用方式
class X { private final ReentrantLock lock = new ReentrantLock(); // ... public void m() { lock.lock(); // block until condition holds try { // ... method body } finally { lock.unlock() } } }
ReentrantLock 定义
顾名思义, ReentrantLock 为重入锁, 意味着一个线程可以多次获得锁而不会阻塞, 类的基本定义如下.
// ReentrantLock.java public class ReentrantLock implements Lock, java.io.Serializable { private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer {...} /* 非公平实现 */ static final class NonfairSync extends Sync {...} /* 公平实现 */ static final class FairSync extends Sync {...} public ReentrantLock() { sync = new NonfairSync(); /* 默认是非公平的 */ } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } }
NonfairSync
lock() 方法
注意 Sync 继承于 AbstractQueuedSynchronizer, AQS 是许多并发类的核心.
// ReentrantLock.java public void lock() { sync.lock(); } // ReentrantLock.java NonfairSync final void lock() { /* 在此模式下 state=0 表示当前没有线程占用锁, state>0 表示当前线程获取了几次锁 */ if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } // AbstractQueuedSynchronizer.java public final void acquire(int arg) { /* 在独占模式中, 获取锁, 如果失败, 则把当前线程加入队列, 阻塞到成功为止 */ if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire(arg), 尝试获取一次锁
// AbstractQueuedSynchronizer.java protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } // ReentrantLock.java NonfairSync protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } // ReentrantLock.java Sync final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); /* 再次拿到 state, 判断是否为0, 可能在执行代码的过程中原来占用锁的线程释放了锁 */ int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } /* 如果当前线程是持有锁的线程 state+1 */ else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
在继续往下之前, 先看一下源码文档中的队列图示
+------+ prev +-----+ +-----+ head | | <---- | | <---- | | tail +------+ +-----+ +-----+ 注意: head是一个空节点, 0值的概念
addWaiter(Node.EXCLUSIVE)
创建一个当前线程的 node, 并且加入队列. 该方法一定成功.
- Node.EXCLUSIVE 独占模式
- Node.SHARED 共享模式
// AbstractQueuedSynchronizer.java private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); /* 理论上去掉下面的这段代码, 对结果的正确性不会产生影响 */ // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { /* 此时队列已经有其他线程在排队, 直接插入队尾 */ node.prev = pred; if (compareAndSetTail(pred, node)) { /* 原子操作 */ pred.next = node; return node; } } enq(node); return node; } /* 将 node 放入队列 */ private Node enq(final Node node) { for (;;) { /* 不断尝试, 直到成功为止 */ Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) /* 初始化 head, 一个'零值'的 node */ tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; /* 成功放入队尾, 结束循环 */ } } } }
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
独占模式下, 线程已经加入队列, 堵塞直到获取到锁.
// AbstractQueuedSynchronizer.java final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); /* 仅当上个节点是头节点并且拿到了锁, 则返回 */ if (p == head && tryAcquire(arg)) { setHead(node); /* 把当前node设置成头节点, 并清空数据, 使之成为'零值' */ p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) /* 由该方法阻塞线程 */ interrupted = true; } } finally { if (failed) cancelAcquire(node); } } /* 只有前驱节点的状态是 SIGNAL 时, 才返回 true, 表示可以 park. 否则返回 false */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } // LockSupport.java public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false, 0L); /* 线程阻塞直到获取许可 */ setBlocker(t, null); } // Unsafe.java public native void park(boolean isAbsolute, long time);
unlock() 方法
// ReentrantLock.java public void unlock() { sync.release(1); } // AbstractQueuedSynchronizer.java public final boolean release(int arg) { if (tryRelease(arg)) { /* 尝试释放锁 */ Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); /* 尝试唤醒下一个节点 */ return true; } return false; } protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } // ReentrantLock.java Sync protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { /* 因为可重入锁, 所以c可能大于0, 当等于0时, 表示当前线程已经正在释放锁 */ free = true; setExclusiveOwnerThread(null); /* 设置拥有锁的线程为 null */ } setState(c); return free; }
unparkSuccessor(h) 唤醒下一个节点
// AbstractQueuedSynchronizer.java private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ /* 尽可能从头开始找到一个合适的 node */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); /* 唤醒 node 的线程 */ } // LockSupport.java public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); } // Unsafe.java public native void unpark(Object thread);
FairSync
详细代码在 NonfairSync 小节内已经贴过了, 本小节只贴变动的代码
lock() 方法
// ReentrantLock.java FairSync final void lock() { /* 对比 NonfairSync 版本, 这里直接调用 acquire(), 避免了线程插队的问题 */ acquire(1); } protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { /* 对比 NonfairSync 版本, 只有这里多了一个判断 */ /* 仅有队列为空 或者 队列的第一个是本线程时, 才尝试获取锁 */ /* 否则, 将该线程扔到队列尾部 */ if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
hasQueuedPredecessors(): 当前是否有队列在等待
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
unlock() 方法
跟 Nonfair 的版本一样
小结
- 公平锁: 加锁时检查是否有等待的线程. 如果有则排队. 保证线程的获得锁是先到先得.
- 非公平锁: 加锁时直接尝试获取锁, 抢不到, 则排队.
至此, ReentrantLock 的主要代码都已看完, 通过这个类, 我们也简单的了解了一下 AQS 类的独占模式, 理解起来还是比较容易的.