编辑此页面

WebSockets Next 参考指南

quarkus-websockets-next 扩展提供了一个现代化的声明式 API,用于定义 WebSocket 服务器和客户端端点。

1. WebSocket 协议

WebSocket 协议,记录在 RFC6455 中,建立了一种标准化的方法,用于通过单个 TCP 连接在客户端和服务器之间创建双向通信通道。与 HTTP 不同,WebSocket 作为一种独立的 TCP 协议运行,但旨在与 HTTP 无缝协作。例如,它重用相同的端口并与相同的安全机制兼容。

使用 WebSocket 的交互以 HTTP 请求开始,该请求使用“Upgrade”标头来转换到 WebSocket 协议。服务器不是回复 200 OK,而是回复 101 Switching Protocols 响应,以将 HTTP 连接升级为 WebSocket 连接。在成功握手之后,初始 HTTP 升级请求中使用的 TCP 套接字保持打开状态,允许客户端和服务器持续地在两个方向上交换消息。

2. HTTP 和 WebSocket 架构风格

尽管 WebSocket 与 HTTP 兼容并且通过 HTTP 请求启动,但重要的是要认识到这两种协议会导致截然不同的架构和编程模型。

对于 HTTP/REST,应用程序围绕资源/端点构建,这些资源/端点处理各种 HTTP 方法和路径。客户端交互通过发出带有适当方法和路径的 HTTP 请求来进行,遵循请求-响应模式。服务器根据路径、方法和标头将传入的请求路由到相应的处理程序,然后回复一个定义良好的响应。

相反,WebSocket 通常涉及用于初始 HTTP 连接的单个端点,之后所有消息都使用相同的 TCP 连接。它引入了一种完全不同的交互模型:异步和消息驱动。

与 HTTP 相比,WebSocket 是一种低级传输协议。消息格式、路由或处理需要客户端和服务器之间事先就消息语义达成一致。

对于 WebSocket 客户端和服务器,HTTP 握手请求中的 Sec-WebSocket-Protocol 标头允许协商更高级别的消息传递协议。如果没有它,服务器和客户端必须建立自己的约定。

3. Quarkus WebSockets 与 Quarkus WebSockets Next

本指南使用 quarkus-websockets-next 扩展,这是 WebSocket API 的一个实现,与传统的 quarkus-websockets 扩展相比,它具有更高的效率和可用性。原始的 quarkus-websockets 扩展仍然可以访问,并将获得持续的支持,但不太可能进行功能开发。

quarkus-websockets 不同,quarkus-websockets-next 扩展实现 Jakarta WebSocket 规范。相反,它引入了一个现代 API,优先考虑易用性。此外,它还专门设计用于与 Quarkus 的反应式架构和网络层无缝集成。

Quarkus WebSockets next 扩展使用的注解与 JSR 356 中的注解不同,尽管有时共享相同的名称。JSR 注解带有 Quarkus WebSockets Next 扩展不遵循的语义。

4. 项目设置

要使用 websockets-next 扩展,您需要将 io.quarkus:quarkus-websockets-next 依赖项添加到您的项目中。

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-websockets-next</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-websockets-next")

5. 端点

服务器 API客户端 API 都定义了用于消费和发送消息的端点。端点实现为 CDI bean 并支持注入。端点声明了使用 @OnTextMessage@OnBinaryMessage@OnPingMessage@OnPongMessage@OnOpen@OnClose@OnError 注解的 回调方法。这些方法用于处理各种 WebSocket 事件。通常,当连接的客户端向服务器发送消息时,会调用使用 @OnTextMessage 注解的方法,反之亦然。

客户端 API 还包括用于配置和创建新 WebSocket 连接的 连接器

5.1. 服务器端点

服务器端点是使用 @io.quarkus.websockets.next.WebSocket 注解的类。WebSocket#path() 的值用于定义端点的路径。

package org.acme.websockets;

import io.quarkus.websockets.next.WebSocket;
import jakarta.inject.Inject;

@WebSocket(path = "/chat/{username}") (1)
public class ChatWebSocket {

}

因此,客户端可以使用 ws://:8080/chat/your-name 连接到此 web socket 端点。如果使用 TLS,则 URL 为 wss://:8443/chat/your-name

端点路径相对于由 quarkus.http.root-path 配置的 根上下文(默认为 /)。例如,如果您将 quarkus.http.root-path=/api 添加到您的 application.properties 中,则客户端可以使用 https://:8080/api/chat/the-name 连接到此端点。

5.2. 客户端端点

客户端端点是使用 @io.quarkus.websockets.next.WebSocketClient 注解的类。WebSocketClient#path() 的值用于定义此客户端将连接到的端点的路径。

package org.acme.websockets;

import io.quarkus.websockets.next.WebSocketClient;
import jakarta.inject.Inject;

@WebSocketClient(path = "/chat/{username}") (1)
public class ChatWebSocket {

}
客户端端点用于消费和发送消息。您需要 连接器 API 来配置和打开新的 WebSocket 连接。

5.3. 路径参数

WebSocket 端点的路径可以包含路径参数。语法与 JAX-RS 资源相同:{parameterName}

您可以使用 io.quarkus.websockets.next.WebSocketConnection#pathParam(String) 方法或 io.quarkus.websockets.next.WebSocketClientConnection#pathParam(String) 方法分别访问路径参数值。或者,会自动注入使用 @io.quarkus.websockets.next.PathParam 注解的端点回调方法参数。

WebSocketConnection#pathParam(String) 示例
@Inject io.quarkus.websockets.next.WebSocketConnection connection;
// ...
String value = connection.pathParam("parameterName");

路径参数值始终是字符串。如果路径中不存在路径参数,则 WebSocketConnection#pathParam(String)/WebSocketClientConnection#pathParam(String) 方法返回 null。如果存在使用 @PathParam 注解的端点回调方法参数,并且端点路径中未定义参数名称,则构建失败。

不支持查询参数。但是,您可以使用 WebSocketConnection#handshakeRequest().query() 访问查询。

5.4. CDI 范围

端点作为 CDI bean 进行管理。默认情况下,使用 @Singleton 范围。但是,开发人员可以指定替代范围以满足他们的特定要求。

@Singleton@ApplicationScoped 端点在所有 WebSocket 连接之间共享。因此,实现应该是无状态的或线程安全的。

5.4.1. 会话上下文

如果端点使用 @SessionScoped 注解,或者直接或间接依赖于 @SessionScoped bean,则每个 WebSocket 连接都与其自己的会话上下文相关联。会话上下文在端点回调调用期间处于活动状态。同一连接中后续的 回调方法 调用使用相同的会话上下文。会话上下文保持活动状态,直到连接关闭(通常在 @OnClose 方法完成执行时),此时它将终止。

也可以将 quarkus.websockets-next.server.activate-session-context 配置属性设置为 always。在这种情况下,会话上下文始终处于活动状态,无论 @SessionScoped bean 是否参与依赖关系树。
@SessionScoped 端点
import jakarta.enterprise.context.SessionScoped;

@WebSocket(path = "/ws")
@SessionScoped (1)
public class MyWebSocket {

}
1 此服务器端点未共享,并且作用域限定为会话/连接。

5.4.2. 请求上下文

如果端点是

  • 使用 @RequestScoped 注解进行注解,则每个 WebSocket 端点回调方法执行都与一个新的 CDI 请求上下文相关联。

  • 具有使用安全注解(例如 @RolesAllowed)注解的方法。

  • 直接或间接依赖于 @RequestScoped bean。

  • 直接或间接依赖于使用标准安全注解保护的 CDI bean。

也可以将 quarkus.websockets-next.server.activate-request-context 配置属性设置为 always。在这种情况下,调用端点回调时始终激活请求上下文。
@RequestScoped 端点
import jakarta.enterprise.context.RequestScoped;

@WebSocket(path = "/ws")
@RequestScoped (1)
public class MyWebSocket {

}
1 此服务器端点是为每次回调方法执行实例化的。

5.5. 回调方法

WebSocket 端点可以声明

  • 最多一个 @OnTextMessage 方法:处理来自连接的客户端/服务器的文本消息。

  • 最多一个 @OnBinaryMessage 方法:处理来自连接的客户端/服务器的二进制消息。

  • 最多一个 @OnPingMessage 方法:处理来自连接的客户端/服务器的 ping 消息。

  • 最多一个 @OnPongMessage 方法:处理来自连接的客户端/服务器的 pong 消息。

  • 最多一个 @OnOpen 方法:在打开连接时调用。

  • 最多一个 @OnClose 方法:在关闭连接时执行。

  • 任意数量的 @OnError 方法:当发生错误时调用;即当端点回调引发运行时错误,或者发生转换错误,或者返回的 io.smallrye.mutiny.Uni/io.smallrye.mutiny.Multi 收到故障时。

只有某些端点需要包括所有方法。但是,它必须至少包含 @On[Text|Binary]Message@OnOpen

如果任何端点违反这些规则,则会在构建时抛出错误。表示子 web socket 的静态嵌套类也遵循相同的准则。

在 WebSocket 端点之外使用 @OnTextMessage@OnBinaryMessage@OnOpen@OnClose 注解的任何方法都被认为是错误的,并且会导致构建失败并显示相应的错误消息。

5.6. 处理消息

接收来自客户端的消息的方法使用 @OnTextMessage@OnBinaryMessage 注解。

对于从客户端收到的每个文本消息,都会调用 OnTextMessage。对于客户端收到的每个二进制消息,都会调用 OnBinaryMessage

5.6.1. 调用规则

在调用回调方法时,链接到 WebSocket 连接的会话范围保持活动状态。此外,请求范围在方法完成之前(或直到它为异步和反应式方法生成结果)一直处于活动状态。

WebSocket Next 支持阻塞非阻塞逻辑,类似于 Quarkus REST,由方法的返回类型和额外的注解(如 @Blocking@NonBlocking)确定。

以下是控制执行的规则

  • 使用 @RunOnVirtualThread@Blocking@Transactional 注解的方法被认为是阻塞的。

  • 在使用 @RunOnVirtualThread 注解的类中声明的方法被认为是阻塞的。

  • 使用 @NonBlocking 注解的方法被认为是非阻塞的。

  • 在使用 @Transactional 注解的类中声明的方法被认为是阻塞的,除非使用 @NonBlocking 注解。

  • 如果方法未声明上述任何注解,则执行模型从返回类型派生

    • 返回 UniMulti 的方法被认为是非阻塞的。

    • 返回 void 或任何其他类型的方法被认为是阻塞的。

  • Kotlin suspend 函数始终被认为是非阻塞的,不能使用 @Blocking@NonBlocking@RunOnVirtualThread 注解,也不能在使用 @RunOnVirtualThread 注解的类中。

  • 非阻塞方法必须在连接的事件循环线程上执行。

  • 阻塞方法必须在工作线程上执行,除非使用 @RunOnVirtualThread 注解或在使用 @RunOnVirtualThread 注解的类中。

  • 使用 @RunOnVirtualThread 注解的方法或在使用 @RunOnVirtualThread 注解的类中声明的方法必须在虚拟线程上执行,每次调用都会生成一个新的虚拟线程。

5.6.2. 方法参数

该方法必须只接受一个消息参数

  • 消息对象(任何类型)。

  • 一个 Multi<X>,其中 X 是消息类型。

但是,它也可以接受以下参数

  • WebSocketConnection/WebSocketClientConnection

  • HandshakeRequest

  • 使用 @PathParam 注解的 String 参数

消息对象表示发送的数据,可以作为原始内容(StringJsonObjectJsonArrayBufferbyte[])访问,也可以作为反序列化的更高级别的对象访问,这是推荐的方法。

当接收到 Multi 时,该方法每个连接调用一次,并且提供的 Multi 接收由此连接传输的项目。如果该方法返回一个 Multi(从接收到的 Multi 构建),Quarkus 将自动订阅它并写入发出的项目,直到完成、失败或取消。但是,如果你的方法没有返回 Multi,你必须订阅传入的 Multi 才能消费数据。

以下是两个示例

// No need to subscribe to the incoming Multi as the method returns a Multi derived from the incoming one
@OnTextMessage
public Multi<ChatMessage> stream(Multi<ChatMessage> incoming) {
    return incoming.log();
}

// ...

// Must subscribe to the incoming Multi as the method does not return a Multi, otherwise no data will be consumed
@OnTextMessage
public void stream(Multi<ChatMessage> incoming) {
    incoming.subscribe().with(item -> log(item));
}

请参阅 何时订阅 UniMulti 以了解有关订阅传入的 Multi 的更多信息。

5.6.3. 支持的返回类型

使用 @OnTextMessage@OnBinaryMessage 注解的方法可以返回各种类型,以有效地处理 WebSocket 通信

  • void:表示一个阻塞方法,其中没有将显式响应发送回客户端。

  • Uni<Void>:表示一个非阻塞方法,其中返回的 Uni 的完成表示处理的结束。没有将显式响应发送回客户端。

  • 类型为 X 的对象表示一个阻塞方法,其中返回的对象被序列化并作为响应发送回客户端。

  • Uni<X>:指定一个非阻塞方法,其中非空 Uni 发出的项目作为响应发送给客户端。

  • Multi<X>:表示一个非阻塞方法,其中非空 Multi 发出的项目按顺序发送给客户端,直到完成或取消。

  • Kotlin suspend 函数返回 Unit:表示一个非阻塞方法,其中没有将显式响应发送回客户端。

  • Kotlin suspend 函数返回 X:指定一个非阻塞方法,其中返回的项目作为响应发送给客户端。

以下是这些方法的一些示例

@OnTextMessage
void consume(Message m) {
// Process the incoming message. The method is called on an executor thread for each incoming message.
}

@OnTextMessage
Uni<Void> consumeAsync(Message m) {
// Process the incoming message. The method is called on an event loop thread for each incoming message.
// The method completes when the returned Uni emits its item.
}

@OnTextMessage
ResponseMessage process(Message m) {
// Process the incoming message and send a response to the client.
// The method is called for each incoming message.
// Note that if the method returns `null`, no response will be sent to the client.
}

@OnTextMessage
Uni<ResponseMessage> processAsync(Message m) {
// Process the incoming message and send a response to the client.
// The method is called for each incoming message.
// Note that if the method returns `null`, no response will be sent to the client. The method completes when the returned Uni emits its item.
}

@OnTextMessage
Multi<ResponseMessage> stream(Message m) {
// Process the incoming message and send multiple responses to the client.
// The method is called for each incoming message.
// The method completes when the returned Multi emits its completion signal.
// The method cannot return `null` (but an empty multi if no response must be sent)
}

返回 UniMulti 的方法被认为是非阻塞的。此外,Quarkus 会自动订阅返回的 Multi / Uni 并写入发出的项目,直到完成、失败或取消。失败或取消会终止连接。

5.6.4. 流

除了单个消息之外,WebSocket 端点还可以处理消息流。在这种情况下,该方法接收一个 Multi<X> 作为参数。每个 X 的实例都使用上面列出的相同规则进行反序列化。

接收 Multi 的方法可以返回另一个 Multivoid。如果该方法返回一个 Multi,则不必订阅传入的 multi

@OnTextMessage
public Multi<ChatMessage> stream(Multi<ChatMessage> incoming) {
    return incoming.log();
}

此方法允许双向流。

当方法返回 void,因此不返回 Multi 时,代码必须订阅传入的 Multi。否则,将不会消费任何数据,并且连接将不会关闭

@OnTextMessage
public void stream(Multi<ChatMessage> incoming) {
    incoming.subscribe().with(item -> log(item));
}

另请注意,stream 方法将在 Multi 完成之前完成。

请参阅 何时订阅 UniMulti 以了解有关订阅传入的 Multi 的更多信息。

5.6.5. 跳过回复

当一个方法旨在生成写入客户端的消息时,它可以发出 null。发出 null 表示没有响应要发送给客户端,从而允许在需要时跳过响应。

5.6.6. JsonObject 和 JsonArray

Vert.x JsonObjectJsonArray 实例绕过序列化和反序列化机制。消息作为文本消息发送。

5.6.7. OnOpen 和 OnClose 方法

当客户端连接或断开连接时,也可以通知 WebSocket 端点。

这可以通过使用 @OnOpen@OnClose 注解方法来完成

@OnOpen(broadcast = true)
public ChatMessage onOpen() {
    return new ChatMessage(MessageType.USER_JOINED, connection.pathParam("username"), null);
}

@Inject WebSocketConnection connection;

@OnClose
public void onClose() {
    ChatMessage departure = new ChatMessage(MessageType.USER_LEFT, connection.pathParam("username"), null);
    connection.broadcast().sendTextAndAwait(departure);
}

@OnOpen 在客户端连接时触发,而 @OnClose 在断开连接时调用。

这些方法可以访问会话作用域WebSocketConnection bean。

5.6.8. 参数

使用 @OnOpen@OnClose 注解的方法可以接受以下参数

  • WebSocketConnection/WebSocketClientConnection

  • HandshakeRequest

  • 使用 @PathParam 注解的 String 参数

使用 @OnClose 注解的端点方法也可以接受 io.quarkus.websockets.next.CloseReason 参数,该参数可以指示关闭连接的原因。

5.6.9. 支持的返回类型

@OnOpen@OnClose 方法支持不同的返回类型。

对于 @OnOpen 方法,适用与 @On[Text|Binary]Message 相同的规则。因此,使用 @OnOpen 注解的方法可以在连接后立即向客户端发送消息。@OnOpen 方法支持的返回类型为

  • void:表示一个阻塞方法,其中没有将显式消息发送回连接的客户端。

  • Uni<Void>:表示一个非阻塞方法,其中返回的 Uni 的完成表示处理的结束。没有将消息发送回客户端。

  • 类型为 X 的对象:表示一个阻塞方法,其中返回的对象被序列化并发送回客户端。

  • Uni<X>:指定一个非阻塞方法,其中非空 Uni 发出的项目发送给客户端。

  • Multi<X>:表示一个非阻塞方法,其中非空 Multi 发出的项目按顺序发送给客户端,直到完成或取消。

  • Kotlin suspend 函数返回 Unit:表示一个非阻塞方法,其中没有将显式消息发送回客户端。

  • Kotlin suspend 函数返回 X:指定一个非阻塞方法,其中返回的项目发送给客户端。

发送给客户端的项目是 序列化的,但 Stringio.vertx.core.json.JsonObjectio.vertx.core.json.JsonArrayio.vertx.core.buffer.Bufferbyte[] 类型除外。对于 Multi,Quarkus 订阅返回的 Multi,并在发出项目时将它们写入 WebSocketStringJsonObjectJsonArray 作为文本消息发送。Buffers 和字节数组作为二进制消息发送。

对于 @OnClose 方法,支持的返回类型包括

  • void:该方法被认为是阻塞的。

  • Uni<Void>:该方法被认为是非阻塞的。

  • Kotlin suspend 函数返回 Unit:该方法被认为是非阻塞的。

在服务器端点上声明的 @OnClose 方法不能通过返回对象将项目发送给连接的客户端。它们只能使用 WebSocketConnection 对象将消息发送给其他客户端。

5.7. 错误处理

当发生错误时,也可以通知 WebSocket 端点。当端点回调抛出运行时错误,或者发生转换错误,或者返回的 io.smallrye.mutiny.Uni/io.smallrye.mutiny.Multi 收到故障时,将调用使用 @io.quarkus.websockets.next.OnError 注解的 WebSocket 端点方法。

该方法必须只接受一个错误参数,即可以从 java.lang.Throwable 分配的参数。该方法也可以接受以下参数

  • WebSocketConnection/WebSocketClientConnection

  • HandshakeRequest

  • 使用 @PathParam 注解的 String 参数

一个端点可以声明多个使用 @io.quarkus.websockets.next.OnError 注解的方法。但是,每个方法都必须声明不同的错误参数。选择声明实际异常的最具体的超类型的方法。

@io.quarkus.websockets.next.OnError 注解也可以用于声明全局错误处理程序,即未在 WebSocket 端点上声明的方法。此类方法可能不接受 @PathParam 参数。在端点上声明的错误处理程序优先于全局错误处理程序。

当发生错误但没有错误处理程序可以处理故障时,Quarkus 将使用 quarkus.websockets-next.server.unhandled-failure-strategy 指定的策略。对于服务器端点,默认情况下会记录错误消息并关闭连接。对于客户端端点,默认情况下会记录错误消息。

5.8. 序列化和反序列化

WebSocket Next 扩展支持自动序列化和反序列化消息。

类型为 StringJsonObjectJsonArrayBufferbyte[] 的对象按原样发送,并绕过序列化和反序列化。当未提供编解码器时,序列化和反序列化会自动将消息从/转换为 JSON。

当您需要自定义序列化和反序列化时,您可以提供自定义编解码器。

5.8.1. 自定义编解码器

要实现自定义编解码器,您必须提供一个实现以下接口的 CDI bean

  • 二进制消息的 io.quarkus.websockets.next.BinaryMessageCodec

  • 文本消息的 io.quarkus.websockets.next.TextMessageCodec

以下示例显示如何为 Item 类实现自定义编解码器

@Singleton
public class ItemBinaryMessageCodec implements BinaryMessageCodec<Item> {

    @Override
    public boolean supports(Type type) {
        // Allows selecting the right codec for the right type
        return type.equals(Item.class);
    }

    @Override
    public Buffer encode(Item value) {
        // Serialization
        return Buffer.buffer(value.toString());
    }

    @Override
    public Item decode(Type type, Buffer value) {
        // Deserialization
        return new Item(value.toString());
    }
}

OnTextMessageOnBinaryMessage 方法也可以显式指定应使用哪个编解码器

@OnTextMessage(codec = MyInputCodec.class) (1)
Item find(Item item) {
        //....
}
  1. 指定用于消息的反序列化和序列化的编解码器

当序列化和反序列化必须使用不同的编解码器时,您可以单独指定用于序列化和反序列化的编解码器

@OnTextMessage(
        codec = MyInputCodec.class, (1)
        outputCodec = MyOutputCodec.class (2)
Item find(Item item) {
        //....
}
  1. 指定用于传入消息的反序列化的编解码器

  2. 指定用于传出消息的序列化的编解码器

5.9. Ping/Pong 消息

ping 消息可以用作保持活动或验证远程端点。 pong 消息作为对 ping 消息的响应发送,并且必须具有相同的负载。

5.9.1. 发送 ping 消息

Ping 消息是可选的,默认情况下不发送。但是,可以将服务器和客户端端点配置为定期自动发送 ping 消息。

quarkus.websockets-next.server.auto-ping-interval=2 (1)
quarkus.websockets-next.client.auto-ping-interval=10 (2)
1 每 2 秒从服务器向每个连接的客户端发送一次 ping 消息。
2 每 10 秒从所有连接的客户端实例向其远程服务器发送一次 ping 消息。

服务器和客户端可以使用 WebSocketConnectionWebSocketClientConnection 随时以编程方式发送 ping 消息。有一个非阻塞变体:Sender#sendPing(Buffer) 和一个阻塞变体:Sender#sendPingAndAwait(Buffer)

5.9.2. 发送 pong 消息

服务器和客户端端点始终会使用来自 ping 消息的应用程序数据,使用相应的 pong 消息响应来自远程方的 ping 消息。此行为是内置的,不需要额外的代码或配置。

服务器和客户端可以使用 WebSocketConnectionWebSocketClientConnection 发送未经请求的 pong 消息,这些消息可以用作单向心跳。有一个非阻塞变体:Sender#sendPong(Buffer) 和一个阻塞变体:Sender#sendPongAndAwait(Buffer)

5.9.3. 处理 ping/pong 消息

由于 ping 消息会自动处理,并且 pong 消息不需要响应,因此无需为这些消息编写处理程序即可符合 WebSocket 协议。但是,有时了解端点何时收到 ping 或 pong 消息很有用。

@OnPingMessage@OnPongMessage 注解可用于定义使用从远程方发送的 ping 或 pong 消息的回调。一个端点最多可以声明一个 @OnPingMessage 回调和一个 @OnPongMessage 回调。回调方法必须返回 voidUni<Void>(或是一个返回 Unit 的 Kotlin suspend 函数),并且必须接受一个 Buffer 类型的参数。

@OnPingMessage
void ping(Buffer data) {
    // an incoming ping that will automatically receive a pong
}

@OnPongMessage
void pong(Buffer data) {
    // an incoming pong in response to the last ping sent
}

5.10. 入站处理模式

WebSocket 端点可以使用 @WebSocket#inboundProcessingMode()@WebSocketClient.inboundProcessingMode() 分别定义用于处理特定连接的传入事件的模式。传入事件可以表示消息(文本、二进制、pong)、打开连接和关闭连接。默认情况下,事件是串行处理的,并且保证了排序。这意味着如果一个端点接收到事件 AB(按此特定顺序),则在事件 A 的回调完成后,将调用事件 B 的回调。但是,在某些情况下,最好并发处理事件,即不保证排序,但也没有并发限制。对于这种情况,应使用 InboundProcessingMode#CONCURRENT

6. 服务器 API

6.1. HTTP 服务器配置

此扩展重用 HTTP 服务器。

因此,WebSocket 服务器的配置在 quarkus.http. 配置部分完成。

在应用程序中配置的 WebSocket 路径与 quarkus.http.root 定义的根路径连接(默认为 /)。此连接确保 WebSocket 端点适当地位于应用程序的 URL 结构中。

有关更多详细信息,请参阅 HTTP 指南

6.2. 子 web socket 端点

@WebSocket 端点可以封装静态嵌套类,这些类也使用 @WebSocket 注解,并表示子 web socket。这些子 web socket 的结果路径连接来自封闭类和嵌套类的路径。结果路径是标准化的,遵循 HTTP URL 规则。

子 web socket 继承对封闭类和嵌套类的 @WebSocket 注解中声明的路径参数的访问权限。封闭类中的 consumePrimary 方法可以在以下示例中访问 version 参数。同时,嵌套类中的 consumeNested 方法可以访问 versionid 参数

@WebSocket(path = "/ws/v{version}")
public class MyPrimaryWebSocket {

    @OnTextMessage
    void consumePrimary(String s)    { ... }

    @WebSocket(path = "/products/{id}")
    public static class MyNestedWebSocket {

      @OnTextMessage
      void consumeNested(String s)    { ... }

    }
}

6.3. WebSocket 连接

io.quarkus.websockets.next.WebSocketConnection 对象表示 WebSocket 连接。Quarkus 提供一个 @SessionScoped CDI bean,它实现此接口,可以注入到 WebSocket 端点中,并用于与连接的客户端交互。

使用 @OnOpen@OnTextMessage@OnBinaryMessage@OnClose 注解的方法可以访问注入的 WebSocketConnection 对象

@Inject WebSocketConnection connection;
请注意,在这些方法之外,WebSocketConnection 对象不可用。但是,可以 列出所有打开的连接

该连接可用于向客户端发送消息、访问路径参数、将消息广播给所有连接的客户端等。

// Send a message:
connection.sendTextAndAwait("Hello!");

// Broadcast messages:
connection.broadcast().sendTextAndAwait(departure);

// Access path parameters:
String param = connection.pathParam("foo");

WebSocketConnection 提供阻塞和非阻塞方法变体来发送消息

  • sendTextAndAwait(String message):向客户端发送文本消息,并等待消息发送。它是阻塞的,只能从执行器线程调用。

  • sendText(String message):向客户端发送文本消息。它返回一个 Uni。它是非阻塞的。请确保您或 Quarkus 订阅了返回的 Uni 以发送消息。如果您从 Quarkus 调用的方法(如 Quarkus REST、Quarkus WebSocket Next 或 Quarkus Messaging)返回 Uni,它将订阅它并发送消息。例如

@POST
public Uni<Void> send() {
    return connection.sendText("Hello!"); // Quarkus automatically subscribes to the returned Uni and sends the message.
}

请参阅 何时订阅 UniMulti 以了解有关订阅 Uni 的更多信息。

6.3.1. 列出打开的连接

也可以列出所有打开的连接。Quarkus 提供一个 io.quarkus.websockets.next.OpenConnections 类型的 CDI bean,它声明了访问连接的便捷方法。

import io.quarkus.logging.Log;
import io.quarkus.websockets.next.OpenConnections;

class MyBean {

  @Inject
  OpenConnections connections;

  void logAllOpenConnections() {
     Log.infof("Open connections: %s", connections.listAll()); (1)
  }
}
1 OpenConnections#listAll() 返回给定时间所有打开的连接的不可变快照。

还有其他便捷方法。例如,OpenConnections#findByEndpointId(String) 可以轻松找到特定端点的连接。

6.3.2. 用户数据

也可以将任意用户数据与特定连接关联。通过 WebSocketConnection#userData() 方法获得的 io.quarkus.websockets.next.UserData 对象表示与连接关联的可变用户数据。

import io.quarkus.websockets.next.WebSocketConnection;
import io.quarkus.websockets.next.UserData.TypedKey;

@WebSocket(path = "/endpoint/{username}")
class MyEndpoint {

  @Inject
  CoolService service;

  @OnOpen
  void open(WebSocketConnection connection) {
     connection.userData().put(TypedKey.forBoolean("isCool"), service.isCool(connection.pathParam("username"))); (1)
  }

  @OnTextMessage
  String process(String message) {
     if (connection.userData().get(TypedKey.forBoolean("isCool"))) { (2)
        return "Cool message processed!";
     } else {
        return "Message processed!";
     }
  }
}
1 CoolService#isCool() 返回与当前连接关联的 Boolean
2 TypedKey.forBoolean("isCool") 是用于获取连接创建时存储的数据的键。

6.3.3. CDI 事件

当打开新连接时,Quarkus 会异步触发一个带有限定符 @io.quarkus.websockets.next.Openio.quarkus.websockets.next.WebSocketConnection 类型的 CDI 事件。此外,当关闭连接时,会异步触发一个带有限定符 @io.quarkus.websockets.next.ClosedWebSocketConnection 类型的 CDI 事件。

import jakarta.enterprise.event.ObservesAsync;
import io.quarkus.websockets.next.Open;
import io.quarkus.websockets.next.WebSocketConnection;

class MyBean {

  void connectionOpened(@ObservesAsync @Open WebSocketConnection connection) { (1)
     // This observer method is called when a connection is opened...
  }
}
1 异步观察者方法使用默认的阻塞执行器服务执行。

6.4. 安全性

安全性功能由 Quarkus Security 扩展提供。任何 身份提供程序都可用于将初始 HTTP 请求中的身份验证凭据转换为 SecurityIdentity 实例。然后,SecurityIdentity 与 web socket 连接关联。授权选项将在以下部分中演示。

当使用 OpenID Connect 扩展 quarkus-oidc 且令牌过期时,Quarkus 会自动关闭连接。

6.4.1. 安全 HTTP 升级

当标准安全注解放置在端点类上或定义了 HTTP 安全策略时,HTTP 升级是安全的。保护 HTTP 升级的优点是处理较少,授权提前执行且仅执行一次。除非您需要在错误时执行操作(请参阅 安全 WebSocket 端点回调方法)或基于负载的安全检查(请参阅 具有权限检查器的安全服务器端点),否则应始终首选 HTTP 升级安全性。

使用标准安全注解来保护 HTTP 升级
package io.quarkus.websockets.next.test.security;

import io.quarkus.security.Authenticated;
import jakarta.inject.Inject;

import io.quarkus.security.identity.SecurityIdentity;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;

@Authenticated (1)
@WebSocket(path = "/end")
public class Endpoint {

    @Inject
    SecurityIdentity currentIdentity;

    @OnOpen
    String open() {
        return "ready";
    }

    @OnTextMessage
    String echo(String message) {
        return message;
    }
}
1 初始 HTTP 握手以匿名用户的 401 状态结束。您还可以使用 quarkus.websockets-next.server.security.auth-failure-redirect-url 配置属性在授权失败时重定向握手请求。
HTTP 升级只有在端点类上,紧邻 @WebSocket 注解声明了安全注解时才是安全的。将安全注解放置在端点 Bean 上不会保护 Bean 方法,只会保护 HTTP 升级。您必须始终验证您的端点是否按预期受到保护。
使用 HTTP 安全策略来保护 HTTP 升级
quarkus.http.auth.permission.http-upgrade.paths=/end
quarkus.http.auth.permission.http-upgrade.policy=authenticated

在身份验证期间使用的安全注解也必须放置在端点类上,因为 SecurityIdentity 是在 websocket 连接打开之前创建的。

选择 Bearer 令牌身份验证机制
package io.quarkus.websockets.next.test.security;

import io.quarkus.oidc.BearerTokenAuthentication;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;

@BearerTokenAuthentication (1)
@WebSocket(path = "/end")
public class Endpoint {

    @OnTextMessage
    String echo(String message) {
        return message;
    }

}
1 要求打开 WebSocket 握手请求必须使用 bearer 令牌身份验证进行身份验证。有关使用注解选择身份验证机制的更多信息,请参阅 Quarkus 中的身份验证机制 指南。
quarkus.http.auth.proactive=false (1)
1 仅当检测到 io.quarkus.oidc.BearerTokenAuthentication 注解时才开始验证打开的 WebSocket 握手请求。

6.4.2. 保护 WebSocket 端点回调方法

WebSocket 端点回调方法可以使用安全注解进行保护,例如 io.quarkus.security.Authenticatedjakarta.annotation.security.RolesAllowed 以及 支持的安全注解 文档中列出的其他注解。

例如

package io.quarkus.websockets.next.test.security;

import jakarta.annotation.security.RolesAllowed;
import jakarta.inject.Inject;

import io.quarkus.security.ForbiddenException;
import io.quarkus.security.identity.SecurityIdentity;
import io.quarkus.websockets.next.OnError;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;

@WebSocket(path = "/end")
public class Endpoint {

    @Inject
    SecurityIdentity currentIdentity;

    @OnOpen
    String open() {
        return "ready";
    }

    @RolesAllowed("admin")
    @OnTextMessage
    String echo(String message) { (1)
        return message;
    }

    @OnError
    String error(ForbiddenException t) { (2)
        return "forbidden:" + currentIdentity.getPrincipal().getName();
    }
}
1 只有当前安全身份具有 admin 角色时,才能调用 echo 回调方法。
2 如果授权失败,则会调用错误处理程序。

6.4.3. 使用权限检查器保护服务器端点

可以使用 权限检查器来保护 WebSocket 端点。我们建议 保护 HTTP 升级,而不是单独的端点方法。例如

具有安全 HTTP 升级的 WebSocket 端点示例
package io.quarkus.websockets.next.test.security;

import io.quarkus.security.PermissionsAllowed;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;

@PermissionsAllowed("product:premium")
@WebSocket(path = "/product/premium")
public class PremiumProductEndpoint {

    @OnTextMessage
    PremiumProduct getPremiumProduct(int productId) {
        return new PremiumProduct(productId);
    }

}
授权 HTTP 升级的权限检查器示例
package io.quarkus.websockets.next.test.security;

import io.quarkus.security.identity.SecurityIdentity;
import io.quarkus.security.PermissionChecker;
import io.quarkus.vertx.http.runtime.security.HttpSecurityUtils;
import io.vertx.ext.web.RoutingContext;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PermissionChecker {

    @PermissionChecker("product:premium")
    public boolean canGetPremiumProduct(SecurityIdentity securityIdentity) { (1)
        String username = securityIdentity.getPrincipal().getName();

        RoutingContext routingContext = HttpSecurityUtils.getRoutingContextAttribute(securityIdentity);
        String initialHttpUpgradePath = routingContext == null ? null : routingContext.normalizedPath();
        if (!isUserAllowedToAccessPath(initialHttpUpgradePath, username)) {
            return false;
        }

        return isPremiumCustomer(username);
    }

}
1 授权 HTTP 升级的权限检查器必须只声明一个方法参数,即 SecurityIdentity

也可以对每个消息运行安全检查。例如,可以像这样访问消息有效负载

package io.quarkus.websockets.next.test.security;

import io.quarkus.security.PermissionChecker;
import io.quarkus.security.PermissionsAllowed;
import jakarta.inject.Inject;

import io.quarkus.security.ForbiddenException;
import io.quarkus.security.identity.SecurityIdentity;
import io.quarkus.websockets.next.OnError;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;

@WebSocket(path = "/product")
public class ProductEndpoint {

    private record Product(int id, String name) {}

    @Inject
    SecurityIdentity currentIdentity;

    @PermissionsAllowed("product:get")
    @OnTextMessage
    Product getProduct(int productId) { (1)
        return new Product(productId, "Product " + productId);
    }

    @OnError
    String error(ForbiddenException t) { (2)
        return "forbidden:" + currentIdentity.getPrincipal().getName();
    }

    @PermissionChecker("product:get")
    boolean canGetProduct(int productId) {
        String username = currentIdentity.getPrincipal().getName();
        return currentIdentity.hasRole("admin") || canUserGetProduct(productId, username);
    }
}
1 只有当前安全身份具有 admin 角色或用户被允许获取产品详细信息时,才能调用 getProduct 回调方法。
2 如果授权失败,则会调用错误处理程序。

有关权限检查器的更多信息,请参阅 @PermissionChecker 的 JavaDoc。

6.4.4. Bearer 令牌身份验证

OIDC Bearer 令牌身份验证 期望 bearer 令牌在初始 HTTP 握手期间在 Authorization 标头中传递。Java WebSocket 客户端(例如 WebSockets Next ClientVert.x WebSocketClient)支持将自定义标头添加到 WebSocket 打开握手中。但是,遵循 WebSockets API 的 JavaScript 客户端不支持添加自定义标头。因此,使用自定义 Authorization 标头传递 bearer 访问令牌对于基于 JavaScript 的 WebSocket 客户端是不可能的。JavaScript WebSocket 客户端只允许配置 HTTP Sec-WebSocket-Protocol 请求标头以协商子协议。如果绝对必要,可以使用 Sec-WebSocket-Protocol 标头作为自定义标头的载体,以解决 WebSockets API 的限制。以下是一个 JavaScript 客户端将 Authorization 标头作为子协议值传播的示例

const token = getBearerToken()
const quarkusHeaderProtocol = encodeURIComponent("quarkus-http-upgrade#Authorization#Bearer " + token) (1)
const socket = new WebSocket("wss://" + location.host + "/chat/" + username, ["bearer-token-carrier", quarkusHeaderProtocol]) (2)
1 Quarkus 标头子协议的预期格式是 quarkus-http-upgrade#header-name#header-value。不要忘记将子协议值编码为 URI 组件以避免编码问题。
2 指示客户端支持的 2 个子协议,您选择的子协议和 Quarkus HTTP 升级子协议。

为了使 WebSocket 服务器接受作为子协议传递的 Authorization,我们必须

  • 使用支持的子协议配置我们的 WebSocket 服务器。当 WebSocket 客户端在 HTTP Sec-WebSocket-Protocol 请求标头中提供支持的子协议列表时,WebSocket 服务器必须同意使用其中一个子协议来提供内容。

  • 启用 Quarkus HTTP 升级子协议映射到打开的 WebSocket 握手请求标头。

quarkus.websockets-next.server.supported-subprotocols=bearer-token-carrier
quarkus.websockets-next.server.propagate-subprotocol-headers=true

WebSocket 安全模型是基于源的,并非设计用于使用标头或 Cookie 进行客户端身份验证。例如,Web 浏览器不会对打开的 WebSocket 握手请求强制执行同源策略。当您计划在打开 WebSocket 握手请求期间使用 bearer 访问令牌时,我们强烈建议遵循以下列出的额外安全措施,以最大限度地降低安全风险

  • 使用 CORS 过滤器将支持的源限制为仅受信任的源。

  • 使用 wss 协议来强制执行通过 TLS 加密的 HTTP 连接。

  • 使用自定义 WebSocket 票证系统,该系统提供一个随机令牌以及托管 JavaScript WebSockets 客户端的 HTML 页面,该客户端必须在初始握手请求期间将此令牌作为查询参数提供。

6.5. 检查和/或拒绝 HTTP 升级

要检查 HTTP 升级,您必须提供一个实现 io.quarkus.websockets.next.HttpUpgradeCheck 接口的 CDI Bean。Quarkus 在每个应该升级到 WebSocket 连接的 HTTP 请求上调用 HttpUpgradeCheck#perform 方法。在此方法中,您可以执行任何业务逻辑和/或拒绝 HTTP 升级。

HttpUpgradeCheck 示例
package io.quarkus.websockets.next.test;

import io.quarkus.websockets.next.HttpUpgradeCheck;
import io.smallrye.mutiny.Uni;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped (1)
public class ExampleHttpUpgradeCheck implements HttpUpgradeCheck {

    @Override
    public Uni<CheckResult> perform(HttpUpgradeContext ctx) {
        if (rejectUpgrade(ctx)) {
            return CheckResult.rejectUpgrade(400); (2)
        }
        return CheckResult.permitUpgrade();
    }

    private boolean rejectUpgrade(HttpUpgradeContext ctx) {
        var headers = ctx.httpRequest().headers();
        // implement your business logic in here
    }
}
1 实现 HttpUpgradeCheck 接口的 CDI Bean 可以是 @ApplicationScoped@Singleton@Dependent Bean,但绝不能是 @RequestScoped Bean。
2 拒绝 HTTP 升级。初始 HTTP 握手以 400 Bad Request 响应状态代码结束。
您可以使用 HttpUpgradeCheck#appliesTo 方法选择将 HttpUpgradeCheck 应用于哪些 WebSocket 端点。

6.6. TLS

作为此扩展重用 HTTP 服务器的直接结果,所有相关的服务器配置都适用。有关更多详细信息,请参阅 HTTP 指南

6.7. Hibernate 多租户

HTTP 升级后 RoutingContext 不可用。但是,可以注入 WebSocketConnection 并访问初始 HTTP 请求的标头。

如果使用了自定义 TenantResolver 并且您想组合 REST/HTTP 和 WebSockets,则代码可能如下所示

@RequestScoped
@PersistenceUnitExtension
public class CustomTenantResolver implements TenantResolver {

    @Inject
    RoutingContext context;
    @Inject
    WebSocketConnection connection;

    @Override
    public String getDefaultTenantId() {
        return "public";
    }

    @Override
    public String resolveTenantId() {
        String schema;
        try {
            //Handle WebSocket
            schema = connection.handshakeRequest().header("schema");
        } catch ( ContextNotActiveException e) {
            // Handle REST/HTTP
            schema = context.request().getHeader( "schema" );
        }

        if ( schema == null || schema.equalsIgnoreCase( "public" ) ) {
            return "public";
        }

        return schema;
    }
}

有关 Hibernate 多租户的更多信息,请参阅 hibernate 文档

7. 客户端 API

7.1. 客户端连接器

可以使用连接器配置和打开由客户端端点支持的新客户端连接,该端点用于消费和发送消息。Quarkus 提供了一个 CDI Bean,其 Bean 类型为 io.quarkus.websockets.next.WebSocketConnector<CLIENT> 和默认限定符,可以在其他 Bean 中注入。注入点的实际类型参数用于确定客户端端点。该类型在构建期间进行验证 - 如果它不代表客户端端点,则构建失败。

让我们考虑以下客户端端点

客户端端点
@WebSocketClient(path = "/endpoint/{name}")
public class ClientEndpoint {

    @OnTextMessage
    void onMessage(@PathParam String name, String message, WebSocketClientConnection connection) {
        // ...
    }
}

此客户端端点的连接器使用方式如下

连接器
@Singleton
public class MyBean {

    @ConfigProperty(name = "endpoint.uri")
    URI myUri;

    @Inject
    WebSocketConnector<ClientEndpoint> connector; (1)

    void openAndSendMessage() {
        WebSocketClientConnection connection = connector
            .baseUri(uri) (2)
            .pathParam("name", "Roxanne") (3)
            .connectAndAwait();
        connection.sendTextAndAwait("Hi!"); (4)
    }
}
1 注入 ClientEndpoint 的连接器。
2 如果未提供基本 URI,我们将尝试从配置中获取该值。键由客户端 ID 和 .base-uri 后缀组成。
3 设置路径参数值。如果客户端端点路径不包含具有给定名称的参数,则抛出 IllegalArgumentException
4 如果需要,使用连接发送消息。
如果应用程序尝试注入缺失端点的连接器,则会抛出错误。

连接器不是线程安全的,不应并发使用。连接器也不应重复使用。如果需要连续创建多个连接,则需要使用 Instance#get() 以编程方式获取新的连接器实例

import jakarta.enterprise.inject.Instance;

@Singleton
public class MyBean {

    @Inject
    Instance<WebSocketConnector<MyEndpoint>> connector;

    void connect() {
        var connection1 = connector.get().baseUri(uri)
                .addHeader("Foo", "alpha")
                .connectAndAwait();
        var connection2 = connector.get().baseUri(uri)
                .addHeader("Foo", "bravo")
                .connectAndAwait();
    }
}

7.1.1. 基本连接器

如果应用程序开发人员不需要客户端端点和连接器的组合,则可以使用基本连接器。基本连接器是一种创建连接和消费/发送消息的简单方法,无需定义客户端端点。

基本连接器
@Singleton
public class MyBean {

    @Inject
    BasicWebSocketConnector connector; (1)

    void openAndConsume() {
        WebSocketClientConnection connection = connector
            .baseUri(uri) (2)
            .path("/ws") (3)
            .executionModel(ExecutionModel.NON_BLOCKING) (4)
            .onTextMessage((c, m) -> { (5)
               // ...
            })
            .connectAndAwait();
    }
}
1 注入连接器。
2 必须始终设置基本 URI。
3 应该附加到基本 URI 的附加路径。
4 设置回调处理程序的执行模型。默认情况下,回调可能会阻塞当前线程。但是,在这种情况下,回调在事件循环上执行,并且可能不会阻塞当前线程。
5 对于从服务器发送的每个文本消息,都会调用 lambda。

基本连接器更接近于底层 API,并且保留供高级用户使用。但是,与其他底层 WebSocket 客户端不同,它仍然是一个 CDI Bean,可以在其他 Bean 中注入。它还提供了一种配置回调执行模型的方法,确保与 Quarkus 的其余部分实现最佳集成。

连接器不是线程安全的,不应并发使用。连接器也不应重复使用。如果需要连续创建多个连接,则需要使用 Instance#get() 以编程方式获取新的连接器实例

import jakarta.enterprise.inject.Instance;

@Singleton
public class MyBean {

    @Inject
    Instance<BasicWebSocketConnector> connector;

    void connect() {
        var connection1 = connector.get().baseUri(uri)
                .addHeader("Foo", "alpha")
                .connectAndAwait();
        var connection2 = connector.get().baseUri(uri)
                .addHeader("Foo", "bravo")
                .connectAndAwait();
    }
}

7.2. WebSocket 客户端连接

io.quarkus.websockets.next.WebSocketClientConnection 对象表示 WebSocket 连接。Quarkus 提供了一个 @SessionScoped CDI Bean,该 Bean 实现了此接口,可以在 WebSocketClient 端点中注入,并用于与连接的服务器进行交互。

使用 @OnOpen@OnTextMessage@OnBinaryMessage@OnClose 注解的方法可以访问注入的 WebSocketClientConnection 对象

@Inject WebSocketClientConnection connection;
请注意,在这些方法之外,WebSocketClientConnection 对象不可用。但是,可以列出所有打开的客户端连接

该连接可用于向客户端发送消息、访问路径参数等。

// Send a message:
connection.sendTextAndAwait("Hello!");

// Broadcast messages:
connection.broadcast().sendTextAndAwait(departure);

// Access path parameters:
String param = connection.pathParam("foo");

WebSocketClientConnection 提供了阻塞和非阻塞方法变体来发送消息

  • sendTextAndAwait(String message):向客户端发送文本消息,并等待消息发送。它是阻塞的,只能从执行器线程调用。

  • sendText(String message):向客户端发送文本消息。它返回一个 Uni。它是非阻塞的。请确保您或 Quarkus 订阅了返回的 Uni 以发送消息。如果您从 Quarkus 调用的方法(如 Quarkus REST、Quarkus WebSocket Next 或 Quarkus Messaging)返回 Uni,它将订阅它并发送消息。例如

@POST
public Uni<Void> send() {
    return connection.sendText("Hello!"); // Quarkus automatically subscribes to the returned Uni and sends the message.
}

7.2.1. 列出打开的客户端连接

也可以列出所有打开的连接。Quarkus 提供了一个类型为 io.quarkus.websockets.next.OpenClientConnections 的 CDI Bean,该 Bean 声明了访问连接的便捷方法。

import io.quarkus.logging.Log;
import io.quarkus.websockets.next.OpenClientConnections;

class MyBean {

  @Inject
  OpenClientConnections connections;

  void logAllOpenClinetConnections() {
     Log.infof("Open client connections: %s", connections.listAll()); (1)
  }
}
1 OpenClientConnections#listAll() 返回给定时间所有打开连接的不可变快照。

还有其他方便的方法。例如,OpenClientConnections#findByClientId(String) 可以轻松找到特定端点的连接。

7.2.2. 用户数据

也可以将任意用户数据与特定连接关联。通过 WebSocketClientConnection#userData() 方法获取的 io.quarkus.websockets.next.UserData 对象表示与连接关联的可变用户数据。

import io.quarkus.websockets.next.WebSocketClientConnection;
import io.quarkus.websockets.next.UserData.TypedKey;

@WebSocketClient(path = "/endpoint/{username}")
class MyEndpoint {

  @Inject
  CoolService service;

  @OnOpen
  void open(WebSocketClientConnection connection) {
     connection.userData().put(TypedKey.forBoolean("isCool"), service.isCool(connection.pathParam("username"))); (1)
  }

  @OnTextMessage
  String process(String message) {
     if (connection.userData().get(TypedKey.forBoolean("isCool"))) { (2)
        return "Cool message processed!";
     } else {
        return "Message processed!";
     }
  }
}
1 CoolService#isCool() 返回与当前连接关联的 Boolean
2 TypedKey.forBoolean("isCool") 是用于获取连接创建时存储的数据的键。
7.2.2.1. 在连接器中指定

在某些情况下,您可能希望将用户数据与 连接器创建的连接关联。在这种情况下,您可以在获取连接之前在连接器实例上设置值。如果您需要在连接打开时执行某些操作,并且无法以其他方式推断必要的上下文,则此功能特别有用。

连接器
@Singleton
public class MyBean {

    @Inject
    MyService service;

    @Inject
    Instance<WebSocketConnector<MyEndpoint>> connectorInstance;

    public void openAndSendMessage(String internalId, String message) {
        var externalId = service.getExternalId(internalId);
        var connection = connectorInstance.get()
            .pathParam("externalId", externalId)
            .userData(TypedKey.forString("internalId"), internalId)
            .connectAndAwait();
        connection.sendTextAndAwait(message);
    }
}
端点
@WebSocketClient(path = "/endpoint/{externalId}")
class MyEndpoint {

    @Inject
    MyService service;

    @OnOpen
    void open(WebSocketClientConnection connection) {
        var internalId = connection.userData().get(TypedKey.forString("internalId"));
        service.doSomething(internalId);
    }
}

7.2.3. CDI 事件

当新连接打开时,Quarkus 会异步触发类型为 io.quarkus.websockets.next.WebSocketClientConnection 且限定符为 @io.quarkus.websockets.next.Open 的 CDI 事件。此外,当连接关闭时,会异步触发类型为 WebSocketClientConnection 且限定符为 @io.quarkus.websockets.next.Closed 的 CDI 事件。

import jakarta.enterprise.event.ObservesAsync;
import io.quarkus.websockets.next.Open;
import io.quarkus.websockets.next.WebSocketClientConnection;

class MyBean {

  void connectionOpened(@ObservesAsync @Open WebSocketClientConnection connection) { (1)
     // This observer method is called when a connection is opened...
  }
}
1 异步观察者方法使用默认的阻塞执行器服务执行。

7.3. 配置 SSL/TLS

要建立 TLS 连接,您需要使用 TLS 注册表配置命名配置。这通常通过配置完成

quarkus.tls.my-ws-client.trust-store.p12.path=server-truststore.p12
quarkus.tls.my-ws-client.trust-store.p12.password=secret

建立命名 TLS 配置后,您可以配置客户端以使用它

quarkus.websockets-next.client.tls-configuration-name=my-ws-client

或者,您可以使用 连接器提供配置名称

@Singleton
public class MyBean {

    @Inject
    WebSocketConnector<MyEndpoint> connector;

    public void connect() {
        connector
            .tlsConfigurationName("my-ws-client")
            .connectAndAwait();
    }
}

提供给连接器的名称将覆盖任何静态配置的名称。这对于建立默认配置非常有用,该配置可以在运行时根据需要进行覆盖。

使用 WebSocket 客户端时,需要使用命名配置以避免与其他 TLS 配置冲突。客户端不会使用默认 TLS 配置。

配置命名 TLS 配置时,默认情况下会启用 TLS。

8. 流量日志记录

Quarkus 可以记录发送和接收的消息以进行调试。要为服务器启用流量日志记录,请将 quarkus.websockets-next.server.traffic-logging.enabled 配置属性设置为 true。要为客户端启用流量日志记录,请将 quarkus.websockets-next.client.traffic-logging.enabled 配置属性设置为 true。文本消息的有效负载也会被记录。但是,记录的字符数是有限制的。默认限制为 100,但您可以使用 quarkus.websockets-next.server.traffic-logging.text-payload-limitquarkus.websockets-next.client.traffic-logging.text-payload-limit 配置属性分别更改此限制。

只有为记录器 io.quarkus.websockets.next.traffic 启用了 DEBUG 级别时,才会记录消息。
服务器配置示例
quarkus.websockets-next.server.traffic-logging.enabled=true (1)
quarkus.websockets-next.server.traffic-logging.text-payload-limit=50 (2)

quarkus.log.category."io.quarkus.websockets.next.traffic".level=DEBUG (3)
1 启用流量日志记录。
2 设置将记录的文本消息有效负载的字符数。
3 为记录器 io.quarkus.websockets.next.traffic 启用 DEBUG 级别。

9. 何时订阅 UniMulti

UniMulti 是惰性类型,这意味着它们在被订阅之前不会开始处理。

当您从参数或从您调用的方法中获取 UniMulti 时,是否应该订阅它取决于上下文

  • 如果您在 Quarkus 调用的方法中返回 UniMulti(例如使用 Quarkus REST、Quarkus WebSocket Next 或 Quarkus Messaging),Quarkus 会订阅它并处理 Multi 发出的项目或 Uni 发出的项目

@Incoming("...")
@Outgoing("...")
public Multi<String> process(Multi<String> input) {
    // No need to subscribe to the input Multi, the `process` method is called by Quarkus (Messaging).
    return input.map(String::toUpperCase);
}

当从使用 @OnOpen@OnTextMessage@OnBinaryMessage@OnClose 注解的方法返回 UniMulti 时,Quarkus 会自动订阅它。

  • 如果您不在 Quarkus 调用的方法中返回 UniMulti,则应该订阅它

@Incoming("...")
@Outgoing("...")
public void process(Multi<String> input) {
    input.map(String::toUpperCase)
        .subscribe().with(s -> log(s));
}

10. 遥测

当 OpenTelemetry 扩展存在时,默认情况下会收集打开和关闭的 WebSocket 连接的跟踪。如果您不需要 WebSocket 跟踪,您可以像下面的示例中那样禁用跟踪收集

quarkus.websockets-next.server.traces.enabled=false
quarkus.websockets-next.client.traces.enabled=false

当 Micrometer 扩展存在时,Quarkus 可以收集消息、错误和传输的字节的指标。如果您需要 WebSocket 指标,您可以像下面的示例中那样启用指标

quarkus.websockets-next.server.metrics.enabled=true
quarkus.websockets-next.client.metrics.enabled=true
目前不支持 BasicWebSocketConnector 的遥测。

11. 配置参考

构建时固定的配置属性 - 所有其他配置属性都可以在运行时覆盖

配置属性

类型

默认

指定在端点回调调用期间激活 CDI 请求上下文的策略。默认情况下,仅在需要时才激活请求上下文,即如果端点的依赖树中存在具有给定作用域的 Bean,或者使用安全注解(例如 @RolesAllowed)注解的 Bean。

环境变量:QUARKUS_WEBSOCKETS_NEXT_SERVER_ACTIVATE_REQUEST_CONTEXT

显示更多

auto仅在需要时才激活上下文。, always始终激活上下文。

auto仅在需要时才激活上下文。

指定在端点回调调用期间激活 CDI 会话上下文的策略。默认情况下,仅在需要时才激活会话上下文,即如果端点的依赖树中存在具有给定作用域的 Bean。

环境变量:QUARKUS_WEBSOCKETS_NEXT_SERVER_ACTIVATE_SESSION_CONTEXT

显示更多

auto仅在需要时才激活上下文。, always始终激活上下文。

auto仅在需要时才激活上下文。

如果启用,则 WebSocket 打开握手标头将使用与格式 'quarkus-http-upgrade#header-name#header-value' 匹配的 'Sec-WebSocket-Protocol' 子协议进行增强。如果 WebSocket 客户端接口不支持将标头设置为 WebSocket 打开握手,这是一种设置验证用户所需的授权标头的方式。'quarkus-http-upgrade' 子协议将被删除,服务器将从子协议中选择一个受支持的子协议(不要忘记配置 'quarkus.websockets-next.server.supported-subprotocols' 属性)。重要提示:我们强烈建议仅在通过 TLS 加密 HTTP 连接、启用 CORS 来源检查并且已设置自定义 WebSocket 票证系统时才启用此功能。请参阅 Quarkus WebSockets Next 参考以获取更多信息。

环境变量:QUARKUS_WEBSOCKETS_NEXT_SERVER_PROPAGATE_SUBPROTOCOL_HEADERS

显示更多

布尔值

false

请参阅 WebSocket 协议

环境变量:QUARKUS_WEBSOCKETS_NEXT_SERVER_SUPPORTED_SUBPROTOCOLS

显示更多

字符串列表

默认情况下支持 WebSocket 的压缩扩展。

另请参阅 RFC 7692

环境变量:QUARKUS_WEBSOCKETS_NEXT_SERVER_PER_MESSAGE_COMPRESSION_SUPPORTED

显示更多

布尔值

true

压缩级别必须是 0 到 9 之间的值。默认值为 io.vertx.core.http.HttpServerOptions#DEFAULT_WEBSOCKET_COMPRESSION_LEVEL

环境变量:QUARKUS_WEBSOCKETS_NEXT_SERVER_COMPRESSION_LEVEL

显示更多

整数

消息的最大大小(以字节为单位)。默认值为 io.vertx.core.http.HttpServerOptions#DEFAULT_MAX_WEBSOCKET_MESSAGE_SIZE

环境变量:QUARKUS_WEBSOCKETS_NEXT_SERVER_MAX_MESSAGE_SIZE

显示更多

整数

帧的最大大小(以字节为单位)。默认值为 io.vertx.core.http.HttpServerOptions#DEFAULT_MAX_WEBSOCKET_FRAME_SIZE

环境变量:QUARKUS_WEBSOCKETS_NEXT_SERVER_MAX_FRAME_SIZE

显示更多

整数

此间隔之后,如果设置了,服务器会自动向连接的客户端发送 ping 消息。

默认情况下不会自动发送 Ping 消息。

环境变量:QUARKUS_WEBSOCKETS_NEXT_SERVER_AUTO_PING_INTERVAL

显示更多

持续时间 

当发生错误但没有错误处理程序可以处理失败时使用的策略。

默认情况下,发生未处理的失败时,会记录错误消息并关闭连接。

环境变量:QUARKUS_WEBSOCKETS_NEXT_SERVER_UNHANDLED_FAILURE_STRATEGY

显示更多

log-and-close记录错误消息并关闭连接。, close静默关闭连接。, log记录错误消息。, noop无操作。

log-and-close记录错误消息并关闭连接。

如果由于授权失败而拒绝 HTTP 升级,Quarkus 会将 HTTP 握手请求重定向到此 URL。当您使用标准安全注解保护端点时,此配置属性生效。例如,如果端点类使用 @RolesAllowed 注解进行注解,则 HTTP 升级将受到保护。

环境变量:QUARKUS_WEBSOCKETS_NEXT_SERVER_SECURITY_AUTH_FAILURE_REDIRECT_URL

显示更多

字符串

为 Dev UI 连接保留的消息限制。如果小于零,则不会存储消息并将其发送到 Dev UI 视图。

环境变量:QUARKUS_WEBSOCKETS_NEXT_SERVER_DEV_MODE_CONNECTION_MESSAGES_LIMIT

显示更多

long

1000

如果设置为 true,则在为记录器 io.quarkus.websockets.next.traffic 启用了 DEBUG 级别时,将记录接收/发送的二进制/文本消息。

环境变量:QUARKUS_WEBSOCKETS_NEXT_SERVER_TRAFFIC_LOGGING_ENABLED

显示更多

布尔值

false

如果启用了流量日志记录,则将记录文本消息的字符数。永远不会记录二进制消息的有效负载。

环境变量:QUARKUS_WEBSOCKETS_NEXT_SERVER_TRAFFIC_LOGGING_TEXT_PAYLOAD_LIMIT

显示更多

整数

100

如果启用了 WebSocket 跟踪的收集。仅当 OpenTelemetry 扩展存在时才适用。

环境变量:QUARKUS_WEBSOCKETS_NEXT_SERVER_TRACES_ENABLED

显示更多

布尔值

true

如果启用了 WebSocket 指标的收集。仅当 Micrometer 扩展存在时才适用。

环境变量:QUARKUS_WEBSOCKETS_NEXT_SERVER_METRICS_ENABLED

显示更多

布尔值

false

默认情况下支持 WebSocket 的压缩扩展。

另请参阅 RFC 7692

环境变量:QUARKUS_WEBSOCKETS_NEXT_CLIENT_OFFER_PER_MESSAGE_COMPRESSION

显示更多

布尔值

false

压缩级别必须是 0 到 9 之间的值。默认值为 io.vertx.core.http.HttpClientOptions#DEFAULT_WEBSOCKET_COMPRESSION_LEVEL

环境变量:QUARKUS_WEBSOCKETS_NEXT_CLIENT_COMPRESSION_LEVEL

显示更多

整数

消息的最大大小(以字节为单位)。默认值为 io.vertx.core.http.HttpClientOptions#DEFAULT_MAX_WEBSOCKET_MESSAGE_SIZE

环境变量:QUARKUS_WEBSOCKETS_NEXT_CLIENT_MAX_MESSAGE_SIZE

显示更多

整数

帧的最大大小(以字节为单位)。默认值为 io.vertx.core.http.HttpClientOptions#DEFAULT_MAX_WEBSOCKET_FRAME_SIZE

环境变量:QUARKUS_WEBSOCKETS_NEXT_CLIENT_MAX_FRAME_SIZE

显示更多

整数

此间隔之后,如果设置了,客户端会自动向连接的服务器发送 ping 消息。

默认情况下不会自动发送 Ping 消息。

环境变量:QUARKUS_WEBSOCKETS_NEXT_CLIENT_AUTO_PING_INTERVAL

显示更多

持续时间 

如果设置,则如果在给定的超时时间内未接收或发送任何数据,则将关闭连接。

环境变量:QUARKUS_WEBSOCKETS_NEXT_CLIENT_CONNECTION_IDLE_TIMEOUT

显示更多

持续时间 

客户端在发送关闭帧后等待关闭 TCP 连接的时间。任何值都将 Duration#toSeconds() 转换为秒并限制为 Integer#MAX_VALUE。默认值为 io.vertx.core.http.HttpClientOptions#DEFAULT_WEBSOCKET_CLOSING_TIMEOUT

环境变量:QUARKUS_WEBSOCKETS_NEXT_CLIENT_CONNECTION_CLOSING_TIMEOUT

显示更多

持续时间 

当发生错误但没有错误处理程序可以处理失败时使用的策略。

默认情况下,发生未处理的失败时,会记录错误消息。

请注意,客户端不应随意关闭 WebSocket 连接。另请参阅 RFC-6455 第 7.3 节

环境变量:QUARKUS_WEBSOCKETS_NEXT_CLIENT_UNHANDLED_FAILURE_STRATEGY

显示更多

log-and-close记录错误消息并关闭连接。, close静默关闭连接。, log记录错误消息。, noop无操作。

log记录错误消息。

要使用的 TLS 配置的名称。

如果配置了名称,它将使用来自 quarkus.tls.<name>.* 的配置。如果配置了名称,但找不到具有该名称的 TLS 配置,则将引发错误。

默认情况下,使用默认 TLS 配置。

环境变量:QUARKUS_WEBSOCKETS_NEXT_CLIENT_TLS_CONFIGURATION_NAME

显示更多

字符串

如果设置为 true,则在为记录器 io.quarkus.websockets.next.traffic 启用了 DEBUG 级别时,将记录接收/发送的二进制/文本消息。

环境变量:QUARKUS_WEBSOCKETS_NEXT_CLIENT_TRAFFIC_LOGGING_ENABLED

显示更多

布尔值

false

如果启用了流量日志记录,则将记录文本消息的字符数。永远不会记录二进制消息的有效负载。

环境变量:QUARKUS_WEBSOCKETS_NEXT_CLIENT_TRAFFIC_LOGGING_TEXT_PAYLOAD_LIMIT

显示更多

整数

100

如果启用了 WebSocket 跟踪的收集。仅当 OpenTelemetry 扩展存在时才适用。

环境变量:QUARKUS_WEBSOCKETS_NEXT_CLIENT_TRACES_ENABLED

显示更多

布尔值

true

如果启用了 WebSocket 指标的收集。仅当 Micrometer 扩展存在时才适用。

环境变量:QUARKUS_WEBSOCKETS_NEXT_CLIENT_METRICS_ENABLED

显示更多

布尔值

false

关于 Duration 格式

要写入持续时间值,请使用标准的 java.time.Duration 格式。有关更多信息,请参阅 Duration#parse() Java API 文档

您还可以使用简化的格式,以数字开头

  • 如果该值仅为一个数字,则表示以秒为单位的时间。

  • 如果该值是一个数字后跟 ms,则表示以毫秒为单位的时间。

在其他情况下,简化格式将被转换为 java.time.Duration 格式以进行解析

  • 如果该值是一个数字后跟 hms,则在其前面加上 PT

  • 如果该值是一个数字后跟 d,则在其前面加上 P

相关内容