🏉 ConcurrentHashMap

吞佛童子 · June 20, 2022
  • Java
  • Collection

1. Class Javadoc & JDK 7

1) JDK 7

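  1. In JDK 7, ConcurrentHashMap is made up of a Segment array, and each Segment holds a HashEntry array
  • Segment is a reentrant lock (it extends ReentrantLock) and contains a HashEntry array
  • Each HashEntry is a node in a linked list (each slot of the HashEntry array heads one list)
  • Lock striping at the Segment level lets ConcurrentHashMap support efficient concurrent access
  • However, the number of Segments fixes the concurrency level. Once the Segment array is initialized it can never grow; only the HashEntry array inside each Segment can be resized
  2. size()
  • Returns the number of key-value mappings
  • First, without taking any lock, it sums the element counts of all Segments. It then checks whether the total modification count (the sum of every Segment's modCount) matches the total from the previous pass
  • If it matches, the sum is returned
  • If it does not, the unlocked pass is repeated up to 2 more times
  • If none of the 3 unlocked passes gives a stable result, every Segment is locked with lock() and the sum is computed under the locks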
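The following minimal sketch makes the JDK 7 size() retry logic concrete. The Segment class here is a hypothetical stand-in that holds only a lock, count and modCount; it is not the real JDK 7 class. RETRIES_BEFORE_LOCK = 2 is chosen to reproduce the "3 unlocked passes, then lock everything" behavior described above.

```java
import java.util.concurrent.locks.ReentrantLock;

// Simplified model of the JDK 7 size() strategy; not the real JDK 7 classes.
class SegmentSizeSketch {
    // Hypothetical Segment: a lock plus an element count and a modification count.
    static final class Segment extends ReentrantLock {
        volatile int count;
        volatile int modCount;

        void add(int delta) {
            lock();
            try {
                count += delta;
                modCount++;
            } finally {
                unlock();
            }
        }
    }

    // Assumed value: 2 retries before locking, i.e. at most 3 unlocked passes.
    static final int RETRIES_BEFORE_LOCK = 2;
    final Segment[] segments; // fixed length: the concurrency level never changes

    SegmentSizeSketch(int concurrencyLevel) {
        segments = new Segment[concurrencyLevel];
        for (int i = 0; i < segments.length; i++) segments[i] = new Segment();
    }

    int size() {
        long last = 0L;   // modCount sum seen by the previous pass
        int retries = -1; // the first pass is not counted as a retry
        int size;
        try {
            for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (Segment s : segments) s.lock(); // give up on optimism: lock every Segment
                }
                long sum = 0L;
                size = 0;
                for (Segment s : segments) {
                    sum += s.modCount;
                    size += s.count;
                }
                if (sum == last) break; // no modification between two passes: the size is consistent
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (Segment s : segments) s.unlock();
            }
        }
        return size;
    }

    public static void main(String[] args) {
        SegmentSizeSketch map = new SegmentSizeSketch(16);
        map.segments[0].add(3);
        map.segments[5].add(2);
        System.out.println(map.size()); // 5
    }
}
```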


2) Class Javadoc

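  • Neither keys nor values may be null, as in Hashtable
  • get() (retrieval) never blocks and supports full concurrency; it uses neither locks nor CAS
  • size(), isEmpty() and containsValue() are only accurate when no other thread is updating the map. Otherwise their results reflect transient states, which are good enough for monitoring or estimation but not for program control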
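The following snippet shows the null policy in practice, using only public JDK APIs. ConcurrentHashMap and Hashtable throw NullPointerException for a null key or value, while HashMap accepts both. A commonly cited reason for the restriction: if nulls were allowed, get() returning null would be ambiguous (absent vs. mapped to null). In a concurrent map, a follow-up containsKey() cannot resolve that ambiguity atomically.

```java
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class NullPolicyDemo {
    // true if map.put(key, value) is rejected with a NullPointerException
    static boolean rejects(Map<String, String> map, String key, String value) {
        try {
            map.put(key, value);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejects(new ConcurrentHashMap<>(), null, "v")); // true
        System.out.println(rejects(new ConcurrentHashMap<>(), "k", null)); // true
        System.out.println(rejects(new Hashtable<>(), null, "v"));         // true
        System.out.println(rejects(new HashMap<>(), null, null));          // false
    }
}
```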

Additional notes (JDK 8)

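  1. Uses an array + linked list + red-black tree data structure
  2. The maximum concurrency level equals the table capacity, because each bin can be locked independently
  3. Uses CAS + synchronized for concurrency control
    • CAS is mainly used for
      • put(): inserting into an array slot that has no node yet
      • updating baseCount (and the CounterCell values)
      • transfer(): when the slot this thread is responsible for is empty, CAS-ing a fwd (ForwardingNode) into it to mark the slot as migrated
    • synchronized is mainly used for
      • put(): locking the head node of the bin the element goes into
      • transfer(): locking the head node of the slot currently being migrated, within the index range assigned to this thread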
      • replaceNode(Object key, V value, Object cv)
      • clear()
      • treeifyBin()
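The CAS-or-lock split above can be illustrated with a small, hypothetical bin table. It uses AtomicReferenceArray as a public stand-in for the Unsafe-based tabAt()/casTabAt(). This is a sketch under strong simplifications (fixed 16 bins, no resizing, no tree bins, no removal), not ConcurrentHashMap's code. An empty bin is filled with a CAS, and a non-empty bin is modified only while holding the monitor of its head node.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical, heavily simplified bin table: fixed 16 bins, no resize, no treeification, no removal.
class BinLockSketch {
    static final class Node {
        final String key;
        volatile int val;
        volatile Node next;
        Node(String key, int val) { this.key = key; this.val = val; }
    }

    final AtomicReferenceArray<Node> table = new AtomicReferenceArray<>(16);

    private int indexFor(String key) {
        int h = key.hashCode();
        return ((h ^ (h >>> 16)) & 0x7fffffff) & (table.length() - 1);
    }

    void put(String key, int val) {
        int i = indexFor(key);
        for (;;) {
            Node f = table.get(i); // volatile read of the bin head
            if (f == null) {
                // empty bin: publish the first node with a CAS, no lock needed
                if (table.compareAndSet(i, null, new Node(key, val))) return;
                continue; // another thread installed a head first: re-read the bin
            }
            synchronized (f) { // lock only this bin's head node
                if (table.get(i) == f) { // head unchanged since we read it
                    for (Node e = f;; e = e.next) {
                        if (e.key.equals(key)) { e.val = val; return; }
                        if (e.next == null) { e.next = new Node(key, val); return; }
                    }
                }
            }
            // head was replaced before we locked it: retry
        }
    }

    Integer get(String key) { // lock-free read
        for (Node e = table.get(indexFor(key)); e != null; e = e.next)
            if (e.key.equals(key)) return e.val;
        return null;
    }

    int size() {
        int n = 0;
        for (int i = 0; i < table.length(); i++)
            for (Node e = table.get(i); e != null; e = e.next) n++;
        return n;
    }

    // Fills a map from several threads at once; each thread writes its own distinct keys.
    static BinLockSketch fillConcurrently(int threads, int perThread) {
        BinLockSketch m = new BinLockSketch();
        Thread[] ts = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int id = t;
            ts[t] = new Thread(() -> {
                for (int k = 0; k < perThread; k++) m.put(id + "-" + k, k);
            });
            ts[t].start();
        }
        for (Thread th : ts) {
            try {
                th.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return m;
    }

    public static void main(String[] args) {
        BinLockSketch m = fillConcurrently(4, 1000);
        System.out.println(m.size());       // 4000
        System.out.println(m.get("3-999")); // 999
    }
}
```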

2. Class Diagram

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
// ......
}

3. Fields

    private static final long serialVersionUID = 7249069246763182397L;
    /* ---------------- Constants -------------- */
    /**
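     * Maximum table capacity.
     * Must be exactly 1 << 30 so that power-of-two tables stay within Java array allocation limits,
     * and because the top two bits of the 32-bit hash field are reserved for control purposes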
     */
    private static final int MAXIMUM_CAPACITY = 1 << 30;

    /**
     * Default initial table capacity; must be a power of 2 and at most MAXIMUM_CAPACITY
     */
    private static final int DEFAULT_CAPACITY = 16;

    /**
     * Largest possible (non-power-of-two) array length, used by toArray() and related methods
     */
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /**
     * Default concurrency level. Unused; kept only for compatibility with previous versions
     */
    private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    /**
     * Load factor. Constructors use it only to size the initial table; resize thresholds are computed as n - (n >>> 2), i.e. 0.75n
     */
    private static final float LOAD_FACTOR = 0.75f;

    /**
     * Bin count threshold for converting a linked list into a red-black tree (checked in putVal() as binCount >= 8)
     */
    static final int TREEIFY_THRESHOLD = 8;

    /**
     * When a tree bin is split during a resize, a half with <= 6 nodes is converted back into a linked list
     */
    static final int UNTREEIFY_THRESHOLD = 6;

    /**
     * Bins are treeified only when the table capacity is >= 64; smaller tables are resized instead
     */
    static final int MIN_TREEIFY_CAPACITY = 64;

    /**
     * Minimum number of bins a thread claims per step while helping a resize; should be >= DEFAULT_CAPACITY
     */
    private static final int MIN_TRANSFER_STRIDE = 16;

    /**
     * Number of bits in sizeCtl used for the resize (generation) stamp; must be >= 6 for 32-bit arrays
     */
    private static int RESIZE_STAMP_BITS = 16;

    /**
     * Maximum number of threads that can help with a resize at the same time
     */
    private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

    /**
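     * Shift for storing the resize stamp in sizeCtl: during a resize the high 16 bits hold the stamp and the low 16 bits hold (1 + the number of active resizing threads)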
     */
    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    /*
     * Encodings for Node hash fields. See above for explanation.
     */
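    static final int MOVED     = -1; // hash of a ForwardingNode: this bin has been moved during a resize
    static final int TREEBIN   = -2; // hash of a TreeBin: the bin holds a red-black tree
    static final int RESERVED  = -3; // hash for transient reservations
    static final int HASH_BITS = 0x7fffffff; // usable bits of a normal node hash (clears the sign bit)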

    /** Number of available CPUs */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /** For serialization compatibility. */
    private static final ObjectStreamField[] serialPersistentFields = {
        new ObjectStreamField("segments", Segment[].class),
        new ObjectStreamField("segmentMask", Integer.TYPE),
        new ObjectStreamField("segmentShift", Integer.TYPE)
    };

    /* ---------------- Fields -------------- */
    /**
     * The bin array. Lazily initialized on the first insertion; its length is always a power of 2
     */
    transient volatile Node<K,V>[] table;

    /**
     * The next table to use; non-null only while resizing
     */
    private transient volatile Node<K,V>[] nextTable;

    /**
     * Base counter value, used mainly when there is no contention; updated via CAS
     */
    private transient volatile long baseCount;

    /**
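     * Controls table initialization and resizing:
     * < 0: the table is being initialized or resized
     *   -1: initialization in progress
     *   during a resize: high 16 bits = resize stamp, low 16 bits = 1 + number of active resizing threads
     * 0: the table has not been created yet; the default size will be used
     * > 0: before initialization, the initial table size to use;
     *      after initialization, the element count at which the next resize is triggered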
     */
    private transient volatile int sizeCtl;

    /**
     * The next table index (plus one) to split while resizing.
     */
    private transient volatile int transferIndex;

    /**
     * Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
     */
    private transient volatile int cellsBusy;

    /**
     * Table of counter cells. When non-null, size is a power of 2.
     */
    private transient volatile CounterCell[] counterCells;

    // views
    private transient KeySetView<K,V> keySet;
    private transient ValuesView<K,V> values;
    private transient EntrySetView<K,V> entrySet;

    /* ---------------- Static utilities -------------- */
    /**
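     * XORs the high 16 bits of the hash into the low 16 bits, then clears the sign bit so normal node hashes are never negative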
     */
    static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

    /**
     * Returns the smallest power of two >= c (1 for c <= 1), capped at MAXIMUM_CAPACITY
     */
    private static final int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }
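The two static helpers can be exercised directly. The class below copies spread() and tableSizeFor() from the listing; the wrapper class name is made up. It shows that tableSizeFor() returns the smallest power of two that is >= its argument. It also shows that spread() lets high hash bits influence the bucket index of a small table.

```java
// Wrapper class name is hypothetical; spread() and tableSizeFor() are copied from the listing.
class HashUtilsDemo {
    static final int HASH_BITS = 0x7fffffff;
    static final int MAXIMUM_CAPACITY = 1 << 30;

    static int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

    static int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

    public static void main(String[] args) {
        System.out.println(tableSizeFor(16) + " " + tableSizeFor(17)); // 16 32
        int a = 0x10000, b = 0x20000; // differ only above bit 15
        int mask = 16 - 1;            // index mask of a 16-slot table
        System.out.println((a & mask) + " " + (b & mask));                 // 0 0 (same bucket)
        System.out.println((spread(a) & mask) + " " + (spread(b) & mask)); // 1 2 (different buckets)
        System.out.println(spread(-1) >= 0);                               // true
    }
}
```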

4. Inner Classes

    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;

        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }

        public final K getKey()       { return key; }
        public final V getValue()     { return val; }
        public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
        public final String toString(){ return key + "=" + val; }
        public final V setValue(V value) {
            throw new UnsupportedOperationException();
        }

        public final boolean equals(Object o) {
            Object k, v, u; Map.Entry<?,?> e;
            return ((o instanceof Map.Entry) &&
                    (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                    (v = e.getValue()) != null &&
                    (k == key || k.equals(key)) &&
                    (v == (u = val) || v.equals(u)));
        }

        /**
         * Virtualized support for map.get(); overridden in subclasses.
         */
        Node<K,V> find(int h, Object k) {
            Node<K,V> e = this;
            if (k != null) {
                do {
                    K ek;
                    if (e.hash == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                } while ((e = e.next) != null);
            }
            return null;
        }
    }


5. Constructors

    /**
     * 1. Does nothing: the table is created lazily, with DEFAULT_CAPACITY (16), on the first insertion
     */
    public ConcurrentHashMap() {
    }

    /**
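     * 2. Computes sizeCtl from initialCapacity; initTable() later uses this sizeCtl as the initial table length.
     * sizeCtl = tableSizeFor(1.5 * initialCapacity + 1), i.e. the smallest power of two >= 1.5 * initialCapacity + 1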
     */
    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();

        // sizeCtl here is the initial table length (not yet a threshold): smallest power of two >= 1.5 * initialCapacity + 1
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); // smallest power of two >= 1.5 * initialCapacity + 1
        this.sizeCtl = cap;
    }

    /**
     * 3. Creates a new, empty map with the given initial capacity and load factor, and concurrencyLevel 1
     */
    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, 1);
    }

    /**
     * 4. Creates a new, empty map with the given initial capacity, load factor, and concurrency level
     */
    public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }
    
    /**
     * 5. Creates a new map containing the same mappings as the given map
     *
     * @param m the map
     */
    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }
    
    public void putAll(Map<? extends K, ? extends V> m) {
        tryPresize(m.size());
        for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
            putVal(e.getKey(), e.getValue(), false);
    }
    
    /**
     * Tries to presize table to accommodate the given number of elements.
     *
     * @param size number of elements (doesn't need to be perfectly accurate)
     */
    private final void tryPresize(int size) {
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; 
            int n;
            if (tab == null || (n = tab.length) == 0) { // the table has not been created yet
                n = (sc > c) ? sc : c;
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // try to claim initialization by CAS-ing sizeCtl to -1
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            } else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;
            else if (tab == table) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    Node<K,V>[] nt;
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }
    
    /**
     * Returns the stamp bits for resizing a table of size n.
     * Must be negative when shifted left by RESIZE_STAMP_SHIFT.
     */
    static final int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }
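Constructors 2 and 4 can produce different initial table lengths for the same initialCapacity. This sketch copies their formulas from the listing into hypothetical helper methods; it does not inspect a real map's private sizeCtl. With initialCapacity = 11, the one-argument constructor gives an initial table of 32, but (11, 0.75f, 1) gives 16. initTable() later turns either value n into a threshold of n - (n >>> 2).

```java
// Hypothetical helpers that copy the constructor arithmetic from the listing above.
class InitialSizingDemo {
    static final int MAXIMUM_CAPACITY = 1 << 30;

    static int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

    // sizeCtl set by ConcurrentHashMap(int initialCapacity)
    static int sizeCtlOneArg(int initialCapacity) {
        return (initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY
                : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1);
    }

    // sizeCtl set by ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)
    static int sizeCtlThreeArg(int initialCapacity, float loadFactor, int concurrencyLevel) {
        if (initialCapacity < concurrencyLevel) initialCapacity = concurrencyLevel;
        long size = (long) (1.0 + (long) initialCapacity / loadFactor);
        return (size >= (long) MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int) size);
    }

    // Threshold that initTable() stores in sizeCtl for a table of length n (0.75n)
    static int threshold(int n) {
        return n - (n >>> 2);
    }

    public static void main(String[] args) {
        int a = sizeCtlOneArg(11);             // 11 + 5 + 1 = 17 -> 32
        int b = sizeCtlThreeArg(11, 0.75f, 1); // 1 + 11 / 0.75 = 15.67 -> 15 -> 16
        System.out.println(a + " " + threshold(a)); // 32 24
        System.out.println(b + " " + threshold(b)); // 16 12
    }
}
```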

6. 常用方法

1) get(Object key)

    /**
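     * Returns the value mapped to key, or null.
     * Lock-free and safe under concurrency: tabAt() performs a volatile read, so each bin head read is the most recently published one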
     *
     * @throws NullPointerException if the specified key is null
     */
    public V get(Object key) {
        Node<K,V>[] tab = table;
        Node<K,V> e, p;
        int n, eh;
        K ek;
        int h = spread(key.hashCode());
        // the table is non-null, its length is > 0, and the bin for this key is non-empty
        if (tab != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
            // check the head node first
            eh = e.hash; // hash of the head node
            if (eh == h) {
                // the head node's hash matches; compare its key
                ek = e.key;
                if (ek == key || (ek != null && key.equals(ek)))
                    return e.val;
            } else if (eh < 0) { // special node: TreeBin (-2), ForwardingNode (-1) or ReservationNode (-3)
                p = e.find(h, key); // tree search, or (for a ForwardingNode) a lookup in nextTable
                return p != null ? p.val : null;
            }
            // ordinary linked list: walk the remaining nodes
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

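    /*
     * Uses Unsafe.
     * The memory offset of slot i is computed as ((long) i << ASHIFT) + ABASE, where ABASE and ASHIFT
     * are obtained once in the static block below; no reflection is involved when reading an element.
     * Declaring table volatile only makes reads of the array reference volatile; it does not make
     * reads of the individual elements table[i] volatile.
     * Unsafe.getObjectVolatile reads table[i] with volatile semantics, so a thread always sees the
     * most recently published bin head.
     */
    // Volatile read of the node at index i of the node array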
    @SuppressWarnings("unchecked")
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    
    // Unsafe mechanics
    private static final sun.misc.Unsafe U;
    private static final long SIZECTL;
    private static final long TRANSFERINDEX;
    private static final long BASECOUNT;
    private static final long CELLSBUSY;
    private static final long CELLVALUE;
    private static final long ABASE;
    private static final int ASHIFT;

    static {
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentHashMap.class;
            SIZECTL = U.objectFieldOffset(k.getDeclaredField("sizeCtl"));
            TRANSFERINDEX = U.objectFieldOffset(k.getDeclaredField("transferIndex"));
            BASECOUNT = U.objectFieldOffset(k.getDeclaredField("baseCount"));
            CELLSBUSY = U.objectFieldOffset(k.getDeclaredField("cellsBusy"));
            
            Class<?> ck = CounterCell.class;
            CELLVALUE = U.objectFieldOffset(ck.getDeclaredField("value"));
            
            Class<?> ak = Node[].class;
            ABASE = U.arrayBaseOffset(ak);
            int scale = U.arrayIndexScale(ak);
            if ((scale & (scale - 1)) != 0)
                throw new Error("data type scale not a power of two");
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (Exception e) {
            throw new Error(e);
        }
    }
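On Java 9 and later, the public VarHandle API can express the volatile element access that tabAt() gets from Unsafe.getObjectVolatile. The sketch below is only an analogue, not how the JDK 8 source above is written. It uses a String[] instead of Node[], and its helper names are hypothetical. casSlot() and setSlot() play the roles of casTabAt() and setTabAt(), which the listing calls but does not show.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical analogue of tabAt()/casTabAt()/setTabAt() using the public VarHandle API (Java 9+).
class VolatileSlotDemo {
    static final VarHandle AA = MethodHandles.arrayElementVarHandle(String[].class);

    // like tabAt(): volatile read of tab[i]
    static String slotAt(String[] tab, int i) {
        return (String) AA.getVolatile(tab, i);
    }

    // like casTabAt(): atomically replace tab[i] if it still equals expected
    static boolean casSlot(String[] tab, int i, String expected, String value) {
        return AA.compareAndSet(tab, i, expected, value);
    }

    // like setTabAt(): volatile write of tab[i]
    static void setSlot(String[] tab, int i, String value) {
        AA.setVolatile(tab, i, value);
    }

    public static void main(String[] args) {
        String[] tab = new String[4];
        System.out.println(casSlot(tab, 1, null, "a")); // true: slot was empty
        System.out.println(casSlot(tab, 1, null, "b")); // false: slot is no longer empty
        System.out.println(slotAt(tab, 1));             // a
    }
}
```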

2) put(K key, V value)

    /**
     * Neither key nor value may be null
     */
    public V put(K key, V value) {
        return putVal(key, value, false);
    }

    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        // neither key nor value may be null
        if (key == null || value == null) throw new NullPointerException();
        // compute the spread hash of the key
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; 
            int n, i, fh;
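            // the table is empty: initialize it
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // the target bin is empty: insert the new node with CAS, no lock needed
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                    break;                   // CAS succeeded: leave the loop; on failure, loop again
            } else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f); // a resize is in progress: help move bins, then retry on the new table
            else {
                // the table exists, the bin is non-empty, and it is not being moved: insert into this bin
                V oldVal = null;
                // lock the head node of this bin
                synchronized (f) {
                    if (tabAt(tab, i) == f) { // re-check that the head was not replaced before we locked it
                        if (fh >= 0) { // non-negative hash: a linked list
                            binCount = 1; // counts the nodes traversed in the list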
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                                    oldVal = e.val; // found an existing node for this key; overwrite unless onlyIfAbsent
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                // reached the tail without a match: append the new node (tail insertion)
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key, value, null);
                                    break;
                                }
                            }
                        } else if (f instanceof TreeBin) { // a red-black tree bin
                            Node<K,V> p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value);
                            binCount = 2;
                            if (p != null) { // a node for this key already exists
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value; // overwrite the old value
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i); // convert to a tree (treeifyBin() resizes instead while the table is smaller than 64)
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

    /**
     * Initializes the table, using the size recorded in sizeCtl
     */
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab = table; 
        int sc;
        while (tab == null || tab.length == 0) {
            sc = sizeCtl;
            if (sc < 0)
                Thread.yield(); // another thread is already initializing; yield and re-check

            // no other thread is initializing: this thread tries to CAS sizeCtl to -1
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                // CAS succeeded: this thread has the right to initialize the table
                try {
                    tab = table;
                    if (tab == null || tab.length == 0) { // re-check after winning the CAS

                        // 1) no-arg constructor: sc == 0 --> initial table length 16
                        // 2) initialCapacity only: sc == tableSizeFor(1.5 * initialCapacity + 1)
                        // 3) initialCapacity, loadFactor and concurrencyLevel: sc == tableSizeFor(1 + initialCapacity / loadFactor)
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2); // 0.75n: the threshold for the next resize
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
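initTable() uses sizeCtl as a tiny CAS-based spin lock. Whichever thread CASes it to -1 creates the table; every other thread yields and re-checks. Below is a hypothetical re-creation of that pattern, with AtomicInteger in place of Unsafe and Object[] in place of Node[]. After a race between several threads, they should all see the same array, and sizeCtl should end up as the 0.75n threshold.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the initTable() pattern: sizeCtl used as a CAS spin "lock".
class LazyInitSketch {
    static final int DEFAULT_CAPACITY = 16;
    // 0 = not created (use default), -1 = someone is initializing, > 0 = resize threshold
    final AtomicInteger sizeCtl = new AtomicInteger(0);
    private volatile Object[] table;

    Object[] initTable() {
        Object[] tab;
        while ((tab = table) == null) {
            int sc = sizeCtl.get();
            if (sc < 0) {
                Thread.yield(); // another thread holds the "lock": back off and re-check
            } else if (sizeCtl.compareAndSet(sc, -1)) {
                try {
                    if ((tab = table) == null) { // re-check: someone may have finished first
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        table = tab = new Object[n];
                        sc = n - (n >>> 2); // 0.75n
                    }
                } finally {
                    sizeCtl.set(sc); // release: publish the threshold (or restore sc)
                }
                break;
            }
        }
        return tab;
    }

    // Races several threads on initTable(); true if all saw one 16-slot array and the threshold is 12.
    static boolean raceInit(int threads) {
        LazyInitSketch s = new LazyInitSketch();
        Object[][] seen = new Object[threads][];
        Thread[] ts = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int id = t;
            ts[t] = new Thread(() -> seen[id] = s.initTable());
            ts[t].start();
        }
        for (Thread th : ts) {
            try {
                th.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        for (Object[] r : seen) if (r != seen[0]) return false;
        return seen[0].length == DEFAULT_CAPACITY && s.sizeCtl.get() == 12;
    }

    public static void main(String[] args) {
        System.out.println(raceInit(8)); // true
    }
}
```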

    /**
     * Helps transfer bins to nextTable if a resize is in progress
     */
    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; 
        int sc;
        // the table is non-null, f is a ForwardingNode, and its nextTable is non-null: a resize is still in progress
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            int rs = resizeStamp(tab.length);
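            // nextTable and table are unchanged and sizeCtl < 0: the resize is still running
            while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {
                // stop helping if:
                // 1. the stamp differs: sizeCtl no longer describes a resize of this table length
                // 2. the resize has finished (intended: low 16 bits == 1, i.e. no active resizers); because rs is
                //    not shifted here, this check and check 3 can never be true while sc < 0; later JDK versions
                //    compare against the shifted stamp
                // 3. the maximum number of helper threads has been reached
                // 4. transferIndex <= 0: every index range has already been claimed, so there is nothing left to help with
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                // sc + 1: one more thread is now helping with the resize
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab); // move bins from the old table into the new one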
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

    /**
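     * Node class used during a resize (transfer)
     * hash == MOVED (-1)
     * Placed at the head of a bin in the old table
     * Seeing this node means the bin has already been moved: transfer() skips it, get() follows it
     * to nextTable via find(), and put() calls helpTransfer()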
     */
    static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null); // the hash of this node is MOVED == -1
            this.nextTable = tab;
        }

        Node<K,V> find(int h, Object k) {
            // loop to avoid arbitrarily deep recursion on forwarding nodes
            outer: for (Node<K,V>[] tab = nextTable;;) {
                Node<K,V> e; int n;
                if (k == null || tab == null || (n = tab.length) == 0 ||
                    (e = tabAt(tab, (n - 1) & h)) == null)
                    return null;
                for (;;) {
                    int eh; K ek;
                    if ((eh = e.hash) == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                    if (eh < 0) {
                        if (e instanceof ForwardingNode) {
                            tab = ((ForwardingNode<K,V>)e).nextTable;
                            continue outer;
                        }
                        else
                            return e.find(h, k);
                    }
                    if ((e = e.next) == null)
                        return null;
                }
            }
        }
    }
    /**
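     * Generates a stamp identifying a resize of a table of length n; the stamp must become negative when shifted left by RESIZE_STAMP_SHIFT
     * Integer.numberOfLeadingZeros(n): number of zero bits before the highest one-bit of n (treated as unsigned)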
     */
    static final int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }
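Plain arithmetic is enough to check how resizeStamp() and sizeCtl work together during a resize. The helpers startResize() and activeResizers() below are hypothetical names for expressions that appear in addCount(), tryPresize() and transfer(). The initiating thread writes (rs << RESIZE_STAMP_SHIFT) + 2, and each helper adds 1. The low 16 bits therefore hold 1 + the number of active resizers.

```java
// Constants and resizeStamp() copied from the listing; startResize() and activeResizers() are hypothetical names.
class ResizeStampDemo {
    static final int RESIZE_STAMP_BITS = 16;
    static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
    static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

    static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

    // sizeCtl written by the thread that starts a resize of a table of length n
    static int startResize(int n) {
        return (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2;
    }

    // number of active resizing threads encoded in the low 16 bits of sizeCtl during a resize
    static int activeResizers(int sc) {
        return (sc & MAX_RESIZERS) - 1;
    }

    public static void main(String[] args) {
        int n = 16;
        int rs = resizeStamp(n); // 27 leading zeros | 0x8000 = 32795
        int sc = startResize(n);
        System.out.println(Integer.toHexString(sc));           // 801b0002 (negative: high bit set)
        System.out.println((sc >>> RESIZE_STAMP_SHIFT) == rs); // true: the high 16 bits hold the stamp
        System.out.println(activeResizers(sc));                // 1
        System.out.println(activeResizers(sc + 1));            // 2, after one helper's CAS sc -> sc + 1
        // transfer(): a finishing thread that sees (sc - 2) == rs << RESIZE_STAMP_SHIFT is the last resizer
        System.out.println((sc - 2) == rs << RESIZE_STAMP_SHIFT); // true
    }
}
```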

    /**
     * Moves (copies) the nodes of each bin into the new table
     */
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length;
        // number of bins each thread claims per step: depends on the CPU count and is >= MIN_TRANSFER_STRIDE (16)
        int stride = (NCPU > 1) ? (n >>> 3) / NCPU : n;       
        if (stride < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
            
        // if nextTab is null, this thread initiates the resize and creates a new table twice as long
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n; // ranges are claimed from the high end (index n - 1) downward, one range per thread at a time
        }
        
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; 
            int fh;
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false; // still inside the current range (or in the final sweep): no new range needed
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false; // every range has already been claimed: nothing left for this thread to claim
                }
                else if (U.compareAndSwapInt(this,
                                            TRANSFERINDEX,
                                            nextIndex,
                                            nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
                    // this thread now owns old-table indices [nextBound, nextIndex - 1], processed from high to low
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false; // successfully claimed a range: stop claiming
                }
            }
            // this thread has no more indices to process (its range is done and no unclaimed range is left)
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                // the final sweep has finished: commit the new table
                if (finishing) {
                    nextTable = null;
                    table = nextTab; // the new array becomes table; nextTable is cleared for the next resize
                    sizeCtl = (n << 1) - (n >>> 1); // 2n - n/2 = 0.75 * 2n: threshold for the new table
                    return;
                }
                // this thread is done: decrement the resizer count held in the low bits of sizeCtl
                // (other threads may still be moving the ranges they claimed)
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return; // other resizers are still active; only the last one may finish the resize
                    finishing = advance = true; // this was the last active resizer: set finishing
                    i = n; // recheck every bin once more before committing
                }
            } else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd); // empty bin: CAS in the fwd node to mark it as moved
            else if ((fh = f.hash) == MOVED)
                advance = true; // already moved (fwd present): skip this index
            else {
                // lock the head node of this bin and move its nodes
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            int runBit = fh & n; // 0: the node stays at index i; n: it moves to index i + n
                            Node<K,V> lastRun = f; // start of the trailing run of nodes that share the same runBit
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
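                            // copy the nodes before lastRun into the two lists (prepending, so their order is reversed); the run from lastRun on is reused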
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; 
                                K pk = p.key; 
                                V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) { // split a red-black tree bin
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }
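The list-splitting step of transfer() (runBit, lastRun, then ln/hn) can be reproduced in isolation. This sketch uses a hypothetical hash-only Node and covers only the linked-list case. With an old length n = 16, every node below sits in bin 1, and (hash & 16) decides whether it stays at index 1 or moves to 1 + 16 = 17. The trailing run that starts at lastRun (65 -> 97) is reused as-is, while the earlier nodes are copied and come out in reverse order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical hash-only node; only the linked-list case of transfer() is modeled.
class SplitSketch {
    static final class Node {
        final int hash;
        final Node next;
        Node(int hash, Node next) { this.hash = hash; this.next = next; }
    }

    // Splits the bin headed by f (old table length n) into {lo, hi}: lo stays at index i, hi moves to i + n.
    static Node[] split(Node f, int n) {
        int runBit = f.hash & n;
        Node lastRun = f;
        for (Node p = f.next; p != null; p = p.next) {
            int b = p.hash & n;
            if (b != runBit) { // a new run starts here
                runBit = b;
                lastRun = p;
            }
        }
        Node ln = (runBit == 0) ? lastRun : null; // reuse the trailing run as-is
        Node hn = (runBit == 0) ? null : lastRun;
        for (Node p = f; p != lastRun; p = p.next) { // copy (prepend) the nodes before lastRun
            if ((p.hash & n) == 0) ln = new Node(p.hash, ln);
            else hn = new Node(p.hash, hn);
        }
        return new Node[] { ln, hn };
    }

    static List<Integer> hashes(Node head) {
        List<Integer> out = new ArrayList<>();
        for (Node p = head; p != null; p = p.next) out.add(p.hash);
        return out;
    }

    static Node chain(int... hs) {
        Node head = null;
        for (int k = hs.length - 1; k >= 0; k--) head = new Node(hs[k], head);
        return head;
    }

    public static void main(String[] args) {
        Node bin = chain(1, 33, 17, 65, 97); // all in bin 1 of a 16-slot table
        Node[] parts = split(bin, 16);
        System.out.println(hashes(parts[0])); // [33, 1, 65, 97] -> new index 1
        System.out.println(hashes(parts[1])); // [17] -> new index 17
        System.out.println(parts[0].next.next == bin.next.next.next); // true: 65 -> 97 was reused
    }
}
```

The nodes before lastRun are copied rather than relinked, so the old chain is left untouched. A get() that is still walking the old bin is therefore not disturbed.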

    /**
     * Adds to count, and if table is too small and not already
     * resizing, initiates transfer. If already resizing, helps
     * perform transfer if work is available.  Rechecks occupancy
     * after a transfer to see if another resize is already needed
     * because resizings are lagging additions.
     *
     * @param x the count to add
     * @param check if <0, don't check resize, if <= 1 only check if uncontended
     */
    private final void addCount(long x, int check) {
        CounterCell[] as = counterCells; 
        long b, s;
        // If counterCells already exists, or the CAS on baseCount fails,
        // there is (or has been) contention: fall back to the counter cells
        if (as != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; 
            long v; 
            int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
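                // fall back to fullAddCount() when:
                // 1. counterCells has not been created yet
                // 2. the cell chosen by this thread's probe hash is empty
                // 3. the CAS on that cell failed (contention)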
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
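            s = sumCount(); // baseCount plus the values of all counter cells
        }
        // check >= 0: check whether a resize is needed
        if (check >= 0) {
            Node<K,V>[] tab, nt;
            int n, sc;
            // element count >= sizeCtl (the resize threshold) && table is non-null && its length is below MAXIMUM_CAPACITY ==> resize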
            while (s >= (long)(sc = sizeCtl) 
                    && (tab = table) != null 
                    && (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                // a resize is already in progress
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break; // this thread should not (or can no longer) help
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt); // join the resize as a helper
                } else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null); // no resize in progress: start one
                s = sumCount();
            }
        }
    }
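The baseCount / CounterCell idea behind addCount() is the same striping technique that LongAdder uses. Try one CAS on a shared base; on contention, spread the updates across cells. The sketch below is a deliberately simplified, hypothetical version. It uses a fixed 8 cells chosen by the thread's identity hash, with none of the cell creation, growth, or rehashing that fullAddCount() performs.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

// Hypothetical simplification of baseCount + CounterCell[]: fixed 8 cells, no growth, no rehash on collision.
class StripedCountSketch {
    private final AtomicLong base = new AtomicLong();             // plays the role of baseCount
    private final AtomicLongArray cells = new AtomicLongArray(8); // plays the role of counterCells

    void add(long x) {
        long b = base.get();
        if (base.compareAndSet(b, b + x)) return; // uncontended fast path, like the baseCount CAS
        // the CAS lost a race: record the delta in a cell picked from the current thread's hash
        int i = Thread.currentThread().hashCode() & (cells.length() - 1);
        cells.getAndAdd(i, x);
    }

    long sum() { // like sumCount(): base plus every cell; not an atomic snapshot while updates run
        long s = base.get();
        for (int i = 0; i < cells.length(); i++) s += cells.get(i);
        return s;
    }

    static long countConcurrently(int threads, int perThread) {
        StripedCountSketch c = new StripedCountSketch();
        Thread[] ts = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            ts[t] = new Thread(() -> {
                for (int k = 0; k < perThread; k++) c.add(1);
            });
            ts[t].start();
        }
        for (Thread th : ts) {
            try {
                th.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return c.sum();
    }

    public static void main(String[] args) {
        System.out.println(countConcurrently(4, 10_000)); // 40000
    }
}
```

Either path applies every delta exactly once, so the final sum is exact once all writers have finished. Only a sum() taken while updates are still running is an estimate, which matches the class Javadoc's warning about size().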
    
    /**
     * A padded cell for distributing counts.  Adapted from LongAdder
     * and Striped64.  See their internal docs for explanation.
     */
    @sun.misc.Contended static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }

    // See LongAdder version for explanation
    // striped counting across counter cells
    private final void fullAddCount(long x, boolean wasUncontended) {
        int h;
        if ((h = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();      // force initialization
            h = ThreadLocalRandom.getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            CounterCell[] as; CounterCell a; int n; long v;
            if ((as = counterCells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {            // Try to attach new Cell
                        CounterCell r = new CounterCell(x); // Optimistic create
                        if (cellsBusy == 0 &&
                            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                            boolean created = false;
                            try {               // Recheck under lock
                                CounterCell[] rs; int m, j;
                                if ((rs = counterCells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                    break;
                else if (counterCells != as || n >= NCPU)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 &&
                         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                    try {
                        if (counterCells == as) {// Expand table unless stale
                            CounterCell[] rs = new CounterCell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            counterCells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = ThreadLocalRandom.advanceProbe(h);
            }
            else if (cellsBusy == 0 && counterCells == as &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                boolean init = false;
                try {                           // Initialize table
                    if (counterCells == as) {
                        CounterCell[] rs = new CounterCell[2];
                        rs[h & 1] = new CounterCell(x);
                        counterCells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
                break;                          // Fall back on using base
        }
    }

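    // Sum baseCount plus the value of every CounterCell (not the hash-table bins); lock-free, so not an atomic snapshot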
    final long sumCount() {
        CounterCell[] as = counterCells; 
        CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
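The `baseCount` + `CounterCell[]` scheme above can be illustrated with a much smaller class: try a CAS on the base first, spread contended updates over cells, and add everything up in a `sumCount()`-style read. This is a hypothetical sketch, not the JDK code. `StripedCounter` is an invented name, and the sketch simplifies the real scheme in four ways:

- the cell array has a fixed power-of-two size, instead of growing up to `NCPU` under the `cellsBusy` spin lock;
- the cells are not `@Contended`-padded;
- a failed base CAS goes straight to a cell, with no retry or rehash;
- the cell index comes from `System.identityHashCode` of the current thread, rather than from `ThreadLocalRandom`'s internal probe, which is not public API.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

// Hypothetical, simplified striped counter in the spirit of baseCount + CounterCell[].
public class StripedCounter {
    private final AtomicLong base = new AtomicLong();   // plays the role of baseCount
    private final AtomicLongArray cells;                // plays the role of counterCells (fixed size here)
    private final int mask;

    public StripedCounter(int cellCount) {
        if (cellCount <= 0 || Integer.bitCount(cellCount) != 1)
            throw new IllegalArgumentException("cell count must be a power of two");
        this.cells = new AtomicLongArray(cellCount);
        this.mask = cellCount - 1;
    }

    public void add(long x) {
        long b = base.get();
        // Fast path: one CAS on the shared base value, like the first CAS on BASECOUNT in addCount.
        if (base.compareAndSet(b, b + x))
            return;
        // Contended: pick a cell from a per-thread hash, similar to as[(n - 1) & h] in fullAddCount.
        int h = System.identityHashCode(Thread.currentThread());
        h ^= (h >>> 16);
        cells.getAndAdd(h & mask, x);
    }

    // Like sumCount(): base plus every cell, read without locking, so while other
    // threads are still adding, the result is only a moment-in-time estimate.
    public long sum() {
        long sum = base.get();
        for (int i = 0; i < cells.length(); i++)
            sum += cells.get(i);
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        StripedCounter counter = new StripedCounter(8);
        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < 100_000; i++)
                    counter.add(1);
            });
            threads[t].start();
        }
        for (Thread t : threads)
            t.join();
        System.out.println(counter.sum()); // 400000 once all writers have finished
    }
}
```

The trade-off matches `LongAdder`: writers rarely contend on the same memory location, while a read has to visit every cell and sees only a moment-in-time total when writers are still active.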

3) Traversal

① entrySet()

    public Set<Map.Entry<K,V>> entrySet() {
        EntrySetView<K,V> es;
        return (es = entrySet) != null ? es : (entrySet = new EntrySetView<K,V>(this));
    }
    
    /**
     * A view of a ConcurrentHashMap as a {@link Set} of (key, value)
     * entries.  This class cannot be directly instantiated. See
     * {@link #entrySet()}.
     */
    static final class EntrySetView<K,V> extends CollectionView<K,V,Map.Entry<K,V>> 
                                        implements Set<Map.Entry<K,V>>, java.io.Serializable {
        private static final long serialVersionUID = 2249069246763182397L;
        EntrySetView(ConcurrentHashMap<K,V> map) { super(map); }

        public boolean contains(Object o) {
            Object k, v, r; Map.Entry<?,?> e;
            return ((o instanceof Map.Entry) &&
                    (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                    (r = map.get(k)) != null &&
                    (v = e.getValue()) != null &&
                    (v == r || v.equals(r)));
        }

        public boolean remove(Object o) {
            Object k, v; Map.Entry<?,?> e;
            return ((o instanceof Map.Entry) &&
                    (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                    (v = e.getValue()) != null &&
                    map.remove(k, v));
        }

        /**
         * @return an iterator over the entries of the backing map
         */
        public Iterator<Map.Entry<K,V>> iterator() {
            ConcurrentHashMap<K,V> m = map;
            Node<K,V>[] t;
            int f = (t = m.table) == null ? 0 : t.length;
            return new EntryIterator<K,V>(t, f, 0, f, m);
        } 

        public boolean add(Entry<K,V> e) {
            return map.putVal(e.getKey(), e.getValue(), false) == null;
        }

        public boolean addAll(Collection<? extends Entry<K,V>> c) {
            boolean added = false;
            for (Entry<K,V> e : c) {
                if (add(e))
                    added = true;
            }
            return added;
        }

        public final int hashCode() {
            int h = 0;
            Node<K,V>[] t;
            if ((t = map.table) != null) {
                Traverser<K,V> it = new Traverser<K,V>(t, t.length, 0, t.length);
                for (Node<K,V> p; (p = it.advance()) != null; ) {
                    h += p.hashCode();
                }
            }
            return h;
        }

        public final boolean equals(Object o) {
            Set<?> c;
            return ((o instanceof Set) &&
                    ((c = (Set<?>)o) == this ||
                     (containsAll(c) && c.containsAll(this))));
        }

        public Spliterator<Map.Entry<K,V>> spliterator() {
            Node<K,V>[] t;
            ConcurrentHashMap<K,V> m = map;
            long n = m.sumCount();
            int f = (t = m.table) == null ? 0 : t.length;
            return new EntrySpliterator<K,V>(t, f, 0, f, n < 0L ? 0L : n, m);
        }

        public void forEach(Consumer<? super Map.Entry<K,V>> action) {
            if (action == null) throw new NullPointerException();
            Node<K,V>[] t;
            if ((t = map.table) != null) {
                Traverser<K,V> it = new Traverser<K,V>(t, t.length, 0, t.length);
                for (Node<K,V> p; (p = it.advance()) != null; )
                    action.accept(new MapEntry<K,V>(p.key, p.val, map));
            }
        }

    }
    
    static final class EntryIterator<K,V> extends BaseIterator<K,V> implements Iterator<Map.Entry<K,V>> {
        EntryIterator(Node<K,V>[] tab, int index, int size, int limit, ConcurrentHashMap<K,V> map) {
            super(tab, index, size, limit, map);
        }

        public final Map.Entry<K,V> next() {
            Node<K,V> p = next;
            if (p == null)
                throw new NoSuchElementException();
            K k = p.key;
            V v = p.val;
            lastReturned = p;
            advance(); // move the iterator on to the next node
            return new MapEntry<K,V>(k, v, map);
        }
    }
    
    /**
     * Base of key, value, and entry Iterators. Adds fields to
     * Traverser to support iterator.remove.
     */
    static class BaseIterator<K,V> extends Traverser<K,V> {
        final ConcurrentHashMap<K,V> map;
        Node<K,V> lastReturned;
        BaseIterator(Node<K,V>[] tab, int size, int index, int limit, ConcurrentHashMap<K,V> map) {
            super(tab, size, index, limit);
            this.map = map;
            advance();
        }

        public final boolean hasNext() { return next != null; }
        public final boolean hasMoreElements() { return next != null; }

        public final void remove() {
            Node<K,V> p;
            if ((p = lastReturned) == null)
                throw new IllegalStateException();
            lastReturned = null;
            map.replaceNode(p.key, null, null);
        }
    }
    
    /**
     * Encapsulates traversal for methods such as containsValue; also
     * serves as a base class for other iterators and spliterators.
     *
     * Method advance visits once each still-valid node that was
     * reachable upon iterator construction. It might miss some that
     * were added to a bin after the bin was visited, which is OK wrt
     * consistency guarantees. Maintaining this property in the face
     * of possible ongoing resizes requires a fair amount of
     * bookkeeping state that is difficult to optimize away amidst
     * volatile accesses.  Even so, traversal maintains reasonable
     * throughput.
     *
     * Normally, iteration proceeds bin-by-bin traversing lists.
     * However, if the table has been resized, then all future steps
     * must traverse both the bin at the current index as well as at
     * (index + baseSize); and so on for further resizings. To
     * paranoically cope with potential sharing by users of iterators
     * across threads, iteration terminates if a bounds checks fails
     * for a table read.
     */
    static class Traverser<K,V> {
        Node<K,V>[] tab;        // current table; updated if resized
        Node<K,V> next;         // the next entry to use
        TableStack<K,V> stack, spare; // to save/restore on ForwardingNodes
        int index;              // index of bin to use next
        int baseIndex;          // current index of initial table
        int baseLimit;          // index bound for initial table
        final int baseSize;     // initial table size

        Traverser(Node<K,V>[] tab, int size, int index, int limit) {
            this.tab = tab;
            this.baseSize = size;
            this.baseIndex = this.index = index;
            this.baseLimit = limit;
            this.next = null;
        }

        /**
         * Advances to the next node and returns it, or returns null once traversal is complete.
         */
        final Node<K,V> advance() {
            Node<K,V> e = next; // start from the node returned last time
            if (e != null)
                e = e.next; // try its successor in the same bin
            for (;;) {
                Node<K,V>[] t; 
                int i, n;  // must use locals in checks
                
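                // 1. A successor exists in the current bin: make it the next node and return it
                if (e != null)
                    return next = e;
                // 2. The current bin is exhausted: stop if the index range is used up or the table is missing/invalid
                if (baseIndex >= baseLimit || (t = tab) == null ||
                    (n = t.length) <= (i = index) || i < 0)
                    return next = null;
                // 3. Read the head of bin i; a negative hash marks a special node rather than an ordinary entry
                if ((e = tabAt(t, i)) != null && e.hash < 0) {
                    // 3.1 The bin has already been moved by a resize: save the current position and continue in nextTable
                    if (e instanceof ForwardingNode) {
                        tab = ((ForwardingNode<K,V>)e).nextTable;
                        e = null;
                        pushState(t, i, n);
                        continue;
                    }
                    // 3.2 A tree bin: walk its linked list of TreeNodes, starting at first
                    else if (e instanceof TreeBin)
                        e = ((TreeBin<K,V>)e).first;
                    // 3.3 Other special nodes (such as ReservationNode) hold no entries
                    else
                        e = null;
                }
                // 4. Advance the index: with saved state, recoverState visits the matching nextTable bins
                //    (i, then i + old length) and pops back; otherwise move on to the next bin
                if (stack != null)
                    recoverState(n);
                else if ((index = i + baseSize) >= n)
                    index = ++baseIndex; // visit upper slots if present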
            }
        }

        /**
         * Saves traversal state upon encountering a forwarding node.
         */
        private void pushState(Node<K,V>[] t, int i, int n) {
            TableStack<K,V> s = spare;  // reuse if possible
            if (s != null)
                spare = s.next;
            else
                s = new TableStack<K,V>();
            s.tab = t;
            s.length = n;
            s.index = i;
            s.next = stack;
            stack = s;
        }

        /**
         * Possibly pops traversal state.
         *
         * @param n length of current table
         */
        private void recoverState(int n) {
            TableStack<K,V> s; 
            int len;
            while ((s = stack) != null && (index += (len = s.length)) >= n) {
                n = len;
                index = s.index;
                tab = s.tab;
                s.tab = null;
                TableStack<K,V> next = s.next;
                s.next = spare; // save for reuse
                stack = next;
                spare = s;
            }
            if (s == null && (index += baseSize) >= n)
                index = ++baseIndex;
        }
    }
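Several behaviours follow directly from the `EntrySetView`, `EntryIterator` and `BaseIterator` code above. They can be checked through the public API:

- `add` delegates to `putVal(k, v, false)`, so it overwrites an existing value yet returns `false`.
- `remove(entry)` maps to the conditional `map.remove(k, v)`.
- `Iterator.remove` goes through `replaceNode`.

The class name `EntrySetViewDemo` is hypothetical. The write-through of `setValue` relies on `MapEntry.setValue` calling `map.put`, as it does in the JDK 8 source.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo of the entry-set view semantics implied by the source above.
public class EntrySetViewDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("a", 1);

        Set<Map.Entry<String, Integer>> entries = map.entrySet();
        // entrySet() caches the view in a field, so repeated calls return the same object.
        System.out.println(entries == map.entrySet());                   // true

        // add(): putVal(k, v, false) replaces the old value but returns false because a mapping existed.
        System.out.println(entries.add(new SimpleEntry<>("a", 2)));      // false
        System.out.println(map.get("a"));                                // 2
        System.out.println(entries.add(new SimpleEntry<>("b", 3)));      // true

        // contains(): looks up the key with map.get and compares the values.
        System.out.println(entries.contains(new SimpleEntry<>("a", 2))); // true
        System.out.println(entries.contains(new SimpleEntry<>("a", 1))); // false

        // remove(): only removes when key and value both match, i.e. map.remove(k, v).
        System.out.println(entries.remove(new SimpleEntry<>("b", 99)));  // false
        System.out.println(map.containsKey("b"));                        // true

        // setValue() on an iterated entry writes through to the map (MapEntry.setValue calls map.put).
        for (Map.Entry<String, Integer> e : map.entrySet())
            e.setValue(e.getValue() * 10);
        System.out.println(map.get("a") + " " + map.get("b"));           // 20 30

        // Iterator.remove() deletes the last returned key through replaceNode.
        Iterator<Map.Entry<String, Integer>> it = map.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getKey().equals("a"))
                it.remove();
        }
        System.out.println(map.keySet());                                // [b]
    }
}
```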

② keySet()

    public KeySetView<K,V> keySet() {
        KeySetView<K,V> ks;
        return (ks = keySet) != null ? ks : (keySet = new KeySetView<K,V>(this, null));
    }
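`keySet()` creates the view with a `null` mapped value. The returned `KeySetView` therefore supports lookups and removal, but throws `UnsupportedOperationException` on `add`. The public overload `keySet(V mappedValue)` and the static factory `ConcurrentHashMap.newKeySet()` (both added in Java 8) return views that can add keys. Here is a small demo; the class name is hypothetical:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo of KeySetView behaviour with and without a default mapped value.
public class KeySetViewDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Boolean> map = new ConcurrentHashMap<>();
        map.put("a", true);

        ConcurrentHashMap.KeySetView<String, Boolean> keys = map.keySet();
        try {
            keys.add("b"); // no default value was supplied, so add is unsupported
        } catch (UnsupportedOperationException expected) {
            System.out.println("keySet().add -> UnsupportedOperationException");
        }

        // With a default mapped value, add("b") behaves like putIfAbsent("b", Boolean.TRUE).
        ConcurrentHashMap.KeySetView<String, Boolean> addable = map.keySet(Boolean.TRUE);
        System.out.println(addable.add("b"));      // true
        System.out.println(map.get("b"));          // true

        // Removing through the view removes the mapping from the backing map.
        keys.remove("a");
        System.out.println(map.containsKey("a"));  // false

        // A standalone concurrent Set backed by a ConcurrentHashMap<String, Boolean>.
        Set<String> set = ConcurrentHashMap.newKeySet();
        set.add("x");
        System.out.println(set.contains("x"));     // true
    }
}
```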

③ values()

    public Collection<V> values() {
        ValuesView<K,V> vs;
        return (vs = values) != null ? vs : (values = new ValuesView<K,V>(this));
    }
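`values()` returns a `ValuesView`. Like the key view, it supports removal, and removing a value deletes the matching mappings from the map. `add` throws `UnsupportedOperationException`, because a bare value does not say which key it belongs to. Here is a small demo; the class name is hypothetical:

```java
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo of ValuesView: removal writes through, insertion is unsupported.
public class ValuesViewDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("a", 1);
        map.put("b", 2);
        map.put("c", 4);

        Collection<Integer> values = map.values();
        try {
            values.add(5); // a value alone does not identify a key
        } catch (UnsupportedOperationException expected) {
            System.out.println("values().add -> UnsupportedOperationException");
        }

        // Removing values through the view removes the corresponding mappings.
        values.removeIf(v -> v % 2 == 0);
        System.out.println(map); // {a=1}
    }
}
```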

7. Additional Questions

1) val != null

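  • The explanation follows the answer given by the author of ConcurrentHashMap (Doug Lea):
  • Allowing null values would make results ambiguous under concurrency
  • In detail:
    • For both HashMap and ConcurrentHashMap, suppose map.get(key) returns null
    • The caller cannot tell whether the key is absent, or present and mapped to null, so the result is ambiguous
    • With a HashMap used by a single thread, a follow-up containsKey(key) resolves this; in a concurrent map, another thread may insert or remove the key between get and containsKey, so even the two calls together give no reliable answer
  • In addition, the author personally believes null should not be allowed in maps at all, whether or not thread safety is a concern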
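The ambiguity is easy to reproduce. In the sketch below (class name hypothetical), `HashMap` returns `null` both for a missing key and for a key mapped to `null`, and needs a second call to tell the two apart. `ConcurrentHashMap` refuses the `null` value up front, so a `null` from its `get` can only mean "no mapping":

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo: null values are ambiguous in HashMap and rejected by ConcurrentHashMap.
public class NullValueDemo {
    public static void main(String[] args) {
        Map<String, String> hashMap = new HashMap<>();
        hashMap.put("present", null);
        // Both lookups return null; only a second call (containsKey) tells them apart.
        System.out.println(hashMap.get("present") + " " + hashMap.get("absent"));                 // null null
        System.out.println(hashMap.containsKey("present") + " " + hashMap.containsKey("absent")); // true false

        Map<String, String> chm = new ConcurrentHashMap<>();
        try {
            chm.put("present", null);
        } catch (NullPointerException expected) {
            System.out.println("ConcurrentHashMap rejects null values");
        }
        // Because null values are impossible, a null result here can only mean "no mapping".
        System.out.println(chm.get("absent") == null); // true
    }
}
```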

2) key != null

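  • The author does not want null keys either. putVal therefore throws a NullPointerException when the key (or the value) is null, and lookups such as get and containsKey throw one for a null key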
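The null-key check can be observed through the public API: `put`, `get` and `containsKey` all throw `NullPointerException` for a `null` key. In the JDK 8 source, `putVal` checks `key == null || value == null`, and `get` calls `key.hashCode()` directly. `HashMap`, by contrast, stores a `null` key in bin 0. The class and helper names below are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo: ConcurrentHashMap rejects null keys on writes and lookups.
public class NullKeyDemo {
    // Returns true if running the action throws NullPointerException.
    static boolean throwsNpe(Runnable action) {
        try {
            action.run();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> chm = new ConcurrentHashMap<>();
        System.out.println(throwsNpe(() -> chm.put(null, 1)));       // true
        System.out.println(throwsNpe(() -> chm.get(null)));          // true
        System.out.println(throwsNpe(() -> chm.containsKey(null)));  // true

        Map<String, Integer> hashMap = new HashMap<>();
        hashMap.put(null, 1); // HashMap accepts a null key
        System.out.println(hashMap.get(null)); // 1
    }
}
```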

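3) Are iterators strongly or weakly consistent?

  • Weakly consistent
  • Suppose other threads modify the map while it is being traversed:
  • Changes to parts that have already been traversed are not reflected by the iterator
  • Changes to parts not yet traversed may be reflected, but this is not guaranteed
  • In neither case does the iterator throw ConcurrentModificationException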
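The sketch below (class and method names hypothetical) inserts new keys while iterating, enough of them to trigger a resize mid-traversal. There is no `ConcurrentModificationException`. As the `Traverser` comment promises, each original key that is still present is visited exactly once. Whether the newly added keys show up is deliberately left unchecked, since weak consistency allows either outcome.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo of weakly consistent iteration under concurrent-style modification.
public class WeaklyConsistentIterationDemo {
    // Iterates over the map's keys; for each original key (< n) it records the key and inserts key + 1000.
    // Assumes n < 1000 so inserted keys are distinguishable from the originals.
    static List<Integer> iterateWhileInserting(ConcurrentHashMap<Integer, Integer> map, int n) {
        List<Integer> seenOriginals = new ArrayList<>();
        for (Integer k : map.keySet()) {
            if (k < n) {
                seenOriginals.add(k);
                map.put(k + 1000, k); // may trigger a resize in the middle of the traversal
            }
            // Newly inserted keys (>= 1000) may or may not be visited; either is allowed.
        }
        return seenOriginals;
    }

    public static void main(String[] args) {
        ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();
        for (int i = 0; i < 100; i++)
            map.put(i, i);

        List<Integer> seen = iterateWhileInserting(map, 100);
        System.out.println(seen.size());                // 100
        System.out.println(new HashSet<>(seen).size()); // 100, so no original key was visited twice
        System.out.println(map.size());                 // 200
    }
}
```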

4) ConcurrentHashMap VS Hashtable VS Collections.synchronizedMap(map)
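Several differences between the three can be checked or stated directly:

- `Hashtable` rejects `null` keys and values, while `Collections.synchronizedMap(new HashMap<>())` accepts them.
- `Hashtable` and `synchronizedMap` both return fail-fast iterators. On a best-effort basis, these throw `ConcurrentModificationException` when the map is structurally modified during iteration. `synchronizedMap` also expects callers to hold the map's lock while iterating.
- `ConcurrentHashMap` iterators are weakly consistent.
- `Hashtable` and `synchronizedMap` guard every operation with a single lock. `ConcurrentHashMap` instead uses the per-bin `synchronized` + CAS scheme described earlier.

The class and helper names below are hypothetical. The demo is single-threaded, so the outcomes are deterministic:

```java
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical comparison of null handling and iterator behaviour.
public class MapComparisonDemo {
    static boolean acceptsNullValue(Map<String, Integer> m) {
        try {
            m.put("k", null);
            return true;
        } catch (NullPointerException e) {
            return false;
        }
    }

    // Inserts a new key while iterating over a fresh map; reports whether the iterator failed fast.
    static boolean failsFast(Map<String, Integer> m) {
        m.put("a", 1);
        m.put("b", 2);
        m.put("c", 3);
        try {
            for (String k : m.keySet()) {
                if (!k.endsWith("!"))
                    m.put(k + "!", 0); // structural modification during iteration
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("null value accepted? Hashtable=" + acceptsNullValue(new Hashtable<>())
                + ", synchronizedMap=" + acceptsNullValue(Collections.synchronizedMap(new HashMap<>()))
                + ", ConcurrentHashMap=" + acceptsNullValue(new ConcurrentHashMap<>()));
        // null value accepted? Hashtable=false, synchronizedMap=true, ConcurrentHashMap=false

        System.out.println("fail-fast iterator? Hashtable=" + failsFast(new Hashtable<>())
                + ", synchronizedMap=" + failsFast(Collections.synchronizedMap(new HashMap<>()))
                + ", ConcurrentHashMap=" + failsFast(new ConcurrentHashMap<>()));
        // fail-fast iterator? Hashtable=true, synchronizedMap=true, ConcurrentHashMap=false
    }
}
```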

Last edited: 2022/10/10 8:43:48 PM
Contributors: liuxianzhishou