【从入门到放弃-Java】并发编程-JUC-locks-ReentrantReadWriteLock

简介: 前言上文【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock我们了解到,ReentrantLock是一个互斥排他的重入锁,读和读、读和写、写和写不能同时进行。但在很多场景下,读多写少,我们希望能并发读,这时候ReentrantReadWriteLock就派上用场了,是专门针对这种场景设计的。

前言

上文【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock我们了解到,ReentrantLock是一个互斥排他的重入锁,读和读、读和写、写和写不能同时进行。但在很多场景下,读多写少,我们希望能并发读,这时候ReentrantReadWriteLock就派上用场了,是专门针对这种场景设计的。
接下来我们一起来学习下ReentrantReadWriteLock。

ReentrantReadWriteLock

/**
 * Creates a new {@code ReentrantReadWriteLock} with
  * default (nonfair) ordering properties.
  */
 public ReentrantReadWriteLock() {
     this(false);
 }
 
 /**
  * Creates a new {@code ReentrantReadWriteLock} with
  * the given fairness policy.
  *
  * @param fair {@code true} if this lock should use a fair ordering policy
  */
 public ReentrantReadWriteLock(boolean fair) {
     sync = fair ? new FairSync() : new NonfairSync();
     readerLock = new ReadLock(this);
     writerLock = new WriteLock(this);
 }

我们可以看到和ReentrantLock一样,ReentrantReadWriteLock也使用了通过AQS实现的FairSync和NonfairSync模式
有两个成员变量锁ReadLock和WriteLock

ReadLock::lock

获取读锁,不死不休

public void lock() {
    sync.acquireShared(1);
}

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

@ReservedStackAccess
protected final int tryAcquireShared(int unused) {
    /*
     * Walkthrough:
     * 1. If write lock held by another thread, fail.
     * 2. Otherwise, this thread is eligible for
     *    lock wrt state, so ask if it should block
     *    because of queue policy. If not, try
     *    to grant by CASing state and updating count.
     *    Note that step does not check for reentrant
     *    acquires, which is postponed to full version
     *    to avoid having to check hold count in
     *    the more typical non-reentrant case.
     * 3. If step 2 fails either because thread
     *    apparently not eligible or CAS fails or count
     *    saturated, chain to version with full retry loop.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    //如果已经有写锁,且不是当前线程持有的,则加读锁失败
    //如果当前线程已经持有写锁,则可以获取读锁,这就是锁降级
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    /** 
     * 判断读线程是否阻塞,取决于队列的策略
     *   公平锁策略:如果当前同步队列不为空且当前线程不是队列的第一个节点,则阻塞。
     *   非公平锁策略:如果当前队列的第一个节点时写锁,则需要阻塞。这样是为了防止写锁饥饿。
     * 如果不需要阻塞,且读锁数未达到最大值 则尝试通过cas的方式获取锁
     */
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        //如果当前读锁为0,则当前线程获取锁
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        //如过第一个读锁的持有者是当前线程,则firstReaderHoldCount数量加一
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            //如果最后一个获取锁的线程不是当前线程
            if (rh == null ||
                rh.tid != LockSupport.getThreadId(current))
                //获取当前线程的锁
                cachedHoldCounter = rh = readHolds.get();
            //如果当前最后一个线程获取锁数量为0,则将其设置为当前线程的holdcounter
            else if (rh.count == 0)
                readHolds.set(rh);
            //读锁数+1
            rh.count++;
        }
        return 1;
    }
    //尝试无限循环获取读锁
    return fullTryAcquireShared(current);
}

final int fullTryAcquireShared(Thread current) {
    /*
     * This code is in part redundant with that in
     * tryAcquireShared but is simpler overall by not
     * complicating tryAcquireShared with interactions between
     * retries and lazily reading hold counts.
     */
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        //如果已经有写锁,且不是当前线程持有的,返回-1
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        //如果需要阻塞
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null ||
                        rh.tid != LockSupport.getThreadId(current)) {
                        //如果当前线程持有的锁数为0,则移除
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null ||
                    rh.tid != LockSupport.getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean interrupted = false;
    try {
        for (;;) {
            final Node p = node.predecessor();
            //无限循环,直到当前线程是队列的头结点,则尝试获取读锁
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    //获取锁成功后,将当前线程从队列头结点移除
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    } finally {
        if (interrupted)
            selfInterrupt();
    }
}

ReadLock::lockInterruptibly

获取读锁,直到成功或被中断

public void lockInterruptibly() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //如果收到中断信号,则抛出中断异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //如果尝试获取锁失败,则循环等待获取锁
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            //无限循环,直到当前线程是队列的头结点,则尝试获取读锁
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            //获取锁失败的话则需要进行中断检测,检测到中断信号则抛出异常
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

ReadLock::tryLock

//尝试获取读锁,如果有写锁获取失败,则直接返回失败
public boolean tryLock() {
    return sync.tryReadLock();
}

@ReservedStackAccess
final boolean tryReadLock() {
    Thread current = Thread.currentThread();
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return false;
        int r = sharedCount(c);
        if (r == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null ||
                    rh.tid != LockSupport.getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return true;
        }
    }
}

//尝试获取读锁,获取失败或者超时未获取到的话,则返回失败
public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            //排到当前线程的话则尝试获取锁
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return true;
                }
            }
            //超时返回false
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) {
                cancelAcquire(node);
                return false;
            }
            
            //阻塞当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);
            //如果被中断
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

ReadLock::unlock

释放锁

public void unlock() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    //如果当前线程是第一个持有读锁的
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        //如果是唯一一个持有读锁的,则firstReader设置为null
        if (firstReaderHoldCount == 1)
            firstReader = null;
        //firstReaderHoldCount减一,
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        //如果不是最后一个持有读锁的线程
        if (rh == null ||
            rh.tid != LockSupport.getThreadId(current))
            //从ThreadLocal获取readHolds
            rh = readHolds.get();
        int count = rh.count;
        //如果小于等于1,则移除readHolds
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        //持有锁的数量减一
        --rh.count;
    }
    for (;;) {
        //将state设置为0,原因是在写锁降级为读锁后,释放读锁时,需要将state设为0,方便后续的写锁竞争。
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        //如果头结点不是null,并且队列不为空
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            //如果当前结点是SIGNAL信号
            if (ws == Node.SIGNAL) {
                //唤醒头结点
                if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

WriteLock::lock

获取写锁,如果获取失败,则加入等待队列
具体方法和ReentrantLock调用的方法相同,可参考【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock

public void lock() {
    sync.acquire(1);
}

WriteLock::lockInterruptibly

获取写锁,如果获取失败,则加入等待队列,直到获取到或被中断
具体方法和ReentrantLock调用的方法相同,可参考【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock

WriteLock::tryLock

public boolean tryLock() {
    return sync.tryWriteLock();
}

@ReservedStackAccess
final boolean tryWriteLock() {
    Thread current = Thread.currentThread();
    int c = getState();
    //如果存在写锁,且写锁不是当前线程持有的,则返回false
    if (c != 0) {
        int w = exclusiveCount(c);
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    //如果不存在写锁或是当前线程获取的写锁,则尝试将state加一
    if (!compareAndSetState(c, c + 1))
        return false;
    //设置持有写锁的线程为当前线程
    setExclusiveOwnerThread(current);
    return true;
}

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    //和ReentrantLock的调用方法一样,不再赘述
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

WriteLock::unlock

public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    //如果不是当前线程持有的写锁,抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    //判断持有的写锁是否释放完毕
    boolean free = exclusiveCount(nextc) == 0;
    //如果释放完毕,则将当前持有锁的线程设置为null
    if (free)
        setExclusiveOwnerThread(null);
    //设置持有的锁数量减一
    setState(nextc);
    return free;
}

总结

通过源码分析,我们了解到,可以通过ReentrantReadWriteLock可以获取读锁和写锁。

  • 写锁是互斥锁,只能一个线程持有,写锁和ReentrantLock类似
  • 读锁是共享锁,可以多个线程同时持有。
  • 读锁通过firstReader和cachedHoldCounter优化获取、释放锁的性能。使用ThreadLocal readHolds存放所有持有锁线程的tid和持有锁数量。
  • 线程可以将自己持有的写锁降级为读锁,在释放读锁时,一起释放。

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

目录
相关文章
|
10天前
|
安全 Java 开发者
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第9天】本文将深入探讨Java并发编程的核心概念,包括线程安全和性能优化。我们将详细解析Java中的同步机制,包括synchronized关键字、Lock接口以及并发集合等,并探讨它们如何影响程序的性能。此外,我们还将讨论Java内存模型,以及它如何影响并发程序的行为。最后,我们将提供一些实用的并发编程技巧和最佳实践,帮助开发者编写出既线程安全又高效的Java程序。
22 3
|
14天前
|
Java 调度
Java并发编程:深入理解线程池的原理与实践
【4月更文挑战第6天】本文将深入探讨Java并发编程中的重要概念——线程池。我们将从线程池的基本原理入手,逐步解析其工作过程,以及如何在实际开发中合理使用线程池以提高程序性能。同时,我们还将关注线程池的一些高级特性,如自定义线程工厂、拒绝策略等,以帮助读者更好地掌握线程池的使用技巧。
|
13天前
|
设计模式 安全 Java
Java并发编程实战:使用synchronized关键字实现线程安全
【4月更文挑战第6天】Java中的`synchronized`关键字用于处理多线程并发,确保共享资源的线程安全。它可以修饰方法或代码块,实现互斥访问。当用于方法时,锁定对象实例或类对象;用于代码块时,锁定指定对象。过度使用可能导致性能问题,应注意避免锁持有时间过长、死锁,并考虑使用`java.util.concurrent`包中的高级工具。正确理解和使用`synchronized`是编写线程安全程序的关键。
|
16天前
|
Java
深入理解Java并发编程:线程池的应用与优化
【4月更文挑战第3天】 在Java并发编程中,线程池是一种重要的资源管理工具,它能有效地控制和管理线程的数量,提高系统性能。本文将深入探讨Java线程池的工作原理、应用场景以及优化策略,帮助读者更好地理解和应用线程池。
|
12天前
|
Java
Java 并发编程:深入理解线程池
【4月更文挑战第8天】本文将深入探讨 Java 中的线程池技术,包括其工作原理、优势以及如何使用。线程池是 Java 并发编程的重要工具,它可以有效地管理和控制线程的执行,提高系统性能。通过本文的学习,读者将对线程池有更深入的理解,并能在实际开发中灵活运用。
|
8天前
|
安全 算法 Java
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第11天】 在Java中,高效的并发编程是提升应用性能和响应能力的关键。本文将探讨Java并发的核心概念,包括线程安全、锁机制、线程池以及并发集合等,同时提供实用的编程技巧和最佳实践,帮助开发者在保证线程安全的前提下,优化程序性能。我们将通过分析常见的并发问题,如竞态条件、死锁,以及如何利用现代Java并发工具来避免这些问题,从而构建更加健壮和高效的多线程应用程序。
|
12天前
|
Java
Java并发编程:深入理解线程池
【4月更文挑战第7天】在现代软件开发中,多线程编程已经成为一种不可或缺的技术。为了提高程序性能和资源利用率,Java提供了线程池这一强大工具。本文将深入探讨Java线程池的原理、使用方法以及如何根据实际需求定制线程池,帮助读者更好地理解和应用线程池技术。
15 0
|
13天前
|
缓存 安全 Java
Java并发编程进阶:深入理解Java内存模型
【4月更文挑战第6天】Java内存模型(JMM)是多线程编程的关键,定义了线程间共享变量读写的规则,确保数据一致性和可见性。主要包括原子性、可见性和有序性三大特性。Happens-Before原则规定操作顺序,内存屏障和锁则保障这些原则的实施。理解JMM和相关机制对于编写线程安全、高性能的Java并发程序至关重要。
|
2天前
|
安全 Java 调度
Java并发编程:深入理解线程与锁
【4月更文挑战第18天】本文探讨了Java中的线程和锁机制,包括线程的创建(通过Thread类、Runnable接口或Callable/Future)及其生命周期。Java提供多种锁机制,如`synchronized`关键字、ReentrantLock和ReadWriteLock,以确保并发访问共享资源的安全。此外,文章还介绍了高级并发工具,如Semaphore(控制并发线程数)、CountDownLatch(线程间等待)和CyclicBarrier(同步多个线程)。掌握这些知识对于编写高效、正确的并发程序至关重要。
|
2天前
|
安全 Java 程序员
Java中的多线程并发编程实践
【4月更文挑战第18天】在现代软件开发中,为了提高程序性能和响应速度,经常需要利用多线程技术来实现并发执行。本文将深入探讨Java语言中的多线程机制,包括线程的创建、启动、同步以及线程池的使用等关键技术点。我们将通过具体代码实例,分析多线程编程的优势与挑战,并提出一系列优化策略来确保多线程环境下的程序稳定性和性能。