[JDK源码]J.U.C-AbstractQueuedSynchronizer
AQS(一)简单介绍
原理:
AQS用于实现线程间的同步操作。它通过一个FIFO队列作为线程等待队列,一个volative 的state变量来作为同步状态,并且提供了大量的模板方法,子类可以通过实现这些方法来完成线程同步。
AQS核心变量
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
static final class Node {//Node节点
//等待时的模式 为 共享
static final Node SHARED = new Node();
//互斥
static final Node EXCLUSIVE = null;
//等待线程已被取消
static final int CANCELLED = 1;
//等待线程需要唤醒后继等待线程
static final int SIGNAL = -1;
//等待线程后 等待在条件变量队列中
static final int CONDITION = -2;
//需要无条件唤醒后面的等待线程
static final int PROPAGATE = -3;
volatile int waitStatus;//等待状态 值 是上面的几个 用的ws就是它
volatile Node prev;//前驱节点
volatile Node next;//后继节点
volatile Thread thread;//当前等待线程
Node nextWaiter;//连接在条件队列中等待的下一个线程节点
final boolean isShared() {
return nextWaiter == SHARED;
}
//获取当前节点的前驱节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
//mode 模式
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
// 等待状态
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
//等待队列的头节点
private transient volatile Node head;
// 尾节点
private transient volatile Node tail;
//同步状态
private volatile int state;
}
AQS的核心变量,同步队列就是通过Node节点将线程包装后进行排队阻塞,通过volatile 的state变量表示同步状态。
AQS核心方法
最好先看一下:互斥锁和共享锁
//互斥锁的操作
public final void acquire(int arg) {//互斥锁 不可中断退出
if (!tryAcquire(arg) &&//如果子类获取失败 调用 acquireQueued方法排队 阻塞当前队列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();//设置中断标志位
}
public final void acquireInterruptibly(int arg)//互斥锁 可中断
throws InterruptedException {
if (Thread.interrupted())//如果线程中断 直接抛出异常
throw new InterruptedException();
if (!tryAcquire(arg))//子类方法 获取失败 调用 doAcquireInterruptibly 方法 排队 阻塞当前队列
doAcquireInterruptibly(arg);
}
public final boolean release(int arg) {//释放互斥锁
if (tryRelease(arg)) {//如果子类释放成功,那么就根据条件唤醒后面的等待线程
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//共享锁的操作
public final void acquireShared(int arg) {//获取共享锁 不可中断
if (tryAcquireShared(arg) < 0)//如果子类获取失败 调用 doAcquireShared 方法 阻塞线程
doAcquireShared(arg);
}
public final void acquireSharedInterruptibly(int arg)//互斥锁可中断
throws InterruptedException {
if (Thread.interrupted())//线程中断..
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)//子类获取失败 doAcquireSharedInterruptibly 阻塞线程
doAcquireSharedInterruptibly(arg);
}
public final boolean releaseShared(int arg) {//释放共享锁
if (tryReleaseShared(arg)) {//如果子类释放共享锁成功,那么唤醒等待节点
doReleaseShared();
return true;
}
return false;
}
//队列第一个等待节点是不是 互斥模式 ,只在读写锁中使用
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&//头节点不为空
(s = h.next) != null &&//头节点的next 不为空
!s.isShared() &&//状态 不是共享默认
s.thread != null; //线程不是空
}
//释放有线程等待获取锁的时间 大于当前线程
public final boolean hasQueuedPredecessors() {
Node t = tail; // 尾 头 节点
Node h = head;
Node s;
return h != t &&//队列中有没有 等待节点, 头节点后有等待节点 s,s的线程 不是当前线程
((s = h.next) == null || s.thread != Thread.currentThread());
}
//为子类提供的模板方法
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
这里 可以先看下子类的实现。ReentrantLock 、ReentrantReadWriteLock、
acquire 互斥锁
public final void acquire(int arg) {//互斥锁 不可中断退出
if (!tryAcquire(arg) &&//如果子类获取失败 调用 acquireQueued方法排队 阻塞当前队列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();//设置中断标志位
}
addWaiter方法
addWaiter(Node.EXCLUSIVE)方法用于排队站位
private Node addWaiter(Node mode) {
//创建等待的节点 当前线程 + mode(互斥 共享)
Node node = new Node(Thread.currentThread(), mode);
// 就是 把它加入到队列中
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {//CAS
pred.next = node;
return node;
}
}//失败 enq方法
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {//死循环插入
Node t = tail;
if (t == null) { //初始化尾指针
if (compareAndSetHead(new Node()))
tail = head;
} else {//CAS插入
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued方法
//node节点位 阻塞队列中的线程节点
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;//中断标志位,表示线程在等待过程中是否产生了 中断
for (;;) {
final Node p = node.predecessor();//获取当前节点的前驱节点
if (p == head && tryAcquire(arg)) {
//如果当前的节点为头节点,尝试子类 获取锁,成功设置当前节点为头节点
setHead(node);
p.next = null; //释放前一个节点
failed = false;
return interrupted;
}
//如果前驱节点不是头节点 判断是否允许应该阻塞 shouldParkAfterFailedAcquire判断 parkAndCheckInterrupt阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)//如果获取失败,则调用 cancelAcquire 取消获取锁
cancelAcquire(node);
}
}
acquireShared 共享锁
//共享锁的操作
public final void acquireShared(int arg) {//获取共享锁 不可中断
if (tryAcquireShared(arg) < 0)//如果子类获取失败 调用 doAcquireShared 方法 阻塞线程
doAcquireShared(arg);
}
doAcquireShared方法
private void doAcquireShared(int arg) {
// 同acquire 一样,这里状态是SHARED
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {//同样死循环插入
final Node p = node.predecessor();
//判断当前节点的前节点是不是头节点
if (p == head) {//是头节点的情况下 需要tryAcquireShared 来获取锁 获取锁成功 setHeadAndPropagate
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//和 acquire 一样
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate方法
//用于更新头节点,并唤醒后继
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 记录当前头节点 并将当前节点设置为头节点
setHead(node);
//如果propagate 信号量 大于0,直接唤醒
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) { SIGNAL 表明必须唤醒后继节点 那么直接唤醒
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
doReleaseShared方法
//释放共享节点
private void doReleaseShared() {
for (;;) {
Node h = head;//保存头节点
if (h != null && h != tail) {//说明还有等待的节点
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {//如果头节点的状态SIGNAL 通过CAS -> 0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; //重新检查
unparkSuccessor(h);//如果修改成功则唤醒后继节点
}
//如果等待状态为0 ,通过CAS 0替换从PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//PROPAGATE = -3
continue; //失败检查
}
if (h == head) // 头节点没有发生改变,说明没有需要唤醒的节点
break;
}
}
中断 Interruptibly
acquireInterruptibly 下 doAcquireInterruptibly ,acquire 和acquireInterruptibly 一样,只是acquireInterruptibly 响应中断,再获取锁流程中如果发现线程中断,则抛出异常。
//互斥中断
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
调用doAcquireSharedInterruptibly方法,和acquireShared 也是如此
//共享中断
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
方法
shouldParkAfterFailedAcquire
//判断线程是否应该阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//前一个节点的状态
if (ws == Node.SIGNAL)//如果 SIGNAL 表示会唤醒它的后继 当前线程可以阻塞
return true;
if (ws > 0) {//表示被取消了,此时跳过前一个节点,找到一个 等待状态不大于0 的 CANCELLED=1
do {//链表的简单操作
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // 正常节点,那么CAS 将其变为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt
//阻塞当前线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
cancelAcquire
//取消线程获取锁的操作
private void cancelAcquire(Node node) {
if (node == null)//当前节点不存在
return;
node.thread = null;//释放绑定的 thread 对象
//当前节点 往前
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
//前一个节点的下一个节点
Node predNext = pred.next;
//状态设置为 CANCELLED
node.waitStatus = Node.CANCELLED;
//如果当前节点是尾节点 ,CAS操作将尾指针修改为前一个节点
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
//如果前一个结点的状态 为SIGNAL 需要唤醒节点, 尝试车市 前一个的next指针为需要唤醒的节点
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {//链表的操作
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//如果当前节点为头节点则唤醒后面的节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
unparkSuccessor
private void unparkSuccessor(Node node) {
//唤醒后继节点,当前头节点 ws状态 为SIGNAL CAS将其变为0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 当前节点的后继节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {//当前节点为空吗,当前节点的状态
s = null;
//因为tail 引用一定是最新的,所有从后往前找 找到 第一个有效节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 找到了s节点,此时s就是要唤醒的节点
if (s != null)
LockSupport.unpark(s.thread);
}