RxJava使用与原理分析
一、事件分发流程:
常规创建Observable观察者:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("");
emitter.onComplete();
}
}).flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull String s) throws Exception {
System.out.println("我们这里需要重写的方法");
return Observable.just("");
}
}).observeOn(Schedulers.io())
.subscribeOn(Schedulers.newThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("方法一:onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
System.out.println("方法二:onNext");
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("方法三:onError");
}
@Override
public void onComplete() {
System.out.println("方法四:onComplete");
}
});
}
从Observable.create()点进去,通过一个静态方法,追踪到
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
}
这里构造方法中保存的source就是外部传进来的ObservableOnSubscribe,存储下来后,当外部调用subscribe时,会执行到Observable的subscribeActual方法,而ObservableCreate实现了该方法,
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
这里做了三件事情:
1、创建分发器CreateEmitter对象,
2、回调生命周期给观察者,
3、执行subscribe方法,并传入CreateEmitter对象,
解释一:创建CreateEmitter用来把对象句柄传递给外部使用,同时把观察者和被观察者关联起来,这里的CreateEmitter就是被观察者,而Observer对象则属于观察者
解释二:这里的observer.onSubscribe(parent),则是执行了 “System.out.println("方法一:onSubscribe")”,在整个处理流开始执行之前,把生命周期回调给了订阅者
解释三:通过source.subscribe(parent),把分发器传递给被观察着用来发送事件,这里的subscribe,就是调用了外部传入的ObservableOnSubscribe对象,通知被观察者开始发送事件,这里会执行到”System.out.println("开始发送事件")“,我们可以通过subscribe中传递过来的emitter对象进行事件的分发处理
当我们调用emitter.onNext("")时,可以理解为被观察者发送了一个事件出来,因为CreateEmitter实现了ObservableEmitter,因此这里会回调到CreateEmitter的onNext方法,代码如下:
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
// 会回调到这里
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
在onNext内部,CreateEmitter会调用观察者对象的onNext方法,把事件(也就是数据)传递给观察者,也就是会执行到前面demo中的System.out.println("方法二:onNext");方法,同时我们也可以从代码中看到,当判空时,会执行到onError()方法,这里同样会调用观察者的onError()方法,把事件传递出去
二、订阅与取消
在CreateEmitter的每次回调中,我们都可以看到isDisposed()的判断,这个判断是用来检测观察者是否已取消订阅。如果观察者取消订阅的话,那么就不会把执行结果通知到观察者,这里的典型使用场景:在activity中请求网络数据,当数据请求回来后刷新界面,但是如果数据请求回来前用户手动关闭了activity的话。当数据请求回来后,因为activity内部的view已经被销毁,会出现控件报空指针的问题,在这里我们可以通过dispose()方法取消事件的回调传递,这里涉及到一个类:DisposableHelper
public enum DisposableHelper implements Disposable {
/**
* The singleton instance representing a terminal, disposed state, don't leak it.
*/
DISPOSED
;
/**
* Checks if the given Disposable is the common {@link #DISPOSED} enum value.
* @param d the disposable to check
* @return true if d is {@link #DISPOSED}
*/
public static boolean isDisposed(Disposable d) {
return d == DISPOSED;
}
}
DisposableHelper是一个枚举类,有一个DISPOSED枚举类型,用来记录观察者当前的状态,当执订阅者执行,代码如下:
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
其内部实现为:
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
通过这里把观察者的状态设置为DISPOSED,用来标志观察者已取消订阅,在被观察者每次执行生命周期时,通过对观察者的状态判断,用以确定是否需要把数据回调给被观察者
三、线程调度:
这里通过observeOn举例说明:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
// 省略非关键代码
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
ObservableObserveOn实现了AbstractObservableWithUpstream,进而实现了Observable
这里可以追踪到ObservableObserveOn的源码中,可以看到:
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
// 省略非关键代码
}
它的构造器调用了super(),而super内部则保存了前一个观察者对象:
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
}
当我们调用subcribe()时,内部会执行到ObservableObserveOn如下方法:
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 这里的if用来判断是否调度为当前线程,如果是,则不需要调度器来切换新线程
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
通过 scheduler.createWorker()来切换到指定的线程,此时外部调用onNext()时,会执行到这里,在指定线程进行处理:
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
// 省略非关键代码
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
schedule是一个抽象方法,我们选择一个具体实现NewThreadWorker,这个NewThreadWorker是用来切换新线程的,我们定位到它的具体实现,看下内部是怎么处理的:
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
// 省略非关键代码
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
// 省略非关键代码
}
return sr;
}
ObserveOnObserver继承了Runnable,这个executor.submit((Callable<Object>)sr);就是线程池中用来提交任务的方法,线程池内部会调用runnable的run方法,从而调用到:
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
接着:
void drainNormal() {
// 省略非关键代码
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
// 省略非关键代码
a.onNext(v);
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
实现生命周期的回调,类似的,其它生命周期回调也是同样的流程
补充一下线程调度的各参数说明:
参数类型 | 解释 | 使用场景 |
Schedulers.immediate() | 当前线程 = 不指定线程 | 默认 |
AndroidSchedulers.mainThread() | Android主线程 | 操作UI |
Schedulers.newThread() | 常规新线程 | 耗时等操作 |
Schedulers.io() | io操作线程 | 网络请求、读写文件等io密集型操作 |
Schedulers.computation() | CPU计算操作线程 | 大量计算操作 |
四、流式转换:
当我们调用flatMap()时,经过重重方法重载,最终会走到这里
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
// 省略非关键代码
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
我们点击去ObservableFlatMap,可以看到,它最终也是实现了Observable类,那么前面分析过,当外部调用subscribe()时,会到这里:
@Override
public void subscribeActual(Observer<? super U> t) {
// 省略非关键代码
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
我们从MergeObserver类追进去,可以看到,当外部调用onNext()时,会走到这里:
@Override
public void onNext(T t) {
// 省略非关键代码
try {
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable e) {
// 省略非关键代码
}
}
重点在mapper.apply(t)这一行,点击去可以看到这是一个抽象方法,那么它的具体实现在哪里呢?一步一步的回溯,发现这个mapper就是我们外部传入的new Function<String, ObservableSource<String>>(),那么这个apply就是外部我们需要重写的方法,也就是这个打印这个log的位置:System.out.println("我们这里需要重写的方法");至此,整个回路追踪可以完整串联起来
解释:其它省略的方法内容,因为flatMap可以执行多个,省略的代码就是循环执行的分发流程,我们只追寻主要流程,这些分支流程没有详细区分