您好,登錄后才能下訂單哦!
這期內容當中小編將會給大家帶來有關JAVA并發容器ConcurrentHashMap 1.7和1.8 源碼怎么寫,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
HashMap是一個線程不安全的類,在并發情況下會產生很多問題,詳情可以參考HashMap 源碼解析;HashTable是線程安全的類,但是它使用的是synchronized來保證線程安全,線程競爭激烈的情況下效率非常低下。在jdk1.5的時候引入了ConcurrentHashMap,這也是一個線程安全的類,它使用了分段鎖的技術來提升并發訪問效率。
HashTable容器在競爭激烈的并發環境下表現出效率低下的原因是所有訪問HashTable的 線程都必須競爭同一把鎖,假如容器里有多把鎖,每一把鎖用于鎖容器其中一部分數據,那么 當多線程訪問容器里不同數據段的數據時,線程間就不會存在鎖競爭,從而可以有效提高并 發訪問效率,這就是ConcurrentHashMap所使用的鎖分段技術。
在jdk1.7及以前ConcurrentHashMap采用Segment數組結構和HashEntry數組結構組成,之后采用的是和HashMap一樣的結構。
采用Segment數組結構和HashEntry數組結構組成,Segment數組的大小就是ConcurrentHashMap的并發度。Segment繼承自ReentrantLock,所以他本身就是一個鎖。Segment數組一旦初始化后就不會再進行擴容,這也是jdk1.8去掉他的原因。Segment里面又包含了一個table數組,這個數組是可以擴容的。
如圖我們在定位數據的時候需要對key的hash值進行兩次尋址操作,第一次找到在Segment數組的位置,第二次找到在table數組中的位置。
// 直接繼承自ReentrantLock,所以一個Segment本身就是一個鎖 static final class Segment<K,V> extends ReentrantLock implements Serializable { ... // table數組 transient volatile HashEntry<K,V>[] table; // 一個Segment內的元素個數 transient int count; // 擴容閾值 transient int threshold; // 擴容因子 final float loadFactor; Segment(float lf, int threshold, HashEntry<K,V>[] tab) { this.loadFactor = lf; this.threshold = threshold; this.table = tab; } ...
我們發現Segment直接繼承自ReentrantLock,所以一個Segment本身就是一個鎖。所以Segment數組的長度大小直接影響了ConcurrentHashMap的并發度。還有每個Segment單獨維護了擴容閾值,擴容因子,所以每個Segment的擴容操作時完全獨立互不干擾的。
static final class HashEntry<K,V> { // 不可變 final int hash; final K key; // volatile保證可見性,這樣我們在get操作時就不用加鎖了 volatile V value; volatile HashEntry<K,V> next; HashEntry(int hash, K key, V value, HashEntry<K,V> next) { this.hash = hash; this.key = key; this.value = value; this.next = next; } ... }
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { // 參數校驗 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); // 并發度控制,最大是65536 if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments // 等于ssize從1向左移位的 次數 int sshift = 0; int ssize = 1; // 找出最接近concurrencyLevel的2的n次冪的數值 while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } // 這里之所 以用32是因為ConcurrentHashMap里的hash()方法輸出的最大數是32位的 this.segmentShift = 32 - sshift; // 散列運算的掩碼,等于ssize減1 this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; // 里HashEntry數組的長度度,它等于initialCapacity除以ssize的倍數c,如果c大于1,就會取大于等于c的2的N次方值,所以cap不是1,就是2的N次方。 int cap = MIN_SEGMENT_TABLE_CAPACITY; // 保證HashEntry數組大小一定是2的n次冪 while (cap < c) cap <<= 1; // create segments and segments[0] // 初始化Segment數組,并實際只填充Segment數組的第0個元素。 Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
通過代碼我們可以看出,在構造ConcurrentHashMap的時候我們就會完成以下件事情:
確認ConcurrentHashMap的并發度,也就是Segment數組長度,并保證它是2的n次冪
確認HashEntry數組的初始化長度,并保證它是2的n次冪
將Segment數組初始化好并且只填充第0個元素
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); // 1. 先獲取key的hash值 int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment // 2. 定位到Segment s = ensureSegment(j); // 3.調用Segment的put方法 return s.put(key, hash, value, false); }
主要流程是:
先獲取key的hash值
定位到Segment
調用Segment的put方法
private int hash(Object k) { int h = hashSeed; if ((0 != h) && (k instanceof String)) { return sun.misc.Hashing.stringHash42((String) k); } h ^= k.hashCode(); // Spread bits to regularize both segment and index locations, // using variant of single-word Wang/Jenkins hash. h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16); }
這個方法大致思路是:先拿到key的hashCode,然后對這個值進行再散列。
final V put(K key, int hash, V value, boolean onlyIfAbsent) { // 1. 加鎖 HashEntry<K,V> node = tryLock() ? null : // scanAndLockForPut在沒有獲取到鎖的情況下,去查詢key是否存在,如果不存在就新建一個Node scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; // 確定元素在tabl數組上的位置 int index = (tab.length - 1) & hash; HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null) { K k; // 如果原來位置上有值并且key相同,那么直接替換原來的value if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); // 元素總數加一 int c = count + 1; // 判斷是否需要擴容 if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; }
大致過程是:
加鎖
定位key在tabl數組上的索引位置index
,獲取到頭結點
判斷是否有hash沖突
如果沒有沖突直接將新節點node
添加到數組index
索引位
如果有沖突,先判斷是否有相同key
有相同key直接替換對應node的value值
沒有添加新元素到鏈表尾部
解鎖
這里需要注意的是scanAndLockForPut方法,他在沒有獲取到鎖的時候不僅會通過自旋獲取鎖,還會做一些其他的查找或新增節點的工,以此來提升put性能。
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { //定位HashEntry數組位置,獲取第一個節點 HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; //掃描次數,循環標記位 int retries = -1; // negative while locating node while (!tryLock()) { HashEntry<K,V> f; // to recheck first below // 表示遍歷鏈表還沒有結束 if (retries < 0) { if (e == null) { if (node == null) // speculatively create node // 完成新節點初始化 node = new HashEntry<K,V>(hash, key, value, null); // 完成鏈表的遍歷,還是沒有找到相同key的節點 retries = 0; } // 有hash沖突,開始查找是否有相同的key else if (key.equals(e.key)) retries = 0; else e = e.next; } // 斷循環次數是否大于最大掃描次數 else if (++retries > MAX_SCAN_RETRIES) { // 自旋獲取鎖 lock(); break; } // 每間隔一次循環,檢查一次first節點是否改變 else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { // 首節點有變動,更新first,重新掃描 e = first = f; // re-traverse if entry changed retries = -1; } } return node; }
scanAndLockForPut方法在當前線程獲取不到segment鎖的情況下,完成查找或新建節點的工作。當獲取到鎖后直接將該節點加入鏈表即可,提升了put操作的性能。大致過程:
定位key在HashEntry數組的索引位,并獲取第一個節點
嘗試獲取鎖,如果成功直接返回,否則進入自旋
判斷是否有hash沖突,沒有就直接完成新節點的初始化
有hash沖突,開始遍歷鏈表查找是否有相同key
如果沒找到相同key,那么就完成新節點的初始化
如果找到相同key,判斷循環次數是否大于最大掃描次數
如果循環次數是否大于最大掃描次數,就直接CAS拿鎖(阻塞式)
如果循環次數不大于最大掃描次數,判斷頭結點是否有變化
進入下次循環
private void rehash(HashEntry<K,V> node) { // 復制老數組 HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; // table數組擴容2倍 int newCapacity = oldCapacity << 1; // 擴容閾值也增加兩倍 threshold = (int)(newCapacity * loadFactor); // 創建新數組 HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; // 計算新的掩碼 int sizeMask = newCapacity - 1; for (int i = 0; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; // 計算新的索引位 int idx = e.hash & sizeMask; // 轉移數據 if (next == null) // Single node on list newTable[idx] = e; else { // Reuse consecutive sequence at same slot HashEntry<K,V> lastRun = e; int lastIdx = idx; for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; // Clone remaining nodes for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } // 將新的節點加到對應索引位 int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }
在這里我們可以發現每次擴容是針對一個單獨的Segment的,在擴容完成之前中不會對擴容前的數組進行修改,這樣就可以保證get()
不被擴容影響。大致過程是:
新建擴容后的數組,容量是原來的兩倍
遍歷擴容前的數組
通過e.hash & sizeMask;
計算key新的索引位
轉移數據
將擴容后的數組指向成員變量table
public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; int h = hash(key); // 計算出Segment的索引位 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // 以原子的方式獲取Segment if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { // 原子方式獲取HashEntry for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; // key相同 if ((k = e.key) == key || (e.hash == h && key.equals(k))) // value是volatile所以可以不加鎖直接取值返回 return e.value; } } return null; }
我們可以看到get方法是沒有加鎖的,因為HashEntry的value和next屬性是volatile的,volatile直接保證了可見性,所以讀的時候可以不加鎖。Java中Unsafe類可以參考這篇博客。
public int size() { // Try a few times to get accurate count. On failure due to // continuous async changes in table, resort to locking. final Segment<K,V>[] segments = this.segments; int size; // true表示size溢出32位(大于Integer.MAX_VALUE) boolean overflow; // true if size overflows 32 bits long sum; // sum of modCounts long last = 0L; // previous sum int retries = -1; // first iteration isn't retry try { for (;;) { // retries 如果retries等于2則對所有Segment加鎖 if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; // 統計每個Segment元素個數 for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } if (sum == last) break; last = sum; } } finally { // 解鎖 if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } // 如果size大于Integer.MAX_VALUE值則直接返貨Integer.MAX_VALUE return overflow ? Integer.MAX_VALUE : size; }
size的核心思想是先進性兩次不加鎖統計,如果兩次的值一樣則直接返回,否則第三個統計的時候會將所有segment全部鎖定,再進行size統計,所以size()盡量少用。因為這是在并發情況下,size其他線程也會改變size大小,所以size()的返回值只能表示當前線程、當時的一個狀態,可以算其實是一個預估值。
public boolean isEmpty() { long sum = 0L; final Segment<K,V>[] segments = this.segments; for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { // 只要有一個Segment的元素個數不為0則表示不為null if (seg.count != 0) return false; // 統計操作總數 sum += seg.modCount; } } if (sum != 0L) { // recheck unless no modifications for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { if (seg.count != 0) return false; sum -= seg.modCount; } } // 說明在統計過程中ConcurrentHashMap又被操作過, // 因為上面判斷了ConcurrentHashMap不可能會有元素,所以這里如果有操作一定是新增節點 if (sum != 0L) return false; } return true; }
先判斷Segment里面是否有元素,如果有直接返回,如果沒有則統計操作總數;
為了保證在統計過程中ConcurrentHashMap里面的元素沒有發生變化,再對所有的Segment的操作數做了統計;
最后 sum==0 表示ConcurrentHashMap里面確實沒有元素返回true,否則一定進行過新增元素返回false。
和size方法一樣這個方法也是一個若一致方法,最后的結果也是一個預估值。
這個結構和HashMap一樣
//最大容量 private static final int MAXIMUM_CAPACITY = 1 << 30; //初始容量 private static final int DEFAULT_CAPACITY = 16; //數組最大容量 static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //默認并發度,兼容1.7及之前版本 private static final int DEFAULT_CONCURRENCY_LEVEL = 16; //加載/擴容因子,實際使用n - (n >>> 2) private static final float LOAD_FACTOR = 0.75f; //鏈表轉紅黑樹的節點數閥值 static final int TREEIFY_THRESHOLD = 8; //紅黑樹轉鏈表的節點數閥值 static final int UNTREEIFY_THRESHOLD = 6; //當數組長度還未超過64,優先數組的擴容,否則將鏈表轉為紅黑樹 static final int MIN_TREEIFY_CAPACITY = 64; //擴容時任務的最小轉移節點數 private static final int MIN_TRANSFER_STRIDE = 16; //sizeCtl中記錄stamp的位數 private static int RESIZE_STAMP_BITS = 16; //幫助擴容的最大線程數 private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; //size在sizeCtl中的偏移量 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; // ForwardingNode標記節點的hash值(表示正在擴容) static final int MOVED = -1; // hash for forwarding nodes // TreeBin節點的hash值,它是對應桶的根節點 static final int TREEBIN = -2; // hash for roots of trees static final int RESERVED = -3; // hash for transient reservations static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash //存放Node元素的數組,在第一次插入數據時初始化 transient volatile Node<K,V>[] table; //一個過渡的table表,只有在擴容的時候才會使用 private transient volatile Node<K,V>[] nextTable; //基礎計數器值(size = baseCount + CounterCell[i].value) private transient volatile long baseCount; /** * 控制table數組的初始化和擴容,不同的值有不同的含義: * -1:表示正在初始化 * -n:表示正在擴容 * 0:表示還未初始化,默認值 * 大于0:表示下一次擴容的閾值 */ private transient volatile int sizeCtl; //節點轉移時下一個需要轉移的table索引 private transient volatile int transferIndex; //元素變化時用于控制自旋 private transient volatile int cellsBusy; // 保存table中的每個節點的元素個數 長度是2的冪次方,初始化是2,每次擴容為原來的2倍 // size = baseCount + CounterCell[i].value private transient volatile CounterCell[] counterCells;
其他的參考HashMap 源碼解析
static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next; Node(int hash, K key, V val, Node<K,V> next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } ...
鏈表節點,保存著key和value的值。
static final class TreeNode<K,V> extends Node<K,V> { TreeNode<K,V> parent; // red-black tree links TreeNode<K,V> left; TreeNode<K,V> right; TreeNode<K,V> prev; // needed to unlink next upon deletion boolean red; TreeNode(int hash, K key, V val, Node<K,V> next, TreeNode<K,V> parent) { super(hash, key, val, next); this.parent = parent; } ...
紅黑樹節點,包含了樹的信息。
TreeBins中使用的節點
static final class TreeBin<K,V> extends Node<K,V> { TreeNode<K,V> root; volatile TreeNode<K,V> first; // 鎖的持有者 volatile Thread waiter; // 鎖狀態 volatile int lockState; // values for lockState // 表示持有寫鎖 static final int WRITER = 1; // set while holding write lock // 表示等待 static final int WAITER = 2; // set when waiting for write lock // 表示讀鎖的增量值 static final int READER = 4; // increment value for setting read lock ...
與HashMap有點區別的是,他不直接使用TreeNode作為數的根節點,而是使用TreeBins對其做了裝飾后成為了根節點;同時它還記錄了鎖的狀態;需要注意的是:
TreeBins節點的hash值是 -2
我們對紅黑樹添加節點后,紅黑樹的根節點有可能會因為旋轉而發生變化,所以我們在添加樹節點的時候在putTreeVal()
方法里面我們使用cas在加了一次鎖。
/** * A node inserted at head of bins during transfer operations. */ static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } Node<K,V> find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes outer: for (Node<K,V>[] tab = nextTable;;) { Node<K,V> e; int n; // 1. 判斷新的數組是否是null, // 2. 如果不為NULL給那就找到對應索引位上的頭結點 // 3. 判斷頭節點是否為NULL if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; // 自旋找節點 for (;;) { int eh; K ek; if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0) { // 如果又變成了ForwardingNode標記節點,那說明有發生了擴容,需要跳出循環從新查找 if (e instanceof ForwardingNode) { tab = ((ForwardingNode<K,V>)e).nextTable; continue outer; } else return e.find(h, k); } if ((e = e.next) == null) return null; } } } }
ForwardingNode 節點是一個擴容標記節點,只要在數組上發現對應索引位上是ForwardingNode 節點時,表示正在擴容。當get方法調用時,如果遇到ForwardingNode 節點,那么它將會到擴容后的數據上查找數據,否則還是在擴容前的數組上查找數據。這個要注意兩點:
這個節點的hash值是 -1
這個節點的find方法是在對擴容后的數組進行查找
public ConcurrentHashMap18() { }
與HashMap一樣,構造函數啥都沒干,初始化操作是在第一次put完成的。
public V put(K key, V value) { return putVal(key, value, false); }
啥都么有
static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
計算key.hashCode()并將更高位的散列擴展(XOR)降低。采用位運算主要是是加快計算速度。
/** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); // 計算hash值 int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 判斷是否需要初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); // 找出key對應的索引位上的第一個節點 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 如果該索引位為null,則直接將數據放到該索引位 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // 正在擴容 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; // 加內置鎖鎖定一個數組的索引位,并添加節點 synchronized (f) { if (tabAt(tab, i) == f) { // 表示鏈表節點 if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; // key相同直接替換value值 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } // 將新節點添加到鏈表尾部 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } // 表示樹節點 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; // 添加數節點 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { // 嘗試將鏈表轉換成紅黑樹 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }
主要流程:
計算key的hash值
判斷是否需要初始化,如果是則調用initTable()
方法完成初始化
判斷是否有hash沖突,如沒有直接設置新節點到對飲索引位,如果有獲取頭結點
根據頭結點的hash值判斷是否正在擴容,如果是則幫助擴容
如果沒有擴容則對頭結點加鎖,添加新節點
fh >= 0
根據頭結點hash值判斷是否是鏈表節點,如果是新增鏈表節點,否則新增樹節點
新增樹節點putTreeVal()
需要注意,紅黑樹的根節點有可能會因為旋轉而發生變化,所以我們在添加節點的時候還需要對根節點使用cas在加了一次鎖。
判斷是否需要嘗試由鏈表轉換成樹結構
addCount(1L, binCount);
新增count數,并判斷是否需要擴容或者幫助擴容
-1:表示正在初始化
-n:表示正在擴容
0:表示還未初始化,默認值
大于0:表示下一次擴容的閾值
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { // 正在初始化 if ((sc = sizeCtl) < 0) // 讓出CPU執行權,然后自旋 Thread.yield(); // lost initialization race; just spin // CAS替換標志位(相當于獲取鎖) else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { // 二次判斷 if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; // 相當于sc=n*3/4 sc = n - (n >>> 2); } } finally { // 擴容閾值 sizeCtl = sc; } break; } } return tab; }
主要過程:
根據sizeCtl判斷是否正在初始化
如果其他線程正在初始化就讓出CPU執行權,進入下一次CPU執行權的競爭Thread.yield();
如果沒有進行初始化的線程則,CAS替換sizeCtl標志位(相當于獲取鎖)
獲取到鎖后再次判斷是否初始化
如果沒有則初始化Node數組,并設置sizeCtl值為下一次擴容閾值
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; // ForwardingNode標記節點,表示正在擴容 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; }
判斷是否正在擴容,如果是就幫助擴容。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {\ // n原來數組長度 int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range // 判斷是發起擴容的線程還是幫助擴容的線程,如果是發起擴容的需要初始化新數組 if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; // 擴容期間的數據節點(用于標志位,hash值是-1) ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 當advance == true時,表明該節點已經處理過了 boolean advance = true; // 在擴容完成之前保證get不被影響 boolean finishing = false; // to ensure sweep before committing nextTab // 1. 從右往左找到第一個有數據的索引位節點(有hash沖突的桶) // 2. 如果找到的節點是NULL節點(沒有hash沖突的節點),那么將該索引位的NULL替換成ForwardingNode標記節點,這個節點的hash是-1 // 3. 如果找到不為NULL的節點(有hash沖突的桶),則對這個節點進行加鎖 // 4. 開始進進移動節點數據 for (int i = 0, bound = 0;;) { //f:當前處理i位置的node(頭結點或者根節點); Node<K,V> f; int fh; // 通過while循環獲取本次需要移動的節點索引i while (advance) { // nextIndex:下一個要處理的節點索引; nextBound:下一個需要處理的節點的索引邊界 int nextIndex, nextBound; // i是老數組索引位,通過--i來講索引位往前一個索引位移動,直到0索引位 if (--i >= bound || finishing) advance = false; // 節點已全部轉移 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } // transferIndex(初值為最后一個節點的索引),表示從transferIndex開始后面所有的節點都已分配, // 每次線程領取擴容任務后,需要更新transferIndex的值(transferIndex-stride)。 // CAS修改transferIndex,并更新索引邊界 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; // 老數組最后一個索引位置 i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; // 已經完成所有節點復制了 if (finishing) { nextTable = null; table = nextTab; // sizeCtl閾值為原來的1.5倍 sizeCtl = (n << 1) - (n >>> 1); // 結束自旋 return; } // CAS 更新擴容閾值,在這里面sizectl值減一,說明新加入一個線程參與到擴容操作 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } // 將以前老數組上為NULL的節點(還沒有元素的桶或者說成沒有hash沖突的數據節點),用ForwardingNode標記節點補齊 // 主要作用是:其他線程在put元素,發現找到的索引位是fwd節點則表示正在擴容,那么該線程會來幫助擴通,而不是在那里等待 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); // 表示處理過該節點了 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { // 對應索引位加鎖 synchronized (f) { // 再次校驗一下老數組對應索引位節點是否是我們找到的節點f if (tabAt(tab, i) == f) { // 低索引位頭節點(i位), 高位索引位頭節點(i+tab.length) Node<K,V> ln, hn; // fh >=0 表示鏈表節點,TreeBin節點的hash值-2 if (fh >= 0) { // fh & n算法可以算出新的節點該分配到那個索引位(runBit要么為0放低位ln,要么為n放高位hn), // runBit表示鏈表中最后一個元素的hash值&n的值 int runBit = fh & n; // lastRun表示鏈表中最后一個元素 Node<K,V> lastRun = f; // 找到鏈表中最后一個節點,并賦值給lastRun for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } // 判斷原來的最后一個節點應該添加到高位還是低位 if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } // f表示頭結點,如果p不是尾節點,則轉移節點 // 如果以前節點順序是 1 2 3 4 轉移后就是 3 2 1 4 for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) // 轉移節點時都是新建節點,以免破壞原來數組結構影響get方法 ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } // 設置新數組低索引位頭節點(i位) setTabAt(nextTab, i, ln); // 設置新數組高位索引位頭節點(i+tab.length) setTabAt(nextTab, i + n, hn); // 設置老數組i位為標記節點,表示已經處理過了 setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; // 設置新數組低索引位頭節點(i位) setTabAt(nextTab, i, ln); // 設置新數組高位索引位頭節點(i+tab.length) setTabAt(nextTab, i + n, hn); // 設置老數組i位為標記節點,表示已經處理過了 setTabAt(tab, i, fwd); advance = true; } } } } } }
主要過程:
tab
為擴容前的數組
判斷是否是第一個發起擴容的線程,如果是需要初始化擴容后的數組nextTable
fwd = new ForwardingNode<K,V>(nextTab)
初始化擴容標記節點
進入擴容循環
在擴容前的數組tab
上從右往左(從高索引位到低索引位)遍歷所有頭結點,索引位為i
如果找到的頭結點是NULL(沒有hash沖突)則tab[i]=fwd
。
找到的頭結點不為NULL則(有hash沖突)則鎖定頭結點synchronized (f)
再次校驗頭結點是否發生改變,如果改變直接結束
初始化高索引位和第索引位的頭結點
移動節點到相應索引位
設置擴容后的數組低索引位頭節點(i位)
設置擴容后的數組高位索引位頭節點(i+tab.length)
設置擴容前的數組i位為標記節點(tab[i]=fwd
),表示已經處理過了
進入第3步直到完成
注意:
第5點有
tab[i]=fwd
有兩層含義:1,表示對應索引位已經處理過了;2,當其他線程拿到該頭結點的時候能知曉正在擴容,這時在put的時候幫助擴容,在get的時候去擴容后的數組上找相應的key
int runBit = fh & n;
算法可以算出新的節點該分配到那個索引位(runBit要么為0放低位ln,要么為n放高位hn)如果是鏈表節點,以前節點順序是 1 2 3 4 擴容后會變成 3 2 1 4
擴容的大致過程圖解:
發起擴容,擴容前數組tab
在擴容前的數組tab
上從右往左(從高索引位到低索引位)遍歷所有頭結點,索引位為i
,如果找到的頭結點是NULL則直接賦值成````fwd``` 標記節點。
3. 擴容前的數組上找到不為NULL的節點,則還是移動節點到擴容后的額數組
private final void addCount(long x, int check) { // CounterCell[] as;使用計數器數組因該是為了提升并發量,減小沖突概率 CounterCell[] as; long b, s; // 計數器表不為NULL(counterCells當修改baseCount有沖突時,需要將size增量放到這個計數器數組里面) if ((as = counterCells) != null || // 使用CAS更新baseCount的值(+1)如果失敗說明存在競爭 !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; // CounterCell是否存在競爭的標記位 boolean uncontended = true; // CounterCell[] as為NULL表示as沒有競爭 if (as == null || (m = as.length - 1) < 0 || // 隨機一個數組位置來驗證是否為NULL,如果a是null表示沒有競爭,隨機也是為了減小沖突概率 (a = as[ThreadLocalRandom.getProbe() & m]) == null || // CAS替換a的value,如果失敗表示存在競爭 !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { // 將size增量值存到as上 fullAddCount(x, uncontended); return; } if (check <= 1) return; // 統計size s = sumCount(); } // 檢查是否需要擴容 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; // size大于閾值sizeCtl,tab數組長度小于最大值1<<30 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); // 表示正在擴容 if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) // 幫助擴容 transfer(tab, nt); } // sc = (rs << RESIZE_STAMP_SHIFT) + 2,移位后是負數 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) // 發起擴容,此時nextTable=null transfer(tab, null); s = sumCount(); } } }
在put()
方法執行最后會對當前Map的size+1,ConcurrentHashMap中size由baseCount
和CounterCell[] as
組成,size=baseCount+as[i].value
。addCount的大致過程如下:
CAS替換baseCount值,如果失敗說明對size的增量(size++
)存在競爭
如果存在競爭,我們會使用到CounterCell[] as
數組
as[ThreadLocalRandom.getProbe() & m]
隨機取一個索引位,使用CAS完成size++
如果as[i]
也存在競爭會調用fullAddCount(x, uncontended);
方法完成size++
size++
完成后通過size=baseCount+as[i].value
公式計算出元素總數
判斷是否需要擴容
如果需要擴容,在判斷一下是幫助擴容還是發起擴容
注意:
CounterCell[] as:這個的只要目的是分散對
baseCount
的單一競爭,提示size++
的并發率,這里和table
數組一樣使用了鎖分離技術,as的長度也是2的n次方,初始長度是2在第三步中使用隨機數也是為了提升并發效率,
ThreadLocalRandom
類是JDK7在JUC包下新增的隨機數生成器,它解決了Random類在多線程下,多個線程競爭內部唯一的原子性種子變量,而導致大量線程自旋重試的不足
fullAddCount(x, uncontended);
方法里面完成了as
的初始化和擴容元素總數的計算公式是
size=baseCount+as[i].value
final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
元素總數的計算公式是size=baseCount+as[i].value
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); // table 不為NULL并且對飲索引位不為NULL if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { // 頭節點key相同 if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } // 樹節點或者ForwardingNode標記節點 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; // 鏈表節點 while ((e = e.next) != null) { // key相同 if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
主要流程:
判斷table和key對應索引位是否為NULL
判斷頭節點是否是要找的節點
eh < 0表示是樹節點或ForwardingNode標記節點,直接通過find方法找對應的key
否則是鏈表節點,挨個鏈表節點找相應的key
返回結果
注意:
get 方法沒有加鎖,原因是節點的value是volatile的,已經保證了可見性,只要value有更新,那么我們一定能讀到最新數據。
e.find(h, key)
這里:如果對應索引位頭結點是ForwardingNode節點,那么會直接去擴通后的數組找對應的key,可以參見上面ForwardingNode.find()
方法
public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); }
get方法和containsKey方法都是遍歷對應索引位上所有節點,來判斷是否存在key相同的節點以及獲得該節點的value。但由于遍歷過程中其他線程可能對鏈表結構做了調整,因此get和containsKey返回的可能是過時的數據,這一點是ConcurrentHashMap在弱一致性上的體現。
去掉了Segment 數組:這樣做鎖的粒度更小,減少了并發沖突的概率;查找數據時不用計算兩次hash;
存儲數據是采用了鏈表+紅黑樹的形式:當一個桶內數據量很大的時候,紅黑樹的查詢效率遠高于鏈表。
1.8直接使用了內置鎖synchronized:簡化了加鎖操作
1.8的初始化是在第一次put時完成的,1.7的時候再構造的時候完成的
在put過程中當發現正在擴容,1.8的線程會幫助擴容,1.7的只是會檢查key是否存在或者完成新節點的初始化工作
1.8的hash值計算更簡單了
1.8擴容過程中會修改擴容前的數組,1.7擴容過程中不會修改原來數組
1.8在get()
時如果判斷到當前索引位正在擴容,那么直接在擴容后的數組中去找對應的key
1.7的size計算使用的三次計算的方式,1.8使用了鎖分離技術
package com.xiaolyuh; import java.util.Map; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * @author yuhao.wang3 * @since 2019/7/26 17:58 */ public class HashMapTest { public static void main(String[] args) { AtomicInteger integer = new AtomicInteger(); ExecutorService cachedThreadPool = Executors.newFixedThreadPool(50); Map<User, Integer> map = new ConcurrentHashMap<>(); for (int i = 0; i < 10000; i++) { cachedThreadPool.execute(() -> { User user = new User(1); map.put(user, 1); }); } } static class User { int age; public User(int age) { this.age = age; } @Override public int hashCode() { return age; } @Override public String toString() { return "" + age; } } }
為監控而生的多級緩存框架 layering-cache這是我開源的一個多級緩存框架的實現
上述就是小編為大家分享的JAVA并發容器ConcurrentHashMap 1.7和1.8 源碼怎么寫了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。