记一次RxJava里zip操作符引起的崩溃

先说一下是什么造成了崩溃

io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with

在已经取消或处置完成的流中发生了异常,异常无处可去,只能抛出来给系统。

案发现场

代码很简单,在一个zip操作符中接收两个数据源,然后转发出一个新数据。

// 创建数据源, 代码简化为崩溃复现的必要条件
fun getSource(id: Int): Observable<String> {
        return Observable.create<String> { emitter ->
                lifecycleScope.launch {
                    delay(100) // 模拟数据加载耗时
                    emitter.onError(IllegalStateException("发生异常 $id"))
                }
        }
}
// 合并两个数据加载,可以看到`subscribe`里有做接收异常的处理
Observable.zip(getSource(1), getSource(2), { s1, s2 -> ""})
        .subscribe({}, {})

初见时应该会有人疑惑,这应该没问题啊,怎么会崩溃。

先看一眼异常堆栈信息

java.lang.IllegalStateException: 发生异常 2
    at com.example.myapp.ui.BaseHomeActivity$onItemPressed$getSource$1$1.invokeSuspend(BaseHomeActivity.kt:868)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTaskKt.resume(DispatchedTask.kt:234)
    at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:166)
    at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:397)
    at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl(CancellableContinuationImpl.kt:431)
    at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl$default(CancellableContinuationImpl.kt:420)
    at kotlinx.coroutines.CancellableContinuationImpl.resumeUndispatched(CancellableContinuationImpl.kt:518)
    at kotlinx.coroutines.android.HandlerContext$scheduleResumeAfterDelay$$inlined$Runnable$1.run(Runnable.kt:19)
    at android.os.Handler.handleCallback(Handler.java:790)
    at android.os.Handler.dispatchMessage(Handler.java:99)
    at android.os.Looper.loop(Looper.java:164)
    at android.app.ActivityThread.main(ActivityThread.java:6518)
    at java.lang.reflect.Method.invoke(Native Method)
    at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:438)
    at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:807)

也是让人一头雾水,从协程一路调用过来就崩溃了?难道是协程问题?

先不急下定论,当你把抛Rx异常用的类从IllegalStateException换成Throwable后就能得到另一套堆栈

io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.Throwable: 发生异常 2
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
    at io.reactivex.internal.operators.single.SingleCreate$Emitter.onError(SingleCreate.java:81)
    at com.example.myapp.ui.BaseHomeActivity$onItemPressed$getSource$1$1.invokeSuspend(BaseHomeActivity.kt:868)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTaskKt.resume(DispatchedTask.kt:234)
    at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:166)
    at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:397)
    at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl(CancellableContinuationImpl.kt:431)
    at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl$default(CancellableContinuationImpl.kt:420)
    at kotlinx.coroutines.CancellableContinuationImpl.resumeUndispatched(CancellableContinuationImpl.kt:518)
    at kotlinx.coroutines.android.HandlerContext$scheduleResumeAfterDelay$$inlined$Runnable$1.run(Runnable.kt:19)
    at android.os.Handler.handleCallback(Handler.java:790)
    at android.os.Handler.dispatchMessage(Handler.java:99)
    at android.os.Looper.loop(Looper.java:164)
    at android.app.ActivityThread.main(ActivityThread.java:6518)
    at java.lang.reflect.Method.invoke(Native Method)
    at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:438)
    at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:807)
Caused by: java.lang.Throwable: 发生异常 2
    ... 16 more

分析原因

从第二个堆栈中终于暴露了崩溃的真凶,就是文章开头的内容。

两个数据源只是负责发射事件,出问题的应该是zip操作符。断点调试可以追踪到源码类ObservableZip

发现zip的实现原理是最后一个参数observer去订阅观察前面两个数据源,收到事件后用for循环判断所有数据源是否结束.

当zip收到一个onError事件取消结束当前流,向下发送error事件。

这在同一线程下,或运气好的异步情况下是没问题的,但zip没有线程安全,两个数据源异步发送事件时就容易发生以上崩溃。

总结

  1. 不要乱用异常类进行异常封装,容易丢失堆栈信息
  2. RxJava的各种操作多个数据源的操作符应该都是没有线程安全的,要小心使用。

本作品采用 知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议 进行许可。