【从入门到放弃-Java】并发编程-JUC-SynchronousQueue

简介: 前言上文【从入门到放弃-Java】并发编程-JUC-LinkedBlockingQueue,我们介绍了基于链表的有界阻塞队列LinkedBlockingQueue,它是Executors.newFixedThreadPool中workQueue使用的阻塞队列。

前言

上文【从入门到放弃-Java】并发编程-JUC-LinkedBlockingQueue,我们介绍了基于链表的有界阻塞队列LinkedBlockingQueue,它是Executors.newFixedThreadPool中workQueue使用的阻塞队列。
本文我们来学习ExecutorService.newCachedThreadPool中使用的阻塞队列:SynchronousQueue。

SynchronousQueue


如图和LinkedBlockingQueue一样,都是继承了AbstractQueue类,实现了BlockingQueue和Serializable接口

SynchronousQueue

/**
 * Creates a {@code SynchronousQueue} with nonfair access policy.
 */
public SynchronousQueue() {
    this(false);
}

/**
 * Creates a {@code SynchronousQueue} with the specified fairness policy.
 *
 * @param fair if true, waiting threads contend in FIFO order for
 *        access; otherwise the order is unspecified.
 */
public SynchronousQueue(boolean fair) {
    //公平模式下使用队列,实现先进先出,非公平模式下使用栈,先进后出
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

因为SynchronousQueue的put、offer、take、poll方法全是调用了Transferer的transfer方法,我们一起来看下这个transfer到底是何方神圣。

Transferer

/**
 * Shared internal API for dual stacks and queues.
 */
abstract static class Transferer<E> {
    /**
     * Performs a put or take.
     *
     * @param e if non-null, the item to be handed to a consumer;
     *          if null, requests that transfer return an item
     *          offered by producer.
     * @param timed if this operation should timeout
     * @param nanos the timeout, in nanoseconds
     * @return if non-null, the item provided or received; if null,
     *         the operation failed due to timeout or interrupt --
     *         the caller can distinguish which of these occurred
     *         by checking Thread.interrupted.
     */
    abstract E transfer(E e, boolean timed, long nanos);
}

Transferer是一个抽象类,只有一个抽象方法transfer。可以从注释中看到:

  • e是元素根据e是否为null来控制是生产者还是消费者。
  • timed是布尔值,控制是否使用超时机制。
  • nanos是超时时间。

transfer的具体实现有两个,在Transferer的两个实现类:TransferQueue和TransferStack中

TransferQueue::transfer

E transfer(E e, boolean timed, long nanos) {
    /* Basic algorithm is to loop trying to take either of
     * two actions:
     *
     * 1. If queue apparently empty or holding same-mode nodes,
     *    try to add node to queue of waiters, wait to be
     *    fulfilled (or cancelled) and return matching item.
     *
     * 2. If queue apparently contains waiting items, and this
     *    call is of complementary mode, try to fulfill by CAS'ing
     *    item field of waiting node and dequeuing it, and then
     *    returning matching item.
     *
     * In each case, along the way, check for and try to help
     * advance head and tail on behalf of other stalled/slow
     * threads.
     *
     * The loop starts off with a null check guarding against
     * seeing uninitialized head or tail values. This never
     * happens in current SynchronousQueue, but could if
     * callers held non-volatile/final ref to the
     * transferer. The check is here anyway because it places
     * null checks at top of loop, which is usually faster
     * than having them implicitly interspersed.
     */

    QNode s = null; // constructed/reused as needed
    //判断是消费者还是生产者,如果e为null则消费者,e不为null是生产者
    boolean isData = (e != null);

    for (;;) {
        QNode t = tail;
        QNode h = head;
        //tail和head是队列的尾部和头部,是一个item为空的QNode,如果队列被其它线程改动了,则continue重新处理
        if (t == null || h == null)         // saw uninitialized value
            continue;                       // spin

        //如果队列为空,或者处于same-mode模式
        if (h == t || t.isData == isData) { // empty or same-mode
            //如果不是最后一个节点,则继续寻找最后一个有数据的节点
            QNode tn = t.next;
            if (t != tail)                  // inconsistent read
                continue;
            //如果已经tn不为null,则尝试通过CAS把tn置为尾结点,然后重新执行
            if (tn != null) {               // lagging tail
                advanceTail(t, tn);
                continue;
            }
            //如果超时,直接返回null
            if (timed && nanos <= 0L)       // can't wait
                return null;
            //如果新节点还未创建,则创建一个新的QNode来承载元素e
            if (s == null)
                s = new QNode(e, isData);
            //尝试通过CAS将t的下一个节点设置为s,如果设置失败,则说明t的下一个节点已经被添加了元素,则需要从头开始处理
            if (!t.casNext(null, s))        // failed to link in
                continue;

            //尝试将tail设置为新建的节点s
            advanceTail(t, s);              // swing tail and wait
            //加入队列后阻塞,把等待线程设置为当前线程,等待唤醒处理
            Object x = awaitFulfill(s, e, timed, nanos);
            //如果超时中断则删除这个节点并返回null
            if (x == s) {                   // wait was cancelled
                clean(t, s);
                return null;
            }

            //如果不是tail节点,则判断是否是head节点,
            if (!s.isOffList()) {           // not already unlinked
                advanceHead(t, s);          // unlink if head
                //如果返回的x不为null,则设置item为自身
                if (x != null)              // and forget fields
                    s.item = s;
                //把等待线程设置为null
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;

        } else {                            // complementary-mode
            QNode m = h.next;               // node to fulfill
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            //x为null说明已经被消费
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled
                //通过cas将首节点设置为e(null)
                !m.casItem(x, e)) {         // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }

            advanceHead(h, m);              // successfully fulfilled
            //唤醒节点设置的线程
            LockSupport.unpark(m.waiter);
            //返回获取到的item
            return (x != null) ? (E)x : e;
        }
    }
}
  • 先判断队列是否为空,或者尾结点与当前节点模式相同,则将节点加入队列尾部
  • 等待线程被唤醒(put被take唤醒,take被put唤醒)处理
  • 如果队列不为空,或者尾结点与当前节点模式不相同,则唤醒头部节点,取出数据,并把头部节点移除

TransferStack::transfer

E transfer(E e, boolean timed, long nanos) {
    /*
     * Basic algorithm is to loop trying one of three actions:
     *
     * 1. If apparently empty or already containing nodes of same
     *    mode, try to push node on stack and wait for a match,
     *    returning it, or null if cancelled.
     *
     * 2. If apparently containing node of complementary mode,
     *    try to push a fulfilling node on to stack, match
     *    with corresponding waiting node, pop both from
     *    stack, and return matched item. The matching or
     *    unlinking might not actually be necessary because of
     *    other threads performing action 3:
     *
     * 3. If top of stack already holds another fulfilling node,
     *    help it out by doing its match and/or pop
     *    operations, and then continue. The code for helping
     *    is essentially the same as for fulfilling, except
     *    that it doesn't return the item.
     */

    //如果e是null,则是REQUEST模式,不为null则是DATA模式
    SNode s = null; // constructed/reused as needed
    int mode = (e == null) ? REQUEST : DATA;

    for (;;) {
        SNode h = head;
        //如果是队列不为空
        if (h == null || h.mode == mode) {  // empty or same-mode
            //如果超时
            if (timed && nanos <= 0L) {     // can't wait
                if (h != null && h.isCancelled())
                    //cas方式移除头部节点
                    casHead(h, h.next);     // pop cancelled node
                else
                    return null;
            //从头部插入数据
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                //等待节点被内的元素被处理完毕或等待超时
                SNode m = awaitFulfill(s, timed, nanos);
                //如果是中断,则清除节点s并返回null
                if (m == s) {               // wait was cancelled
                    clean(s);
                    return null;
                }
                //从头部取出数据并移除节点
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        //如果节点h没有被处理完
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                for (;;) { // loop until matched or waiters disappear
                    SNode m = s.next;       // m is s's match
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                    //尝试唤醒节点中保存的线程
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // help a fulfiller
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                //尝试唤醒节点中保存的线程
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}
  • 先判断队列是否为空,或者尾结点与当前节点模式相同,则将节点加入队列头部
  • 等待线程被唤醒(put被take唤醒,take被put唤醒)处理
  • 如果队列不为空,或者尾结点与当前节点模式不相同,则唤醒头部节点,取出数据,并把头部节点移除

总结

SynchronousQueue是一个无空间的队列即不可以通过peek来获取数据或者contain判断数据是否在队列中。
当队列为空时,队列执行take或put操作都会调用transfer,使线程进入阻塞,等待一个与tail节点模式互补(即put等take、take等put)的请求。
如果新请求与队列tail节点的模式相同,则将请求加入队列,模式不同,则可进行消费从队列中移除节点。
TransferStack:非公平的栈模式,先进后出(头进头出)
TransferQueue:公平的队列模式,先进先出(尾进头出)

我的理解:

  • SynchronousQueue不存储数据,只存储请求
  • 当生产或消费请求到达时,如果队列中没有互补的请求,则将会此请求加入队列中,线程进入阻塞 等待互补的请求到达。
  • 若是互补的请求到达时,则唤醒队列中的线程,消费请求使用生产请求中的数据内容。

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

目录
相关文章
|
10天前
|
安全 Java 开发者
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第9天】本文将深入探讨Java并发编程的核心概念,包括线程安全和性能优化。我们将详细解析Java中的同步机制,包括synchronized关键字、Lock接口以及并发集合等,并探讨它们如何影响程序的性能。此外,我们还将讨论Java内存模型,以及它如何影响并发程序的行为。最后,我们将提供一些实用的并发编程技巧和最佳实践,帮助开发者编写出既线程安全又高效的Java程序。
22 3
|
13天前
|
Java 调度
Java并发编程:深入理解线程池的原理与实践
【4月更文挑战第6天】本文将深入探讨Java并发编程中的重要概念——线程池。我们将从线程池的基本原理入手,逐步解析其工作过程,以及如何在实际开发中合理使用线程池以提高程序性能。同时,我们还将关注线程池的一些高级特性,如自定义线程工厂、拒绝策略等,以帮助读者更好地掌握线程池的使用技巧。
|
13天前
|
设计模式 安全 Java
Java并发编程实战:使用synchronized关键字实现线程安全
【4月更文挑战第6天】Java中的`synchronized`关键字用于处理多线程并发,确保共享资源的线程安全。它可以修饰方法或代码块,实现互斥访问。当用于方法时,锁定对象实例或类对象;用于代码块时,锁定指定对象。过度使用可能导致性能问题,应注意避免锁持有时间过长、死锁,并考虑使用`java.util.concurrent`包中的高级工具。正确理解和使用`synchronized`是编写线程安全程序的关键。
|
15天前
|
Java
深入理解Java并发编程:线程池的应用与优化
【4月更文挑战第3天】 在Java并发编程中,线程池是一种重要的资源管理工具,它能有效地控制和管理线程的数量,提高系统性能。本文将深入探讨Java线程池的工作原理、应用场景以及优化策略,帮助读者更好地理解和应用线程池。
|
11天前
|
Java
Java 并发编程:深入理解线程池
【4月更文挑战第8天】本文将深入探讨 Java 中的线程池技术,包括其工作原理、优势以及如何使用。线程池是 Java 并发编程的重要工具,它可以有效地管理和控制线程的执行,提高系统性能。通过本文的学习,读者将对线程池有更深入的理解,并能在实际开发中灵活运用。
|
7天前
|
安全 算法 Java
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第11天】 在Java中,高效的并发编程是提升应用性能和响应能力的关键。本文将探讨Java并发的核心概念,包括线程安全、锁机制、线程池以及并发集合等,同时提供实用的编程技巧和最佳实践,帮助开发者在保证线程安全的前提下,优化程序性能。我们将通过分析常见的并发问题,如竞态条件、死锁,以及如何利用现代Java并发工具来避免这些问题,从而构建更加健壮和高效的多线程应用程序。
|
11天前
|
Java
Java并发编程:深入理解线程池
【4月更文挑战第7天】在现代软件开发中,多线程编程已经成为一种不可或缺的技术。为了提高程序性能和资源利用率,Java提供了线程池这一强大工具。本文将深入探讨Java线程池的原理、使用方法以及如何根据实际需求定制线程池,帮助读者更好地理解和应用线程池技术。
15 0
|
13天前
|
缓存 安全 Java
Java并发编程进阶:深入理解Java内存模型
【4月更文挑战第6天】Java内存模型(JMM)是多线程编程的关键,定义了线程间共享变量读写的规则,确保数据一致性和可见性。主要包括原子性、可见性和有序性三大特性。Happens-Before原则规定操作顺序,内存屏障和锁则保障这些原则的实施。理解JMM和相关机制对于编写线程安全、高性能的Java并发程序至关重要。
|
3天前
|
设计模式 运维 安全
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第15天】在Java开发中,多线程编程是提升应用程序性能和响应能力的关键手段。然而,它伴随着诸多挑战,尤其是在保证线程安全的同时如何避免性能瓶颈。本文将探讨Java并发编程的核心概念,包括同步机制、锁优化、线程池使用以及并发集合等,旨在为开发者提供实用的线程安全策略和性能优化技巧。通过实例分析和最佳实践的分享,我们的目标是帮助读者构建既高效又可靠的多线程应用。
|
5天前
|
Java 编译器
Java并发编程中的锁优化策略
【4月更文挑战第13天】 在Java并发编程中,锁是一种常见的同步机制,用于保证多个线程之间的数据一致性。然而,不当的锁使用可能导致性能下降,甚至死锁。本文将探讨Java并发编程中的锁优化策略,包括锁粗化、锁消除、锁降级等方法,以提高程序的执行效率。
12 4