🔺 Semaphore

吞佛童子2022年10月10日
  • Java
  • Concurrency
大约 3 分钟

🔺 Semaphore

1. 类注释

  • 计数信号量。从概念上讲,信号量维护一组许可。
  • 基于 AQS 实现,支持 公平策略 & 非公平策略
  • Semaphore & (CountDownLatch | CyclicBarrier) 的区别在于:
    • CountDownLatch | CyclicBarrier 初始时,设置指定 count,每个任务完成后,让 count --,当 count == 0 时总任务执行
    • Semaphore 用于类似的"等待所有子任务完成"场景时,初始许可数一般设为 0,每个任务完成后调用 release() 释放许可,让 count ++,总任务通过 acquire(expect) 等到 count == expect 时才继续执行(更常见的用法是以 N 个初始许可限制并发访问数,见下方示例)
  • 示例:
    • 信号量通常用于限制可以访问某些(物理或逻辑)资源的线程数。
    • 例如,下面是一个使用信号量来控制对项目池的访问的类:
 /**
  * Example pool that hands out items from a fixed-size array and uses a
  * semaphore to bound how many items can be checked out at once.
  * The semaphore only counts permits; the bookkeeping of which slot is in
  * use is protected separately by the synchronized helper methods.
  */
 class Pool {
   // Pool capacity; also the initial number of permits given to the semaphore.
   private static final int MAX_AVAILABLE = 100;
   // One permit per pool slot; the second argument (true) requests the fair policy.
   private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

   /**
    * Acquires one permit, then returns the result of getNextAvailableItem().
    *
    * @return the item handed out by getNextAvailableItem()
    * @throws InterruptedException propagated from available.acquire()
    */
   public Object getItem() throws InterruptedException {
     available.acquire();
     return getNextAvailableItem();
   }

   /**
    * Returns an item to the pool. A permit is released only when
    * markAsUnused(x) reports true, so an unknown item or an item that is
    * already free does not add a permit.
    *
    * @param x the item being returned
    */
   public void putItem(Object x) {
     if (markAsUnused(x))
       available.release();
   }

   // Not a particularly efficient data structure; just for demo

   // NOTE(review): the initializer below is a placeholder from the example and is
   // not valid Java. The loops in this class index items[0 .. MAX_AVAILABLE-1],
   // so a real array needs at least MAX_AVAILABLE elements.
   protected Object[] items = ... whatever kinds of items being managed
   // used[i] is true while items[i] is checked out.
   protected boolean[] used = new boolean[MAX_AVAILABLE];

   /**
    * Scans the slots in index order, marks the first unused one as used and
    * returns its item. Synchronized so the scan-and-mark is atomic with
    * respect to the other synchronized methods of this object.
    *
    * @return the first free item, or null if every slot is marked used
    */
   protected synchronized Object getNextAvailableItem() {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (!used[i]) {
          used[i] = true;
          return items[i];
       }
     }
     // Presumably unreachable when called via getItem(): a permit was acquired
     // first and there are as many permits as slots.
     return null; // not reached
   }

   /**
    * Looks the item up by reference identity (==) and clears its used flag.
    *
    * @param item the item to mark as free
    * @return true if the item was found and was marked used; false if it was
    *         found but already free, or not found at all
    */
   protected synchronized boolean markAsUnused(Object item) {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (item == items[i]) {
          if (used[i]) {
            used[i] = false;
            return true;
          } else
            return false;
       }
     }
     return false;
   }
 }

2. 类图

/**
 * Class declaration only: Semaphore is a plain class that implements
 * java.io.Serializable. Its body is elided here.
 */
public class Semaphore implements java.io.Serializable {
    // ... (body elided)
}

img_9.png


3. 内部类

    /**
     * Synchronization core of the semaphore, built on AbstractQueuedSynchronizer.
     * The AQS state field stores the number of currently available permits.
     * It has two subclasses, FairSync and NonfairSync.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        /** Stores the initial permit count in the AQS state. */
        Sync(int permits) {
            setState(permits);
        }

        /** Returns the current AQS state, i.e. the number of available permits. */
        final int getPermits() {
            return getState();
        }

        /**
         * Non-fair acquisition attempt: competes for permits immediately,
         * without looking at whether other threads are queued.
         *
         * @param acquires number of permits wanted
         * @return the permit count left after a successful acquisition, or a
         *         negative value when there were not enough permits (in which
         *         case the state is left untouched)
         */
        final int nonfairTryAcquireShared(int acquires) {
            while (true) {
                final int current = getState();
                final int left = current - acquires;
                if (left < 0) {
                    // Not enough permits: report failure without writing the state.
                    return left;
                }
                if (compareAndSetState(current, left)) {
                    return left;
                }
                // CAS lost to a concurrent update; re-read the state and retry.
            }
        }

        /**
         * Returns permits to the semaphore by adding them to the state.
         *
         * @param releases number of permits to give back
         * @return always true once the state has been updated
         * @throws Error if the addition overflows int
         */
        @Override
        protected final boolean tryReleaseShared(int releases) {
            int before;
            int after;
            do {
                before = getState();
                after = before + releases;
                if (after < before) { // overflow
                    throw new Error("Maximum permit count exceeded");
                }
            } while (!compareAndSetState(before, after));
            return true;
        }

        /**
         * Lowers the number of available permits by the given amount.
         *
         * @param reductions number of permits to remove
         * @throws Error if the subtraction underflows int
         */
        final void reducePermits(int reductions) {
            int before;
            int after;
            do {
                before = getState();
                after = before - reductions;
                if (after > before) { // underflow
                    throw new Error("Permit count underflow");
                }
            } while (!compareAndSetState(before, after));
        }

        /**
         * Sets the permit count to zero.
         *
         * @return the permit count observed just before it was zeroed
         */
        final int drainPermits() {
            while (true) {
                final int current = getState();
                if (current == 0) {
                    // Already zero: nothing to write.
                    return current;
                }
                if (compareAndSetState(current, 0)) {
                    return current;
                }
            }
        }
    }

    /**
     * 非公平
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    /**
     * 公平
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors()) // CLH 队列非空,返回 -1
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining)) // 队列为空时,才参与竞争
                    return remaining;
            }
        }
    }

4. 属性

    // Serialization version identifier of the Semaphore class.
    private static final long serialVersionUID = -3222578661600680210L;
    // The synchronizer every public method delegates to; assigned once in the constructors.
    private final Sync sync;

5. 构造函数

    /**
     * Creates a semaphore with the given initial number of permits, using the
     * non-fair policy (NonfairSync).
     *
     * @param permits initial number of available permits
     */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /**
     * Creates a semaphore with the given initial number of permits and the
     * requested fairness policy.
     *
     * @param permits initial number of available permits
     * @param fair    true selects FairSync, false selects NonfairSync
     */
    public Semaphore(int permits, boolean fair) {
        if (fair) {
            sync = new FairSync(permits);
        } else {
            sync = new NonfairSync(permits);
        }
    }

6. 常见方法

    /**
     * Acquires one permit by delegating to sync.acquireSharedInterruptibly(1).
     *
     * @throws InterruptedException propagated from the interruptible AQS acquire
     */
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    /**
     * Acquires the given number of permits through the interruptible AQS
     * shared acquire.
     *
     * @param permits number of permits to acquire; must not be negative
     * @throws IllegalArgumentException if permits is negative
     * @throws InterruptedException propagated from the interruptible AQS acquire
     */
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) {
            throw new IllegalArgumentException();
        }
        sync.acquireSharedInterruptibly(permits);
    }
    /**
     * Tries to take one permit. This calls sync.nonfairTryAcquireShared
     * directly, so the attempt uses the non-fair path whichever Sync
     * subclass the semaphore was built with.
     *
     * @return true if the non-fair attempt returned a non-negative value
     */
    public boolean tryAcquire() {
        final int remaining = sync.nonfairTryAcquireShared(1);
        return remaining >= 0;
    }
    /**
     * Tries to take the given number of permits. This calls
     * sync.nonfairTryAcquireShared directly, so the attempt uses the
     * non-fair path whichever Sync subclass the semaphore was built with.
     *
     * @param permits number of permits to acquire; must not be negative
     * @return true if the non-fair attempt returned a non-negative value
     * @throws IllegalArgumentException if permits is negative
     */
    public boolean tryAcquire(int permits) {
        if (permits < 0) {
            throw new IllegalArgumentException();
        }
        final int remaining = sync.nonfairTryAcquireShared(permits);
        return remaining >= 0;
    }
    /**
     * Tries to acquire one permit, giving up after the given timeout.
     * The timeout is converted to nanoseconds and passed to
     * sync.tryAcquireSharedNanos.
     *
     * @param timeout maximum time to wait, expressed in unit
     * @param unit    time unit of timeout
     * @return the result of sync.tryAcquireSharedNanos
     * @throws InterruptedException propagated from the timed AQS acquire
     */
    public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
        final long nanos = unit.toNanos(timeout);
        return sync.tryAcquireSharedNanos(1, nanos);
    }
    /**
     * Tries to acquire the given number of permits, giving up after the given
     * timeout. The timeout is converted to nanoseconds and passed to
     * sync.tryAcquireSharedNanos.
     *
     * @param permits number of permits to acquire; must not be negative
     * @param timeout maximum time to wait, expressed in unit
     * @param unit    time unit of timeout
     * @return the result of sync.tryAcquireSharedNanos
     * @throws IllegalArgumentException if permits is negative
     * @throws InterruptedException propagated from the timed AQS acquire
     */
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException {
        if (permits < 0) {
            throw new IllegalArgumentException();
        }
        final long nanos = unit.toNanos(timeout);
        return sync.tryAcquireSharedNanos(permits, nanos);
    }
    /**
     * Acquires one permit by delegating to sync.acquireShared(1); unlike
     * acquire(), this method declares no InterruptedException.
     */
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }
    /**
     * Acquires the given number of permits through sync.acquireShared; unlike
     * acquire(int), this method declares no InterruptedException.
     *
     * @param permits number of permits to acquire; must not be negative
     * @throws IllegalArgumentException if permits is negative
     */
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) {
            throw new IllegalArgumentException();
        }
        sync.acquireShared(permits);
    }

    /**
     * Releases one permit by delegating to sync.releaseShared(1).
     */
    public void release() {
        sync.releaseShared(1);
    }
    /**
     * Releases the given number of permits through sync.releaseShared.
     *
     * @param permits number of permits to release; must not be negative
     * @throws IllegalArgumentException if permits is negative
     */
    public void release(int permits) {
        if (permits < 0) {
            throw new IllegalArgumentException();
        }
        sync.releaseShared(permits);
    }
上次编辑于: 2022/10/10 下午8:43:48
贡献者: liuxianzhishou