【JDK源码】线程系列之FutureTask

简介

在Java中一般通过继承Thread类或者实现Runnable接口这两种方式来创建多线程,但是这两种方式都有个缺陷,就是不能在执行完成后获取执行的结果,因此Java 1.5之后提供了Callable和Future接口,通过它们就可以在任务执行完毕之后得到任务的执行结果。

继承体系

在这里插入图片描述

源码分析

Runnable接口

public interface Runnable {
    public abstract void run();
}

可以看到Runnable接口既没有返回值也没有抛出异常

Callable接口

//Runnable是没有返回结果的任务,而Callable则是有返回结果的任务
public interface Callable<V> {
    /**
     * 有返回结果,并且可能抛出异常
     */
    V call() throws Exception;
}

可以看到Callable是个泛型接口,泛型V就是要call()方法返回的类型。Callable接口和Runnable接口很像,都可以被另外一个线程执行,但是,Runnable不会返回数据也不能抛出异常。

Future接口

/**
 * 表示异步执行的结果,有三个功能:
 * 1.获取异步执行任务的结果
 * 2.查看异步任务的执行状态(取消或终止)
 * 3.取消异步任务
 */
public interface Future<V> {
 
    /**
     * 尝试取消任务,如果任务已经完成或已经取消,则取消失败。
     * 1.如果任务没未被启动,则该任务不会被运行;
     * 2.如果任务已经被启动,参数mayInterruptIfRunning决定是否执行当前任务的线程是否应该被中断,这只是作为一种终止任务的尝试
     * 执行这个方法之后,以后的isDone方法调用都会返回true。
     * 如果这个方法返回true,以后的isCancelled方法调用都会返回true。
     */
    boolean cancel(boolean mayInterruptIfRunning);
 
    /**
     * cancel()调用返回true之后,这个方法会返回true。
     */
    boolean isCancelled();
 
    /**
     * 如果当前任务执行成功,或者被取消,或者抛出异常,则返回true
     */
    boolean isDone();
 
    /**
     * 阻塞直到任务完成,并返回任务执行结果。
     * 当异步任务被取消,或抛出异常,get()方法会抛出相应的异常
     */
    V get() throws InterruptedException, ExecutionException;
 
    /**
     * 阻塞一定时间等待任务完成,并返回任务执行结果,超过时间未返回结果会抛出异常
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
  • Future只是一个接口,不能直接用来创建对象,FutureTask是Future的实现类

成员属性

/**
 * 可能的状态转换:
 * NEW -> COMPLETING -> NORMAL   正常完成
 * NEW -> COMPLETING -> EXCEPTIONAL  异常完成
 * NEW -> CANCELLED  取消
 * NEW -> INTERRUPTING -> INTERRUPTED 打断
 */
// 表示当前task的状态
private volatile int state;
// 表示当前任务尚未执行
private static final int NEW          = 0;
// 表示当前任务正在结束,尚未完全结束,一种临界状态
private static final int COMPLETING   = 1;
// 表示当前任务正常结束
private static final int NORMAL       = 2;
// 表示当前任务执行过程中发生了异常。 内部封装的 callable.run() 向上抛出异常了
private static final int EXCEPTIONAL  = 3;
// 表示当前任务被取消
private static final int CANCELLED    = 4;
// 表示当前任务中断中..
private static final int INTERRUPTING = 5;
// 表示当前任务已中断
private static final int INTERRUPTED  = 6;

/** 如果构造FutureTask传入Runnable,则会使用装饰者设计模式伪装成 Callable了 */
private Callable<V> callable;
/** 从get()返回的结果或抛出的异常 */
private Object outcome; // non-volatile, protected by state reads/writes
/**  表示当前任务被线程执行期间,保存当前执行任务的线程对象引用*/
private volatile Thread runner;
/** 因为会有很多线程去get当前任务的结果,所以会有阻塞,这里使用了一种数据结构 stack 头插、头取的一个队列。 */
private volatile WaitNode waiters;

构造方法

public FutureTask(Callable<V> callable) {
    // 非空校验
    if (callable == null)
        throw new NullPointerException();
    // callable就是我们自己的任务
    this.callable = callable;
    // 设置当前任务状态为NEW: 表示当前任务尚未执行
    this.state = NEW;       // ensure visibility of callable 确保可调用文件的可见性
}

public FutureTask(Runnable runnable, V result) {
    // 使用装饰者模式将runnable转换为了callable接口,外部线程通过get获取
    // 当前任务执行结束时,结果可能为 null 也可能为传进来的值
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}
// callable
public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}
// RunnableAdapter
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}
  • 当参数为Callable时,就直接赋值给callable
  • 当参数为Runnable时,就还要传入一个返回结果,并且用装饰者模式把Runnable装饰成Callable

成员方法

RunnableFuture 接口

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}
  • RunnableFuture 接口继承了Runnable和Future接口。而FutureTask重写run方法

run()

  • 运行当前任务,其中涉及setException、set、handlePossibleCancellationInterrupt
public void run() {
    /**
     * 1、状态如果不是NEW,说明任务或者已经执行过,或者已经被取消,直接返回
     * 2、状态如果是NEW,则CAS尝试把当前执行线程保存在runner字段中,如果赋值失败(当前任务被其它线程抢占了)则直接返回,
     *    保证callable任务只被运行一次
     */
    if (state != NEW ||
        !RUNNER.compareAndSet(this, null, Thread.currentThread()))
        return;
    // 如果执行到这里,说明当前task一定是 NEW 状态,而且当前线程也抢占TASK成功!
    try {
        // callable 就是我们自己封装逻辑的callable任务 或者装饰后的runnable
        Callable<V> c = callable;
        // 再次检验、防止空指针异常、防止外部线程 cancel掉当前任务。
        if (c != null && state == NEW) {
            // 结果的引用
            V result;
            // true 表示callable.run 代码块执行成功 未抛出异常
            // false 表示callable.run 代码块执行失败 抛出异常
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                // 设置异常信息
                setException(ex);
            }
            if (ran)
                // 正常运行结束
                // 设置正常结束的结果
                set(result);
        }
    } finally {
        // 将当前执行任务的线程置为null
        runner = null;
        // 当前任务的状态
        int s = state;
        if (s >= INTERRUPTING)
            // 说明当前任务处于中断中或者已中断状态
            // 让出cpu,不断的判断是否是中断中...
            handlePossibleCancellationInterrupt(s);
    }
}
  1. 判断当前任务的state是否等于NEW(任务未执行),如果不为NEW则说明任务或者已经执行过,或者已经被取消,直接返回。
  2. 如果状态为NEW则接着会通过unsafe类把任务执行线程引用采用CAS保存在runner字段中,如果保存失败,则直接返回。
  3. 执行任务,设置任务返回结果
  4. 如果任务执行发生异常,则调用setException()方法保存异常信息
  • set 以CAS的方式设置结果v给outcome
protected void set(V v) {
    //  使用CAS方式设置当前任务状态为完成中(一种临界状态)
    // 也有可能会失败,就是其他线程在本线程CAS的时候,就把task取消了
    if (STATE.compareAndSet(this, NEW, COMPLETING)) {
        outcome = v;
        // 将结果v赋值给outcome之后,马上会将当前任务状态修改为NORMAL(正常结束状态)
        STATE.setRelease(this, NORMAL); // final state 最终的状态
        // 唤醒之前挂起的线程
        finishCompletion();
    }
}
  • finishCompletion 移除并唤醒所有等待线程,执行done,置空callable
private void finishCompletion() {
    // 遍历阻塞队列 q指向waiters链表的头结点
    for (WaitNode q; (q = waiters) != null;) {
        // 使用cas设置 waiters为null 
        // 为了防止外部线程使用cancel取消当前任务,也会触发finishCompletion方法。(小概率事件)
        if (WAITERS.weakCompareAndSet(this, q, null)) {
            // 自旋
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    // 唤醒当前节点对应的线程(在awaitDone方法最后一个else判断中park,在此处唤醒)
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                // next == null 说明是最后一个节点,则直接break即可
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    // 模板方法,可以被覆盖
    done();
    // 将callable 设置为null helpGC
    callable = null;        // to reduce footprint
}
  • setException以CAS的方式设置异常信息t给outcome
protected void setException(Throwable t) {
    if (STATE.compareAndSet(this, NEW, COMPLETING)) {
        // 引用的是 callable 向上层抛出来的异常。
        outcome = t;
        // 将当前任务的状态 修改为 EXCEPTIONAL(发生了异常)
        STATE.setRelease(this, EXCEPTIONAL); // final state
        finishCompletion();
    }
}
  • handlePossibleCancellationInterrupt,状态为中断中…则会让出cpu。在cancel里面会有设置为中断中状态
private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us.  Let's spin-wait patiently.
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            // 如果是中断中,则让出cpu
            Thread.yield(); // wait out pending interrupt

    // assert state == INTERRUPTED;

    // We want to clear any interrupt we may have received from
    // cancel(true).  However, it is permissible to use interrupts
    // as an independent mechanism for a task to communicate with
    // its caller, and there is no way to clear only the
    // cancellation interrupt.
    //
    // Thread.interrupted();
}

get()

  • 获取当前任务执行结束后得到的结果,其中涉及awaitDonereport
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // COMPLETING(尚未完全结束,一种临界状态)
    if (s <= COMPLETING)
        // 说明当前任务还没有结束,当前线程就会被阻塞
        // awaitDone执行完后会返回task当前状态,如果该方法执行期间,task被中断了,则会直接抛出中断异常:
        // awaitDone是futureTask实现阻塞的关键方法: 等待任务执行完毕,如果任务取消或者超时则停止!
        s = awaitDone(false, 0L);
    return report(s);
}
  1. 如果此时任务已经执行完毕则会直接返回任务结果,如果任务还没执行完毕,则调用方会阻塞直到任务执行结束返回结果为止
  • awaitDone 是futureTask实现阻塞的关键方法
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    // deadline=0 不会超时
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // 引用当前线程封装成 WaitNode对象(头插、头取的一个队列。)
    WaitNode q = null;
    // 表示当前线程 waitNode对象是否入队/压栈
    boolean queued = false;
    // 自旋
    for (;;) {
        // 判断阻塞线程是否被中断,如果被中断则在等待队列中删除该节点并抛出InterruptedException异常
        if (Thread.interrupted()) {
            // 当前线程节点出队
            removeWaiter(q);
            // 上抛,使get方法抛出中断异常。
            throw new InterruptedException();
        }

        // 假设当前线程是被其它线程使用unpark(thread) 唤醒的话,会正常自旋,走下面逻辑:

        // 获取当前任务最新状态
        int s = state;
        // 条件成立:说明当前任务已经有结果了.. (可能是正常完成、异常、中断、取消等等)
        if (s > COMPLETING) {
            // 条件成立:说明已经为当前线程创建过WaitNode了,此时需要将 node.thread = null helpGC
            if (q != null)
                q.thread = null;
            // 直接返回当前状态.
            return s;
        }
        // 条件成立:说明当前任务接近完成状态(表示任务已经结束但是任务执行线程还没来得及给outcome赋值)
        // 这里让当前线程释放cpu让其他线程优先执行 ,进行下一次抢占cpu:
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        // 条件成立:第一次自旋,当前线程还未创建 WaitNode 对象,此时为当前线程创建 WaitNode对象,也就是创建一个结点
        else if (q == null)
            q = new WaitNode();
        // 条件成立:第二次自旋,当前线程已经创建 WaitNode对象了,但是node对象还未入队
        else if (!queued){
            // 当前线程node节点 next 指向原队列的头节点 waiters 一直指向队列的头!
            q.next = waiters;
            // cas方式设置waiters引用指向当前线程node, 成功的话 queued == true 否则,可能其它线程先你一步入队了。
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset, waiters, q);
        }
        // 第三次自旋,会到这里:表示是否设置了超时时间
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                // 已经超时的话,移除等待节点
                removeWaiter(q);
                return state;
            }
            // 未超时,将当前线程挂起指定时间
            LockSupport.parkNanos(this, nanos);
        }
        else
            // 走到这里,当前get操作的线程就会被park了。线程状态会变为 WAITING状态,相当于休眠了..
            // 除非有其它线程将你唤醒 或者 将当前线程中断。
            // 如果当前线程被其他线程唤醒,醒来时,还是从这里向下继续执行(继续进入自旋for进行条件判断)
            // (在上面的finishCompletion中会唤醒这个挂起的线程!)
            LockSupport.park(this);
    }
}
  1. 判断调用get()的线程是否被其他线程中断,如果是的话则在等待队列中删除对应节点然后抛出InterruptedException异常。
  2. 获取任务当前状态,如果当前任务状态大于COMPLETING则表示任务执行完成,则把thread字段置null(协助GC)并返回结果。
  3. 如果任务处于COMPLETING状态,则表示任务已经处理完成(正常执行完成或者执行出现异常),但是执行结果或者异常原因还没有保存到outcome字段中。这个时候调用线程让出执行权让其他线程优先执行。
  4. 如果等待节点为空,则构造一个等待节点WaitNode。
  5. 如果第四步中新建的节点还没如队列,则CAS的把该节点加入waiters队列的首节点。
  6. 阻塞等待。
  • report去获取最终task执行结束得到的结果
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
    // 正常情况下,outcome 保存的是callable运行结束的结果
    // 非正常情况下,保存的是 callable 抛出的异常。
    Object x = outcome;
    // 条件成立(正常情况):当前任务状态正常结束
    if (s == NORMAL)
        // 直接返回callable运算结果
        return (V)x;

    // 条件成立(非正常情况):当前任务是被取消或中断状态
    if (s >= CANCELLED)
        // 抛异常!
        throw new CancellationException();

    // 执行到这,说明callable接口实现中,是有bug的...
    throw new ExecutionException((Throwable)x);
}

cancel()

  • 将当前线程的任务取消(中断)掉
public boolean cancel(boolean mayInterruptIfRunning) {
    // tate == NEW 成立,表示当前任务处于运行中或者处于线程池任务队列中.
    // mayInterruptIfRunning为true则修改为中断中..,为false则修改为任务被取消
    // 条件成立:说明CAS修改状态成功,可以去执行下面逻辑了,否则返回false,表示cancel失败。
    if (!(state == NEW && STATE.compareAndSet
          (this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                // 执行当前FutureTask 的线程,有可能现在是null,是null 的情况是: 当前任务在 队列中,还没有线程获取到它呢
                Thread t = runner;
                if (t != null)
                    // 给runner线程一个中断信号
                    t.interrupt();
            } finally { // final state
                // 设置任务状态为 中断完成。
                STATE.setRelease(this, INTERRUPTED);
            }
        }
    } finally {
        // 唤醒所有get()阻塞的线程。
        finishCompletion();
    }
    return true;
}

  1. 根据mayInterruptIfRunning是否为true,CAS设置状态为INTERRUPTING或CANCELLED,设置成功,继续第二步,否则返回false
  2. 如果mayInterruptIfRunning为true,调用runner.interupt(),设置状态为INTERRUPTED
  3. 唤醒所有在get()方法等待的线程

总结

  1. 通过FutureTask不仅能够获取任务执行的结果,还有感知到任务执行的异常,甚至还可以取消任务
  2. FutureTask其实就是典型的异常调用的实现方式
  3. 比如RPC框架常用的调用方式有同步调用、异步调用,其实它们本质上都是异步调用,它们就是用FutureTask的方式来实现的

参考文章
线程池源码分析_01 FutureTask源码分析
FutureTask源码解析(JDK1.8)
FutureTask源码解读

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码

)">
< <上一篇

)">
下一篇>>