美文网首页
靠别人不如靠自己(3)——ConcurrentHashMap源码

靠别人不如靠自己(3)——ConcurrentHashMap源码

作者: 墨_0b54 | 来源:发表于2021-10-18 21:49 被阅读0次

1. ConcurrentHashMap

1.1. 概述

在ConcurrentHashMap的源码注释中,说明了它是HashMap的并发版本实现,所以同样的方法两者实现意义是一样的,ConcurrentHashMap只是解决了多线程访问的并发问题,并尽量提升多线程访问时的性能。
首先看ConcurrentHashMap的类定义:

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {
  • ConcurrentHashMap实现了ConcurrentMap接口,表明这是一个支持并发访问的map,同样实现这个接口的还有ConcurrentSkipListMap,是一个用跳跃表实现的并发map。
  • HashTable是另一个支持并发的map,但是HashTable使用synchronized控制对整个表的访问,每次只允许一个线程访问,对并发的支持远不如ConcurrentHashMap。
  • ConcurrentHashMap使用锁的粒度更细,在进行插入节点、删除节点、更新值时,仅对需要操作的桶(表的每个位置都是一个桶)加synchronized锁,阻断其他线程的修改操作。
  • 对于读操作,仅仅是对桶内保存的是树的情况使用乐观锁,这是因为红黑树会调整平衡,因为不会很频繁,所以乐观锁就足够了
  • 因此在ConcurrentHashMap的每个桶在同一时刻可以支持单线程的修改操作、多线程读。
  • ConcurrentHashMap没有快速失败机制

1.2. 常量

java8后没有使用的常量和与HashMap相同的常量就不提了

private static final int MIN_TRANSFER_STRIDE = 16; // 每个cpu核心最少处理的数组步长
private static int RESIZE_STAMP_BITS = 16; //sizeCtl 中用于生成扩容标记的位数,sizeCtl 的高16位
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; // 同时参与调整大小的最大线程数2^16-1
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; //sizeCtl>>>RESIZE_STAMP_SHIFT就是本次扩容的标记

static final int MOVED     = -1; // 已经移动(扩容)的节点hash值
static final int TREEBIN   = -2; // 树容器的hash值
static final int RESERVED  = -3; // 临时节点的hash值
static final int HASH_BITS = 0x7fffffff; // 保证正常节点hash值>=0
static final int NCPU = Runtime.getRuntime().availableProcessors(); //CPU核心数量

1.3. 属性 & 构造方法

1.3.1. 属性

transient volatile Node<K,V>[] table; 
private transient volatile Node<K,V>[] nextTable; //resize时的下一个表,只有resize时非空
private transient volatile long baseCount; //基本计数器值,主要在没有线程竞争时使用,通过 CAS 更新

//当 table 为 null 时,保存创建时使用的初始表大小 ,或默认为 0
//大于0与HashMap的threshold类似,保存下一次要调整表的大小
//小于0表示正在初始化或调整大小。-1 表示初始化,或者 -(1 + 活动调整大小线程的数量)
//扩容时sizeCtl 高16(RESIZE_STAMP_BITS)位是扩容标记;低16位是并行线程数
private transient volatile int sizeCtl;
private transient volatile int transferIndex; // 在调整大小时要拆分的下一个桶索引。非负
private transient volatile int cellsBusy;//调整CounterCell大小或初始化 CounterCell 时使用的自旋锁
private transient volatile CounterCell[] counterCells;//计数器单元格表。当非空时,大小是 2 的幂。当线程竞争激烈时,线程把值加在CounterCell中

//遍历视图
private transient KeySetView<K,V> keySet;
private transient ValuesView<K,V> values;
private transient EntrySetView<K,V> entrySet;


//最下面这部分静态代码块是为了初始化属性变量在内存中的相对位置,因为CAS直接操作内存
private static final sun.misc.Unsafe U;
private static final long SIZECTL;
private static final long TRANSFERINDEX;
private static final long BASECOUNT;
private static final long CELLSBUSY;
private static final long CELLVALUE;
private static final long ABASE; // 数组起始位置
private static final int ASHIFT; // 数组元素字节长度的位数,如int为1 << 2字节,ASHIFT就是2
static {
    try {
        U = sun.misc.Unsafe.getUnsafe();
        Class<?> k = ConcurrentHashMap.class;
        SIZECTL = U.objectFieldOffset
            (k.getDeclaredField("sizeCtl"));
        TRANSFERINDEX = U.objectFieldOffset
            (k.getDeclaredField("transferIndex"));
        BASECOUNT = U.objectFieldOffset
            (k.getDeclaredField("baseCount"));
        CELLSBUSY = U.objectFieldOffset
            (k.getDeclaredField("cellsBusy"));
        Class<?> ck = CounterCell.class;
        CELLVALUE = U.objectFieldOffset
            (ck.getDeclaredField("value"));
        Class<?> ak = Node[].class;
        ABASE = U.arrayBaseOffset(ak); //获取数组起始位置
        int scale = U.arrayIndexScale(ak); //获取数组元素的字节长度
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
    } catch (Exception e) {
        throw new Error(e);
    }
}

注意看上面加了volatile的属性,更新都使用CAS直接操作内存,这样做的好处就是既保证了同步,又减少线程切换

volatile 可以保证内存的可见性,但无法保证原子性,一致性

1.4. 讲核心方法之前

1.4.1. 几个常见方法

//这是ConcurrentHashMap计算hash的方法,使hashcode高位向低位传播,且保证结果不是负数
static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS; // 多出的HASH_BITS保证hash码>0
}
//CAS根据偏移量在数组中找到对应桶
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
//CAS根据偏移量找到桶并修改,修改成功返回true
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) { //CAS操作赋值
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
//CAS根据偏移量找到桶并修改
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

1.4.2. 键值对结构

// 正常情况下的键值对节点
static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }
    public final V setValue(V value) { //可见这里不支持直接set值
        throw new UnsupportedOperationException();
    }
    //对 map.get() 的虚拟化支持; 在子类中被覆盖。
    Node<K,V> find(int h, Object k) {
        Node<K,V> e = this;
        if (k != null) {
            do {
                K ek;
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
            } while ((e = e.next) != null);
        }
        return null;
    }
}

Node有四个子类:ForwardingNode、ReservationNode、TreeBin和TreeNode,他们在不同情况下有不同的作用

  • ForwardingNode:表扩容时,原表一个桶迁移到新表后,应该new一个ForwardingNode放在原表桶节点处,这样当其他线程访问时,发现桶节点是ForwardingNode就知道当前表正在扩容,可以去帮助扩容,或者直接通过nextTable指针直接在扩容后的表进行操作,结构如下:
static final class ForwardingNode<K,V> extends Node<K,V> { // 移动状态的Node
    final Node<K,V>[] nextTable; //指向扩容后的表
    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null, null);// ForwardingNode的hash是MOVED常量
        this.nextTable = tab;
    }
    Node<K,V> find(int h, Object k) { //覆盖Node的find方法
        // loop to avoid arbitrarily deep recursion on forwarding nodes
        outer: for (Node<K,V>[] tab = nextTable;;) {
            Node<K,V> e; int n;
            if (k == null || tab == null || (n = tab.length) == 0 ||
                (e = tabAt(tab, (n - 1) & h)) == null)
                return null;
            for (;;) {
                int eh; K ek;
                if ((eh = e.hash) == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
                if (eh < 0) {
                    if (e instanceof ForwardingNode) {
                        tab = ((ForwardingNode<K,V>)e).nextTable;
                        continue outer;
                    }
                    else
                        return e.find(h, k);
                }
                if ((e = e.next) == null)
                    return null;
            }
        }
    }
}
  • ReservationNode:在 computeIfAbsent 和 compute 中使用的占位符节点,结构如下:
static final class ReservationNode<K,V> extends Node<K,V> {
        ReservationNode() {
            super(RESERVED, null, null, null);
        }

        Node<K,V> find(int h, Object k) {
            return null;
        }
    }
  • TreeNode:红黑树节点,主要用于 TreeBin,其他方法不再展示,结构如下:
static final class TreeNode<K,V> extends Node<K,V> {
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red;
}
  • TreeBin:树的头节点,或者说是树的容器,通过读写锁进行并发访问控制,结构如下:
static final class TreeBin<K,V> extends Node<K,V> {
    TreeNode<K,V> root; //树的头节点
    volatile TreeNode<K,V> first;
    volatile Thread waiter; //正在等待访问的线程
    volatile int lockState; //锁状态值,
    // lockState的值
    static final int WRITER = 1; // 写锁时设置
    static final int WAITER = 2; // 等待写锁时设置
    static final int READER = 4; // 设置读锁时的增量值
    //初始化TreeBin时,转红黑树
    TreeBin(TreeNode<K,V> b) { //参数是链表头
        super(TREEBIN, null, null, null);
        this.first = b;
        ... ...             //将链表转化为红黑树
        this.root = r;
        assert checkInvariants(root); //检查链表,树指针是否正常
    }
    private static final sun.misc.Unsafe U;
    private static final long LOCKSTATE; //同上,内存相对位置
    static {
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> k = TreeBin.class;
            LOCKSTATE = U.objectFieldOffset
                (k.getDeclaredField("lockState"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

那么TreeBin怎么实现这套读写锁的呢?首先是写锁以及等待写锁:

//设置写锁WRITER,如果失败设置等待锁WAITER,出现锁争用会阻塞当前线程
private final void lockRoot() {
    if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER)) // CAS尝试对没有任何锁的lockState设置写锁
        contendedLock(); // 设置失败
}
//释放锁
private final void unlockRoot() {
    lockState = 0;
}
//争写锁,可能阻塞等待线程
private final void contendedLock() {
    boolean waiting = false;
    for (int s;;) {//如果前面有等待线程,自旋
        if (((s = lockState) & ~WAITER) == 0) { //没有读写锁,只有等锁时
            if (U.compareAndSwapInt(this, LOCKSTATE, s, WRITER)) { //直接设置写锁
                if (waiting) //waiting为true,将等待线程清空
                    waiter = null;
                return;
            }
        }
        else if ((s & WAITER) == 0) { //有读锁或者写锁,但没有等锁
            if (U.compareAndSwapInt(this, LOCKSTATE, s, s | WAITER)) { //设置等锁
                waiting = true;
                waiter = Thread.currentThread();//将当前线程设置为等待线程
            }
        }
        else if (waiting) //当前线程是等待者,阻塞当前线程
            LockSupport.park(this); //后面会看到唤醒方法
    }
}

上述加锁逻辑主要在putTreeVal插入方法,removeTreeNode删除方法中使用:

//removeTreeNode大致也是这样的使用逻辑,就不展示了
final TreeNode<K,V> putTreeVal(int h, K k, V v) {
    balabala...//大致与HashMap一样
    lockRoot(); //调整前去取得锁,设置写锁WRITER,如果失败设置等待锁WAITER,出现锁争用会阻塞当前线程
    try {
        root = balanceInsertion(root, x);//调整树平衡
    } finally {
        unlockRoot(); //释放锁,要放在finally中
    }
    balabala...//忽略
}

设置读锁的逻辑在find方法中:

// 如果没有,则返回匹配的节点或 null。 尝试使用树搜索,但在锁不可用时用链表搜索。
final Node<K,V> find(int h, Object k) {
    if (k != null) {
        for (Node<K,V> e = first; e != null; ) {
            int s; K ek;
            if (((s = lockState) & (WAITER|WRITER)) != 0) { //如果当前有写锁或者等锁
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
                e = e.next; // 锁不可用时使用线性搜索
            }
            else if (U.compareAndSwapInt(this, LOCKSTATE, s,
                                         s + READER)) { // CAS设置读锁,同一时刻,每增加一个读的线程,大小增加READER
                TreeNode<K,V> r, p;
                try {
                    p = ((r = root) == null ? null :
                         r.findTreeNode(h, k, null)); //树搜索
                } finally {
                    Thread w;
                    if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
                        (READER|WAITER) && (w = waiter) != null) //一个读线程退出,释放读锁,大小减去READER
                        LockSupport.unpark(w); 如果有等待锁,唤醒等待的线程
                }
                return p;
            }
        }
    }
    return null;
}

1.5. 核心方法的实现

与HashMap一样,我们主要说一说put、get、remove以及遍历的实现,总结核心知识

1.5.1. 一切都要从put方法说起

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException(); // key和value都不允许为null
    int hash = spread(key.hashCode()); //计算hash
    int binCount = 0; //搜索计数
    for (Node<K,V>[] tab = table;;) {//自旋
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable(); //初始化表
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null))) //上文有解释,CAS无锁为table指定索引位设置值
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED) // 当前节点已经被移动到新表
            tab = helpTransfer(tab, f); //帮助移动,下文细说
        else {
            V oldVal = null;
            synchronized (f) { //对当前桶加锁
                // 双重检查,多线程常用手段,参考单例模式的双重检查实现
                if (tabAt(tab, i) == f) { //检查当前桶节点是否在加锁时改变了
                    if (fh >= 0) { //hash大于0是普通节点,链表线性
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) { //直接加在链表尾部
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) { //属于TreeBin
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) { //树的put方法,会加锁
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) { // 代表链表深度
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount); //增加计数
    return null;
}

看一下put的完整流程:

  • 如果是初始化table,需要调用initTable方法,CAS将sizeCtl设置为-1,保证只由一个线程进行初始化
//使用 sizeCtl 中记录的大小初始化表。
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) { //自旋条件
        if ((sc = sizeCtl) < 0) //sizeCtl小于0代表其他线程正在初始化,让出cpu
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 确保只有一个线程进行初始化
            try {
                if ((tab = table) == null || tab.length == 0) { //双重检查,防止重复初始化
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2); // 相当于hashmap的threshold,计算下次扩容元素数量
                }
            } finally {
                sizeCtl = sc; //设置下次扩容的容量,释放锁
            }
            break;
        }
    }
    return tab;
}
  • 表初始化后,如果key所处的桶是空的,无锁新增一个Node到桶
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    if (casTabAt(tab, i, null,
                 new Node<K,V>(hash, key, value, null)))
        break;                   // no lock when adding to empty bin
}
  • 如果key所处的桶是MOVED状态,说明正在扩容或者扩容已经结束了,桶已经被移到扩容后的表了,那么当前线程要去尝试帮助扩容,即helpTransfer方法
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { // table的Node已经被移到下一个table
        int rs = resizeStamp(tab.length); //获得本次扩容标记,上文说过,每次扩容产生的扩容标记都是不一样的
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) { // 判断本次扩容是否结束
            // 上文有提到,扩容时sizeCtl的高16位保存的是扩容标记,(sc >>> RESIZE_STAMP_SHIFT) != rs代表扩容标记不同
            // 当最后一个线程结束扩容了,就会将 sc 减1。此时sc - 1 == rs代表扩容结束
            // sc == 标识符 + 65535(帮助线程数已经达到最大)
            // 在transfer函数中可以看到,每个线程负责移动一个索引范围的桶,这样主要是为了减少线程之间的竞争
            // 当transferIndex到0时,代表数组被其他线程分完了
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break; // 扩容已经结束,退出自旋
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { //帮助扩容,扩容线程数加1
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

那么线程之间是怎么协调的将每个 bin 中的节点移动或复制到新表呢?
线程会根据cpu数量计算出一个合适的步长,按照原表索引,从高到底每次分配该步长的范围,然后当前线程就负责移动这个范围,直到整个范围都被分配完。
看transfer方法:

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;//stride代表步长,也代表一个线程处理的数组长度,下面代码有解释,主要作用是避免多线程争用内存
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)//计算stride
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // 初始化nextTable
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; //初始化nextTable长度是table的两倍
            nextTab = nt;
        } catch (Throwable ex) {      // 尝试应对OOM异常
            sizeCtl = Integer.MAX_VALUE; //不扩容了
            return;
        }
        nextTable = nextTab;
        transferIndex = n; // 这里可以看到,transferIndex初始化为原表的容量
    }
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); //用来控制并发的,当一个节点为空或已经被转移之后,就设置为fwd节点
    boolean advance = true; //是否继续向前查找的标志位
    boolean finishing = false; //在完成之前重新在扫描一遍数组,看看有没完成的没
    for (int i = 0, bound = 0;;) { //自旋
        Node<K,V> f; int fh;
        while (advance) { //线程扫描数组,寻找自己可以处理的范围的
            int nextIndex, nextBound;
            //这里的--i可以看到是从后往前扫描
            if (--i >= bound || finishing) // 将当前索引i向前移1位,如果当前索引还在[bound,nextIndex)范围里或已完成
                advance = false; //不再向前扫描
            else if ((nextIndex = transferIndex) <= 0) { // transferIndex<=0代表没有可以分配的范围了
                i = -1; //使下面可以进入退出自旋的代码块
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) { //根据步长,跳到下一个索引位置,更新transferIndex
                bound = nextBound; // 等同于bound = nextIndex(transferIndex) - stride,得到一个i的范围[bound,nextIndex),即当前线程分到的处理范围
                i = nextIndex - 1; // --i还有这里代表是反向查找
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) { //i<0代表任务被分完了,i>=原容量,i >= next容量 - 原容量,判断是否完成转移
            int sc;
            if (finishing) { //完成转移
                nextTable = null;
                table = nextTab; // 使table指向下一个table
                sizeCtl = (n << 1) - (n >>> 1); //更新sizeCtl为下一次扩容的容量
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 如果任务还没有全部完成,当前线程退出
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) // 如果当前的扩容的标记与sc不相等,退出
                    return;
                finishing = advance = true;
                i = n; // 重新检查一遍还有没有任务
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED) //已经被处理了
            advance = true; // 已经被处理了,去找下一个范围
        else { //没有被处理
            synchronized (f) {//锁定当前节点
                if (tabAt(tab, i) == f) { //双重检查
                    Node<K,V> ln, hn;//ln维持原索引
                    if (fh >= 0) { //普通节点,复制到nextTable
                        balaba...//复制到新表,与HashMap一样
                        advance = true;
                    }
                    else if (f instanceof TreeBin) { //复制树到新位置
                       balaba...//复制树到新表,与HashMap一样
                        //判断是否需要退化树,与HashMap一样
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t; 
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        advance = true;
                    }
                }
            }
        }
    }
}
  • 再说回put方法,当前桶没有处于移动状态,加synchronized锁然后增加节点
  • 然后判断是否需要将链表转化为红黑树
private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1); //与hashMap一样,类似于resize
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { //转化为树
            synchronized (b) { // 同时只有一个线程进行转化
                if (tabAt(tab, index) == b) { // 双重检查
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null); //转化为树节点,但是依旧是一个链表
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    setTabAt(tab, index, new TreeBin<K,V>(hd)); // 将树置入TreeBin容器中,将TreeBin容器放在当前表节点
                }
            }
        }
    }
}
//putAll方法也会使用这个方法
private final void tryPresize(int size) { 
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    while ((sc = sizeCtl) >= 0) { // sizeCtl < 0表示正在初始化或者扩容,自旋
        Node<K,V>[] tab = table; int n;
        if (tab == null || (n = tab.length) == 0) { //初始化
            n = (sc > c) ? sc : c; // n是扩容容量
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //将sizeCtl改为-1,表示正在初始化(无锁同步)
                try {
                    if (table == tab) { //双重检查,防止重新初始化。
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2); // 0.75*n
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        }
        else if (c <= sc || n >= MAXIMUM_CAPACITY) //一直到最大容量或者sc>=扩容容量, 所以在一次扩容之后,不是原来长度的两倍,而是2的n次方倍
            break;
        else if (tab == table) {
            int rs = resizeStamp(n); //扩容标记
            if (sc < 0) { //如果初始化,或者扩容,帮助移动
                Node<K,V>[] nt;
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0) // 判断扩容是否结束,同上
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) //线程数加1
                    transfer(tab, nt); //帮助扩容
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2)) 
                transfer(tab, null);//这里代表是第一个进行扩容的线程
        }
    }
  • 最后一步是并发的count+1,同时需要判断是否需要进行扩容
// 如果check <0,不检查调整大小;如果check <= 1 只检查是否无竞争
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { //如果counterCells是null,试图直接给basecount加x;失败还是要加到counterCells
        CounterCell a; long v; int m;
        boolean uncontended = true; //没有线程争用
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null || //计算当前线程的probe值,每个线程都有一个probe随机值,这个值是可以改变的,也是为了避免线程争用
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { //试图给CounterCell加x,失败代表有线程争用
            fullAddCount(x, uncontended); //把x加到baseCount或者counterCells中
            return;
        }
        if (check <= 1)
            return;
        s = sumCount(); //初始化s,当前的元素计数
    }
    if (check >= 0) { //检查是否需要扩容
        Node<K,V>[] tab, nt; int n, sc;
        // 注意看s >= (long)(sc = sizeCtl),这里判断了是否需要进行扩容,只要s>sizeCtl就会一直扩容
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) { //自旋
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || 
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null || 
                    transferIndex <= 0) //同上
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2)) //第一个进行扩容的线程,2是因为-1代表初始化
                transfer(tab, null); //第一进行扩容的线程传null
            s = sumCount(); //刷新一次元素计数
        }
    }
}

再说一下fullAddCount方法,这个方法就是LongAddr的实现:

private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    if ((h = ThreadLocalRandom.getProbe()) == 0) {//等于0,线程本地属性字段初始化
        ThreadLocalRandom.localInit();      // force initialization
        h = ThreadLocalRandom.getProbe();
        wasUncontended = true;//等于0,没有争用
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        CounterCell[] as; CounterCell a; int n; long v;
        if ((as = counterCells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) { //用cellsBusy乐观锁创建cell,初始化cell
                if (cellsBusy == 0) {            // Try to attach new Cell
                    CounterCell r = new CounterCell(x); // Optimistic create
                    if (cellsBusy == 0 &&
                        U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                        boolean created = false;
                        try {               // Recheck under lock
                            CounterCell[] rs; int m, j;
                            if ((rs = counterCells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) { //双重检查
                                rs[j] = r; //设置计数
                                created = true; //创建成功
                            }
                        } finally {
                            cellsBusy = 0; //释放锁
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // cell已存在且存在线程争用
                wasUncontended = true;      // Continue after rehash。刷新并重试
            else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) //尝试给cell的数加x
                break;
            else if (counterCells != as || n >= NCPU) //其他线程创建了新的counterCells,或当前counterCells长度大于等于cpu数
                collide = false;            // At max size or stale
            else if (!collide) // 到这里设置为true,下一次进入扩容
                collide = true;
            else if (cellsBusy == 0 &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { //尝试加锁,进行扩容
                try {
                    if (counterCells == as) {// 双重检查
                        CounterCell[] rs = new CounterCell[n << 1]; //扩容
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        counterCells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = ThreadLocalRandom.advanceProbe(h); //设置probe
        }
        else if (cellsBusy == 0 && counterCells == as &&
                 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { // 如果counterCells为空,CAS尝试初始化counterCells
            boolean init = false;
            try {                           // Initialize table
                if (counterCells == as) { // 初始化counterCells
                    CounterCell[] rs = new CounterCell[2];
                    rs[h & 1] = new CounterCell(x);
                    counterCells = rs;
                    init = true;
                }
            } finally { //释放锁
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) //竞争很激烈,就索性将计数加到baseCount上
            break;                          // Fall back on using base
    }
}

到这里整个put方法就结束了,基本上ConcurrentHashMap的核心点都在这里了

1.5.2. size方法的实现

源注释说明这里的size类似LongAddr的实现

public int size() {
    long n = sumCount(); //在一瞬间,可能小于0 ,可能大于int最大值
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}
final long sumCount() { // baseCount + CounterCell中的值;
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount; //没有线程争用时,可以作为最终最终size
    if (as != null) { //有线程争用时,counterCells不是null
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

1.5.3. remove方法的实现

remove方法的实现在replaceNode方法,replaceNode方法是四个公共删除/替换方法的实现

// 用 value 替换节点值,条件是 cv 匹配(cv非空的情况下)
final V replaceNode(Object key, V value, Object cv) { //value为null代表删除,k、v都不能为null
    int hash = spread(key.hashCode());
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0 ||
            (f = tabAt(tab, i = (n - 1) & hash)) == null)
            break;
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            boolean validated = false;
            synchronized (f) { //同一时刻一个桶只能一个线程删除,与put一样
                if (tabAt(tab, i) == f) { //双重检查
                    if (fh >= 0) {
                        validated = true;
                        for (Node<K,V> e = f, pred = null;;) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                V ev = e.val;
                                if (cv == null || cv == ev ||
                                    (ev != null && cv.equals(ev))) {
                                    oldVal = ev;
                                    if (value != null)
                                        e.val = value;
                                    else if (pred != null)
                                        pred.next = e.next;
                                    else
                                        setTabAt(tab, i, e.next); //设置头节点
                                }
                                break;
                            }
                            pred = e;
                            if ((e = e.next) == null)
                                break;
                        }
                    }
                    else if (f instanceof TreeBin) {
                        validated = true;
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r, p;
                        if ((r = t.root) != null &&
                            (p = r.findTreeNode(hash, key, null)) != null) {
                            V pv = p.val;
                            if (cv == null || cv == pv ||
                                (pv != null && cv.equals(pv))) {
                                oldVal = pv;
                                if (value != null)
                                    p.val = value;
                                else if (t.removeTreeNode(p)) //remove方法内部判断树是否需要退化,如果需要会返回true
                                    setTabAt(tab, i, untreeify(t.first));
                            }
                        }
                    }
                }
            }
            if (validated) {
                if (oldVal != null) {
                    if (value == null) //value 为null代表删除
                        addCount(-1L, -1); //size减一,并且不需要检查扩容
                    return oldVal;
                }
                break;
            }
        }
    }
    return null;
}

1.5.4. get方法的实现

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) { //找到元素所在的桶
        if ((eh = e.hash) == h) { //判断hash是否相等,防止此时进入其他状态了,比如扩容、树化
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0) //小于0,扩容了或者树化了
            return (p = e.find(h, key)) != null ? p.val : null; //这里的find方法,在Node的子类中都有各自的实现
        while ((e = e.next) != null) { //否则直接向后查找
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

1.5.5. Map中的视图

开头的属性里有说明三个视图:KeySetView、ValuesView、EntrySetView,三者的实现基本相同,那么只挑entrySet来讲吧。

  • 从entrySet方法进入,该方法实际上返回了一个视图对象EntrySetView,这里体现了mvc设计思想,对视图的操作会映射到实际数据上,java源码很多地方都采用了这样的设计,例如List的subList。
public Set<Map.Entry<K,V>> entrySet() {
    EntrySetView<K,V> es;
    return (es = entrySet) != null ? es : (entrySet = new EntrySetView<K,V>(this));
}
  • EntrySetView实现了CollectionView类,而CollectionView实现了Collection。
static final class EntrySetView<K,V> extends CollectionView<K,V,Map.Entry<K,V>>
    implements Set<Map.Entry<K,V>>, java.io.Serializable {
    private static final long serialVersionUID = 2249069246763182397L;
    EntrySetView(ConcurrentHashMap<K,V> map) { super(map); }
    public boolean contains(Object o) {
        Object k, v, r; Map.Entry<?,?> e;
        return ((o instanceof Map.Entry) &&
                (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                (r = map.get(k)) != null &&
                (v = e.getValue()) != null &&
                (v == r || v.equals(r)));
    }
    public boolean remove(Object o) {
        Object k, v; Map.Entry<?,?> e;
        return ((o instanceof Map.Entry) &&
                (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                (v = e.getValue()) != null &&
                map.remove(k, v));
    }
    /**
     * @return an iterator over the entries of the backing map
     */
    public Iterator<Map.Entry<K,V>> iterator() { //迭代器的实现在这里
        ConcurrentHashMap<K,V> m = map;
        Node<K,V>[] t;
        int f = (t = m.table) == null ? 0 : t.length;
        return new EntryIterator<K,V>(t, f, 0, f, m);
    }
    public boolean add(Entry<K,V> e) {
        return map.putVal(e.getKey(), e.getValue(), false) == null;
    }
    public boolean addAll(Collection<? extends Entry<K,V>> c) {
        boolean added = false;
        for (Entry<K,V> e : c) {
            if (add(e))
                added = true;
        }
        return added;
    }
}

//CollectionView其他的函数就不放出来了,可以看到所有的实现最终都是依赖于ConcurrentHashMap
abstract static class CollectionView<K,V,E>
    implements Collection<E>, java.io.Serializable {
    private static final long serialVersionUID = 7249069246763182397L;
    final ConcurrentHashMap<K,V> map;
    CollectionView(ConcurrentHashMap<K,V> map)  { this.map = map; }
}
  • 同样的,迭代器也有三个KeyIterator、ValueIterator、EntryIterator,三者有一个共同的父类:BaseIterator
static final class EntryIterator<K,V> extends BaseIterator<K,V>
    implements Iterator<Map.Entry<K,V>> {
    EntryIterator(Node<K,V>[] tab, int index, int size, int limit,
                  ConcurrentHashMap<K,V> map) { //这里参数发现一个很有趣的事情,名字好像跟父类搞反了
        super(tab, index, size, limit, map);
    }
    public final Map.Entry<K,V> next() {
        Node<K,V> p;
        if ((p = next) == null)
            throw new NoSuchElementException();
        K k = p.key;
        V v = p.val;
        lastReturned = p;
        advance(); //迭代器的核心实现,扫描下一个迭代节点,由父类Traverser实现
        return new MapEntry<K,V>(k, v, map);
    }
}
  • BaseIterator又继承了 Traverser ,并添加了lastReturned字段以支持remove,lastReturned在子类中赋值
static class BaseIterator<K,V> extends Traverser<K,V> {
    final ConcurrentHashMap<K,V> map;
    Node<K,V> lastReturned; //next
    BaseIterator(Node<K,V>[] tab, int size, int index, int limit,
                ConcurrentHashMap<K,V> map) {
        super(tab, size, index, limit);
        this.map = map;
        advance(); //第一次调用,初始化迭代器
    }
    public final boolean hasNext() { return next != null; }
    public final boolean hasMoreElements() { return next != null; }
    public final void remove() {
        Node<K,V> p;
        if ((p = lastReturned) == null) //一次迭代不能调用两次
            throw new IllegalStateException();
        lastReturned = null;
        map.replaceNode(p.key, null, null);
    }
}
  • Traverser保存了迭代过程的上下文数据,
static class Traverser<K,V> {
    Node<K,V>[] tab;        // 当前表,扩容后会更新
    Node<K,V> next;         // 扩容时的下一个表
    TableStack<K,V> stack, stack; // 保存或者恢复ForwardingNodes的栈,stack是stack出栈的table
    int index;              // 下一个节点的索引
    int baseIndex;          // 初始表当前的索引
    int baseLimit;          // 初始表的最大索引
    final int baseSize;     // 初始表大小
    Traverser(Node<K,V>[] tab, int size, int index, int limit) {
        this.tab = tab;
        this.baseSize = size;
        this.baseIndex = this.index = index;
        this.baseLimit = limit;
        this.next = null;
    }
    /**
     * Advances if possible, returning next valid node, or null if none.
     */
    final Node<K,V> advance() {//如果可能,则前进,返回下一个有效节点,如果没有,则返回 null
        Node<K,V> e;
        if ((e = next) != null) //与HashMap一样,直接用线性遍历
            e = e.next;
        for (;;) {
            Node<K,V>[] t; int i, n;  // must use locals in checks
            if (e != null)
                return next = e;
            if (baseIndex >= baseLimit || (t = tab) == null ||
                (n = t.length) <= (i = index) || i < 0)//到了表的尽头
                return next = null;
            if ((e = tabAt(t, i)) != null && e.hash < 0) {
                if (e instanceof ForwardingNode) {
                    tab = ((ForwardingNode<K,V>)e).nextTable; //更新当前table
                    e = null;
                    pushState(t, i, n); //保存上一个table的状态到栈
                    continue; //这里不会进入下方的出栈
                }
                else if (e instanceof TreeBin)
                    e = ((TreeBin<K,V>)e).first;
                else
                    e = null;
            }
            if (stack != null) //栈不为空,从栈恢复上一个table的状态
                recoverState(n);
            else if ((index = i + baseSize) >= n) //stack 为空,将初始表桶索引+1,进入下一个桶
                index = ++baseIndex; // visit upper slots if present
        }
    }
    /**
     * Saves traversal state upon encountering a forwarding node.
     */
    private void pushState(Node<K,V>[] t, int i, int n) { //遇到扩容时,将上一个表压入栈
        TableStack<K,V> s = spare;  // reuse if possible
        if (s != null) //入如果spare有table,出栈放入stack 
            spare = s.next;
        else
            s = new TableStack<K,V>();
        s.tab = t;
        s.length = n;
        s.index = i;
        s.next = stack;
        stack = s;
    }
    /**
     * Possibly pops traversal state.
     *
     * @param n length of current table
     */
    private void recoverState(int n) {
        TableStack<K,V> s; int len;
        while ((s = stack) != null && (index += (len = s.length)) >= n) {
            n = len;
            index = s.index;
            tab = s.tab;
            s.tab = null; //table是null,应该是为了重用栈对象
            TableStack<K,V> next = s.next; //stack 出栈的table入栈到spare
            s.next = spare; // save for reuse
            stack = next;
            spare = s;
        }
        if (s == null && (index += baseSize) >= n) //栈空说明回到了初始表,当前索引要回到初始表的索引
            index = ++baseIndex;
    }
}

上面的实现说明了一个问题,这个视图是不支持并发迭代的,并且没有快速失败机制!并发访问可能会让Traverser的数据异常。
然后我去看了其他的公共方法,找到了安全迭代的方法,每次调用都返回一个新的迭代器对象,如下:

public Enumeration<K> keys() {
    Node<K,V>[] t;
    int f = (t = table) == null ? 0 : t.length;
    return new KeyIterator<K,V>(t, f, 0, f, this);
}
public Enumeration<V> elements() {
    Node<K,V>[] t;
    int f = (t = table) == null ? 0 : t.length;
    return new ValueIterator<K,V>(t, f, 0, f, this);
}

相关文章

网友评论

      本文标题:靠别人不如靠自己(3)——ConcurrentHashMap源码

      本文链接:https://www.haomeiwen.com/subject/wxmnoltx.html