22、Java并发编程:(JUC集合)ConcurrentSkipListMap介绍

ConcurrentSkipListMap提供了一种线程安全的并发访问的排序映射表。内部是SkipList(跳表)结构实现,在理论上能够在O(log(n))时间内完成查找、插入、删除操作。

理解SkipList

要想弄明白ConcurrentSkipListMap,我们的先明白他的数据结构实现,先来看SkipList。

Skip List是一种随机化的数据结构,基于并联的链表,其效率可比拟于二叉查找树(对于大多数操作需要O(log n)平均时间)。基本上,跳跃列表是对有序的链表增加上附加的前进链接,增加是以随机化的方式进行的,所以在列表中的查找可以快速的跳过部分列表(因此得名)。所有操作都以对数随机化的时间进行。SkipList可以很好解决有序链表查找特定值的困难。

Skip List定义:

一个跳表,应该具有以下特征:

1、 一个跳表应该有几个层(level)组成;
2、 跳表的第一层包含所有的元素;
3、 每一层都是一个有序的链表;
4、 如果元素x出现在第i层,则所有比i小的层都包含x;
5、 第i层的元素通过一个down指针指向下一层拥有相同值的元素;
6、 在每一层中,-1和1两个元素都出现(分别表示INT_MIN和INT_MAX);
7、 Top指针指向最高层的第一个元素;

构建有序链表:

*

一个跳表如下:

*

Skip List构造步骤:

1、 给定一个有序的链表;
2、 选择链表中最大和最小的元素,然后从其他元素中按照一定算法(随机)随即选出一些元素,将这些元素组成有序链表这个新的链表称为一层,原链表称为其下一层;
3、 为刚选出的每个元素添加一个指针域,这个指针指向下一层中值同自己相等的元素Top指针指向该层首元素;
4、 重复2、3步,直到不再能选择出除最大最小元素以外的元素;

从上图可以看到,跳表具有以下几种特性:

  • 由很多层组成,level越高的层节点越少,最后一层level用有所有的节点数据
  • 每一层的节点数据也都是有顺序的
  • 上面层的节点肯定会在下面层中出现
  • 每个节点都有两个指针,分别是同一层的下一个节点指针和下一层节点的指针

使用跳表查询元素的时间复杂度是O(log n),跟红黑树一样。查询效率还是不错的,但是跳表的存储容量变大了,本来一共只有10个节点的数据,使用跳表之后变成了21个节点。

所以跳表是一种使用”空间换时间”的概念用来提高查询效率的链表,开源软件Redis、LevelDB都使用到了跳表。跳表相比B树,红黑树,AVL树时间复杂度一样,但是耗费更多存储空间,但是跳表的优势就是它相比树,实现简单,不需要考虑树的一些rebalance问题。

ConcurrentSkipListMap探索

ConcurrentSkipListMap包含了很多内部类,内部类的框架图如下:
*

ConcurrentSkipListMap在原始链表的基础上增加了跳表的结构,所以需要两个额外的内部类来封装链表的节点,以及跳表的节点——Node和Index。

同ConcurrentHashMap的Node节点一样,key为final,是不可变的,value和next通过volatile修饰保证内存可见性。

Index:跳表的节点:

static class Index<K,V> {
   
     
      final Node<K,V> node;
       final Index<K,V> down;
       volatile Index<K,V> right;
}

Node:链表的节点:

static final class Node<K,V> {
      final K key;
       volatile Object value;
       volatile Node<K,V> next;
}

Index封装了跳表需要的结构,首先node包装了链表的节点,down指向下一层的节点(不是Node,而是Index),right指向同层右边的节点。node和down都是final的,说明跳表的节点一旦创建,其中的值以及所处的层就不会发生变化(因为down不会变化,所以其下层的down都不会变化,那他的层显然不会变化)。Node和Index内部都提供了用于CAS原子更新的AtomicReferenceFieldUpdater对象,该对象前面讲Atomic原子类的时候已经讲过,原理和机制将不再介绍。

下面我们还是着重介绍ConcurrentSkipListMap的get、put和remove方法。在介绍这三个方法之前我们先看一下这三个方法都会用到的一个辅助方法:

private Comparable<? super K> comparable(Object key)
            throws ClassCastException {
     if (key == null)
          throw new NullPointerException();
    //有两种封装方法,如果在构造时指定了comparator,则使用comparator封装key
    // 如果没有指定comparator,则key必须是一个继承自Comparable接口的类,否则会抛出ClassCastException
    // 所以ConcurrentSkipListMap的key要么是继承自Comparable接口的类,如果不是的话需要显示提供comparator进行比较
      if (comparator != null)
          return new ComparableUsingComparator<K>((K)key, comparator);
      else
          return (Comparable<? super K>)key;
  }

static final class ComparableUsingComparator<K> implements Comparable<K> {
   
     
      final K actualKey;
      final Comparator<? super K> cmp;
      ComparableUsingComparator(K key, Comparator<? super K> cmp) {
          this.actualKey = key;
          this.cmp = cmp;
      }
      public int compareTo(K k2) {
          return cmp.compare(actualKey, k2);
      }
  }

ConcurrentSkipListMap的key必须是能够比较的,这样来确保线程安全。

我们再来看一下get方法:

public V get(Object key) {
     return doGet(key);
}

private V doGet(Object okey) {
     Comparable<? super K> key = comparable(okey);
      /*
       * Loop needed here and elsewhere in case value field goes
       * null just as it is about to be returned, in which case we
       * lost a race with a deletion, so must retry.
       */
      for (;;) {
          Node<K,V> n = findNode(key);
          if (n == null)
              return null;
          Object v = n.value;
          if (v != null)
              return (V)v;
      }
  }

可见在get方法中调用了doGet()来进行取值操作,首先调用了comparable(key)方法来确保该次取值的安全性,后面再一个死循环中持续进行 findNode(key)操作。

再看一下put方法:

public V put(K key, V value) {
    if (value == null)
          throw new NullPointerException();
      return doPut(key, value, false);
}

private V doPut(K kkey, V value, boolean onlyIfAbsent) {
       Comparable<? super K> key = comparable(kkey);
     for (;;) {
    // 从跳表中查找最接近指定key的节点:该节点的key小于等于指定key,且处于最底层
          Node<K,V> b = findPredecessor(key);
          Node<K,V> n = b.next;
    //新节点插入在b与n之间
          for (;;) {
        //n==null则说明b是链表的最后一个节点,则新节点直接插入到链表尾部即可
              if (n != null) {
                  Node<K,V> f = n.next;
                  if (n != b.next)  // 此处增加判断,避免链表结构已被修改(针对节点b)
                      break;
                  Object v = n.value;
                  if (v == null) {     // n节点已经被删除
                      n.helpDelete(b, f);b和f分别为n的前驱和后继节点
                      break;
                  }
            // 这里如果v==n说明n是一个删除标记,用来标记其前继节点已被删除,即b已被删除
                  if (v == n || b.value == null) // b is deleted
                      break;
                  int c = key.compareTo(n.key);
            // 如果指定key>n的key,则判断下一个节点,直到n==null,或者指定key<n的key
                  if (c > 0) {
                      b = n;
                      n = f;
                      continue;
                  }
            // 相等,则更新value即可,更新失败,就再来一次,一直到成功为止
                  if (c == 0) {
                      if (onlyIfAbsent || n.casValue(v, value))
                          return (V)v;
                      else
                          break; // restart if lost race to replace value
                  }
                  // else c < 0; fall through
              }
        // 创建一个节点,next指向n
              Node<K,V> z = new Node<K,V>(kkey, value, n);
        // 将b的next指向新创建的节点,则新的链表为:b-->new-->n,即将新节点插入到b和n之间
              if (!b.casNext(n, z))
                  break;         // restart if lost race to append to b
        // 随机计算一个层级
              int level = randomLevel();
              if (level > 0)
        // 将z插入到该层级
                  insertIndex(z, level);
              return null;
          }
      }
  }

代码中已经附上了大量的注释,这里再简单的梳理下流程。首先put()方法是调用内部的doPut()方法。Comparable< ? super K&> key = comparable(kkey);这一句将key封装成一个Comparable对象,上面已经介绍了comparable这个方法。接着进入到死循环,循环第一步是调用findPredecessor(key)方法,该方法返回一个key最接近指定key的节点(最接近指的是小于等于),该节点是处于最底层的,下面介绍下这个方法的逻辑。

    /*
    *在跳表中查找节点的key小于指定key,且处于最底层的节点,即找到指定key的前继节点
    *基本逻辑是从head(跳表的最高层链表的头结点)开始自右开始查找,当找到该层链表的最
    *接近且小于指定key的节点时,往下开始查找,
    *最终找到最底层的那个节点
    */
private Node<K,V> findPredecessor(Comparable<? super K> key) {
      if (key == null)
          throw new NullPointerException(); // don't postpone errors
      for (;;) {
    // head是跳表的最高层链表的头结点
          Index<K,V> q = head;
          Index<K,V> r = q.right;// head的右边节点
          for (;;) {
        // r==null说明该层链表已经查找到头,且未找到符合条件的节点,需开始往下查找
              if (r != null) {
                  Node<K,V> n = r.node;// r的数据节点
                  K k = n.key;
                  if (n.value == null) {
  
    // n的value为null,说明该节点已被删除
                // 将该节点从链表移除,通过将其(n)前置节点的right指向其(n)的后置节点
                      if (!q.unlink(r))
                          break;           // restart
                      r = q.right;         // reread r 移除value==null的n节点之后,继续从n的下一个节点查找
                      continue;
                  }
            // 比较当前查找的节点的key与指定key,如果小于指定key,则继续查找,
            // 大于等于key则q即为该层链表最接近指定key的         
                  if (key.compareTo(k) > 0) {
                      q = r;
                      r = r.right;
                      continue;
                  }
              }
        // 到这里有两种情况:
        //1)该层链表已经查找完,仍未找到符号条件的节点
        //2)找到一个符合条件的节点
              // 开始往下一层链表进行查找
              Index<K,V> d = q.down;
              if (d != null) { // 从下层对应位置继续查找
                  q = d;
                  r = d.right;
              } else // 如果无下层链表则直接返回当前节点的node
                  return q.node;
          }
      }
  }

该方法的查找逻辑是:从head(跳表的最高层链表的头结点)开始自右开始查找,当找到该层链表的最接近且小于指定key的节点时,往下开始查找,最终找到最底层的那个节点。具体的代码可以看注释,应该说的挺明白的了,针对Put方法,这个方法返回的节点就是将要插入的节点的前继节点,即新节点将插到该节点后面。下面是查找的示意图:

*

所有的修改操作都是使用CAS,只要失败就会重试,直至成功,所以就算多线程并发操作也不会出现错误,而且通过CAS避免了使用锁,性能比用锁好很多。

接下来在看一下remove:

public V remove(Object key) {
   return doRemove(key, null);
}

final V doRemove(Object okey, Object value) {
    Comparable<? super K> key = comparable(okey);
    for (;;) {
// 从跳表中查找最接近指定key的节点:该节点的key小于等于指定key,且处于最底层
        Node<K,V> b = findPredecessor(key);
        Node<K,V> n = b.next;
        for (;;) {
            if (n == null)
                return null;
//获取n节点的下一个节点
            Node<K,V> f = n.next;
            if (n != b.next)                    // inconsistent read
                break;
            Object v = n.value;
            if (v == null) {                    // n is deleted
                n.helpDelete(b, f);
                break;
            }
            if (v == n || b.value == null)      // b is deleted
                break;
            int c = key.compareTo(n.key);
            if (c < 0)
                return null;
            if (c > 0) {
  
    //将该节点移除
                b = n;
                n = f;
                continue;
            }
            if (value != null && !value.equals(v))
                return null;
            if (!n.casValue(v, null))
                break;
            if (!n.appendMarker(f) || !b.casNext(n, f))
                findNode(key);                  // Retry via findNode
            else {
                findPredecessor(key);           // Clean index
                if (head.right == null)
                    tryReduceLevel();
            }
            return (V)v;
        }
    }
}

说明:doRemove函数的处理流程如下。

  ①根据key值找到前驱结点,查找的过程会删除一个标记为删除的结点。

  ②从前驱结点往后查找该结点。

  ③在该结点后面添加一个marker结点,若添加成功,则将该结点的前驱的后继设置为该结点之前的后继。

  ④头结点的next域是否为空,若为空,则减少层级。