ReentrantLock 源码分析

Table of Contents

环境: jdk8

使用方式

class X {
  private final ReentrantLock lock = new ReentrantLock();
  // ...
  public void m() {
    lock.lock();  // block until condition holds
    try {
      // ... method body
    } finally {
      lock.unlock()
    }
  }
}

ReentrantLock 定义

顾名思义, ReentrantLock 为重入锁, 意味着一个线程可以多次获得锁而不会阻塞, 类的基本定义如下.

// ReentrantLock.java
public class ReentrantLock implements Lock, java.io.Serializable {
    private final Sync sync;
    abstract static class Sync extends AbstractQueuedSynchronizer {...}
    /* 非公平实现 */
    static final class NonfairSync extends Sync {...}
    /* 公平实现 */
    static final class FairSync extends Sync {...}

    public ReentrantLock() {
        sync = new NonfairSync(); /* 默认是非公平的 */
    }
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
}

NonfairSync

lock() 方法

注意 Sync 继承于 AbstractQueuedSynchronizer, AQS 是许多并发类的核心.

// ReentrantLock.java
public void lock() {
    sync.lock();
}

// ReentrantLock.java NonfairSync
final void lock() {
    /* 在此模式下 state=0 表示当前没有线程占用锁, state>0 表示当前线程获取了几次锁 */
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

// AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
    /* 在独占模式中, 获取锁, 如果失败, 则把当前线程加入队列, 阻塞到成功为止 */
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire(arg), 尝试获取一次锁

// AbstractQueuedSynchronizer.java
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

// ReentrantLock.java NonfairSync
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

// ReentrantLock.java Sync
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    /* 再次拿到 state, 判断是否为0, 可能在执行代码的过程中原来占用锁的线程释放了锁 */
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    /* 如果当前线程是持有锁的线程 state+1 */
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

在继续往下之前, 先看一下源码文档中的队列图示

     +------+  prev +-----+       +-----+
head |      | <---- |     | <---- |     |  tail
     +------+       +-----+       +-----+

注意: head是一个空节点, 0值的概念

addWaiter(Node.EXCLUSIVE)
创建一个当前线程的 node, 并且加入队列. 该方法一定成功.

  • Node.EXCLUSIVE 独占模式
  • Node.SHARED 共享模式
// AbstractQueuedSynchronizer.java
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    /* 理论上去掉下面的这段代码, 对结果的正确性不会产生影响 */
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) { /* 此时队列已经有其他线程在排队, 直接插入队尾 */
        node.prev = pred;
        if (compareAndSetTail(pred, node)) { /* 原子操作 */
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

/* 将 node 放入队列 */
private Node enq(final Node node) {
    for (;;) { /* 不断尝试, 直到成功为止 */
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node())) /* 初始化 head, 一个'零值'的 node */
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t; /* 成功放入队尾, 结束循环 */
            }
        }
    }
}

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
独占模式下, 线程已经加入队列, 堵塞直到获取到锁.

// AbstractQueuedSynchronizer.java
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            /* 仅当上个节点是头节点并且拿到了锁, 则返回 */
            if (p == head && tryAcquire(arg)) {
                setHead(node); /* 把当前node设置成头节点, 并清空数据, 使之成为'零值' */
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) /* 由该方法阻塞线程 */
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

/* 只有前驱节点的状态是 SIGNAL 时, 才返回 true, 表示可以 park. 否则返回 false */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

// LockSupport.java
public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L); /* 线程阻塞直到获取许可 */
    setBlocker(t, null);
}

// Unsafe.java
public native void park(boolean isAbsolute, long time);

unlock() 方法

// ReentrantLock.java
public void unlock() {
    sync.release(1);
}

// AbstractQueuedSynchronizer.java
public final boolean release(int arg) {
    if (tryRelease(arg)) { /* 尝试释放锁 */
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h); /* 尝试唤醒下一个节点 */
        return true;
    }
    return false;
}

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

// ReentrantLock.java Sync
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) { /* 因为可重入锁, 所以c可能大于0, 当等于0时, 表示当前线程已经正在释放锁 */
        free = true;
        setExclusiveOwnerThread(null); /* 设置拥有锁的线程为 null */
    }
    setState(c);
    return free;
}

unparkSuccessor(h) 唤醒下一个节点

// AbstractQueuedSynchronizer.java
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    /* 尽可能从头开始找到一个合适的 node */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread); /* 唤醒 node 的线程 */
}

// LockSupport.java
public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}

// Unsafe.java
public native void unpark(Object thread);

FairSync

详细代码在 NonfairSync 小节内已经贴过了, 本小节只贴变动的代码

lock() 方法

// ReentrantLock.java FairSync
final void lock() {
    /* 对比 NonfairSync 版本, 这里直接调用 acquire(), 避免了线程插队的问题 */
    acquire(1);
}

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        /* 对比 NonfairSync 版本, 只有这里多了一个判断 */
        /* 仅有队列为空 或者 队列的第一个是本线程时, 才尝试获取锁 */
        /* 否则, 将该线程扔到队列尾部 */
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

hasQueuedPredecessors(): 当前是否有队列在等待

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}

unlock() 方法

跟 Nonfair 的版本一样

小结

  •  公平锁: 加锁时检查是否有等待的线程. 如果有则排队. 保证线程的获得锁是先到先得.
  • 非公平锁: 加锁时直接尝试获取锁, 抢不到, 则排队.

至此, ReentrantLock 的主要代码都已看完, 通过这个类, 我们也简单的了解了一下 AQS 类的独占模式, 理解起来还是比较容易的.

Author: Saul Lawliet

Created: 2022-01-13 Thu 22:47