Ⓜ️ Atomic & Unsafe

吞佛童子2022年10月10日
  • Java
  • Concurrency
大约 4 分钟

Ⓜ️ Atomic & Unsafe

1. 原理

  • 基于 UNSAFE 类的 CAS 操作实现
  • 无锁

2. 分类

1. 基本数据类型

1) 种类

  • AtomicInteger
  • AtomicBoolean
  • AtomicLong

2) 使用

public class AtomicIntegerTest {
    public static void main(String[] args) {
        AtomicInteger atomic = new AtomicInteger(10);
        System.out.println(atomic.getAndIncrement()); // 输出当前值,然后 ++
        System.out.println(atomic.incrementAndGet()); // 当前值 ++,然后 输出当前值
        System.out.println(atomic.getAndDecrement()); // 输出当前值,然后 --
        System.out.println(atomic.getAndAdd(2));// 输出当前值,然后 +
        System.out.println(atomic.getAndSet(10)); // 输出当前值,然后 赋新值
        System.out.println(atomic.get());             // 输出当前值

        System.out.println(atomic.getAndAccumulate(3, (left, right) -> left * right)); // 输出当前值,然后 根据指定运算方式,得到新值

        System.out.println(atomic.getAndUpdate(operand -> operand * 20)); // 输出当前值,然后 根据指定运算方式,得到新值
    }
} 

2. 数组类型

1) 种类

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

2) 使用

public class AtomicIntegerArrayTest {
    public static void main(String[] args) {
        int[] array = {10};
        AtomicIntegerArray atomic = new AtomicIntegerArray(array);
        System.out.println(atomic.getAndIncrement(0)); //10
        System.out.println(atomic.get(0));//11
        System.out.println(atomic.getAndDecrement(0)); //11
        System.out.println(atomic.getAndAdd(0, 2));//10
        System.out.println(atomic.getAndSet(0, 10)); //12
        System.out.println(atomic.get(0));             //10

        System.out.println(atomic.getAndAccumulate(0, 3, (left, right) -> left * right)); // 10

        System.out.println(atomic.getAndUpdate(0, operand -> operand * 20)); // 10
    }
}

3. 引用数据类型

1) 种类

  • AtomicReference
  • AtomicStampedReference
    • 标志为 boolean mark
  • AtomicMarkableReference
    • 标志为 int stamp

2) 使用

public class AtomicReferenceTest {
    public static void main(String[] args) {
        User user = new User(1L, "test", "test");
        AtomicReference<User> atomic = new AtomicReference<>(user);

        User pwdUpdateUser = new User(1L,"test","newPwd");
        System.out.println(atomic.getAndSet(pwdUpdateUser));
        System.out.println(atomic.get());
    }

    @NoArgsConstructor
    @AllArgsConstructor
    @Data
    @ToString
    static class User {
        private Long id;
        private String username;
        private String password;
    }
}

4. 累加器

  • 为了解决 高并发下,只能有一个线程 CAS 成功的问题
  • 在高竞争下,该类的预期吞吐量明显更高,代价是 更高的空间消耗
  • @sun.misc.Contended 修饰 Cell 类,可避免多个变量共用同一缓存行,通过空间,避免 伪共享 问题
  • ConcurrentHashMap 中用到了 LogAdder 的属性 - 桶

1) 种类

  • LongAdder
  • LongAccumulator
    • 功能会更加强大,可以自定义累加的规则
  • DoubleAdder
  • DoubleAccumulator

2) 使用

public class LongAdderTest {
    public static void main(String[] args) {
        LongAdder longAdder = new LongAdder(); // 该式等价于 下面一行
        LongAccumulator accumulator = new LongAccumulator((left, right) -> 0, 0);
    }
}

3) 源码

LongAdder

  • 一个或多个变量共同保持初始为 0 的 long 型数据总和。
  • 当跨线程争用更新(方法添加)时,变量集可能会动态增长以减少争用。
  • 方法 sum(或等价的 longValue)返回在保持总和的变量之间组合的当前总和。
  • 当多个线程更新用于收集统计信息等目的的公共总和时,此类通常比 AtomicLong 更可取,而不是用于细粒度的同步控制。
  • 在低更新争用下,这两个类具有相似的特征。
  • 但是在高竞争下,这个类的 预期吞吐量明显更高,代价是 更高的空间消耗
  • LongAdders 可以与 java.util.concurrent.ConcurrentHashMap 一起使用,以维护可扩展的频率图(直方图或多重集的一种形式)。
    • 例如,要将计数添加到 ConcurrentHashMap<String,LongAdder> freqs,
    • 如果不存在则进行初始化,您可以使用 freqs.computeIfAbsent(k -> new LongAdder()).increment();
  • 该类扩展了 Number,但没有定义诸如 equals、hashCode 和 compareTo 等方法,因为实例预计会发生变异,因此不能用作集合键。
public class LongAdder extends Striped64 implements Serializable {
    // 构造函数
    public LongAdder() {}
    
    // 增加
    public void add(long x) {
        Cell[] as = cells; 
        long b, v; 
        int m; 
        Cell a;
        if (as != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            if (as == null 
                || (m = as.length - 1) < 0 
                || (a = as[getProbe() & m]) == null 
                || !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }
    
    // ++
    public void increment() {
        add(1L);
    }
    
    // --
    public void decrement() {
        add(-1L);
    }
    
    // 返回当前总和
    // 返回的值不是原子快照;在没有并发更新的情况下调用会返回准确的结果,但在计算总和时发生的并发更新可能不会被合并。
    public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
    
    // 重置
    public void reset() {
        Cell[] as = cells; 
        Cell a;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    a.value = 0L;
            }
        }
    }
    
    public long longValue() {
        return sum();
    }
    public int intValue() {
        return (int)sum();
    }
    public float floatValue() {
        return (float)sum();
    }
    public double doubleValue() {
        return (double)sum();
    }
}

Striped64

abstract class Striped64 extends Number {
    // ------------- 属性 -------------------
    static final int NCPU = Runtime.getRuntime().availableProcessors(); // CPU 数
    transient volatile Cell[] cells; // 拆分成的数组,数量为 2 的幂
    transient volatile long base; // 基值,主要在无竞争时使用,通过 CAS 更新。
    transient volatile int cellsBusy; // 调整大小 | 创建单元格时使用自旋锁(通过 CAS 锁定)。
    
    // ---------- 构造函数 ---------------
    Striped64() {}
    
    // ---------- 内部类 -------------
    @sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe(); // Unsafe 类获取
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
    
    // ----------- 常用方法 -------------
    final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
        // ...
    }
    
    final void doubleAccumulate(double x, DoubleBinaryOperator fn, boolean wasUncontended) {
        // ...
    }
}

5. Unsafe

1) 获取 Unsafe 对象

2) 常见方法

    // LockSupport 类用到
    public native void park(boolean isAbsolute, long time);
    public native void unpark(Object thread);
    
    // CAS 操作 
    public final native boolean compareAndSwapLong(Object o, long offset,
                                                   long expected,
                                                   long x);
上次编辑于: 2022/10/10 下午8:43:48
贡献者: liuxianzhishou