本文参考RxJava2 只看这一篇文章就够了,强烈推荐大家去看一下。
RxJava的组成
- 被观察者-------Observable
- 观察者-----------Observer
- 订阅---------------subscribe
1 | //被观察者 |
创建操作符
- create()
1 | //创建一个被观察者 |
- just() --------发送事件不可以超过10个
1 | Observable.just(1,2,3,4,5,6,7,8,9,0).subscribe(new Consumer<Integer>() { |
- From 操作符
- fromArray() ----- 可以发送数组(数量可以大于10个)
1 | Integer integers[] = {0,1,2,3,4,5}; |
- fromCallable() -----被观察者返回一个结果值给观察者
1 | Observable.fromCallable(new Callable<String>() { |
- fromIterable() ---------可以发送一个List集合给观察者
1 | List<String>list = new ArrayList<>(); |
- fromFuture() ------- 可以发送一个Future
1 | FutureTask<String>futureTask = new FutureTask<>(new Callable<String>() { |
- defer() -------------- 只有观察者订阅时,才会创建新的被观察者
1 | String str_name = "张三"; |
- timer() -------------当到了指定时间就发送一个0L的值给观察者
1 | Observable.timer(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() { |
- interval() ------------ 每隔一段时间就会发送一个事件(从0开始不断增加1的数字)
1 | Observable.interval(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() { |
- intervalRange() ------------ 可以指定发送事件的开始值,数量,其他的和interval()一样
1 | //start:起始数值 -------- 10 |
- range() ----------- 发送一定范围内的事件
1 | //从10开始,执行3次 |
- rangeLong() ----和range()方法类似,只是数据类型为Long
1 | //从10开始,执行3次 |
- empty() & never() & error()
- empty() --------------- 直接发送 onComplete() 事件
1 | //只会进入 onSubscribe(),onComplete()方法 |
- never() ---------------- 不发生任何时间
1 | //只会进入 onSubscribe()方法 |
- error() -------------------发送onError()事件
1 | //只会进入 onSubscribe(),onError()方法 |
转换操作符
- map() -------------- 将被观察者发送的数据类型转换成其他的类型
1 | Observable.just(1,2,3,4,5).map(new Function<Integer, String>() { |
- flatMap() ----------------- 作用于map() 方法类似,返回一个 新的Observerable (无序: flatMap()可能交错的发送事件,最终结果的顺序可能并是不原始Observable发送时的顺序 )
1 | Observable.just(1, 2, 3, 4, 5).flatMap(new Function<Integer, ObservableSource<?>>() { |
- concatMap() ------------ 作用和flatMap() 方法一样(有序:concatMap()转发事件的顺序是有序的)
1 | Observable.just(1, 2, 3, 4, 5).concatMap(new Function<Integer, ObservableSource<?>>() { |
- buffer() ------------- 从需要发送的事件中获取一定数量的事件,将这些事件存放到缓冲区中一并发出
1 | //count: 缓冲区元素的数量 |
- groupBy() ---------------- 将发送的数据进行分组,每个分组都会返回一个被观察者
1 | Observable.just(1, 2, 3, 4, 5, 6, 7) |
- scan() ---------------- 将数据按照一定的逻辑合并数据
1 | Observable.just(1, 2, 3, 4, 5, 6, 7) |
- window() --------------将发送数据 按指定数量进行分组
1 | Observable.just(1, 2, 3, 4, 5, 6, 7) |
组合操作符
- zip() -------------- 将多个被观察者合并,根据各个被观察者发送事件的顺序一个个结合起来,最终发送的事件数量会与源 Observable 中最少事件的数量一样。
1 | Observable.zip( |
- concat() -------------- 将多个观察者组合在一起,然后按照之前的发送顺序发送事件,最多只能合并4个被观察者
1 | Observable.concat( |
- concatArray() ------------ 作用和concat()方法一样,可以发送多于4个的被观察者
1 | Observable.concatArray( |
- merge() -------------- 作用和concat() 方法一样,只不过concat()是串行发送,而merge() 是并行发送事件
1 | Observable.merge( |
- combineLatest() ------------ 作用和zip()类似,但是 combineLatest() 发送事件的序列是与发送的时间线有关的,当 combineLatest() 中所有的 Observable 都发送了事件,只要其中有一个 Observable 发送事件,这个事件就会和其他 Observable 最近发送的事件结合起来发送。
1 | Observable.combineLatest( |
- concatArrayDelayError() & mergeArrayDelayError()& combineLatestDelayError() -------------- 如果有一个被观察者发送了一个Error事件,那么就结束发送,如果你想将Error() 事件延迟到所有被观察者都发送完事件后再执行。
1 | Observable.concatArrayDelayError( |
1 | Observable.mergeArrayDelayError( |
1 | Observable<String> observable1 = Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS).map(new Function<Long, String>() { |
- reduce() ------------ 将所有数据聚合在一起才会发送事件给观察者
1 | Observable.just(1,2,3,4,5,6,7,8) |
- collect() ------------ 将数据收集到数据结构当中。
1 | Observable.just(1,2,3,4,5,6,7,8) |
- startWith() & startWithArray() ------------ 在发送事件之前追加事件,startWith() 追加一个事件,startWithArray() 可以追加多个事件。追加的事件会先发出
1 | Observable.just(6, 7, 8) |
- count() ------------ 返回被观察者发送事件的数量
1 | Observable.just(1, 2, 3, 4, 5, 6, 7, 8) |
功能操作符
- delay() -------------- 延迟一段事件发送事件
1 | Observable.just(1, 2, 3, 4) |
- doOnEach() ---------------- Observable 每发送一个之前都会先回调这个方法
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
- doOnNext() ----------------- Observable 每发送 onNext() 之前都会先回调这个方法
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
- doAfterNext() -------------- Observable 每发送 onNext() 之后都会回调这个方法
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
- doOnComplete ------------------ Observable 每发送 onComplete() 之前都会回调这个方法
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
- doOnError() ---------------- Observable 每发送 onError() 之前都会回调这个方法
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
- doOnSubscribe() ----------------- Observable 每发送 onSubscribe() 之前都会回调这个方法
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
- doOnDispose() -------------- 当调用 Disposable 的取消订阅dispose()方法之后回调该方法
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
- doOnLifecycle() ------------- 在回调 onSubscribe 之前回调该方法的第一个参数的回调方法,可以使用该回调方法决定是否取消订阅
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
- doOnTerminate() & doAfterTerminate() --------------- doOnTerminate 是在 onError 或者 onComplete 发送之前回调,而 doAfterTerminate 则是 onError 或者 onComplete 发送之后回调。如果取消订阅之后 doAfterTerminate() 就不会被回调
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
- doFinally() ------------- 在所有事件发送完毕之后回调该方法,doFinally() 在取消订阅后也都会被回调,且都会在事件序列的最后。
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
- onErrorReturn() ------------------- 当接受到一个 onError() 事件之后回调,返回的值会回调 onNext() 方法,并正常结束该事件序列
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
- onErrorResumeNext() ----------- 当接收到 onError() 事件时,返回一个新的 Observable,并正常结束事件序列
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
- onExceptionResumeNext() -------------- 与 onErrorResumeNext() 作用基本一致,但是这个方法只能捕捉 Exception
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
- retry() ----------------- 如果出现错误事件,则会重新发送所有事件序列。times 是代表重新发的次数
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
- retryUntil() --------------- 出现错误事件之后,可以通过此方法判断是否继续发送事件。
1 | final int[] i = {0}; |
- retryWhen() ---------------- 当被观察者接收到异常或者错误事件时会回调该方法,这个方法会返回一个新的被观察者。如果返回的被观察者发送 Error 事件则之前的被观察者不会继续发送事件,如果发送正常事件则之前的被观察者会继续不断重试发送事件
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
- repeat() -------------- 重复发送被观察者的事件,times 为发送次数
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
- repeatWhen() ------------------ 这个方法可以会返回一个新的被观察者设定一定逻辑来决定是否重复发送事件
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
- subscribeOn() ------------ 指定被观察者的线程,要注意的时,如果多次调用此方法,只有第一次有效
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
- observeOn() ----------------- 指定观察者的线程,每指定一次就会生效一次
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
过滤操作符
- filter() -------------- 通过一定逻辑来过滤被观察者发送的事件,如果返回 true 则会发送事件,否则不会发送
1 | Observable.just(1, 2, 3, 4, 5, 6) |
- ofType() ----------------- 可以过滤不符合该类型事件
1 | Observable.just("one","two","three",1,2,3) |
- skip() & skipLast() --------------- 跳过正序某些事件,count 代表跳过事件的数量
1 | Observable.just(1,2,3,4,5,6) |
- distinct() ------------ 过滤事件序列中的重复事件
1 | Observable.just(1,2,3,4,4,3,2,1) |
- distinctUntilChanged() ---------------- 过滤掉连续重复的事件
1 | Observable.just(1,2,3,4,4,3,2,1) |
- take() & takeLast() --------------------- 控制观察者接收的事件的数量
1 | Observable.just(1,2,3,4,5,6,7,8,9) |
- debounce() ----------------- 如果两件事件发送的时间间隔小于设定的时间间隔则前一件事件就不会发送给观察者
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
- firstElement() && lastElement() ----------------- firstElement() 取事件序列的第一个元素,lastElement() 取事件序列的最后一个元素
1 | Observable.just(1, 2, 3, 4) |
- elementAt() & elementAtOrError() -------------- elementAt() 可以指定取出事件序列中事件,但是输入的 index 超出事件序列的总数的话就不会出现任何结果。这种情况下,你想发出异常信息的话就用 elementAtOrError()
1 | Observable.just(1, 2, 3, 4) |
条件操作符
- all() ---------------- 判断事件序列是否全部满足某个事件,如果都满足则返回 true,反之则返回 false
1 | Observable.just(1, 2, 3, 4) |
- takeWhile() ------------- 可以设置条件,当某个数据满足条件时就会发送该数据,反之则不发送
1 | Observable.just(1, 2, 3, 4, 3, 2) |
- skipWhile() ---------- 可以设置条件,当某个数据满足条件时不发送该数据,反之则发送
1 | Observable.just(1, 2, 3, 4, 3, 2) |
- takeUntil() -------------- 可以设置条件,当事件满足此条件时,下一次的事件就不会被发送了
1 | Observable.just(1, 2, 3, 4, 3, 2) |
- skipUntil() ------------ 当 skipUntil() 中的 Observable 发送事件了,原来的 Observable 才会发送事件给观察者
1 | Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS) |
- sequenceEqual() --------------- 判断两个 Observable 发送的事件是否相同
1 | Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(1, 2, 3)) |
- contains() ------------- 判断事件序列中是否含有某个元素,如果有则返回 true,如果没有则返回 false
1 | Observable.just(1,2,3,4,5) |
- isEmpty() --------------- 判断事件序列是否为空 ( true :空 )
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
- amb() --------------- amb() 要传入一个 Observable 集合,但是只会发送最先发送事件的 Observable 中的事件,其余 Observable 将会被丢弃
1 | List<Observable<Long>> list = new ArrayList<>(); |
- defaultIfEmpty() -------------- 如果观察者只发送一个 onComplete() 事件,则可以利用这个方法发送一个值
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |