美文网首页
ConcurrentSkipListMap源码解析

ConcurrentSkipListMap源码解析

作者: hubo_li | 来源:发表于2020-10-02 22:51 被阅读0次

跳表的基本介绍:

ConcurrentSkipListMap是线程安全的有序的哈希表,适用于高并发的场景。
ConcurrentSkipListMap和TreeMap,它们虽然都是有序的哈希表。但是,第一,它们的线程安全机制不同,TreeMap是非线程安全的,而ConcurrentSkipListMap是线程安全的。第二,ConcurrentSkipListMap是通过跳表实现的,而TreeMap是通过红黑树实现的。

在4线程1.6万数据的条件下,ConcurrentHashMap 存取速度是ConcurrentSkipListMap 的4倍左右。
但ConcurrentSkipListMap有几个ConcurrentHashMap 不能比拟的优点:
1、ConcurrentSkipListMap 的key是有序的。
2、ConcurrentSkipListMap 支持更高的并发。ConcurrentSkipListMap 的存取时间是log(N),和线程数几乎无关。也就是说在数据量一定的情况下,并发的线程越多,ConcurrentSkipListMap越能体现出他的优势。
在非多线程的情况下,应当尽量使用TreeMap。此外对于并发性相对较低的并行程序可以使用Collections.synchronizedSortedMap将TreeMap进行包装,也可以提供较好的效率。对于高并发程序,应当使用ConcurrentSkipListMap,能够提供更高的并发度。
所以在多线程程序中,如果需要对Map的键值进行排序时,请尽量使用ConcurrentSkipListMap,可能得到更好的并发度。
注意,调用ConcurrentSkipListMap的size时,由于多个线程可以同时对映射表进行操作,所以映射表需要遍历整个链表才能返回元素个数,这个操作是个O(log(n))的操作。

    public V get(Object key) {
        return doGet(key);
    }

  // 实际上
    private V doGet(Object key) {
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                if (n == null)
                    break outer;
                Node<K,V> f = n.next;
                if (n != b.next)                // inconsistent read
                    break;
                if ((v = n.value) == null) {    // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n)  // b is deleted
                    break;
                if ((c = cpr(cmp, key, n.key)) == 0) {
                    @SuppressWarnings("unchecked") V vv = (V)v;
                    return vv;
                }
                if (c < 0)
                    break outer;
                b = n;
                n = f;
            }
        }
        return null;
    }    private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
        if (key == null)
            throw new NullPointerException(); // don't postpone errors
        for (;;) {
            for (Index<K,V> q = head, r = q.right, d;;) {
                if (r != null) {
                    Node<K,V> n = r.node;
                    K k = n.key;
                    if (n.value == null) {
                        if (!q.unlink(r))
                            break;           // restart
                        r = q.right;         // reread r
                        continue;
                    }
                    if (cpr(cmp, key, k) > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }
                if ((d = q.down) == null)
                    return q.node;
                q = d;
                r = d.right;
            }
        }
    }

接下来我们看一下findPredecessor的源码

findPredecessor(Object key, Comparator cmp) 方法,它的作用是 找到合适与key的前一个位置。

private Node findPredecessor(Object key, Comparator cmp) {

if (key == null) //判断为null

throw new NullPointerException();

for (;;) {

for (Index q = head, r = q.right, d;;) { //head为最顶上一层的头节点。

if (r != null) { //r为head下一个节点。

Node n = r.node; //获得r的node

K k = n.key; //获得n的key。

if (n.value == null) { //r为无效节点

if (!q.unlink(r))

break; // restart 取消q绑定的r,失败就重新来。

r = q.right; // reread r 成功的话,就重新r,此时r为q.right.right,也就是删除了原来的r。

continue;

}

if (cpr(cmp, key, k) > 0) { //如果key>k,因为skiplist是有顺序的,所以可以这样比较。

q = r; //q为r,找下一个节点。

r = r.right;

continue;

}

}

/**

  • 前面方法有两个用,

  • 1 是找无效节点解除绑定

  • 2 是比较key和k,如果key>k,则直接会跳过下面的往后找,直到key>k

*/

//当前面方法过了后,就看是down是不是null,

if ((d = q.down) == null) //此时q是最底层节点,那么就是这个node。

return q.node; //找到了这个节点,down==null。

q = d; //往下找一层。

r = d.right;

}

}

}

findPredecessor的思路主要是从上层的第一个节点开始找,如果找到大于(或小于),就往下,因为上层节点一定有down指针指向下一层。最终找到的节点只能是最下面那一层,因为由这个返回条件可以知晓:

if ((d = q.down) == null) //此时q是最底层节点,那么就是这个node。

return q.node; //找到了这个节点,down==null。

接下来看doPut方法:


private V doPut(K key, V value, boolean onlyIfAbsent) {

Node z; //待添加的node。

if (key == null) //key不允许为null。

throw new NullPointerException();

Comparator cmp = comparator; //获得比较器。

/**

* 以下为在最底层合适位置插入一个节点key,value。

* 或者替换最底层一个节点。

*/

outer: for (;;) {

for (Node b = findPredecessor(key, cmp), n = b.next;;) {

//找到对应与key的前一个节点。

if (n != null) { //当b.next不为null时候,就是把key插入到n和b之间。

Object v; int c;

Node f = n.next;

if (n != b.next) // 不一致读。

break;

if ((v = n.value) == null) { // n被删除了。

n.helpDelete(b, f); //就是CAS方法删除。

break;

}

if (b.value == null || v == n) // b is deleted 再检测一次是不是被删除了。

break;

if ((c = cpr(cmp, key, n.key)) > 0) {

//如果key>n.key,那么就往下一个找。

//这一步为了再次检测一部,防止并发操作,别的线程先插入,

b = n;

n = f;

continue;

}

if (c == 0) { //相等就替换。

if (onlyIfAbsent || n.casValue(v, value)) {

//此处能够执行的条件为onlyIfAbsent == true,也就是不存在才替换,

//存在直接返回,或者cas成功。

@SuppressWarnings("unchecked") V vv = (V)v;

return vv;

}

break; // restart if lost race to replace value

}

// else c

}

//当,n为null。

//或者前面检测都通过了,

//则直接插入到n后面

z = new Node(key, value, n);

if (!b.casNext(n, z)) //将z插入到b和n之间。

break; // 插入失败重新开始。

break outer;

}

}

/**

* 以下为随机选择一个数,从刚刚插入的z节点,做一条单独的由

* down节点连接而成的链。

*

* 然后,在通过splice循环,把这条链水平方向连起来。从而形成网状结构。

*/

//因为是跳表,所以要往它插入层的下层继续插入。

int rnd = ThreadLocalRandom.nextSecondarySeed(); //获取一个当前线程的随机数。

if ((rnd & 0x80000001) == 0) { // test highest and lowest bits 测试最高和最低位。

int level = 1, max; //最高层和最低层

while (((rnd >>>= 1) & 1) != 0) // // 抛硬币决定层次,

++level;

Index idx = null;

HeadIndex h = head;

if (level

for (int i = 1; I

idx = new Index(z, idx, null); //在每一层都添加。

}

else { //选择的这个level>max,那么就要增加一层。

level = max + 1; // 只增加一层。

@SuppressWarnings("unchecked")Index[] idxs =

(Index[])new Index[level+1];

for (int i = 1; I

idxs[i] = idx = new Index(z, idx, null); //首先从第一层到level层都连接上节点。

for (;;) {

h = head;

int oldLevel = h.level;

if (level

break;

HeadIndex newh = h;

Node oldbase = h.node;

for (int j = oldLevel+1; j

newh = new HeadIndex(oldbase, newh, idxs[j], j);

if (casHead(h, newh)) { //替换head节点。

h = newh;

idx = idxs[level = oldLevel];

break;

}

}

}

// 找到插入的点,并且把index插入。

splice: for (int insertionLevel = level;;) { //从插入的level层次开始。

int j = h.level;

for (Index q = h, r = q.right, t = idx;;) {

if (q == null || t == null) //退出

break splice;

if (r != null) { //r不为null,

Node n = r.node;

// 在删除前比较,防止需要检查两次。

int c = cpr(cmp, key, n.key);

if (n.value == null) { //已经被删除了,就不一致读。

if (!q.unlink(r))

break;

r = q.right;

continue;

}

if (c > 0) { //右移

q = r;

r = r.right;

continue;

}

}

//找到位置了。就开始插入。

if (j == insertionLevel) {

if (!q.link(r, t)) //连接

break; // restart

if (t.node.value == null) {

findNode(key); //如果此时当前节点被删除,删除已经删除的节点。并且退出循环。

break splice;

}

// 标志的插入层自减 ,如果== 0 ,表示已经到底了,插入完毕,退出循环

if (--insertionLevel == 0)

break splice;

}

// 上面节点已经插入完毕了,插入下一个节点

if (--j >= insertionLevel && j

t = t.down;

q = q.down;

r = q.right;

}

}

}

return null;

}

代码很长,具体意思已经写道代码注释里面。其实开始我看的时候,那张图看得很明白,但是具体实现流程却总是没有想通。感觉精髓在于它的实现过程。

doPut操作,主要步骤分为下面三步:

以下为在最底层合适位置插入一个节点key,value。 或者替换最底层一个节点。

随机选择一个数n,如果这个数小于maxLevel,那么直接从1~n上增加节点,否则就从1~maxLevel+1上增加,并且新增加一链,从刚刚插入的z节点,做一条单独的由 down连接而成的链。这是纵向的。

然后,再通过splice循环,把这条链水平方向连起来。从而形成网状结构。

这样一来,整个插入过程就明了了。

get操作

接下来看看get操作相关的doGet方法:



private V doGet(Object key) {

if (key == null) //判断为null

throw new NullPointerException();

Comparator cmp = comparator; //获取comparator。

outer: for (;;) {

for (Node b = findPredecessor(key, cmp), n = b.next;;) { //找到predecessor。

Object v; int c;

if (n == null) //如果b.next为null,即b为最后一个。

break outer;

Node f = n.next;

if (n != b.next) // inconsistent read //不一致读。

break;

if ((v = n.value) == null) { // n is deleted n被删除了。

n.helpDelete(b, f);

break;

}

if (b.value == null || v == n) // b is deleted b被删除了。

break;

if ((c = cpr(cmp, key, n.key)) == 0) { // key就是这个key,所以直接返回

@SuppressWarnings("unchecked") V vv = (V)v;

return vv;

}

if (c

break outer;

b = n;

n = f;

}

}

return null;

}

由上图代码,代码主要有以下几步:

1. 通过findPredecessor来获得离得最近的前趋节点。

2. 获得这个前趋节点的值从而判断前趋节点状态,比如是否被删除,是否有别的线程已经更改过了。

3. 和前趋节点比较,根据比较结果来判定是否成功get到,例如,如果判断比后者小(说明判断逻辑不对),就直接退出。当比较结果相等,则直接返回。

remove操作

接下来看remove操作:



public V remove(Object key) {

return doRemove(key, null);

}

直接调用doRemove方法:

final V doRemove(Object key, Object value) {

if (key == null) //判空。

throw new NullPointerException();

Comparator cmp = comparator;

outer: for (;;) {

for (Node b = findPredecessor(key, cmp), n = b.next;;) { //找到predecessor节点。

Object v; int c;

if (n == null) //说明后面没有了,已经删掉了。

break outer;

Node f = n.next;

if (n != b.next) // 不一致读,break内层循环

break;

if ((v = n.value) == null) { // n已经被删掉了。但是没删完全,就帮他一起删

n.helpDelete(b, f);

break; //break内层循环

}

if (b.value == null || v == n) // 重新找一遍predecessor。

break;

if ((c = cpr(cmp, key, n.key))

break outer;

if (c > 0) { // 右移

b = n;

n = f;

continue;

}

// value != null 表示需要同时校验key-value值

if (value != null && !value.equals(v))

break outer;

if (!n.casValue(v, null)) // CAS替换value为null。

break;

if (!n.appendMarker(f) || !b.casNext(n, f)) //尝试取消链接。

findNode(key); // 在这个方法里面会调用helpDelete方法,从而把value为null的值删除。

else {

findPredecessor(key, cmp); // 清除索引连接,就是消除为null的。

// head.right == null表示该层已经没有节点,删掉该层

if (head.right == null)

tryReduceLevel();

}

@SuppressWarnings("unchecked") V vv = (V)v;

return vv; //走到这里,说明删除成功了。

}

}

return null;

}

参考文章:

https://blog.csdn.net/vernonzheng/article/details/8244984
https://cloud.tencent.com/developer/news/391343

相关文章

网友评论

      本文标题:ConcurrentSkipListMap源码解析

      本文链接:https://www.haomeiwen.com/subject/igtyuktx.html