并发容器与框架——并发容器(一)

  1. 云栖社区>
  2. 博客>
  3. 正文

并发容器与框架——并发容器(一)

江左煤郎 2018-08-31 17:22:19 浏览1202
展开阅读全文

Java中提供了非常多的并发容器和框架,在此仅对部分并发容器进行介绍。

1.ConcurrentHashMap的实现原理与使用

    1.1 为何使用ConcurrentHashMap

    在并发编程中使用HashMap可能导致程序死循环。而使用线程安全的HashTable效率又非 常低下,基于以上两个原因,便有了ConcurrentHashMap。

  1. 线程不安全的HashMap:在多线程环境下,使用HashMap进行put操作会引起get操作死循环,导致CPU利用率接近100%,所以在并发情况下不能使用HashMap。HashMap在并发执行put操作时会引起死循环,是因为多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry(死循环原理详见http://www.importnew.com/22011.html),也可能造成数据丢失。
  2. 效率低下的HashTable:HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable 的效率非常低下。因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同 步方法时,会进入阻塞或轮询状态。如线程1使用put进行元素添加,线程2不但不能使用put方 法添加元素,也不能使用get方法来获取元素,所以竞争越激烈效率越低。
  3. ConcurrentHashMap的锁分段技术:HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的线程都必须竞争同一把锁,假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么 当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术。首先将数据分成一段一段地存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。

    1.2 ConcurrentHashMap结构

    ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色HashEntry则用于存储键值对数据一个ConcurrentHashMap里包含一个Segment数组Segment的结构和HashMap类似,是一种 数组和链表结构。一个Segment里包含(用“锁住”或许更合适)一个HashEntry数组每个HashEntry数组中的元素是一个链表结构的元素,每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得与它对应的Segment锁。也就是说一个ConcurrentHashMap里有着多个Segment数组,而Segment数组中的每个元素锁住是HashEntry数组,每个HashEntry数组中的每个数组元素存放的就是一个链表。

    1.3 ConcurrentHashMap的存取过程简单解析

  1. put(K key, V value):在存储键值对数据时,会对键值进行第一次哈希运算,定位到Segment数组中的位置,确认后位置后对该位置下的所有元素进行加锁。加锁后再进行插入操作,对该对键值进行第二次哈希运算,用于确认存储在HashEntry数组中的下标位置,并进行判断是否需要对HashEntry数组进行扩容并且重计算HashEntry数组中所有元素的存储。确定在HashEntry数组中的位置后,就将该键值对添加到链表后。Segment的扩容判断比HashMap更恰当,因为HashMap是在插入元素后判断元素是否已经到达容量的,如果到达了就进行扩容,但是很有可能扩容之后没有新元素插入,这时HashMap就进行了一次无效的扩容。
  2. get(Object key):先经过一次再哈希,然后使用这个哈希值通过哈希运算定位到Segment数组中的元素,再通过哈希算法定位到HashEntry数组中的元素即可。get的高效之处在于它不需要加锁,除非读到的值是空才会加锁重读。而道HashTable容器的get方法是加锁的,所以才导致其效率极低。而ConcurrentHashMap是如何实现不加锁的?观察其源码可以发现,get方法里所有使用的共享变量全部设置为volatile类型,如用于统计当前 Segement大小的count字段和用于存储值的HashEntry的value,所有的键值对元素等。定义成volatile的变量,能够在线 程之间保持可见性,能够被多线程同时读,并且保证不会读到过期的值,但是只能被单线程写 (有一种情况可以被多线程写,就是写入的值不依赖于原值),在get操作里只需要读不需要写 共享变量count和value,所以可以不用加锁。之所以不会读到过期的值,是因为根据Java内存模 型的happen before原则,对volatile字段的写入操作先于读操作,即使两个线程同时修改和获取 volatile变量,get操作也能拿到最新的值,这是用volatile替换锁的经典应用场景。
  3. size():如果要统计整个ConcurrentHashMap里元素的大小,就必须统计所有Segment里元素的大小 后求和。Segment里的全局变量count是一个volatile变量,那么在多线程场景下,是不是直接把 所有Segment的count相加就可以得到整个ConcurrentHashMap大小了呢?实际上并不是,虽然可以获取每个Segment的count的最新值,但是如果在将这些值相加时(原子性操作以及其可见性),count的值改变了那么就会得到不准确的值。而把所有Segment的put、remove和clean方法 全部锁住又会非常低效。所以使用modCount 变量,在put、remove和clean方法里操作元素前都会将变量modCount进行加1,那么在统计size 前后比较modCount是否发生变化,从而得知容器的大小是否发生变化。

2. ConcurrentLinkedQueue的实现原理与使用

    ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。基于CAS的“wait-free”(常规无等待)来实现,CAS并不是一个算法,它是一个CPU直接支持的硬件指令,这也就在一定程度上决定了它的平台相关性。

    当前常用的多线程同步机制可以分为下面三种类型:(在后面深入底层机制学习时在进行详解)

  1. volatile 变量:轻量级多线程同步机制,不会引起上下文切换和线程调度。仅提供内存可见性保证,不提供原子性。
  2. CAS 原子指令:轻量级多线程同步机制,不会引起上下文切换和线程调度。它同时提供内存可见性和原子化更新保证。
  3. 互斥锁:重量级多线程同步机制,可能会引起上下文切换和线程调度,它同时提供内存可见性和原子性。

    ConcurrentLinkedQueue 的非阻塞算法实现主要可概括为下面几点:

  1. 使用 CAS 原子指令来处理对数据的并发访问,这是非阻塞算法得以实现的基础。
  2. head/tail 并非总是指向队列的头 / 尾节点,也就是说允许队列处于不一致状态。 这个特性把入队 /出队时,原本需要一起原子化执行的两个步骤分离开来,从而缩小了入队 /出队时需要原子化更新值的范围到唯一变量。这是非阻塞算法得以实现的关键。
  3. 以批处理方式来更新head/tail,从整体上减少入队 / 出队操作的开销。

    2.1 数据结构

    其数据结构与普通的队列没有差别,当单线程进行插入或删除时与普通队列相同,但是如果对于普通队列进行多线程的删除插入操作时,就会出现插队现象,而ConcurrentLinkedQueue则避免了这种现象的发生。

    2.2 入队列实现

    整个入队过程主要做两件事情:第一是定位出尾节点;第二是使用 CAS将入队节点设置成尾节点的next节点,如不成功则重试。

 //实现源码
public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }
  1. 定位尾节点:查看源码可以发现队列中拥head表示头结点,tail表示尾节点,但是在多线程入队列时tail节点并不一定是尾节点,尾节点可能会是tail的next节点。代码中循环体中的第一个if就是判断tail是否有next节点,有则表示next节点可能是尾节点。获取tail节点的next节点需要注意的是p节点等于p的next节点的情况,只有一种可能就是p节点和p的next节点都等于空,表示这个队列刚初始化,正准备添加节点,所以需要返回head节点。
  2. 设置入队节点为尾节点:p.casNext(null,n)方法用于将入队节点设置为当前队列尾节点next节点,如果p是null, 表示p是当前队列的尾节点,如果不为null,表示有其他线程更新了尾节点,则需要重新获取当 前队列的尾节点。

    2.3 出队列实现

  1. 首先获取头节点的元素,然后判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,如果不为空,则使用CAS的方式将头节点的引用设置成null,如果CAS成功,则直接返回头节点的元素,如果不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取头节点。

网友评论

登录后评论
0/500
评论
江左煤郎
+ 关注