企业建设网站没有服务器,网站开发与应用论文,金华网,wordpress 用户导入要使用Rxjava首先要导入两个包#xff0c;其中rxandroid是rxjava在android中的扩展 implementation io.reactivex:rxandroid:1.2.1implementation io.reactivex:rxjava:1.2.0Rxjava中的操作符
创建型操作符 interval 创建一个按固定时间间隔发射整数序列的Observable#xf…要使用Rxjava首先要导入两个包其中rxandroid是rxjava在android中的扩展
implementation io.reactivex:rxandroid:1.2.1implementation io.reactivex:rxjava:1.2.0Rxjava中的操作符
创建型操作符 interval 创建一个按固定时间间隔发射整数序列的Observable相当于定时器 ObservableLong observable1 Observable.interval(3,TimeUnit.SECONDS);observable1.subscribe(new Action1Long() {Overridepublic void call(Long aLong) {ILog.LogDebug(Observable.interval along aLong);}});
上面的代码每隔3s就会调用call方法并打印Log。其中aLong为从0开始的整数每次1 上面代码也可以合起来写没有特别说明本文后面介绍操作符的示例代码都是合起来写的 Observable.interval(3, TimeUnit.SECONDS).subscribe(new Action1Long() {Overridepublic void call(Long aLong) {ILog.LogDebug(Observable.interval along aLong);}});range 创建发射指定范围的整数序列的Observable可以拿来替代for循环发射一个范围内的有序整数序列。第一个参数是起始值并且不小于0第二个参数为终值左闭右开。 Observable.range(0,5).subscribe(new Action1Integer() {Overridepublic void call(Integer integer) {ILog.LogDebug(Observable.range integer integer);}});打印日志
Observable.range integer 0
Observable.range integer 1
Observable.range integer 2
Observable.range integer 3
Observable.range integer 4repeat 创建一个N次重复发射特定数据的Observable Observable.just(1,2,3).repeat(2).subscribe(new Action1Integer() {Overridepublic void call(Integer integer) {ILog.LogDebug(Observable.range integer integer);}});日志打印
Observable.range integer 1
Observable.range integer 2
Observable.range integer 3
Observable.range integer 1
Observable.range integer 2
Observable.range integer 3变换操作符 map map操作符通过指定一个Func对象将Observable转换为一个新的Observable对象并发射观察者将收到新的Observable处理。假设我们要访问网络Host地址时常是变化的它有时是测试服务器地址有时可能是正式服务器地址但是具体界面的URL地址则是不变的。 final String host https://www.baidu.com/;Observable.just(app).map(new Func1String, String() {Overridepublic String call(String s) {return hosts;}}).subscribe(new Action1String() {Overridepublic void call(String s) {ILog.LogDebug( Observable.just(app).map s s);}});上面代码将会打印
Observable.just(app).map s https://www.baidu.com/appflatMap flatMap操作符将Observable发射的数据集合变换为Observable集合然后将这些Observable发射的数据平坦化地放进一个单独的 Observable。cast 操作符的作用是强制将 Observable 发射的所有数据转换为指定类型另外flatMap的合并允许交叉也就是说可能会交错地发送事件最终结果的顺序可能并不是原始Observable发送时的顺序。 ListString mList new ArrayList();mList.add(add1);mList.add(add2);mList.add(add3);mList.add(add4);Observable.from(mList).flatMap(new Func1String, Observable?() {Overridepublic Observable? call(String s) {return Observable.just(hosts);}}).cast(String.class).subscribe(new Action1String() {Overridepublic void call(String s) {ILog.LogDebug(flatMap call s s);}});上面代码将会打印
flatMap call s https://www.baidu.com/add1
flatMap call s https://www.baidu.com/add2
flatMap call s https://www.baidu.com/add3
flatMap call s https://www.baidu.com/add4concatMap concatMap操作符功能与flatMap操作符一致不过它解决了flatMap交叉问题提供了一种能够把发射的值连续在一起的函数而不是合并它们。concatMap的使用方法和flatMap类似 ListString mList new ArrayList();mList.add(add1);mList.add(add2);mList.add(add3);mList.add(add4);Observable.from(mList).concatMap(new Func1String, Observable?() {Overridepublic Observable? call(String s) {return Observable.just(hosts);}}).cast(String.class).subscribe(new Action1String() {Overridepublic void call(String s) {ILog.LogDebug(concatMap call s s);}});上面代码会打印
concatMap call s https://www.baidu.com/add1
concatMap call s https://www.baidu.com/add2
concatMap call s https://www.baidu.com/add3
concatMap call s https://www.baidu.com/add4flatMapIterable flatMapIterable操作符可以将数据包装成Iterable在Iterable中我们就可以对数据进行处理了 Observable.just(1,2,3).flatMapIterable(new Func1Integer, IterableInteger() {Overridepublic IterableInteger call(Integer integer) {ListInteger list new ArrayList();list.add(integer1);return list;}}).subscribe(new Action1Integer() {Overridepublic void call(Integer integer) {ILog.LogDebug(flatMapIterable integer integer);}});在上面的call方法中 我们对每个integer1 所以打印的结果也都会1
flatMapIterable integer 2
flatMapIterable integer 3
flatMapIterable integer 4buffer buffer操作符将源Observable变换为一个新的Observable这个新的Observable每次发射一组列表值而不是一个一个发射。 Observable.just(1,2,3,4,5,6).buffer(3).subscribe(new Action1ListInteger() {Overridepublic void call(ListInteger integers) {for(Integer integer : integers){ILog.LogDebug(buffer integer integer);}ILog.LogDebug(----------------);}});上面代码将会打印
buffer integer 1
buffer integer 2
buffer integer 3
----------------
buffer integer 4
buffer integer 5
buffer integer 6window window 操作符和buffer 操作符类似只不过 window操作符发射的是Observable而不是数据列表 Observable.just(1,2,3,4,5,6).window(3).subscribe(new Action1ObservableInteger() {Overridepublic void call(ObservableInteger integerObservable) {integerObservable.subscribe(new Action1Integer() {Overridepublic void call(Integer integer) {ILog.LogDebug(window integer integer);}});ILog.LogDebug(-----------------);}});上面代码会打印
window integer 1
window integer 2
window integer 3
-----------------
window integer 4
window integer 5
window integer 6
-----------------groupby goupBy操作符用于分组元素将源Observable变换成一个发射Observables的新Observable 分组后的。它们中的每一个新Observable都发射一组指定的数据 SuperMan s1 new SuperMan(钢铁侠,AAA);SuperMan s2 new SuperMan(张三丰,SSS);SuperMan s3 new SuperMan(美国队长,SSS);SuperMan s4 new SuperMan(蜘蛛侠,S);SuperMan s5 new SuperMan(绿巨人,AA);SuperMan s6 new SuperMan(李元霸,AA);ObservableGroupedObservableString ,SuperMan groupedObservable Observable.just(s1,s2,s3,s4,s5,s6).groupBy(new Func1SuperMan, String() {Overridepublic String call(SuperMan superMan) {return superMan.getLevel();}});Observable.concat(groupedObservable).subscribe(new Action1SuperMan() {Overridepublic void call(SuperMan superMan) {ILog.LogDebug(groupby superMan.getName()---superMan.getLevel());}});
上面代码会打印
groupby 钢铁侠---AAA
groupby 张三丰---SSS
groupby 美国队长---SSS
groupby 蜘蛛侠---S
groupby 绿巨人---AA
groupby 李元霸---AAfilter filter操作符是对源Observable产生的结果自定义规则进行过滤只有满足条件的结果才会提交给订阅者 Observable.just(1,2,3,4).filter(new Func1Integer, Boolean() {Overridepublic Boolean call(Integer integer) {return integer 2;}}).subscribe(new Action1Integer() {Overridepublic void call(Integer integer) {ILog.LogDebug(filter integer);}});上面代码会打印
filter 3
filter 4elementAt elementAt操作符用来返回指定位置的数据。和它类似的有elementAtOrDefaultintT其可以允许默认值。 Observable.just(1,2,3,4).elementAt(2).subscribe(new Action1Integer() {Overridepublic void call(Integer integer) {ILog.LogDebug(elementAt integer integer);}});上面代码会打印
elementAt integer 3distinct distinct 操作符用来去重其只允许还没有发射过的数据项通过。
Observable.just(1,2,2,2,3,4,5,4,3).distinct().subscribe(new Action1Integer() {Overridepublic void call(Integer integer) {ILog.LogDebug(distinct integer integer);}});上面代码会打印
distinct integer 1
distinct integer 2
distinct integer 3
distinct integer 4
distinct integer 5distinctUntilChanged distinctUntilChanged操作符和distinct类似它用来去掉连续重复的数据。 Observable.just(1,2,2,2,3,4,5,4,3).distinctUntilChanged().subscribe(new Action1Integer() {Overridepublic void call(Integer integer) {ILog.LogDebug(distinct integer integer);}});上面代码会打印
distinct integer 1
distinct integer 2
distinct integer 3
distinct integer 4
distinct integer 5
distinct integer 4
distinct integer 3skip skip操作符将源Observable发射的数据过滤掉前n项而take操作符则只取前n项另外还有skipLast和takeLast操作符则是从后面进行过滤操作 Observable.just(1,2,3,4,5).skip(3).subscribe(new Action1Integer() {Overridepublic void call(Integer integer) {ILog.LogDebug(skip integer integer);}});上面代码会打印
skip integer 4
skip integer 5ignoreElements ignoreElements操作符忽略所有源Observable产生的结果只把Observable的onCompleted和onError事件通知给订阅者。 Observable.just(1,2,3,4).ignoreElements().subscribe(new ObserverInteger() {Overridepublic void onCompleted() {ILog.LogDebug(ignoreElements onCompleted);}Overridepublic void onError(Throwable e) {ILog.LogDebug(ignoreElements onError);}Overridepublic void onNext(Integer integer) {ILog.LogDebug(ignoreElements onNext);}});上面代码会打印
ignoreElements onCompletedthrottleFirst throttleFirst操作符则会定期发射这个时间段里源Observable发射的第一个数据throttleFirst操作符默认在computation调度器上执行关于调度器后面会讲到。和 throttleFirst 操作符类似的有sample操作符它会定时地发射源Observable最近发射的数据其他的都会被过滤掉。 Observable.create(new Observable.OnSubscribeInteger() {Overridepublic void call(Subscriber? super Integer subscriber) {for(int i0;i10;i){subscriber.onNext(i);try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}}}).throttleFirst(200,TimeUnit.MILLISECONDS).subscribe(new Action1Integer() {Overridepublic void call(Integer integer) {ILog.LogDebug(throttleFirst integer integer);}});上面代码会打印
throttleFirst integer 0
throttleFirst integer 2
throttleFirst integer 4
throttleFirst integer 6
throttleFirst integer 8Observable.create(new Observable.OnSubscribeInteger() {Overridepublic void call(Subscriber? super Integer subscriber) {for(int i0;i10;i){subscriber.onNext(i);try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}}}).sample(200,TimeUnit.MILLISECONDS).subscribe(new Action1Integer() {Overridepublic void call(Integer integer) {ILog.LogDebug(throttleFirst integer integer);}});上面代码会打印
throttleFirst integer 1
throttleFirst integer 3
throttleFirst integer 5
throttleFirst integer 7
throttleFirst integer 9throttleWithTimeout 通过时间来限流。源Observable每次发射出来一个数据后就会进行计时。如果在设定好的时间结束前源Observable有新的数据发射出来这个数据就会被丢弃同时throttleWithTimeOut重新开始计时。如果每次都是在计时结束前发射数据那么这个限流就会走向极端只会发射最后一个数据。其默认在 computation 调度器上执行。 Observable.create(new Observable.OnSubscribeInteger() {Overridepublic void call(Subscriber? super Integer subscriber) {for(int i0;i10;i){subscriber.onNext(i);int sleep 100;if(i%3 0){sleep 300;}try {Thread.sleep(sleep);} catch (InterruptedException e) {e.printStackTrace();}}}}).throttleWithTimeout(200,TimeUnit.MILLISECONDS).subscribe(new Action1Integer() {Overridepublic void call(Integer integer) {ILog.LogDebug(throttleWithTimeout integer integer);}});上面代码会打印
throttleWithTimeout integer 0
throttleWithTimeout integer 3
throttleWithTimeout integer 6
throttleWithTimeout integer 9debounce和throttleWithTimeout类似它不仅可以使用时间来进行过滤还可以根据一个函数来进行限流
startWith startWith操作符会在源Observable发射的数据前面插上一些数据 Observable.just(4,5,6).startWith(1,2,3).subscribe(new Action1Integer() {Overridepublic void call(Integer integer) {ILog.LogDebug(startWith integer integer);}});上面代码会打印 startWith integer 1startWith integer 2startWith integer 3startWith integer 4startWith integer 5startWith integer 6merge merge操作符将多个Observable合并到一个Observable中进行发射merge可能会让合并的Observable发射的数据交错。concat 严格按照顺序发射数据前一个Observable没发射完成是不会发射后一个Observable的数据的。 ObservableInteger ob1 Observable.just(1,2,3);ObservableInteger ob2 Observable.just(4,5);Observable.merge(ob1,ob2).subscribe(new Action1Integer() {//Observable.concat(ob1,ob2).subscribe(new Action1Integer() {Overridepublic void call(Integer integer) {ILog.LogDebug(merge integer integer);}});上面代码会打印
merge integer 1
merge integer 2
merge integer 3
merge integer 4
merge integer 5zip zip操作符合并两个或者多个Observable发射出的数据项根据指定的函数变换它们并发射一个新值。zip操作符作用于最近未打包的两个Observable只有当原始的Observable中的每一个都发射了一条数据时 zip 才发射数据 ObservableInteger ob1 Observable.just(1,2,3,4);ObservableString ob2 Observable.just(a,b,c);Observable.zip(ob1, ob2, new Func2Integer, String, String() {Overridepublic String call(Integer integer, String s) {return integers;}}).subscribe(new Action1String() {Overridepublic void call(String s) {ILog.LogDebug(zip s s);}});
上面代码会打印
zip s 1a
zip s 2b
zip s 3ccombineLatest 使用ob1最后发射的数据组合ob2每一条数据 ObservableInteger ob1 Observable.just(1,2,3,4);ObservableString ob2 Observable.just(a,b,c);Observable.combineLatest(ob1, ob2, new Func2Integer, String, String() {Overridepublic String call(Integer integer, String s) {return integers;}}).subscribe(new Action1String() {Overridepublic void call(String s) {ILog.LogDebug(combineLatest s s);}});上面代码会打印
combineLatest s 4a
combineLatest s 4b
combineLatest s 4cdelay delay操作符让原始Observable在发射每项数据之前都暂停一段指定的时间段。 Observable.create(new Observable.OnSubscribeLong() {Overridepublic void call(Subscriber? super Long subscriber) {Long time System.currentTimeMillis()/1000;subscriber.onNext(time);}}).delay(2,TimeUnit.SECONDS).subscribe(new Action1Long() {Overridepublic void call(Long aLong) {Long time System.currentTimeMillis()/1000;ILog.LogDebug(delay time (time - aLong));}});上面代码会打印
delay time 2Do Do系列操作符就是为原始Observable的生命周期事件注册一个回调当Observable的某个事件发生时就会调用这些回调。RxJava中有很多Do系列操作符如下所示。 • doOnEach为 Observable注册这样一个回调当Observable每发射一项数据时就会调用它一次包括onNext、onError和 onCompleted。 • doOnNext只有执行onNext的时候会被调用。 • doOnSubscribe当观察者订阅Observable时就会被调用。 • doOnUnsubscribe当观察者取消订阅Observable时就会被调用Observable通过onError或者onCompleted结束时会取消订阅所有的Subscriber。 • doOnCompleted当Observable 正常终止调用onCompleted时会被调用。 • doOnError当Observable 异常终止调用onError时会被调用。 • doOnTerminate当Observable 终止无论是正常终止还是异常终止之前会被调用。 • finallyDo当Observable 终止无论是正常终止还是异常终止之后会被调用。 Observable.just(1,2).doOnNext(new Action1Integer() {Overridepublic void call(Integer integer) {ILog.LogDebug(do onNext integer integer);}}).doOnCompleted(new Action0() {Overridepublic void call() {ILog.LogDebug(do doOnCompleted );}}).doOnSubscribe(new Action0() {Overridepublic void call() {ILog.LogDebug(do doOnSubscribe );}}).doOnUnsubscribe(new Action0() {Overridepublic void call() {ILog.LogDebug(do doOnUnsubscribe );}}).subscribe(new SubscriberInteger() {Overridepublic void onCompleted() {ILog.LogDebug(Subscriber onCompleted );}Overridepublic void onError(Throwable e) {ILog.LogDebug(Subscriber onError );}Overridepublic void onNext(Integer integer) {ILog.LogDebug(Subscriber onNext integer integer);}});上面代码会打印 do doOnSubscribe do onNext integer 1Subscriber onNext integer 1do onNext integer 2Subscriber onNext integer 2do doOnCompleted Subscriber onCompleted do doOnUnsubscribe subscribeOn subscribeOn操作符用于指定Observable自身在哪个线程上运行。如果Observable需要执行耗时操作一般可以让其在新开的一个子线程上运行。observerOn用来指定Observer所运行的线程也就是发射出的数据在哪个线程上使用。一般情况下会指定在主线程中运行这样就可以修改UI。 Observable.create(new Observable.OnSubscribeInteger() {Overridepublic void call(Subscriber? super Integer subscriber) {subscriber.onNext(1);}}).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1Integer() {Overridepublic void call(Integer integer) {ILog.LogDebug(integer integer);}});
上面代码会打印
integer 1timeout 如果原始 Observable 过了指定的一段时长没有发射任何数据timeout 操作符会以一个onError通知终止这个Observable或者继续执行一个备用的Observable。timeout有很多变体这里介绍其中的一种timeoutlongTimeUnitObservable 它在超时时会切换到使用一个你指定的备用的Observable而不是发送错误通知。它默认在computation调度器上执行。 Observable.create(new Observable.OnSubscribeInteger() {Overridepublic void call(Subscriber? super Integer subscriber) {for(int i0;i4;i){try {Thread.sleep(i*100);} catch (InterruptedException e) {e.printStackTrace();}subscriber.onNext(i);}subscriber.onCompleted();}}).timeout(200, TimeUnit.MILLISECONDS,Observable.just(10,11)).subscribe(new Action1Integer() {Overridepublic void call(Integer integer) {ILog.LogDebug(integer integer);}});
上面代码会打印
integer 0
integer 1
integer 10
integer 11catch catch操作符拦截原始Observable的onError通知将它替换为其他数据项或数据序列让产生的Observable能够正常终止或者根本不终止。RxJava将catch实现为以下 3个不同的操作符。 • onErrorReturnObservable遇到错误时返回原有Observable行为的备用Observable备用Observable会忽略原有Observable的onError调用不会将错误传递给观察者。作为替代它会发射一个特殊的项并调用观察者的onCompleted方法。 • onErrorResumeNextObservable遇到错误时返回原有Observable行为的备用Observable备用Observable会忽略原有Observable的onError调用不会将错误传递给观察者。作为替代它会发射备用Observable的数据。 • onExceptionResumeNext它和onErrorResumeNext类似。不同的是如果onError收到的Throwable不是一个Exception它会将错误传递给观察者的onError方法不会使用备用的Observable。 Observable.create(new Observable.OnSubscribeInteger() {Overridepublic void call(Subscriber? super Integer subscriber) {for(int i0;i5;i){if(i 2){subscriber.onError(new Throwable(throwable));}subscriber.onNext(i);}subscriber.onCompleted();}}).onErrorReturn(new Func1Throwable, Integer() {Overridepublic Integer call(Throwable throwable) {return 6;}}).subscribe(new ObserverInteger() {Overridepublic void onCompleted() {ILog.LogDebug(onCompleted ...);}Overridepublic void onError(Throwable e) {ILog.LogDebug(onError ...);}Overridepublic void onNext(Integer integer) {ILog.LogDebug(onNext ...integer);}});上面代码会打印
onNext ...0
onNext ...1
onNext ...2
onNext ...6
onCompleted ...Observable.create(new Observable.OnSubscribeInteger() {Overridepublic void call(Subscriber? super Integer subscriber) {for(int i0;i5;i){if(i 2){subscriber.onError(new Throwable(throwable));}subscriber.onNext(i);}subscriber.onCompleted();}}).onErrorResumeNext(new Func1Throwable, Observable? extends Integer() {Overridepublic Observable? extends Integer call(Throwable throwable) {return Observable.just(5);}}).subscribe(new ObserverInteger() {Overridepublic void onCompleted() {ILog.LogDebug(onCompleted ...);}Overridepublic void onError(Throwable e) {ILog.LogDebug(onError ...);}Overridepublic void onNext(Integer integer) {ILog.LogDebug(onNext ...integer);}});上面代码会打印
onNext ...0
onNext ...1
onNext ...2
onNext ...5
onCompleted ...Observable.create(new Observable.OnSubscribeInteger() {Overridepublic void call(Subscriber? super Integer subscriber) {for(int i0;i5;i){if(i 2){subscriber.onError(new Exception(throwable)); //注意这里是Exception}subscriber.onNext(i);}subscriber.onCompleted();}}).onExceptionResumeNext(Observable.just(5)).subscribe(new ObserverInteger() {Overridepublic void onCompleted() {ILog.LogDebug(onCompleted ...);}Overridepublic void onError(Throwable e) {ILog.LogDebug(onError ...);}Overridepublic void onNext(Integer integer) {ILog.LogDebug(onNext ...integer);}});上面代码会打印
onNext ...0
onNext ...1
onNext ...2
onNext ...5
onCompleted ...retry retry操作符不会将原始Observable的onError通知传递给观察者它会订阅这个Observable再给它一次机会无错误地完成其数据序列。retry总是传递onNext通知给观察者由于重新订阅这可能会造成数据项重复。RxJava 中的实现为retry和retryWhen。这里拿retrylong来举例它指定最多重新订阅的次数。如果次数超了它不会尝试再次订阅而会把最新的一个onError通知传递给自己的观察者。 Observable.create(new Observable.OnSubscribeInteger() {Overridepublic void call(Subscriber? super Integer subscriber) {for(int i0;i5;i){if(i 1){subscriber.onError(new Throwable(error));}else{subscriber.onNext(i);}}subscriber.onCompleted();}}).retry(2).subscribe(new ObserverInteger() {Overridepublic void onCompleted() {ILog.LogDebug(onCompleted ...);}Overridepublic void onError(Throwable e) {ILog.LogDebug(onError ...);}Overridepublic void onNext(Integer integer) {ILog.LogDebug(onNext ...integer);}});
上面代码会打印
onNext ...0
onNext ...0
onNext ...0
onError ...all all操作符根据一个函数对源Observable发射的所有数据进行判断最终返回的结果就是这个判断结果。这个函数使用发射的数据作为参数内部判断所有的数据是否满足我们定义好的判断条件。如果全部都满足则返回true否则就返回false Observable.just(1,2,3,4).all(new Func1Integer, Boolean() {Overridepublic Boolean call(Integer integer) {return integer 2;}}).subscribe(new ObserverBoolean() {Overridepublic void onCompleted() {ILog.LogDebug(onCompleted);}Overridepublic void onError(Throwable e) {ILog.LogDebug(onError);}Overridepublic void onNext(Boolean aBoolean) {ILog.LogDebug(onNext aBoolean);}});
上面代码会打印
onNext true
onCompletedcontains contains 操作符用来判断源 Observable 所发射的数据是否包含某一个数据。如果包含该数据会返回true如果源Observable已经结束了却还没有发射这个数据则返回false。 Observable.just(1,2,3).contains(2).subscribe(new Action1Boolean() {Overridepublic void call(Boolean aBoolean) {ILog.LogDebug(contains boolean aBoolean);}});上面代码会打印
contains boolean trueisEmpty isEmpty操作符用来判断源 Observable 是否发射过数据。如果发射过该数据就会返回 false如果源Observable已经结束了却还没有发射这个数据则返回true。 Observable.just(1,2,3).isEmpty().subscribe(new Action1Boolean() {Overridepublic void call(Boolean aBoolean) {ILog.LogDebug(isEmpty boolean aBoolean);}});上面代码会打印
isEmpty boolean falseamb amb 操作符对于给定两个或多个 Observable它只发射首先发射数据或通知的那个Observable的所有数据。 Observable.amb(Observable.just(1,2,3).delay(2,TimeUnit.SECONDS),Observable.just(4,5)).subscribe(new Action1Integer() {Overridepublic void call(Integer integer) {ILog.LogDebug(amb integer);}});上面代码会打印
amb 4
amb 5defaultIfEmpty 发射来自原始Observable的数据。如果原始Observable没有发射数据就发射一个默认数据 Observable.create(new Observable.OnSubscribeInteger() {Overridepublic void call(Subscriber? super Integer subscriber) {subscriber.onCompleted();}}).defaultIfEmpty(2).subscribe(new Action1Integer() {Overridepublic void call(Integer integer) {ILog.LogDebug(defaultIfEmpty integer integer);}});上面代码会打印
defaultIfEmpty integer 2toList toList操作符将发射多项数据且为每一项数据调用onNext方法的Observable发射的多项数据组合成一个List然后调用一次onNext方法传递整个列表。 Observable.just(1,2,3).toList().subscribe(new Action1ListInteger() {Overridepublic void call(ListInteger integers) {ILog.LogDebug(toList() integers.toString());}});上面代码会打印
toList() [1, 2, 3]toSortedList toSortedList操作符类似于toList操作符不同的是它会对产生的列表排序默认是自然升序。如果发射的数据项没有实现Comparable接口会抛出一个异常。当然若发射的数据项没有实现Comparable接口可以使用toSortedListFunc2变体其传递的函数参数Func2会作用于比较两个数据项。 Observable.just(2,1,3,5).toSortedList().subscribe(new Action1ListInteger() {Overridepublic void call(ListInteger integers) {ILog.LogDebug(toSortedList() integers.toString());}});上面代码会打印
toSortedList() [1, 2, 3, 5]toMap toMap操作符收集原始Observable发射的所有数据项到一个Map默认是HashMap然后发射这个Map。你可以提供一个用于生成Map的key的函数也可以提供一个函数转换数据项到Map存储的值默认数据项本身就是值 SuperMan s1 new SuperMan(钢铁侠,AAA);SuperMan s2 new SuperMan(张三丰,SS);SuperMan s3 new SuperMan(美国队长,SSS);Observable.just(s1,s2,s3).toMap(new Func1SuperMan, String() {Overridepublic String call(SuperMan superMan) {return superMan.getLevel();}}).subscribe(new Action1MapString, SuperMan() {Overridepublic void call(MapString, SuperMan objectSuperManMap) {IteratorSuperMan iterator objectSuperManMap.values().iterator();while (iterator.hasNext()){ILog.LogDebug(toMap s iterator.next().getName());}}});上面代码会打印
toMap s 张三丰
toMap s 钢铁侠
toMap s 美国队长