编辑此页面

具有 Reactive Messaging 的 Quarkus 虚拟线程支持

本指南解释了在 Quarkus 中编写消息处理应用程序时如何利用 Java 虚拟线程。

本指南重点介绍将虚拟线程与响应式消息传递扩展结合使用。请参阅使用 Quarkus 虚拟线程支持编写更简单的响应式 REST 服务,以了解有关 Java 虚拟线程的更多信息以及 Quarkus 虚拟线程对 REST 服务的支持。

默认情况下,响应式消息传递会在事件循环线程上调用消息处理方法。有关此主题的更多详细信息,请参见Quarkus 响应式架构文档。但是,有时您需要将响应式消息传递与阻塞处理(例如调用外部服务或数据库操作)结合使用。为此,您可以使用@Blocking注解,表明该处理是*阻塞的*,应该在工作线程上运行。您可以在SmallRye 响应式消息传递文档中阅读更多关于阻塞处理的信息。

Quarkus 虚拟线程支持响应式消息传递背后的想法是将消息处理卸载到虚拟线程上,而不是在事件循环线程或工作线程上运行它。

要在消息消费者方法上启用虚拟线程支持,只需将@RunOnVirtualThread注解添加到该方法。如果 JDK 兼容(Java 19 或更高版本,我们推荐 21+),则每个传入消息将被卸载到新的虚拟线程。然后可以执行阻塞操作,而不会阻塞虚拟线程所挂载的平台线程。

使用响应式消息传递 Kafka 扩展的示例

让我们看一个如何在虚拟线程上处理 Kafka 记录的示例。首先,请确保在构建文件中包含响应式消息传递扩展依赖项

pom.xml
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-messaging-kafka")

您还需要确保您正在使用 Java 19 或更高版本(我们推荐 21+),这可以在您的pom.xml文件中强制执行,如下所示

pom.xml
<properties>
    <maven.compiler.source>21</maven.compiler.source>
    <maven.compiler.target>21</maven.compiler.target>
</properties>

使用以下命令运行应用程序

java -jar target/quarkus-app/quarkus-run.jar

或者要使用 Quarkus Dev 模式,请将以下内容插入quarkus-maven-plugin配置中

pom.xml
<maven.compiler.release>21</maven.compiler.release>

然后,您可以开始在也使用@Incoming注解的消费者方法上使用注解@RunOnVirtualThread。在以下示例中,我们将使用REST Client对 REST 端点进行阻塞调用

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.rest.client.inject.RestClient;

import io.smallrye.common.annotation.RunOnVirtualThread;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceConsumer {

    @RestClient (2)
    PriceAlertService alertService;


    @Incoming("prices")
    @RunOnVirtualThread (1)
    public void consume(double price) {
        if (price > 90.0) {
            alertService.alert(price); (3)
        }
    }

    @Outgoing("prices-out") (4)
    public Multi<Double> randomPriceGenerator() {
        return Multi.createFrom().<Random, Double>generator(Random::new, (r, e) -> {
            e.emit(r.nextDouble(100));
            return r;
        });
    }


}
1 @RunOnVirtualThread注解在@Incoming方法上确保该方法将在虚拟线程上调用。
2 REST 客户端存根通过@RestClient注解注入。
3 alert方法会阻塞虚拟线程,直到 REST 调用返回。
4 @Outgoing方法生成随机价格并将它们写入 Kafka 主题,以便被应用程序消耗。

请注意,默认情况下,响应式消息传递消息处理是按顺序发生的,从而保留消息的顺序。以同样的方式,@Blocking(ordered = false)注解会更改此行为,使用@RunOnVirtualThread会强制执行并发消息处理,而不保留顺序。

使用 @RunOnVirtualThread 注解

符合 @RunOnVirtualThread 条件的方法签名

只有使用@Blocking注解的方法才能使用@RunOnVirtualThreads。符合条件的方法签名包括:

  • @Outgoing("channel-out") O generator()

  • @Outgoing("channel-out") Message<O> generator()

  • @Incoming("channel-in") @Outgoing("channel-out") O process(I in)

  • @Incoming("channel-in") @Outgoing("channel-out") Message<O> process(I in)

  • @Incoming("channel-in") void consume(I in)

  • @Incoming("channel-in") Uni<Void> consume(I in)

  • @Incoming("channel-in") Uni<Void> consume(Message<I> msg)

  • @Incoming("channel-in") CompletionStage<Void> consume(I in)

  • @Incoming("channel-in") CompletionStage<Void> consume(Message<I> msg)

在方法和类上使用 @RunOnVirtualThread 注解

您可以使用@RunOnVirtualThread注解:

  1. 直接在响应式消息传递方法上 - 此方法将被视为*阻塞*并在虚拟线程上执行

  2. 在包含响应式消息传递方法的类上 - 此类中使用@Blocking注解的方法将在虚拟线程上执行,除非该注解定义了配置为使用常规工作线程的池名称

例如,您可以直接在方法上使用@RunOnVirtualThread

@ApplicationScoped
public class MyBean {

    @Incoming("in")
    @Outgoing("out")
    @RunOnVirtualThread
    public String process(String s) {
        // Called on a new virtual thread for every incoming message
    }
}

或者,您可以在类本身上使用@RunOnVirtualThread

@ApplicationScoped
@RunOnVirtualThread
public class MyBean {

    @Incoming("in1")
    @Outgoing("out1")
    public String process(String s) {
        // Called on the event loop - no @Blocking annotation
    }

    @Incoming("in2")
    @Outgoing("out2")
    @Blocking
    public String process(String s) {
        // Call on a new virtual thread for every incoming message
    }

    @Incoming("in3")
    @Outgoing("out3")
    @Blocking("my-worker-pool")
    public String process(String s) {
        // Called on a regular worker thread from the pool named "my-worker-pool"
    }
}

控制最大并发数

为了利用虚拟线程的轻量级特性,使用@RunOnVirtualThread注解的方法的默认最大并发数为 1024。与平台线程相反,虚拟线程不会被池化,而是每个消息创建一个。因此,最大并发数分别应用于所有@RunOnVirtualThread方法。

有两种方法可以自定义并发级别:

  1. @RunOnVirtualThread注解可以与@Blocking注解一起使用来指定工作线程名称。

    @Incoming("prices")
    @RunOnVirtualThread
    @Blocking("my-worker")
    public void consume(double price) {
        //...
    }

    然后,例如,要将此方法的最大并发数设置为 30,请使用配置属性smallrye.messaging.worker.my-worker.max-concurrency=30进行设置。

  2. 对于每个未配置工作线程名称的@RunOnVirtualThread方法,您可以使用配置属性smallrye.messaging.worker.<virtual-thread>.max-concurrency

相关内容