Mutiny - 流控制和背压

响应式编程提供了一种优雅、灵活且强大的编写异步代码的方式。它支持故障处理、顺序和并行操作组合以及大量的操作符。响应式编程将数据流作为主要构造,您的代码会观察数据流并响应信号。

然而,使用数据流作为主要构造并非没有问题。主要问题之一是需要流控制。

生产者/消费者问题

让我们想象一个快速的生产者和一个慢的消费者。生产者发送事件的速度太快,以至于消费者跟不上。下面的图片生动地说明了这种现象。

让我们看一些代码。假设一个生产者每十毫秒发射一个事件,而消费者每秒只能消费一个。

Multi.createFrom().ticks().every(Duration.ofMillis(10))
    .emitOn(Infrastructure.getDefaultExecutor())
    .onItem().transform(BackPressureExample::canOnlyConsumeOneItemPerSecond)
    .subscribe().with(
        item -> System.out.println("Got item: " + item),
        failure -> System.out.println("Got failure: " + failure)
);

如果运行该代码,您将看到订阅者收到一个 MissingBackPressureFailure,表明下游无法跟上。

Got item: 0
Got failure: io.smallrye.mutiny.subscription.BackPressureFailure: Could not emit tick 16 due to lack of requests
emitOn

在之前的代码中,您可能想知道 emitOn。这是一个 Mutiny 操作符,允许控制订阅者在哪个线程上接收事件。反压通常在涉及多个线程时需要,因为在单线程方法中,阻塞该线程会阻塞源,这可能会产生灾难性的后果。

那么,我们该如何处理这种“溢出”呢?

缓冲项目

第一个自然的解决方案是使用缓冲区。消费者可以缓冲事件,这样就不会失败

back pressure buffer
图 3. 使用缓冲避免压垮下游消费者

使用缓冲区可以处理小的突发流量,但这并不是一个长期的解决方案。如果我们更新代码使用大缓冲区,消费者可以处理更多事件,但最终还是会失败

Multi.createFrom().ticks().every(Duration.ofMillis(10))
    .onOverflow().buffer(250)
    .emitOn(Infrastructure.getDefaultExecutor())
    .onItem().transform(BufferingExample::canOnlyConsumeOneItemPerSecond)
    .subscribe().with(
        item -> System.out.println("Got item: " + item),
        failure -> System.out.println("Got failure: " + failure)
);
Got item: 0
Got item: 1
Got item: 2
Got failure: io.smallrye.mutiny.subscription.BackPressureFailure: Buffer is full due to lack of downstream consumption

您可以想象增加缓冲区的大小,但很难预估最佳值。无界缓冲区呢?通常来说,这是一个糟糕的主意,因为您可能会耗尽内存。

丢弃项目

另一种解决方案是丢弃项目

back pressure drop
图 4. 丢弃项目以避免压垮下游消费者

您可以丢弃接收到的最新项目或最旧的项目

Multi.createFrom().ticks().every(Duration.ofMillis(10))
    .onOverflow().drop(x -> System.out.println("Dropping item " + x))
    .emitOn(Infrastructure.getDefaultExecutor())
    .onItem().transform(DropExample::canOnlyConsumeOneItemPerSecond)
    .transform().byTakingFirstItems(10)
    .subscribe().with(
        item -> System.out.println("Got item: " + item),
        failure -> System.out.println("Got failure: " + failure)
);
// ....
Dropping item 997
Dropping item 998
Dropping item 999
Dropping item 1000
Dropping item 1001
Dropping item 1002
Dropping item 1003
Got item: 9

丢弃项目为我们的问题提供了一个可持续的解决方案,但我们会丢失事件!正如我们在前面的输出中看到的,我们可能会丢弃大部分项目。在许多情况下,这是不可接受的。

因此,我们需要另一个解决方案,一个能够调整整体速率以满足管道中最慢元素的解决方案。我们需要流控制。

到底什么是反压?

您可能已经多次看到这个词,并且经常与 Reactive 相关联。在力学中,反压是通过管道控制流体流动的一种方式,会导致压降。这种控制可以使用减压阀或弯头。虽然非常有趣,但如果您是一名水管工,不清楚它在这里如何帮助我们?

我们可以将我们的数据流视为流体流动,并且一系列阶段(操作符或订阅者)形成管道。我们希望使流体尽可能无摩擦地流动,没有涡流和波纹。

流体机械的一个有趣特征是下游吞吐量的减少如何影响上游。本质上,这就是我们需要的:一种让下游操作符和订阅者减少吞吐量的方式,不仅是局部的,而且是上游的。

不要误会,反压在 IT 界并不是什么新鲜事物,也不仅限于 Reactive。反压最巧妙的用法之一是 TCP。接收数据的读取器,如果读不到发送的数据,可以阻止线路另一端的写入器。这样,读取器永远不会被压垮。但是,需要理解其后果:阻塞写入器可能会产生副作用。

还有其他反压实现。AMQP 使用基于信用的流控制,您只有在有足够信用时才能发送。Eclipse Vert.x 的反压基于暂停/恢复,其中消费者可以暂停上游直到它赶上,并在恢复正常后恢复它。

介绍 Reactive Streams

现在,让我们关注另一个反压协议:Reactive Streams。它定义了一个适合我们快速生产者/慢速消费者问题的异步和反压协议。使用 Reactive Streams,称为 Subscriber 的消费者从称为 Publisher 的生产者请求项目。Publisher 不能发送比请求数量更多的项目。

当项目被接收和处理后,消费者可以请求更多项目,依此类推。因此,消费者控制流。

back pressure flow control
图 6. 使用流控制避免压垮消费者

Reactive Streams 实体

为了实现该协议,Reactive Streams 定义了一组实体。首先,Subscriber 是一个消费者。它订阅一个产生项目的 Publisher。然后,Publisher 异步发送一个 Subscription 对象给 Subscriber。这个 Subscription 是一个契约。通过这个 SubscriptionSubscriber 可以请求项目并在不再需要项目时取消订阅。

back pressure sequence
图 7. SubscriberPublisher 之间交互的示例

Publisher 不能向 Subscriber 发送比请求更多的项目,并且 Subscriber 可以随时请求更多项目。

理解请求和发射不一定同步发生很重要。Subscriber 可以请求三个项目,并且 Publisher 在项目可用时会一个一个地发送。

Reactive Streams 引入了另一个称为 Processor 的实体。Processor 同时是 SubscriberPublisher。换句话说,它是我们管道中的一个环节。

pipeline with processor
图 8. SubscriberProcessorPublisher 之间交互的示例

Subscriber 调用 Processor 上的 subscribe。在接收到 Subscription 之前,Processor 会订阅自己的上游(上图中的 Publisher)。当上游向我们的 Processor 提供 Subscription 时,它可以向 Subscriber 提供 Subscription。所有这些交互都是异步的。当这个握手完成时,Subscriber 就可以开始请求项目了。Processor 负责协调 Subscriber 的请求与上游。例如,如上图所示,如果 Subscriber 需要两个项目,Processor 也会向自己的上游请求两个项目。当然,根据 Processor 的代码,情况可能并不那么简单。重要的是,每个 PublisherProcessor 都会强制执行流请求,永远不会压垮下游的 Subscriber。

注意:这是一个陷阱!

如果您查看 Reactive Streams API,您会发现它很简单。请注意;这是一个陷阱!在这种明显的简单性背后,自己实现 Reactive Streams 实体是一场噩梦。Reactive Streams 附带了一系列规则和一个严格的 TCK。

但是,别担心,Mutiny 会为您实现 Reactive Streams 协议。换句话说,当您使用 Multi 时,您正在使用遵循 Reactive Streams 协议的 Publisher。所有订阅握手和请求协商都已为您处理。

深入了解

我知道!您很好奇!您可以使用 onSubscribe().invoke()onRequest().invoke() 来检查正在发生的事情。

Multi.createFrom().range(0, 10)
    .onSubscribe().invoke(sub -> System.out.println("Received subscription: " + sub))
    .onRequest().invoke(req -> System.out.println("Got a request: " + req))
    .transform().byFilteringItemsWith(i -> i % 2 == 0)
    .onItem().transform(i -> i * 100)
    .subscribe().with(
            i -> System.out.println("i: " + i)
);

您还可以更进一步,不仅进行观察,还可以自己控制流。

Multi.createFrom().range(0, 10)
    .onSubscribe().invoke(sub -> System.out.println("Received subscription: " + sub))
    .onRequest().invoke(req -> System.out.println("Got a request: " + req))
    .onItem().transform(i -> i * 100)
    .subscribe().with(
            subscription -> {
                // Got the subscription
                upstream.set(subscription);
                subscription.request(1);
            },
            i -> {
                System.out.println("i: " + i);
                upstream.get().request(1);
            },
            f -> System.out.println("Failed with " + f),
            () -> System.out.println("Completed")
);

如果您有点疯狂,甚至可以直接实现一个 Subscriber

Multi.createFrom().range(0, 10)
    .onSubscribe().invoke(sub -> System.out.println("Received subscription: " + sub))
    .onRequest().invoke(req -> System.out.println("Got a request: " + req))
    .onItem().transform(i -> i * 100)
    .subscribe().withSubscriber(new Subscriber<Integer>() {

        private Subscription subscription;

        @Override
        public void onSubscribe(Subscription s) {
            this.subscription = s;
            s.request(1);
        }

        @Override
        public void onNext(Integer item) {
            System.out.println("Got item " + item);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable t) {
            // ...
        }

        @Override
        public void onComplete() {
            // ...
        }
    }
);

总结

这篇博文描述了 Mutiny 提供的处理反压的不同方法。当您可以控制上游的速率时,Reactive Streams 协议效果很好。但情况并非总是如此。代表物理实体的流很少是可控的。想象一个发出用户击键的流。您不能要求用户放慢速度。那将是一种糟糕的用户体验。正如我们在上面看到的,不幸的是,时间也不是我们可以减慢的……在这种情况下,onOverflow() 组让您决定缓解措施,例如使用缓冲区或丢弃项目。

避免压垮下游订阅者至关重要。这是您系统中蔓延开来的小裂痕,会带来可怕的后果。