并发编程锁之ReentrantLock总结

  1. 云栖社区>
  2. 博客>
  3. 正文

并发编程锁之ReentrantLock总结

一心飞扬 2018-02-07 11:14:33 浏览1059
展开阅读全文

本文作者: Zhang
本文链接: http://blog.reactor.top/2018/01/31/并发编程锁之ReentrantLock总结/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 3.0 许可协议。转载请注明出处!

之前我讲过,在并发编程中一个比较难解决的就是共享资源并发访问控制问题。如果同步做的不好,很容易出现不一致问题,从而导致业务逻辑的错误;但是如果对共享资源控制的过于严格,又很容易对性能造成很大的影响。在并发编程中,一方面多从大牛的源码中学习精巧的思想和结构设计;另一方面要对并发基础知识掌握的足够牢固,你才能游刃有余的结合些设计模式、架构思想做出些高质量的高并发、高性能的系统。并发编程对设计模式、架构设计是非常依赖的,因此,并发编程对经验的积累、知识的积累方面要求是比较高的。

刚才说过,共享资源在并发访问中很容易造成不一致问题,解决方案就是我们所熟知的悲观锁和乐观锁。悲观锁就如其名字一样:悲观锁认为并发访问一定会导致状态不一致问题,所以在并发操作前一定要锁住资源,让并发线程一个接一个串行化去访问。而乐观锁就不一样了,乐观锁认为并发访问在大多数情况下是不会导致状态不一致问题,所以可以放心的去访问,一旦出现问题再说,本质上乐观锁是不会对共享资源添加锁限制的。这是我个人的理解,可能直白不是那么的精准,有关悲观锁、乐观锁的定义可以搜索更准确的官方解释。

我们来看看在Java中是如何实现悲观锁和乐观锁的。悲观锁在Java中就是我们所熟知的锁,实现方式主要分为两种:synchronized和Lock,而乐观锁的实现主要通过CAS操作实现。这里我们来比较下synchronized和Lock方式的大致差别:
​ 1、synchronized主要依赖JVM底层实现,而Lock是通过编码方式实现,其实现方式差别还是比较大
​ 2、synchronized由于其简单方便,只需要声明在方法、代码块上即可,主要是不需要关心锁释放问题,在一般的编程中使用量还是比较大的,但是在真正的并发编程系统中,Lock方式明显优于synchronized:
​ a.在高版本JDK中,已经对synchronized进行了优化,synchronized和Lock方式在性能方面差别已不太明显
​ b.synchronized最致命的缺陷是:synchronized不支持中断和超时,也就是说通过synchronized一旦被阻塞住,如果一直无法获取到所资源就会一直被阻塞,即使中断也没用,这对并发系统的性能影响太大了;Lock支持中断和超时、还支持尝试机制获取锁,对synchronized进行了很好的扩展,所以从灵活性上Lock是明显优于synchronized的

在java.util.concurrent.locks包中有很多Lock的实现类,常用的有ReentrantLock、ReadWriteLock(实现类ReentrantReadWriteLock),ReentrantLock在锁的使用上算是非常普遍的,这一节我们就以ReentrantLock为例,分析下Lock实现方式。

ReentrantLock

ReentrantLock是一个可重入的互斥锁,所谓可重入是线程可以重复获取已经持有的锁。锁基本上都是要支持可重入性,否则很容易出现死锁问题。比如:假如锁B不支持可重入性,线程A在持有锁B的情况下再次获取锁B,由于不支持可重入性导致线程A被阻塞,知道锁B资源被释放,但是锁B资源是被线程A持有的,所以线程A永远无法因获取到锁B而被唤醒,这就导致了死锁问题。

ReentrantLock内部实现主要通过AbstractQueuedSynchronizer类实现的,AbstractQueuedSynchronizer是抽象类,在ReentrantLock类中有两个实现类:NonfairSync和FairSync,分别对应非公平锁和公平锁的实现。类结构关系如下:

                  +-------------------------------+
                  |  AbstractQueuedSynchronizer   |
                  +--------------^----------------+
                                 |
                                 |
                   +------------------------------+
                   |           Sync               |
                   +--------^----------^----------+
                            |          |
                            |          |
      +-----------------------+      +-----------------------+
      |   NonfairSync         |      |     FairSync          |
      +-----------------------+      +-----------------------+

ReentrantLock类内部持有一个Sync类型的变量,主要实现基本上都是调用Sync的实现机制,默认构建的是NonfairSync,即非公平锁,也可以通过带Boolean类型的构造函数构建公平锁,源码如下:

/**
* 1、默认创建的非公平锁,性能更高,等价于ReentrantLock(false)
*/
public ReentrantLock() {
    sync = new NonfairSync();
}

/**
* @param fair true:公平锁    false:非公平锁
*/
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

获取锁原理

使用ReentrantLock时,一般流程大致为:1、调用lock()申请锁资源,申请成功则立即返回,如果申请不到则会阻塞直到申请成功;2、申请锁成功后,即可进行共享资源操作;3、共享资源操作完成,最后调用unlock()释放锁资源。

/**
    * 1、获取锁,如果该锁没有被其它线程持有则立即返回,并设置该lock的hold count=1
    * 2、如果当前线程已经持有该锁,则lock的count+1并立即返回
    * 3、如果该锁被其它线程持有,则当前线程处于休眠直到获取锁,获取锁同时设置hold count=1
    * 4、sync.lock()会调用Sync具体实现类NonfairSync、FairSync中的lock()
*/
public void lock() {
    sync.lock();
}

ReentrantLock.lock()源码非常简单,调用sync.lock(),这就体现了ReentrantLock核心机制都是在Sync中实现的,上面已说过ReentrantLock中的Sync中有两个子类分别对应公平锁和非公平锁,这里我们就来先看非公平锁NonfairSync的实现:

/**
* state在AbstractQueuedSynchronizer类中定义,表示当前锁状态:
*
*  0:当前锁锁未被任何线程持有,线程获取锁资源时可以使用CAS原子操作compareAndSetState(0, 1)
*     将state由0修改成1,修改成功表示获取锁成功,state这时被修改成1了,并将exclusiveOwnerThread设置成当前Thread,
*     exclusiveOwnerThread即表示持有锁的线程
*
*  >0:表示当前锁被线程持有,因为ReentrantLock是重入锁,同一个线程可以重入多次锁,每重入一次state加1,
*      同样在释放锁资源release()的时候,每释放一次state减1,直到state=0表示全部释放完成,可以被其它线程竞争使用
*      state大于0时CAS原子操作compareAndSetState(0, 1)会失败,即进入另一分支
*/
final void lock() {
    if (compareAndSetState(0, 1))
    //当前线程获取锁成功,并将当前线程赋值给exclusiveOwnerThread变量
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);//该分支则表示已有线程持有当前锁
}

这里的逻辑也很简单,通过一个CAS操作将state由0设置成1,成功则获取锁成功,并让Sync的exclusiveOwnerThread变量持有当前线程,供后续当前线程重入使用;如果CAS操作失败,则表示存在竞争,已有线程获取到锁,当前线程获取锁失败,需要进入acquire(1)分支。可以看到ReentrantLock的核心就是通过state字段的值判断是否被占用。

/**
* 1、调用tryAcquire尝试获取锁,注意:tryAcquire在AbstractQueuedSynchronizer类中实现直接抛出异常,一般是子类NonfairSync、FairSync继承重写该方法
* 2、tryAcquire会做如下尝试:
*      a.如果state=0表示当前锁又没有被线程所持有,重新获取一次锁,成功返回true,失败返回false
*      b.如果持有锁的线程就是当前线程,则将state累加1,用于记录重入次数,释放的时候也要全部释放,并返回true表示获取锁成功
*      c.非以上两种情况,直接返回fasle
* 3、如果获取锁成功,即tryAcquire返回true,则直接返回
* 4、如果获取锁失败,即tryAcquire返回false,则将当前线程封装成Node放入到Sync Queue里(调用addWaiter),等待Signal信号
* 5、调用acquireQueued进行自旋的方式获取锁(有可能会 repeatedly blocking and unblocking)
* 4、根据acquireQueued的返回值判断在获取lock的过程中是否被中断, 若被中断, 则自己再中断一下(selfInterrupt)
*/
public final void acquire(int arg) {
        //acquireQueued的主要作用是把已经追加到队列的线程节点(addWaiter方法返回值)进行阻塞,但阻塞前又通过tryAccquire重试是否能获得锁,如果重试成功能则无需阻塞,直接返回
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

AbstractQueuedSynchronizer中acquire()逻辑大致如下:

​ 1、首先调用tryAcquire(),该方法的目的主要是:a.重新自旋一次获取下锁看看是否成功,成功则返回true;b.判断持有锁的线程是否就是当前线程,如果是的话,直接在state累加1,并返回true表示获取锁成功;c.如果上述两个目的都没有实现,则返回false,表示获取锁失败。注意:tryAcquire()在AbstractQueuedSynchronizer中是直接抛出异常,具体调用的是子类NonfairSync中的实现逻辑,源码如下:

/**
* 1、该方法重新获取锁state,如果state=0表示当前又没有线程持有该锁了,则重新获取一次锁,成功返回true,失败返回false
* 2、如果持有该锁的线程就是当前线程,则也会返回true表示获取锁成功,并将state累加acquires
* 3、否则返回false表示获取锁失败
*/
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {//如果等于0表示此刻已没有线程持有该锁,所以重新获取一次锁,成功则立即返回true,否则立即返回false
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }else if (current == getExclusiveOwnerThread()) {
        //如果持有该锁的线程就是当前线程,则将state+1,然后立即返回true表示获取锁成功,这是可重入锁特性
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);//修改state值,此处只会持有锁的线程才会执行,不存在多线程竞争情况,所以通过setState修改,而非CAS,这段代码实现了偏向锁的功能
        return true;
    }
    return false;
}

​ 2、如果tryAcquire()获取锁失败则执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)语句,首先我们来看下addWaiter(Node.EXCLUSIVE),它的作用就是将无法获取锁的线程追加到一个双向链表中,然后让线程休眠,当锁资源可用时会从该双向链表头部唤醒一个线程去竞争锁资源,其源码如下:

/**
* 1、将无法获取锁的当前线程封装成Node加入到Sync Queue里面,其中参数mode是独占锁还是共享锁,null表示独占锁。如果是读写锁mode就为共享锁模式
* 2、Sync Queue实现的一个双向链表,包含head和tail分别指向头部和尾部Node,head和tail设置为了volatile,这两个节点的修改将会被其他线程看到,主要是通过修改这两个节点来完成入队和出队
* 3、当新加入Node时,将tail指向新加入的Node,同时之前的Node的next指向新Node,新Node的pre指向之前的tail节点,即在双向链表上添加节点成功
* 4、总而言之,addWaiter的目的就是通过CAS把当前线程封装的Node追加到队尾,并返回该Node实例。
* 5、把线程要包装为Node对象的主要原因:
*        a.构造双向链表时,需要指针前驱节点和后驱节点
*        b.Node中mode用于区分是否是排它模式还是共享模式
*        c.Node中的waitStatus用于表示当前线程状态:
                    SIGNAL(-1) :线程的后继线程正/已被阻塞,当该线程release或cancel时要重新唤醒这个后继线程
                    CANCELLED(1):因为超时或中断,该线程已经被取消,其前驱节点释放锁后不会让处于该种状态的线程去竞争锁资源
                    CONDITION(-2):表明该线程被处于条件队列,就是因为调用了Condition.await而被阻塞
                    PROPAGATE(-3):传播共享锁
                    0:0代表无状态,默认Node就是该种状态,当存在后驱节点追加,就去把其前驱节点设置成SIGNAL
*/
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {//默认head = tail = null, tail !=null说明队列中已经有节点,直接CAS到尾节点
        /**
        * 将当前Node追加到Queue尾部:
        *      1、将当前node的前驱节点设置成tail节点
        *      2、通过cas操作将tail指针指向当前node
        *      3、并将之前的tail节点的next指向当前node
        * 通过上面三步骤,即将一个node追加到双向链表的尾部
        */
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {//4.CAS node到tail
            pred.next = node;
            return node;
        }
    }
    enq(node);//执行到这里,表明Queue队列为空,调用enq会初始化队列并将当前node追加到尾部
    return node;
}

/**
* 1、这里通过一个死循环方式调用CAS,即使有高并发的场景,无限循环将会最终成功把当前线程追加到队尾
* 2、第一次循环tail肯定为null,则会初始化一个默认的node,并将head=tail指向该node
* 3、第二次循环的时候,会将当前node追加到1中创建的node尾部
*/
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { //1.队列为空,初始化一个dummy节点,其实和ConcurrentLinkedQueue一样
            if (compareAndSetHead(new Node()))
            tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

​ 3、addWaiter(Node.EXCLUSIVE)将当前线程封装的Node节点添加到等待锁资源的Queue上后,接下来要执行acquireQueued(),该方法是获取锁逻辑比较核心的一个方法,关键点有如下几个:
​ a.该方法被设计成了一个无限for循环,只有满足通过tryAcquire()获取到锁时才会退出该循环,当然如果没有获取到锁也不会一直在for循环中进行空循环,而是通过parkAndCheckInterrupt()让线程休眠,当线程被唤醒后才会执行一次for循环看是否可以获取锁,获取成功则会将Queue的head指针指向当前thread,并将之前head废弃
​ b.在tryAcquire()竞争锁资源时,会存在p == head判断,判断当前线程的前驱节点是否是head节点,只有前驱是head节点的线程才有资格调用tryAcquire()去竞争锁资源,这个设计思想逻辑主要是:申请锁资源失败的线程会依次加入到Queue中,head指向头部,tail指向尾部,如果head不为空,则该节点代表的线程为锁的占有者,当该线程释放锁时,它会唤醒它的后驱节点,而不是Queue中所有线程,因此,每次释放锁时只会唤醒一个线程,唤醒顺序也是从head到tail依次唤醒,而不是存在锁资源时一起唤醒然后竞争锁资源,因为这样如果存在几百几千个线程,同时竞争锁资源对系统性能损耗很大,有效的避免性能风暴
​ c.该方法在让线程真正休眠前会让线程再次自旋一次获取锁,如果还是失败则立即进入休眠状态,作者这么设计就体现了:让线程休眠还是比较耗费性能资源的,涉及到上下文切换,另外当线程唤醒时可能会被分配到其它CPU上执行,由于高速缓存L1、L2是CPU独有的,就会降低高速缓存命中率,对性能影响还是比较大的,因此能尽量不休眠就不会让线程休眠
​ d.当有线程释放锁时,会唤醒Queue头部线程的后驱节点,唤醒后依然要竞争锁,竞争的对象是刚申请锁资源还没有进入到Queue等待队列的线程们,如果竞争失败则再次进入休眠状态,这就体现了非公平锁的特性,这么设计的目的:主要从性能考虑,如果新申请锁的线程可以立即获取到锁,避免了后续一系列创建Node、添加Node到队列等一些列操作,而从Queue中唤醒的线程没有申请到锁只是重新进入休眠,代价要小很多,同时让它们一起竞争锁资源避免Queue等待队列中的线程一直无法获取锁而被饿死情况

/**
* 1、该方法是一个无限死循环,只有保证前驱节点就是头节点,并且重新调用一次tryAcquire()获取锁并成功,才会推送该循环
* 2、否则会执行shouldParkAfterFailedAcquire()将当前node的前驱节点的waitStatus设置成SIGNAL,表示当它的前驱节点释放锁后会唤醒当前线程,然后当前线程就可以放心的让自己休眠了
* 3、调用shouldParkAfterFailedAcquire()时,由于默认前驱节点的waitStatus不等于SIGNAL,所以会将前驱节点设置成SIGNAL,但是注意这时的返回结果是false,表示并不会立即让当前线程进入休眠状态,而是重新执行一次for循环,相当于给了一次重新获取锁的机会,如果获取锁成功,则将head节点指向当前节点,之前头结点就废弃了;如果获取失败则调用parkAndCheckInterrupt()让线程真正进入休眠状态
4、parkAndCheckInterrupt()中调用LockSupport.park()让当前线程休眠,客户端也就进入阻塞状态,注意这里有个关键点:当休眠状态的线程被唤醒后,需要再次执行一次for循环通过tryAcquire()来竞争锁资源,竞争成功则退出当前for循环,当然也有可能会竞争失败,如果竞争失败会再次进去休眠状态
*
*    Queue队列中的线程是按照从头到尾部的顺序依次唤醒的,每次只会唤醒Queue中的一个线程,为什么还会出现竞争呢?这是因为虽然从Queue中只会唤醒一个线程,但是假如同时又有一个线程执行lock来获取锁资源,而此时并没有放入Queue等待队列中,它就会和从Queue中唤醒的线程进行竞争锁资源,这就体现了非公平锁的特性:后申请锁资源的线程可能会比先申请锁资源的线程优先申请到锁资源。
*    为什么要这么设计呢?
*    主要从性能考虑,如果新申请锁的线程可以立即获取到锁,避免了后续一系列创建Node、添加Node到队列等一些列操作,而从Queue中唤醒的线程没有申请到锁只是重新进入休眠,代价要小很多,同时让它们一起竞争锁资源避免Queue等待队列中的线程一直无法获取锁而被饿死情况
*/
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //1.获取当前节点的前驱节点
            final Node p = node.predecessor();
            //2.判断前驱节点是否是head节点(前继节点是head, 存在两种情况:
            //      a.前继节点现在占用lock
            //      b.前继节点是个空节点,已经释放lock,node现在有机会获取lock; 则再次调用tryAcquire尝试获取一下锁,该源码之前已经分析过
            if (p == head && tryAcquire(arg)) {
                setHead(node);//3.获取lock成功,直接设置新head(原来的head可能就直接被GC回收)
                p.next = null; // help GC
                failed = false;
                return interrupted;//4.返回在整个获取的过程中是否被中断过;若整个过程中被中断过,则最后我在自我中断一下(selfInterrupt),因为外面的函数可能需要知道整个过程是否被中断过
            }
            //shouldParkAfterFailedAcquire(p, node)返回当前线程是否需要挂起,如果需要则调用parkAndCheckInterrupt()让当前线程休眠
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())//6.parkAndCheckInterrupt会把当前线程挂起,从而阻塞住线程的调用栈,返回值判断是否这次线程的唤醒是被中断唤醒
                    interrupted = true;
            }
        } finally {
            if (failed)//7.在整个获取中出错
                cancelAcquire(node);//8.清除 node 节点(清除的过程是先给 node 打上 CANCELLED标志, 然后再删除)
        }
}

private final boolean parkAndCheckInterrupt() {
    //利用LockSupport的park方法来挂起当前线程的,直到被唤醒。
    LockSupport.park(this);
    return Thread.interrupted();
}

看到这里,基本上对ReentrantLock非公平锁获取锁资源的流程有一个比较清晰的认识了。公平锁和非公平锁流程基本一致,区别只是在tryAcquire()获取锁逻辑的差别,具体如下:

FailSync类中的tryAcquire()获取锁逻辑:
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
        }
    }else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

可以看到,当锁资源可用(state=0)并且当Queue队列中不存在等待锁资源的线程时,才会通过cas操作将state由0设置成1,表示申请锁资源成功,否则都将加入到Queue队列的尾部。对比非公平锁获取锁资源逻辑nonfairTryAcquire(),差别主要是:非公平锁只要判断锁资源可用就会立即通过cas操作获取锁资源,而公平锁则会在锁资源可用的情况下,还要满足Queue队列中无等待锁资源线程才能立即申请锁资源,否则会被追加到Queue队列的尾部,这就体现了公平特性。

上面已经将ReentrantLock.lock()获取锁的流程基本都 分析完成,当然ReentrantLock还提供lockInterruptibly()、tryLock()、tryLock(long timeout, TimeUnit unit)等,如果对lock()逻辑比较清楚,这些方式获取锁的原理就比较简单了,下面大致说下。

ReentrantLock.lockInterruptibly():可中断方式获取锁,通过之前源码分析,线程如果没有获取到锁,会通过LockSupport.park()方式休眠,当锁资源释放时,其它线程调用Lock.unpark()唤醒休眠线程去竞争锁资源,但是LockSupport.park()休眠的线程也可以通过中断方式进行唤醒,可中断锁就是在唤醒时候判断如是中断唤醒,则直接抛出异常,而lock()方式获取的锁使用中断唤醒后直接去竞争锁资源了,竞争不到直接休眠,这就是它们的差别,具体实现看源码:

private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())//进入阻塞状态,阻塞解除时,返回true表示中断方式唤醒
                    throw new InterruptedException();//中断唤醒时,不会去竞争锁资源,而是直接抛出异常
            }
        } finally {
            if (failed)
                cancelAcquire(node);
}

ReentrantLock.tryLock():可尝试性获取锁,获取到返回true,获取不到直接返回fasle,而不会阻塞,实现方式就更简单了,直接nonfairTryAcquire()获取锁,获取到立即返回true,获取不到立即返回false,而不是添加到Sync Queue阻塞队列中去等待。

ReentrantLock.tryLock(long timeout, TimeUnit unit):会尝试一段时间,这段时间都无法获取锁就返回false

private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;//超时时间
          //加入到Sync Queue,这里主要是性能优化,下面获取锁的逻辑在for无线循环中,如果超时时间设置较长
        //一直无线循环下去肯定浪费CPU资源,所以会进行休眠,等待前驱节点释放锁时会唤醒该线程
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)//如果超时,直接返回false,表示获取锁失败
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)//如果超时时间现在大于1000纳秒,就会进入休眠,否则就不停的for循环,因为超时时间太短,没必要进行休眠,比较休眠还是比较耗费资源的
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())//可响应中断的,如果中断则会抛出异常
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}
总结

通过上面对ReentrantLock源码分析,Lock机制的核心就是通过cas原子操作state属性,state=0表示锁资源可用,获取锁就是通过cas原子操作将state从0设置成1,成功就表示获取锁成功,如果state>0,cas操作将会失败,即表示锁已被占用,当前获取锁失败。获取锁失败,根据是否是可中断、可超时等特性,处理的逻辑不太一致,但大致为:
​ 1、将获取锁失败的线程封装成Node,封装成Node一方面是要构建双向队列,另一方面是Node中额外添加状态信息对节点进行控制
​ 2、在一个for无线循环中通过Lock.park()让线程休眠,当有锁资源被释放发生时,会从队列头到尾的顺序依次唤醒线程(会跳过CANCELLED标记的节点,因为这些节点代表的线程已经无效了),注意这里只会唤醒一个线程,唤醒的线程只表示该线程具有竞争锁资源的资格,还需要和新申请但还没有放入到Queue中的线程进行竞争该锁资源,这就是非公平锁的特性,这样设计主要是从性能方面考虑,如果竞争成功则退出for循环返回,否则继续进入休眠状态

最后再通过一个大致流程图,对整体的执行流程有个更清晰认识。

获取锁流程

释放锁原理

接下来我们来分析下ReentrantLock释放锁资源的流程。释放锁没有区分公平和非公平的,主要的工作就是减小state的值,当state等0的时候,释放锁并唤醒Queue中其他线程来获取锁。

public void unlock() {
    sync.release(1);
}

release()是在AbstractQueuedSynchronizer中实现的,

    /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        //tryRelease:尝试释放状态,返回true表示锁资源释放完成,
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

 /**
* 1、只有持有该锁的线程才会执行tryRelease(),因此不会涉及到多线程问题,不需要使用cas保证原子性
* 2、调用该方法会将state-1,然后判断state值,如果等于0表示当前线程已经释放锁资源完成,返回true,
*    并将exclusiveOwnerThread设置成null,表示当前锁资源空闲,未被线程占用
* 3、如果state>0,则表示当前并未释放完全,返回false
*/
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {//只有state=0表示该锁被所有线程都释放完成,即锁可以被其它线程占用了,否则只是释放一次重入次数,并不会释放锁
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);//不存在并发问题,采用setState()而非cas操作,提供性能
     return free;
}

深入分析

上面通过源码已经对ReentrantLock获取锁和释放锁的大致流程有了比较清晰的认识,当你越深入分析时你会对Doug Lea这位大牛构思和多线程并发处理的游刃有余感到惊叹,以及后面我们会讲到的IO模型,依然会有Doug Lea大牛的精彩大作,如果你对他还不了解,可以多搜索关注下。

了解了流程并不一定就表示你已经完全熟悉ReentrantLock,你知道他是这么做的,但是你不一定清楚他这么做背后的考量是什么,毕竟并发编程比单线程编程复杂性高出太多,你很难顾及到所有线程分支运行的流程,这就是很容易导致bug的根源。上面我们已经分析过addWaiter()这个方法的作用,下面通过该方法进行更深入的分析,希望对并发编程的认识更加深刻。

addWaiter()方法主要完成工作:将未获取锁的线程封装成Node,然后追加到等待队列Queue尾部,等待队列Queue并不存在一个定义好的数据结构,而是通过head、tail、next和prev模拟出的具有出队、入队操作的双向链表。追加当前节点到双向链表尾部关键源码如下:

node.prev = pred;
if (compareAndSetTail(pred, node)) {//4.CAS node到tail
    pred.next = node;
    return node;
}

将上面代码梳理一下,大致分为三个步骤:
​ 1、将当前node的prev指向tail节点;
​ 2、通过cas原子操作将tail指针指向当前节点;
​ 3、将之前tail节点的next指向当前节点。

示意图如下:

追加节点

假如将追加节点的三个步骤顺序调换下,先将tail节点的next指向当前节点,然后cas原子修改tail指向,最后再来修改当前节点的prev指向,即将上面的1和3对调一下,会出现上面情况呢?

将tail节点的next指向当前节点操作后,紧接着会执行cas操作修改tail指向当前节点,由于存在多线程并发问题,即可能会存在多个线程同时申请锁资源,假如现在t1、t2两个线程都同时做上面两个步骤:
​ 1、t1线程修改next后,紧接着t2线程也修改next指向,导致会把t1修改的指向覆盖;
​ 2、这时t1线程做cas替换tail指向成功后,t2也来做cas操作就会失败;
​ 3、t1由于cas操作成功,最后修改prev指向

追加节点2

可以发现,由于并发导致追加的t1节点是存在问题的,正常情况下Node1的next应该指向t1节点,但是却被t2节点覆盖了。所以,1和3对调是在并发下是存在问题的。

假如1和2对调,先进行cas操作,然后修改prev,最后再来修改next又会怎么样呢?首先通过cas原子操作将tail指向当前节点,示意图如下:

追加节点3

tail节点这时还是孤立的节点,prev和next都还没有指向,tail节点和其它节点之间没有关联了,这时如果其它线程需要遍历这个双向链表就比较危险了,比如释放锁时会调用unparkSuccessor(),其源码如下:

private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        //获取当前节点的后继节点,如果满足状态,那么进行唤醒操作  // 如果没有满足状态,从尾部开始找寻符合要求的节点并将其唤醒
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {//s.waitStatus>0表示当前线程已被CANCELLED,不需要唤醒
            s = null;
            //从tail向前查找,知道找到waitStatus<=0的线程,赋值给s
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //队列的维护(首节点的更换)是依靠消费者(获取时)来完成的,也就是说在满足了自旋退出的条件时的一刻,这个节点就会被设置成为首节点。
        if (s != null)
            LockSupport.unpark(s.thread);
}

会发现其可能会存在一个从tail向前查找的流程,假如刚好这时执行这个流程,从tail向head查找节点显然就会存在问题,所以1和2对调的流程在并发下也是存在问题的。unparkSuccessor()在查找head的下一个有效节点的时候,没有从head到tail方向查找,而是反方向从tail向head查找,正常逻辑肯定是从head向tail方向查找速度更快,但是为啥反其道而行呢?如果你只看这段代码是永远看不出问题的,具体原因可以参加下面正常流程分析情况。

错误的顺序我就不一一举例了,大致都是差不多,现在我们来分析下为什么源码中这个顺序执行在并发下就不会存在问题。现在假设两个线程同一时间都没有获取到锁,都需要追加到Sync Queue队列尾部,大致流程如下:
​ 1、线程t1的节点设置prev指向tail,线程t2节点同时也设置prev指向tail,这时就不会出现上面如果先设置next就会导致后设置把之前设置覆盖情况,因为如果先设置next是对Node1进行操作,存在多个线程对Node1同时操作导致状态不一致问题,而如果这里先设置prev,操作对象时线程本身的节点,是不存在多线程并发问题,示意图如下:

追加节点5

​ 2、这时t1和t2都进行cas原子操作,反正会有一个线程会操作成功,假如是t1线程操作成功,然后就可以顺利的设置Node1节点的next指向t1,因为只会存在一个线程操作成功,所以对Node1的操作此时也不会存在并发问题,由于t1的cas操作成功导致t2线程进行cas操作必然失败,此刻示意图如下:

追加节点6

​ 3、由于t2线程cas操作失败,因此不再继续操作Node1的next指向自己,而是进入enq()方法中,其源码如下,enq方法中通过cas+无限循环方式保证t2节点一定会被追加到Sync Queue尾部的,每次循环都是重新获取最新的tail,然后将t2的prev指向这个最新的tail,然后通过cas操作将tail指向自己,最后在将之前tail节点的next指向t2节点,这个案例中获取的最新tail就是t1节点了,所以t2节点会被追加到t1节点后,这样就能保证即使在高并发下依然可以实现节点正常添加,而不会像之前出现状态不一致情况,示意图如下:

/**
* 1、这里通过一个死循环方式调用CAS,即使有高并发的场景,无限循环将会最终成功把当前线程追加到队尾
* 2、第一次循环tail肯定为null,则会初始化一个默认的node,并将head=tail指向该node
* 3、第二次循环的时候,会将当前node追加到1中创建的node尾部
*/
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { //1.队列为空,初始化一个dummy节点,其实和ConcurrentLinkedQueue一样
            if (compareAndSetHead(new Node()))
            tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

追加节点7

​ 4、上面分析unparkSuccessor()在查找head的下一个有效节点的时候,没有从head到tail方向查找,而是反方向从tail向head查找,如果你对我刚才分析得到逻辑理解透彻的话,就比较好解释了。比如:t1设置prev指向Node1,然后cas操作将tail指向了t1,这时Queue的结构如下,假如这时候执行unparkSuccessor(),Node0查找它的后驱节点为Node1,假如Node1是无效节点,Node1需要继续查找它的后驱节点,但是这时Node1的next并没有设置,是无法查找到的,所以必须从tail向head方向查找才行。

追加节点8

通过对addWaiter的深入分析,你会对并发编程的难度有一个更加深刻的认识,真是处处要小心,搞不好就掉坑里面去了,但是Doug Lea巧妙的构思处理的游刃有余。

至此,对ReentrantLock也有了一个比较完整的流程分析,这一节也就结束了,后面会对Lock的其它实现类及synchronized的底层实现机制进行些分析。

网友评论

登录后评论
0/500
评论
一心飞扬
+ 关注