[JDK源码]J.U.C-AQS-CountDownLatch和Semaphore
CountDownLatch
定义
CountDownLatch :是一个同步器 用于一个或者多个线程等待其他线程完成一组操作
1、AQS的state变量用于表示操作个数
2、AQS的共享锁机制完成唤醒
3、等待锁的线程使用acquireShared方法获取共享锁等待
4、操作线程使用releaseShared方法用于唤醒等待共享锁的线程
count 表示操作个数,当每一个操作执行完成时,原子性的减少一个,当count变量为0时,代表所有的操作都完成了,唤醒等待的线程即可。
构造器
//count 值 和 核心操作 均是由 sync 内部类来完成得
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
countDown方法
public void countDown() {//调用releaseShared 父类的模板方法
sync.releaseShared(1);
}
releaseShared方法
//该方法由AQS来实现,通过子类完成tryReleaseShared方法释放共享锁,如果释放成功,那么直接调用doReleaseShared方法完成等待获取共享锁的线程,获取共享锁。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared 方法
//sync下
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {//获取当前state值
int c = getState();
if (c == 0)
return false;
// 计算更新值,CAS原子性的修改即可
int nextc = c-1;
if (compareAndSetState(c, nextc))
//若修改成功,需要判断当前线程是不是最后一个完成操作的线程,是的情况返回true,唤醒所有等待共享锁的线程
return nextc == 0;
}
}
await方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
tryAcquireSharedNanos方法
//AQS实现, 抛出InterruptedException 异常 响应线程中断,
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||//核心操作还是由于子类的 tryAcquireShared方法来实现的
doAcquireSharedNanos(arg, nanosTimeout);
}
tryAcquireShared方法
//就是看变量是否为0,如果为0,那么无条件返回1,此时将会直接获取到共享锁。
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
Semaphore
作用:限制并发
通过设置初始permit,许可值,每获取一个permit,那么减1,直到为0,此时限制并发。
public class Semaphore implements java.io.Serializable {
private final Sync sync;
//3个内部类
static final class FairSync extends Sync {}
static final class NonfairSync extends Sync {}
abstract static class Sync extends AbstractQueuedSynchronizer {}
//构造方法 默认非公平锁 通过permits 变量来限制并发
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
}
acquire方法
//获取permit的acquire(int permits)操作 是由sync类来完成的
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
acquireSharedInterruptibly方法
AQS下
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())//线程中断
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)//调用子类的tryAcquireShared模板方法,让子类实现自己获取信号量的机制。
doAcquireSharedInterruptibly(arg);
}
}
公平锁的实现
static final class FairSync extends Sync {
//初始化父类AQS的 state 变量
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {//如果线程在队列中 然会-1,AQS完成阻塞
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
//此时有可用的信号量,进行CAS操作
if (remaining < 0 ||
compareAndSetState(available, remaining))
//失败的情况下 返回 剩余信号量
return remaining;
}
}
}
非公平锁的实现
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
//由 sync 实现
final int nonfairTryAcquireShared(int acquires) {
for (;;) {//非公平的情况下 直接CAS枪锁就可,看可用资源数 < 0 ?
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
release
//释放permit的release(int permits)操作 由sync类来完成
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
releaseShared方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//由子类释放完
doReleaseShared(); //直接调用doReleaseShared 方法唤醒后面等待的线程
return true;
}
return false;
}
sync实现:
protected final boolean tryReleaseShared(int releases) {
for (;;) {// 直接通过CAS操作对state变量+1即可
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}