kotlin练习 - 协程

GlobalScope构造函数

  • launch : 创建协程
  • async : 创建带返回值的协程,返回 Deferred
  • withContext : 不会创建新的协程,在指定协程上运行代码块
  • runBlocking : 不是GlobalScope的API,可以单独使用, runBlocking里面的delay()会堵塞当前线程,launch等不会堵塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
fun main(arg: Array<String>) {
GlobalScope.launch(Dispatchers.Main) {
LogUtils.d("当前线程main: ${Thread.currentThread().name} ")
withContextTest()
}
//newSingleThreadContext 单线程
//newFixedThreadPoolContext 线程池
val singleThreadContext = newSingleThreadContext("single")
GlobalScope.launch(singleThreadContext) {
LogUtils.d("当前线程_单线程: ${Thread.currentThread().name} ")//single
}
val fixedThreadPoolContext = newFixedThreadPoolContext(1, "fixed")
GlobalScope.launch(fixedThreadPoolContext) {
LogUtils.d("当前线程_线程池: ${Thread.currentThread().name} ")//fixed
}
}

suspend fun withContextTest() {
withContext(Dispatchers.IO) {
LogUtils.d("当前线程io: ${Thread.currentThread().name} ")
}
}

CoroutineContext协程运行的线程调度器

  • Dispatchers.Default : 默认(如果不写,默认就是Dispatchers.Default模式)
  • Dispatchers.IO : IO线程
  • Dispatchers.Main : 主线程
  • Dispatchers.Unconfined : 没指定,就是在当前线程

CoroutineStart 启动模式

  • CoroutineStart.DEFAULT:默认(如果不写,默认就是CoroutineStart.DEFAULT模式)
  • CoroutineStart.ATOMIC:自动(协程在开始执行之前不能被取消)
  • CoroutineStart.UNDISPATCHED:立即执行协程
  • CoroutineStart.LAZY:懒加载

Job方法

  • job.start() : 启动协程,除了 lazy 模式,协程都不需要手动启动

  • job.join() : 等待协程执行完毕后再执行后面的代码块

  • job.cancel() : 取消一个协程
    协程的取消有些特质,因为协程内部可以在创建协程的,这样的协程组织关系可以称为父协程,子协程:

    1. 父协程手动调用 cancel() 或者异常结束,会立即取消它的所有子协程
    2. 父协程必须等待所有子协程完成(处于完成或者取消状态)才能完成
    3. 子协程抛出未捕获的异常时,默认情况下会取消其父协程
  • job.cancelAndJoin() : 等待协程取消完毕后再执行后面的代码块

  • job.isActive : true - 处于活动状态

  • job.isCancelled : true - 已完成

  • job.isCompleted : true - 已取消

第一个协程程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

fun main(arg: Array<String>) {
GlobalScope.launch {//在后台启动一个新的协程并继续
delay(2000L)//非阻塞的等待 2 秒钟(默认时间单位是毫秒)
println("我是kotlin")
}
println("你好,")//协程已在等待时主线程还在继续
Thread.sleep(3000L)// 阻塞主线程 3 秒钟来保证 JVM 存活
}
println("运行了吗?")
}

桥接阻塞与非阻塞的世界

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

import kotlinx.coroutines.*

fun main(arg: Array<String>) {
GlobalScope.launch {
//在后台启动新的协程并继续
delay(2000L)//非阻塞的等待 2 秒钟(默认时间单位是毫秒)
println("当前线程: ${Thread.currentThread().name} ,我是kotlin")
}
println("当前线程: ${Thread.currentThread().name} ,你好,")//协程已在等待时主线程还在继续
runBlocking {
//主线程
delay(3000L)//延迟 3 秒来保证 JVM 的存活
println("当前线程: ${Thread.currentThread().name} ,运行了吗?")
}
}
1
2
3
4
5
6
7
8
9
10
11
12
import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
//启动主协程
GlobalScope.launch {
delay(2000L)//非阻塞的等待 2 秒钟(默认时间单位是毫秒)
println("当前线程: ${Thread.currentThread().name} ,我是kotlin")
}
println("当前线程: ${Thread.currentThread().name} ,你好,")//主协同程序在此处继续
delay(3000L)// //延迟 3 秒以保持JVM活动
println("当前线程: ${Thread.currentThread().name} ,运行了吗?")
}

等待工作

1
2
3
4
5
6
7
8
9
10
11
12
13

import kotlinx.coroutines.*

fun main() = runBlocking {

var job = GlobalScope.launch {
delay(2000L)//非阻塞的等待 2 秒钟(默认时间单位是毫秒)
println("当前线程: ${Thread.currentThread().name} ,我是kotlin")
}
println("当前线程: ${Thread.currentThread().name} ,你好,")//主协同程序在此处继续
job.join()//等到子协程完成后进行
println("当前线程: ${Thread.currentThread().name} ,运行了吗?")
}

结构化的并发

1
2
3
4
5
6
7
8
9
10

import kotlinx.coroutines.*

fun main() = runBlocking {
launch {
delay(2000L)//非阻塞的等待 2 秒钟(默认时间单位是毫秒)
println("当前线程: ${Thread.currentThread().name} ,我是kotlin")
}
println("当前线程: ${Thread.currentThread().name} ,你好,")//主协同程序在此处继续
}

范围构建器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

import kotlinx.coroutines.*

fun main() = runBlocking {
launch {
delay(200L)
println("当前线程: ${Thread.currentThread().name} ,我是Kotlin")
}
coroutineScope {
launch {
delay(500L)
println("当前线程: ${Thread.currentThread().name} ,你知道吗")
}
delay(100L)
println("当前线程: ${Thread.currentThread().name} ,运行了吗")
}
println("当前线程: ${Thread.currentThread().name} ,你好,")
}

提取函数重构

1
2
3
4
5
6
7
8
9
10
11
12
13
import kotlinx.coroutines.*

fun main() = runBlocking {
launch {
doWorld()
}
println("当前线程: ${Thread.currentThread().name} ,你好,")
}

suspend fun doWorld() {
delay(2000L)
println("当前线程: ${Thread.currentThread().name} ,我是Kotlin")
}

协同程序重量轻

1
2
3
4
5
6
7
8
9
10
import kotlinx.coroutines.*

fun main() = runBlocking {
repeat(100_000){
launch {
delay(1000L)
println(".")
}
}
}

全局协程像守护线程

1
2
3
4
5
6
7
8
9
10
11
12

import kotlinx.coroutines.*

fun main() = runBlocking {
GlobalScope.launch {
repeat(1000) {
println("当前是第${(it + 1)}个线程")
delay(500L)
}
}
delay(1300L)
}

协程请求网络数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class MainActivity : AppCompatActivity() {

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
coroutine.setOnClickListener { click() }
}

private fun click() = runBlocking {
GlobalScope.launch(Dispatchers.Main) {
coroutine.text = GlobalScope.async(Dispatchers.IO) {
// 比如进行了网络请求
// 放回了请求后的结构
return@async "main"
}.await()
}
}
}

本文参考RxJava2 只看这一篇文章就够了,强烈推荐大家去看一下。

RxJava的组成

  • 被观察者-------Observable
  • 观察者-----------Observer
  • 订阅---------------subscribe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//被观察者
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("a");
emitter.onNext("b");
emitter.onNext("c");
emitter.onComplete();
}
});
//观察者
Observer observer = new Observer<String>() {

@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(String s) {
LogUtils.e(s);
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
};
//订阅
observable.subscribe(observer);

创建操作符

  • create()
1
2
3
4
5
6
7
8
//创建一个被观察者
Observable<String>observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Hello Java");
emitter.onComplete();
}
});
  • just() --------发送事件不可以超过10个
1
2
3
4
5
6
Observable.just(1,2,3,4,5,6,7,8,9,0).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.e(integer);
}
});
  • From 操作符
  1. fromArray() ----- 可以发送数组(数量可以大于10个)
1
2
3
4
5
6
7
Integer integers[] = {0,1,2,3,4,5};
Observable.fromArray(integers).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.e(integer);
}
});
  1. fromCallable() -----被观察者返回一个结果值给观察者
1
2
3
4
5
6
7
8
9
10
11
Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return "hello";
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
LogUtils.e(s);
}
});
  1. fromIterable() ---------可以发送一个List集合给观察者
1
2
3
4
5
6
7
8
9
10
11
12
List<String>list = new ArrayList<>();
list.add("1");
list.add("2");
list.add("3");
list.add("4");
list.add("5");
Observable.fromIterable(list).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
LogUtils.e(s);
}
});
  1. fromFuture() ------- 可以发送一个Future
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
FutureTask<String>futureTask = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
return "Hello";
}
});
//doOnSubscribe()----- 开始订阅时才会执行
Observable.fromFuture(futureTask).doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
futureTask.run();//开始执行
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
LogUtils.e(s);
}
});
  • defer() -------------- 只有观察者订阅时,才会创建新的被观察者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
String str_name = "张三";
public void rxjava() {
Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> call() throws Exception {
return Observable.just(str_name);
}
});
str_name = "李四";
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(String s) {
LogUtils.e("str_name = " + s);//王五,赵六
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
};
str_name = "王五";
observable.subscribe(observer);
str_name = "赵六";
observable.subscribe(observer);
}
  • timer() -------------当到了指定时间就发送一个0L的值给观察者
1
2
3
4
5
6
Observable.timer(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
LogUtils.e("along : "+aLong);//along : 0
}
});
  • interval() ------------ 每隔一段时间就会发送一个事件(从0开始不断增加1的数字)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Observable.interval(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
//每隔2s 执行一次 along : 从0开始每次回增加1
LogUtils.e("along : "+aLong);//along : 0, along : 1
}
});

//延迟5s后开始执行
Observable.interval(5,2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
//每隔2s 执行一次 along : 从0开始每次回增加1
LogUtils.e("along : "+aLong);//along : 0, along : 1
}
});
  • intervalRange() ------------ 可以指定发送事件的开始值,数量,其他的和interval()一样
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//start:起始数值 -------- 10
//count:发射数量 -------- 3
//initialDelay:延迟执行时间-------- 5s
//period:发射周期时间------2s
//unit:时间单位
//数字从10开始,传递3次,第一次执行延迟5s,每隔2s执行一次
Observable.intervalRange(10,3,5,2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
//每隔2s 执行一次 along : 从0开始每次回增加1
LogUtils.e("along : "+aLong);//along : 10, along : 11, along : 12
}
});

//start:起始数值 -------- 10
//count:发射数量 -------- 3
//initialDelay:延迟执行时间-------- 5s
//period:发射周期时间------2s
//unit:时间单位
//Scheduler:线程调度
//数字从10开始,传递3次,第一次执行延迟5s,每隔2s执行一次,在新线程中执行
Observable.intervalRange(10,3,5,2, TimeUnit.SECONDS, Schedulers.newThread()).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
LogUtils.e("当前线程 :"+Thread.currentThread().getName());//RxNewThreadScheduler
//每隔2s 执行一次 along : 从0开始每次回增加1
LogUtils.e("along : "+aLong);//along : 10, along : 11, along : 12
}
});

  • range() ----------- 发送一定范围内的事件
1
2
3
4
5
6
7
//从10开始,执行3次
Observable.range(10,3).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.e("integer : "+integer);//10,11,12
}
});
  • rangeLong() ----和range()方法类似,只是数据类型为Long
1
2
3
4
5
6
7
//从10开始,执行3次
Observable.rangeLong(10,3).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
LogUtils.e("along : "+aLong);//10,11,12
}
});
  • empty() & never() & error()
  1. empty() --------------- 直接发送 onComplete() 事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//只会进入 onSubscribe(),onComplete()方法
Observable.empty().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.e("==================onSubscribe");
}
@Override
public void onNext(Object o) {
LogUtils.e("==================onNext");
}
@Override
public void onError(Throwable e) {
LogUtils.e("==================onError " + e);
}
@Override
public void onComplete() {
LogUtils.e("==================onComplete");
}
});
  1. never() ---------------- 不发生任何时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//只会进入 onSubscribe()方法
Observable.never().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.e("==================onSubscribe");
}
@Override
public void onNext(Object o) {
LogUtils.e("==================onNext");
}
@Override
public void onError(Throwable e) {
LogUtils.e("==================onError " + e);
}
@Override
public void onComplete() {
LogUtils.e("==================onComplete");
}
});
  1. error() -------------------发送onError()事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//只会进入 onSubscribe(),onError()方法
Observable.error(new NullPointerException()).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.e("==================onSubscribe");
}
@Override
public void onNext(Object o) {
LogUtils.e("==================onNext");
}
@Override
public void onError(Throwable e) {
LogUtils.e("==================onError " + e);
}
@Override
public void onComplete() {
LogUtils.e("==================onComplete");
}
});

转换操作符

  • map() -------------- 将被观察者发送的数据类型转换成其他的类型
1
2
3
4
5
6
7
8
9
10
11
Observable.just(1,2,3,4,5).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "当前值为:"+integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
LogUtils.e(s);
}
});
  • flatMap() ----------------- 作用于map() 方法类似,返回一个 新的Observerable (无序: flatMap()可能交错的发送事件,最终结果的顺序可能并是不原始Observable发送时的顺序 )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Observable.just(1, 2, 3, 4, 5).flatMap(new Function<Integer, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Integer integer) throws Exception {
List<String> list = new ArrayList<>();
list.add("当前值为:"+integer);
if (integer == 3) {
return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);//延迟10毫秒
} else {
return Observable.fromIterable(list);
}
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
LogUtils.e(o.toString());//当前值为:1,2,4,5,3 ------无序的
}
});
  • concatMap() ------------ 作用和flatMap() 方法一样(有序:concatMap()转发事件的顺序是有序的)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Observable.just(1, 2, 3, 4, 5).concatMap(new Function<Integer, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Integer integer) throws Exception {
List<String> list = new ArrayList<>();
list.add("当前值为:"+integer);
if (integer == 3) {
return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);//延迟10毫秒
} else {
return Observable.fromIterable(list);
}
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
LogUtils.e(o.toString());//当前值为:1,2,3,4,5, ------有序的
}
});
  • buffer() ------------- 从需要发送的事件中获取一定数量的事件,将这些事件存放到缓冲区中一并发出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//count: 缓冲区元素的数量
//skip: 就代表缓冲区满了之后,发送下一次事件的时候要跳过的元素数量
Observable.just(1, 2, 3, 4, 5)
.buffer(3, 2)
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
LogUtils.e("缓冲区大小: " + integers.size());
String str = "";
for (int i = 0; i < integers.size(); i++) {
str = str + "," + integers.get(i);
}
LogUtils.e("当前元素: " + str);
}
});
  • groupBy() ---------------- 将发送的数据进行分组,每个分组都会返回一个被观察者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Observable.just(1, 2, 3, 4, 5, 6, 7)
.groupBy(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
return integer % 2;//分为2组
}
})
.subscribe(new Consumer<GroupedObservable<Integer, Integer>>() {
@Override
public void accept(GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) throws Exception {
LogUtils.e(" 第" + integerIntegerGroupedObservable.getKey()+"组");
integerIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.e("第" + integerIntegerGroupedObservable.getKey() + "组,当前元素: " + integer);
}
});
}
});
  • scan() ---------------- 将数据按照一定的逻辑合并数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
Observable.just(1, 2, 3, 4, 5, 6, 7)
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer1, Integer integer2) throws Exception {
LogUtils.e("integer1 = "+integer1);//上一次的结果
LogUtils.e("integer2 = "+integer2);
return integer1 +integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.e("integer1 + integer2 ="+integer);
}
});
  • window() --------------将发送数据 按指定数量进行分组
1
2
3
4
5
6
7
8
9
10
11
12
13
Observable.just(1, 2, 3, 4, 5, 6, 7)
.window(3)
.subscribe(new Consumer<Observable<Integer>>() {
@Override
public void accept(Observable<Integer> integerObservable) throws Exception {
integerObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.e("当前元素: " + integer);
}
});
}
});

组合操作符

  • zip() -------------- 将多个被观察者合并,根据各个被观察者发送事件的顺序一个个结合起来,最终发送的事件数量会与源 Observable 中最少事件的数量一样。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Observable.zip(
Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS).map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
String s1 = "A" + aLong;
LogUtils.d("A 发送的事件: " + s1);
return s1;
}
}), Observable.intervalRange(1, 4, 1, 1, TimeUnit.SECONDS).map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
String s2 = "B" + aLong;
LogUtils.d("B 发送的事件: " + s2);
return s2;
}
}), new BiFunction<String, String, String>() {
@Override
public String apply(String s, String s2) throws Exception {
String res = s + s2;
LogUtils.d("A & B 发送的事件: " + res);
return res;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
LogUtils.d( "onNext: " + s);
}
});
  • concat() -------------- 将多个观察者组合在一起,然后按照之前的发送顺序发送事件,最多只能合并4个被观察者
1
2
3
4
5
6
7
8
9
10
11
Observable.concat(
Observable.just(1,2),
Observable.just(3,4),
Observable.just(5,6),
Observable.just(7,8)
).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.e("当前数字:"+integer);//1,2,3,4,5,6,7,8
}
});
  • concatArray() ------------ 作用和concat()方法一样,可以发送多于4个的被观察者
1
2
3
4
5
6
7
8
9
10
11
12
13
Observable.concatArray(
Observable.just(1,2),
Observable.just(3,4),
Observable.just(5,6),
Observable.just(7,8),
Observable.just(9,10),
Observable.just(11,12)
).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.e("当前数字:"+integer);//1,2,3,4,5,6,7,8,9,10,11,12
}
});
  • merge() -------------- 作用和concat() 方法一样,只不过concat()是串行发送,而merge() 是并行发送事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observable.merge(
Observable.interval(1, TimeUnit.SECONDS)
.map(new Function<Long,String>() {
@Override
public String apply(Long aLong) throws Exception {
return "A"+aLong;
}
}),
Observable.interval(1, TimeUnit.SECONDS)
.map(new Function<Long,String>() {
@Override
public String apply(Long aLong) throws Exception {
return "B"+aLong;
}
})
).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
LogUtils.e(s);//A,B 交替出现---------- A0,B0,A1,B1,A2,B2
}
});
  • combineLatest() ------------ 作用和zip()类似,但是 combineLatest() 发送事件的序列是与发送的时间线有关的,当 combineLatest() 中所有的 Observable 都发送了事件,只要其中有一个 Observable 发送事件,这个事件就会和其他 Observable 最近发送的事件结合起来发送。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Observable.combineLatest(
Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS).map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
String s1 = "A" + aLong;
LogUtils.d("A 发送的事件: " + s1);
return s1;
}
}), Observable.intervalRange(1, 4, 1, 1, TimeUnit.SECONDS).map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
String s2 = "B" + aLong;
LogUtils.d("B 发送的事件: " + s2);
return s2;
}
}), new BiFunction<String, String, String>() {
@Override
public String apply(String s, String s2) throws Exception {
String res = s + s2;
LogUtils.d("A & B 发送的事件: " + res);
return res;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
LogUtils.d( "onNext: " + s);
}
});
  • concatArrayDelayError() & mergeArrayDelayError()& combineLatestDelayError() -------------- 如果有一个被观察者发送了一个Error事件,那么就结束发送,如果你想将Error() 事件延迟到所有被观察者都发送完事件后再执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
Observable.concatArrayDelayError(
Observable.just(1,2),
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(3);
e.onError(new NullPointerException());
e.onNext(4);
}
}),
Observable.just(5,6)
).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(Integer integer) {
LogUtils.e("onNext : 当前数字:"+integer);//1,2,3,5,6
}

@Override
public void onError(Throwable e) {
LogUtils.e("onError : "+e.toString());//java.lang.NullPointerException
}

@Override
public void onComplete() {

}
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Observable.mergeArrayDelayError(
Observable.interval(1, TimeUnit.SECONDS)
.map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
return "A" + aLong;
}
}),
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("C1");
e.onNext("C2");
e.onNext("C3");
e.onError(new NullPointerException());
e.onNext("C4");
e.onNext("C5");
}
}),
Observable.interval(1, TimeUnit.SECONDS)
.map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
return "B" + aLong;
}
})
).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) { }
@Override
public void onNext(String s) {
LogUtils.e(s);
}
@Override
public void onError(Throwable e) {
LogUtils.e(e.toString());
}
@Override
public void onComplete() { }
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
Observable<String> observable1 = Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS).map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
String s1 = "A" + aLong;
LogUtils.d("A 发送的事件: " + s1);
return s1;
}
});
Observable<String> observable2 = Observable.intervalRange(1, 4, 1, 1, TimeUnit.SECONDS).map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
String s2 = "B" + aLong;
LogUtils.d("B 发送的事件: " + s2);
return s2;
}
});
Observable<String> observable3 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("C1");
e.onNext("C2");
e.onNext("C3");
e.onError(new NullPointerException());
e.onNext("C4");
e.onNext("C5");
}
});
Observable.combineLatestDelayError(new ObservableSource[]{observable1, observable2, observable3}, new Function() {
@Override
public Object apply(Object o) throws Exception {
Object[] objects = (Object[]) o;
String res = "";
for (int i = 0; i < objects.length; i++) {
res = res + String.valueOf(objects[i]);
}
LogUtils.d("A & B & C 发送的事件: " + res);
return res;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}

@Override
public void onNext(String s) {
LogUtils.d(s);
}

@Override
public void onError(Throwable e) {
LogUtils.e(e.toString());
}

@Override
public void onComplete() {
}
});
  • reduce() ------------ 将所有数据聚合在一起才会发送事件给观察者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Observable.just(1,2,3,4,5,6,7,8)
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
int res = integer + integer2;
LogUtils.d("integer : " + integer);
LogUtils.d("integer2 : " + integer2);
LogUtils.d("res : " + res);
return res;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.d("accept : "+ integer);
}
});
  • collect() ------------ 将数据收集到数据结构当中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Observable.just(1,2,3,4,5,6,7,8)
.collect(new Callable<ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call() throws Exception {
return new ArrayList<>();
}
}, new BiConsumer<ArrayList<Integer>, Integer>() {
@Override
public void accept(ArrayList<Integer> integers, Integer integer) throws Exception {
integers.add(integer);
}
}).subscribe(new Consumer<ArrayList<Integer>>() {
@Override
public void accept(ArrayList<Integer> integers) throws Exception {
LogUtils.d(integers);
}
});
  • startWith() & startWithArray() ------------ 在发送事件之前追加事件,startWith() 追加一个事件,startWithArray() 可以追加多个事件。追加的事件会先发出
1
2
3
4
5
6
7
8
9
10
Observable.just(6, 7, 8)
.startWithArray(3, 4, 5)
.startWith(2)
.startWithArray(0, 1)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.d(String.valueOf(integer));
}
});
  • count() ------------ 返回被观察者发送事件的数量
1
2
3
4
5
6
7
8
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.count()
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
LogUtils.d("发送数量:" + aLong);//8
}
});

功能操作符

  • delay() -------------- 延迟一段事件发送事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Observable.just(1, 2, 3, 4)
.delay(2, TimeUnit.SECONDS)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}

@Override
public void onError(Throwable e) {
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
  • doOnEach() ---------------- Observable 每发送一个之前都会先回调这个方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
})
.doOnEach(new Consumer<Notification<Integer>>() {
@Override
public void accept(Notification<Integer> integerNotification) throws Exception {
LogUtils.d("doOnEach 方法 "+ integerNotification.getValue());
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}

@Override
public void onError(Throwable e) {
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
  • doOnNext() ----------------- Observable 每发送 onNext() 之前都会先回调这个方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
})
.doOnNext(new Consumer<Integer>(){
@Override
public void accept(Integer integer) throws Exception {
LogUtils.d("doOnNext 方法 "+ integer);
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}

@Override
public void onError(Throwable e) {
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
  • doAfterNext() -------------- Observable 每发送 onNext() 之后都会回调这个方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
})
.doAfterNext(new Consumer<Integer>(){
@Override
public void accept(Integer integer) throws Exception {
LogUtils.d("doAfterNext 方法 "+ integer);
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}

@Override
public void onError(Throwable e) {
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
  • doOnComplete ------------------ Observable 每发送 onComplete() 之前都会回调这个方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
LogUtils.d("doOnComplete 方法");
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}

@Override
public void onError(Throwable e) {
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
  • doOnError() ---------------- Observable 每发送 onError() 之前都会回调这个方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onError(new NullPointerException());
}
})
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
LogUtils.e("doOnError() :"+throwable.toString());
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}

@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
  • doOnSubscribe() ----------------- Observable 每发送 onSubscribe() 之前都会回调这个方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
})
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
LogUtils.d("doOnSubscribe()方法");
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}

@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
  • doOnDispose() -------------- 当调用 Disposable 的取消订阅dispose()方法之后回调该方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
})
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
LogUtils.d("doOnDispose()方法");
}
})
.subscribe(new Observer<Integer>() {
private Disposable disposable;

@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
this.disposable = d;
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
if(integer==2){
disposable.dispose();//取消订阅
}
}

@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
  • doOnLifecycle() ------------- 在回调 onSubscribe 之前回调该方法的第一个参数的回调方法,可以使用该回调方法决定是否取消订阅
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
})
.doOnLifecycle(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
LogUtils.d("doOnLifecycle accept");
//disposable.dispose();//取消订阅
}
}, new Action() {
@Override
public void run() throws Exception {
LogUtils.d("doOnLifecycle Action ");
}
})
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
LogUtils.d("doOnDispose()方法");
}
})
.subscribe(new Observer<Integer>() {
private Disposable disposable;

@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
this.disposable = d;
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
if (integer == 2) {
disposable.dispose();//取消订阅
}
}

@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
  • doOnTerminate() & doAfterTerminate() --------------- doOnTerminate 是在 onError 或者 onComplete 发送之前回调,而 doAfterTerminate 则是 onError 或者 onComplete 发送之后回调。如果取消订阅之后 doAfterTerminate() 就不会被回调
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
})
.doOnTerminate(new Action() {
@Override
public void run() throws Exception {
LogUtils.d("doOnTerminate() 方法");
}
})
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
LogUtils.d("doAfterTerminate() 方法");
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}

@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
  • doFinally() ------------- 在所有事件发送完毕之后回调该方法,doFinally() 在取消订阅后也都会被回调,且都会在事件序列的最后。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
LogUtils.d("doFinally() 方法");
}
})
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
LogUtils.d("doAfterTerminate() 方法");
}
})
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
LogUtils.d("doOnDispose()方法");
}
})
.subscribe(new Observer<Integer>() {
private Disposable disposable;

@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
this.disposable = d;
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
if (integer == 2) {
disposable.dispose();//取消订阅
}
}

@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
  • onErrorReturn() ------------------- 当接受到一个 onError() 事件之后回调,返回的值会回调 onNext() 方法,并正常结束该事件序列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onError(new NullPointerException());
}
})
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
LogUtils.e("onErrorReturn() 方法"+throwable.toString());
return 404;
}
})
.subscribe(new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}

@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
  • onErrorResumeNext() ----------- 当接收到 onError() 事件时,返回一个新的 Observable,并正常结束事件序列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onError(new NullPointerException());
}
})
.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
LogUtils.e("onErrorResumeNext()方法"+throwable.toString());
return Observable.just(5,6,7,8,9);
}
})
.subscribe(new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}

@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
  • onExceptionResumeNext() -------------- 与 onErrorResumeNext() 作用基本一致,但是这个方法只能捕捉 Exception
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onError(new Exception("111"));
}
})
.onExceptionResumeNext(new Observable<Integer>() {
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
observer.onNext(404);
observer.onNext(405);
observer.onComplete();
}
})
.subscribe(new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}

@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
  • retry() ----------------- 如果出现错误事件,则会重新发送所有事件序列。times 是代表重新发的次数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onError(new NullPointerException());
}
})
.retry(2)
.subscribe(new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}

@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});

  • retryUntil() --------------- 出现错误事件之后,可以通过此方法判断是否继续发送事件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
final int[] i = {0};
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onNext(5);
e.onError(new NullPointerException());
}
})
.retryUntil(new BooleanSupplier() {
@Override
public boolean getAsBoolean() throws Exception {
if (i[0] >= 6) {//停止继续发送
return true;
} else {
return false;
}
}
})
.subscribe(new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}

@Override
public void onNext(Integer integer) {
i[0] += integer;
LogUtils.d("onNext()方法 : " + integer);
}

@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
  • retryWhen() ---------------- 当被观察者接收到异常或者错误事件时会回调该方法,这个方法会返回一个新的被观察者。如果返回的被观察者发送 Error 事件则之前的被观察者不会继续发送事件,如果发送正常事件则之前的被观察者会继续不断重试发送事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onNext(5);
e.onError(new NullPointerException());
}
})
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
if (throwable instanceof NullPointerException){
return Observable.error(new Throwable("终止啦"));
}else{
return Observable.just(6,7,8,9);
}
}
});
}
})
.subscribe(new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}

@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
  • repeat() -------------- 重复发送被观察者的事件,times 为发送次数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
// e.onError(new NullPointerException());
e.onComplete();
}
})
.repeat(2)
.subscribe(new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}

@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
  • repeatWhen() ------------------ 这个方法可以会返回一个新的被观察者设定一定逻辑来决定是否重复发送事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
// e.onError(new NullPointerException());
e.onComplete();
}
})
.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
return Observable.empty();//直接发送 onComplete() 事件
// return Observable.just(1);//不发送任何事件
// return Observable.error(new Exception("404"));//发送 onError() 事件
}
})
.subscribe(new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}

@Override
public void onError(Throwable e) {
LogUtils.e("onError()方法 :" + e.toString());
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
  • subscribeOn() ------------ 指定被观察者的线程,要注意的时,如果多次调用此方法,只有第一次有效
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
LogUtils.d("当前线程:"+Thread.currentThread().getName());
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
// e.onError(new NullPointerException());
e.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.subscribe(new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
LogUtils.d("onSubscribe()方法—当前线程:"+Thread.currentThread().getName());
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
LogUtils.d("onNext()方法—当前线程:"+Thread.currentThread().getName());
}

@Override
public void onError(Throwable e) {
LogUtils.d("onError()方法 :" + e.toString());
LogUtils.d("onError()方法—当前线程:"+Thread.currentThread().getName());
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
LogUtils.d("onComplete()方法—当前线程:"+Thread.currentThread().getName());
}
});
  • observeOn() ----------------- 指定观察者的线程,每指定一次就会生效一次
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
LogUtils.d("当前线程:"+Thread.currentThread().getName());
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
// e.onError(new NullPointerException());
e.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
LogUtils.d("onSubscribe()方法—当前线程:"+Thread.currentThread().getName());
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
LogUtils.d("onNext()方法—当前线程:"+Thread.currentThread().getName());
}

@Override
public void onError(Throwable e) {
LogUtils.d("onError()方法 :" + e.toString());
LogUtils.d("onError()方法—当前线程:"+Thread.currentThread().getName());
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
LogUtils.d("onComplete()方法—当前线程:"+Thread.currentThread().getName());
}
});

过滤操作符

  • filter() -------------- 通过一定逻辑来过滤被观察者发送的事件,如果返回 true 则会发送事件,否则不会发送
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Observable.just(1, 2, 3, 4, 5, 6)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
if (integer % 2 == 0) {
return true;
} else {
return false;
}
}
})
.subscribe(new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}

@Override
public void onError(Throwable e) {
LogUtils.d("onError()方法 :" + e.toString());
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
  • ofType() ----------------- 可以过滤不符合该类型事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Observable.just("one","two","three",1,2,3)
.ofType(Integer.class)
.subscribe(new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}

@Override
public void onError(Throwable e) {
LogUtils.d("onError()方法 :" + e.toString());
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
  • skip() & skipLast() --------------- 跳过正序某些事件,count 代表跳过事件的数量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Observable.just(1,2,3,4,5,6)
.skip(2)//跳过前面2个事件
.skipLast(2)//跳过后面2个事件
.subscribe(new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}

@Override
public void onError(Throwable e) {
LogUtils.d("onError()方法 :" + e.toString());
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
  • distinct() ------------ 过滤事件序列中的重复事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Observable.just(1,2,3,4,4,3,2,1)
.distinct()
.subscribe(new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);//1,2,3,4
}

@Override
public void onError(Throwable e) {
LogUtils.d("onError()方法 :" + e.toString());
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
  • distinctUntilChanged() ---------------- 过滤掉连续重复的事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Observable.just(1,2,3,4,4,3,2,1)
.distinctUntilChanged()
.subscribe(new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);//1,2,3,4,3,2,1
}

@Override
public void onError(Throwable e) {
LogUtils.d("onError()方法 :" + e.toString());
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
  • take() & takeLast() --------------------- 控制观察者接收的事件的数量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Observable.just(1,2,3,4,5,6,7,8,9)
.take(3)//只接收前面3个
// .takeLast(2)//只接收后面2个
.subscribe(new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}

@Override
public void onError(Throwable e) {
LogUtils.d("onError()方法 :" + e.toString());
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
  • debounce() ----------------- 如果两件事件发送的时间间隔小于设定的时间间隔则前一件事件就不会发送给观察者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
Thread.sleep(900);
e.onNext(2);
}
})
.debounce(1,TimeUnit.SECONDS)
.subscribe(new Observer<Integer>() {

@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}

@Override
public void onNext(Integer integer) {
LogUtils.d("onNext()方法 : " + integer);
}

@Override
public void onError(Throwable e) {
LogUtils.d("onError()方法 :" + e.toString());
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
  • firstElement() && lastElement() ----------------- firstElement() 取事件序列的第一个元素,lastElement() 取事件序列的最后一个元素
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Observable.just(1, 2, 3, 4)
.firstElement()
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.d("firstElement() 方法 " + integer);
}
});
Observable.just(1, 2, 3, 4)
.lastElement()
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.d("lastElement() 方法 " + integer);
}
});
  • elementAt() & elementAtOrError() -------------- elementAt() 可以指定取出事件序列中事件,但是输入的 index 超出事件序列的总数的话就不会出现任何结果。这种情况下,你想发出异常信息的话就用 elementAtOrError()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Observable.just(1, 2, 3, 4)
.elementAt(4)
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.d("elementAt() 方法 " + integer);
}
});


Observable.just(1, 2, 3, 4)
.elementAtOrError(4)//报错
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.d("elementAtOrError() 方法 " + integer);
}
});

条件操作符

  • all() ---------------- 判断事件序列是否全部满足某个事件,如果都满足则返回 true,反之则返回 false
1
2
3
4
5
6
7
8
9
10
11
12
Observable.just(1, 2, 3, 4)
.all(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 5;
}
}).subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
LogUtils.d("aBoolean : " + aBoolean);
}
});
  • takeWhile() ------------- 可以设置条件,当某个数据满足条件时就会发送该数据,反之则不发送
1
2
3
4
5
6
7
8
9
10
11
12
Observable.just(1, 2, 3, 4, 3, 2)
.takeWhile(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 4;//如果第一条数据没有满足条件,后面的都不会进行
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.d("integer : " + integer);
}
});
  • skipWhile() ---------- 可以设置条件,当某个数据满足条件时不发送该数据,反之则发送
1
2
3
4
5
6
7
8
9
10
11
12
Observable.just(1, 2, 3, 4, 3, 2)
.skipWhile(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 4;//当满足条件时,后面的都运行
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.d("integer : " + integer);
}
});
  • takeUntil() -------------- 可以设置条件,当事件满足此条件时,下一次的事件就不会被发送了
1
2
3
4
5
6
7
8
9
10
11
12
Observable.just(1, 2, 3, 4, 3, 2)
.takeUntil(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer >= 3;//当满足条件后,从下一次的事件开始都不会发送了
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.d("integer : " + integer);
}
});
  • skipUntil() ------------ 当 skipUntil() 中的 Observable 发送事件了,原来的 Observable 才会发送事件给观察者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
.skipUntil(Observable.intervalRange(1, 3, 2, 1, TimeUnit.SECONDS))
.subscribe(new Observer<Long>() {

@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}

@Override
public void onNext(Long along) {
LogUtils.d("onNext()方法 : " + along);
}

@Override
public void onError(Throwable e) {
LogUtils.d("onError()方法 :" + e.toString());
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});
  • sequenceEqual() --------------- 判断两个 Observable 发送的事件是否相同
1
2
3
4
5
6
7
Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(1, 2, 3))
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
LogUtils.d("aBoolean : " + aBoolean);
}
});
  • contains() ------------- 判断事件序列中是否含有某个元素,如果有则返回 true,如果没有则返回 false
1
2
3
4
5
6
7
8
Observable.just(1,2,3,4,5)
.contains(3)
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
LogUtils.d("aBoolean : " + aBoolean);
}
});
  • isEmpty() --------------- 判断事件序列是否为空 ( true :空 )
1
2
3
4
5
6
7
8
9
10
11
12
13
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onComplete();
}
})
.isEmpty()
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
LogUtils.d("aBoolean : " + aBoolean);
}
});
  • amb() --------------- amb() 要传入一个 Observable 集合,但是只会发送最先发送事件的 Observable 中的事件,其余 Observable 将会被丢弃
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
List<Observable<Long>> list = new ArrayList<>();
list.add(Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS));
list.add(Observable.intervalRange(11, 5, 0, 1, TimeUnit.SECONDS));
list.add(Observable.intervalRange(21, 5, 2, 1, TimeUnit.SECONDS));
Observable.amb(list)
.subscribe(new Observer<Long>() {

@Override
public void onSubscribe(Disposable d) {
LogUtils.d("onSubscribe()方法");
}

@Override
public void onNext(Long along) {
LogUtils.d("onNext()方法 : " + along);
}

@Override
public void onError(Throwable e) {
LogUtils.d("onError()方法 :" + e.toString());
}

@Override
public void onComplete() {
LogUtils.d("onComplete()方法");
}
});

  • defaultIfEmpty() -------------- 如果观察者只发送一个 onComplete() 事件,则可以利用这个方法发送一个值
1
2
3
4
5
6
7
8
9
10
11
12
13
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onComplete();
}
})
.defaultIfEmpty(1001)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.d("integer : " + integer);
}
});

kotlin练习 - 集合练习

Set集合

  • Set集合创建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
fun main(args: Array<String>) {
//创建不可变集合,返回Set
var set = setOf("java", "kotlin", "go")
println(set)//[java, kotlin, go]
println("setOf 返回类型 ${set.javaClass}")

//创建可变集合,返回值MutableSet
var mutablemap = mutableListOf("java", "kotlin", "go")
mutablemap[0] = "JavaScript"
println(mutablemap)//[JavaScript, kotlin, go]
println("mutableListOf 返回类型 ${mutablemap.javaClass}")

//创建LinkedHashSet集合
var linkedHashSet = linkedSetOf("java", "kotlin", "go")
println(linkedHashSet)
println("linkedHashSet 返回类型 ${linkedHashSet.javaClass}")

//创建HashSet集合
var hashSet = hashSetOf("java", "kotlin", "go")
println(hashSet)//不保证元素的顺序--[kotlin, go, java]

//创建TreeSet集合
var treeSet = sortedSetOf("java", "kotlin", "go")
println(treeSet)//集合按从小到大排列--[go, java, kotlin]
}
  • Set集合的使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
//创建不可变集合,返回Set
var set = setOf("java", "kotlin", "go")

//判断是否所有的元素长度都大于5 ------所有元素都满足条件
println(set.all { it.length > 5 })// false

//判断是否任意一元素的长度大于5 ------任意元素都满足条件
println(set.any { it.length > 5 })//true

//以Lambda 表达式的值为key , 集合元素为value ,组成的Map集合
var map = set.associateBy({ "我正在学习${it}" })
println(map)//{我正在学习java=java, 我正在学习kotlin=kotlin, 我正在学习go=go}

//使用in、!in运算符
println("java" in set)// true
println("java" !in set)// false

//删除Set集合前面两个元素
var dropedList = set.drop(2)
println(dropedList)// [go]

//对Set集合进行过滤: 需要得到所有包含 va 的集合,
var filteredList1 = set.filter({ "va" in it })//如果没有返回空元素的集合
println(filteredList1)//[java]

//查找Set集合中包含 va 的元素,如果找到就返回该元素,否则返回 null
var filteredList2 = set.find({ "o" in it })
println(filteredList2)//kotlin
var filteredList3 = set.find({ "va1" in it })
println(filteredList3)//null

//将Set集合中所有字符串拼接在一起
var foldedList1 = set.fold("", { acc, s -> acc + s })
println(foldedList1)//javakotlingo
var foldedList2 = set.fold("456", { acc, s -> acc +" 123 ${s}" })
println(foldedList2)//456 123 java 123 kotlin 123 go
//查找某个元素的出现位置------没有就会返回-1
println(set.indexOf("kotlin"))// 1
println(set.indexOf("kotlin1"))// -1

//循环遍历
var books = setOf("疯狂java讲义", "疯狂kotlin讲义", "疯狂IOS讲义", "疯狂android讲义")
for (book in books){
println(book)
}
books.forEach({
println(it)
})
for (i in books.indices){
println(books.elementAt(i))
}

var books = mutableSetOf("疯狂java讲义")
//新增
books.addAll(setOf("疯狂kotlin讲义", "疯狂IOS讲义"))
books.add("疯狂android讲义")
println(books.toString())//[疯狂java讲义, 疯狂kotlin讲义, 疯狂IOS讲义, 疯狂android讲义]
//删除
books.remove("疯狂kotlin讲义")
println(books.toString())//[疯狂java讲义,疯狂IOS讲义, 疯狂android讲义]
books.removeAll(setOf("疯狂java讲义"))
println(books.toString())//[疯狂IOS讲义, 疯狂android讲义]
//清空所有
books.clear()
println(books.toString())//[]
//只保留公共的元素
books = mutableSetOf("疯狂java讲义", "疯狂kotlin讲义", "疯狂IOS讲义", "疯狂android讲义")
books.retainAll(setOf("疯狂kotlin讲义", "疯狂PHP讲义", "疯狂android讲义"))
println(books.toString())//[疯狂kotlin讲义, 疯狂android讲义]

List集合

  • List集合创建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
var list1 = listOf(1, 2, 3, 4, 5, null, 7, 8, 9)
println(list1.toString())//[1, 2, 3, 4, 5, null, 7, 8, 9]
var list2 = listOfNotNull(1, 2, 3, 4, 5, null, 7, 8, 9)
println(list2.toString())//[1, 2, 3, 4, 5, 7, 8, 9]
var list3 = mutableListOf(1, 2, 3, 4, 5, null, 7, 8, 9)
println(list3.toString())//[1, 2, 3, 4, 5, null, 7, 8, 9]
list3.add(10)
list3.set(5, 6)
println(list3.toString())//[1, 2, 3, 4, 5, 6, 7, 8, 9,10]
var list4 = arrayListOf(1, 2, 3, 4, 5, null, 7, 8, 9)
println(list4.toString())//[1, 2, 3, 4, 5, null, 7, 8, 9]
list4.set(5, 6)
list4.add(10)
println(list4.toString())//[1, 2, 3, 4, 5, 6, 7, 8, 9,10]
  • List集合的使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
 //遍历List
var list1 = listOf("Java", null, "Kotlin", "Go")
list1.forEach {
println(it)
}
for (str in list1) {
println(str)
}
for (i in list1.indices) {
println(list1[i])
println("get = ${list1.get(i)}")
println("elementAt = ${list1.elementAt(i)}")
}
var index = list1.indexOf("Go")
println("Go 在数组中的位置:${index}")//3

var list2 = mutableListOf(1, 2, 3, 1, 8, 3, 4, 6, 1, 4, 2, 3)
//最后一次出现的的位置
var last1 = list2.lastIndexOf(2)//10
//最后一次出现的的位置
var last2 = list2.indexOfLast { it == 2 }//10
//第一次出现的位置
var first = list2.indexOfFirst { it == 2 }//1
println("2在数组中第一次出现在${first},最后一次出现在${last1},${last2}")
//返回集合
var list3 = listOf("Java", null, "Kotlin", "PHP", null, "Go")
var sublist = list3.subList(2, 5)//从第2个到第5个之间的元素
println(sublist.toString())//[Kotlin, PHP, null]
// var list4 = mutableListOf("Java", "JavaScript", null, "HTML", "Kotlin", null, "Python", "PHP", "Go")
var list4 = mutableListOf("Java", null, "HTML", "Go")
println(list4)//[Java, null, HTML, Go]
//新增
list4.add("Python")
list4.add(2, "C++")
list4.addAll(listOf("Kotlin", null))
println(list4)//[Java, null, C++, HTML, Go, Python, Kotlin, null]
//删除
list4.removeAt(1)
println(list4)//[Java, C++, HTML, Go, Python, Kotlin, null]
//list4[6] ="CSS";
list4.set(6, "CSS")
println(list4)//[Java, C++, HTML, Go, Python, Kotlin, CSS]
list4.remove("Go")
println(list4)//[Java, C++, HTML, Python, Kotlin, CSS]
list4.add(3, "Go")
list4.add(5, "Go")
println(list4)//[Java, C++, HTML, Go, Python, Go, Kotlin, CSS]
//替换元素
list4.replaceAll {
if (it.equals("Go")) {
"go"
} else {
it
}
}
println(list4)//[Java, C++, HTML, go, Python, go, Kotlin, CSS]
//删除
list4.removeAll(listOf("go"))
println(list4)//[Java, C++, HTML, Python, Kotlin, CSS]

//清空
list4.clear()
println(list4)//[]

Map集合

  • Map集合创建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//不可变map集合
var map1 = mapOf("Java" to 1, "JavaScript" to 2, "HTML" to 3, "Kotlin" to 4)
println(map1)//{Java=1, JavaScript=2, HTML=3, Kotlin=4}
//可变集合MutableMap
var map2 = mutableMapOf("Java" to 1, "JavaScript" to 2, "HTML" to 3, "Kotlin" to 4)
println(map2)//{Java=1, JavaScript=2, HTML=3, Kotlin=4}
map2.put("Python", 5)
println(map2)//{Java=1, JavaScript=2, HTML=3, Kotlin=4, Python=5}
map2.remove("JavaScript")
println(map2)//{Java=1, HTML=3, Kotlin=4, Python=5}
//HashMap集合----不保证key-value的顺序
var map3 = hashMapOf("Java" to 1, "JavaScript" to 2, "HTML" to 3, "Kotlin" to 4)
println(map3)//{Java=1, HTML=3, JavaScript=2, Kotlin=4}
//LinkedHashMap集合---key-value按添加顺序排列
var map4 = linkedMapOf("Java" to 1, "JavaScript" to 2, "HTML" to 3, "Kotlin" to 4)
println(map4)//{Java=1, JavaScript=2, HTML=3, Kotlin=4}
//SortedMap集合------ key-value按key由小到大排列
var map5 = sortedMapOf("Java" to 1, "Python" to 5, "HTML" to 3, "Kotlin" to 4, "JavaScript" to 2, "Go" to 6)
println(map5)//{Go=6, HTML=3, Java=1, JavaScript=2, Kotlin=4, Python=5}

  • Map集合的使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
var map1 = mapOf("Java" to 1, "JavaScript" to 2, "HTML" to 3, "Kotlin" to 4)
//当map集合中所有key-value对都满足条件返回true
var all = map1.all {
it.key.length >= 4 && it.value > 0
}
println(all)//true
//当map集合中任意一个key-value对满足条件就会返回true
var any = map1.any {
it.key.length >= 10 && it.value > 1
}
println(any)//true
//判断map集合中是否有对于key的key-value
println("Java" in map1);//true
println("Java" !in map1);//false
println("Go" in map1);//false
println("Go" !in map1);//true

//对map进行过滤,要求key中包含:Java
var filterMap1 = map1.filter { "Java" in it.key }
println(filterMap1)//{Java=1, JavaScript=2}
var filterMap2 = map1.filter { it.key in "Java" }
println(filterMap2)//{Java=1}
//通过map集合,返回一个新的List
var mappedList1 = map1.map { "《疯狂${it.key}》讲义,第${it.value}个" }
var mappedList2 = map1.map { "《疯狂${it.key}》讲义" to "第${it.value}个" }
println(mappedList1)//[《疯狂Java》讲义,第1个, 《疯狂JavaScript》讲义,第2个, 《疯狂HTML》讲义,第3个, 《疯狂Kotlin》讲义,第4个]
println(mappedList2)//[(《疯狂Java》讲义, 第1个), (《疯狂JavaScript》讲义, 第2个), (《疯狂HTML》讲义, 第3个), (《疯狂Kotlin》讲义, 第4个)]
//Map集合中key-value对中 value最大的值
var maxby1 = map1.maxBy { it.value }
println(maxby1)//Kotlin=4

//Map集合中key-value对中 value最小的值
var minby1 = map1.minBy { it.value }
println(minby1)//Java=1

//Map集合中key-value对中 key的长度最大的值
var maxby2 = map1.maxBy { it.key.length }
println(maxby2)//JavaScript=2

//Map集合中key-value对中 key的长度最小的值
var minby2 = map1.minBy { it.key.length }
println(minby2)//Java=1

var map2 = mapOf("Java" to 1, "JavaScript" to 2, "HTML" to 3, "Kotlin" to 4)
var map3 = mapOf("Python" to 1, "Go" to 2, "HTML" to 3, "Kotlin" to 4)
//集合相加---相当于并集
println(map2 + map3)//{Java=1, JavaScript=2, HTML=3, Kotlin=4, Python=1, Go=2}
//集合相减---减去公共的元素
println(map2 - map3)//{Java=1, JavaScript=2, HTML=3, Kotlin=4}

var plus = map2.plus(map3)
println(plus)//{Java=1, JavaScript=2, HTML=3, Kotlin=4, Python=1, Go=2}

var minus = map2.minus(map3)
println(minus)//{Java=1, JavaScript=2, HTML=3, Kotlin=4}
//遍历Map集合
var map4 = mapOf("Java" to 1, "JavaScript" to 2, "HTML" to 3, "Kotlin" to 4, "Go" to 5)
for (en in map4.entries) {
println("${en.key} -> ${en.value}")
}
for (key in map4.keys) {
println("${key} -> ${map4[key]}")
}
for ((key, value) in map4) {
println("${key} -> ${value}")
}
map4.forEach({
println("${it.key} -> ${it.value}")
})

var map5 = mutableMapOf("Java" to 1, "JavaScript" to 2, "HTML" to 3, "Kotlin" to 4, "Go" to 5)
map5["Go"] = 0
println(map5)//{Java=1, JavaScript=2, HTML=3, Kotlin=4, Go=0}
map5.put("HTML", 10)
println(map5)//{Java=1, JavaScript=2, HTML=10, Kotlin=4, Go=0}
map5.remove("Go")
println(map5)//{Java=1, JavaScript=2, HTML=10, Kotlin=4}
map5.remove("Java", 2)
println(map5)//{Java=1, JavaScript=2, HTML=10, Kotlin=4}
map5.remove("Java", 1)
println(map5)//{JavaScript=2, HTML=10, Kotlin=4}
map5.putAll(hashMapOf("PHP" to 6, "C++" to 7,"JavaScript" to 8 ))
println(map5)//{JavaScript=8, HTML=10, Kotlin=4, PHP=6, C++=7}
//清空Map集合
map5.clear()
println(map5)//{}

WebView 图片选择

最近需要做webview选择图片,就找了一些资料,记录一下。
本文参考:Android使用WebView从相册/拍照中添加图片

自定义WebChromeClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import android.net.Uri;
import android.webkit.ValueCallback;
import android.webkit.WebChromeClient;
import android.webkit.WebView;

/**
* WebView 上传文件
*/
public class ReWebChomeClient extends WebChromeClient {

private OpenFileChooserCallBack mOpenFileChooserCallBack;

public ReWebChomeClient(OpenFileChooserCallBack openFileChooserCallBack) {
mOpenFileChooserCallBack = openFileChooserCallBack;
}

//For Android 3.0+
public void openFileChooser(ValueCallback<Uri> uploadMsg, String acceptType) {
mOpenFileChooserCallBack.openFileChooserCallBack(uploadMsg, acceptType);
}

// For Android < 3.0
public void openFileChooser(ValueCallback<Uri> uploadMsg) {
openFileChooser(uploadMsg, "");
}

// For Android > 4.1.1
public void openFileChooser(ValueCallback<Uri> uploadMsg, String acceptType, String capture) {
openFileChooser(uploadMsg, acceptType);
}

// For Android 5.0+
@Override
public boolean onShowFileChooser(WebView webView, ValueCallback<Uri[]> filePathCallback, FileChooserParams fileChooserParams) {
mOpenFileChooserCallBack.showFileChooserCallBack(filePathCallback);
return true;
}

public interface OpenFileChooserCallBack {
void openFileChooserCallBack(ValueCallback<Uri> uploadMsg, String acceptType);

void showFileChooserCallBack(ValueCallback<Uri[]> filePathCallback);
}
}

使用ReWebChomeClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//定义变量
private ValueCallback<Uri> uploadMessage;
private ValueCallback<Uri[]> uploadMessageAboveL;
//用来判断是否需要给WebView返回null
private int web_image = 0;
private String picFilePath;//图片保存路径
private int IDENTITY_IMAGE_REQUEST_CODE_Album = 1;//相册
private int IDENTITY_IMAGE_REQUEST_CODE_Photograph = 2;// 拍照
private int FILE_CHOOSER_RESULT_CODE = 3;//图片选择

...
//设置WebChromeClient
mWebView.setWebChromeClient(mWebChromeClient);

...

private ReWebChomeClient mWebChromeClient = new ReWebChomeClient(new ReWebChomeClient.OpenFileChooserCallBack() {
@Override
public void openFileChooserCallBack(ValueCallback<Uri> uploadMsg, String acceptType) {//Android >=3.0
uploadMessage = uploadMsg;
openImageChooserActivity();
}

@Override
public void showFileChooserCallBack(ValueCallback<Uri[]> filePathCallback) {// Android >= 5.0
uploadMessageAboveL = filePathCallback;
openImageChooserActivity();
}
});

选择图片

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
  private void openImageChooserActivity() {
web_image = 0;//判断是否已经选择了
//自定义选择图片提示框
AlertDialog dialog = new AlertDialog.Builder(mContext).setItems(R.array.head_type_array, (dialog1, which) -> {
//如果点击了dialog的选项,修改变量,不要在setOnDismissListener()方法中
web_image = 1;
selected(which);
}).create();
dialog.show();
dialog.setOnDismissListener(dialog12 -> {
if (web_image == 0) {
getImageWebView(null);
}
});
}

public void selected(int position) {
switch (position) {
case 0://相册
getPermissionsStorage();
break;
case 1:// 拍照
getPermissionsCamera();
break;
case 2://选择图片
Intent i = new Intent(Intent.ACTION_GET_CONTENT);
i.addCategory(Intent.CATEGORY_OPENABLE);
i.setType("image/*");
startActivityForResult(Intent.createChooser(i, "Image Chooser"), FILE_CHOOSER_RESULT_CODE);
break;
}

@Override
protected void onActivityResult(int requestCode, int resultCode, Intent data) {
super.onActivityResult(requestCode, resultCode, data);
if (requestCode == IDENTITY_IMAGE_REQUEST_CODE_Photograph) {//拍照
if (resultCode == Activity.RESULT_OK) {
// 添加图片
if (picFilePath == null) {
picFilePath = Datas.picPathSD + BitmapUtil.pictime;
}
getImageWebView(picFilePath);
} else {
// 删除图片
BitmapUtil.deleteTempFile(picFilePath);
getImageWebView(null);
}
} else if (requestCode == IDENTITY_IMAGE_REQUEST_CODE_Album) {//相册
if (resultCode == 1020) {
String str_images = StringUtils.null2Length0(data.getStringExtra("images"));
getImageWebView(str_images);
} else {
getImageWebView(null);
}
} else if (requestCode == FILE_CHOOSER_RESULT_CODE) {//选择图片
if (null == uploadMessage && null == uploadMessageAboveL) {
return;
}
Uri result = data == null || resultCode != RESULT_OK ? null : data.getData();
if (uploadMessageAboveL != null) {
onActivityResultAboveL(requestCode, resultCode, data);
} else if (uploadMessage != null) {
uploadMessage.onReceiveValue(result);
}
}
}

private void getImageWebView(String str_image) {//将图片路径返回给webview
if (!StringUtils.isEmpty(str_image)) {
Uri uri = getImageContentUri(mContext, new File(str_image));
if (uploadMessageAboveL != null) {
Uri[] uris = new Uri[]{uri};
uploadMessageAboveL.onReceiveValue(uris);
uploadMessageAboveL = null;
} else if (uploadMessage != null) {
uploadMessage.onReceiveValue(uri);
uploadMessage = null;
}
} else {
if (uploadMessageAboveL != null) {
uploadMessageAboveL.onReceiveValue(null);
uploadMessageAboveL = null;
} else if (uploadMessage != null) {
uploadMessage.onReceiveValue(null);
uploadMessage = null;
}
}
}

//选择图片
@TargetApi(Build.VERSION_CODES.LOLLIPOP)
private void onActivityResultAboveL(int requestCode, int resultCode, Intent intent) {
if (uploadMessageAboveL == null) {
return;
}
Uri[] results = null;
if (resultCode == Activity.RESULT_OK) {
if (intent != null) {
String dataString = intent.getDataString();
LogUtils.e("web", dataString);
ClipData clipData = intent.getClipData();
if (clipData != null) {
results = new Uri[clipData.getItemCount()];
for (int i = 0; i < clipData.getItemCount(); i++) {
ClipData.Item item = clipData.getItemAt(i);
results[i] = item.getUri();
}
}
if (dataString != null) {
results = new Uri[]{Uri.parse(dataString)};
}
}
}
uploadMessageAboveL.onReceiveValue(results);
}

@Override
protected void onDestroy() {
super.onDestroy();
uploadMessage = null;
uploadMessageAboveL = null;
}

//将文件File转成Uri
public Uri getImageContentUri(Context context, File imageFile) {
String filePath = imageFile.getAbsolutePath();
Cursor cursor = context.getContentResolver().query(MediaStore.Images.Media.EXTERNAL_CONTENT_URI,
new String[]{MediaStore.Images.Media._ID}, MediaStore.Images.Media.DATA + "=? ",
new String[]{filePath}, null);
if (cursor != null && cursor.moveToFirst()) {
int id = cursor.getInt(cursor.getColumnIndex(MediaStore.MediaColumns._ID));
Uri baseUri = Uri.parse("content://media/external/images/media");
return Uri.withAppendedPath(baseUri, "" + id);
} else {
if (imageFile.exists()) {
ContentValues values = new ContentValues();
values.put(MediaStore.Images.Media.DATA, filePath);
return context.getContentResolver().insert(MediaStore.Images.Media.EXTERNAL_CONTENT_URI, values);
} else {
return null;
}
}
}

flutter练习 - Container控件

属性

key: Container唯一标识符,用于查找更新。

alignment: 控制child的对齐方式,如果container或者container父节点尺寸大于child的尺寸,这个属性设置会起作用,有很多种对齐方式。

padding: decoration内部的空白区域,如果有child的话,child位于padding内部。padding与margin的不同之处在于,padding是包含在content内,而margin则是外部边界,设置点击事件的话,padding区域会响应,而margin区域不会响应。

color: 用来设置container背景色,如果foregroundDecoration设置的话,可能会遮盖color效果。

decoration: 绘制在child后面的装饰,设置了decoration的话,就不能设置color属性,否则会报错,此时应该在decoration中进行颜色的设置。

foregroundDecoration: 绘制在child前面的装饰。

width: container的宽度,设置为double.infinity可以强制在宽度上撑满,不设置,则根据
child和父节点两者一起布局。

height: container的高度,设置为double.infinity可以强制在高度上撑满。

constraints: 添加到child上额外的约束条件。

margin: 围绕在decoration和child之外的空白区域,不属于内容区域。

transform: 设置container的变换矩阵,类型为Matrix4。

child: container中的内容widget。

实例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import 'package:flutter/material.dart';

void main() {
runApp(new myApp());
}

class myApp extends StatelessWidget {
@override
Widget build(BuildContext context) {
return MaterialApp(
title: "text",
home: Scaffold(
appBar: AppBar(
title: Text("Container练习"),
),
body: Container(
constraints: new BoxConstraints.expand(
height: Theme.of(context).textTheme.display1.fontSize * 1.1 + 200.0,
),
decoration: new BoxDecoration(
//边框
border: new Border.all(width: 2.0, color: Colors.red),
//背景色
color: Colors.grey,
//边框圆角
borderRadius: new BorderRadius.all(new Radius.circular(20.0)),
image: new DecorationImage(
image: new NetworkImage(
"http://www.zhangjiaxue.cn/images/avatar.jpg"),
centerSlice: new Rect.fromLTRB(270.0, 180.0, 1360.0, 730.0),
),
),
padding: const EdgeInsets.all(8.0),
alignment: Alignment.center,
child: new Text(
'Hello World',
style: Theme.of(context)
.textTheme
.display1
.copyWith(color: Colors.blue),
),
//变换矩阵
transform: new Matrix4.rotationZ(0.3),
),
),
);
}
}