rxjava原理篇

概述

本文简述rxjava2的实现原理。搞清原理,使用起来才得心应手。

基本原理

先看一个最简单的示例:

// 被观察者
 Observable<Object> observable = Observable.create(new ObservableOnSubscribe<Object>() {
        @Override
        public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
            System.out.println("emit a data");
            emitter.onNext("data");
        }
    });
    // 观察者
    Observer<Object> observer = new Observer<Object>() {
        private Disposable disposable;
        @Override
        public void onSubscribe(Disposable d) {
            this.disposable = d;
        }
        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
            System.out.println("订阅结束....");
            System.out.println("订阅取消"+disposable.isDisposed());
        }

        @Override
        public void onNext(Object s) {
            System.out.println("接收消息:"+s);
        }
    };
    // 订阅
    observable.subscribe(observer);
}
  1. 创建一个发生器(emitter),并把观察者(observer)传递给发生器
  2. 被观察者拿发送器,并用发射器发射数据。
  3. 发生器拿到数据后,回调观察者的对应方法。

源码:

 @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // 创建发射器,发射器持有observer的引用
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        try {
            // 把创建好的发射器给被观察者,让它发送数据
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

从第一个示例中我们看到,通过 emitter.onNext(«data»)发射数据。看下这个方法。

@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        // 如果订阅关系还在,就执行回调观察者的onNext方法。
        observer.onNext(t);
    }
}

如果从如此简单的源码还是无法理解,那先来看看回调模式:

public interface CallBack {
    void call();
}
public class Caller {
    private CallBack callBack;

    public Caller(CallBack callBack) {
        this.callBack = callBack;
    }
    public void doSomething(){
        System.out.println("do somethings...");
        callBack.call();
    }
}
public class Client {
    public static void main(String[] args) {
        Caller caller = new Caller(new CallBack() {
            @Override
            public void call() {
                System.out.println("回调方法被执行...");
            }
        });
        caller.doSomething();
    }
}

结果:
do somethings…
回调方法被执行…

这就是最简单的回调方法的实现,对它稍多修改:

interface Callback<T> {
    void onCompleted();
    void onError(Throwable t);
    void onNext(T var1);
}
public class Callable {
    final OnCall onCall;
    public Callable(OnCall onCall) {
        this.onCall = onCall;
    }
    public void invoke(Callback callback) {
        onCall.call(callback);
    }
    public interface OnCall<T> {
        void call(Callback callback);
    }
    public static void main(String[] args) {
        new Callable(observer -> {
            observer.onNext(1);
            observer.onCompleted();
        }).invoke(new Callback() {
            @Override
            public void onCompleted() { System.out.println("complete ... "); }
            @Override
            public void onError(Throwable t) { }
            @Override
            public void onNext(Object obj) { System.out.println("revived data: "+obj); }
        });
    }
}

revived data: 1
complete …

是不是跟rxjava很像,其实它就是rxjava最简化的实现。现在回去看rxjava最基本的实现就简单很多了吧。简化后的角色关系如下:

操作符原理

rxjava得益于它强大的操作符。操作符其实就是桥接作用,订阅者向操作符订阅,操作符再向上层的可被订阅者订阅,如果到了顶层,发射数据再依次往下传递。相对比较好理解。下面看一个map操作符的示例:

Observable.create(emitter -> {
    System.out.println("emit 1");
    emitter.onNext(1);
    System.out.println("emit 2");
    emitter.onNext(2);
}).map(new Function<Object, Object>() {
    @Override
    public Object apply(Object o) throws Exception {
        if (o instanceof Integer) {
            Integer o1 = (Integer) o;
            return (o1 + "A");
        }
        return o;
    }
}).subscribe(new Observer<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(Object o) {
        System.out.println("接收消息:" + o);
    }
    @Override
    public void onError(Throwable e) {}
    @Override
    public void onComplete() {}
});

emit 1
接收消息:1A
emit 2
接收消息:2A

如下图:

需要注意的是,数据是emit1->map1->observer1->emit2->map1-observer2这样的执行顺序,而不是先把1和2都经过map后再发给observer。这点很重要,也就意味着,如果map1阻塞,那么emit2压根不会执行。

每发射一个数据,都经历订阅者链上的所有操作符和observer。如果发射器都同步发射数据,所有链都走完,才会处理下一个数据。

当然了,不同的操作符实现不一样。比如reduce:

 Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
}).reduce((integer, integer2) -> integer + integer2)
.subscribe(System.out::println);

结果打印6

操作符reduce把上游数据都加起来,再发给订阅者。它的原理是:
1. 把上游数据作用于reduce的方法,结果保存起来
2. 直到收到onComplete事件,才调用下游的onNext()方法。

总之可以这么理解,下游订阅到操作符时,操作符可以异步往上订阅,可以决定发给你下游的时机,从而实现不同功能的操作符。

线程调度

还是先看示例:

Observable.create(emitter -> {
    for (int i = 0; i < 30; i++) {
        log.info("emit " + i);
        emitter.onNext(i);
    }
}).observeOn(Schedulers.io())
          .map(o -> {
              if (o instanceof Integer) {
                  Integer o1 = (Integer) o;
                  log.info("map->" + o1);
                  return (o1 + "A");
              }
              return o;
          }).subscribe(o -> log.info("接收数据: " + o));
Thread.sleep(3000);

如果看输出结果,顺序不再像先前map示例中每个数据走完订阅链才发送下一个数据。observeOn(Schedulers.io())使得上游和下游在不同的线程中执行。

原理其实也简单,observeOn也是一个操作符,他的工作主要是:
1. 开启一个缓冲区
2. 在observeOn指定的线程中订阅上游数据,放入缓冲区
3. 另一个线程从缓冲区拿出数据,发给下游订阅者

这就意味着,线程切换上游不再受下游处理速度的限制,如果下游处理过慢,缓冲区堆积的数据会越来越多,可能导致内存溢出。

背压

上文提到,使用线程时,如果上游发送的数据下游处理不过来,可能产生内存溢出。当要处理的数据大于1000个时,我们考虑背压的方式控制上游发送速度。

rxjava2和rxjava1的其中一个区别:
* Observable的缓冲区没有限制,不支持背压
* Flowable支持背压,缓冲区大小默认是128。

可以看Flowable源码的第一行:

static final int BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128).intValue());

使用observeOn后,上游订阅的数据往缓冲队列放,下游订阅线程从队列里面读数据。背压的关键就是BUFFER_SIZE,如果下游消费95个数据,会清理一次。

上游的发射器能获取到buffer目前的大小(flowableEmitter.requested()),只要保证buffer不满(flowableEmitter.requested()不为0),就不会出现溢出的情况,从而实现背压。


https://zouzhberk.github.io/rxjava-study/
https://www.jianshu.com/p/9419b102f442

CONTENTS