JUC LinkedBlockingQueue

简介:

本文首先发表在 码蜂笔记

java.util.concurrent.LinkedBlockingQueue 是一个基于单向链表的、范围任意的(其实是有界的)、FIFO 阻塞队列。访问与移除操作是在队头进行,添加操作是在队尾进行,并分别使用不同的锁进行保护,只有在可能涉及多个节点的操作才同时对两个锁进行加锁。

队列是否为空、是否已满仍然是通过元素数量的计数器(count)进行判断的,由于可以同时在队头、队尾并发地进行访问、添加操作,所以这个计数器必须是线程安全的,这里使用了一个原子类 AtomicInteger,这就决定了它的容量范围是: 1 – Integer.MAX_VALUE。

由于同时使用了两把锁,在需要同时使用两把锁时,加锁顺序与释放顺序是非常重要的:必须以固定的顺序进行加锁,再以与加锁顺序的相反的顺序释放锁。

头结点和尾结点一开始总是指向一个哨兵的结点,它不持有实际数据,当队列中有数据时,头结点仍然指向这个哨兵,尾结点指向有效数据的最后一个结点。这样做的好处在于,与计数器 count 结合后,对队头、队尾的访问可以独立进行,而不需要判断头结点与尾结点的关系。

属性与链表节点类


// 链表的结点类,单向链表,只有一个后继指针
static class Node<E> {
    E item;

     /*
     * 后继指针。值为下列之一:
     * 实际的后继结点。
     * 自身,表示后继是 head.next (用于在遍历处理时判断)
     * null,表示没有后继(这是尾结点)
     */
    Node<E> next;

    Node(E x) { item = x; }
}

// 最大容量上限,默认是 Integer.MAX_VALUE
private final int capacity;

// 当前元素数量,这是个原子类。因为读写分别使用不同的锁,但都会访问这个属性,所以它需要是线程安全的。
private final AtomicInteger count = new AtomicInteger(0);

// 头结点
private transient Node<E> head;

// 尾结点
private transient Node<E> last;

// 队头访问锁
private final ReentrantLock takeLock = new ReentrantLock();

// 队头访问等待条件、队列
private final Condition notEmpty = takeLock.newCondition();

// 队尾访问锁
private final ReentrantLock putLock = new ReentrantLock();

// 队尾访问等待条件、队列
private final Condition notFull = putLock.newCondition();

enqueue 操作


// 在持有 putLock 锁下执行
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

dequeue 操作

返回队列里第一个有效元素。


// 在持有 takeLock 锁下执行
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC

    head = first;
    E x = first.item;
    first.item = null; // 出队列后的结点作为新的哨兵结点
    return x;
}

对两把锁的加锁与释放

在需要对两把锁同时加锁时,把加锁的顺序与释放的顺序封装成方法,确保所有地方都是一致的。而且获取锁时都是不响应中断的,一直获取直到加锁成功,这就避免了第一把锁加锁成功,而第二把锁加锁失败导致锁不释放的风险。

注意,锁的释放顺序与加锁顺序是相反的。


// 把固定的加锁顺序封装在方法内,确保所有的对两把锁加锁的顺序都是一致的。
void fullyLock() {
    putLock.lock();
    takeLock.lock();
}

// 把固定的释放锁顺序封装在方法内,确保所有的对两把锁的释放顺序都是一致的。
void fullyUnlock() {
    takeLock.unlock();
    putLock.unlock();
}

put 操作

put 操作把指定元素添加到队尾,如果没有空间则一直等待。


public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();

    // 在所有的 put/take/etc 等操作中预设值本地变量 c 为负数表示失败。成功会设置为 >= 0 的值。
    int c = -1;
    Node<E> node = new Node(e);

    // 下面两行是访问优化
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;

    putLock.lockInterruptibly();
    try {
        /*
         * 注意,count用于等待监视,即使它没有用锁保护。这个可行是因为
         * count 只能在此刻(持有putLock)减小(其他put线程都被锁拒之门外),
         * 当count对capacity发生变化时,当前线程(或其他put等待线程)将被通知。
         * 在其他等待监视的使用中也类似。
         */
        while (count.get() == capacity) {
            notFull.await();
        }

        enqueue(node);
        c = count.getAndIncrement();

        // 还有可添加空间则唤醒put等待线程。
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }

    // c 由 count.getAndIncrement()返回,如果等于0,
    // 则 count 应该是大于等于 1 了,唤醒take线程。
    if (c == 0)
        signalNotEmpty();
}


take 操作

take 操作会一直阻塞直到有元素可返回。


public E take() throws InterruptedException {
    E x;
    int c = -1;
    // 下面两行是访问优化
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;

    takeLock.lockInterruptibly();
    try {
      // 循环里等待直到有数据可获取
        while (count.get() == 0) {
            notEmpty.await();
        }

        // 获取第一个有效元素
        x = dequeue();

        // 如果还有可获取元素,唤醒等待获取的线程。
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }

    // 注意,c 是调用 getAndDecrement 返回的,如果 if 成立,
    // 表明当前的 count 是 capacity - 1,可以添加新元素,所以唤醒 添加线程。
    if (c == capacity)
        signalNotFull();
    return x;
}

remove 操作


// 移除指定元素。由于移除元素涉及该结点前后两个结点的访问与修改,
// 对两把锁加锁简化了同步管理。
public boolean remove(Object o) {
    if (o == null ) return false;

    fullyLock();
    try {
        for (Node<E> trail = head, p = trail.next;
             p != null ;
             trail = p, p = p.next) {
            if (o.equals(p.item)) {
                unlink(p, trail);
                return true ;
            }
        }
        return false ;
    } finally {
        fullyUnlock();
    }
}


目录
相关文章
|
6月前
|
存储 安全 Java
ArrayBlockingQueue 和 LinkedBlockingQueue 有什么区别?
ArrayBlockingQueue 和 LinkedBlockingQueue 有什么区别?
|
3天前
并发编程之BlockingQueue(阻塞队列)的详细解析
并发编程之BlockingQueue(阻塞队列)的详细解析
9 0
|
8月前
|
存储 消息中间件 算法
JUC-阻塞队列
问题引出 一.单端阻塞队列(BlockingQueue) 二.双端阻塞队列(BlockingDeque) 三.延迟队列(DelayQueue)
29 0
|
9月前
|
存储 缓存 安全
JUC之阻塞队列解读(BlockingQueue)
JUC之阻塞队列解读(BlockingQueue)
|
缓存 安全 Java
JUC系列学习(四):线程池阻塞队列BlockingQueue及其相关实现ArrayBlockingQueue、LinkedBlockingQueue
线程池阻塞队列BlockingQueue及其相关实现ArrayBlockingQueue、LinkedBlockingQueue
|
缓存 安全 Java
JUC - BlockingQueue
JUC - BlockingQueue
102 0
JUC - BlockingQueue
|
Java
Java并发编程之LinkedBlockingQueue
Java并发编程之LinkedBlockingQueue
98 0
|
存储 缓存 安全
Java并发指南11:解读 Java 阻塞队列 BlockingQueue
解读 Java 并发队列 BlockingQueue 转自:https://javadoop.com/post/java-concurrent-queue 最近得空,想写篇文章好好说说 java 线程池问题,我相信很多人都一知半解的,包括我自己在仔仔细细看源码之前,也有许多的不解,甚至有些地方我一直都没有理解到位。
立正、稍息、入"ArrayBlockingQueue"(上)
大家好,我是指北君。 在前面的文章中,已经对 ArrayBlockingQueue 进行了一次源码分析,对它的核心源码做了分析,今天来解析一波同为 BlockingQueue 家族中的一员的 LinkedBlockingQueue。它的底层基于单向链表实现。
立正、稍息、入"ArrayBlockingQueue"(上)
立正、稍息、入"ArrayBlockingQueue"(下)
大家好,我是指北君。 在前面的文章中,已经对 ArrayBlockingQueue 进行了一次源码分析,对它的核心源码做了分析,今天来解析一波同为 BlockingQueue 家族中的一员的 LinkedBlockingQueue。它的底层基于单向链表实现。
立正、稍息、入"ArrayBlockingQueue"(下)