RxJava使用与原理分析

一、事件分发流程:

常规创建Observable观察者:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("");
                emitter.onComplete();
            }
        }).flatMap(new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(@NonNull String s) throws Exception {
                   System.out.println("我们这里需要重写的方法");
                   return Observable.just("");
            }
        }).observeOn(Schedulers.io())
                .subscribeOn(Schedulers.newThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                        System.out.println("方法一:onSubscribe");
                    }

                    @Override
                    public void onNext(@NonNull String s) {
                        System.out.println("方法二:onNext");
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        System.out.println("方法三:onError");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("方法四:onComplete");
                    }
                });
    }

从Observable.create()点进去,通过一个静态方法,追踪到

  public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

  }

这里构造方法中保存的source就是外部传进来的ObservableOnSubscribe,存储下来后,当外部调用subscribe时,会执行到Observable的subscribeActual方法,而ObservableCreate实现了该方法,

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

这里做了三件事情:

1、创建分发器CreateEmitter对象,

2、回调生命周期给观察者,

3、执行subscribe方法,并传入CreateEmitter对象,

解释一:创建CreateEmitter用来把对象句柄传递给外部使用,同时把观察者和被观察者关联起来,这里的CreateEmitter就是被观察者,而Observer对象则属于观察者

解释二:这里的observer.onSubscribe(parent),则是执行了 “System.out.println("方法一:onSubscribe")”,在整个处理流开始执行之前,把生命周期回调给了订阅者

解释三:通过source.subscribe(parent),把分发器传递给被观察着用来发送事件,这里的subscribe,就是调用了外部传入的ObservableOnSubscribe对象,通知被观察者开始发送事件,这里会执行到”System.out.println("开始发送事件")“,我们可以通过subscribe中传递过来的emitter对象进行事件的分发处理

当我们调用emitter.onNext("")时,可以理解为被观察者发送了一个事件出来,因为CreateEmitter实现了ObservableEmitter,因此这里会回调到CreateEmitter的onNext方法,代码如下:

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
        

        // 会回调到这里
        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

在onNext内部,CreateEmitter会调用观察者对象的onNext方法,把事件(也就是数据)传递给观察者,也就是会执行到前面demo中的System.out.println("方法二:onNext");方法,同时我们也可以从代码中看到,当判空时,会执行到onError()方法,这里同样会调用观察者的onError()方法,把事件传递出去

二、订阅与取消

在CreateEmitter的每次回调中,我们都可以看到isDisposed()的判断,这个判断是用来检测观察者是否已取消订阅。如果观察者取消订阅的话,那么就不会把执行结果通知到观察者,这里的典型使用场景:在activity中请求网络数据,当数据请求回来后刷新界面,但是如果数据请求回来前用户手动关闭了activity的话。当数据请求回来后,因为activity内部的view已经被销毁,会出现控件报空指针的问题,在这里我们可以通过dispose()方法取消事件的回调传递,这里涉及到一个类:DisposableHelper

public enum DisposableHelper implements Disposable {
    /**
     * The singleton instance representing a terminal, disposed state, don't leak it.
     */
    DISPOSED
    ;

    /**
     * Checks if the given Disposable is the common {@link #DISPOSED} enum value.
     * @param d the disposable to check
     * @return true if d is {@link #DISPOSED}
     */
    public static boolean isDisposed(Disposable d) {
        return d == DISPOSED;
    }
 }

DisposableHelper是一个枚举类,有一个DISPOSED枚举类型,用来记录观察者当前的状态,当执订阅者执行,代码如下:

   @Override
   public void dispose() {
          DisposableHelper.dispose(this);
   }

其内部实现为:

   public static boolean dispose(AtomicReference<Disposable> field) {
        Disposable current = field.get();
        Disposable d = DISPOSED;
        if (current != d) {
            current = field.getAndSet(d);
            if (current != d) {
                if (current != null) {
                    current.dispose();
                }
                return true;
            }
        }
        return false;
    }

通过这里把观察者的状态设置为DISPOSED,用来标志观察者已取消订阅,在被观察者每次执行生命周期时,通过对观察者的状态判断,用以确定是否需要把数据回调给被观察者

三、线程调度:

这里通过observeOn举例说明:

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        // 省略非关键代码
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

ObservableObserveOn实现了AbstractObservableWithUpstream,进而实现了Observable

这里可以追踪到ObservableObserveOn的源码中,可以看到:

public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        // 省略非关键代码
    }

它的构造器调用了super(),而super内部则保存了前一个观察者对象:

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }
}

当我们调用subcribe()时,内部会执行到ObservableObserveOn如下方法:

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // 这里的if用来判断是否调度为当前线程,如果是,则不需要调度器来切换新线程
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

通过 scheduler.createWorker()来切换到指定的线程,此时外部调用onNext()时,会执行到这里,在指定线程进行处理:

 @Override
 public void onNext(T t) {
    if (done) {
         return;
    }

  if (sourceMode != QueueDisposable.ASYNC) {
       queue.offer(t);
    }
       schedule();
 }

// 省略非关键代码

void schedule() {
   if (getAndIncrement() == 0) {
       worker.schedule(this);
      }
}

schedule是一个抽象方法,我们选择一个具体实现NewThreadWorker,这个NewThreadWorker是用来切换新线程的,我们定位到它的具体实现,看下内部是怎么处理的:

    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        // 省略非关键代码
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            // 省略非关键代码
        }
        return sr;
    }

ObserveOnObserver继承了Runnable,这个executor.submit((Callable<Object>)sr);就是线程池中用来提交任务的方法,线程池内部会调用runnable的run方法,从而调用到:

@Override
public void run() {
     if (outputFused) {
         drainFused();
     } else {
         drainNormal();
     }
}

接着:

void drainNormal() {
	// 省略非关键代码
	for (;;) {
		if (checkTerminated(done, q.isEmpty(), a)) {
			return;
		}

		// 省略非关键代码
			a.onNext(v);

		missed = addAndGet(-missed);
		if (missed == 0) {
			break;
		}
	}
}

实现生命周期的回调,类似的,其它生命周期回调也是同样的流程

补充一下线程调度的各参数说明:

参数类型 解释 使用场景
Schedulers.immediate() 当前线程 = 不指定线程 默认
AndroidSchedulers.mainThread() Android主线程 操作UI
Schedulers.newThread() 常规新线程 耗时等操作
Schedulers.io() io操作线程 网络请求、读写文件等io密集型操作
Schedulers.computation() CPU计算操作线程 大量计算操作

四、流式转换:

当我们调用flatMap()时,经过重重方法重载,最终会走到这里

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
        // 省略非关键代码
        return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
    }

我们点击去ObservableFlatMap,可以看到,它最终也是实现了Observable类,那么前面分析过,当外部调用subscribe()时,会到这里:

    @Override
    public void subscribeActual(Observer<? super U> t) {
        // 省略非关键代码
        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }

我们从MergeObserver类追进去,可以看到,当外部调用onNext()时,会走到这里:

@Override
 public void onNext(T t) {
  // 省略非关键代码
  try {
    p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
  } catch (Throwable e) {
   // 省略非关键代码
    }
 }

重点在mapper.apply(t)这一行,点击去可以看到这是一个抽象方法,那么它的具体实现在哪里呢?一步一步的回溯,发现这个mapper就是我们外部传入的new Function<String, ObservableSource<String>>(),那么这个apply就是外部我们需要重写的方法,也就是这个打印这个log的位置:System.out.println("我们这里需要重写的方法");至此,整个回路追踪可以完整串联起来

解释:其它省略的方法内容,因为flatMap可以执行多个,省略的代码就是循环执行的分发流程,我们只追寻主要流程,这些分支流程没有详细区分

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇

)">
下一篇>>