JDK Concurrent包源码阅读3——Lock

Posted by 皮皮潘 on 11-26,2021

锁的可重入性:在Concurrent包中的锁都是可重入锁,所谓的可重入就是说,当A线程拿到一个锁的时候,该线程去调用另外一个需要该锁的操作的时候不会被阻塞

锁的整体继承层次如下图所示,其中ReentrantLock本身其实没有什么代码逻辑,其实现都在其内部类Sync中,lock,tryLock以及unLock方法实际上都是调用了Sync集成AQS的acquire,tryAcquire以及release方法:
Image.png
公平锁与非公平锁:公平锁会按照请求获取的顺序在队列中排队,而非公平锁则在第一次请求获取的时候直接尝试获取锁,当尝试失败后才会入队,一般才有非公平锁因为非公平锁可以提高效率,减少线程切换(直接抢锁的线程不需要陷入waiting状态了,而是直接继续执行)

锁的核心在sync,而sync的核心在于其父类AQS(AbstractQueuedSynchronizer),而整体的核心要素在于以下几点:

    1. 需要一个state变量,标记锁的状态(在AQS中存在state变量,通过CAS修改state的值)
    1. 需要记录当前是哪个线程持有锁(在AQS中存在exclusiveOwnerThread变量记录锁被哪个线程持有)
    1. 需要底层支持对一个线程进行阻塞或唤醒操作(LockSupport对于Unsafe.park和Unsafe.unpark方法进行了封装)
    1. 需要有一个线程安全的无锁队列维护所有阻塞的线程(AQS中利用双向链表和CAS实现了一个阻塞队列,它是AQS中核心的核心)

在AQS中最核心的方法是acquire(int args)方法,其实现如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(), arg)) {
        selfInterrupt();
    }
}

其中tryAcquire是一个虚函数,其目的就是尝试拿锁,不同锁(重入锁,读写锁,CountdownLatch,Semphere)的具体实现不同,最简单的NonfaiSync的实现如下,其核心就是检查(检查不通过直接返回失败)并修改state状态(通过CAS保证修改的原子性,修改失败也直接返回失败):

int c = getState();
if (c == 0) {
    if (compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(Thread.currentThread());
    }
}
else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;
    if (nextc < 0) // overflow
        throw new Error("Maximum lock count exceeded");
    setState(nextc);
}

addWaiter方法就是为当前线程生成一个Node然后入队,其中入队操作会顺便执行链表的初始化并通过自旋+CAS保证原子性,其具体实现如下:

for (;;) {
    Node t = tail;
    if (t == null) { // 检查初始化情况
        if (compareAndSetHead(new Node()))
            tail = head;
    } else {
        node.prev = t;
        if (compareAndSetTail(t, node)) {
            t.next = node;
            return t;
        }
    }
}

acquireQueued方法会通过LockSupport.park()方法无限期阻塞等待,一直到对应的Node节点到达头节点,这里需要注意的是LockSupport.park()会被unpark以及interrupt打断,但是由于锁是设计成不响应interrupt的,因此在被interrupt打断后只是简单记录一下interrupt状态然后继续waiting,其具体实现如下:

boolean interrupted = false;
for (;;) {
    final Node p = node.predecessor();
    if (p == head && tryAcquire(arg)) {
        setHead(node); p.next = null; // help GC
        return interrupted;
    }
    LockSupport.park(this);
    if (Thread.interrupted()) interrupted = true;
}

在AQS中另外一个核心的方法是release(int arg)方法,该方法负责解锁,其具体实现如下:

if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
        Node s = h.next;
        if (s != null) LockSupport.unpark(s.thread);
    return true;
}
return false;

其中tryReleas是一个虚函数,它的目的就是去修改对应的state,当state为0时则返回成功,然后触发队列中下一个等待的线程,其具体实现由不同的锁具体决定,另外由于是先修改了state再去唤醒下一个等待的线程的,且其中没有加任何的锁(也不可能加锁)因此此时可能存在锁被新到的线程抢占的可能,另外需要注意的一点是,release方法并没有去处理head头的修改,该修改交给了成功拿到锁的线程去实现,如果拿到锁的线程是插队的线程则不修改头,这样一方面在下次调用tryRelease方法的时候仍然能够唤醒原先的下一个等待的线程,一方面也不用给等待队列实现复杂的插入操作而只需要实现简单的入队操作就可以了,拿到锁的线程如果是等候的线程则会在acquireQueued方法中调用setHead方法将head头修改为自己对应的节点

总之,不同的锁本质上就是对于state状态的定义不同(比如读写锁将state拆成了两部分,高16位代表写锁状态,低16位代表读锁状态,至于为什么不使用两个变量来表示不同的状态一方面是因为CAS一次只能操作一个变量,一方面是AQS的实现中只提供了一个state变量),并且由此实现的tryAcquire和tryRelease的方法不同罢了,核心的阻塞、入队、唤醒、出队逻辑都已经在AQS中通过模板方法的模式实现好了,只需要实现tryAcquire和tryRelease两个核心方法就行了

另外Condition的await和signal是基于自身的队列(用来存储等待wait的线程)+AQS的队列(用来查看是否持有对应的锁)通过两个队列实现的(ConditionObject),同Object的wait和notify一样,使用Condition的和await和signal方法前都需要先持有Condition对应的Lock的锁(Condition是通过Lock.newCondition()方法获得的,它会生成一个新的ConditionObject)
在调用Condition.await()的时候首先会去放锁(查看当前线程是否持有锁就是在这里检查的)并将对应的Thread从AQS的队列中移除再加入ConditionObject中的队列,然后调用LockSupport.park使线程进入Waiting状态,注意此时await()方法中后续还有需要执行的代码用来重新获取锁

在调用Condition.signal()的时候会把ConditionObject自己队列中等待的所有线程都出队并重新加入AQS的队列然后唤醒对应的线程,唤醒的线程会继续执行之前在Condition.await()方法中被挂起的地方之后的代码(acquireQueued方法),基于AQS本身的机制去争抢锁,从而使得运行Condition.await()之后的方法时一定是带着锁的,因为这里会有一个争抢锁的问题,因此一般都会在Condition.await()方法返回之后再检查一下Race Condition,如果不满足(被别的早唤醒且抢到锁的线程捷足先登了)则重新await()。

最后给出一个基于Condition实现的BlockingQueue,它使得对应的notify信号可以精准地给到wait的线程,而避免”惊群效应“:

void put(E e) {
    lock.lock();
    while (count == items.length) notFull.wait();
    doPut(e);
    notEmpty.signal();
}

E take() {
    lock.lock();
    while (count == 0) notEmpty.wait();
    E res = doTake();
    notFull.signal();
    return res;
}