本次源码分析是以 GitHub 上 RxJava 仓库 1.3.0版本来进行的分析,同时以官方中文文档作为参考。

上一篇中,主要写了两个重要的概念:Observable, Subject。 (Observer Subscriber 不做描写) 本篇中,我们着重来对比几个常用的操作符,并结合具体的环境给出例子。由于大部分的操作符都有多个参数类型,这里根据最基本的参数类型,解释对比其原理。

创建操作符

Empty Just(null)

  • Empty 创建一个不发射任何数据但是正常终止的 Observable.
  • Just(null) 返回一个发射 null 值的Observable.不是什么数据都不会发射

Timer Interval Delay

  • Timer 创建一个Observable,它在一个给定的延迟后发射一个特殊的值。
  • Interval 创建一个按固定时间间隔发射整数序列的Observable。
  • Delay 创建一个按固定时间间隔发射整数序列的Observable。

严格来说 Delay 不应该放在这里来说,毕竟不是创作操作符,不过由于在效果上类似 timer 这里就一并说了。

Observable.timer(3,TimeUnit.SECONDS).subscribe(aLong -> Log.i(ARIRUS, "timer subscribe"));

Observable.just(1).delay(3,TimeUnit.SECONDS).subscribe(aLong -> Log.i(ARIRUS, "delay subscribe"));

Observable.interval(3,TimeUnit.SECONDS).subscribe(aLong -> Log.i(ARIRUS, "interval subscribe"));

log:
16:28:12.022  I/ARIURS: timer subscribe
16:28:12.032  I/ARIURS: delay subscribe
16:28:12.032  I/ARIURS: interval subscribe
16:28:15.035  I/ARIURS: interval subscribe
16:28:18.038  I/ARIURS: interval subscribe

可以看到,timer 是延迟发射一个0值,而 delay 是将流中的某一个 item 延长发送到下游,二者最大的区别就在这里。interval 则是周期性的发送 item 从0开始。我们可以使用 interval 来写一个倒计时:

    Observable.interval(1, TimeUnit.SECONDS)
        .take(60)
        .map(aLong -> 59 - aLong)
        .subscribe(aLong -> Log.i(ARIRUS, "还剩:" + aLong));
log:
16:38:30.906  I/ARIURS: 还剩:59
16:38:31.907  I/ARIURS: 还剩:58
16:38:32.908  I/ARIURS: 还剩:57
...
16:39:27.912  I/ARIURS: 还剩:2
16:39:28.902  I/ARIURS: 还剩:1
16:39:29.903  I/ARIURS: 还剩:0

变换操作符

Buffer GroupBy Window

  • Buffer 定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。
  • GroupBy 将一个Observable分拆为一些Observables集合,它们中的每一个发射原始Observable的一个子序列。
  • Window 定期将来自原始Observable的数据分解为一个Observable窗口,发射这些窗口,而不是每次发射一项数据。

这三个操作符都是将原始数据分类处理,不过侧重点不一样:Buffer 按照原始数据顺序,将其打包成 List 发送到下游。GroupBy 则是根据发送数据进行分类,同一类的生成新的 GroupedObservable(也是一种 Observable),并将其发送到下游,这时数据流中数据不是 item 也不是 List ,而是GroupedObservable<分类依据, item>。Window 则是位于二者之间,也是每 n 个 item 一组发送出去,发送的不是 List ,而是 Observable

    Observable<Integer> observable = Observable.range(1,9);

    observable.buffer(3).subscribe(new Action1<List<Integer>>() {
      @Override
      public void call(List<Integer> list) {
        Log.i(ARIRUS, "buffer call: "+list);
      }
    });

    observable.groupBy(new Func1<Integer, Boolean>() {
      @Override
      public Boolean call(Integer integer) {
        return integer%2==0;
      }
    }).subscribe(booleanIntegerGroupedObservable -> {
      if (booleanIntegerGroupedObservable.getKey())
        booleanIntegerGroupedObservable.subscribe(integer -> Log.i(ARIRUS, "groupBy 偶数 call: "+integer));
      else
        booleanIntegerGroupedObservable.subscribe(integer -> Log.i(ARIRUS, "groupBy 奇数 call: "+integer));
    });

    observable.window(3).subscribe(new Action1<Observable<Integer>>() {
      @Override
      public void call(Observable<Integer> integerObservable) {
        Log.i(ARIRUS, "window call: "+ integerObservable);
        integerObservable.subscribe(new Action1<Integer>() {
          @Override
          public void call(Integer integer) {
            Log.i(ARIRUS, "integerObservable call: "+integer);
          }
        });
      }
    });

log:
I/ARIURS: groupBy 奇数 call: 1
I/ARIURS: groupBy 偶数 call: 2
I/ARIURS: groupBy 奇数 call: 3
I/ARIURS: groupBy 偶数 call: 4
I/ARIURS: groupBy 奇数 call: 5
I/ARIURS: groupBy 偶数 call: 6
...
I/ARIURS: buffer call: [1, 2, 3]
I/ARIURS: buffer call: [4, 5, 6]
I/ARIURS: buffer call: [7, 8, 9]

I/ARIURS: window call: rx.subjects.UnicastSubject@3c6ccfb8
I/ARIURS: integerObservable call: 1
I/ARIURS: integerObservable call: 2
I/ARIURS: integerObservable call: 3
I/ARIURS: window call: rx.subjects.UnicastSubject@1eeb191
I/ARIURS: integerObservable call: 4
I/ARIURS: integerObservable call: 5
I/ARIURS: integerObservable call: 6
...

Map FlatMap

  • Map 原始Observable发射的每一项数据应用一个你选择的函数,然后返回一个发射 这些结果的Observable。
  • FlatMap 使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这 个函数返回一个本身也发射数据的Observable,然后 FlatMap 合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。

这两个操作符,应该是大家用的最早的两个操作符,区别在于返回值不同,map 返回的是转变后的 item 数据,而 flatmap 则是返回转变后 item 的 Observable。例子很多就不写了。

过滤操作符

ThrottleWithTimeout(Debounce) ThrottleFirst ThrottleLast(Sample)

  • ThrottleFirst 发射采样期间的第一项数据。
  • ThrottleLast 发射采样期间的最后一项数据。
  • ThrottleWithTimeout 采样期间如果当前数据之后时间段内没有另外的数据,则发射当前数据。

三者的区别不是很大,差不多就是上述所描述的那样。 Debounce 在自定义搜索框时十分有用。

    PublishSubject<CharSequence> publishSubject = PublishSubject.create();

    publishSubject.debounce(1,TimeUnit.SECONDS).subscribe(charSequence -> Log.i(ARIRUS, "EditText: "+charSequence));

    mEditText.addTextChangedListener(new TextWatcher() {
      @Override
      public void onTextChanged(CharSequence charSequence, int i, int i1, int i2) {
        publishSubject.onNext(charSequence);
      }

      ....
    });

这样的话,每次 edittext 的内容变化后,并且在1s内没有继续变化,则把其变化后的内容传输到下游。 ThrottleFirst 则可以用到按钮的消抖上面。

    publishSubject.throttleFirst(1,TimeUnit.SECONDS).subscribe(integer -> Log.i(ARIRUS,"Button: "));
    mButton.setOnClickListener(view -> publishSubject.onNext(1));

First Take(1)

  • First 只发射第一项(或者满足某个条件的第一项)数据。
  • Take 只发射前面的N项数据。

当 Take(1) 实质也是只发射第一项,但是二者最大的区别就是如果没有所谓的第一项,First 则会报错,原因是,First操作符是由 take(1).single() 实现的,single 操作符如果没有唯一的一个元素返回,就会抛出 NoSuchElementException , 而take本身不会抛出异常。

    publishSubject.first().subscribe(Actions.empty(),Throwable::printStackTrace,()-> Log.i(ARIRUS,
        "finish"));
    mButton.setOnClickListener(view -> {
        publishSubject.onCompleted();
    });

log:
    java.util.NoSuchElementException: Sequence contains no elements
***********************************************************************
    publishSubject.take(1).subscribe(Actions.empty(),Throwable::printStackTrace,()-> Log.i(ARIRUS,
        "finish"));
    mButton.setOnClickListener(view -> {
        publishSubject.onCompleted();
    });
log:
    finish

Subject 没有向下游发射任何数据,只是发射了完成的通知,因此对于 first 直接报错,而 take 获取不到需要的数据,收到了完成的通知。同理 last() 和 takeLast() 操作符。

结合操作符

CombineLatest Zip

  • CombineLatest 当两个Observables中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。
  • Zip 通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项。

CombineLatest 操作符行为类似于 zip ,但是只有当原始的Observable中的每一个都发射了一条数据时 zip 才发射数据。 CombineLatest 则在原始的Observable中任意一个发射了数据时发射一条数据。当原始Observables的任何一个发射了一条数据时, CombineLatest 使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值。

    Observable<Long> longObservable1 = Observable.interval(1,TimeUnit.SECONDS).take(10);
    Observable<Long> longObservable2 = Observable.interval(3,TimeUnit.SECONDS).take(3);

    Observable.combineLatest(longObservable1,longObservable2,(aLong, aLong2) -> aLong+" "+aLong2)
        .subscribe(s -> Log.i(ARIRUS, "combineLatest: "+s));

    Observable.zip(longObservable1,longObservable2,(aLong, aLong2) -> aLong+" "+aLong2)
        .subscribe(s -> Log.i(ARIRUS, "zip: "+s));

log:
 I/ARIURS: combineLatest: 2 0
 I/ARIURS: combineLatest: 3 0
 I/ARIURS: combineLatest: 4 0
 I/ARIURS: combineLatest: 5 0
 I/ARIURS: combineLatest: 5 1
 I/ARIURS: combineLatest: 6 1
 I/ARIURS: combineLatest: 7 1
 I/ARIURS: combineLatest: 8 1
 I/ARIURS: combineLatest: 8 2
 I/ARIURS: combineLatest: 9 2

 I/ARIURS: zip: 0 0
 I/ARIURS: zip: 1 1
 I/ARIURS: zip: 2 2

这里有必要说明一下 combineLatest 没有 0 0 1 0 这是由于此时 longObservable2 尚没有发射数据,因此只有一个 Observable 发射数据依然是没有可能向下游发射数据的。同时 zip 遵循两个 Observable 同步的数据的原则,因此不会出现 X 0 这种。

Merge Concat

  • Merge 合并多个Observables的发射物,可能会让合并的Observables发射的数据交错
  • Concat 合并多个Observables的发射物,会让合并的Observables发射的数据保持原来的顺序

    Observable<String> longObservable1 = Observable.interval(1,TimeUnit.SECONDS).take(5).map(aLong -> "ob1 "+aLong);

    Observable<String> longObservable2 = Observable.interval(3,TimeUnit.SECONDS).take(3).map(aLong -> "ob2 "+aLong);

    Observable.concat(longObservable1,longObservable2)
    .subscribe(string -> Log.i(ARIRUS, "concat: "+string));
log:
14:28:14.149 I/ARIURS: concat: ob1 0
14:28:15.100 I/ARIURS: concat: ob1 1
14:28:16.101 I/ARIURS: concat: ob1 2
14:28:17.092 I/ARIURS: concat: ob1 3
14:28:18.093 I/ARIURS: concat: ob1 4
14:28:21.106 I/ARIURS: concat: ob2 0
14:28:24.109 I/ARIURS: concat: ob2 1
14:28:27.112 I/ARIURS: concat: ob2 2
*******************************************************************
    Observable.merge(longObservable1,longObservable2)
    .subscribe(string -> Log.i(ARIRUS,"merge: "+string));

log:
14:30:21.293 I/ARIURS: merge: ob1 0
14:30:22.294 I/ARIURS: merge: ob1 1
14:30:23.295 I/ARIURS: merge: ob1 2
14:30:23.295 I/ARIURS: merge: ob2 0
14:30:24.286 I/ARIURS: merge: ob1 3
14:30:25.287 I/ARIURS: merge: ob1 4
14:30:26.288 I/ARIURS: merge: ob2 1
14:30:29.311 I/ARIURS: merge: ob2 2

其他操作符

ToBlocking

  • ToBlocking 要将一个 Observable 转换为一个 BlockingObservable。

BlockingObservable 会阻塞等待直到Observable发射了想要的数据,然后返回 这个数据(而不是一个Observable)。 由于在 Android 上不支持,链式操作,因此 List.forEach() 操作符不支持,想要遍历一个 Array 还是需要 for 语句,而我们可以使用 ToBlocking 来解决这个问题:

void handleString(String string){//实现}
...
List<String> list = Arrays.asList("1", "s", "d", "43", "fg", "re");
Observable.from(list).toBlocking().subscribe(this::handleString);

PS:forEach 操作符在 BlockingObservable 和 Observable 都有,这里可以坐下对比:

    Observable<Long> timeObservable = Observable.interval(1, TimeUnit.SECONDS, Schedulers.io()).take(5);

    Log.i(ARIRUS, "onViewCreated: start blocking forEach");

    timeObservable.toBlocking()
        .forEach(s -> Log.i(ARIRUS,
            "toBlocking forEach: " + s + " " + Thread.currentThread().getName()));
    Log.i(ARIRUS, "onViewCreated: start blocking subscribe");

    timeObservable.toBlocking()
        .subscribe(s -> Log.i(ARIRUS,
            "toBlocking subscribe: " + s + " " + Thread.currentThread().getName()));
    Log.i(ARIRUS, "onViewCreated: start foreach");

    timeObservable.take(5)
        .forEach(s -> Log.i(ARIRUS, "forEach: " + s + " " + Thread.currentThread().getName()));
    Log.i(ARIRUS, "onViewCreated: end");

log:
16:10:40.796 I/ARIURS: start blocking forEach
16:10:41.797 I/ARIURS: toBlocking forEach: 0 RxIoScheduler-2
16:10:42.798 I/ARIURS: toBlocking forEach: 1 RxIoScheduler-2
...
16:10:45.800 I/ARIURS: onViewCreated: start blocking subscribe
16:10:46.811 I/ARIURS: toBlocking subscribe: 0 main
16:10:47.802 I/ARIURS: toBlocking subscribe: 1 main
...
16:10:50.855 I/ARIURS: onViewCreated: start foreach
16:10:50.855 I/ARIURS: onViewCreated: end
16:10:51.856 I/ARIURS: forEach: 0 RxIoScheduler-2
16:10:52.857 I/ARIURS: forEach: 1 RxIoScheduler-2
...

这里可以看出 Observable.forEach() 是非阻塞的,BlockingObservable.forEach() 则是阻塞的。但是有一点我不是很明白,为什么 toBlocking.subscribe() 使得线程切换到了主线程。。

RepeatWhen RetryWhen

RepeatWhen 和 RetryWhen 分别是 Repeat 与 Retry 的条件形式,就是说在条件满足的情况下才会进行重复/重试。二者主要的区别在于触发条件:

repeat() resubscribes when it receives onCompleted().

retry() resubscribes when it receives onError().

因此对于 RepeatWhen RetryWhen 两个操作符的参数分别是

Func1<? super Observable<? extends Void>, ? extends Observable<?>>
Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> 

因为 onCompleted 没有参数,因此 Func1 的第一个形参是 Observable<? extends Void>,但是为什么是 Observable ?因为,其实是将每次 complete 通知,组成一个流,流上的数据就是一个个 complete 通知,并将每一个通知经过 handler 观察,决定是否继续重复,因此参数的形参名叫做 notificationHandler ,很直观了。同理 RetryWhen。还有一点需要注意:返回的 Observable 与 传输来的 Observable 必须是同一个,即链式规则不能打断。不然是无效的。

    Observable.interval(1, TimeUnit.SECONDS)
        .take(5)
        .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
          @Override
          public Observable<?> call(Observable<? extends Void> observable) {
            return observable.take(3);
          }
        })
        .subscribe(aLong -> Log.i(ARIRUS, "onViewCreated: " + aLong));

这里就是重复执行2次,带上本身总会执行一次,总共执行3次。

    static int i = 1;
    Observable.interval(1, TimeUnit.SECONDS)
        .take(5)
        .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
          @Override
          public Observable<?> call(Observable<? extends Void> observable) {
            return observable.flatMap(new Func1<Void, Observable<?>>() {
              @Override
              public Observable<?> call(Void aVoid) {
                if (i<3){
                  i++;
                  return Observable.just(1);
                }
                else
                  return Observable.error(new IOException());
              }
            });
          }
        })
        .subscribe(aLong -> Log.i(ARIRUS, "onViewCreated: " + aLong));

这样我们就可以根据当前情况来进行判断,是否需要重复执行。当参数 Observable 调用了 onComplete 或者 onError ,便会结束重复。还有一个小坑需要注意一下,这里调用的是 observable.flatMap() ,而非 observable.map(), 因为如果调用 map 返回 new IOException() ,相当于调用了 Observable.onNext() 方法,只有 flatMap 才是正确的写法。 使用 retryWhen 可以在失败时进行重新请求:

    Observable.timer(3, TimeUnit.SECONDS)
        .repeatWhen(attempts -> attempts.zipWith(Observable.range(1, 3), (n, i) -> i)
            .flatMap(i -> Observable.timer(i, TimeUnit.SECONDS)))
        .subscribe(aLong -> Log.i(ARIRUS, "timer: "));

ObserveOn SubscribeOn

  • ObserveOn 指定一个观察者在哪个调度器上观察这个Observable,事件消费的线程
  • SubscribeOn 指定Observable自身在哪个调度器上执行,事件产生的线程

简单理解的区别就是,SubscribeOn 是全局切换线程, ObserveOn 是局部切换线程。

    Observable.just(1)
        .subscribe(integer -> Log.i(ARIRUS, "normal: " + Thread.currentThread().getName()));

    Observable.just(1)
        .map(integer -> {
          Log.i(ARIRUS, "subscribeOn map: "+ Thread.currentThread().getName());
          return  ""+i;
        })
        .doOnNext(integer -> Log.i(ARIRUS, "doOnNext subscribeOn: "+ Thread.currentThread().getName()))
        .subscribeOn(Schedulers.io())
        .subscribe(integer -> Log.i(ARIRUS, "subscribeOn: " + Thread.currentThread().getName()));

    Observable.just(1)
        .map(integer -> {
          Log.i(ARIRUS, "subscribeOn map: "+ Thread.currentThread().getName());
          return  ""+i;
        })
        .doOnNext(integer -> Log.i(ARIRUS, "doOnNext observeOn: "+ Thread.currentThread().getName()))
        .observeOn(Schedulers.io())
        .subscribe(integer -> Log.i(ARIRUS, "observeOn: " + Thread.currentThread().getName()));

log:
I/ARIURS: normal: main

I/ARIURS: subscribeOn map: RxIoScheduler-2
I/ARIURS: doOnNext subscribeOn: RxIoScheduler-2
I/ARIURS: subscribeOn: RxIoScheduler-2

I/ARIURS: subscribeOn map: main
I/ARIURS: doOnNext observeOn: main
I/ARIURS: observeOn: RxIoScheduler-3

我们使用第一条作为参照,开始线程为 main 第二条在 IO 线程上进行订阅,就是说从 item 发射,到最后的观察,全部是在 IO 线程上进行的。第三条,仅在最后观察的时候调用了 observeOn 方法,因此只是最后的观察是在 IO 线程上进行的,之前的操作 map doOnNext 依然是在 main 线程上进行的。

    Observable.just(1)
        .subscribeOn(Schedulers.computation())
        .map(integer -> {
          Log.i(ARIRUS, "subscribeOn map: "+ Thread.currentThread().getName());
          return  ""+i;
        })
        .subscribeOn(Schedulers.newThread())
        .doOnNext(integer -> Log.i(ARIRUS, "doOnNext subscribeOn: "+ Thread.currentThread().getName()))
        .subscribeOn(Schedulers.io())
        .subscribe(integer -> Log.i(ARIRUS, "subscribeOn: " + Thread.currentThread().getName()));

    Observable.just(1)
        .observeOn(Schedulers.computation())
        .map(integer -> {
          Log.i(ARIRUS, "observeOn map: "+ Thread.currentThread().getName());
          return  ""+i;
        })
        .observeOn(Schedulers.newThread())
        .doOnNext(integer -> Log.i(ARIRUS, "doOnNext observeOn: "+ Thread.currentThread().getName()))
        .observeOn(Schedulers.io())
        .subscribe(integer -> Log.i(ARIRUS, "observeOn: " + Thread.currentThread().getName()));

log:
I/ARIURS: subscribeOn map: RxComputationScheduler-1
I/ARIURS: doOnNext subscribeOn: RxComputationScheduler-1
I/ARIURS: subscribeOn: RxComputationScheduler-1

I/ARIURS: observeOn map: RxComputationScheduler-2
I/ARIURS: doOnNext observeOn: RxNewThreadScheduler-2
I/ARIURS: observeOn: RxIoScheduler-3

这两条分别是在每个操作符之前分别多次调用了 subscribeOn 与 observeOn ,可以看到 subscribeOn 只有在第一次调用时其起作用,之后调用是不会改变其订阅线程的,而 observeOn 则是每次调用时,都会起作用。

多次调用 subscribeOn 仅在调用 doOnSubscribe 情况下才有用:

    Observable.just(1)
        .subscribeOn(Schedulers.computation())
        .map(integer -> {
          Log.i(ARIRUS, "subscribeOn map: "+ Thread.currentThread().getName());
          return  ""+i;
        })
        .doOnSubscribe(()-> Log.i(ARIRUS, "doOnSubscribe 1: " + Thread.currentThread().getName()))
        .subscribeOn(Schedulers.newThread())
        .doOnNext(integer -> Log.i(ARIRUS, "doOnNext subscribeOn: "+ Thread.currentThread().getName()))
        .doOnSubscribe(()-> Log.i(ARIRUS, "doOnSubscribe 2: " + Thread.currentThread().getName()))
        .subscribeOn(Schedulers.io())
        .subscribe(integer -> Log.i(ARIRUS, "subscribeOn: " + Thread.currentThread().getName()));

log:
I/ARIURS: doOnSubscribe 2: RxIoScheduler-2
I/ARIURS: doOnSubscribe 1: RxNewThreadScheduler-1
I/ARIURS: subscribeOn map: RxComputationScheduler-1
I/ARIURS: doOnNext subscribeOn: RxComputationScheduler-1
I/ARIURS: subscribeOn: RxComputationScheduler-1

可以看到调用 doOnSubscribe ,如果后面还有调用 subscribeOn 则 doOnSubscribe 工作在此线程。因此多次调用 subscribeOn 可以改变开始订阅的线程,不过整体的事件的生产线程不变依然还是只有第一个 subscribeOn 线程有效。

结尾

本篇中,我们对比并列举了一些常用的操作符,并且针对某些操作符给出了一些 demo。下一篇,我们将会继续进行操作符的进阶分析,并从源码的角度分析操作符的原理。

results matching ""

    No results matching ""