rxjava基础篇

概念

响应式编程是一种处理异步数据流的规范,它为数据流的转换和聚合以及数据流的控制管理提供了工具支持。整个数据流可能包含一个或者多个事件。

如果理解著名而简单的设计模式:发布-订阅模式,再看rxJava会容易许多。

对于异步操作,java可以使用Future<T>,异步地处理程序执行结果。但只能获取单个数据,并且不断注册回调函数导致代码嵌套难看且难维护。

单个数据 多个数据
同步 T getDate() Iterable getData()
异步 Future getData() Observable getData()

Observable可以像使用java8 stream一样处理数据流,简单,逻辑清晰。

使用rxjava主要涉及以下操作:

  • 创建事件流或者数据流
  • 组合操作、交换数据流
  • 订阅数据流并执行相应操作

RxJava响应式编程有以下优点:

  1. java8 stream一样的函数式风格
  2. 简化代码,逻辑清晰
  3. 异步处理错误,不需要catch
  4. 支持并发,不关心底层线程同步和并发问题

几个名词

  • Observable:任何可订阅的对象,数据源。或者说,观察者希望在Observable对象变化时被提醒。在ReactiveX世界里,一个observer(观察者)subscribes(订阅)Observable(可订阅object),然后根据Observable发出的一个或者一系列元素做出对应的反应。
  • Single:一个特殊的Observable,只发射单个数据。
  • Observer:观察者
  • Subscriber:订阅者,Observer的特殊实现
  • Subscription:名词,描述订阅关系。
  • emit:发射,数据产生或者变化是通知订阅者,调用对应方法。
  • items:项目,元素,数据项

角色简介

观察者(Observer、Subscriber)

Observer接口有3个方法,分别对流数据、结束事件、异常事件做相应处理。

public interface Observer<T> {
    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);
}

Subscriber是Observer的子类,但它实现了Subscription接口,可以取消订阅它不再关心的Observable。

public interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

订阅关系(Subscription)

创建订阅有两种方法:

  1. Observable.subscribe(Subscriber)
  2. Observable.subscribe(Action)

被观察者(Observable)

Observable上面已经介绍,它提供了一些工厂方法创建Observable对象,再通过他绑定到订阅者。有冷和热之分,热的Observable可能创建完成就开始发射数据,订阅者不能保证从头获取数据;冷的Observable一直等待有观察者订阅才开放发射数据。

如果想找到一个类似于Observable的接口,那就是迭代器Iterator 。但Observable可以产生3种类型的事件;

  • Observable声明的类型值
  • 完成事件
  • 错误事件

示例:

Observable<Integer> observable = Observable.create(emitter -> {
    try {
        IntStream.range(1, 6).forEach(item -> {
            System.out.println("emit " + item);
            emitter.onNext(item);
        });
        emitter.onComplete();
    } catch (Exception e) {
        emitter.onError(e);
    }
});

可以通过just(),from(),create()创建Observable,但是他们也有些不同,并不都是在订阅发送时,才出发Observable发射数据。defer()可以把Observable转成lazy。

public Observable<Person> listPeople() {
    return Observable.defer(() ->
        Observable.from(query("SELECT * FROM PEOPLE")));
}

操作符

rxjava提供了各种操作符,使得复制逻辑可以通过操作符叠加组合完成,这也是rxjava强大的原因。一个操作符是一个方法,获取上游Observable,处理后返回给下游Observable

List<String> words = Arrays.asList("the", "quick", "brown", "fox", "jumped", "over", "the", "lazy", "dogs");
Observable.from(words)
          .flatMap(word -> Observable.from(word.split("")))
          .distinct()
          .sorted()
          .zipWith(Observable.range(1, Integer.MAX_VALUE),
                  (string, count) -> String.format("%2d. %s", count, string))
          .subscribe(System.out::println);

按序号输出26个字母

示例中的from,sorted,zipWith等均为操作符。跟Java 8里引入的Streams API很相似,不过后者只是遍历和操作集合,无法更细粒度的控制和无限制的流处理。

操作符还有很多,根据功能分类如:创建操作,变换操作,过滤操作,连接操作…等,详细见operators

多线程

rxjava并没有引入自己的并发特性,只是对thread的抽象和封装。原则上,他和ScheduledExecutorService类似,只是实现了更细粒度的抽象。

Observable通过操作subscribeOn() 和 observeOn()绑定到对应的scheduler上,如:

Observable.just("Hello","rxJava")
          .subscribeOn(Schedulers.io())
          .subscribe(System.out::println);

scheduler只是创建worker实例负责调度和运行代码,Schedulers提供了一些创建scheduler的静态方法:

  • Schedulers.newThread()
  • Schedulers.io()
  • Schedulers.computation()
  • Schedulers.from(Executor executor)

可以这么理解,Schedulers类似线程池,负责调度;worker类似thread,负责执行具体代码。最好Observables是异步的,从而避免使用subscribeOn();

对于阻塞的Observables(blocking Observables,用得少):

  1. Observable如果不用Scheduler,跟一个单线程操作没有区别
  2. Observable只有一个subscribeOn(),就像开启了一个后台线程处理任务,还是线程内阻塞。
  3. Observable使用flatMap(),给每一个内部Observable都使用subscribeOn(),就像ForkJoinPool
  4. 连续使用subscribeOn,只有第一个生效。observeOn可以多次调用。

下面通过一个示例说明:

 ExecutorService es = Executors.newFixedThreadPool(200,new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());
CountDownLatch finishedLatch = new CountDownLatch(1);
long start = System.currentTimeMillis();
Observable.range(0, 100)
            //.subscribeOn(Schedulers.from(es))
            .map(item -> {
                try {
                    // do something here
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    return Observable.error(e);
                }
                return item + "map";
            }).observeOn(Schedulers.computation()).subscribe(item -> { }, (Throwable error) -> { }, finishedLatch::countDown);
finishedLatch.await();
System.out.println((System.currentTimeMillis() - start) + "");

输出:2068

没有使用subscribeOn(),客户端线程阻塞。打开注释.subscribeOn(Schedulers.from(es)),执行时间:2073。map处理100个数据只是从线程池中获取一个线程处理,还是线程内阻塞。再修改如下:

ExecutorService es = Executors.newFixedThreadPool(200,
        new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());
CountDownLatch finishedLatch = new CountDownLatch(1);
long start = System.currentTimeMillis();
Observable.range(0, 100)
            .flatMap(rangeItem -> Observable
                    .just(rangeItem)
                    .subscribeOn(Schedulers.from(es))
                    .map(i -> {
                        try {
                            Thread.sleep(20);
                        } catch (InterruptedException e) {
                            return Observable.error(e);
                        }
                        return i + "map";
                    }))
            .observeOn(Schedulers.computation()).subscribe(item -> {
}, (Throwable error) -> {
}, finishedLatch::countDown);
finishedLatch.await();
System.out.println("-------------");
System.out.println((System.currentTimeMillis() - start) + "");

输出:95

flatMap通过给每个元素创建一个新的Observable处理:再放进一个单独的Observable。

总结

rxjava其实就是观察者模式的扩展,通过强大的操作符组合使得数据流处理逻辑简单清晰,再加上线程的封装实现异步,无需担忧无关的抽象底层线程,同步,线程安全和并发数据结构。


参考文档: http://reactivex.io/ http://rxmarbles.com/ https://blog.csdn.net/xmxkf/article/details/51791120 https://www.jianshu.com/p/88aacbed8aa5 https://www.gitbook.com/book/mcxiaoke/rxdocs/details https://www.youtube.com/watch?v=WKore-AkisY

CONTENTS