PS.本文基于JDK1.8
前言 大家都知道HashMap是线程不安全的,想要在并发的环境中使用,用什么呢?HashTable?采用syncgronized加锁,导致效率及其底下.在java5之后jdk提供了另一个类,ConcurrentHashMap,极大的提升了并发下的性能.
这次将阅读ConcurrentHashMap的源码并记录关键知识.
实现原理 数据结构 与HashMap的数据结构同步,在JDK1.7中使用数组+链表,在JDK1.8之后使用数组+链表+红黑树.
并发 在1.7版本,使用锁分离 技术,即ConcurrentHashMap由Segment组成,每个Segment包含一些Node存储键值对. 而每个Segment都有一把锁.并发性能依赖于Segment的粒度,当你将整个HashMap放入同一个Segment,ConcurrentHashMap会退化成HashMap.
1.8版本中,摒弃了锁分离的概念,虽然保留了Segment,但是只是为了兼容老的版本.
1.8中使用CAS算法+锁来保证并发性能及线程安全
CAS 算法 通俗的讲(我的理解)就是:在每一次操作的时候参数中带有预期值(旧值),当且仅当内存中的值与预期值相同的时候,才写入新值.
源码逐步解析 注意,本文只解读JDK1.8版本的ConcurrentHashMap,在源码中与以前版本有关的东西略过.
常量 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private static final int MAXIMUM_CAPACITY = 1 << 30 ;private static final int DEFAULT_CAPACITY = 16 ;private static final float LOAD_FACTOR = 0.75f ;static final int TREEIFY_THRESHOLD = 8 ;static final int UNTREEIFY_THRESHOLD = 6 ;static final int MIN_TREEIFY_CAPACITY = 64 ;static final int MOVED = -1 ; static final int TREEBIN = -2 ;
常量的定义较为简单,这里只列出了一些常用的常量,还有一些在具体使用时再贴.
代码中已加入注释,一看就懂.
属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 transient volatile Node<K,V>[] table;private transient volatile Node<K,V>[] nextTable;private transient volatile long baseCount; private transient volatile int sizeCtl;
这里面有一个重要的属性sizeCtl,保留了源码的注释及添加了我的理解.
数据节点Node类 1 2 3 4 5 6 static class Node <K,V> implements Map .Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next; }
对于Node类的构造方法以及getter/setter进行了省略.只保留了属性.
可以看到共有四个属性
final修饰的hash值,初始化后不能再次改变.
final修饰的key,初始化后不能再次改变.
volatile 修饰的值
volatile 修饰的下一节点指针
hash和key都被final修饰,不会存在线程安全问题,而value及next被volatile修饰,保证了线程间的数据可见性.
三个重要的原子方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @SuppressWarnings("unchecked") static final <K,V> Node<K,V> tabAt (Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long )i << ASHIFT) + ABASE); } static final <K,V> boolean casTabAt (Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long )i << ASHIFT) + ABASE, c, v); } static final <K,V> void setTabAt (Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long )i << ASHIFT) + ABASE, v); }
构造方法 构造方法十分简单,这里不再贴代码,只是需要注意:
在创建对象的时候没有进行Node数组的初始化,初始化操作在put时进行.
get()方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public V get (Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1 ) & h)) != null ) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0 ) return (p = e.find(h, key)) != null ? p.val : null ; while ((e = e.next) != null ) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null ; }
可以看到,在get()方法的过程中,是没有进行加锁操作的,那么是如何保证线程安全的呢?
首先通过tabat获取hash桶的根节点
遍历的时候根据node的volatile属性next.
返回时读取node的volatile属性val.
所有的操作属性都是volatile,由该关键字保证内存的可见性,进一步保证读取时的线程安全.
put()方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 public V put (K key, V value) { return putVal(key, value, false ); } final V putVal (K key, V value, boolean onlyIfAbsent) { if (key == null || value == null ) throw new NullPointerException (); int hash = spread(key.hashCode()); int binCount = 0 ; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 ) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1 ) & hash)) == null ) { if (casTabAt(tab, i, null , new Node <K,V>(hash, key, value, null ))) break ; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null ; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0 ) { binCount = 1 ; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break ; } Node<K,V> pred = e; if ((e = e.next) == null ) { pred.next = new Node <K,V>(hash, key, value, null ); break ; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2 ; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null ) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0 ) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null ) return oldVal; break ; } } } addCount(1L , binCount); return null ; }
put方法中的流程:
获取hash值
遍历数组
如果未初始化则初始化
如果要插入的位置为null,则使用cas插入,不加锁
如果要插入的位置为扩容标识节点,则帮助其扩容
对插入的hash桶加锁
按照红黑树或者链表的方式进行插入
检查插入后链表长度是否超过阀值,如果超过则转为红黑树
添加计数,如果添加后的数量大于扩容阀值,则进行扩容.
remove()方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 public V remove (Object key) { return replaceNode(key, null , null ); } final V replaceNode (Object key, V value, Object cv) { int hash = spread(key.hashCode()); for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1 ) & hash)) == null ) break ; else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null ; boolean validated = false ; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0 ) { validated = true ; for (Node<K,V> e = f, pred = null ;;) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { V ev = e.val; if (cv == null || cv == ev || (ev != null && cv.equals(ev))) { oldVal = ev; if (value != null ) e.val = value; else if (pred != null ) pred.next = e.next; else setTabAt(tab, i, e.next); } break ; } pred = e; if ((e = e.next) == null ) break ; } } else if (f instanceof TreeBin) { validated = true ; TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> r, p; if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null )) != null ) { V pv = p.val; if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { oldVal = pv; if (value != null ) p.val = value; else if (t.removeTreeNode(p)) setTabAt(tab, i, untreeify(t.first)); } } } } } if (validated) { if (oldVal != null ) { if (value == null ) addCount(-1L , -1 ); return oldVal; } break ; } } } return null ; }
remove方法中流程:
获取hash值
如果未初始化或者该hash值对应的hash桶为空,则直接返回
如果正在扩容则帮助扩容
对该hash桶加锁
遍历该hash桶处的链表或者红黑树,更新或者删除节点.
后话 本文记录了ConcurrentHashMap的基本原理及几个常用方法的实现,但由于才疏学浅以及ConcurrentHashMap的复杂性,文中可能会有些许疏漏,如有错误欢迎随时指出.
对于ConcurrentHashMap,建议还是先学会使用,在有一定的并发基础后再学习源码,至少要了解volatile及synchronized关键字的实现机制以及JMM(java内存模型)的一些基础知识.否则学习起来十分费劲(我看了好久,,),并且囫囵吞枣,学习之后收获也不一定很大.
参考链接 https://www.jianshu.com/p/cf5e024d9432 https://blog.csdn.net/u010723709/article/details/48007881 https://www.jianshu.com/p/5bc70d9e5410
完。
ChangeLog
2018-11-18 完成
以上皆为个人所思所得,如有错误欢迎评论区指正。
欢迎转载,烦请署名并保留原文链接。
联系邮箱:huyanshi2580@gmail.com
更多学习笔记见个人博客——>呼延十