网站建设推广选stso88效果好,代刷网站是怎么做的,微商城建设购物网站,网站注册搜索引擎的目的一看就懂的RxJava源码分析 前言零、观察模式简介一、RxJava使用示例一二、示例一源码分析0. 示例一代码分解1. RxJava中的观察者是谁#xff1f;2. RxJava中的被观察者又是谁#xff1f;3. 观察者又是如何安插到被观察者中的#xff1f;4. 示例一RxJava源码整体关系类图4. R… 一看就懂的RxJava源码分析 前言零、观察模式简介一、RxJava使用示例一二、示例一源码分析0. 示例一代码分解1. RxJava中的观察者是谁2. RxJava中的被观察者又是谁3. 观察者又是如何安插到被观察者中的4. 示例一RxJava源码整体关系类图4. RxJava的Hook机制 三、RxJava使用示例二四、示例二源码分析1. 同样从subscribe方法看起看看观察者如何被安插到被观察者中2. subscribe方法源码如下3. subscribeActual方法是被map方法返回的对象实现我们来看看map方法的源码4. 因此接着第2步去ObservableMap类中查看subscribeActual方法的源码5. 第4步提到的ObservableMap类中的source和function是什么我们看下源码6. 第5步提到的map方法属于哪个对象呢7. 我们接着第4步继续分析8. 我们看看ObservableEmitter发送的数据如何被map处理然后又被观察者接收 五、后记 前言
RxJava是一种基于观察者模式的异步编程库且支持响应式编程适用于处理复杂的事件流。RxJava是观察者模式的扩展应用其的核心概念是Observable和Observer。Observable表示一个异步事件流Observer表示对这个事件流的观察者。当Observable发出一个事件时Observer会收到这个事件并进行相应的处理。RxJava还提供了一些操作符可以对事件流进行过滤、转换、组合等操作从而更方便地处理事件流。 本文对于RxJava的源码进行分析以更加彻底的了解RxJava的实现思路。
零、观察模式简介
前言中我们提到RxJava是观察者模式的扩展应用那么学习RxJava的源码肯定要懂观察者模式的如果不懂这种设计模式直接来看源码是比较吃力的。本文也是以观察者模式为切入点来讲解RxJava源码。
大家可以看下我之前写的全网最全面最精华的设计模式讲解从程序员转变为工程师的第一步这篇文章中对于观察者模式的介绍。再啰嗦一句设计模式非常重要也非常难以掌握因为它不是简单的技术而是一种思想这世界上最难学习的就是思想希望各位博友能够真正将这种思想融入自己的思维中。 简单来说观察者模式有两个主要角色一个是观察者一个是被观察者而实现观察者模式的核心是将观察者安插聚合到被观察者之中这样被观察者的一举一动都能被观察者所捕捉而这也是分析RxJava源码的核心如果不能理解该核心请移步我上面提到的文章里面有比较详细的介绍。
一、RxJava使用示例一
使用RxJava需要在build.gradle中引用RxJava库
implementation io.reactivex.rxjava3:rxjava:3.0.4
implementation io.reactivex.rxjava3:rxandroid:3.0.0最简单的RxJava使用示例
public class MainActivity extends AppCompatActivity {private final static String TAG MainActivity.class.getSimpleName();Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);Observable.create(new ObservableOnSubscribeString() {Overridepublic void subscribe(NonNull ObservableEmitterString emitter) throws Throwable {emitter.onNext(sun);emitter.onNext(hao);}}).subscribe(new ObserverString() {Overridepublic void onSubscribe(NonNull Disposable d) {Log.d(TAG, onSubscribe);}Overridepublic void onNext(NonNull String s) {Log.d(TAG, onNext -》 s);}Overridepublic void onError(NonNull Throwable e) {Log.d(TAG, onError);}Overridepublic void onComplete() {Log.d(TAG, onComplete);}});}
}运行结果
2023-11-17 04:12:57.125 15872-15872/com.xiaomi.myaxjava D/MainActivity: onSubscribe
2023-11-17 04:12:57.125 15872-15872/com.xiaomi.myaxjava D/MainActivity: onNext -》 sun
2023-11-17 04:12:57.126 15872-15872/com.xiaomi.myaxjava D/MainActivity: onNext -》 hao二、示例一源码分析
示例一的代码用于实现观察者监控被观察者发送的字符串并进行打印。
0. 示例一代码分解
为了便于我们更好的分析源码我们将上面示例一中的匿名类实现全部去掉。匿名类是简化了代码但有时候也提高了代码阅读的难度。去掉匿名类后的代码如下看上去是不是容易理解多了。
public class MainActivity extends AppCompatActivity {private final static String TAG MainActivity.class.getSimpleName();Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);MySource mySource new MySource();MyObserver myObserver new MyObserver();Observable.create(mySource).subscribe(myObserver);}private class MySource implements ObservableOnSubscribeString{Overridepublic void subscribe(NonNull ObservableEmitter emitter) throws Throwable {emitter.onNext(sun);emitter.onNext(hao);}}private class MyObserver implements ObserverString{Overridepublic void onSubscribe(NonNull Disposable d) {Log.d(TAG, onSubscribe);}Overridepublic void onNext(NonNull String s) {Log.d(TAG, onNext -》 s);}Overridepublic void onError(NonNull Throwable e) {Log.d(TAG, onError);}Overridepublic void onComplete() {Log.d(TAG, onComplete);}}
}RxJava核心代码就一行 Observable.create(mySource).subscribe(myObserver);上面我们提到RxJava是观察者模式的扩展应用而观察者模式的核心是将观察者Observer安插到被观察者Observable之中监视被观察者的一举一动。下面我们需要找出观察者是谁被观察者是谁观察者又如何被插入到被观察者之中即可。
1. RxJava中的观察者是谁
RxJava通过subscribe方法订阅观察者subscribe方法其实就是实现将观察者安插到被观察者后面我们会详细介绍如何实现安插的即示例一代码分解后中的观察者是myObserver。
private class MyObserver implements ObserverString{Overridepublic void onSubscribe(NonNull Disposable d) {Log.d(TAG, onSubscribe);}Overridepublic void onNext(NonNull String s) {Log.d(TAG, onNext -》 s);}Overridepublic void onError(NonNull Throwable e) {Log.d(TAG, onError);}Overridepublic void onComplete() {Log.d(TAG, onComplete);}}2. RxJava中的被观察者又是谁
其实如果理解观察者设计模式的话我们从MyObservable的具体代码中我们也能猜到真正的被观察者应该是ObservableEmitter对象因为被观察者是发数据方观察者监察被观察者发送的数据而负责发送数据的是ObservableEmitter对象的onNext方法因此被观察者应该是ObservableEmitter对象后面我们会从源码中证实这一点。 private class MySource implements ObservableOnSubscribeString{Overridepublic void subscribe(NonNull ObservableEmitter emitter) throws Throwable {emitter.onNext(sun);emitter.onNext(hao);}}3. 观察者又是如何安插到被观察者中的
上面我们提到RxJava通过subscribe方法订阅观察者即通过subscribe方法将观察者安插到被观察者中。下面我们开始分析subscribe方法的源码看看观察者示例中的myObserver如何被安插到被观察者中。
示例一的RxJava核心代码就是下面一句
Observable.create(mySource).subscribe(myObserver);subscribe方法是属于哪个对象呢 很明显subscribe方法的对象是由Observable.create()方法创建的下面我们看看Observable.create方法的源码 public static T ObservableT create(NonNull ObservableOnSubscribeT source) {Objects.requireNonNull(source, source is null);return RxJavaPlugins.onAssembly(new ObservableCreate(source));}create方法传入了我们自定义的MySource对象返回时又被RxJavaPlugins.onAssembly方法处理了一下我们来看看RxJavaPlugins.onAssembly方法
public static T ObservableT onAssembly(NonNull ObservableT source) {Function? super Observable, ? extends Observable f onObservableAssembly;if (f ! null) {return apply(f, source);}return source;}可以看到如果onObservableAssembly为null那么onAssembly方法就什么都不做如果不为null就对source进行处理一下再返回其实这就是RxJava的Hook机制也就是通过onAssembly方法拦截一下被观察者先对被观察者处理一波后面我们会详细介绍这儿我们忽略这个hook即可。 那么Observable.create()方法创建的就是ObservableCreate对象即该案例中subscribe方法是ObservableCreate对象的方法。ObservableCreate的类图如下subscribe方法被抽象类Observable所实现。 3. subscribe方法的源码如下
public final void subscribe(NonNull Observer? super T observer) {Objects.requireNonNull(observer, observer is null);try {observer RxJavaPlugins.onSubscribe(this, observer);Objects.requireNonNull(observer, The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins);subscribeActual(observer); //这个是核心方法} catch (NullPointerException e) { // NOPMDthrow e;} catch (Throwable e) {Exceptions.throwIfFatal(e);// cant call onError because no way to know if a Disposable has been set or not// cant call onSubscribe because the call might have set a Subscription alreadyRxJavaPlugins.onError(e);NullPointerException npe new NullPointerException(Actually not, but cant throw other exceptions due to RS);npe.initCause(e);throw npe;}}看源码我们要先梳理主线一定不要想着一开始每一句都看懂因为这样大概率会看晕一些支线的细节代码可以先不关注重点是关注主线代码。 大概一看subscribe方法中核心方法是subscribeActual(observer)方法上面我们分析了subscribe方法属于ObservableCreate对象那么subscribeActual方法自然也是属于ObservableCreate类我们去ObservableCreate类中查看subscribeActual方法的源码如下 Overrideprotected void subscribeActual(Observer? super T observer) {CreateEmitterT parent new CreateEmitter(observer);//这里创建了CreateEmitter对象并将我们的观察者传进去了observer.onSubscribe(parent);//观察者执行onSubscribe方法。这里就能明白为什么观察者都是先执行onSubscribe方法了try {source.subscribe(parent);//这里的source是谁呢} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}}subscribeActual方法中首先创建了CreateEmitter对象并将我们自定义的观察者聚合进去。其实从这就证实了CreateEmitter对象是被观察者CreateEmitter通过构造函数将观察者注入如此以来观察者就被安插进了被观察者之中具体如何监控被观察者我们后面再说。 其次subscribeActual方法不管三七二十一先执行观察者的onSubscribe方法这里也能说明为什么RxJava都是先执行onSubscribe方法。 最后subscribeActual方法执行source.subscribe(parent)将创建的被观察者即CreateEmitter对象parent传递了出去那么这里的source又是谁呢我们看看源码中source赋值的地方
public final class ObservableCreateT extends ObservableT {final ObservableOnSubscribeT source;public ObservableCreate(ObservableOnSubscribeT source) {this.source source;//通过ObservableCreate构造函数传入source的值}Overrideprotected void subscribeActual(Observer? super T observer) {CreateEmitterT parent new CreateEmitter(observer);observer.onSubscribe(parent);try {source.subscribe(parent);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}}
}从源码中可以看到source是创建ObservableCreate对象的时候传入的上面分析subscribe方法是属于哪个对象的时候我们就看到ObservableCreate对象是由Observable.create创建的 Observable.create(mySource).subscribe(myObserver);public static T ObservableT create(NonNull ObservableOnSubscribeT source) {Objects.requireNonNull(source, source is null);return RxJavaPlugins.onAssembly(new ObservableCreate(source));//这里的source就是我们自定的mySource}private class MySource implements ObservableOnSubscribeString{Overridepublic void subscribe(NonNull ObservableEmitter emitter) throws Throwable {emitter.onNext(sun);emitter.onNext(hao);}}这时我们再看subscribeActual方法源码 Overrideprotected void subscribeActual(Observer? super T observer) {CreateEmitterT parent new CreateEmitter(observer);observer.onSubscribe(parent);try {source.subscribe(parent);//source就是我们自定义MySource对象} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}}source.subscribe(parent)中的source就是我们自定义的MySource对象通过subscribe方法将subscribeActual方法中创建的被观察者CreateEmitter对象parent传递到MySource中。此时我们再看被观察者发送数据时观察者是如何监察到的
private class MySource implements ObservableOnSubscribeString{Overridepublic void subscribe(NonNull ObservableEmitter emitter) throws Throwable {emitter.onNext(sun);//emitter就是subscribeActual方法中创建的CreateEmitter对象emitter.onNext(hao);}}被观察者emitter通过onNext发送数据我们来看看onNext方法的源码
static final class CreateEmitterTextends AtomicReferenceDisposableimplements ObservableEmitterT, Disposable {private static final long serialVersionUID -3434801548987643227L;final Observer? super T observer;CreateEmitter(Observer? super T observer) {this.observer observer;}Overridepublic void onNext(T t) {if (t null) {onError(ExceptionHelper.createNullPointerException(onNext called with a null value.));return;}if (!isDisposed()) {observer.onNext(t);//观察者接收到被观察者发送的数据}}}可以看到当被观察者对象调用onNext方法时安插在内部的观察者会接收到数据而观察者的onNext方法就是我们自定义实现的
private class MyObserver implements ObserverString{Overridepublic void onSubscribe(NonNull Disposable d) {Log.d(TAG, onSubscribe);}Overridepublic void onNext(NonNull String s) {Log.d(TAG, onNext -》 s);//监察到被观察者发送的数据}Overridepublic void onError(NonNull Throwable e) {Log.d(TAG, onError);}Overridepublic void onComplete() {Log.d(TAG, onComplete);}}4. 示例一RxJava源码整体关系类图 ObservableCreate就是一个中转器它把MySource和MyObserver中转出去之后观察者MyObserver聚合安插到被观察者CreateEmitter对象最后被观察者对象又通过subscribe方法被传递给MySource对象。
4. RxJava的Hook机制
上面我们也提到了该机制Observable的create方法源码如下 public static T ObservableT create(NonNull ObservableOnSubscribeT source) {Objects.requireNonNull(source, source is null);return RxJavaPlugins.onAssembly(new ObservableCreate(source));}创建ObservableCreate对象之后又被RxJavaPlugins.onAssembly方法处理一下才返回这个就是RxJava的hook机制其实就是将创建的被观察者先自己玩一把再给观察者处理。
RxJavaPlugins.onAssembly方法源码如下
static volatile Function? super Observable, ? extends Observable onObservableAssembly;public static T ObservableT onAssembly(NonNull ObservableT source) {Function? super Observable, ? extends Observable f onObservableAssembly;if (f ! null) {//如果设置了onObservableAssembly则意味着开启hookreturn apply(f, source);}return source;}static T, R R apply(NonNull FunctionT, R f, NonNull T t) {try {return f.apply(t);} catch (Throwable ex) {throw ExceptionHelper.wrapOrThrow(ex);}}通过如下方法给onObservableAssembly赋值 public static void setOnObservableAssembly(Nullable Function? super Observable, ? extends Observable onObservableAssembly) {if (lockdown) {throw new IllegalStateException(Plugins cant be changed anymore);}RxJavaPlugins.onObservableAssembly onObservableAssembly;}使用RxJava的hook机制示例 RxJavaPlugins.setOnObservableAssembly(new FunctionObservable, Observable() {Overridepublic Observable apply(Observable observable) throws Throwable {return null;//在这拦截住Observable自己先处理一波}});三、RxJava使用示例二
public class MainActivity extends AppCompatActivity {private final static String TAG MainActivity.class.getSimpleName();Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);Observable.create(new ObservableOnSubscribeString() {Overridepublic void subscribe(NonNull ObservableEmitterString emitter) throws Throwable {emitter.onNext(sun);emitter.onNext(hao);}}).map(new FunctionString, Integer() {Overridepublic Integer apply(String s) throws Throwable {Integer ans 0;if (s.equals(sun)){ans 66;}else if (s.equals(hao)){ans 99;}else{ans 88;}return ans;}}).subscribe(new ObserverInteger() {Overridepublic void onSubscribe(NonNull Disposable d) {Log.d(TAG, onSubscribe);}Overridepublic void onNext(NonNull Integer s) {Log.d(TAG, onNext -》 s);}Overridepublic void onError(NonNull Throwable e) {Log.d(TAG, onError);}Overridepublic void onComplete() {Log.d(TAG, onComplete);}});}
}运行结果如下
2023-11-17 04:58:08.660 16545-16545/com.xiaomi.myaxjava D/MainActivity: onSubscribe
2023-11-17 04:58:08.660 16545-16545/com.xiaomi.myaxjava D/MainActivity: onNext -》 66
2023-11-17 04:58:08.660 16545-16545/com.xiaomi.myaxjava D/MainActivity: onNext -》 99四、示例二源码分析
示例二比示例一稍微复杂一点加了map操作符map操作符的作用是将被观察者发送的数据先做一波处理再给观察者。有了示例一源码的基础分析示例二源码也比较简单了。在分析该部分源码前建议先去看一下我之前写的装饰模式RxJava就是通过装饰模式将一个个的操作符装饰为各种类型的观察者都是Observer的子类。
1. 同样从subscribe方法看起看看观察者如何被安插到被观察者中
Observable.create(new ObservableOnSubscribeString() {Overridepublic void subscribe(NonNull ObservableEmitterString emitter) throws Throwable {emitter.onNext(sun);emitter.onNext(hao);}}).map(new FunctionString, Integer() {Overridepublic Integer apply(String s) throws Throwable {Integer ans 0;if (s.equals(sun)){ans 66;}else if (s.equals(hao)){ans 99;}else{ans 88;}return ans;}}).subscribe(new ObserverInteger() {//subscribe方法是哪个对象的方法很明显是map方法返回的对象Overridepublic void onSubscribe(NonNull Disposable d) {Log.d(TAG, onSubscribe);}Overridepublic void onNext(NonNull Integer s) {Log.d(TAG, onNext -》 s);}Overridepublic void onError(NonNull Throwable e) {Log.d(TAG, onError);}Overridepublic void onComplete() {Log.d(TAG, onComplete);}});2. subscribe方法源码如下
public final void subscribe(NonNull Observer? super T observer) {Objects.requireNonNull(observer, observer is null);try {observer RxJavaPlugins.onSubscribe(this, observer);Objects.requireNonNull(observer, The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins);subscribeActual(observer);//该方法是被哪个类实现的很明显subscribe方法别哪个类实现该方法就是被哪个方法实现} catch (NullPointerException e) { // NOPMDthrow e;} catch (Throwable e) {Exceptions.throwIfFatal(e);// cant call onError because no way to know if a Disposable has been set or not// cant call onSubscribe because the call might have set a Subscription alreadyRxJavaPlugins.onError(e);NullPointerException npe new NullPointerException(Actually not, but cant throw other exceptions due to RS);npe.initCause(e);throw npe;}}3. subscribeActual方法是被map方法返回的对象实现我们来看看map方法的源码 public final R ObservableR map(NonNull Function? super T, ? extends R mapper) {Objects.requireNonNull(mapper, mapper is null);//示例一中讲解过RxJavaPlugins.onAssembly是hook函数在此我们不考虑那么返回的就是ObservableMap对象return RxJavaPlugins.onAssembly(new ObservableMap(this, mapper));}4. 因此接着第2步去ObservableMap类中查看subscribeActual方法的源码 Overridepublic void subscribeActual(Observer? super U t) {source.subscribe(new MapObserverT, U(t, function));//这里面的source和function是什么我们后面分析}可以看到subscribeActual方法中将我们自定义的观察者送入了MapObserver的构造函数这儿就是采用装饰模型将我们自定义的观察者进行装饰形成一个新的观察者。我们看下MapObserver的源码
static final class MapObserverT, U extends BasicFuseableObserverT, U {final Function? super T, ? extends U mapper;MapObserver(Observer? super U actual, Function? super T, ? extends U mapper) {super(actual); //把我们自定义的观察者传递给了MapObserver的父类this.mapper mapper;}Overridepublic void onNext(T t) {...}}
}我们接着看MapObserver的父类BasicFuseableObserver是如何处理我们自定义的观察者的
public abstract class BasicFuseableObserverT, R implements ObserverT, QueueDisposableR {/** The downstream subscriber. */protected final Observer? super R downstream;/*** Construct a BasicFuseableObserver by wrapping the given subscriber.* param downstream the subscriber, not null (not verified)*/public BasicFuseableObserver(Observer? super R downstream) {this.downstream downstream;//将我们自定义的观察者赋值给了downstream}
}到这儿我们知道自定义的观察者最终被传递给了downstream下面我们分析ObservableMap类中的source和function是什么
5. 第4步提到的ObservableMap类中的source和function是什么我们看下源码
public final class ObservableMapT, U extends AbstractObservableWithUpstreamT, U {final Function? super T, ? extends U function;// source和function都是通过ObservableMap的构造函数传入的public ObservableMap(ObservableSourceT source, Function? super T, ? extends U function) {super(source);this.function function;}Overridepublic void subscribeActual(Observer? super U t) {source.subscribe(new MapObserverT, U(t, function));}
}source和function都是通过ObservableMap的构造函数传入的第3步我们分析map源码的时候看到创建了ObservableMap对象 public final R ObservableR map(NonNull Function? super T, ? extends R mapper) {Objects.requireNonNull(mapper, mapper is null);//这里的mapper就是我们自定义的Function//那这里的this是谁呢很明显map方法属于哪个对象这个this是那个对象return RxJavaPlugins.onAssembly(new ObservableMap(this, mapper));}6. 第5步提到的map方法属于哪个对象呢
Observable.create(new ObservableOnSubscribeString() {Overridepublic void subscribe(NonNull ObservableEmitterString emitter) throws Throwable {emitter.onNext(sun);emitter.onNext(hao);}}).map(new FunctionString, Integer() {//map方法是Observable.create方法返回的对象Overridepublic Integer apply(String s) throws Throwable {Integer ans 0;if (s.equals(sun)){ans 66;}else if (s.equals(hao)){ans 99;}else{ans 88;}return ans;}}).subscribe(new ObserverInteger() {Overridepublic void onSubscribe(NonNull Disposable d) {Log.d(TAG, onSubscribe);}Overridepublic void onNext(NonNull Integer s) {Log.d(TAG, onNext -》 s);}Overridepublic void onError(NonNull Throwable e) {Log.d(TAG, onError);}Overridepublic void onComplete() {Log.d(TAG, onComplete);}});我们接着看Observable.create方法的源码
public static T ObservableT create(NonNull ObservableOnSubscribeT source) {Objects.requireNonNull(source, source is null);//返回的是一个ObservableCreate的对象这里的source是我们自定义的ObservableOnSubscribe对象return RxJavaPlugins.onAssembly(new ObservableCreate(source));}到这儿我们知道ObservableMap类中的function就是我们在map方法中传入的自定义Function匿名类source就是我们通过Observable.create方法创建的ObservableCreate对象
7. 我们接着第4步继续分析 Overridepublic void subscribeActual(Observer? super U t) {//source就是我们通过Observable.create方法创建的ObservableCreate对象//function就是我们在map方法中传入的自定义Function匿名类source.subscribe(new MapObserverT, U(t, function));}我们再来看ObservableCreate类中实现的subscribe方法ObservableCreate继承自ObservableObservableCreate实现的subscribe也是继承自Observable因此我们去Observable中查看subscribe方法的源码
public final void subscribe(NonNull Observer? super T observer) {Objects.requireNonNull(observer, observer is null);try {observer RxJavaPlugins.onSubscribe(this, observer);Objects.requireNonNull(observer, The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins);subscribeActual(observer);//把装饰后的MapObserver对象通过subscribeActual方法传递出去} catch (NullPointerException e) { // NOPMDthrow e;} catch (Throwable e) {Exceptions.throwIfFatal(e);// cant call onError because no way to know if a Disposable has been set or not// cant call onSubscribe because the call might have set a Subscription alreadyRxJavaPlugins.onError(e);NullPointerException npe new NullPointerException(Actually not, but cant throw other exceptions due to RS);npe.initCause(e);throw npe;}}我们接着看ObservableCreate类中的subscribeActual方法源码这个时候就和之前示例一分析的源码接上了就不再详细分析 Overrideprotected void subscribeActual(Observer? super T observer) {//创建CreateEmitter对象并将MapObserver对象传递进去CreateEmitterT parent new CreateEmitter(observer);observer.onSubscribe(parent);try {//这里的source就是我们传入Observable.create方法的自定义ObservableOnSubscribe匿名类//那么subscribe方法就是我们外面自己实现的方法source.subscribe(parent);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}}Overridepublic void subscribe(NonNull ObservableEmitterString emitter) throws Throwable {//emitter就是上面subscribeActual方法中实例化的CreateEmitter对象emitter.onNext(sun);emitter.onNext(hao);}8. 我们看看ObservableEmitter发送的数据如何被map处理然后又被观察者接收
ObservableEmitter通过onNext发送数据以emitter.onNext(“sun”)为例我们看下onNext的源码 Overridepublic void onNext(T t) {if (t null) {onError(ExceptionHelper.createNullPointerException(onNext called with a null value.));return;}if (!isDisposed()) {observer.onNext(t);//这里的observer就是ObservableMap对象t是我们发送的数据“sun”}}接着我们再看下ObservableMap中的onNext方法源码 Overridepublic void onNext(T t) {// t sunif (done) {return;}if (sourceMode ! NONE) {downstream.onNext(null);return;}U v;try {//这里的mapper就是我们外面在map方法中传入的自定义Function在此map先对发送的数据“sun”做了处理v Objects.requireNonNull(mapper.apply(t), The mapper function returned a null value.);} catch (Throwable ex) {fail(ex);return;}//downstream就是我们外面subscribe方法中传入的自定义Observer对象downstream.onNext(v);//这时候拿到数据是经过map处理后的数据}Overridepublic Integer apply(String s) throws Throwable {Integer ans 0;if (s.equals(sun)){ans 66;}else if (s.equals(hao)){ans 99;}else{ans 88;}return ans;}至此整个示例二的源码我们就分析完了
五、后记
这也是我们第一次输出分析源码的文章一是帮助自己理清楚RxJava源码流程二是希望能够对其他博友有所帮忙但第一次写源码分析类的文章确实经验不足感觉写的还是有点混乱没有写出自己的预期后面继续努力。
总之就针对RxJava的源码个人任务掌握一个切入点就对了那就是从subscribe方法入手看看我们自定义的观察者如何与发送数据的被观察者关联在一起的。