Java 7的ConcurrenHashMap的源码我建议大家都看看,那个版本的源码就是Java多线程编程的教科书。在Java 7的源码中,作者对悲观锁的使用非常谨慎,大多都转换为自旋锁加volatile获得相同的语义,即使最后迫不得已要用,作者也会通过各种技巧减少锁的临界区。在上一篇文章中我们也有讲到,自旋锁在临界区比较小的时候是一个较优的选择是因为它避免了线程由于阻塞而切换上下文,但本质上它也是个锁,在自旋等待期间只有一个线程能进入临界区,其他线程只会自旋消耗CPU的时间片。Java 8中ConcurrentHashMap的实现通过一些巧妙的设计和技巧,避开了自旋锁的局限,提供了更高的并发性能。如果说Java 7版本的源码是在教我们如何将悲观锁转换为自旋锁,那么在Java 8中我们甚至可以看到如何将自旋锁转换为无锁的方法和技巧。
新区网站制作公司哪家好,找创新互联建站!从网页设计、网站建设、微信开发、APP开发、响应式网站设计等网站项目制作,到程序开发,运营维护。创新互联建站成立于2013年到现在10年的时间,我们拥有了丰富的建站经验和运维经验,来保证我们的工作的顺利进行。专注于网站建设就选创新互联建站。
把书读薄
image
图片来源:https://www.zhenchao.org/2019/01/31/java/cas-based-concurrent-hashmap/
在开始本文之前,大家首先在心里还是要有这样的一张图,如果有同学对HashMap比较熟悉,那这张图也应该不会陌生。事实上在整体的数据结构的设计上Java 8的ConcurrentHashMap和HashMap基本上是一致的。
Java 7中ConcurrentHashMap为了提升性能使用了很多的编程技巧,但是引入Segment的设计还是有很大的改进空间的,Java 7中ConcurrrentHashMap的设计有下面这几个可以改进的点:
1. Segment在扩容的时候非扩容线程对本Segment的写操作时都要挂起等待的
2. 对ConcurrentHashMap的读操作需要做两次哈希寻址,在读多写少的情况下其实是有额外的性能损失的
3. 尽管size()方法的实现中先尝试无锁读,但是如果在这个过程中有别的线程做写入操作,那调用size()的这个线程就会给整个ConcurrentHashMap加锁,这是整个ConcurrrentHashMap唯一一个全局锁,这点对底层的组件来说还是有性能隐患的
4. 极端情况下(比如客户端实现了一个性能很差的哈希函数)get()方法的复杂度会退化到O(n)。
针对1和2,在Java 8的设计是废弃了Segment的使用,将悲观锁的粒度降低至桶维度,因此调用get的时候也不需要再做两次哈希了。size()的设计是Java 8版本中最大的亮点,我们在后面的文章中会详细说明。至于红黑树,这篇文章仍然不做过多阐述。接下来的篇幅会深挖细节,把书读厚,涉及到的模块有:初始化,put方法, 扩容方法transfer以及size()方法,而其他模块,比如hash函数等改变较小,故不再深究。
ForwardingNode
- static final class ForwardingNode
extends Node { - final Node
[] nextTable; - ForwardingNode(Node
[] tab) { - // MOVED = -1,ForwardingNode的哈希值为-1
- super(MOVED, null, null, null);
- this.nextTable = tab;
- }
- }
除了普通的Node和TreeNode之外,ConcurrentHashMap还引入了一个新的数据类型ForwardingNode,我们这里只展示他的构造方法,ForwardingNode的作用有两个:
这是在Java 8版本的ConcurrentHashMap实现CAS的工具,以int类型为例其方法定义如下:
- /**
- * Atomically update Java variable to x if it is currently
- * holding expected.
- * @return true if successful
- */
- public final native boolean compareAndSwapInt(Object o, long offset,
- int expected,
- int x);
相应的语义为:
如果对象o起始地址偏移量为offset的值等于expected,则将该值设为x,并返回true表明更新成功,否则返回false,表明CAS失败
- public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
- if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) // 检查参数
- throw new IllegalArgumentException();
- if (initialCapacity < concurrencyLevel)
- initialCapacity = concurrencyLevel;
- long size = (long)(1.0 + (long)initialCapacity / loadFactor);
- int cap = (size >= (long)MAXIMUM_CAPACITY) ?
- MAXIMUM_CAPACITY : tableSizeFor((int)size); // tableSizeFor,求不小于size的 2^n的算法,jdk1.8的HashMap中说过
- this.sizeCtl = cap;
- }
即使是最复杂的一个初始化方法代码也是比较简单的,这里我们只需要注意两个点:
- public V put(K key, V value) {
- return putVal(key, value, false);
- }
put方法将调用转发到putVal方法:
- final V putVal(K key, V value, boolean onlyIfAbsent) {
- if (key == null || value == null) throw new NullPointerException();
- int hash = spread(key.hashCode());
- int binCount = 0;
- for (Node
[] tab = table;;) { - Node
f; int n, i, fh; - // 【A】延迟初始化
- if (tab == null || (n = tab.length) == 0)
- tab = initTable();
- // 【B】当前桶是空的,直接更新
- else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
- if (casTabAt(tab, i, null,
- new Node
(hash, key, value, null))) - break; // no lock when adding to empty bin
- }
- // 【C】如果当前的桶的第一个元素是一个ForwardingNode节点,则该线程尝试加入扩容
- else if ((ffh = f.hash) == MOVED)
- tab = helpTransfer(tab, f);
- // 【D】否则遍历桶内的链表或树,并插入
- else {
- // 暂时折叠起来,后面详细看
- }
- }
- // 【F】流程走到此处,说明已经put成功,map的记录总数加一
- addCount(1L, binCount);
- return null;
- }
从整个代码结构上来看流程还是比较清楚的,我用括号加字母的方式标注了几个非常重要的步骤,put方法依然牵扯出很多的知识点
- private final Node
[] initTable() { - Node
[] tab; int sc; - while ((tab = table) == null || tab.length == 0) {
- if ((sc = sizeCtl) < 0)
- // 说明已经有线程在初始化了,本线程开始自旋
- Thread.yield(); // lost initialization race; just spin
- else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
- // CAS保证只有一个线程能走到这个分支
- try {
- if ((tab = table) == null || tab.length == 0) {
- int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
- @SuppressWarnings("unchecked")
- Node
[] nt = (Node [])new Node,?>[n]; - tabtable = tab = nt;
- // sc = n - n/4 = 0.75n
- sc = n - (n >>> 2);
- }
- } finally {
- // 恢复sizeCtl > 0相当于释放锁
- sizeCtl = sc;
- }
- break;
- }
- }
- return tab;
- }
在初始化桶数组的过程中,系统如何保证不会出现并发问题呢,关键点在于自旋锁的使用,当有多个线程都执行initTable方法的时候,CAS可以保证只有一个线程能够进入到真正的初始化分支,其他线程都是自旋等待。这段代码中我们关注三点即可:
- static final
Node tabAt(Node [] tab, int i) { - return (Node
)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); - }
- static final
boolean casTabAt(Node [] tab, int i, - Node
c, Node v) { - return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
- }
put方法的第二个分支会用tabAt判断当前桶是否是空的,如果是则会通过CAS写入,tabAt通过UNSAFE接口会拿到桶中的最新元素,casTabAt通过CAS保证不会有并发问题,如果CAS失败,则通过循环再进入其他分支
- final Node
[] helpTransfer(Node [] tab, Node f) { - Node
[] nextTab; int sc; - if (tab != null && (f instanceof ForwardingNode) &&
- (nextTab = ((ForwardingNode
)f).nextTable) != null) { - int rs = resizeStamp(tab.length);
- while (nextTab == nextTable && table == tab &&
- (sc = sizeCtl) < 0) {
- // RESIZE_STAMP_SHIFT = 16
- if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
- sc == rs + MAX_RESIZERS || transferIndex <= 0)
- break;
- // 这里将sizeCtl的值自增1,表明参与扩容的线程数量+1
- if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
- transfer(tab, nextTab);
- break;
- }
- }
- return nextTab;
- }
- return table;
- }
在这个地方我们就要详细说下sizeCtl这个标志位了,临时变量rs由resizeStamp这个方法返回
- static final int resizeStamp(int n) {
- // RESIZE_STAMP_BITS = 16
- return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
- }
因为入参n是一个int类型的值,所有Integer.numberOfLeadingZeros(n)的返回值介于0到32之间,如果转换成二进制
因此resizeStampd的返回值也就介于00000000 00000000 10000000 00000000到00000000 00000000 10000000 00100000之间,从这个返回值的范围可以看出来resizeStamp的返回值高16位全都是0,是不包含任何信息的。因此在ConcurrrentHashMap中,会把resizeStamp的返回值左移16位拼到sizeCtl中,这就是为什么sizeCtl的高16位包含整个Map大小的原理。有了这个分析,这段代码中比较长的if判断也就能看懂了
- if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
- sc == rs + MAX_RESIZERS || transferIndex <= 0)
- break;
- (sc >>> RESIZE_STAMP_SHIFT) != rs保证所有线程要基于同一个旧的桶数组扩容
- transferIndex <= 0已经有线程完成扩容任务了
至于sc == rs + 1 || sc == rs + MAX_RESIZERS这两个判断条件如果是细心的同学一定会觉得难以理解,这个地方确实是JDK的一个BUG,这个BUG已经在JDK 12中修复,详细情况可以参考一下Oracle的官网:https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427,这两个判断条件应该写成这样:sc == (rs << RESIZE_STAMP_SHIFT) + 1 || sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS,因为直接比较rs和sc是没有意义的,必须要有移位操作。它表达的含义是
真正扩容的逻辑在transfer方法中,我们后面会详细看,不过有个小细节可以提前注意,如果nextTable已经初始化了,transfer会返回nextTable的的引用,后续可以直接操作新的桶数组。
如果桶数组已经初始化好了,该扩容的也扩容了,并且根据哈希定位到的桶中已经有元素了,那流程就跟普通的HashMap一样了,唯一一点不同的就是,这时候要给当前的桶加锁,且看代码:
- final V putVal(K key, V value, boolean onlyIfAbsent) {
- if (key == null || value == null) throw new NullPointerException();
- int hash = spread(key.hashCode());
- int binCount = 0;
- for (Node
[] tab = table;;) { - Node
f; int n, i, fh; - if (tab == null || (n = tab.length) == 0)// 折叠
- else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 折叠}
- else if ((ffh = f.hash) == MOVED)// 折叠
- else {
- V oldVal = null;
- synchronized (f) {
- // 要注意这里这个不起眼的判断条件
- if (tabAt(tab, i) == f) {
- if (fh >= 0) { // fh>=0的节点是链表,否则是树节点或者ForwardingNode
- binCount = 1;
- for (Node
e = f;; ++binCount) { - K ek;
- if (e.hash == hash &&
- ((eek = e.key) == key ||
- (ek != null && key.equals(ek)))) {
- oldVal = e.val; // 如果链表中有值了,直接更新
- if (!onlyIfAbsent)
- e.val = value;
- break;
- }
- Node
pred = e; - if ((ee = e.next) == null) {
- // 如果流程走到这里,则说明链表中还没值,直接连接到链表尾部
- pred.next = new Node
(hash, key, value, null); - break;
- }
- }
- }
- // 红黑树的操作先略过
- }
- }
- }
- }
- // put成功,map的元素个数+1
- addCount(1L, binCount);
- return null;
- }
这段代码中要特备注意一个不起眼的判断条件(上下文在源码上已经标注出来了):tabAt(tab, i) == f,这个判断的目的是为了处理调用put方法的线程和扩容线程的竞争。因为synchronized是阻塞锁,如果调用put方法的线程恰好和扩容线程同时操作同一个桶,且调用put方法的线程竞争锁失败,等到该线程重新获取到锁的时候,当前桶中的元素就会变成一个ForwardingNode,那就会出现tabAt(tab, i) != f的情况。
- private final void transfer(Node
[] tab, Node [] nextTab) { - int n = tab.length, stride;
- if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
- stride = MIN_TRANSFER_STRIDE; // subdivide range
- if (nextTab == null) { // 初始化新的桶数组
- try {
- @SuppressWarnings("unchecked")
- Node
[] nt = (Node [])new Node,?>[n << 1]; - nextTab = nt;
- } catch (Throwable ex) { // try to cope with OOME
- sizeCtl = Integer.MAX_VALUE;
- return;
- }
- nextTabnextTable = nextTab;
- transferIndex = n;
- }
- int nextn = nextTab.length;
- ForwardingNode
fwd = new ForwardingNode (nextTab); - boolean advance = true;
- boolean finishing = false; // to ensure sweep before committing nextTab
- for (int i = 0, bound = 0;;) {
- Node
f; int fh; - while (advance) {
- int nextIndex, nextBound;
- if (--i >= bound || finishing)
- advance = false;
- else if ((nextIndex = transferIndex) <= 0) {
- i = -1;
- advance = false;
- }
- else if (U.compareAndSwapInt
- (this, TRANSFERINDEX, nextIndex,
- nextBound = (nextIndex > stride ?
- nextIndex - stride : 0))) {
- bound = nextBound;
- i = nextIndex - 1;
- advance = false;
- }
- }
- if (i < 0 || i >= n || i + n >= nextn) {
- int sc;
- if (finishing) {
- nextTable = null;
- table = nextTab;
- sizeCtl = (n << 1) - (n >>> 1);
- return;
- }
- if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
- // 判断是会否是最后一个扩容线程
- if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
- return;
- finishing = advance = true;
- i = n; // recheck before commit
- }
- }
- else if ((f = tabAt(tab, i)) == null)
- advance = casTabAt(tab, i, null, fwd);
- else if ((ffh = f.hash) == MOVED) // 只有最后一个扩容线程才有机会执行这个分支
- advance = true; // already processed
- else { // 复制过程与HashMap类似,这里不再赘述
- synchronized (f) {
- // 折叠
- }
- }
- }
- }
在深入到源码细节之前我们先根据下图看一下在Java 8中ConcurrentHashMap扩容的几个特点:
image-20210424202636495
先看一个关键的变量transferIndex,这是一个被volatile修饰的变量,这一点可以保证所有线程读到的一定是最新的值。
- private transient volatile int transferIndex;
这个值会被第一个参与扩容的线程初始化,因为只有第一个参与扩容的线程才满足条件nextTab == null
- if (nextTab == null) { // initiating
- try {
- @SuppressWarnings("unchecked")
- Node
[] nt = (Node [])new Node,?>[n << 1]; - nextTab = nt;
- } catch (Throwable ex) { // try to cope with OOME
- sizeCtl = Integer.MAX_VALUE;
- return;
- }
- nextTabnextTable = nextTab;
- transferIndex = n;
- }
在了解了transferIndex属性的基础上,上面的这个循环就好理解了
- while (advance) {
- int nextIndex, nextBound;
- // 当bound <= i <= transferIndex的时候i自减跳出这个循环继续干活
- if (--i >= bound || finishing)
- advance = false;
- // 扩容的所有任务已经被认领完毕,本线程结束干活
- else if ((nextIndex = transferIndex) <= 0) {
- i = -1;
- advance = false;
- }
- // 否则认领新的一段复制任务,并通过`CAS`更新transferIndex的值
- else if (U.compareAndSwapInt
- (this, TRANSFERINDEX, nextIndex,
- nextBound = (nextIndex > stride ?
- nextIndex - stride : 0))) {
- bound = nextBound;
- i = nextIndex - 1;
- advance = false;
- }
- }
transferIndex就像是一个游标,每个线程认领一段复制任务的时候都会通过CAS将其更新为transferIndex - stride, CAS可以保证transferIndex可以按照stride这个步长降到0。
对于每一个扩容线程,for循环的变量i代表要复制的桶的在桶数组中的下标,这个值的上限和下限通过游标transferIndex和步长stride计算得来,当i减小为负数,则说明当前扩容线程完成了扩容任务,这时候流程会走到这个分支:
- // i >= n || i + n >= nextn现在看来取不到
- if (i < 0 || i >= n || i + n >= nextn) {
- int sc;
- if (finishing) { // 【A】完成整个扩容过程
- nextTable = null;
- table = nextTab;
当前名称:Java 8 ConcurrentHashMap源码中竟然隐藏着两个Bug
文章路径:http://www.csdahua.cn/qtweb/news18/153068.html网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网