RxJava原理解析

一、RXjava介绍

首先看一下Rxjava这个名字,其中java代表java语言,而Rx是什么意思呢?Rx是Reactive Extensions的简写,翻译过来就是,响应式拓展。所以Rxjava的名字的含义就是,对java语言的拓展,让其可以实现对数据的响应式编程。

那么响应的是什么呢?响应的是上游数据的变化。常规用法是,对数据源进行监听,然后做出响应。

RxJava的整体结构是一条链,其中有这三个角色。

  1. 链的上游:生产者 Observable
  2. 链的下游:观察者 Observer
  3. 链的中间:各个中介节点,既是下游的Observable,又是上游的Observer

二、Rxjava基本使用

Single.just("hfhuaizhi").subscribe(object : SingleObserver<String> {
    override fun onSubscribe(d: Disposable) {
        Log.e(TAG, "onSubscribe")
    }

    override fun onSuccess(t: String) {
        Log.e(TAG, "onSuccess:$t")
    }

    override fun onError(e: Throwable) {
        Log.e(TAG, "onError:$e")
    }

})

上面这段代码是对Rxjava简单的使用,其中

  • Single 发出单个数据的被观察者Observable,只发送一次,只有Success和Error两种状态,没有next,在Rxjava2中新增
  • just 被观察者生产的数据,参数类型是一个泛型,这里传进去的是一个String
  • subscribe 观察者Observer,这里声明的是SingleObserver,用来对Single中产生的数据进行响应
  • SingleObserver
    • onSubscribe 订阅成功后就会回调,一般会在此方法中进行一些初始化操作。其参数类型是Disposable,可以通过调用d.dispose() 取消对Observable的监听,并让其停止发送消息。
    • onSuccess 接收数据成功后就会回调,只会回调一次,其参数类型和Observable中just方法传入的数据类型一致,这里是String类型
    • onError 发生错误时回调,参数是Throwable,包含错误信息。

运行效果

2021-12-18 13:54:12.450 29223-29223/com.hfhuaizhi.rxjavatest E/hftest: onSubscribe
2021-12-18 13:54:12.451 29223-29223/com.hfhuaizhi.rxjavatest E/hftest: onSuccess:hfhuaizhi

可以看到首先onSubscribe被调用,表明注册了观察者。然后接收数据成功,打印出’hfhuaizhi’。 到这里我们就了解了Rxjava最基本的用法,接下来分析一下函数的内部做了什么。

三、Rxjava原理解析

1. just方法分析

public static <@NonNull T> Single<T> just(T item) {
    Objects.requireNonNull(item, "item is null");
    return RxJavaPlugins.onAssembly(new SingleJust<>(item));
}

  1. 对方法参数进行判空

  2. 调用

    RxJavaPlugins.onAssembly
    

    方法,其参数是一个SingleJust,构造方法传入了item

    • 其中onAssembly方法内部对传入的参数进行一些处理,然后返回原参数类型,所以接下来分析的过程中会忽略此方法,可以简单认为just方法直接返回了一个SingleJust实例。
// onSingleAssembly 参数默认是空的,所以这个方法原样返回了source,当设置onSingleAssembly后,
// 会先对source进行处理后再返回
public static <@NonNull T> Single<T> onAssembly(@NonNull Single<T> source) {
    Function<? super Single, ? extends Single> f = onSingleAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

final T value;
public SingleJust(T value) {
    this.value = value;
}

SingleJust将构造方法传入的item保存在value字段中。 由上述分析可知,Single.just方法会返回一个SingleJust实例,所以在我们链式调用中的subscribe方法,实际上调用的是SingleJust的subscribe方法

public final void subscribe(@NonNull SingleObserver<? super T> observer) {
    // 1. 判空
    Objects.requireNonNull(observer, "observer is null");
    // 2. 对参数中的observer进行处理后又返回observer
    observer = RxJavaPlugins.onSubscribe(this, observer);
    // 3. 对Observer进行判空
    Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

    try {
        // 4. 调用真实注册方法
        subscribeActual(observer);
    } catch (NullPointerException ex) {
        throw ex;
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        NullPointerException npe = new NullPointerException("subscribeActual failed");
        npe.initCause(ex);
        throw npe;
    }
}

subscrib方法中主要做了注释中所写的四步操作,其中重要的是第4步subscribeActual,这里才是真正做事的,之前都是数据的校验,因为我们这个类的实例是SingleJust,所以接下来看一下SingleJust的subscribeActual方法做了什么。

@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
    observer.onSubscribe(Disposable.disposed());
    observer.onSuccess(value);
}

可以看到内容十分简单

  1. 调用observer的onSubscribe方法,表明订阅成功,参数是Disposable.disposed()返回值
  2. 调用observer的onSuccess方法,表明数据回调成功,参数是value,而value就是通过Single的just函数传进来的,通过构造方法传入SingleJust实例中,因此,这一步的操作就是简单地将构造方法中传入的值,通过observer的onSuccess方法回调给我们定义的观察者SingleObserver。

这样就完事了,因为之前说过Single.just是最简单的RxJava使用方式,先调用onSubscribe表明注册监听,然后又紧接着通过onSuccess回调数据,所以不会有失败的情况。

2. map方法分析

map是Rxjava中比较常用的用法,用来实现数据类型的转换 比如像这样,我们发送的数据类型是Integer,接收的数据类型是String,这样当然是无法直接接收的,所以需要进行一下转换,将上游数据发送的Integer转换为String,然后由下游接收。

private fun testMap(view: View) {
    Single.just(123).map(object : Function<Int, String> {
        override fun apply(t: Int): String {
            return "$t"
        }
    }).subscribe(object : SingleObserver<String> {
        override fun onSubscribe(d: Disposable) {
            Log.e(TAG, "onSubscribe")
        }

        override fun onSuccess(t: String) {
            Log.e(TAG, "onSuccess:$t")
        }

        override fun onError(e: Throwable) {
            Log.e(TAG, "onError:$e")
        }

    })
}

打印结果

021-12-18 22:32:02.958 5210-5210/com.hfhuaizhi.rxjavatest E/hftest: onSubscribe
2021-12-18 22:32:02.958 5210-5210/com.hfhuaizhi.rxjavatest E/hftest: onSuccess:123

just方法传入的123是Integer类型,onSuccess处接收的数据是String类型,通过map进行转换。其中map方法传入的参数是一个Function<T,E>,此类有两个泛型参数,T代表输入数据类型,E表示输出数据类型,这里的输入数据类型是Integer,返回类型是String,apply方法中返回了String类型的输出数据。

map(object : Function<Int, String> {
        override fun apply(t: Int): String {
            return "$t"
        }
    })

public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper) {
    Objects.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper));
}

进入map方法内部,此方法判空后,返回了SingleMap实例,其构造方法传入了当前SingleJust实例和mapper转换参数,并将其分别保存在source和mapper成员变量中。

public final class SingleMap<T, R> extends Single<R> {
    final SingleSource<? extends T> source;
    final Function<? super T, ? extends R> mapper;
    public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }
    ...
}

好,map方法暂且看到这儿,我们接下来继续分析链式调用中的subscribe方法。
subccribe传入了一个SingleObserver,和之前分析的类似,但是区别在于调用的不再是SingleJust的subscribe方法,而是map方法返回的SingleMap的subscribe方法,由之前的分析可知,此方法调用会在数据的判空后调用到SingleMap的subscribeActual方法。 由之前的分析可知,链式调用到subscribe方法会调用到SingleMap的subscribeActual方法

public final class SingleMap<T, R> extends Single<R> {
    final SingleSource<? extends T> source;
    final Function<? super T, ? extends R> mapper;
    public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }
    @Override
    protected void subscribeActual(final SingleObserver<? super R> t) {
        source.subscribe(new MapSingleObserver<T, R>(t, mapper));
    }
    ...
}

由之前的分析可知,source就是map的上游SingleJust, 所以在single的实际subscribe方法中会调用其上游的subscribe方法,并传入了一个封装好的新的MapSingleObserver,MapSingleObserver的构造方法中第一个参数t,是下游观察者,在我们这块代码中就是链式调用的时候传入的SingleObserver。第二个参数是我们在map方法中传入的数据类型转换转换器mapper。 由之前的分析可知,当source,也就是SingleJust的subscribe方法调用后,会依次调用其参数传入的Observer的onSubscribe方法和onSuccess方法,此时参数传入的Observer就是上面代码块里的MapSingleObserver

static final class MapSingleObserver<T, R> implements SingleObserver<T> {
    final SingleObserver<? super R> t;
    final Function<? super T, ? extends R> mapper;

    MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
        this.t = t;
        this.mapper = mapper;
    }

    @Override
    public void onSubscribe(Disposable d) {
        t.onSubscribe(d);
    }
    @Override
    public void onSuccess(T value) {
        R v;
        try {
            v = Objects.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            onError(e);
            return;
        }
        t.onSuccess(v);
    }
    @Override
    public void onError(Throwable e) {
        t.onError(e);
    }
}

onSubscribe方法原封不动的调用了t.onSubscribe(d);而t就是在MapSingleObserver构造方法传入的下游观察者,也就是SingleObserver实例。这里直接调用了其onSubscribe方法表示注册监听成功。 onSuccess方法中调用了mapper.apply(value),这个mapper就是我们在map方法中传入的转换函数,这里输入了Integer数据类型,得到了String类型输出,最后调用t.onSuccess回调转换后的数据,也就是调用我们subscribe方法传入的实例的onSuccess。

map方法总结

map主要做的就是一个承上启下,链式调用中subscribe方法调用后,会依次向上调用中间节点的subscribe方法,直到调用到最初始的没有上游的Observable,最上层的Observable会在其subscribeActual方法中调用其下游观察者的onSubscribe和onSuccess/onError,将数据一层一层传下去,数据传递的过程中,中间节点可能会对数据进行处理后再接着向下传,最终传递到最底层的Observer,整个流程如图所示

图片含义解释

最上游的Single就是我们调用Single.just产生的SingleJust,其subscribe方法中会调用onSubscribe()和onSuccess(),向下方观察者传递Integer类型的结果,中间观察者SingleObserver由map方法创建,其接收到上游传递下来的数据后,将其转换为String,然后传递给下方观察者,最后下游收到的数据结果就是String类型。

3. 线程切换

线程切换可以说是RxJava中最常用的操作了,甚至很多人选择RxJava,就是因为RxJava可以和方便地实现线程切换。 线程切换主要用到这两个函数:

  • subscribeOn
  • observerOn
private fun testSubscribe(view: View) {
    Single.just("hfhuaizhi").subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread()).subscribe(object : SingleObserver<String> {
            override fun onSubscribe(d: Disposable) {
                Log.e(TAG, "onSubscribe")
            }

            override fun onSuccess(t: String) {
                Log.e(TAG, "onSuccess:$t")
            }

            override fun onError(e: Throwable) {
                Log.e(TAG, "onError:$e")
            }

        })
}

这样写,可以实现subscribe调用之前的消息发送在io线程,observerOn调用之后的Observer回调在android主线程,其中AndroidSchedulers类不在Rxjava标准库中,需要额外引入RxAndroid依赖。

subscribeOn

public final Single<T> subscribeOn(@NonNull Scheduler scheduler) {
    Objects.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new SingleSubscribeOn<>(this, scheduler));
}

subscribeOn方法返回一个SingleSubscribeOn实例,其构造方法中传入了this(上游被观察者)和scheduler(线程调度器,我们传入的是Schedulers.io())。 由之前的分析可知,链式调用中最终subscribe方法调用的时候,会由下向上依次调用各个节点的subscribe方法,这里我们看一下SingleSubscribeOn这一线程切换的节点的subscribe方法做了什么,因为SingleSubscribeOn和SingleJust一样继承自Single,其subscribe方法也是调用到了subscribeActual方法

public final class SingleSubscribeOn<T> extends Single<T> {
    final SingleSource<? extends T> source;
    final Scheduler scheduler;
    public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
        // 上层被观察对象
        this.source = source;
        // 线程类型
        this.scheduler = scheduler;
    }
    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source);
        observer.onSubscribe(parent);
        Disposable f = scheduler.scheduleDirect(parent);
        parent.task.replace(f);
    }
}

  1. 将observer(下游观察者)和source(上游被观察者)封装进一个新的观察者SubscribeOnObserver

  2. 调用下游观察者的onSubscribe方法

  3. 调用scheduler的scheduleDirect方法,参数传入刚封装的新的观察者SubscribeOnObserver实例

  4. 将parent的task变量替换为由传入的scheduler生成的Disposable

    final SequentialDisposable task;
    
    • 这个task的参数类型是Disposable,之前有提到过,在Observer的onSubscribe方法中会传入一个Disposable,调用Disposable的dispose()方法后,会取消注册并让上游停止发送任务,这个Disposable继承自AtomicReference 实现了Disposable接口,AtomicReference是java里的原子引用类型,可以线程安全地对对象引用进行修改,类似地还有AtomicInteger等,所以这里的parent.task.replace(f)就是将parent中的task这个disposable线程安全地替换为scheduler创建地这个新的Disposable,从而可以实现任务的取消。
static final class SubscribeOnObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
...
}

接下来分析一下第3步主要做了什么

public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

scheduleDirect方法中传入了一个Runnable类型参数,因为SubscribeOnObserver类实现了Runnable接口,所以可以被当作Runnable传进去。

因为我们传入的scheduler参数是由Schedulers.io()方法创建的,而此方法默认会返回一个IoScheduler

这个Scheduler的注释写着,会创建并缓存一个线程池。所以我们知道了scheduleDirect方法会将传入的Runnable放入一个线程池里执行,从而实现任务的异步执行,所以接下来我们去看一下SubscribeOnObserver的run方法里做了什么。

static final class SubscribeOnObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
    private static final long serialVersionUID = 7000911171163930287L;
    final SingleObserver<? super T> downstream;
    final SequentialDisposable task;
    final SingleSource<? extends T> source;
    SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
        this.downstream = actual;
        this.source = source;
        this.task = new SequentialDisposable();
    }
    @Override
    public void run() {
        source.subscribe(this);
    }
    @Override
    public void onSubscribe(Disposable d) {
        DisposableHelper.setOnce(this, d);
    }

    @Override
    public void onSuccess(T value) {
        downstream.onSuccess(value);
    }
    ...
}

SubscribeOnObserver的run方法中会调用source.subscribe,并传入自己(自己也是一个Observer),由之前分析我们知道source就是我们监听的上游,这里调用了SingleJust的subscribe,由之前的分析我们知道subscribe会调用到subscribeActual,这里做任务的真正执行,因此就这样实现了让上游任务在异步线程中的执行,上游任务执行过后,会将数据向下传递,传递到当前SubscribeOnObserver节点的时候会调用其onSuccess方法,其调用downstream,也就是下游观察者的onSuccess方法,将数据继续向下传递,此时数据传递的线程也是run方法执行的线程,因为此时并没有再次对线程进行切换。

observerOn

public final Single<T> observeOn(@NonNull Scheduler scheduler) {
    Objects.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new SingleObserveOn<>(this, scheduler));
}

observeOn函数返回了一个SingleObserveOn,也是需要传入this(上游被观察者),和scheduler(线程调度器类型,此时我们传入的是AndroidSchedulers.mainThread()),由之前分析可知我们此时应该去看SingleObserveOn的subscribeActual方法调用

protected void subscribeActual(final SingleObserver<? super T> observer) {
    source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));
}

此方法中调用了其上游的subscribe方法,和之前分析的数据流转过程一致,需要依次调用到最根节点的subscribe,参数传入的是封装后的观察者ObserveOnSingleObserver,其构造方法中传入了下游观察者和线程调度类型,接下来我们看一下当ObserveOnSingleObserver收到上游传下来的数据后进行了怎样的操作。

static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
    private static final long serialVersionUID = 3528003840217436037L;
    final SingleObserver<? super T> downstream;
    final Scheduler scheduler;
    T value;
    Throwable error;
    ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
        this.downstream = actual;
        this.scheduler = scheduler;
    }
    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.setOnce(this, d)) {
            downstream.onSubscribe(this);
        }
    }
    @Override
    public void onSuccess(T value) {
        this.value = value;
        Disposable d = scheduler.scheduleDirect(this);
        DisposableHelper.replace(this, d);
    }
    @Override
    public void run() {
        Throwable ex = error;
        if (ex != null) {
            downstream.onError(ex);
        } else {
            downstream.onSuccess(value);
        }
    }
    ...
}

可以看到在onSuccess方法中调用了scheduler.scheduleDirect(this),并穿了个this,而且自身实现了runnable接口,由之前分析可知,run方法会在某一时刻被调用。传入的scheduler是AndroidSchedulers.mainThread()其返回的是HandlerScheduler,其内部封装了个Handler,将Runnable 弄到主线程去执行。最终结果就是ObserveOnSingleObserver的run方法在主线程中被调用, 其run方法调用了下游观察者downstream的onSuccess/onError。 由此分析可知,observerOn方法控制此节点后的被观察者收到数据时所在的线程,无法影响其上游节点。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码

)">
< <上一篇
下一篇>>