概念
响应式编程是一种处理异步数据流的规范,它为数据流的转换和聚合以及数据流的控制管理提供了工具支持。整个数据流可能包含一个或者多个事件。
如果理解著名而简单的设计模式:发布-订阅模式,再看rxJava会容易许多。
对于异步操作,java可以使用Future<T>
,异步地处理程序执行结果。但只能获取单个数据,并且不断注册回调函数导致代码嵌套难看且难维护。
单个数据 | 多个数据 | ||
---|---|---|---|
同步 | T getDate() | Iterable getData() | |
异步 | Future getData() | Observable getData() |
Observable可以像使用java8 stream一样处理数据流,简单,逻辑清晰。
使用rxjava主要涉及以下操作:
- 创建事件流或者数据流
- 组合操作、交换数据流
- 订阅数据流并执行相应操作
RxJava响应式编程有以下优点:
- java8 stream一样的函数式风格
- 简化代码,逻辑清晰
- 异步处理错误,不需要catch
- 支持并发,不关心底层线程同步和并发问题
几个名词
- Observable:任何可订阅的对象,数据源。或者说,观察者希望在Observable对象变化时被提醒。在ReactiveX世界里,一个observer(观察者)subscribes(订阅)Observable(可订阅object),然后根据Observable发出的一个或者一系列元素做出对应的反应。
- Single:一个特殊的Observable,只发射单个数据。
- Observer:观察者
- Subscriber:订阅者,Observer的特殊实现
- Subscription:名词,描述订阅关系。
- emit:发射,数据产生或者变化是通知订阅者,调用对应方法。
- items:项目,元素,数据项
角色简介
观察者(Observer、Subscriber)
Observer接口有3个方法,分别对流数据、结束事件、异常事件做相应处理。
public interface Observer<T> {
void onCompleted();
void onError(Throwable e);
void onNext(T t);
}
Subscriber是Observer的子类,但它实现了Subscription接口,可以取消订阅它不再关心的Observable。
public interface Subscription {
void unsubscribe();
boolean isUnsubscribed();
}
订阅关系(Subscription)
创建订阅有两种方法:
Observable.subscribe(Subscriber)
Observable.subscribe(Action)
被观察者(Observable)
Observable上面已经介绍,它提供了一些工厂方法创建Observable对象,再通过他绑定到订阅者。有冷和热之分,热的Observable可能创建完成就开始发射数据,订阅者不能保证从头获取数据;冷的Observable一直等待有观察者订阅才开放发射数据。
如果想找到一个类似于Observable的接口,那就是迭代器Iterator 。但Observable可以产生3种类型的事件;
- Observable声明的类型值
- 完成事件
- 错误事件
示例:
Observable<Integer> observable = Observable.create(emitter -> {
try {
IntStream.range(1, 6).forEach(item -> {
System.out.println("emit " + item);
emitter.onNext(item);
});
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
});
可以通过just()
,from()
,create()
创建Observable,但是他们也有些不同,并不都是在订阅发送时,才出发Observable发射数据。defer()
可以把Observable转成lazy。
public Observable<Person> listPeople() {
return Observable.defer(() ->
Observable.from(query("SELECT * FROM PEOPLE")));
}
操作符
rxjava提供了各种操作符,使得复制逻辑可以通过操作符叠加组合完成,这也是rxjava强大的原因。一个操作符是一个方法,获取上游Observable,处理后返回给下游Observable。
List<String> words = Arrays.asList("the", "quick", "brown", "fox", "jumped", "over", "the", "lazy", "dogs");
Observable.from(words)
.flatMap(word -> Observable.from(word.split("")))
.distinct()
.sorted()
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d. %s", count, string))
.subscribe(System.out::println);
按序号输出26个字母
示例中的from,sorted,zipWith等均为操作符。跟Java 8里引入的Streams API很相似,不过后者只是遍历和操作集合,无法更细粒度的控制和无限制的流处理。
操作符还有很多,根据功能分类如:创建操作,变换操作,过滤操作,连接操作…等,详细见operators
多线程
rxjava并没有引入自己的并发特性,只是对thread的抽象和封装。原则上,他和ScheduledExecutorService类似,只是实现了更细粒度的抽象。
Observable通过操作subscribeOn() 和 observeOn()绑定到对应的scheduler上,如:
Observable.just("Hello","rxJava")
.subscribeOn(Schedulers.io())
.subscribe(System.out::println);
scheduler只是创建worker实例负责调度和运行代码,Schedulers提供了一些创建scheduler的静态方法:
- Schedulers.newThread()
- Schedulers.io()
- Schedulers.computation()
- Schedulers.from(Executor executor)
可以这么理解,Schedulers类似线程池,负责调度;worker类似thread,负责执行具体代码。最好Observables是异步的,从而避免使用subscribeOn();
对于阻塞的Observables(blocking Observables,用得少):
- Observable如果不用Scheduler,跟一个单线程操作没有区别
- Observable只有一个subscribeOn(),就像开启了一个后台线程处理任务,还是线程内阻塞。
- Observable使用flatMap(),给每一个内部Observable都使用subscribeOn(),就像ForkJoinPool
- 连续使用subscribeOn,只有第一个生效。observeOn可以多次调用。
下面通过一个示例说明:
ExecutorService es = Executors.newFixedThreadPool(200,new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());
CountDownLatch finishedLatch = new CountDownLatch(1);
long start = System.currentTimeMillis();
Observable.range(0, 100)
//.subscribeOn(Schedulers.from(es))
.map(item -> {
try {
// do something here
Thread.sleep(20);
} catch (InterruptedException e) {
return Observable.error(e);
}
return item + "map";
}).observeOn(Schedulers.computation()).subscribe(item -> { }, (Throwable error) -> { }, finishedLatch::countDown);
finishedLatch.await();
System.out.println((System.currentTimeMillis() - start) + "");
输出:2068
没有使用subscribeOn(),客户端线程阻塞。打开注释.subscribeOn(Schedulers.from(es))
,执行时间:2073。map处理100个数据只是从线程池中获取一个线程处理,还是线程内阻塞。再修改如下:
ExecutorService es = Executors.newFixedThreadPool(200,
new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());
CountDownLatch finishedLatch = new CountDownLatch(1);
long start = System.currentTimeMillis();
Observable.range(0, 100)
.flatMap(rangeItem -> Observable
.just(rangeItem)
.subscribeOn(Schedulers.from(es))
.map(i -> {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
return Observable.error(e);
}
return i + "map";
}))
.observeOn(Schedulers.computation()).subscribe(item -> {
}, (Throwable error) -> {
}, finishedLatch::countDown);
finishedLatch.await();
System.out.println("-------------");
System.out.println((System.currentTimeMillis() - start) + "");
输出:95
flatMap通过给每个元素创建一个新的Observable处理:再放进一个单独的Observable。
总结
rxjava其实就是观察者模式的扩展,通过强大的操作符组合使得数据流处理逻辑简单清晰,再加上线程的封装实现异步,无需担忧无关的抽象底层线程,同步,线程安全和并发数据结构。
参考文档: http://reactivex.io/ http://rxmarbles.com/ https://blog.csdn.net/xmxkf/article/details/51791120 https://www.jianshu.com/p/88aacbed8aa5 https://www.gitbook.com/book/mcxiaoke/rxdocs/details https://www.youtube.com/watch?v=WKore-AkisY