天天减肥网,内容丰富有趣,生活中的好帮手!
天天减肥网 > 线程安全集合类- ConcurrentHashMap

线程安全集合类- ConcurrentHashMap

时间:2021-11-21 19:06:30

相关推荐

线程安全集合类- ConcurrentHashMap

文章目录

总结jdk7jdk8JDK7和8对比CHM是怎么保证线程安全的?应用举例CHM一定线程安全吗?读下CHM1.7的源码hashput总结getsizerehashCAS操作读下CHM1.8的源码重要参数五个构造方法putinitTableunsafeaddCountTreeBintryPresizetransfer 扩容getclear

总结

jdk7

jdk7的CHM最重要的两个数据结构:

一个不可扩容的Segment数组+可以扩容的HashEntry(类似HashMap)

Segment

默认数组大小16,且一旦初始化完成就不能改变,相当于ConcurrentHashMap默认支持最多16 个线程并发Segment继承自ReentrantLock,所以jdk7的CHM其实是利用ReentrantLock锁来保证线程安全的。

HashEntry

一个Segment里放入一个HashEntry,HashEntry数组类似于 HashMap 的结构。所以相当于jdk7的CHM是16个HashMap组成的。

HashEntry和HashMap的大部分都一样:初始化大小一样,也是2的次幂,默认16,容因子0.75,但是扩容大小不是2倍而是1.5倍。

CHM定位一个数据需要进行两次Hash操作。第一次Hash定位到Segment,第二次Hash定位到元素所在的HashEntry链表的头部。

jdk8

jdk8的CHM就是线程安全的hashMap

数据结构和HashMap类似 (摒弃了Segment,采用Node数组+链表+红黑树)

初始容量,扩容阈值,扩容大小等等都是一样的。

并发控制使用Synchronized+CAS(如果没有抢到sync锁则cas自旋等待)

JDK7和8对比

JDK8锁粒度更细,锁冲突概率更低,性能更高。

取消了segment分段锁数组,直接在Node数组的每个位置上加Synchronized锁,并利用CAS优化性能 ,使得锁的粒度更细,进一步减少并发冲突的概率

JDK8中的扩容性能更⾼

⽀持多线程同时扩容,任何一个线程都可以去帮助扩容

CHM是怎么保证线程安全的?

JDK 1.7:Segment数组+ HashEntry

使用分段锁,将数据分成一段一段的存储,然后给每个段(Segment[i])配一把锁(ReentrantLock),当一个线程占用锁访问其中一个段数据的时候,其他段的数据不受影响也能被访问

JDK1.8:Node数组+链表/红黑树

并发控制使用Synchronized+CAS,如果线程抢到sync锁则可以正常执行操作,如果没有抢到sync锁则cas自旋等待,超过一定的次数结束自旋进入阻塞状态,因为自旋也是要耗资源的不能一直自旋下去。

应用举例

比如可以用CHM作为ip池存储所有的IP,可以支持同时有多个线程来取IP,每个线程拿ip都是线程安全的,不会存在两个线程同时拿到同一个ip。

类似的开发场景还有很多,总之就是将CHM当作一个存放资源的容器,可以保证资源的互斥访问。

CHM一定线程安全吗?

不一定,因为虽然get和put这两个方法都是线程安全的,但是组合在一起使用时,线程安全性没法保证,可能a线程刚执行完get操作cpu就被b抢走了。

看下面的代码:

读下CHM1.7的源码

hash

不管是我们的get操作还是put操作要需要通过hash来对数据进行定位。

整体思想就是通过多次不同方式的位运算来努力将数据均匀的分布到目标table中,都是些扰动函数

private int hash(Object k) {int h = hashSeed;if ((0 != h) && (k instanceof String)) {return sun.misc.Hashing.stringHash32((String) k);}h ^= k.hashCode();// single-word Wang/Jenkins hash.h += (h << 15) ^ 0xffffcd7d;h ^= (h >>> 10);h += (h << 3);h ^= (h >>> 6);h += (h << 2) + (h << 14);return h ^ (h >>> 16);}

put

计算插入的索引值通过:key.hashcode & segmentMask(segmentMask=ssize-1, 类似hashmap的n-1)

public V put(K key, V value) {Segment<K,V> s;if (value == null)throw new NullPointerException();int hash = hash(key);// hash 值无符号右移 28位(初始化时获得),然后与 segmentMask=15 做与运算// 其实也就是把高4位与segmentMask(1111)做与运算int j = (hash >>> segmentShift) & segmentMask;if ((s = (Segment<K,V>)UNSAFE.getObject// nonvolatile; recheck(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment// 如果查找到的 Segment 为空,初始化s = ensureSegment(j);return s.put(key, hash, value, false); //待分析}

上面的源码分析了 ConcurrentHashMap 在 put 一个数据时的处理流程,下面梳理下具体流程。

1、计算要 put 的 key 的位置,获取指定位置的 Segment。

2、如果指定位置的 Segment 为空,则初始化这个 Segment.

3、初始化 Segment 流程:

检查计算得到的位置的 Segment是否为null.

为 null 继续初始化,使用 Segment[0] 的容量和负载因子创建一个 HashEntry 数组。

再次检查计算得到的指定位置的 Segment是否为null.

使用创建的 HashEntry 数组初始化这个 Segment.

自旋判断计算得到的指定位置的 Segment是否为null,使用 CAS 在这个位置赋值为 Segment.

Segment.put 插入 key,value 值。

上面探究了获取 Segment 段和初始化 Segment 段的操作。最后一行的 Segment 的 put 方法还没有查看,继续分析。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {// 获取 ReentrantLock 独占锁,如果获取不到,则用scanAndLockForPut 循环获取。HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);V oldValue;try {HashEntry<K,V>[] tab = table;// 计算要put的数据位置int index = (tab.length - 1) & hash;// CAS 获取 index 坐标的值HashEntry<K,V> first = entryAt(tab, index);for (HashEntry<K,V> e = first;;) {if (e != null) {// 检查是否 key 已经存在,如果存在,则遍历链表寻找位置,找到后替换 valueK k;if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) {oldValue = e.value;if (!onlyIfAbsent) {e.value = value;++modCount;}break;}e = e.next;}else {// first 有值,说明 index 位置已经有值了,有冲突,链表头插法。if (node != null)node.setNext(first);elsenode = new HashEntry<K,V>(hash, key, value, first);int c = count + 1;// 容量大于扩容阀值,小于最大容量,进行扩容if (c > threshold && tab.length < MAXIMUM_CAPACITY)rehash(node);else// index 位置赋值 node,node 可能是一个元素,也可能是一个链表的表头setEntryAt(tab, index, node);++modCount;count = c;oldValue = null;break;}}} finally {unlock();}return oldValue;}

由于 Segment 继承了 ReentrantLock,所以 Segment 内部可以很方便的获取锁,put 流程就用到了这个功能。下面是对上面源码的说明:

1、tryLock() 获取锁,获取不到使用 scanAndLockForPut 方法继续获取。

2、计算 put 的数据要放入的 index 位置,然后获取这个位置上的 HashEntry 。

3、遍历 put 新元素,为什么要遍历?因为这里获取的 HashEntry 可能是一个空元素,也可能是链表已存在,所以要区别对待。

当这个位置上的 HashEntry 不存在:

如果当前容量大于扩容阀值,小于最大容量,进行扩容。

小于扩容直接头插法插入。

当这个位置上的 HashEntry 存在:

判断链表当前元素 Key 和 hash 值是否和要 put 的 key 和 hash 值一致。

一致则替换值

不一致,获取链表下一个节点,直到发现相同进行值替换,如果链表里没有相同的,则:

如果当前容量大于扩容阀值,小于最大容量,进行扩容。

小于扩容直接链表头插法插入。

4、如果要插入的位置之前已经存在,替换后返回旧值,否则返回 null.

scanAndLockForPut 分析

第一步中的 scanAndLockForPut 操作这里没有介绍,这个方法做的操作就是不断的自旋 tryLock() 获取锁。当自旋次数大于指定次数时,使用 lock() 阻塞获取锁。在自旋时顺便获取下 hash 位置的 HashEntry。

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {HashEntry<K,V> first = entryForHash(this, hash);HashEntry<K,V> e = first;HashEntry<K,V> node = null;int retries = -1; // negative while locating node// 自旋获取锁while (!tryLock()) {HashEntry<K,V> f; // to recheck first belowif (retries < 0) {if (e == null) {if (node == null) // speculatively create nodenode = new HashEntry<K,V>(hash, key, value, null);retries = 0;}else if (key.equals(e.key))retries = 0;elsee = e.next;}else if (++retries > MAX_SCAN_RETRIES) {// 自旋达到指定次数后,阻塞等待直到获取到锁lock();break;}else if ((retries & 1) == 0 &&(f = entryForHash(this, hash)) != first) {e = first = f; // re-traverse if entry changedretries = -1;}}return node;}

总结

当执行put操作时,会进行第一次key的hash来定位Segment的位置,如果该Segment还没有初始化,即通过CAS操作进行赋值,然后进行第二次hash操作,找到相应的HashEntry的位置,这里会利用继承过来的锁的特性,在将数据插入指定的HashEntry位置时(链表的尾端),会通过继承ReentrantLock的tryLock()方法尝试去获取锁,如果获取成功就直接插入相应的位置,如果已经有线程获取该Segment的锁,那当前线程会以自旋的方式去继续的调用tryLock()方法去获取锁,超过指定次数就挂起,等待唤醒。

get

ConcurrentHashMap的get操作跟HashMap类似,只是ConcurrentHashMap第一次需要经过一次hash定位到Segment的位置,然后再hash定位到指定的HashEntry,遍历该HashEntry下的链表进行对比,成功就返回,不成功就返回null

public V get(Object key) {Segment<K,V> s;HashEntry<K,V>[] tab;int h = hash(key); // JDK7中标准的hash值获取算法long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // hash值如何映射到对应的segment上if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {// 无非就是获得hash值对应的segment 是否存在,for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);e != null; e = e.next) {// 看下这个hash值对应的是segment(HashEntry)中的具体位置。然后遍历查询该链表K k;if ((k = e.key) key || (e.hash h && key.equals(k)))return e.value;}}return null;}

size

计算ConcurrentHashMap的元素大小是一个有趣的问题,因为他是并发操作的,就是在你计算size的时候,他还在并发的插入数据,可能会导致你计算出来的size和你实际的size有相差(在你return size的时候,插入了多个数据),要解决这个问题,JDK1.7版本用两种方案

1、第一种方案他会使用不加锁的模式去尝试多次计算ConcurrentHashMap的size,最多三次,比较前后两次计算的结果,结果一致就认为当前没有元素加入,计算的结果是准确的

2、第二种方案是如果第一种方案不符合,他就会给每个Segment加上锁,然后计算ConcurrentHashMap的size返回

所以size方法要谨慎使用

public int size() {// Try a few times to get accurate count. On failure due to// continuous async changes in table, resort to locking.final Segment<K,V>[] segments = this.segments;int size;boolean overflow; // true if size overflows 32 bitslong sum; // sum of modCountslong last = 0L; // previous sumint retries = -1; // first iteration isn't retrytry {for (;;) {if (retries++ RETRIES_BEFORE_LOCK) {// 超过2次则全部加锁for (int j = 0; j < segments.length; ++j)ensureSegment(j).lock(); // 直接对全部segment加锁消耗性太大}sum = 0L;size = 0;overflow = false;for (int j = 0; j < segments.length; ++j) {Segment<K,V> seg = segmentAt(segments, j);if (seg != null) {sum += seg.modCount; // 统计的是modCount,涉及到增删该都会加1int c = seg.count;if (c < 0 || (size += c) < 0)overflow = true;}}if (sum last) // 每一个前后的修改次数一样 则认为一样,但凡有一个不一样则直接break。break;last = sum;}} finally {if (retries > RETRIES_BEFORE_LOCK) {for (int j = 0; j < segments.length; ++j)segmentAt(segments, j).unlock();}}return overflow ? Integer.MAX_VALUE : size;}

rehash

segment 数组初始化后就不可变了,也就是说「并发性不可变」,不过segment里的table可以扩容为2倍,该方法没有考虑并发,因为执行该方法之前已经获取了锁。其中JDK7中的rehash思路跟JDK8 中扩容后处理链表的思路一样。

// 方法参数中的node 是这次扩容后,需要添加到新的数组中的数据。private void rehash(HashEntry<K,V> node) {HashEntry<K,V>[] oldTable = table;int oldCapacity = oldTable.length;// 2 倍int newCapacity = oldCapacity << 1;threshold = (int)(newCapacity * loadFactor);// 创建新数组HashEntry<K,V>[] newTable =(HashEntry<K,V>[]) new HashEntry[newCapacity];// 新的掩码,如从 16 扩容到 32,那么 sizeMask 为 31,对应二进制 ‘000...00011111’int sizeMask = newCapacity - 1;// 遍历原数组,将原数组位置 i 处的链表拆分到 新数组位置 i 和 i+oldCap 两个位置for (int i = 0; i < oldCapacity ; i++) {// e 是链表的第一个元素HashEntry<K,V> e = oldTable[i];if (e != null) {HashEntry<K,V> next = e.next;// 计算应该放置在新数组中的位置,// 假设原数组长度为 16,e 在 oldTable[3] 处,那么 idx 只可能是 3 或者是 3 + 16 = 19int idx = e.hash & sizeMask; // 新位置if (next null) // 该位置处只有一个元素newTable[idx] = e;else {// Reuse consecutive sequence at same slot// e 是链表表头HashEntry<K,V> lastRun = e;// idx 是当前链表的头结点 e 的新位置int lastIdx = idx;// for 循环找到一个 lastRun 结点,这个结点之后的所有元素是将要放到一起的for (HashEntry<K,V> last = next;last != null;last = last.next) {int k = last.hash & sizeMask;if (k != lastIdx) {lastIdx = k;lastRun = last;}}// 将 lastRun 及其之后的所有结点组成的这个链表放到 lastIdx 这个位置newTable[lastIdx] = lastRun;// 下面的操作是处理 lastRun 之前的结点,//这些结点可能分配在另一个链表中,也可能分配到上面的那个链表中for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {V v = p.value;int h = p.hash;int k = h & sizeMask;HashEntry<K,V> n = newTable[k];newTable[k] = new HashEntry<K,V>(h, p.key, v, n);}}}}// 将新来的 node 放到新数组中刚刚的 两个链表之一 的 头部int nodeIndex = node.hash & sizeMask; // add the new nodenode.setNext(newTable[nodeIndex]);newTable[nodeIndex] = node;table = newTable;}

CAS操作

在JDK7里在ConcurrentHashMap中通过原子操作sun.misc.Unsafe查找元素、替换元素和设置元素。通过这样的硬件级别获得数据可以保证即使是多线程,每次获得的数据也是最新的。这些原子操作起着非常关键的作用,你可以在所有ConcurrentHashMap的基本功能中看到。

读下CHM1.8的源码

重要参数

private static final int MAXIMUM_CAPACITY = 1 << 30; // 数组的最大值 private static final int DEFAULT_CAPACITY = 16; // 默认数组长度 static final int TREEIFY_THRESHOLD = 8; // 链表转红黑树的一个条件 static final int MIN_TREEIFY_CAPACITY = 64; // 链表转红黑树的第二个条件static final int UNTREEIFY_THRESHOLD = 6; // 红黑树转链表的条件 static final int MOVED= -1; // 表示正在扩容转移 static final int TREEBIN = -2; // 表示已经转换成树 static final int RESERVED = -3; // hash for transient reservations static final int HASH_BITS = 0x7fffffff; // 获得hash值的辅助参数transient volatile Node<K,V>[] table;// 默认没初始化的数组,用来保存元素 private transient volatile Node<K,V>[] nextTable; // 转移的时候用的数组 static final int NCPU = Runtime.getRuntime().availableProcessors();// 获取可用的CPU个数 private transient volatile Node<K,V>[] nextTable; // 连接表,用于哈希表扩容,扩容完成后会被重置为 null private transient volatile long baseCount; //保存着整个哈希表中存储的所有的结点的个数总和,有点类似于 HashMap 的 size 属性。private transient volatile int sizeCtl;

sizeCtl :用来控制table的初始化和扩容操作

-1 代表table正在初始化

-N 表示有N-1个线程正在进行扩容操作

如果table未初始化,表示table需要初始化的大小。

如果table初始化完成,表示table的容量,默认是table大小的0.75倍

五个构造方法

1、ConcurrentHashMap()

无参构造函数用于创建一个带有默认初始容量 (16)、加载因子 (0.75) 和 concurrencyLevel (16)的新的CHM对象。

2、ConcurrentHashMap(intinitialCapacity)

得到一个>initialCapacity2的幂次方的初始容量的CHM对象

和1.7以及hashMap不一样,即使initialCapacity是2的幂次方,也会计算得到一个大于initialCapacity的值,

例如initialCapacity为16,会初始化为32

3、ConcurrentHashMap(int initialCapacity, float loadFactor)

创建一个带有指定初始容量、加载因子和默认 concurrencyLevel的新的空映射。

注意看下面框起来的,concurrencyLevel=1

4、ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)

创建一个>=initialCapacity2的幂次方的初始容量的CHM对象

如initialCapacity=15,则sizeCtl=16,

若initialCapacity为16,则sizeCtl为16。若initialCapacity大小超过了允许的最大值,则sizeCtl为最大值。值得注意的是,构造函数中的concurrencyLevel参数已经在JDK1.8中的意义发生了很大的变化,其并不代表所允许的并发数,其只是用来确定sizeCtl大小,在JDK1.8中的并发控制都是针对具体的桶而言,即有多少个桶就可以允许多少个并发数

public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();if (initialCapacity < concurrencyLevel) // Use at least as many binsinitialCapacity = concurrencyLevel; // as estimated threadslong size = (long)(1.0 + (long)initialCapacity / loadFactor);int cap = (size >= (long)MAXIMUM_CAPACITY) ?MAXIMUM_CAPACITY : tableSizeFor((int)size);this.sizeCtl = cap;}

5、该构造函数用于构造一个与给定映射具有相同映射关系的新映射

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {this.sizeCtl = DEFAULT_CAPACITY;putAll(m);}

put

put方法的执行流程如下:

① 判断存储的key、value是否为空,若为空,则抛出异常,不为空进入步骤②

② 计算key的hash值,随后插入数据,若table表为空或者长度为0,则初始化table表,否则,进入步骤③

③ 根据key的hash值取出table表中的结点元素,若取出的结点为空(该桶为空),则使用CAS将key、value、hash值生成的结点放入桶中。否则,进入步骤④

④ 若该结点的的hash值为MOVED,则对该桶中的结点进行转移,否则,进入步骤⑤

⑤ 对桶中的第一个结点进行加锁,对该桶进行遍历,桶中的结点的hash值和key值与给定的hash值和key值相等,则根据标识选择是否进行更新操作(用给定的value值替换该结点的value值),若遍历完桶仍没有找到hash值与key值指定的hash值与key值相等的结点,则直接新生一个结点并插入到链表末尾。进入步骤⑥

⑥ 若binCount值达到红黑树转化的阈值,则将桶中的结构转化为红黑树存储,最后,增加binCount的值。

总结:

如果没有初始化就先调用initTable()方法来进行初始化过程

如果没有hash冲突就直接CAS插入

如果还在进行扩容操作就先进行扩容

如果存在hash冲突,就加锁来保证线程安全

最后一个如果Hash冲突时会形成Node链表,在链表长度超过8,Node数组超过64时会将链表结构转换为红黑树的结构

源码:

final V putVal(K key, V value, boolean onlyIfAbsent) {// ①键或值为空,抛出异常if (key == null || value == null) throw new NullPointerException(); // 键的hash值经过计算获得hash值int hash = spread(key.hashCode());int binCount = 0;for (Node<K,V>[] tab = table;;) {// 无限循环Node<K,V> f; int n, i, fh;if (tab == null || (n = tab.length) == 0) // 表为空或者表的长度为0// ②初始化表tab = initTable();else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 表不为空并且表的长度大于0,并且该桶不为空if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null))) // ③比较并且交换值,如tab的第i项为空则用新生成的node替换break; // no lock when adding to empty bin}else if ((fh = f.hash) == MOVED) // 该结点的hash值为MOVED// ④进行结点的转移(在扩容的过程中)tab = helpTransfer(tab, f);else {V oldVal = null;synchronized (f) {//⑤ 加锁同步if (tabAt(tab, i) == f) {// 找到table表下标为i的节点if (fh >= 0) {// 该table表中该结点的hash值大于0// binCount赋值为1binCount = 1;for (Node<K,V> e = f;; ++binCount) {// 无限循环K ek;if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {// 结点的hash值相等并且key也相等// 保存该结点的val值oldVal = e.val;if (!onlyIfAbsent) // 进行判断// 将指定的value保存至结点,即进行了结点值的更新e.val = value;break;}// 保存当前结点Node<K,V> pred = e;if ((e = e.next) == null) {// 当前结点的下一个结点为空,即为最后一个结点// 新生一个结点并且赋值给next域pred.next = new Node<K,V>(hash, key,value, null);// 退出循环break;}}}else if (f instanceof TreeBin) {// 结点为红黑树结点类型Node<K,V> p;// binCount赋值为2binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {// 将hash、key、value放入红黑树// 保存结点的valoldVal = p.val;if (!onlyIfAbsent) // 判断// 赋值结点value值p.val = value;}}}}if (binCount != 0) {// binCount不为0//⑥ 如果binCount大于等于转化为红黑树的阈值if (binCount >= TREEIFY_THRESHOLD)// 进行转化treeifyBin(tab, i);if (oldVal != null) // 旧值不为空// 返回旧值return oldVal;break;}}}// 增加binCount的数量addCount(1L, binCount);return null;}

在putVal函数中会涉及到如下几个函数:initTable、tabAt、casTabAt、helpTransfer、putTreeVal、treeifyBin、addCount函数。下面对其中涉及到的函数进行分析。

initTable

initTable只允许一个线程对表进行初始化,如果不巧有其他线程进来了,那么会让其他线程交出 CPU 等待下次系统调度。这样保证了表同时只会被一个线程初始化,

对于table的大小,会根据sizeCtl的值进行设置,如果没有设置szieCtl的值,那么默认大小为16.

// 容器初始化 操作private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;while ((tab = table) null || tab.length 0) {if ((sc = sizeCtl) < 0) // 如果正在初始化-1,-N 正在扩容。Thread.yield(); // 进行线程让步等待// 让掉当前线程 CPU 的时间片,使正在运行中的线程重新变成就绪状态,并重新竞争 CPU 的调度权。// 它可能会获取到,也有可能被其他线程获取到。else if (pareAndSwapInt(this, SIZECTL, sc, -1)) {// 比较sizeCtl的值与sc是否相等,相等则用 -1 替换,这表明我这个线程在进行初始化了!try {if ((tab = table) null || tab.length 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 默认为16@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = tab = nt;sc = n - (n >>> 2); // sc = 0.75n}} finally {sizeCtl = sc; //设置sizeCtl 类似threshold}break;}}return tab;}

unsafe

在ConcurrentHashMap中使用了unSafe方法,通过直接操作内存的方式来保证并发处理的安全性,使用的是硬件的安全机制。

table[i]数据是通过Unsafe对象通过反射获取的,

取数据直接table[index]不可以么,为什么要这么复杂?

在java内存模型中,我们已经知道每个线程都有一个工作内存,里面存储着table的「副本」,虽然table是volatile修饰的,但不能保证线程每次都拿到table中的最新元素,Unsafe.getObjectVolatile可以直接获取指定内存的数据,「保证了每次拿到数据都是最新的」。

addCount

主要就2件事:一是更新 baseCount,二是判断是否需要扩容。

private final void addCount(long x, int check) {CounterCell[] as; long b, s;// 首先如果没有并发 此时countCells is null, 此时尝试CAS设置数据值。if ((as = counterCells) != null || !pareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {// 如果 counterCells不为空以为此时有并发的设置 或者 CAS设置 baseCount 失败了CounterCell a; long v; int m;boolean uncontended = true;if (as null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) null ||!(uncontended = pareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {// 1. 如果没出现并发 此时计数盒子为 null// 2. 随机取出一个数组位置发现为空// 3. 出现并发后修改这个cellvalue 失败了// 执行funAddCountfullAddCount(x, uncontended);// 死循环操作return;}if (check <= 1)return;s = sumCount(); // 吧counterCells数组中的每一个数据进行累加给baseCount。}// 如果需要扩容if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {int rs = resizeStamp(n);// 获得高位标识符if (sc < 0) {// 是否需要帮忙去扩容if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc rs + 1 ||sc rs + MAX_RESIZERS || (nt = nextTable) null || transferIndex <= 0)break;if (pareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);} // 第一次扩容else if (pareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);s = sumCount();}}}

baseCount添加ConcurrentHashMap提供了baseCount、counterCells 两个辅助变量和一个 CounterCell辅助内部类

sumCount() 就是迭代 counterCells来统计 sum 的过程。

put 操作时,肯定会影响 size(),在 put() 方法最后会调用addCount()方法。整体的思维方法跟LongAdder类似,用的思维就是借鉴的ConcurrentHashMap。每一个Cell都用Contended修饰来避免伪共享。

JDK1.7 和 JDK1.8 对 size 的计算是不一样的。1.7 中是先不加锁计算三次,如果三次结果不一样在加锁。

JDK1.8 size 是通过对 baseCount 和 counterCell 进行 CAS 计算,最终通过 baseCount 和 遍历 CounterCell 数组得出 size。

JDK 8 推荐使用mappingCount 方法,因为这个方法的返回值是 long 类型,不会因为 size 方法是 int 类型限制最大值。

TreeBin

主要功能就是当链表变化为红黑树时,这个红黑树用TreeBin来包装。并且要注意 转成红黑树以后以前链表的结构信息还是有的,最终信息如下:

TreeBin.first = 链表中第一个节点。

TreeBin.root = 红黑树中的root节点。

TreeBin(TreeNode<K,V> b) {super(TREEBIN, null, null, null); //创建空节点 hash = -2 this.first = b;TreeNode<K,V> r = null; // root 节点for (TreeNode<K,V> x = b, next; x != null; x = next) {next = (TreeNode<K,V>)x.next;x.left = x.right = null;if (r null) {x.parent = null;x.red = false;r = x; // root 节点设置为x }else {K k = x.key;int h = x.hash;Class<?> kc = null;for (TreeNode<K,V> p = r;;) {// x代表的是转换为树之前的顺序遍历到链表的位置的节点,r代表的是根节点int dir, ph;K pk = p.key;if ((ph = p.hash) > h)dir = -1;else if (ph < h)dir = 1;else if ((kc null &&(kc = comparableClassFor(k)) null) ||(dir = compareComparables(kc, k, pk)) 0)dir = tieBreakOrder(k, pk); // 当key不可以比较,或者相等的时候采取的一种排序措施TreeNode<K,V> xp = p;// 放一定是放在叶子节点上,如果还没找到叶子节点则进行循环往下找。// 找到了目前叶子节点才会进入 再放置数据if ((p = (dir <= 0) ? p.left : p.right) null) {x.parent = xp;if (dir <= 0)xp.left = x;elsexp.right = x;r = balanceInsertion(r, x); // 每次插入一个元素的时候都调用 balanceInsertion 来保持红黑树的平衡break;}}}}this.root = r;assert checkInvariants(root);}

tryPresize

当数组长度小于64的时候,扩张数组长度一倍,调用此函数。扩容后容量大小的核对,可能涉及到初始化容器大小。并且扩容的时候又跟2的次幂联系上了!,其中初始化时传入的map会调用putAll方法直接put一个map的话,在「putAll」方法中没有调用initTable方法去初始化table,而是直接调用了tryPresize方法,所以这里需要做一个是不是需要初始化table的判断。

PS:默认第一个线程设置 sc = rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1,这个时候说明扩容完毕了。

transfer 扩容

ConcurrentHashMap引入了多线程并发扩容的机制,简单来说就是多个线程对原始数组进行分片后,每个线程负责一个分片的数据迁移,从而提升了扩容过程中数据迁移的效率。

扩容的源码主要分三部分:

1、 主要是 单个线程能处理的最少桶结点个数的计算和一些属性的初始化操作。

2、每个线程进来会先领取自己的任务区间[bound,i],然后开始 --i 来遍历自己的任务区间,对每个桶进行处理。如果遇到桶的头结点是空的,那么使用 ForwardingNode标识旧table中该桶已经被处理完成了。如果遇到已经处理完成的桶,直接跳过进行下一个桶的处理。如果是正常的桶,对桶首节点加锁,正常的迁移即可(跟HashMap第三部分一样思路),迁移结束后依然会将原表的该位置标识位已经处理。

该函数中的finish= true 则说明整张表的迁移操作已经「全部」完成了,我们只需要重置 table的引用并将 nextTable 赋为空即可。否则,CAS 式的将 sizeCtl减一,表示当前线程已经完成了任务,退出扩容操作。如果退出成功,那么需要进一步判断当前线程是否就是最后一个在执行扩容的。

第一次扩容时在addCount中有写到(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 表示当前只有一个线程正在工作,「相对应的」,如果 (sc - 2) resizeStamp(n) << RESIZE_STAMP_SHIFT,说明当前线程就是最后一个还在扩容的线程,那么会将 finishing 标识为 true,并在下一次循环中退出扩容方法。

f ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;

3、几乎跟HashMap大致思路类似的遍历链表/红黑树然后扩容操作。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {int n = tab.length, stride;if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) //MIN_TRANSFER_STRIDE 用来控制不要占用太多CPUstride = MIN_TRANSFER_STRIDE; // subdivide range //MIN_TRANSFER_STRIDE=16 每个CPU处理最小长度个数if (nextTab null) {// 新表格为空则直接新建二倍,别的辅助线程来帮忙扩容则不会进入此if条件try {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) {// try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}nextTable = nextTab;transferIndex = n; // transferIndex 指向最后一个桶,方便从后向前遍历}int nextn = nextTab.length; // 新表长度ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 创建一个fwd节点,这个是用来控制并发的,当一个节点为空或已经被转移之后,就设置为fwd节点boolean advance = true; //是否继续向前查找的标志位boolean finishing = false; // to ensure sweep(清扫) before committing nextTab,在完成之前重新在扫描一遍数组,看看有没完成的没// 第一部分// i 指向当前桶, bound 指向当前线程需要处理的桶结点的区间下限【bound,i】 这样来跟线程划分任务。for (int i = 0, bound = 0;;) {Node<K,V> f; int fh;// 这个 while 循环的目的就是通过 --i 遍历当前线程所分配到的桶结点// 一个桶一个桶的处理while (advance) {// 每一次成功处理操作都会将advance设置为true,然里来处理区间的上一个数据int nextIndex, nextBound;if (--i >= bound || finishing) {//通过此处进行任务区间的遍历advance = false;}else if ((nextIndex = transferIndex) <= 0) {i = -1;// 任务分配完了advance = false;}// 更新 transferIndex// 为当前线程分配任务,处理的桶结点区间为(nextBound,nextIndex)else if (pareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {// nextIndex本来等于末尾数字,bound = nextBound;i = nextIndex - 1;advance = false;}}// 当前线程所有任务完成 if (i < 0 || i >= n || i + n >= nextn) {int sc;if (finishing) {// 已经完成转移 则直接赋值操作nextTable = null;table = nextTab;sizeCtl = (n << 1) - (n >>> 1); //设置sizeCtl为扩容后的0.75return;}if (pareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {// sizeCtl-1 表示当前线程任务完成。if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) {// 判断当前线程完成的线程是不是最后一个在扩容的,思路精髓return;}finishing = advance = true;// 如果是则相应的设置参数i = n; }}else if ((f = tabAt(tab, i)) null) // 数组中把null的元素设置为ForwardingNode节点(hash值为MOVED[-1])advance = casTabAt(tab, i, null, fwd); // 如果老节点数据是空的则直接进行CAS设置为fwdelse if ((fh = f.hash) MOVED) //已经是个fwd了,因为是多线程操作 可能别人已经给你弄好了,advance = true; // already processedelse {synchronized (f) {//加锁操作if (tabAt(tab, i) f) {Node<K,V> ln, hn;if (fh >= 0) {//该节点的hash值大于等于0,说明是一个Node节点// 关于链表的操作整体跟HashMap类似不过 感觉好像更扰一些。int runBit = fh & n; // fh= f.hash first hash的意思,看第一个点 放老位置还是新位置Node<K,V> lastRun = f;for (Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n; //n的值为扩张前的数组的长度if (b != runBit) {runBit = b;lastRun = p;//最后导致发生变化的节点}}if (runBit 0) {//看最后一个变化点是新还是旧 旧ln = lastRun;hn = null;}else {hn = lastRun; //看最后一个变化点是新还是旧 旧ln = null;}/** 构造两个链表,顺序大部分和原来是反的,不过顺序也有差异* 分别放到原来的位置和新增加的长度的相同位置(i/n+i)*/for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) 0)/** 假设runBit的值为0,* 则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同为0的节点)设置到旧的table的第一个hash计算后为0的节点下一个节点* 并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点*/ln = new Node<K,V>(ph, pk, pv, ln);else/** 假设runBit的值不为0,* 则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同不为0的节点)设置到旧的table的第一个hash计算后不为0的节点下一个节点* 并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点*/hn = new Node<K,V>(ph, pk, pv, hn); }setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}else if (f instanceof TreeBin) {// 该节点hash值是个负数否则的话是一个树节点TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> lo = null, loTail = null; // 旧 头尾TreeNode<K,V> hi = null, hiTail = null; //新头围int lc = 0, hc = 0;for (Node<K,V> e = t.first; e != null; e = e.next) {int h = e.hash;TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);if ((h & n) 0) {if ((p.prev = loTail) null)lo = p;elseloTail.next = p; //旧头尾设置loTail = p;++lc;}else {// 新头围设置if ((p.prev = hiTail) null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}//ln 如果老位置数字<=6 则要对老位置链表进行红黑树降级到链表,否则就看是否还需要对老位置数据进行新建红黑树ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd); //老表中i位置节点设置下advance = true;}}}}}}

get

计算hash值,定位到该table的索引位置如果是首节点符合就返回如果遇到扩容,查找该节点,匹配就返回以上都不符合的话,就往下遍历节点,匹配就返回,否则最后就返回null

public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;// 计算key的hash值int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 &&(e = tabAt(tab, (n - 1) & h)) != null) {// 表不为空并且表的长度大于0并且key所在的桶不为空if ((eh = e.hash) == h) {// 表中的元素的hash值与key的hash值相等if ((ek = e.key) == key || (ek != null && key.equals(ek)))// 键相等return e.val; // 返回值}//hash值为负值表示正在扩容,这个时候查的是ForwardingNode的find方法来定位到nextTableelse if (eh < 0)// 在桶(链表/红黑树)中查找return (p = e.find(h, key)) != null ? p.val : null;//既不是首节点也不是ForwardingNode,那就往下遍历 while ((e = e.next) != null) {// 对于结点hash值大于0的情况if (e.hash == h &&((ek = e.key) == key || (ek != null && key.equals(ek))))return e.val;}}return null;}

clear

关于清空也相对简单 ,无非就是遍历桶数组,然后通过CAS来置空。

public void clear() {long delta = 0L;int i = 0;Node<K,V>[] tab = table;while (tab != null && i < tab.length) {int fh;Node<K,V> f = tabAt(tab, i);if (f null)++i; //这个桶是空的直接跳过else if ((fh = f.hash) MOVED) {// 这个桶的数据还在扩容中,要去扩容同时等待。tab = helpTransfer(tab, f);i = 0; // restart}else {synchronized (f) {// 真正的删除if (tabAt(tab, i) f) {Node<K,V> p = (fh >= 0 ? f :(f instanceof TreeBin) ?((TreeBin<K,V>)f).first : null);//循环到链表/者红黑树的尾部while (p != null) {--delta; // 记录删除了多少个p = p.next;} //利用CAS无锁置null setTabAt(tab, i++, null);}}}}if (delta != 0L)addCount(delta, -1); //调整count}

如果觉得《线程安全集合类- ConcurrentHashMap》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。