[并发进阶]——读写锁 原理

笔记来源于 黑马程序员全面深入学习Java并发编程,从《Java并发编程的艺术》中作为补充

?概念

?读写锁与排它锁不同在于它锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读 线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写 锁,使得并发性相比一般的排他锁有了很大提升。

当读操作远远高于写操作时,这时候使用 读写锁 让 读-读 可以并发,提高性能。 类似于数据库中的 select … from … lock in share mode

?读写状态的设计

读写锁同样是通过自定义同步器来实现同步功能,而不同于ReentrantLock中的同步状态仅能表示锁被一个线程重复获取的次数,读写锁状态(一个整型变量)可以表示多个读线程和一个写线程的状态。

那如何才能实现一个整型变量可以维护多个状态?

读写锁将 变量切分成了两个部分,高16位表示读,低16位表示写

image-20220112202308470

读写锁是如何迅速确定读和写各自的状态呢?

?通过位运算。

假设当前同步状态 值为S,写状态等于S&0x0000FFFF(将高16位全部抹去),读状态等于S>>>16(无符号补0右移 16位)。当写状态增加1时,等于S+1,当读状态增加1时,等于S+(1<<16),也就是 S+0x00010000。

根据状态的划分能得出一个推论:S不等于0时,当写状态(S&0x0000FFFF)等于0时,则读 状态(S>>>16)大于0,即读锁已被获取

⚠️注意事项:

  • 读锁不支持条件变量

  • 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待

  • 重入时降级支持:即持有写锁的情况下去获取读锁

?图解流程以及源码分析

读写锁用的是同一个 Sycn 同步器,因此等待队列、state 等也是同一个

我们假设有两个线程t1、t2,其中t1是写线程,t2是读线程

1、t1先上写锁,然后t2尝试获取读锁

1) t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁 使用的是 state 的高 16 位

image-20220112205731698

// 外部类 WriteLock 方法, 方便阅读, 放在此处
public void lock() {
    sync.acquire(1);
}

// AQS 继承过来的方法, 方便阅读, 放在此处
public final void acquire(int arg) {
    if (
        // 尝试获得写锁失败
        !tryAcquire(arg) &&
        // 将当前线程关联到一个 Node 对象上, 模式为独占模式
        // 进入 AQS 队列阻塞
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
    ) {
        selfInterrupt();
    }
}

2)t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先会进入 tryAcquireShared 流程。如果有写 锁占据,那么 tryAcquireShared 返回 -1 表示失败

tryAcquireShared 返回值表示

  • -1 表示失败
  • 0 表示成功,但后继节点不会继续唤醒
  • 正数表示成功,而且数值是还有几个后继节点需要唤醒

实际上,在读写锁中,只会返回-1、1两种值

image-20220112205739754

public final void acquireShared(int arg) {
 // tryAcquireShared 返回负数, 表示获取读锁失败
 if (tryAcquireShared(arg) < 0) {
 doAcquireShared(arg);
 }
}	

3)这时会进入 sync.doAcquireShared(1) 流程,首先也是调用 addWaiter 添加节点,不同之处在于节点被设置为 Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此时 t2 仍处于活跃状态

image-20220112210224611

// AQS 继承过来的方法, 方便阅读, 放在此处
private void doAcquireShared(int arg) {
    // 将当前线程关联到一个 Node 对象上, 模式为共享模式
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // 再一次尝试获取读锁
                int r = tryAcquireShared(arg);
                // 成功
                if (r >= 0) {
                    // r 表示可用资源数, 在这里总是 1 允许传播
                    //(唤醒 AQS 中下一个 Share 节点)
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (
                // 是否在获取读锁失败时阻塞(前一个阶段 waitStatus == Node.SIGNAL)
                shouldParkAfterFailedAcquire(p, node) &&
                // park 当前线程
                parkAndCheckInterrupt()
            ) {
                interrupted = true;
            }
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

4)t2 会看看自己的节点是不是老二,如果是,还会再次调用 tryAcquireShared(1) 来尝试获取锁(节点自旋)

5)如果没有成功,在 doAcquireShared 内 循环一次,把前驱节点的 waitStatus 改为 -1,再 循环一 次尝试 tryAcquireShared(1) 如果还不成功,那么在 parkAndCheckInterrupt() 处 park

image-20220112210612334

2、又 有t3加读锁和 t4加写锁

假设t3是读锁、t4是写锁

这种状态下,假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁,就变成了下面的样子

image-20220112210911985

3、t1释放锁

这时会走到写锁的 sync.release(1) 流程,调用 sync.tryRelease(1) 成功,变成下面的样子

image-20220112211020968

// AQS 继承过来的方法
public final boolean release(int arg) {
    // 尝试释放写锁成功
        if (tryRelease(arg)) {
            // unpark AQS 中等待的线程
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
    return false;
}
// Sync 继承过来的方法
protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    // 因为可重入的原因, 写锁计数为 0, 才算释放成功
    boolean free = exclusiveCount(nextc) == 0;
    if (free) {
        setExclusiveOwnerThread(null);
    }
    setState(nextc);
    return free;
}

4、t2恢复运行

接下来执行唤醒流程 sync.unparkSuccessor,即让老二恢复运行,这时 t2 在 doAcquireShared 内 parkAndCheckInterrupt() 处恢复运行

这回再来一次 for (;? 执行 tryAcquireShared 成功则让读锁计数加一,如下state从0_0变成了1_0

image-20220112211323960

这时 t2 已经恢复运行 ,接下来 t2 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点

注意此处的t2不是设置为exclusiveOwnerThread,因为读线程是并行的

image-20220112211438439

5、紧接着唤醒下一个读线程

事情还没完,在 setHeadAndPropagate 方法内还会检查下一个节点是否是 shared,如果是则调用 doReleaseShared() 将 head 的状态从 -1 改为 0 并唤醒老二,这时 t3 在 doAcquireShared 内 parkAndCheckInterrupt() 处恢复运行

因为读线程是并行的,所以要继续唤醒下一个读线程

// ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    // 设置自己为 head
    setHead(node);

    // propagate 表示有共享资源(例如共享读锁或信号量)
    // 原 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE
    // 现在 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 如果是最后一个节点或者是等待共享读锁的节点
        if (s == null || s.isShared()) {
            doReleaseShared();
        }
    }
}

image-20220112211610042

这回再来一次 for (;? 执行 tryAcquireShared 成功则让读锁计数加一

image-20220112211828461

这时 t3 已经恢复运行,接下来 t3 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点

image-20220112212017005

下一个节点不是 shared 了,因此不会继续唤醒 t4 所在节点

6、读线程释放锁

t2 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,但由于计数还不为零

// AQS 继承过来的方法
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

// Sync 继承过来的方法, 方便阅读, 放在此处
protected final boolean tryReleaseShared(int unused) {
    // ... 省略不重要的代码
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc)) {
            // 读锁的计数不会影响其它获取读锁线程, 但会影响其它获取写锁线程
            // 计数为 0 才是真正释放
            return nextc == 0;
        }
    }
}

image-20220112212129855

t3 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,这回计数为零了,进入 doReleaseShared() 将头节点从 -1 改为 0 并唤醒老二,即

注意这里要先将waitStatus 先改为 0, 防止 多线程状态下 unparkSuccessor 被多次执行

// AQS 继承过来的方法
private void doReleaseShared() {
    // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
    // 如果 head.waitStatus == 0 ==> Node.PROPAGATE 
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 如果有其它线程也在释放读锁,那么需要将 waitStatus 先改为 0
            // 防止 unparkSuccessor 被多次执行
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; // loop to recheck cases
                unparkSuccessor(h);
            }
            // 如果已经是 0 了,改为 -3,用来解决传播性,见后文信号量 bug 分析
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue; // loop on failed CAS
        }
        if (h == head) // loop if head changed
            break;
    }
} 

image-20220112212146161

之后 t4 在 acquireQueued 中 parkAndCheckInterrupt 处恢复运行,再次 for (;? 这次自己是老二,并且没有其他 竞争,tryAcquire(1) 成功,修改头结点,流程结束

image-20220112212200469

?锁降级

锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读 锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住(当前拥有的)写锁,再获取到 读锁,随后释放(先前拥有的)写锁的过程。

?锁降级中读锁的获取是否必要呢?

答案是必要的。主要是为了保证数据的可见性,如果 当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程(记作线程T)获取了写锁并修 改了数据,那么当前线程无法感知线程T的数据更新。如果当前线程获取读锁,即遵循锁降级 的步骤,则线程T将会被阻塞,直到当前线程使用数据并释放读锁之后,线程T才能获取写锁进 行数据更新。

⚠️RentrantReadWriteLock不支持锁升级(把持读锁、获取写锁,最后释放读锁的过程)。目的 也是保证数据可见性,如果读锁已被多个线程获取,其中任意线程成功获取了写锁并更新了 数据,则其更新对其他获取到读锁的线程是不可见的。
锁降级是指把持住(当前拥有的)写锁,再获取到 读锁,随后释放(先前拥有的)写锁的过程。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>