[JDK源码]J.U.C-AQS-CountDownLatch和Semaphore

CountDownLatch

定义

CountDownLatch :是一个同步器 用于一个或者多个线程等待其他线程完成一组操作
1、AQS的state变量用于表示操作个数
2、AQS的共享锁机制完成唤醒
3、等待锁的线程使用acquireShared方法获取共享锁等待
4、操作线程使用releaseShared方法用于唤醒等待共享锁的线程

count 表示操作个数,当每一个操作执行完成时,原子性的减少一个,当count变量为0时,代表所有的操作都完成了,唤醒等待的线程即可。

构造器

    //count 值  和 核心操作 均是由 sync 内部类来完成得
	public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

countDown方法

    public void countDown() {//调用releaseShared 父类的模板方法
        sync.releaseShared(1);
    }

releaseShared方法

//该方法由AQS来实现,通过子类完成tryReleaseShared方法释放共享锁,如果释放成功,那么直接调用doReleaseShared方法完成等待获取共享锁的线程,获取共享锁。
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

tryReleaseShared 方法

//sync下
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {//获取当前state值
        int c = getState();
        if (c == 0)
            return false;
        // 计算更新值,CAS原子性的修改即可
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            //若修改成功,需要判断当前线程是不是最后一个完成操作的线程,是的情况返回true,唤醒所有等待共享锁的线程
            return nextc == 0;
    }
}

await方法

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

tryAcquireSharedNanos方法

//AQS实现, 抛出InterruptedException 异常 响应线程中断,
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||//核心操作还是由于子类的 tryAcquireShared方法来实现的
        doAcquireSharedNanos(arg, nanosTimeout);
}

tryAcquireShared方法

//就是看变量是否为0,如果为0,那么无条件返回1,此时将会直接获取到共享锁。
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

Semaphore

作用:限制并发
通过设置初始permit,许可值,每获取一个permit,那么减1,直到为0,此时限制并发。

public class Semaphore implements java.io.Serializable {
	private final Sync sync;
    //3个内部类 
    static final class FairSync extends Sync {}
    static final class NonfairSync extends Sync {}
    abstract static class Sync extends AbstractQueuedSynchronizer {}
    
    
    //构造方法 默认非公平锁  通过permits 变量来限制并发
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
}

acquire方法

    //获取permit的acquire(int permits)操作 是由sync类来完成的
	public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

acquireSharedInterruptibly方法

AQS下

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())//线程中断
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)//调用子类的tryAcquireShared模板方法,让子类实现自己获取信号量的机制。
            doAcquireSharedInterruptibly(arg);
    }    
}

公平锁的实现

    static final class FairSync extends Sync {
        //初始化父类AQS的 state 变量
        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {//如果线程在队列中  然会-1,AQS完成阻塞
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                //此时有可用的信号量,进行CAS操作
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    //失败的情况下 返回 剩余信号量
                    return remaining;
            }
        }
    }

非公平锁的实现

    static final class NonfairSync extends Sync {
        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
	//由 sync 实现
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {//非公平的情况下 直接CAS枪锁就可,看可用资源数 < 0 ?
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }	

release

    //释放permit的release(int permits)操作  由sync类来完成
	public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

releaseShared方法

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//由子类释放完
        doReleaseShared(); //直接调用doReleaseShared 方法唤醒后面等待的线程
        return true;
    }
    return false;
}

sync实现:

protected final boolean tryReleaseShared(int releases) {
    for (;;) {// 直接通过CAS操作对state变量+1即可
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码

)">
< <上一篇
下一篇>>