Mutiny - 流控制和背压
响应式编程提供了一种优雅、灵活且强大的编写异步代码的方式。它支持故障处理、顺序和并行操作组合以及大量的操作符。响应式编程将数据流作为主要构造,您的代码会观察数据流并响应信号。
然而,使用数据流作为主要构造并非没有问题。主要问题之一是需要流控制。
生产者/消费者问题
让我们想象一个快速的生产者和一个慢的消费者。生产者发送事件的速度太快,以至于消费者跟不上。下面的图片生动地说明了这种现象。
让我们看一些代码。假设一个生产者每十毫秒发射一个事件,而消费者每秒只能消费一个。
Multi.createFrom().ticks().every(Duration.ofMillis(10))
.emitOn(Infrastructure.getDefaultExecutor())
.onItem().transform(BackPressureExample::canOnlyConsumeOneItemPerSecond)
.subscribe().with(
item -> System.out.println("Got item: " + item),
failure -> System.out.println("Got failure: " + failure)
);
如果运行该代码,您将看到订阅者收到一个 MissingBackPressureFailure
,表明下游无法跟上。
Got item: 0
Got failure: io.smallrye.mutiny.subscription.BackPressureFailure: Could not emit tick 16 due to lack of requests
emitOn
在之前的代码中,您可能想知道 |
那么,我们该如何处理这种“溢出”呢?
缓冲项目
第一个自然的解决方案是使用缓冲区。消费者可以缓冲事件,这样就不会失败

使用缓冲区可以处理小的突发流量,但这并不是一个长期的解决方案。如果我们更新代码使用大缓冲区,消费者可以处理更多事件,但最终还是会失败
Multi.createFrom().ticks().every(Duration.ofMillis(10))
.onOverflow().buffer(250)
.emitOn(Infrastructure.getDefaultExecutor())
.onItem().transform(BufferingExample::canOnlyConsumeOneItemPerSecond)
.subscribe().with(
item -> System.out.println("Got item: " + item),
failure -> System.out.println("Got failure: " + failure)
);
Got item: 0
Got item: 1
Got item: 2
Got failure: io.smallrye.mutiny.subscription.BackPressureFailure: Buffer is full due to lack of downstream consumption
您可以想象增加缓冲区的大小,但很难预估最佳值。无界缓冲区呢?通常来说,这是一个糟糕的主意,因为您可能会耗尽内存。
丢弃项目
另一种解决方案是丢弃项目

您可以丢弃接收到的最新项目或最旧的项目
Multi.createFrom().ticks().every(Duration.ofMillis(10))
.onOverflow().drop(x -> System.out.println("Dropping item " + x))
.emitOn(Infrastructure.getDefaultExecutor())
.onItem().transform(DropExample::canOnlyConsumeOneItemPerSecond)
.transform().byTakingFirstItems(10)
.subscribe().with(
item -> System.out.println("Got item: " + item),
failure -> System.out.println("Got failure: " + failure)
);
// ....
Dropping item 997
Dropping item 998
Dropping item 999
Dropping item 1000
Dropping item 1001
Dropping item 1002
Dropping item 1003
Got item: 9
丢弃项目为我们的问题提供了一个可持续的解决方案,但我们会丢失事件!正如我们在前面的输出中看到的,我们可能会丢弃大部分项目。在许多情况下,这是不可接受的。
因此,我们需要另一个解决方案,一个能够调整整体速率以满足管道中最慢元素的解决方案。我们需要流控制。
到底什么是反压?
您可能已经多次看到这个词,并且经常与 Reactive 相关联。在力学中,反压是通过管道控制流体流动的一种方式,会导致压降。这种控制可以使用减压阀或弯头。虽然非常有趣,但如果您是一名水管工,不清楚它在这里如何帮助我们?
我们可以将我们的数据流视为流体流动,并且一系列阶段(操作符或订阅者)形成管道。我们希望使流体尽可能无摩擦地流动,没有涡流和波纹。
流体机械的一个有趣特征是下游吞吐量的减少如何影响上游。本质上,这就是我们需要的:一种让下游操作符和订阅者减少吞吐量的方式,不仅是局部的,而且是上游的。
不要误会,反压在 IT 界并不是什么新鲜事物,也不仅限于 Reactive。反压最巧妙的用法之一是 TCP。接收数据的读取器,如果读不到发送的数据,可以阻止线路另一端的写入器。这样,读取器永远不会被压垮。但是,需要理解其后果:阻塞写入器可能会产生副作用。
还有其他反压实现。AMQP 使用基于信用的流控制,您只有在有足够信用时才能发送。Eclipse Vert.x 的反压基于暂停/恢复,其中消费者可以暂停上游直到它赶上,并在恢复正常后恢复它。
介绍 Reactive Streams
现在,让我们关注另一个反压协议:Reactive Streams。它定义了一个适合我们快速生产者/慢速消费者问题的异步和反压协议。使用 Reactive Streams,称为 Subscriber
的消费者从称为 Publisher
的生产者请求项目。Publisher
不能发送比请求数量更多的项目。

当项目被接收和处理后,消费者可以请求更多项目,依此类推。因此,消费者控制流。

Reactive Streams 实体
为了实现该协议,Reactive Streams 定义了一组实体。首先,Subscriber
是一个消费者。它订阅一个产生项目的 Publisher
。然后,Publisher
异步发送一个 Subscription
对象给 Subscriber
。这个 Subscription
是一个契约。通过这个 Subscription
,Subscriber
可以请求项目并在不再需要项目时取消订阅。

Subscriber
和 Publisher
之间交互的示例Publisher
不能向 Subscriber
发送比请求更多的项目,并且 Subscriber
可以随时请求更多项目。
理解请求和发射不一定同步发生很重要。Subscriber
可以请求三个项目,并且 Publisher
在项目可用时会一个一个地发送。
Reactive Streams 引入了另一个称为 Processor
的实体。Processor
同时是 Subscriber
和 Publisher
。换句话说,它是我们管道中的一个环节。

Subscriber
、Processor
和 Publisher
之间交互的示例Subscriber
调用 Processor
上的 subscribe
。在接收到 Subscription
之前,Processor
会订阅自己的上游(上图中的 Publisher
)。当上游向我们的 Processor 提供 Subscription
时,它可以向 Subscriber 提供 Subscription
。所有这些交互都是异步的。当这个握手完成时,Subscriber
就可以开始请求项目了。Processor 负责协调 Subscriber
的请求与上游。例如,如上图所示,如果 Subscriber
需要两个项目,Processor
也会向自己的上游请求两个项目。当然,根据 Processor
的代码,情况可能并不那么简单。重要的是,每个 Publisher
和 Processor
都会强制执行流请求,永远不会压垮下游的 Subscriber。
注意:这是一个陷阱!
如果您查看 Reactive Streams API,您会发现它很简单。请注意;这是一个陷阱!在这种明显的简单性背后,自己实现 Reactive Streams 实体是一场噩梦。Reactive Streams 附带了一系列规则和一个严格的 TCK。
但是,别担心,Mutiny 会为您实现 Reactive Streams 协议。换句话说,当您使用 Multi
时,您正在使用遵循 Reactive Streams 协议的 Publisher
。所有订阅握手和请求协商都已为您处理。
深入了解
我知道!您很好奇!您可以使用 onSubscribe().invoke()
和 onRequest().invoke()
来检查正在发生的事情。
Multi.createFrom().range(0, 10)
.onSubscribe().invoke(sub -> System.out.println("Received subscription: " + sub))
.onRequest().invoke(req -> System.out.println("Got a request: " + req))
.transform().byFilteringItemsWith(i -> i % 2 == 0)
.onItem().transform(i -> i * 100)
.subscribe().with(
i -> System.out.println("i: " + i)
);
您还可以更进一步,不仅进行观察,还可以自己控制流。
Multi.createFrom().range(0, 10)
.onSubscribe().invoke(sub -> System.out.println("Received subscription: " + sub))
.onRequest().invoke(req -> System.out.println("Got a request: " + req))
.onItem().transform(i -> i * 100)
.subscribe().with(
subscription -> {
// Got the subscription
upstream.set(subscription);
subscription.request(1);
},
i -> {
System.out.println("i: " + i);
upstream.get().request(1);
},
f -> System.out.println("Failed with " + f),
() -> System.out.println("Completed")
);
如果您有点疯狂,甚至可以直接实现一个 Subscriber
。
Multi.createFrom().range(0, 10)
.onSubscribe().invoke(sub -> System.out.println("Received subscription: " + sub))
.onRequest().invoke(req -> System.out.println("Got a request: " + req))
.onItem().transform(i -> i * 100)
.subscribe().withSubscriber(new Subscriber<Integer>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
s.request(1);
}
@Override
public void onNext(Integer item) {
System.out.println("Got item " + item);
subscription.request(1);
}
@Override
public void onError(Throwable t) {
// ...
}
@Override
public void onComplete() {
// ...
}
}
);