jdk11源码--ArrayBlockingQueue源码分析

简介: jdk11 ArrayBlockingQueue源码分析

概述

上一篇文章jdk11源码--ReentrantLock之Condition源码分析中分析了ReentrantLock和Condition的源码,那么接下来看一下Condition在JDK中的具体应用。

ArrayBlockingQueue底层就是使用Condition来实现的。

BlockingQueue

BlockingQueue 阻塞队列,该类是一个接口,平时我们熟知的ArrayBlockingQueue,LinkedBlockingQueue等都是该接口的实现。
BlockingQueue 之所以说是阻塞的,是因为他可以在队列为空的时候,获取元素的线程会阻塞,直到有新的元素添加进来。当队列满时,添加元素的线程会阻塞,直到有线程从队列中取走了元素。这也是注明的==生产者消费者==问题。
当有面试官问你==生产者消费者==问题时,直接将ArrayBlockingQueue源码分析讲一下也是可以的。

看一下BlockingQueue的类图,BlockingQueue是继承自Queue,而queue继承自collection。
在这里插入图片描述

BlockingQueue 对插入、删除、获取原色的操作提供了四种不同的方法用于不同的场景中使用,这些方法总结在下表中:

抛出异常Throws exception Special value ==Blocks== Times out
插入数据Insert add(e) //队列满时抛异常 offer(e) //队列满返回false,不阻塞 ==put(e)== //队列满时阻塞,直到队列未满时再插入 offer(e, time, unit) //指定直接内可以插入返回true,指定时间内不能插入,返回false
获取数据Remove remove()//队列为空,抛异常 poll()//队列为空返回null,不阻塞 ==take()== //当队列为空时会阻塞,一直等到队列不为空时再返回队首值 poll(time, unit) //在指定时间内,队列都是空,则返回null,否则返回对首的值
Examine element() peek()

本文我们重点关注 put和take方法,因为这两个方法时阻塞的。

ArrayBlockingQueue

ArrayBlockingQueue是BlockingQueue的一个实现。他是FIFO先进先出队列。
ArrayBlockingQueue类图:
在这里插入图片描述
重要属性:

/** 队列集合,数组存储!! */
final Object[] items;
/** take, poll, peek or remove 方法获取元素的下标位置 */
int takeIndex;
/** put, offer, or add 方法添加元素的下标位置 */
int putIndex;

/** 队列中的元素数量 */
int count;

//并发控制
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

items是使用数组存储的,这也是ArrayBlockingQueue名称的由来。

并发控制使用经典的双Condition 算法,上面定义了两个Condition ,一个notEmpty,一个notFull。下面来逐行源码具体分析一下。

ArrayBlockingQueue构造方法

//capacity: 数组初始容量
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
//fair:是否是公平锁
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

在构造函数中会指定初始容量以及锁的类型,默认是非公平锁。
数组items一旦确定下来,后续就不会再更改大小。

put

put方法添加元素到队尾,当队列满时阻塞。

public void put(E e) throws InterruptedException {
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;//获取锁lock
    lock.lockInterruptibly();//加锁,响应中断
    try {
        while (count == items.length)
            notFull.await();//如果items满了,那么notFull阻塞
        enqueue(e);//将元素添加到队列末尾
    } finally {
        lock.unlock();
    }
}

//将元素添加到队列末尾, ++putIndex,count++,
//该方法只允许在lock加锁后操作
private void enqueue(E e) {
    final Object[] items = this.items;
    items[putIndex] = e;
    if (++putIndex == items.length) putIndex = 0;
    count++;
    notEmpty.signal();//唤醒阻塞的获取元素的线程
}

整个过程还是比较简单的,首先加锁,注意这里的锁允许中断返回。然后,如果队列满了(count == items.length),那么就无法继续添加元素了,添加元素的线程就需要await等待(notFull.await();),该线程添加到notFull的condition队列上,直到被take方法唤醒(后面会讲)后,继续添加元素到队尾(enqueue(e);)。
enqueue方法中,添加元素到putIndex 的位置,然后对putIndex 加1操作,由于是数组,所以这里进行了越界处理,越界后从0开始继续计算。count元素的数量进行加1操作,同时唤醒notEmpty condition队列上阻塞的线程。
读者可以思考一下这里为什么没有进行这个校验:putIndex 在加一以后是否会与现有队列中已经存在的元素重合而覆盖掉现有元素?答案下一节揭晓。

take

take:从队首获取元素。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();//加锁,响应中断
    try {
        while (count == 0)
            notEmpty.await();//如果队列中空,那么当前线程需要添加到notEmpty的condition队列中阻塞,直到有新的元素添加进来
        return dequeue();
    } finally {
        lock.unlock();
    }
}

//从对首获取元素,
//该方法只允许在lock加锁后操作
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E e = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length) takeIndex = 0;//设置takeIndex 下一次应该获取的位置
    count--;//队列总数量-1
    if (itrs != null)
        itrs.elementDequeued();//迭代器相关
    notFull.signal();//唤醒notFull condition队列上的线程
    return e;
}

take过程也很简单,先看一下队列是否为空,如果为空,则无法获取元素,将当前线程添加到notEmpty的condition队列。否则从takeIndex 的位置读取元素并且设置takeIndex ,count 的值。

上一节提到了这个问题:
putIndex 在加一以后是否会与现有队列中已经存在的元素重合而覆盖掉现有元素?
其实也很简单,enqueue和dequeue都是需要加锁以后才调用的,所以是线程安全的,count可以准确表示队列中的有效长度,takeIndex 和putIndex 也都没有并发问题,他们每次添加或者读取时都会判断count的值,来确认队列是否满或者空。如此一来,当然不会出现添加过多覆盖现有元素的情况。

总结

首先回顾一下上一篇jdk11源码--ReentrantLock之Condition源码分析中画的condition结构图吗,以及condition与ReentrantLock之间的关系。
然后再次基础上,画一下ArrayBlockingQueue中的condition关系图:
在这里插入图片描述
总体来讲,ArrayBlockingQueue包含一个ReentrantLock和两个condition:notEmpty和notFull。
put可take操作都需要加锁,都是线程安全的。
当队列满时,put操作需阻塞等待,当前线程添加到notFull的condition队列中;添加成功时,需唤醒notEmpty队列中的线程。
当队列空时,take操作需阻塞等待,当前线程添加到notEmpty的condition队列中;获取成功时,需唤醒notFull队列中的线程。

相关文章
|
2天前
|
传感器 人工智能 前端开发
JAVA语言VUE2+Spring boot+MySQL开发的智慧校园系统源码(电子班牌可人脸识别)Saas 模式
智慧校园电子班牌,坐落于班级的门口,适合于各类型学校的场景应用,班级学校日常内容更新可由班级自行管理,也可由学校统一管理。让我们一起看看,电子班牌有哪些功能呢?
41 4
JAVA语言VUE2+Spring boot+MySQL开发的智慧校园系统源码(电子班牌可人脸识别)Saas 模式
|
2天前
|
分布式计算 Java API
Java8 Lambda实现源码解析
Java8的lambda应该大家都比较熟悉了,本文主要从源码层面探讨一下lambda的设计和实现。
|
3天前
|
存储 Java
0基础java初学者都能做的打字通小游戏? 内含源码解读和细致讲解!!
0基础java初学者都能做的打字通小游戏? 内含源码解读和细致讲解!!
15 2
0基础java初学者都能做的打字通小游戏? 内含源码解读和细致讲解!!
|
3天前
|
SQL Java 分布式数据库
实现HBase表和RDB表的转化(附Java源码资源)
该文介绍了如何将数据从RDB转换为HBase表,主要涉及三个来源:RDB Table、Client API和Files。文章重点讲解了RDB到HBase的转换,通过批处理思想,利用RDB接口批量导出数据并转化为`List&lt;Put&gt;`,然后导入HBase。目录结构包括配置文件、RDB接口及实现类、HBase接口及实现类,以及一个通用转换器接口和实现。代码中,`RDBImpl`负责从RDB读取数据并构造`Put`对象,`HBaseImpl`则负责将`Put`写入HBase表。整个过程通过配置文件`transfer.properties`管理HBase和RDB的映射关系。
20 3
实现HBase表和RDB表的转化(附Java源码资源)
|
6天前
|
消息中间件 缓存 Java
java基于云部署的SaaS医院云HIS系统源码 心理CT、B超 lis、电子病历
云HIS系统是一款满足基层医院各类业务需要的健康云产品。该产品能帮助基层医院完成日常各类业务,提供病患预约挂号支持、病患问诊、电子病历、开药发药、会员管理、统计查询、医生工作站和护士工作站等一系列常规功能,还能与公卫、PACS等各类外部系统融合,实现多层机构之间的融合管理。
41 12
|
9天前
|
人工智能 监控 Java
java互联网+智慧工地云平台SaaS源码
智慧工地以施工现场风险预知和联动预控为目标,将智能AI、传感技术、人像识别、监控、虚拟现实、物联网、5G、大数据、互联网等新一代科技信息技术植入到建筑、机械、人员穿戴设施、场地进出关口等各类设备中,实现工程管理与工程施工现场的整合
22 0
|
11天前
|
监控 Java BI
java基于云计算的SaaS医院his信息系统源码 HIS云平台源码
基于云计算技术的B/S架构的HIS系统源码,SaaS模式Java版云HIS系统,融合B/S版电子病历系统,支持电子病历四级,HIS与电子病历系统均拥有自主知识产权。
37 5
|
12天前
|
人工智能 监控 数据可视化
Java智慧工地云平台源码带APP SaaS模式 支持私有化部署和云部署
智慧工地是指应用智能技术和互联网手段对施工现场进行管理和监控的一种工地管理模式。它利用传感器、监控摄像头、人工智能、大数据等技术,实现对施工现场的实时监测、数据分析和智能决策,以提高工地的安全性、效率和质量(技术架构:微服务+Java+Spring Cloud +UniApp +MySql)。
32 4
|
14天前
|
人工智能 监控 安全
JAVA基于SaaS模式的智慧工地云平台源码(云智慧工地解决方案)
智慧工地支持多端展示(PC端、手机端、平板端)SaaS微服务架构,项目监管端,工地管理端源码
20 0
|
14天前
|
搜索推荐 前端开发 Java
java医院绩效考核管理系统项目源码
系统需要和his系统进行对接,按照设定周期,从his系统获取医院科室和医生、护士、其他人员工作量,对没有录入信息化系统的工作量,绩效考核系统设有手工录入功能(可以批量导入),对获取的数据系统按照设定的公式进行汇算,且设置审核机制,可以退回修正,系统功能强大,完全模拟医院实际绩效核算过程,且每步核算都可以进行调整和参数设置,能适应医院多种绩效核算方式。
18 0