深入理解RxJava与RxKotlin

What is Rx?

基本概念

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

按照官方介绍RxJava是由Netflix发起的一个在JVM平台上以Observable序列来处理异步任务的响应式编码库

Why Rx?

现在Android社区都在广泛的推广和使用RxJava,但是使用它究竟能能带来什么样的好处呢,我们通过一个具体的案例来研究

用户登录系统需求

假设现在我们有一个微信用户第三方登录功能的需求,这个需求很简单,就是实现一个登录界面,用户点击登录跳转到微信进行用户认证,拿到微信的openId然后再接入到自己后台服务器进行登录。这里我们设计一个AccountManager类,这个类里面有2个方法,首先是requestWxOpenId()方法用于跳转微信获得微信的openId,另外一个就是通过openId登录认证的login()方法拿到用户信息

1
2
3
4
5
interface AccountManager {
fun requestWxOpenId(): String
fun login(wxOpenId: String): User
}

传统代码实现

因为AccountManager这些方法都是网络访问的接口,是个异步任务,如果直接调用会堵塞主线程肯定是不行的,所以我们会封装一个callback出来,回调方法用来处理当接口请求结束后的操作,因为接口请求有可能成功也有可能失败,所以回调接口中包含了两个方法onSuccessonFaillure

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
interface OpenIdCallback {
fun onSuccess(openId: String)
fun onFailure()
}
interface AccountCallback {
fun onSuccess(resultl: String)
fun onFailure()
}
interface AccountManager {
fun requestWxOpenId(callback: OpenIdCallback): String
fun login(wxOpenId: String, callback: AccountCallback)
}

AccountManager的实现很简单首先调用requestWxOpenId方法获取到openId后再调用login方法获取用户信息,调用代码如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private fun clickLogin() {
val accountManager = AccountManagerIml()
accountManager.requestWxOpenId(object : OpenIdCallback {
override fun onSuccess(openId: String) {
accountManager.login(openId, object : AccountCallback {
override fun onSuccess(user: User) {
if (!isActivityDestroyed) {
runOnUiThread {
//update UI
}
}
}
override fun onFailure() {
//handle error
}
})
}
override fun onFailure() {
//handle error
}
})
}

可以看到整套流程下来,有两个痛点:

第一是每一个异步请求我们都需要声明一个callback接口,另外一个是callback的多层嵌套使得代码很不清晰,如果再嵌套多几层会使得代码很难看,这种callback的多层嵌套还有个专有的名词叫回调地狱(callback hell)

第二就是像在Android应用开发中需要在主线程操作UI组件以及Activity生命周期的判断,这就需要在各个回调中加上各种判断。

为了解决这些问题Rx应运而生

使用RX的实现

为了解决上述提到哪些问题,我们看看Rx怎么解决的,首先为了解决不同的callback问题,Rx定义了统一的回调接口Observer

1
2
3
4
5
6
7
public interface Observer<T> {
void onCompleted();
void onError(Throwable e);
void onNext(T t);
}

通过泛型包装来统一回调接口,Observer回调中封装了三个方法onErro用来处理错误情况,onNext用来处理正常情况下成功返回的结果,onComplete表示整个流程结束时的回调。

通过统一泛型回调接口解决了声明多个回调的问题,那么如何解决回调嵌套的问题呢,因为回调嵌套产生的原因是由于异步方法的的链式调用产生的,如果把异步方法的的链式调用都能直接反映在链式的数据处理上,Rx是通过将每一个异步方法直接返回Observable对象,异步方法的组合调用通过Observable的组合,最终得到一个Observable来表示链式任务处理的最终结果。

Observable表示一个数据源,在AccountManager的方法中,我们对应的接口就需要修改

1
2
3
4
5
interface AccountManager {
fun requestWxOpenId(): Observable<String>
fun login(wxOpenId: String):Observable<User>
}

我们调用代码就只需要一个subscriber就行了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private fun clickLogin(){
val accountManager = AccountManagerIml2()
accountManager.requestWxOpenId()
.flatMap { accountManager.login(it) }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object: Subscriber<User>() {
override fun onNext(t: User?) {
//update ui
}
override fun onCompleted() {
}
override fun onError(e: Throwable?) {
}
})
}

看到上面的代码是不是变得清晰了,如果使用RxKotlin,会更加清晰

1
2
3
4
5
6
7
8
9
10
11
12
private fun clickLogin() {
val accountManager = AccountManagerIml2()
accountManager.requestWxOpenId()
.flatMap { accountManager.login(it) }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeBy(
onError = {},
onNext = {
//update ui
})
}

可以看到,使用RxJava来封装这些异步操作后,整个代码跟同步操作的代码都非常接近,结构很清晰和明了

What’s Rx inside?

要想更好的使用RxJava,就有必要更好更深入的了解RxJava的api,这一部分将深入了解RxJava和RxKotlin的api和部分实现原理

RxJava1 Api探究

事件源

Observable

Observable是最为常用的数据源,Observable对象提供了常用的创建方法,create,just,from等方法

1
2
3
4
5
6
7
8
9
10
11
12
Observable.just("hello")
Observable.from(arrayOf(1, 2, 3, 4, 5))
Observable.create(Observable.OnSubscribe<String> { t ->
try {
t.onNext("hello")
t.onStart()
} catch (e: Exception) {
t.onError(e)
}
})

Completable

Completable数据源只会触发onComplete和onError回调,通过from()方法可以创建Completable数据源

1
Completable.fromAction { println("action done") }

Single

Single数据源只会发射一条数据,Single类也有create,just,from等方法用来创建Single数据源对象,subscribe方法只有onSuccess和onError,比方说如下代码

1
2
3
4
5
Single.just(1)
.subscribeBy(
onSuccess ={ println(it) },
onError = {}
)

操作符

操作符可以说是是RxJava中的核心了,通过不同的操作符来实现数据的拼接,转换等等操作从而达到最终的数据结果,RxJava提过了很多的操作符,这里就不一一讲解,主要列出一些常用的

map

用于数据转换,常见的应用场景比方说id应设成名称

1
2
3
4
5
6
7
8
9
10
11
Observable.just(1, 2, 3, 4)
.map {
when (it) {
1 -> "Jane"
2 -> "Mike"
3 -> "Mary"
4 -> "tom"
else -> "noName"
}
}
.subscribeBy(onNext = { println(it) })

输出内容为

1
2
3
4
Jane
Mike
Mary
tom

flatMap

flatMap操作类似于Map,但是Map转换是T -> D,但是flatMap是T -> Observable\

1
2
3
4
5
6
7
8
fun getInfoById(id: Int): Observable<Int> {
//模拟耗时请求
return Observable.just(id)
}
Observable.just(1, 2, 3, 4)
.flatMap { getInfoById(it) }
.subscribeBy(onNext = { println(it)})

这里getInfoById我们假设是一个网络请求或者数据库操作之类的耗时操作,它返回Observable对象,最后subscriber中onNext中接受到的还是Int数据

输出结果

1
2
3
4
1
2
3
4

filter

filter用于过滤数据

1
2
3
Observable.just(1,2,3,4)
.filter { it > 2 }
.subscribeBy(onNext = { println(it)})

输出结果

1
2
3
4

….

RxJava中操作符可以说是非常非常多,想了解更多可以查看官方文档Rx操作符

线程调度

线程调度在RxJava可以说是非常精简,简单到一两行代码来实现线程切换

1
2
3
4
Observable.just(1,2,3,4)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeBy(onNext = { println(it)})

上面代码中通过切换android ui线程和io线程是的回调方法都运行在ui线程,Observable数据源操作在io线程中运行

Transformer

我们在使用RxJava的过程中,通常都会用到线程切换,耗时操作在io线程,subscriber回调在主线程

1
2
3
4
Observable.just(1,2,3,4)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeBy(onNext = { println(it)})

对于多个地方都总是要重复调用subscirbeOn和observeOn这两个方法,我们会想到去封装一下,比方说增加一个方法

1
2
3
4
fun schedule(observable: Observable<Any>): Observable<Any> {
return observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
}

这样我们就可以这样调用

1
2
schedule(observable)
.subscribeBy(onNext = { println(it)})

这样做虽然加了一层封装,但是打破了整个Observable的链式调用,这就显得不太好看

好在RxJava有Transformer,通过与Observable的compose()操作符使用来组合Observable操作

Transformer就是一个Func1, Observable>,可以通过它将一种类型的Observable转换成另一种类型的Observable

1
2
3
4
5
6
fun <T> applySchedulers(): Observable.Transformer<T, T> {
return Observable.Transformer { observable ->
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
}
}

我们创建一个applySchedulers方法返回一个Transformer对象,这时候我们就可以直接链式切换线程了

1
2
observable.compose(applySchedulers())
.subscribeBy(onNext = { println(it)})

Subject

Subject是一个很特殊的接口,它即实现了Observable接口又实现了Observer接口,也就是说它既是一个数据发射方又是一个数据接收方

AsyncSubject

AsyncSubject注册到的Observer只会在onComplete回调之后接收到最后一个onNext发射出来的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// observer不会接收到数据,因为subject没有调用onComplete
AsyncSubject<Object> subject = AsyncSubject.create();
subject.subscribe(observer);
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
// observer只会接收到最后一个数据,也就是“three”
AsyncSubject<Object> subject = AsyncSubject.create();
subject.subscribe(observer);
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
subject.onCompleted();

BehaviorSubject

注册到BehaviorSubject的observer只会接收到subscribe之前发射的最后一个item和之后发射的所有item

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// observer会接收到所有4个事件(包括"default").
BehaviorSubject<Object> subject = BehaviorSubject.create("default");
subject.subscribe(observer);
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
// observer will receive the "one", "two" and "three" events, but not "default" and "zero"
BehaviorSubject<Object> subject = BehaviorSubject.create("default");
subject.onNext("zero");
subject.onNext("one");
subject.subscribe(observer);
subject.onNext("two");
subject.onNext("three");
// observer will receive only onCompleted
BehaviorSubject<Object> subject = BehaviorSubject.create("default");
subject.onNext("zero");
subject.onNext("one");
subject.onCompleted();
subject.subscribe(observer);
// observer will receive only onError
BehaviorSubject<Object> subject = BehaviorSubject.create("default");
subject.onNext("zero");
subject.onNext("one");
subject.onError(new RuntimeException("error"));
subject.subscribe(observer);

PublishSubject

observer会接收到subscribe后所有发射出来的item

1
2
3
4
5
6
7
8
9
PublishSubject<Object> subject = PublishSubject.create();
// observer1 will receive all onNext and onCompleted events
subject.subscribe(observer1);
subject.onNext("one");
subject.onNext("two");
// observer2 will only receive "three" and onCompleted
subject.subscribe(observer2);
subject.onNext("three");
subject.onCompleted();

TestSubject

TestSubject是专门为单元测试开发而来的

1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void cancelShouldUnsubscribe() {
TestSubject<String> observable = TestSubject.create(new TestScheduler());
assertFalse(observable.hasObservers());
T future = toFuture(observable);
assertTrue(observable.hasObservers());
future.cancel(true);
assertFalse(observable.hasObservers());
}

RxJava2

Reactive Streams

Reactive Streams是一套在JVM上建立的响应式标准协议,旨在标准化以无阻塞式back pressre的形式来处理异步事件流协议,类似于网络的标准http协议一样,只是Reactive Streams是在JVM和JavaScript平台上的响应式协议

规范概览

  • Publisher

    1
    2
    3
    public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
    }

Publisher接口用来定义发射事件对象,通过subscribe方法接收想要接收事件的subscriber

  • Subscriber
1
2
3
4
5
6
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
  • Subscription
1
2
3
4
public interface Subscription {
public void request(long n);
public void cancel();
}
  • Processor
1
2
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

Java9 Flow

Reactive Streams定义了规范,JDK也实现了这个规范,在最新的Java9中引入了一个java.util.concurrent.Flow模块,这就意味着如果你使用Java9,可以直接引用Flow模块来应用响应式api,而不必须引入第三方的RxJava包

api变动

数据源
  • Observable和Flowable两种数据源,Observable是没有back pressure支持的,Flowable是需要设置back pressure选项的,也就是说对于事件发射方是否会产生back pressure问题需要使用者来决定,使用Flowable就必须要指定back pressure模式,不指定会直接报错
  • Single数据源的subscriber在RxJava2中改为SingleObserver,增加了onSubscribe方法
1
2
3
4
5
interface SingleObserver<T> {
void onSubscribe(Disposable d);
void onSuccess(T value);
void onError(Throwable error);
}
  • Completable的Subscriber在RxJava2中改为CompletableObserver
1
2
3
4
5
interface CompletableObserver<T> {
void onSubscribe(Disposable d);
void onComplete();
void onError(Throwable error);
}
  • Maybe: Maybe是在RxJava2中新加的一种数据类型,他是Single和Completable的结合体Maybe可能会发射0或者1个事件或者error
Subject和Processor

在RxJava1中的Subject,在RxJava2中新增了Processor,Processor与Subject类似,Processor支持back pressure而Subject不支持back pressure

Subscriber和Subscription

Subscriber增加了一个onSubscribe(Subscription)的回调方法,因为原有的Observable在subscribe一个Subscriber的时候会返回Subscription,但是在RxJava2中subscribe方法返回的是void,所以对应在Subscriber中增加了Subscription的回调方法

其他
  • RxJava2不再接收直接发射null,例如
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    * Schedules将immediate改为trampoline
    * 增加了test的操作符
    RxJava2还有许多变更,这里未能一一讲述,更详细文档请参考官方[What's-different-in-2.0](https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0)
    ### RxKotlin
    RxKotlin是基于RxJava轻量级框架,在基于Kotlin的项目中,本身可以直接使用RxJava,但是RxKotlin通过增加各类扩展函数和规范使得RxJava在Kotlin项目中更易使用和标准化
    ```kotlin
    import io.reactivex.rxkotlin.subscribeBy
    import io.reactivex.rxkotlin.toObservable
    fun main(args: Array<String>) {
    val list = listOf("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
    list.toObservable() // extension function for Iterables
    .filter { it.length >= 5 }
    .subscribeBy( // named arguments for lambda Subscribers
    onNext = { println(it) },
    onError = { it.printStackTrace() },
    onComplete = { println("Done!") }
    )
    }

completable扩展

1
2
3
4
fun Action0.toCompletable(): Completable = Completable.fromAction(this)
fun <T> Callable<T>.toCompletable(): Completable = Completable.fromCallable { this.call() }
fun <T> Future<T>.toCompletable(): Completable = Completable.fromFuture(this)
fun (() -> Any).toCompletable(): Completable = Completable.fromCallable(this)

completable扩展主要是为各种对象增加toCompletable()的转换方法

observables扩展

observabled扩展方法主要是对各个对象增加toObservable对象,包括各个Array对象,iterator对象,Sequence对象等

operators扩展

1
2
3
4
5
6
7
fun <T> Observable<Observable<T>>.mergeAll() = flatMap { it }
fun <T> Observable<Observable<T>>.concatAll() = concatMap { it }
fun <T> Observable<Observable<T>>.switchLatest() = switchMap { it }
fun <T> Observable<Observable<T>>.switchOnNext(): Observable<T> = Observable.switchOnNext(this)

操作符主要增加了对Observable>操作的扩展方法

single扩展

1
2
3
fun <T> T.toSingle(): Single<T> = Single.just(this)
fun <T> Future<T>.toSingle(): Single<T> = Single.from(this)
fun <T> Callable<T>.toSingle(): Single<T> = Single.fromCallable { this.call() }

single扩展增加了Future,Callable和T的toSingle方法

subscribers扩展

subscribers增加了subscribeBy及其相应的重构方法,使用方法可参考如下代码

1
2
3
4
5
6
7
list.toObservable() // extension function for Iterables
.filter { it.length >= 5 }
.subscribeBy( // named arguments for lambda Subscribers
onNext = { println(it) },
onError = { it.printStackTrace() },
onComplete = { println("Done!") }
)

subscription扩展

subscription增加了两个扩展方法

1
2
3
4
5
6
operator fun CompositeSubscription.plusAssign(subscription: Subscription) = add(subscription)
fun Subscription.addTo(compositeSubscription: CompositeSubscription) : Subscription {
compositeSubscription.add(this)
return this
}

一个是kotlin符号重载,可以直接使用+=将subscription添加到CompositeSubscription中

1
subscription += observable.subscribe{}

另外一个是在Subscription中增加了addTo方法

1
2
3
4
5
6
7
list.toObservable()
.subscribeBy( // named arguments for lambda Subscribers
onNext = { println(it) },
onError = { it.printStackTrace() },
onComplete = { println("Done!") }
)
.addTo(compositeSubscription)

How to apply Rx?

Android Project Architecture

上图是Google IO 2017上新提出的Android Architecture,旨在帮助开发者如何更快更稳定的基于此架构开发应用

从上面的架构图中,我们可以看到数据源的处理统一是通过Repository来封装,不管是数据库的数据还是服务器接口的数据,通过通过Repository封装再以LiveData的形式提供,注意这里的LiveData其实可以理解为RxJava的Observable。所以对于Android项目结构中数据源的封装,建议都采用Repository的形式,如果采用Android Architecture那可以采用LiveData,如果采用RxJava就以Observable来组合Repository

Rx应用

Retrofit

Retrofit是一个基于RestFul Api的Java框架,我们可以直接使用Observable返回

1
2
3
4
public interface GitHubService {
@GET("users/{user}/repos")
Observable<List<Repo>> listRepos(@Path("user") String user);
}

定义接口请求后,我们就可以通过Retrofit获取service实例做请求

1
2
3
4
5
6
7
8
9
10
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("https://api.github.com/")
.build();
GitHubService service = retrofit.create(GitHubService.class);
service.listRepost("test")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeBy(onNext={})

Retrofit具体介绍可查看其项目https://github.com/square/retrofit 这里就不一一介绍了

RxBinding

RxBinding是一个将Android UI控件事件转换为Rx事件的Java库

1
2
3
4
5
6
7
8
Button button = ...;
RxView.clickEvents(button)
.subscribe(new Action1<ViewClickEvent>() {
@Override
public void call(ViewClickEvent event) {
// Click handling
}
});

上面的代码中就是通过RxBinding将按钮的点击事件转换为Observable发射,这么做就可以通过一些操作符来显示快速点击或屏蔽指定的点击动作等等

RxLifecycle

RxLifecycle是一个管理Android Activity或Fragment组件周期的应用库

1
2
3
myObservable
.compose(RxLifecycle.bind(lifecycle))
.subscribe();

通过RxLifecycle的绑定到指定生命周期来处理比方说在Activity
destroy的时候unsubscribe,在Activity onCreate的时候进行绑定

RxBus

RxBus是一个通过RxJava来实现时间总线的开源库,使用方式与大多是EventBus库类似,只是内部是通过RxJava来处理事件,这里就不做篇幅讲解了,有兴趣的同学可以参考https://github.com/AndroidKnife/RxBus

注意事项

  • 在使用RxJava的时候,有时候会遇到小伙伴如下代码
1
fun getUser(uId: String, subscriber: Subscriber<User>)

在调用是传入Subscriber,这种方式还是传统的callback形式,这种方式完全背弃了Rx的初衷,这样修正会更贴切

1
fun getUser(uId: String):Observable<User>
  • 在使用RxJava的时候忽视back pressure问题,可能会产生莫名的问题,需要注意实际事件发射场景是否会产生back pressure然后采用正确的操作符或者Flowable源来处理

总结

至此,整个RxJava的学习过程结束了,本篇文章站在RxJava应用的角度去分析,没有太过深入源码解析,这也不是本篇文章的目的,后续可能会再出一些源码解析类的文章,再就是随着Kotlin coroutine(协程)出现,Android有更多可以实现异步任务的方式。

关于Android Coroutine相关内容,可以参考Android Coroutine开发实践

更多Android相关技术文章,欢迎订阅微信公众号“TechLee”

写得好,就打赏一下吧!