RxJava线程切换之subscribeOn和observeOn详解

移动端 0 1325
小小草
小小草 2022年2月26日 13:01 发表

在前面两篇文章中我们主要讲了RxJava在实战开发中的一些应用案例,这些案例都是公司项目中的实战应用,项目在五一的时候已经上线了,通过先入为主的方式直接上案例,然后在结合理论在进行学习,这样我们的理解就会更加的深刻,通过前面两篇文章的介绍,大家已经掌握了RxJava的一些基本使用,这篇文章我们主要介绍RxJava的高级应用: 线程调度。

我们知道,响应式编程是通过异步和数据流来构建事物关系的编程模型,后台处理数据,前台响应数据,而RxJava是响应式编程在Java语言中拓展库,它以观察者模式为核心,通过强大的操作符,对事件中的信息进行操作转换,并可以灵活的实现线程调度的一个框架,随着业务逻辑的越来越复杂,使我们的代码依然保持简洁,正是由于这些优势,深受广大开发者的追捧。

RxJava的最大优势之一是它能够轻松地在各种线程上安排工作和处理结果,我们在实际开发中,一个RxJava调用能不能多次操作observeOn和subscribeOn?observeOn和subscribeOn的先后调用顺序对线程切换有没有影响?Schedulers.io()和Schedulers.newThread()到底有何区别?通过本文的学习这些问题将逐一揭开他们的神秘面纱,避免线程调度中出现的一些常见问题,因为与线程相关的错误非常难以追踪。

RxJava线程调度

RxJava中的线程调度是在Scheduler的帮助下完成的。Scheduler可以被认为是管理1个或多个线程的线程池。每当Scheduler需要执行任务时,它将从其池中获取一个线程并在该线程中运行该任务。

让我们总结一下可用的Scheduler类型及其常见用途:

  • 1.Schedulers.io()由无限制的线程池支持。它用于非CPU密集型I / O类型工作,包括与文件系统的交互,执行网络调用,数据库交互等。此线程池旨在用于异步执行阻塞IO,该线程池中的线程可以重复使用的。

  • 2.Schedulers.computation()由有限线程池支持,其大小可达处理器的数量。它用于计算或CPU密集型工作,例如调整图像大小,处理大型数据集等。

  • 3.Schedulers.newThread()为每个安排的工作任务创建一个新线程,这个调度程序很昂贵,因为每次都会生成新线程,并且不会重复使用。

  • 4.Schedulers.from(Executor executor)创建并返回由指定执行程序支持的自定义调度程序。要限制线程池中同时线程的数量,请使用Scheduler.from(Executors.newFixedThreadPool(n))。这保证了如果在所有线程都被占用时调度任务,它将排队。池中的线程将一直存在,直到它被明确关闭。

  • 5.主线程或AndroidSchedulers.mainThread()由RxAndroid扩展库提供给RxJava。主线程(也称为UI线程)是用户交互发生的地方。应该注意不要重载此线程以防止应用程序无响应(ANR)对话框。

  • 6.Schedulers.single() 是RxJava 2中的新增功能。此调度程序由单个线程支持,该线程按请求的顺序依次执行任务。

  • 7.Schedulers.trampoline()由参与的工作线程之一以FIFO(先进先出)方式执行任务。它通常在实现递归时使用,以避免增加调用堆栈。

警告:小心使用无界线程调度:如Schedulers.io()和Schedulers.newThread()

简单的例子

让我们从一个基本的RxJava代码开始,其中Observable<String>发出一个字符串并计算每个字符串的长度,我们将在以下部分中以此示例为基础。

Observable.just("long", "longer", "longest")
    .map(String::length)
    .subscribe(length -> System.out.println("item length " + length));

执行时,将打印:

item length 4
item length 6
item length 7

现在,让我们通过在doOnNext()中打印出线程信息来看看这项工作正在进行的线程,这是一个为每个发出的的数据执行的操作。

Observable.just("long", "longer", "longest")
    .doOnNext(c -> System.out.println("processing item on thread " + Thread.currentThread().getName()))
    .map(String::length)
    .subscribe(length -> System.out.println("item length " + length));

执行时,将打印:

processing item on thread main
item length 4
processing item on thread main
item length 6
processing item on thread main
item length 7

所以这个流是在主线程上发出和处理的,因为上面的代码块位于我的类的main方法中

在后台线程上工作

通常将某些工作委托给后台线程是有意义的,
subscribeOn运算符告诉源Observable发出哪个线程并将项目一直推送到Observer,因此它会影响上游和下游的所有操作, 将subscribeOn放在Observable操作符链中的哪个位置其实并不重要。

关于我们的Observable要记住的事情是:

subscribeOn()这仅影响Observable订阅时使用的线程,并且它将保留在下游

  • 1.Observable它的工作是被动的,只有订阅subscribe发生后它才开始工作,并且每订阅一次,工作就再次进行一次。
  • 2.subscribeOn()指定工作执行所在的线程池,它的位置无关紧要,它可以在流的任何位置,如果流中有多个实例subscribeOn,则只有第一个具有实际效果。
  • 3.数据的处理工作和数据的接收工作是在同一线程中完成的,可以使用observeOn()进行更改线程。

使用subscribeOn()

Observable.just("long", "longer", "longest")
    .doOnNext(c -> System.out.println("processing item on thread " + Thread.currentThread().getName()))
    .subscribeOn(Schedulers.newThread())
    .map(String::length)
    .subscribe(length -> System.out.println("item length " + length));

让我们在main方法中运行更新的代码示例,执行时,这将不会打印任何内容
这是因为main方法在后台线程返回结果之前完成执行。为了解决这个问题,我们将主方法保持活动3秒钟 以便有足够长的时间让我们有机会在后台线程上处理数据。

public static void main(String[] args) throws InterruptedException {
Observable.just("long", "longer", "longest")
        .doOnNext(c -> System.out.println("processing item on thread " + Thread.currentThread().getName()))
        .subscribeOn(Schedulers.newThread())
        .map(String::length)
        .subscribe(length -> System.out.println("item length " + length + "received on " + Thread.currentThread().getName()));
    Thread.sleep(3000);
}

执行时,将打印:

processing item on thread RxNewThreadScheduler-1
item length 4 received on RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-1
item length 6 received on RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-1
item length 7 received on RxNewThreadScheduler-1

后台线程工作的结果在同一个线程RxNewThreadScheduler-1上返回。

执行网络/ IO /计算任务时,使用后台调度程序至关重要。如果没有subscribeOn(),您的代码将使用调用程序线程来执行操作,从而导致Observable阻塞

理解observeOn()

observeOn()指定下游运算所在的线程

正如我们在上面看到的那样,subscribeOn()指示源Observable从哪个线程发出数据,这个线程将把数据流一直推到我们的Observer。但是如果遇到observeOn()链中的任何位置,它将使用observeOn所指定的线程来操作的后续切换和数据流推送。
通常Android中的观察线程是主UI线程AndroidSchedulers.mainThread()。这需要RxAndroid扩展库到RxJava。
让我们修改我们的示例代码以执行后台工作,Schedulers.newThread()然后切换到AndroidSchedulers.mainThread()。

使用observeOn()

Observable.just("long", "longer", "longest")
        .doOnNext(c -> System.out.println("processing item on thread " + Thread.currentThread().getName()))
        .subscribeOn(Schedulers.newThread())
        .map(String::length)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(length -> System.out.println("item length " + length + " received on thread " + Thread.currentThread().getName()));

执行时,我们将看到主线程现在收到结果

processing item on thread RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-1
item length 4 received on thread main
item length 6 received on thread main
item length 7 received on thread main

异步工作

虽然RxJava被称为使用可观察序列组成异步和基于事件的程序的库,但它可以同步执行许多有用的任务。例如,map(String::length)上面使用相同的线程RxNewThreadScheduler-1处理每个项目,顺序保留相同的顺序。

简单地subscribeOn()在Observable链的开头使用意味着该过程仍然在单个线程上运行并且在下游同步发送事件。然而,当你开始在不同的线程或使用操作符,如不同的流相结合observeOn(),interval(),delay(),你可观察链不再是同步的。

现在,让我们看看如何修改上面的示例,以便每个发出的事件同时由一个单独的线程处理。

介绍flatMap()

flatMap()包装了被发射的每一个项目Observable,让你自己RxJava操作符,包括使用分配一个新的计划subscribeOn()来处理这些操作。一旦所有的项目中flatMap()都已经被处理,个别Observables的然后合并到一个单一的Observable没有特定的顺序。

为了使事情变得更加真实,让我们假装每个项目的转换最多需要3秒钟才能完成。以下两件事应该成立:

  • 1.每个项目都由自己的线程处理
  • 2.由于处理每个项目所需的随机时间,因此无法保证完成的项目的顺序

使用 flatMap()

public static void main(String[] args) throws InterruptedException {
    Observable.just("long", "longer", "longest")
        .flatMap(v ->
    performLongOperation(v)
            .doOnNext(s -> System.out.println("processing item on thread " + Thread.currentThread().getName()))
            .subscribeOn(Schedulers.newThread()))
        .subscribe(length -> System.out.println("received item length " + length + " on thread " + Thread.currentThread().getName()));

    Thread.sleep(10000);
}
/**
 * Returns length of each param wrapped into an Observable.
 */
protected static Observable<Integer> performLongOperation(String v) {
    Random random = new Random();
    try {
        Thread.sleep(random.nextInt(3) * 1000);
        return Observable.just(v.length());
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return null;
}

这将导致以下输出:

processing item on thread RxNewThreadScheduler-3
processing item on thread RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-2
received item length 7 on thread RxNewThreadScheduler-3
received item length 4 on thread RxNewThreadScheduler-1
received item length 6 on thread RxNewThreadScheduler-2

请注意:

  • 1.每个项目由一个单独的线程处理。
  • 2.转换后元素的顺序是随机的。

如果您需要保留结果项的顺序怎么办?

介绍concatMap()

concatMap()类似flatMap()但保证处理的项目的顺序与原始排放的顺序相同。

使用concatMap()

public static void main(String[] args) throws InterruptedException {
    Observable.just("long", "longer", "longest")
        .concatMap(v ->
    performLongOperation(v)
            .doOnNext(s -> System.out.println("processing item on thread " + Thread.currentThread().getName()))
            .subscribeOn(Schedulers.newThread()))
        .subscribe(length -> System.out.println("received item length " + length + " on thread " + Thread.currentThread().getName()));

    Thread.sleep(10000);
}

这导致以下输出:

processing item on thread RxNewThreadScheduler-1
received item length 4 on thread RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-2
received item length 6 on thread RxNewThreadScheduler-2
processing item on thread RxNewThreadScheduler-3
received item length 7 on thread RxNewThreadScheduler-3

请注意,项目的返回顺序与原始流中的顺序相同

subscribeOn()的陷阱

如上所示,subscribeOn()更改我们Observable发出和转换的线程。在没有observeOn()的情况下,流处理的结果被发送到完成工作的线程。例如,如果我们有subscribeOn(Schedulers.computation())和observeOn()没有指定,结果也会被分派到Computation线程。

将subscribeOn()操作符放在链中的位置并不重要,它仍然表示将在其上发出Observable的线程。
如果subscribeOn()在链中指定了多个RxJava操作符,则只使用第一个运算符,除非在flatMap()中使用subscribeOn(),否则将忽略后面的运算符,如上所示。

这是一个例子:
使用多个subscribeOn()

Observable.just("long", "longer", "longest")
        .doOnNext(c -> System.out.println("processing item on thread " + Thread.currentThread().getName()))
        .subscribeOn(Schedulers.computation())
        .subscribeOn(Schedulers.newThread())
        .map(String::length)
        .subscribe(length -> System.out.println("item length " + length + " received on " + Thread.currentThread().getName()));

这将导致以下输出:

processing item on thread RxComputationThreadPool-1
item length 4 received on RxComputationThreadPool-1
processing item on thread RxComputationThreadPool-1
item length 6 received on RxComputationThreadPool-1
processing item on thread RxComputationThreadPool-1
item length 7 received on RxComputationThreadPool-1

请注意,Schedulers.computation()上面的线程池Schedulers.newThread()在从未使用过的情况下完成了工作。这是因为计算调度程序首先列出,所有后续subscribeOn()运算符都被忽略。

默认调度程序

一些库在subscribeOn()内部指定以强制执行后台工作的线程。例如,默认情况下,Observable.delay()RxJava库将在Computation上发出Scheduler。subscribeOn()您在其上指定的任何内容都将无效。但是您可以为该运算符使用重载版本的工厂方法,而不是传递您选择的自定义Scheduler。

@SchedulerSupport("io.reactivex:computation")
public final Observable<T> delay(long delay, TimeUnit unit) {
    return this.delay(delay, unit, Schedulers.computation(), false);
}
@SchedulerSupport("custom")
public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler) {
    return this.delay(delay, unit, scheduler, false);
}

提示: 当您使用运算符时,编辑器可以向您发出警告,例如delay()不覆盖其默认调度程序
这也意味着当您在使用时使用依赖于调度程序的运算符(如delay(),interval()等)时subscribeOn(),您可能会在没有意识到的情况下生成(但不使用)线程。始终查看这些运算符的Javadoc以确保最佳使用。特别要注意@SchedulerSupport注释

onError()

最后,当subscribeOn()使用但没有使用onError(),如果发生错误,它将被抛出订阅的调度程序线程,但错误堆栈跟踪将没有引用您订阅的位置。这将使调试非常困难。要避免此问题,请使用onError()。

observeOn()的陷阱

重要的是要记住,与事物的subscribeOn()放置不同observeOn()。切换调度程序observeOn()适用于所有下游操作符。

例如,在以下示例中由于observeOn()放置map(String::length)而filter(length -> length == 6)将在主线程上执行。这真的是有意的吗?

Observable.just("long", "longer", "longest")
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread())
        .map(String::length)
        .filter(length -> length == 6)
        .subscribe(length -> System.out.println("item length " + length));

放置observeOn()操作符的位置要小心,因为它会更改执行工作的调度程序,在大多数情况下,您可能希望延迟切换到观察线程,直到Rx链的最后。

例如,让我们看看下面的RxJava链,该链进行HTTP网络调用:

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(response -> response.body().string())

没有理由在observeOn()操作符上方应用map()操作员。实际上,这段代码会导致NetworkOnMainThreadException!我们不想在主线程上读取HTTP响应 - 它应该在我们切换回主线程之前完成:

.subscribeOn(Schedulers.io())
.map(response -> response.body().string())
.observeOn(AndroidSchedulers.mainThread())

警惕多重 observeOn()

您可以拥有多个observeOn()操作符。当在下游执行时,observeOn()下面的每个都将覆盖上面的操作符。
这是一个例子:
使用多个 observeOn()

Observable.just("long", "longer", "longest")
        .doOnNext(s -> System.out.println("first doOnNext: processing item on thread " + Thread.currentThread().getName()))
        .observeOn(Schedulers.computation())
        .map(String::toString)
        .doOnNext(s -> System.out.println("second doOnNext: processing item on thread " + Thread.currentThread().getName()))
        .observeOn(Schedulers.io())
        .map(String::toString)
        .subscribeOn(Schedulers.newThread())
        .map(String::length)
        .subscribe(length -> System.out.println("received item length " + length + " on thread " + Thread.currentThread().getName()));

输出如下:

first doOnNext: processing item on thread RxNewThreadScheduler-1
first doOnNext: processing item on thread RxNewThreadScheduler-1
first doOnNext: processing item on thread RxNewThreadScheduler-1
second doOnNext: processing item on thread RxComputationThreadPool-1
second doOnNext: processing item on thread RxComputationThreadPool-1
second doOnNext: processing item on thread RxComputationThreadPool-1
received item length 4 on thread RxCachedThreadScheduler-1
received item length 6 on thread RxCachedThreadScheduler-1
received item length 7 on thread RxCachedThreadScheduler-1

最后,我建议你尽可能避免这种复杂性。这样做将使将来调试和维护此代码变得更加容易。


点赞 0 收藏(0)    分享
相关标签: RxJava
问题没解决?让AI助手帮你作答 AI助手
0 个评论
  • 消灭零评论