BlockingQueue与Condition原理解析

简介: 我在前段时间写了一篇关于AQS的文章,在文章里边我说几乎所有在JUC包中的所有多线程相关的类都和AQS相关,今天我就在这里总结一下另一个依赖于AQS来实现的同步工具类:BlockingQueue。

 我在前段时间写了一篇关于AQS的文章,在文章里边我说几乎所有在 JUC包中的所有多线程相关的类都和 AQS相关,今天我就在这里总结一下另一个依赖于 AQS来实现的同步工具类: BlockingQueue。我们主要以 ArrayBlockingQueue为主来分析相关的源码。

阻塞队列

 相信大多数同学都是在学习线程池相关知识时了解到阻塞队列的概念的。知道各种类型的阻塞队列对线程池初始化时的影响。在java doc中这样定义阻塞队列。当从阻塞队列获取元素但是队列为空时,当前线程会阻塞直到另一个线程向阻塞队列中添加一个元素;类似的,当向一个阻塞队列加入元素时,如果队列已经满了,当前线程也会阻塞知道另外一个线程从队列中读取一个元素。阻塞队列一般都是FIFO,用来实现生产者和消费者模式。阻塞队列的方法通过四种不同的方式来处理操作无法被立即完成的情况,这四种情况分别为抛出异常,返回特殊值(null或在是false),阻塞当前线程直到执行结束,最后一种是只阻塞固定时间,然后还未执行成功就放弃操作。这些方法都总结在下边这种表中了。

BlockingQueue.png

 我们就只分析 puttake方法。

put和take函数

 我们都知道,使用同步队列可以很轻松的实现生产者-消费者模式,其实,同步队列就是按照生产者-消费者的模式来实现的,我们可以将 put函数看作生产者的操作, take是消费者的操作。 put函数会在队列末尾添加元素,如果队列已经满了,无法添加元素的话,就一直阻塞等待到可以加入为止。函数的源码如下所示。

 
 
  1. public void put(E e) throws InterruptedException {
  2. checkNotNull(e);
  3. final ReentrantLock lock = this.lock;
  4. lock.lockInterruptibly(); //先获得锁
  5. try {
  6. while (count == items.length)
  7. //如果队列满了,就NotFull这个condition对象上进行等待
  8. notFull.await();
  9. enqueue(e);
  10. } finally {
  11. lock.unlock();
  12. }
  13. }
  14. private void enqueue(E x) {
  15. final Object[] items = this.items;
  16. items[putIndex] = x;
  17. //这里可以注意的是ArrayBlockingList实际上使用Array实现了一个环形数组,
  18. //当putIndex达到最大时,就返回到起点,继续插入,
  19. //当然,如果此时0位置的元素还没有被取走,
  20. //下次put时,就会因为cout == item.length未被阻塞。
  21. if (++putIndex == items.length)
  22. putIndex = 0;
  23. count++;
  24. //因为插入了元素,通知等待notEmpty事件的线程。
  25. notEmpty.signal();
  26. }

 我们会发现put函数也是使用了wait/notify的机制。与一般生产者-消费者的实现方式不同,同步队列使用 ReentrantLockCondition相结合的先获得锁,再等待的机制;而不是 synchronizedObject.wait的机制。这里的区别我们下一节再详细讲解。  看完了生产者相关的 put函数,我们再来看一下消费者调用的 take函数。 take函数在队列为空时会被阻塞,一直到阻塞队列加入了新的元素。

 
 
  1. public E take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. lock.lockInterruptibly();
  4. try {
  5. while (count == 0)
  6. //如果队列为空,那么在notEmpty对象上等待,
  7. //当put函数调用时,会调用notEmpty的notify进行通知。
  8. notEmpty.await();
  9. return dequeue();
  10. } finally {
  11. lock.unlock();
  12. }
  13. }
  14. private E dequeue() {
  15. E x = (E) items[takeIndex];
  16. items[takeIndex] = null; //取出takeIndex位置的元素
  17. if (++takeIndex == items.length)
  18. //如果到了尾部,将指针重新调整到头部
  19. takeIndex = 0;
  20. count--;
  21. ....
  22. //通知notFull对象上等待的线程
  23. notFull.signal();
  24. return x;
  25. }

Condition.await和Object.wait

 我们发现 ArrayBlockingList并没有使用 Object.wait,而是使用的 Condition.await,这是为什么呢?其中又有哪些原因呢? Condition对象可以提供和 Objectwaitnotify一样的行为,但是后者必须使用 synchronized这个内置的monitor锁,而 Condition使用的是 RenentranceLock。这两种方式在阻塞等待时都会将相应的锁释放掉,但是 Condition的等待可以中断,这是二者唯一的区别。  Condition的流程大致如下边两张图所示.

await

notify

 我们首先来看一下 await函数的实现,详细的讲解都在代码中.

 
 
  1. public final void await() throws InterruptedException {
  2. if (Thread.interrupted())
  3. throw new InterruptedException();
  4. //在condition wait队列上添加新的节点
  5. Node node = addConditionWaiter();
  6. //释放当前持有的锁
  7. int savedState = fullyRelease(node);
  8. int interruptMode = 0;
  9. //由于node在之前是添加到condition wait queue上的,现在判断这个node
  10. //是否被添加到Sync的获得锁的等待队列上。
  11. //node在condition queue上说明还在等待事件的notify,
  12. //notify函数会将condition queue 上的node转化到Sync的队列上。
  13. while (!isOnSyncQueue(node)) {
  14. //node还没有被添加到Sync Queue上,说明还在等待事件通知
  15. //所以调用park函数来停止线程执行
  16. LockSupport.park(this);
  17. //判断是否被中断,线程从park函数返回有两种情况,一种是
  18. //其他线程调用了unpark,另外一种是线程被中断
  19. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  20. break;
  21. }
  22. //代码执行到这里,已经有其他线程调用notify函数,或则被中断,该线程可以继续执行,但是必须先
  23. //再次获得调用await函数时的锁.acquireQueued函数在AQS文章中做了介绍.
  24. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  25. interruptMode = REINTERRUPT;
  26.  ....
  27. }
  28. final int fullyRelease(Node node) {
  29. //AQS的方法,当前已经在锁中了,所以直接操作
  30. boolean failed = true;
  31. try {
  32. int savedState = getState();
  33. //获取state当前的值,然后保存,以待以后恢复
  34. if (release(savedState)) {
  35. failed = false;
  36. return savedState;
  37. } else {
  38. throw new IllegalMonitorStateException();
  39. }
  40. } finally {
  41. if (failed)
  42. node.waitStatus = Node.CANCELLED;
  43. }
  44. }
  45. /**
  46. * Checks for interrupt, returning THROW_IE if interrupted
  47. * before signalled, REINTERRUPT if after signalled, or
  48. * 0 if not interrupted.
  49. */
  50. private int checkInterruptWhileWaiting(Node node) {
  51. //中断可能发生在两个阶段中,一是在等待singla,另外一个是在获得signal之后
  52. return Thread.interrupted() ?
  53. (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
  54. 0;
  55. }
  56. final boolean transferAfterCancelledWait(Node node) {
  57. //这里要和下边的transferForSignal对应着看,这是线程中断进入的逻辑.那边是signal的逻辑
  58. //两边可能有并发冲突,但是成功的一方必须调用enq来进入acquire lock queue中.
  59. if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
  60. enq(node);
  61. return true;
  62. }
  63. //如果失败了,说明transferForSignal那边成功了,等待node 进入acquire lock queue
  64. while (!isOnSyncQueue(node))
  65. Thread.yield();
  66. return false;
  67. }

signal函数将等待事件最长时间的线程节点从等待condition的队列移动到获得lock的等待队列中.

 
 
  1. public final void signal() {
  2. //
  3. if (!isHeldExclusively())
  4. //如果当前线程没有获得锁,抛出异常
  5. throw new IllegalMonitorStateException();
  6. Node first = firstWaiter;
  7. if (first != null)
  8. //将Condition wait queue中的第一个node转移到acquire lock queue中.
  9. doSignal(first);
  10. }
  11. private void doSignal(Node first) {
  12. do {
  13.    //由于生产者的signal在有消费者等待的情况下,必须要通知
  14. //一个消费者,所以这里有一个循环,直到队列为空
  15. //把first 这个node从condition queue中删除掉
  16. //condition queue的头指针指向node的后继节点,如果node后续节点为null,那么也将尾指针也置为null
  17. if ( (firstWaiter = first.nextWaiter) == null)
  18. lastWaiter = null;
  19. first.nextWaiter = null;
  20. } while (!transferForSignal(first) &&
  21. (first = firstWaiter) != null);
  22. //transferForSignal将node转而添加到Sync的acquire lock 队列
  23. }
  24. final boolean transferForSignal(Node node) {
  25. //如果设置失败,说明该node已经被取消了,所以返回false,让doSignal继续向下通知其他未被取消的node
  26. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  27. return false;
  28. //将node添加到acquire lock queue中.
  29. Node p = enq(node);
  30. int ws = p.waitStatus;
  31. //需要注意的是这里的node进行了转化
  32. //ws>0代表canceled的含义所以直接unpark线程
  33. //如果compareAndSetWaitStatus失败,所以直接unpark,让线程继续执行await中的
  34. //进行isOnSyncQueue判断的while循环,然后进入acquireQueue函数.
  35. //这里失败的原因可能是Lock其他线程释放掉了锁,同步设置p的waitStatus
  36. //如果compareAndSetWaitStatus成功了呢?那么该node就一直在acquire lock queue中
  37. //等待锁被释放掉再次抢夺锁,然后再unpark
  38. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  39. LockSupport.unpark(node.thread);
  40. return true;
  41. }

后记

 后边一篇文章主要讲解如何自己使用 AQS来创建符合自己业务需求的锁,请大家继续关注我的文章啦.一起进步偶.

相关文章
|
1月前
|
关系型数据库 MySQL Shell
CMake构建Makefile深度解析:从底层原理到复杂项目(三)
CMake构建Makefile深度解析:从底层原理到复杂项目
32 0
|
1月前
|
编译器 vr&ar C++
CMake构建Makefile深度解析:从底层原理到复杂项目(二)
CMake构建Makefile深度解析:从底层原理到复杂项目
35 0
|
30天前
|
存储 安全 编译器
【C++ 17 新功能 std::visit 】深入解析 C++17 中的 std::visit:从原理到实践
【C++ 17 新功能 std::visit 】深入解析 C++17 中的 std::visit:从原理到实践
70 0
|
26天前
|
安全 Java 数据安全/隐私保护
【深入浅出Spring原理及实战】「EL表达式开发系列」深入解析SpringEL表达式理论详解与实际应用
【深入浅出Spring原理及实战】「EL表达式开发系列」深入解析SpringEL表达式理论详解与实际应用
57 1
|
1天前
|
缓存 JavaScript 前端开发
|
2天前
|
SQL 分布式计算 资源调度
一文解析 ODPS SQL 任务优化方法原理
本文重点尝试从ODPS SQL的逻辑执行计划和Logview中的执行计划出发,分析日常数据研发过程中各种优化方法背后的原理,覆盖了部分调优方法的分析,从知道怎么优化,到为什么这样优化,以及还能怎样优化。
|
2天前
|
Java
并发编程之线程池的底层原理的详细解析
并发编程之线程池的底层原理的详细解析
11 0
|
2天前
|
JSON Java Maven
Javaweb之SpringBootWeb案例之 SpringBoot原理的详细解析
Javaweb之SpringBootWeb案例之 SpringBoot原理的详细解析
7 0
Javaweb之SpringBootWeb案例之 SpringBoot原理的详细解析
|
2天前
|
前端开发 JavaScript 编译器
深入解析JavaScript中的异步编程:Promises与async/await的使用与原理
【4月更文挑战第22天】本文深入解析JavaScript异步编程,重点讨论Promises和async/await。Promises用于管理异步操作,有pending、fulfilled和rejected三种状态。通过.then()和.catch()处理结果,但可能导致回调地狱。async/await是ES2017的语法糖,使异步编程更直观,类似同步代码,通过事件循环和微任务队列实现。两者各有优势,适用于不同场景,能有效提升代码可读性和维护性。
|
12天前
|
机器学习/深度学习 分布式计算 BI
Flink实时流处理框架原理与应用:面试经验与必备知识点解析
【4月更文挑战第9天】本文详尽探讨了Flink实时流处理框架的原理,包括运行时架构、数据流模型、状态管理和容错机制、资源调度与优化以及与外部系统的集成。此外,还介绍了Flink在实时数据管道、分析、数仓与BI、机器学习等领域的应用实践。同时,文章提供了面试经验与常见问题解析,如Flink与其他系统的对比、实际项目挑战及解决方案,并展望了Flink的未来发展趋势。附带Java DataStream API代码样例,为学习和面试准备提供了实用素材。
34 0

推荐镜像

更多