概述
本文简述rxjava2的实现原理。搞清原理,使用起来才得心应手。
基本原理
先看一个最简单的示例:
// 被观察者
Observable<Object> observable = Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
System.out.println("emit a data");
emitter.onNext("data");
}
});
// 观察者
Observer<Object> observer = new Observer<Object>() {
private Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
this.disposable = d;
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
System.out.println("订阅结束....");
System.out.println("订阅取消"+disposable.isDisposed());
}
@Override
public void onNext(Object s) {
System.out.println("接收消息:"+s);
}
};
// 订阅
observable.subscribe(observer);
}
- 创建一个发生器(emitter),并把观察者(observer)传递给发生器
- 被观察者拿发送器,并用发射器发射数据。
- 发生器拿到数据后,回调观察者的对应方法。
源码:
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 创建发射器,发射器持有observer的引用
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
// 把创建好的发射器给被观察者,让它发送数据
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
从第一个示例中我们看到,通过 emitter.onNext(“data”)发射数据。看下这个方法。
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
// 如果订阅关系还在,就执行回调观察者的onNext方法。
observer.onNext(t);
}
}
如果从如此简单的源码还是无法理解,那先来看看回调模式:
public interface CallBack {
void call();
}
public class Caller {
private CallBack callBack;
public Caller(CallBack callBack) {
this.callBack = callBack;
}
public void doSomething(){
System.out.println("do somethings...");
callBack.call();
}
}
public class Client {
public static void main(String[] args) {
Caller caller = new Caller(new CallBack() {
@Override
public void call() {
System.out.println("回调方法被执行...");
}
});
caller.doSomething();
}
}
结果: do somethings… 回调方法被执行…
这就是最简单的回调方法的实现,对它稍多修改:
interface Callback<T> {
void onCompleted();
void onError(Throwable t);
void onNext(T var1);
}
public class Callable {
final OnCall onCall;
public Callable(OnCall onCall) {
this.onCall = onCall;
}
public void invoke(Callback callback) {
onCall.call(callback);
}
public interface OnCall<T> {
void call(Callback callback);
}
public static void main(String[] args) {
new Callable(observer -> {
observer.onNext(1);
observer.onCompleted();
}).invoke(new Callback() {
@Override
public void onCompleted() { System.out.println("complete ... "); }
@Override
public void onError(Throwable t) { }
@Override
public void onNext(Object obj) { System.out.println("revived data: "+obj); }
});
}
}
revived data: 1 complete …
是不是跟rxjava很像,其实它就是rxjava最简化的实现。现在回去看rxjava最基本的实现就简单很多了吧。简化后的角色关系如下:
操作符原理
rxjava得益于它强大的操作符。操作符其实就是桥接作用,订阅者向操作符订阅,操作符再向上层的可被订阅者订阅,如果到了顶层,发射数据再依次往下传递。相对比较好理解。下面看一个map操作符的示例:
Observable.create(emitter -> {
System.out.println("emit 1");
emitter.onNext(1);
System.out.println("emit 2");
emitter.onNext(2);
}).map(new Function<Object, Object>() {
@Override
public Object apply(Object o) throws Exception {
if (o instanceof Integer) {
Integer o1 = (Integer) o;
return (o1 + "A");
}
return o;
}
}).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
System.out.println("接收消息:" + o);
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {}
});
emit 1 接收消息:1A emit 2 接收消息:2A
如下图: 需要注意的是,数据是emit1->map1->observer1->emit2->map1-observer2这样的执行顺序,而不是先把1和2都经过map后再发给observer。这点很重要,也就意味着,如果map1阻塞,那么emit2压根不会执行。
每发射一个数据,都经历订阅者链上的所有操作符和observer。如果发射器都同步发射数据,所有链都走完,才会处理下一个数据。
当然了,不同的操作符实现不一样。比如reduce:
Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}).reduce((integer, integer2) -> integer + integer2)
.subscribe(System.out::println);
结果打印6
操作符reduce把上游数据都加起来,再发给订阅者。它的原理是:
- 把上游数据作用于reduce的方法,结果保存起来
- 直到收到onComplete事件,才调用下游的onNext()方法。
总之可以这么理解,下游订阅到操作符时,操作符可以异步往上订阅,可以决定发给你下游的时机,从而实现不同功能的操作符。
线程调度
还是先看示例:
Observable.create(emitter -> {
for (int i = 0; i < 30; i++) {
log.info("emit " + i);
emitter.onNext(i);
}
}).observeOn(Schedulers.io())
.map(o -> {
if (o instanceof Integer) {
Integer o1 = (Integer) o;
log.info("map->" + o1);
return (o1 + "A");
}
return o;
}).subscribe(o -> log.info("接收数据: " + o));
Thread.sleep(3000);
如果看输出结果,顺序不再像先前map示例中每个数据走完订阅链才发送下一个数据。observeOn(Schedulers.io())使得上游和下游在不同的线程中执行。
原理其实也简单,observeOn也是一个操作符,他的工作主要是:
- 开启一个缓冲区
- 在observeOn指定的线程中订阅上游数据,放入缓冲区
- 另一个线程从缓冲区拿出数据,发给下游订阅者
这就意味着,线程切换上游不再受下游处理速度的限制,如果下游处理过慢,缓冲区堆积的数据会越来越多,可能导致内存溢出。
背压
上文提到,使用线程时,如果上游发送的数据下游处理不过来,可能产生内存溢出。当要处理的数据大于1000个时,我们考虑背压的方式控制上游发送速度。
rxjava2和rxjava1的其中一个区别:
- Observable的缓冲区没有限制,不支持背压
- Flowable支持背压,缓冲区大小默认是128。
可以看Flowable源码的第一行:
static final int BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128).intValue());
使用observeOn后,上游订阅的数据往缓冲队列放,下游订阅线程从队列里面读数据。背压的关键就是BUFFER_SIZE,如果下游消费95个数据,会清理一次。
上游的发射器能获取到buffer目前的大小(flowableEmitter.requested()),只要保证buffer不满(flowableEmitter.requested()不为0),就不会出现溢出的情况,从而实现背压。
https://zouzhberk.github.io/rxjava-study/ https://www.jianshu.com/p/9419b102f442