WebSockets Next 参考指南
quarkus-websockets-next
扩展提供了一个现代化的声明式 API,用于定义 WebSocket 服务器和客户端端点。
1. WebSocket 协议
WebSocket 协议,记录在 RFC6455 中,建立了一种标准化的方法,用于通过单个 TCP 连接在客户端和服务器之间创建双向通信通道。与 HTTP 不同,WebSocket 作为一种独立的 TCP 协议运行,但旨在与 HTTP 无缝协作。例如,它重用相同的端口并与相同的安全机制兼容。
使用 WebSocket 的交互以 HTTP 请求开始,该请求使用“Upgrade”标头来转换到 WebSocket 协议。服务器不是回复 200 OK
,而是回复 101 Switching Protocols
响应,以将 HTTP 连接升级为 WebSocket 连接。在成功握手之后,初始 HTTP 升级请求中使用的 TCP 套接字保持打开状态,允许客户端和服务器持续地在两个方向上交换消息。
2. HTTP 和 WebSocket 架构风格
尽管 WebSocket 与 HTTP 兼容并且通过 HTTP 请求启动,但重要的是要认识到这两种协议会导致截然不同的架构和编程模型。
对于 HTTP/REST,应用程序围绕资源/端点构建,这些资源/端点处理各种 HTTP 方法和路径。客户端交互通过发出带有适当方法和路径的 HTTP 请求来进行,遵循请求-响应模式。服务器根据路径、方法和标头将传入的请求路由到相应的处理程序,然后回复一个定义良好的响应。
相反,WebSocket 通常涉及用于初始 HTTP 连接的单个端点,之后所有消息都使用相同的 TCP 连接。它引入了一种完全不同的交互模型:异步和消息驱动。
与 HTTP 相比,WebSocket 是一种低级传输协议。消息格式、路由或处理需要客户端和服务器之间事先就消息语义达成一致。
对于 WebSocket 客户端和服务器,HTTP 握手请求中的 Sec-WebSocket-Protocol
标头允许协商更高级别的消息传递协议。如果没有它,服务器和客户端必须建立自己的约定。
3. Quarkus WebSockets 与 Quarkus WebSockets Next
本指南使用 quarkus-websockets-next
扩展,这是 WebSocket API 的一个实现,与传统的 quarkus-websockets
扩展相比,它具有更高的效率和可用性。原始的 quarkus-websockets
扩展仍然可以访问,并将获得持续的支持,但不太可能进行功能开发。
与 quarkus-websockets
不同,quarkus-websockets-next
扩展不实现 Jakarta WebSocket 规范。相反,它引入了一个现代 API,优先考虑易用性。此外,它还专门设计用于与 Quarkus 的反应式架构和网络层无缝集成。
Quarkus WebSockets next 扩展使用的注解与 JSR 356 中的注解不同,尽管有时共享相同的名称。JSR 注解带有 Quarkus WebSockets Next 扩展不遵循的语义。
4. 项目设置
要使用 websockets-next
扩展,您需要将 io.quarkus:quarkus-websockets-next
依赖项添加到您的项目中。
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-websockets-next</artifactId>
</dependency>
implementation("io.quarkus:quarkus-websockets-next")
5. 端点
服务器 API 和 客户端 API 都定义了用于消费和发送消息的端点。端点实现为 CDI bean 并支持注入。端点声明了使用 @OnTextMessage
、@OnBinaryMessage
、@OnPingMessage
、@OnPongMessage
、@OnOpen
、@OnClose
和 @OnError
注解的 回调方法。这些方法用于处理各种 WebSocket 事件。通常,当连接的客户端向服务器发送消息时,会调用使用 @OnTextMessage
注解的方法,反之亦然。
客户端 API 还包括用于配置和创建新 WebSocket 连接的 连接器。 |
5.1. 服务器端点
服务器端点是使用 @io.quarkus.websockets.next.WebSocket
注解的类。WebSocket#path()
的值用于定义端点的路径。
package org.acme.websockets;
import io.quarkus.websockets.next.WebSocket;
import jakarta.inject.Inject;
@WebSocket(path = "/chat/{username}") (1)
public class ChatWebSocket {
}
因此,客户端可以使用 ws://:8080/chat/your-name
连接到此 web socket 端点。如果使用 TLS,则 URL 为 wss://:8443/chat/your-name
。
端点路径相对于由 quarkus.http.root-path 配置的 根上下文(默认为 / )。例如,如果您将 quarkus.http.root-path=/api 添加到您的 application.properties 中,则客户端可以使用 https://:8080/api/chat/the-name 连接到此端点。 |
5.2. 客户端端点
客户端端点是使用 @io.quarkus.websockets.next.WebSocketClient
注解的类。WebSocketClient#path()
的值用于定义此客户端将连接到的端点的路径。
package org.acme.websockets;
import io.quarkus.websockets.next.WebSocketClient;
import jakarta.inject.Inject;
@WebSocketClient(path = "/chat/{username}") (1)
public class ChatWebSocket {
}
客户端端点用于消费和发送消息。您需要 连接器 API 来配置和打开新的 WebSocket 连接。 |
5.3. 路径参数
WebSocket 端点的路径可以包含路径参数。语法与 JAX-RS 资源相同:{parameterName}
。
您可以使用 io.quarkus.websockets.next.WebSocketConnection#pathParam(String)
方法或 io.quarkus.websockets.next.WebSocketClientConnection#pathParam(String)
方法分别访问路径参数值。或者,会自动注入使用 @io.quarkus.websockets.next.PathParam
注解的端点回调方法参数。
WebSocketConnection#pathParam(String)
示例@Inject io.quarkus.websockets.next.WebSocketConnection connection;
// ...
String value = connection.pathParam("parameterName");
路径参数值始终是字符串。如果路径中不存在路径参数,则 WebSocketConnection#pathParam(String)
/WebSocketClientConnection#pathParam(String)
方法返回 null
。如果存在使用 @PathParam
注解的端点回调方法参数,并且端点路径中未定义参数名称,则构建失败。
不支持查询参数。但是,您可以使用 WebSocketConnection#handshakeRequest().query() 访问查询。 |
5.4. CDI 范围
端点作为 CDI bean 进行管理。默认情况下,使用 @Singleton
范围。但是,开发人员可以指定替代范围以满足他们的特定要求。
@Singleton
和 @ApplicationScoped
端点在所有 WebSocket 连接之间共享。因此,实现应该是无状态的或线程安全的。
5.4.1. 会话上下文
如果端点使用 @SessionScoped
注解,或者直接或间接依赖于 @SessionScoped
bean,则每个 WebSocket 连接都与其自己的会话上下文相关联。会话上下文在端点回调调用期间处于活动状态。同一连接中后续的 回调方法 调用使用相同的会话上下文。会话上下文保持活动状态,直到连接关闭(通常在 @OnClose
方法完成执行时),此时它将终止。
也可以将 quarkus.websockets-next.server.activate-session-context 配置属性设置为 always 。在这种情况下,会话上下文始终处于活动状态,无论 @SessionScoped bean 是否参与依赖关系树。 |
@SessionScoped
端点import jakarta.enterprise.context.SessionScoped;
@WebSocket(path = "/ws")
@SessionScoped (1)
public class MyWebSocket {
}
1 | 此服务器端点未共享,并且作用域限定为会话/连接。 |
5.4.2. 请求上下文
如果端点是
-
使用
@RequestScoped
注解进行注解,则每个 WebSocket 端点回调方法执行都与一个新的 CDI 请求上下文相关联。 -
具有使用安全注解(例如
@RolesAllowed
)注解的方法。 -
直接或间接依赖于
@RequestScoped
bean。 -
直接或间接依赖于使用标准安全注解保护的 CDI bean。
也可以将 quarkus.websockets-next.server.activate-request-context 配置属性设置为 always 。在这种情况下,调用端点回调时始终激活请求上下文。 |
@RequestScoped
端点import jakarta.enterprise.context.RequestScoped;
@WebSocket(path = "/ws")
@RequestScoped (1)
public class MyWebSocket {
}
1 | 此服务器端点是为每次回调方法执行实例化的。 |
5.5. 回调方法
WebSocket 端点可以声明
-
最多一个
@OnTextMessage
方法:处理来自连接的客户端/服务器的文本消息。 -
最多一个
@OnBinaryMessage
方法:处理来自连接的客户端/服务器的二进制消息。 -
最多一个
@OnPingMessage
方法:处理来自连接的客户端/服务器的 ping 消息。 -
最多一个
@OnPongMessage
方法:处理来自连接的客户端/服务器的 pong 消息。 -
最多一个
@OnOpen
方法:在打开连接时调用。 -
最多一个
@OnClose
方法:在关闭连接时执行。 -
任意数量的
@OnError
方法:当发生错误时调用;即当端点回调引发运行时错误,或者发生转换错误,或者返回的io.smallrye.mutiny.Uni
/io.smallrye.mutiny.Multi
收到故障时。
只有某些端点需要包括所有方法。但是,它必须至少包含 @On[Text|Binary]Message
或 @OnOpen
。
如果任何端点违反这些规则,则会在构建时抛出错误。表示子 web socket 的静态嵌套类也遵循相同的准则。
在 WebSocket 端点之外使用 @OnTextMessage 、@OnBinaryMessage 、@OnOpen 和 @OnClose 注解的任何方法都被认为是错误的,并且会导致构建失败并显示相应的错误消息。 |
5.6. 处理消息
接收来自客户端的消息的方法使用 @OnTextMessage
或 @OnBinaryMessage
注解。
对于从客户端收到的每个文本消息,都会调用 OnTextMessage
。对于客户端收到的每个二进制消息,都会调用 OnBinaryMessage
。
5.6.1. 调用规则
在调用回调方法时,链接到 WebSocket 连接的会话范围保持活动状态。此外,请求范围在方法完成之前(或直到它为异步和反应式方法生成结果)一直处于活动状态。
WebSocket Next 支持阻塞和非阻塞逻辑,类似于 Quarkus REST,由方法的返回类型和额外的注解(如 @Blocking
和 @NonBlocking
)确定。
以下是控制执行的规则
-
使用
@RunOnVirtualThread
、@Blocking
或@Transactional
注解的方法被认为是阻塞的。 -
在使用
@RunOnVirtualThread
注解的类中声明的方法被认为是阻塞的。 -
使用
@NonBlocking
注解的方法被认为是非阻塞的。 -
在使用
@Transactional
注解的类中声明的方法被认为是阻塞的,除非使用@NonBlocking
注解。 -
如果方法未声明上述任何注解,则执行模型从返回类型派生
-
返回
Uni
和Multi
的方法被认为是非阻塞的。 -
返回
void
或任何其他类型的方法被认为是阻塞的。
-
-
Kotlin
suspend
函数始终被认为是非阻塞的,不能使用@Blocking
、@NonBlocking
或@RunOnVirtualThread
注解,也不能在使用@RunOnVirtualThread
注解的类中。 -
非阻塞方法必须在连接的事件循环线程上执行。
-
阻塞方法必须在工作线程上执行,除非使用
@RunOnVirtualThread
注解或在使用@RunOnVirtualThread
注解的类中。 -
使用
@RunOnVirtualThread
注解的方法或在使用@RunOnVirtualThread
注解的类中声明的方法必须在虚拟线程上执行,每次调用都会生成一个新的虚拟线程。
5.6.2. 方法参数
该方法必须只接受一个消息参数
-
消息对象(任何类型)。
-
一个
Multi<X>
,其中 X 是消息类型。
但是,它也可以接受以下参数
-
WebSocketConnection
/WebSocketClientConnection
-
HandshakeRequest
-
使用
@PathParam
注解的String
参数
消息对象表示发送的数据,可以作为原始内容(String
、JsonObject
、JsonArray
、Buffer
或 byte[]
)访问,也可以作为反序列化的更高级别的对象访问,这是推荐的方法。
当接收到 Multi
时,该方法每个连接调用一次,并且提供的 Multi
接收由此连接传输的项目。如果该方法返回一个 Multi
(从接收到的 Multi
构建),Quarkus 将自动订阅它并写入发出的项目,直到完成、失败或取消。但是,如果你的方法没有返回 Multi
,你必须订阅传入的 Multi
才能消费数据。
以下是两个示例
// No need to subscribe to the incoming Multi as the method returns a Multi derived from the incoming one
@OnTextMessage
public Multi<ChatMessage> stream(Multi<ChatMessage> incoming) {
return incoming.log();
}
// ...
// Must subscribe to the incoming Multi as the method does not return a Multi, otherwise no data will be consumed
@OnTextMessage
public void stream(Multi<ChatMessage> incoming) {
incoming.subscribe().with(item -> log(item));
}
请参阅 何时订阅 Uni
或 Multi
以了解有关订阅传入的 Multi
的更多信息。
5.6.3. 支持的返回类型
使用 @OnTextMessage
或 @OnBinaryMessage
注解的方法可以返回各种类型,以有效地处理 WebSocket 通信
-
void
:表示一个阻塞方法,其中没有将显式响应发送回客户端。 -
Uni<Void>
:表示一个非阻塞方法,其中返回的Uni
的完成表示处理的结束。没有将显式响应发送回客户端。 -
类型为
X
的对象表示一个阻塞方法,其中返回的对象被序列化并作为响应发送回客户端。 -
Uni<X>
:指定一个非阻塞方法,其中非空Uni
发出的项目作为响应发送给客户端。 -
Multi<X>
:表示一个非阻塞方法,其中非空Multi
发出的项目按顺序发送给客户端,直到完成或取消。 -
Kotlin
suspend
函数返回Unit
:表示一个非阻塞方法,其中没有将显式响应发送回客户端。 -
Kotlin
suspend
函数返回X
:指定一个非阻塞方法,其中返回的项目作为响应发送给客户端。
以下是这些方法的一些示例
@OnTextMessage
void consume(Message m) {
// Process the incoming message. The method is called on an executor thread for each incoming message.
}
@OnTextMessage
Uni<Void> consumeAsync(Message m) {
// Process the incoming message. The method is called on an event loop thread for each incoming message.
// The method completes when the returned Uni emits its item.
}
@OnTextMessage
ResponseMessage process(Message m) {
// Process the incoming message and send a response to the client.
// The method is called for each incoming message.
// Note that if the method returns `null`, no response will be sent to the client.
}
@OnTextMessage
Uni<ResponseMessage> processAsync(Message m) {
// Process the incoming message and send a response to the client.
// The method is called for each incoming message.
// Note that if the method returns `null`, no response will be sent to the client. The method completes when the returned Uni emits its item.
}
@OnTextMessage
Multi<ResponseMessage> stream(Message m) {
// Process the incoming message and send multiple responses to the client.
// The method is called for each incoming message.
// The method completes when the returned Multi emits its completion signal.
// The method cannot return `null` (but an empty multi if no response must be sent)
}
返回 Uni
和 Multi
的方法被认为是非阻塞的。此外,Quarkus 会自动订阅返回的 Multi
/ Uni
并写入发出的项目,直到完成、失败或取消。失败或取消会终止连接。
5.6.4. 流
除了单个消息之外,WebSocket 端点还可以处理消息流。在这种情况下,该方法接收一个 Multi<X>
作为参数。每个 X
的实例都使用上面列出的相同规则进行反序列化。
接收 Multi
的方法可以返回另一个 Multi
或 void
。如果该方法返回一个 Multi
,则不必订阅传入的 multi
。
@OnTextMessage
public Multi<ChatMessage> stream(Multi<ChatMessage> incoming) {
return incoming.log();
}
此方法允许双向流。
当方法返回 void
,因此不返回 Multi
时,代码必须订阅传入的 Multi
。否则,将不会消费任何数据,并且连接将不会关闭
@OnTextMessage
public void stream(Multi<ChatMessage> incoming) {
incoming.subscribe().with(item -> log(item));
}
另请注意,stream
方法将在 Multi
完成之前完成。
请参阅 何时订阅 Uni
或 Multi
以了解有关订阅传入的 Multi
的更多信息。
5.6.7. OnOpen 和 OnClose 方法
当客户端连接或断开连接时,也可以通知 WebSocket 端点。
这可以通过使用 @OnOpen
或 @OnClose
注解方法来完成
@OnOpen(broadcast = true)
public ChatMessage onOpen() {
return new ChatMessage(MessageType.USER_JOINED, connection.pathParam("username"), null);
}
@Inject WebSocketConnection connection;
@OnClose
public void onClose() {
ChatMessage departure = new ChatMessage(MessageType.USER_LEFT, connection.pathParam("username"), null);
connection.broadcast().sendTextAndAwait(departure);
}
@OnOpen
在客户端连接时触发,而 @OnClose
在断开连接时调用。
这些方法可以访问会话作用域的 WebSocketConnection
bean。
5.6.8. 参数
使用 @OnOpen
和 @OnClose
注解的方法可以接受以下参数
-
WebSocketConnection
/WebSocketClientConnection
-
HandshakeRequest
-
使用
@PathParam
注解的String
参数
使用 @OnClose
注解的端点方法也可以接受 io.quarkus.websockets.next.CloseReason
参数,该参数可以指示关闭连接的原因。
5.6.9. 支持的返回类型
@OnOpen
和 @OnClose
方法支持不同的返回类型。
对于 @OnOpen
方法,适用与 @On[Text|Binary]Message
相同的规则。因此,使用 @OnOpen
注解的方法可以在连接后立即向客户端发送消息。@OnOpen
方法支持的返回类型为
-
void
:表示一个阻塞方法,其中没有将显式消息发送回连接的客户端。 -
Uni<Void>
:表示一个非阻塞方法,其中返回的Uni
的完成表示处理的结束。没有将消息发送回客户端。 -
类型为
X
的对象:表示一个阻塞方法,其中返回的对象被序列化并发送回客户端。 -
Uni<X>
:指定一个非阻塞方法,其中非空Uni
发出的项目发送给客户端。 -
Multi<X>
:表示一个非阻塞方法,其中非空Multi
发出的项目按顺序发送给客户端,直到完成或取消。 -
Kotlin
suspend
函数返回Unit
:表示一个非阻塞方法,其中没有将显式消息发送回客户端。 -
Kotlin
suspend
函数返回X
:指定一个非阻塞方法,其中返回的项目发送给客户端。
发送给客户端的项目是 序列化的,但 String
、io.vertx.core.json.JsonObject
、io.vertx.core.json.JsonArray
、io.vertx.core.buffer.Buffer
和 byte[]
类型除外。对于 Multi
,Quarkus 订阅返回的 Multi
,并在发出项目时将它们写入 WebSocket
。String
、JsonObject
和 JsonArray
作为文本消息发送。Buffers
和字节数组作为二进制消息发送。
对于 @OnClose
方法,支持的返回类型包括
-
void
:该方法被认为是阻塞的。 -
Uni<Void>
:该方法被认为是非阻塞的。 -
Kotlin
suspend
函数返回Unit
:该方法被认为是非阻塞的。
在服务器端点上声明的 @OnClose 方法不能通过返回对象将项目发送给连接的客户端。它们只能使用 WebSocketConnection 对象将消息发送给其他客户端。 |
5.7. 错误处理
当发生错误时,也可以通知 WebSocket 端点。当端点回调抛出运行时错误,或者发生转换错误,或者返回的 io.smallrye.mutiny.Uni
/io.smallrye.mutiny.Multi
收到故障时,将调用使用 @io.quarkus.websockets.next.OnError
注解的 WebSocket 端点方法。
该方法必须只接受一个错误参数,即可以从 java.lang.Throwable
分配的参数。该方法也可以接受以下参数
-
WebSocketConnection
/WebSocketClientConnection
-
HandshakeRequest
-
使用
@PathParam
注解的String
参数
一个端点可以声明多个使用 @io.quarkus.websockets.next.OnError
注解的方法。但是,每个方法都必须声明不同的错误参数。选择声明实际异常的最具体的超类型的方法。
@io.quarkus.websockets.next.OnError 注解也可以用于声明全局错误处理程序,即未在 WebSocket 端点上声明的方法。此类方法可能不接受 @PathParam 参数。在端点上声明的错误处理程序优先于全局错误处理程序。 |
当发生错误但没有错误处理程序可以处理故障时,Quarkus 将使用 quarkus.websockets-next.server.unhandled-failure-strategy
指定的策略。对于服务器端点,默认情况下会记录错误消息并关闭连接。对于客户端端点,默认情况下会记录错误消息。
5.8. 序列化和反序列化
WebSocket Next 扩展支持自动序列化和反序列化消息。
类型为 String
、JsonObject
、JsonArray
、Buffer
和 byte[]
的对象按原样发送,并绕过序列化和反序列化。当未提供编解码器时,序列化和反序列化会自动将消息从/转换为 JSON。
当您需要自定义序列化和反序列化时,您可以提供自定义编解码器。
5.8.1. 自定义编解码器
要实现自定义编解码器,您必须提供一个实现以下接口的 CDI bean
-
二进制消息的
io.quarkus.websockets.next.BinaryMessageCodec
-
文本消息的
io.quarkus.websockets.next.TextMessageCodec
以下示例显示如何为 Item
类实现自定义编解码器
@Singleton
public class ItemBinaryMessageCodec implements BinaryMessageCodec<Item> {
@Override
public boolean supports(Type type) {
// Allows selecting the right codec for the right type
return type.equals(Item.class);
}
@Override
public Buffer encode(Item value) {
// Serialization
return Buffer.buffer(value.toString());
}
@Override
public Item decode(Type type, Buffer value) {
// Deserialization
return new Item(value.toString());
}
}
OnTextMessage
和 OnBinaryMessage
方法也可以显式指定应使用哪个编解码器
@OnTextMessage(codec = MyInputCodec.class) (1)
Item find(Item item) {
//....
}
-
指定用于消息的反序列化和序列化的编解码器
当序列化和反序列化必须使用不同的编解码器时,您可以单独指定用于序列化和反序列化的编解码器
@OnTextMessage(
codec = MyInputCodec.class, (1)
outputCodec = MyOutputCodec.class (2)
Item find(Item item) {
//....
}
-
指定用于传入消息的反序列化的编解码器
-
指定用于传出消息的序列化的编解码器
5.9. Ping/Pong 消息
5.9.1. 发送 ping 消息
Ping 消息是可选的,默认情况下不发送。但是,可以将服务器和客户端端点配置为定期自动发送 ping 消息。
quarkus.websockets-next.server.auto-ping-interval=2 (1)
quarkus.websockets-next.client.auto-ping-interval=10 (2)
1 | 每 2 秒从服务器向每个连接的客户端发送一次 ping 消息。 |
2 | 每 10 秒从所有连接的客户端实例向其远程服务器发送一次 ping 消息。 |
服务器和客户端可以使用 WebSocketConnection
或 WebSocketClientConnection
随时以编程方式发送 ping 消息。有一个非阻塞变体:Sender#sendPing(Buffer)
和一个阻塞变体:Sender#sendPingAndAwait(Buffer)
。
5.9.2. 发送 pong 消息
服务器和客户端端点始终会使用来自 ping 消息的应用程序数据,使用相应的 pong 消息响应来自远程方的 ping 消息。此行为是内置的,不需要额外的代码或配置。
服务器和客户端可以使用 WebSocketConnection
或 WebSocketClientConnection
发送未经请求的 pong 消息,这些消息可以用作单向心跳。有一个非阻塞变体:Sender#sendPong(Buffer)
和一个阻塞变体:Sender#sendPongAndAwait(Buffer)
。
5.9.3. 处理 ping/pong 消息
由于 ping 消息会自动处理,并且 pong 消息不需要响应,因此无需为这些消息编写处理程序即可符合 WebSocket 协议。但是,有时了解端点何时收到 ping 或 pong 消息很有用。
@OnPingMessage
和 @OnPongMessage
注解可用于定义使用从远程方发送的 ping 或 pong 消息的回调。一个端点最多可以声明一个 @OnPingMessage
回调和一个 @OnPongMessage
回调。回调方法必须返回 void
或 Uni<Void>
(或是一个返回 Unit
的 Kotlin suspend
函数),并且必须接受一个 Buffer
类型的参数。
@OnPingMessage
void ping(Buffer data) {
// an incoming ping that will automatically receive a pong
}
@OnPongMessage
void pong(Buffer data) {
// an incoming pong in response to the last ping sent
}
5.10. 入站处理模式
WebSocket 端点可以使用 @WebSocket#inboundProcessingMode()
和 @WebSocketClient.inboundProcessingMode()
分别定义用于处理特定连接的传入事件的模式。传入事件可以表示消息(文本、二进制、pong)、打开连接和关闭连接。默认情况下,事件是串行处理的,并且保证了排序。这意味着如果一个端点接收到事件 A
和 B
(按此特定顺序),则在事件 A
的回调完成后,将调用事件 B
的回调。但是,在某些情况下,最好并发处理事件,即不保证排序,但也没有并发限制。对于这种情况,应使用 InboundProcessingMode#CONCURRENT
。
6. 服务器 API
6.1. HTTP 服务器配置
此扩展重用主 HTTP 服务器。
因此,WebSocket 服务器的配置在 quarkus.http.
配置部分完成。
在应用程序中配置的 WebSocket 路径与 quarkus.http.root
定义的根路径连接(默认为 /
)。此连接确保 WebSocket 端点适当地位于应用程序的 URL 结构中。
有关更多详细信息,请参阅 HTTP 指南。
6.2. 子 web socket 端点
@WebSocket
端点可以封装静态嵌套类,这些类也使用 @WebSocket
注解,并表示子 web socket。这些子 web socket 的结果路径连接来自封闭类和嵌套类的路径。结果路径是标准化的,遵循 HTTP URL 规则。
子 web socket 继承对封闭类和嵌套类的 @WebSocket
注解中声明的路径参数的访问权限。封闭类中的 consumePrimary
方法可以在以下示例中访问 version
参数。同时,嵌套类中的 consumeNested
方法可以访问 version
和 id
参数
@WebSocket(path = "/ws/v{version}")
public class MyPrimaryWebSocket {
@OnTextMessage
void consumePrimary(String s) { ... }
@WebSocket(path = "/products/{id}")
public static class MyNestedWebSocket {
@OnTextMessage
void consumeNested(String s) { ... }
}
}
6.3. WebSocket 连接
io.quarkus.websockets.next.WebSocketConnection
对象表示 WebSocket 连接。Quarkus 提供一个 @SessionScoped
CDI bean,它实现此接口,可以注入到 WebSocket
端点中,并用于与连接的客户端交互。
使用 @OnOpen
、@OnTextMessage
、@OnBinaryMessage
和 @OnClose
注解的方法可以访问注入的 WebSocketConnection
对象
@Inject WebSocketConnection connection;
请注意,在这些方法之外,WebSocketConnection 对象不可用。但是,可以 列出所有打开的连接。 |
该连接可用于向客户端发送消息、访问路径参数、将消息广播给所有连接的客户端等。
// Send a message:
connection.sendTextAndAwait("Hello!");
// Broadcast messages:
connection.broadcast().sendTextAndAwait(departure);
// Access path parameters:
String param = connection.pathParam("foo");
WebSocketConnection
提供阻塞和非阻塞方法变体来发送消息
-
sendTextAndAwait(String message)
:向客户端发送文本消息,并等待消息发送。它是阻塞的,只能从执行器线程调用。 -
sendText(String message)
:向客户端发送文本消息。它返回一个Uni
。它是非阻塞的。请确保您或 Quarkus 订阅了返回的Uni
以发送消息。如果您从 Quarkus 调用的方法(如 Quarkus REST、Quarkus WebSocket Next 或 Quarkus Messaging)返回Uni
,它将订阅它并发送消息。例如
@POST
public Uni<Void> send() {
return connection.sendText("Hello!"); // Quarkus automatically subscribes to the returned Uni and sends the message.
}
请参阅 何时订阅 Uni
或 Multi
以了解有关订阅 Uni
的更多信息。
6.3.1. 列出打开的连接
也可以列出所有打开的连接。Quarkus 提供一个 io.quarkus.websockets.next.OpenConnections
类型的 CDI bean,它声明了访问连接的便捷方法。
import io.quarkus.logging.Log;
import io.quarkus.websockets.next.OpenConnections;
class MyBean {
@Inject
OpenConnections connections;
void logAllOpenConnections() {
Log.infof("Open connections: %s", connections.listAll()); (1)
}
}
1 | OpenConnections#listAll() 返回给定时间所有打开的连接的不可变快照。 |
还有其他便捷方法。例如,OpenConnections#findByEndpointId(String)
可以轻松找到特定端点的连接。
6.3.2. 用户数据
也可以将任意用户数据与特定连接关联。通过 WebSocketConnection#userData()
方法获得的 io.quarkus.websockets.next.UserData
对象表示与连接关联的可变用户数据。
import io.quarkus.websockets.next.WebSocketConnection;
import io.quarkus.websockets.next.UserData.TypedKey;
@WebSocket(path = "/endpoint/{username}")
class MyEndpoint {
@Inject
CoolService service;
@OnOpen
void open(WebSocketConnection connection) {
connection.userData().put(TypedKey.forBoolean("isCool"), service.isCool(connection.pathParam("username"))); (1)
}
@OnTextMessage
String process(String message) {
if (connection.userData().get(TypedKey.forBoolean("isCool"))) { (2)
return "Cool message processed!";
} else {
return "Message processed!";
}
}
}
1 | CoolService#isCool() 返回与当前连接关联的 Boolean 。 |
2 | TypedKey.forBoolean("isCool") 是用于获取连接创建时存储的数据的键。 |
6.3.3. CDI 事件
当打开新连接时,Quarkus 会异步触发一个带有限定符 @io.quarkus.websockets.next.Open
的 io.quarkus.websockets.next.WebSocketConnection
类型的 CDI 事件。此外,当关闭连接时,会异步触发一个带有限定符 @io.quarkus.websockets.next.Closed
的 WebSocketConnection
类型的 CDI 事件。
import jakarta.enterprise.event.ObservesAsync;
import io.quarkus.websockets.next.Open;
import io.quarkus.websockets.next.WebSocketConnection;
class MyBean {
void connectionOpened(@ObservesAsync @Open WebSocketConnection connection) { (1)
// This observer method is called when a connection is opened...
}
}
1 | 异步观察者方法使用默认的阻塞执行器服务执行。 |
6.4. 安全性
安全性功能由 Quarkus Security 扩展提供。任何 身份提供程序都可用于将初始 HTTP 请求中的身份验证凭据转换为 SecurityIdentity
实例。然后,SecurityIdentity
与 web socket 连接关联。授权选项将在以下部分中演示。
当使用 OpenID Connect 扩展 quarkus-oidc 且令牌过期时,Quarkus 会自动关闭连接。 |
6.4.1. 安全 HTTP 升级
当标准安全注解放置在端点类上或定义了 HTTP 安全策略时,HTTP 升级是安全的。保护 HTTP 升级的优点是处理较少,授权提前执行且仅执行一次。除非您需要在错误时执行操作(请参阅 安全 WebSocket 端点回调方法)或基于负载的安全检查(请参阅 具有权限检查器的安全服务器端点),否则应始终首选 HTTP 升级安全性。
package io.quarkus.websockets.next.test.security;
import io.quarkus.security.Authenticated;
import jakarta.inject.Inject;
import io.quarkus.security.identity.SecurityIdentity;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;
@Authenticated (1)
@WebSocket(path = "/end")
public class Endpoint {
@Inject
SecurityIdentity currentIdentity;
@OnOpen
String open() {
return "ready";
}
@OnTextMessage
String echo(String message) {
return message;
}
}
1 | 初始 HTTP 握手以匿名用户的 401 状态结束。您还可以使用 quarkus.websockets-next.server.security.auth-failure-redirect-url 配置属性在授权失败时重定向握手请求。 |
HTTP 升级只有在端点类上,紧邻 @WebSocket 注解声明了安全注解时才是安全的。将安全注解放置在端点 Bean 上不会保护 Bean 方法,只会保护 HTTP 升级。您必须始终验证您的端点是否按预期受到保护。 |
quarkus.http.auth.permission.http-upgrade.paths=/end
quarkus.http.auth.permission.http-upgrade.policy=authenticated
在身份验证期间使用的安全注解也必须放置在端点类上,因为 SecurityIdentity
是在 websocket 连接打开之前创建的。
package io.quarkus.websockets.next.test.security;
import io.quarkus.oidc.BearerTokenAuthentication;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;
@BearerTokenAuthentication (1)
@WebSocket(path = "/end")
public class Endpoint {
@OnTextMessage
String echo(String message) {
return message;
}
}
1 | 要求打开 WebSocket 握手请求必须使用 bearer 令牌身份验证进行身份验证。有关使用注解选择身份验证机制的更多信息,请参阅 Quarkus 中的身份验证机制 指南。 |
quarkus.http.auth.proactive=false (1)
1 | 仅当检测到 io.quarkus.oidc.BearerTokenAuthentication 注解时才开始验证打开的 WebSocket 握手请求。 |
6.4.2. 保护 WebSocket 端点回调方法
WebSocket 端点回调方法可以使用安全注解进行保护,例如 io.quarkus.security.Authenticated
、jakarta.annotation.security.RolesAllowed
以及 支持的安全注解 文档中列出的其他注解。
例如
package io.quarkus.websockets.next.test.security;
import jakarta.annotation.security.RolesAllowed;
import jakarta.inject.Inject;
import io.quarkus.security.ForbiddenException;
import io.quarkus.security.identity.SecurityIdentity;
import io.quarkus.websockets.next.OnError;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;
@WebSocket(path = "/end")
public class Endpoint {
@Inject
SecurityIdentity currentIdentity;
@OnOpen
String open() {
return "ready";
}
@RolesAllowed("admin")
@OnTextMessage
String echo(String message) { (1)
return message;
}
@OnError
String error(ForbiddenException t) { (2)
return "forbidden:" + currentIdentity.getPrincipal().getName();
}
}
1 | 只有当前安全身份具有 admin 角色时,才能调用 echo 回调方法。 |
2 | 如果授权失败,则会调用错误处理程序。 |
6.4.3. 使用权限检查器保护服务器端点
可以使用 权限检查器来保护 WebSocket 端点。我们建议 保护 HTTP 升级,而不是单独的端点方法。例如
package io.quarkus.websockets.next.test.security;
import io.quarkus.security.PermissionsAllowed;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;
@PermissionsAllowed("product:premium")
@WebSocket(path = "/product/premium")
public class PremiumProductEndpoint {
@OnTextMessage
PremiumProduct getPremiumProduct(int productId) {
return new PremiumProduct(productId);
}
}
package io.quarkus.websockets.next.test.security;
import io.quarkus.security.identity.SecurityIdentity;
import io.quarkus.security.PermissionChecker;
import io.quarkus.vertx.http.runtime.security.HttpSecurityUtils;
import io.vertx.ext.web.RoutingContext;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class PermissionChecker {
@PermissionChecker("product:premium")
public boolean canGetPremiumProduct(SecurityIdentity securityIdentity) { (1)
String username = securityIdentity.getPrincipal().getName();
RoutingContext routingContext = HttpSecurityUtils.getRoutingContextAttribute(securityIdentity);
String initialHttpUpgradePath = routingContext == null ? null : routingContext.normalizedPath();
if (!isUserAllowedToAccessPath(initialHttpUpgradePath, username)) {
return false;
}
return isPremiumCustomer(username);
}
}
1 | 授权 HTTP 升级的权限检查器必须只声明一个方法参数,即 SecurityIdentity 。 |
也可以对每个消息运行安全检查。例如,可以像这样访问消息有效负载
package io.quarkus.websockets.next.test.security;
import io.quarkus.security.PermissionChecker;
import io.quarkus.security.PermissionsAllowed;
import jakarta.inject.Inject;
import io.quarkus.security.ForbiddenException;
import io.quarkus.security.identity.SecurityIdentity;
import io.quarkus.websockets.next.OnError;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;
@WebSocket(path = "/product")
public class ProductEndpoint {
private record Product(int id, String name) {}
@Inject
SecurityIdentity currentIdentity;
@PermissionsAllowed("product:get")
@OnTextMessage
Product getProduct(int productId) { (1)
return new Product(productId, "Product " + productId);
}
@OnError
String error(ForbiddenException t) { (2)
return "forbidden:" + currentIdentity.getPrincipal().getName();
}
@PermissionChecker("product:get")
boolean canGetProduct(int productId) {
String username = currentIdentity.getPrincipal().getName();
return currentIdentity.hasRole("admin") || canUserGetProduct(productId, username);
}
}
1 | 只有当前安全身份具有 admin 角色或用户被允许获取产品详细信息时,才能调用 getProduct 回调方法。 |
2 | 如果授权失败,则会调用错误处理程序。 |
有关权限检查器的更多信息,请参阅 @PermissionChecker
的 JavaDoc。
6.4.4. Bearer 令牌身份验证
OIDC Bearer 令牌身份验证 期望 bearer 令牌在初始 HTTP 握手期间在 Authorization
标头中传递。Java WebSocket 客户端(例如 WebSockets Next Client 和 Vert.x WebSocketClient)支持将自定义标头添加到 WebSocket 打开握手中。但是,遵循 WebSockets API 的 JavaScript 客户端不支持添加自定义标头。因此,使用自定义 Authorization
标头传递 bearer 访问令牌对于基于 JavaScript 的 WebSocket 客户端是不可能的。JavaScript WebSocket 客户端只允许配置 HTTP Sec-WebSocket-Protocol 请求标头以协商子协议。如果绝对必要,可以使用 Sec-WebSocket-Protocol
标头作为自定义标头的载体,以解决 WebSockets API 的限制。以下是一个 JavaScript 客户端将 Authorization
标头作为子协议值传播的示例
const token = getBearerToken()
const quarkusHeaderProtocol = encodeURIComponent("quarkus-http-upgrade#Authorization#Bearer " + token) (1)
const socket = new WebSocket("wss://" + location.host + "/chat/" + username, ["bearer-token-carrier", quarkusHeaderProtocol]) (2)
1 | Quarkus 标头子协议的预期格式是 quarkus-http-upgrade#header-name#header-value 。不要忘记将子协议值编码为 URI 组件以避免编码问题。 |
2 | 指示客户端支持的 2 个子协议,您选择的子协议和 Quarkus HTTP 升级子协议。 |
为了使 WebSocket 服务器接受作为子协议传递的 Authorization
,我们必须
-
使用支持的子协议配置我们的 WebSocket 服务器。当 WebSocket 客户端在 HTTP
Sec-WebSocket-Protocol
请求标头中提供支持的子协议列表时,WebSocket 服务器必须同意使用其中一个子协议来提供内容。 -
启用 Quarkus HTTP 升级子协议映射到打开的 WebSocket 握手请求标头。
quarkus.websockets-next.server.supported-subprotocols=bearer-token-carrier
quarkus.websockets-next.server.propagate-subprotocol-headers=true
WebSocket 安全模型是基于源的,并非设计用于使用标头或 Cookie 进行客户端身份验证。例如,Web 浏览器不会对打开的 WebSocket 握手请求强制执行同源策略。当您计划在打开 WebSocket 握手请求期间使用 bearer 访问令牌时,我们强烈建议遵循以下列出的额外安全措施,以最大限度地降低安全风险
|
6.5. 检查和/或拒绝 HTTP 升级
要检查 HTTP 升级,您必须提供一个实现 io.quarkus.websockets.next.HttpUpgradeCheck
接口的 CDI Bean。Quarkus 在每个应该升级到 WebSocket 连接的 HTTP 请求上调用 HttpUpgradeCheck#perform
方法。在此方法中,您可以执行任何业务逻辑和/或拒绝 HTTP 升级。
package io.quarkus.websockets.next.test;
import io.quarkus.websockets.next.HttpUpgradeCheck;
import io.smallrye.mutiny.Uni;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped (1)
public class ExampleHttpUpgradeCheck implements HttpUpgradeCheck {
@Override
public Uni<CheckResult> perform(HttpUpgradeContext ctx) {
if (rejectUpgrade(ctx)) {
return CheckResult.rejectUpgrade(400); (2)
}
return CheckResult.permitUpgrade();
}
private boolean rejectUpgrade(HttpUpgradeContext ctx) {
var headers = ctx.httpRequest().headers();
// implement your business logic in here
}
}
1 | 实现 HttpUpgradeCheck 接口的 CDI Bean 可以是 @ApplicationScoped 、@Singleton 或 @Dependent Bean,但绝不能是 @RequestScoped Bean。 |
2 | 拒绝 HTTP 升级。初始 HTTP 握手以 400 Bad Request 响应状态代码结束。 |
您可以使用 HttpUpgradeCheck#appliesTo 方法选择将 HttpUpgradeCheck 应用于哪些 WebSocket 端点。 |
6.6. TLS
作为此扩展重用主 HTTP 服务器的直接结果,所有相关的服务器配置都适用。有关更多详细信息,请参阅 HTTP 指南。
6.7. Hibernate 多租户
HTTP 升级后 RoutingContext
不可用。但是,可以注入 WebSocketConnection
并访问初始 HTTP 请求的标头。
如果使用了自定义 TenantResolver
并且您想组合 REST/HTTP 和 WebSockets,则代码可能如下所示
@RequestScoped
@PersistenceUnitExtension
public class CustomTenantResolver implements TenantResolver {
@Inject
RoutingContext context;
@Inject
WebSocketConnection connection;
@Override
public String getDefaultTenantId() {
return "public";
}
@Override
public String resolveTenantId() {
String schema;
try {
//Handle WebSocket
schema = connection.handshakeRequest().header("schema");
} catch ( ContextNotActiveException e) {
// Handle REST/HTTP
schema = context.request().getHeader( "schema" );
}
if ( schema == null || schema.equalsIgnoreCase( "public" ) ) {
return "public";
}
return schema;
}
}
有关 Hibernate 多租户的更多信息,请参阅 hibernate 文档。
7. 客户端 API
7.1. 客户端连接器
可以使用连接器配置和打开由客户端端点支持的新客户端连接,该端点用于消费和发送消息。Quarkus 提供了一个 CDI Bean,其 Bean 类型为 io.quarkus.websockets.next.WebSocketConnector<CLIENT>
和默认限定符,可以在其他 Bean 中注入。注入点的实际类型参数用于确定客户端端点。该类型在构建期间进行验证 - 如果它不代表客户端端点,则构建失败。
让我们考虑以下客户端端点
@WebSocketClient(path = "/endpoint/{name}")
public class ClientEndpoint {
@OnTextMessage
void onMessage(@PathParam String name, String message, WebSocketClientConnection connection) {
// ...
}
}
此客户端端点的连接器使用方式如下
@Singleton
public class MyBean {
@ConfigProperty(name = "endpoint.uri")
URI myUri;
@Inject
WebSocketConnector<ClientEndpoint> connector; (1)
void openAndSendMessage() {
WebSocketClientConnection connection = connector
.baseUri(uri) (2)
.pathParam("name", "Roxanne") (3)
.connectAndAwait();
connection.sendTextAndAwait("Hi!"); (4)
}
}
1 | 注入 ClientEndpoint 的连接器。 |
2 | 如果未提供基本 URI ,我们将尝试从配置中获取该值。键由客户端 ID 和 .base-uri 后缀组成。 |
3 | 设置路径参数值。如果客户端端点路径不包含具有给定名称的参数,则抛出 IllegalArgumentException 。 |
4 | 如果需要,使用连接发送消息。 |
如果应用程序尝试注入缺失端点的连接器,则会抛出错误。 |
连接器不是线程安全的,不应并发使用。连接器也不应重复使用。如果需要连续创建多个连接,则需要使用 Instance#get()
以编程方式获取新的连接器实例
import jakarta.enterprise.inject.Instance;
@Singleton
public class MyBean {
@Inject
Instance<WebSocketConnector<MyEndpoint>> connector;
void connect() {
var connection1 = connector.get().baseUri(uri)
.addHeader("Foo", "alpha")
.connectAndAwait();
var connection2 = connector.get().baseUri(uri)
.addHeader("Foo", "bravo")
.connectAndAwait();
}
}
7.1.1. 基本连接器
如果应用程序开发人员不需要客户端端点和连接器的组合,则可以使用基本连接器。基本连接器是一种创建连接和消费/发送消息的简单方法,无需定义客户端端点。
@Singleton
public class MyBean {
@Inject
BasicWebSocketConnector connector; (1)
void openAndConsume() {
WebSocketClientConnection connection = connector
.baseUri(uri) (2)
.path("/ws") (3)
.executionModel(ExecutionModel.NON_BLOCKING) (4)
.onTextMessage((c, m) -> { (5)
// ...
})
.connectAndAwait();
}
}
1 | 注入连接器。 |
2 | 必须始终设置基本 URI。 |
3 | 应该附加到基本 URI 的附加路径。 |
4 | 设置回调处理程序的执行模型。默认情况下,回调可能会阻塞当前线程。但是,在这种情况下,回调在事件循环上执行,并且可能不会阻塞当前线程。 |
5 | 对于从服务器发送的每个文本消息,都会调用 lambda。 |
基本连接器更接近于底层 API,并且保留供高级用户使用。但是,与其他底层 WebSocket 客户端不同,它仍然是一个 CDI Bean,可以在其他 Bean 中注入。它还提供了一种配置回调执行模型的方法,确保与 Quarkus 的其余部分实现最佳集成。
连接器不是线程安全的,不应并发使用。连接器也不应重复使用。如果需要连续创建多个连接,则需要使用 Instance#get()
以编程方式获取新的连接器实例
import jakarta.enterprise.inject.Instance;
@Singleton
public class MyBean {
@Inject
Instance<BasicWebSocketConnector> connector;
void connect() {
var connection1 = connector.get().baseUri(uri)
.addHeader("Foo", "alpha")
.connectAndAwait();
var connection2 = connector.get().baseUri(uri)
.addHeader("Foo", "bravo")
.connectAndAwait();
}
}
7.2. WebSocket 客户端连接
io.quarkus.websockets.next.WebSocketClientConnection
对象表示 WebSocket 连接。Quarkus 提供了一个 @SessionScoped
CDI Bean,该 Bean 实现了此接口,可以在 WebSocketClient
端点中注入,并用于与连接的服务器进行交互。
使用 @OnOpen
、@OnTextMessage
、@OnBinaryMessage
和 @OnClose
注解的方法可以访问注入的 WebSocketClientConnection
对象
@Inject WebSocketClientConnection connection;
请注意,在这些方法之外,WebSocketClientConnection 对象不可用。但是,可以列出所有打开的客户端连接。 |
该连接可用于向客户端发送消息、访问路径参数等。
// Send a message:
connection.sendTextAndAwait("Hello!");
// Broadcast messages:
connection.broadcast().sendTextAndAwait(departure);
// Access path parameters:
String param = connection.pathParam("foo");
WebSocketClientConnection
提供了阻塞和非阻塞方法变体来发送消息
-
sendTextAndAwait(String message)
:向客户端发送文本消息,并等待消息发送。它是阻塞的,只能从执行器线程调用。 -
sendText(String message)
:向客户端发送文本消息。它返回一个Uni
。它是非阻塞的。请确保您或 Quarkus 订阅了返回的Uni
以发送消息。如果您从 Quarkus 调用的方法(如 Quarkus REST、Quarkus WebSocket Next 或 Quarkus Messaging)返回Uni
,它将订阅它并发送消息。例如
@POST
public Uni<Void> send() {
return connection.sendText("Hello!"); // Quarkus automatically subscribes to the returned Uni and sends the message.
}
7.2.1. 列出打开的客户端连接
也可以列出所有打开的连接。Quarkus 提供了一个类型为 io.quarkus.websockets.next.OpenClientConnections
的 CDI Bean,该 Bean 声明了访问连接的便捷方法。
import io.quarkus.logging.Log;
import io.quarkus.websockets.next.OpenClientConnections;
class MyBean {
@Inject
OpenClientConnections connections;
void logAllOpenClinetConnections() {
Log.infof("Open client connections: %s", connections.listAll()); (1)
}
}
1 | OpenClientConnections#listAll() 返回给定时间所有打开连接的不可变快照。 |
还有其他方便的方法。例如,OpenClientConnections#findByClientId(String)
可以轻松找到特定端点的连接。
7.2.2. 用户数据
也可以将任意用户数据与特定连接关联。通过 WebSocketClientConnection#userData()
方法获取的 io.quarkus.websockets.next.UserData
对象表示与连接关联的可变用户数据。
import io.quarkus.websockets.next.WebSocketClientConnection;
import io.quarkus.websockets.next.UserData.TypedKey;
@WebSocketClient(path = "/endpoint/{username}")
class MyEndpoint {
@Inject
CoolService service;
@OnOpen
void open(WebSocketClientConnection connection) {
connection.userData().put(TypedKey.forBoolean("isCool"), service.isCool(connection.pathParam("username"))); (1)
}
@OnTextMessage
String process(String message) {
if (connection.userData().get(TypedKey.forBoolean("isCool"))) { (2)
return "Cool message processed!";
} else {
return "Message processed!";
}
}
}
1 | CoolService#isCool() 返回与当前连接关联的 Boolean 。 |
2 | TypedKey.forBoolean("isCool") 是用于获取连接创建时存储的数据的键。 |
7.2.2.1. 在连接器中指定
在某些情况下,您可能希望将用户数据与 连接器创建的连接关联。在这种情况下,您可以在获取连接之前在连接器实例上设置值。如果您需要在连接打开时执行某些操作,并且无法以其他方式推断必要的上下文,则此功能特别有用。
@Singleton
public class MyBean {
@Inject
MyService service;
@Inject
Instance<WebSocketConnector<MyEndpoint>> connectorInstance;
public void openAndSendMessage(String internalId, String message) {
var externalId = service.getExternalId(internalId);
var connection = connectorInstance.get()
.pathParam("externalId", externalId)
.userData(TypedKey.forString("internalId"), internalId)
.connectAndAwait();
connection.sendTextAndAwait(message);
}
}
@WebSocketClient(path = "/endpoint/{externalId}")
class MyEndpoint {
@Inject
MyService service;
@OnOpen
void open(WebSocketClientConnection connection) {
var internalId = connection.userData().get(TypedKey.forString("internalId"));
service.doSomething(internalId);
}
}
7.2.3. CDI 事件
当新连接打开时,Quarkus 会异步触发类型为 io.quarkus.websockets.next.WebSocketClientConnection
且限定符为 @io.quarkus.websockets.next.Open
的 CDI 事件。此外,当连接关闭时,会异步触发类型为 WebSocketClientConnection
且限定符为 @io.quarkus.websockets.next.Closed
的 CDI 事件。
import jakarta.enterprise.event.ObservesAsync;
import io.quarkus.websockets.next.Open;
import io.quarkus.websockets.next.WebSocketClientConnection;
class MyBean {
void connectionOpened(@ObservesAsync @Open WebSocketClientConnection connection) { (1)
// This observer method is called when a connection is opened...
}
}
1 | 异步观察者方法使用默认的阻塞执行器服务执行。 |
7.3. 配置 SSL/TLS
要建立 TLS 连接,您需要使用 TLS 注册表配置命名配置。这通常通过配置完成
quarkus.tls.my-ws-client.trust-store.p12.path=server-truststore.p12
quarkus.tls.my-ws-client.trust-store.p12.password=secret
建立命名 TLS 配置后,您可以配置客户端以使用它
quarkus.websockets-next.client.tls-configuration-name=my-ws-client
或者,您可以使用 连接器提供配置名称
@Singleton
public class MyBean {
@Inject
WebSocketConnector<MyEndpoint> connector;
public void connect() {
connector
.tlsConfigurationName("my-ws-client")
.connectAndAwait();
}
}
提供给连接器的名称将覆盖任何静态配置的名称。这对于建立默认配置非常有用,该配置可以在运行时根据需要进行覆盖。
使用 WebSocket 客户端时,需要使用命名配置以避免与其他 TLS 配置冲突。客户端不会使用默认 TLS 配置。 |
配置命名 TLS 配置时,默认情况下会启用 TLS。
8. 流量日志记录
Quarkus 可以记录发送和接收的消息以进行调试。要为服务器启用流量日志记录,请将 quarkus.websockets-next.server.traffic-logging.enabled
配置属性设置为 true
。要为客户端启用流量日志记录,请将 quarkus.websockets-next.client.traffic-logging.enabled
配置属性设置为 true
。文本消息的有效负载也会被记录。但是,记录的字符数是有限制的。默认限制为 100,但您可以使用 quarkus.websockets-next.server.traffic-logging.text-payload-limit
和 quarkus.websockets-next.client.traffic-logging.text-payload-limit
配置属性分别更改此限制。
只有为记录器 io.quarkus.websockets.next.traffic 启用了 DEBUG 级别时,才会记录消息。 |
quarkus.websockets-next.server.traffic-logging.enabled=true (1)
quarkus.websockets-next.server.traffic-logging.text-payload-limit=50 (2)
quarkus.log.category."io.quarkus.websockets.next.traffic".level=DEBUG (3)
1 | 启用流量日志记录。 |
2 | 设置将记录的文本消息有效负载的字符数。 |
3 | 为记录器 io.quarkus.websockets.next.traffic 启用 DEBUG 级别。 |
9. 何时订阅 Uni
或 Multi
Uni
和 Multi
是惰性类型,这意味着它们在被订阅之前不会开始处理。
当您从参数或从您调用的方法中获取 Uni
或 Multi
时,是否应该订阅它取决于上下文
-
如果您在 Quarkus 调用的方法中返回
Uni
或Multi
(例如使用 Quarkus REST、Quarkus WebSocket Next 或 Quarkus Messaging),Quarkus 会订阅它并处理Multi
发出的项目或Uni
发出的项目
@Incoming("...")
@Outgoing("...")
public Multi<String> process(Multi<String> input) {
// No need to subscribe to the input Multi, the `process` method is called by Quarkus (Messaging).
return input.map(String::toUpperCase);
}
当从使用 @OnOpen
、@OnTextMessage
、@OnBinaryMessage
或 @OnClose
注解的方法返回 Uni
或 Multi
时,Quarkus 会自动订阅它。
-
如果您不在 Quarkus 调用的方法中返回
Uni
或Multi
,则应该订阅它
@Incoming("...")
@Outgoing("...")
public void process(Multi<String> input) {
input.map(String::toUpperCase)
.subscribe().with(s -> log(s));
}
10. 遥测
当 OpenTelemetry 扩展存在时,默认情况下会收集打开和关闭的 WebSocket 连接的跟踪。如果您不需要 WebSocket 跟踪,您可以像下面的示例中那样禁用跟踪收集
quarkus.websockets-next.server.traces.enabled=false
quarkus.websockets-next.client.traces.enabled=false
当 Micrometer 扩展存在时,Quarkus 可以收集消息、错误和传输的字节的指标。如果您需要 WebSocket 指标,您可以像下面的示例中那样启用指标
quarkus.websockets-next.server.metrics.enabled=true
quarkus.websockets-next.client.metrics.enabled=true
目前不支持 BasicWebSocketConnector 的遥测。 |
11. 配置参考
构建时固定的配置属性 - 所有其他配置属性都可以在运行时覆盖
配置属性 |
类型 |
默认 |
---|---|---|
指定在端点回调调用期间激活 CDI 请求上下文的策略。默认情况下,仅在需要时才激活请求上下文,即如果端点的依赖树中存在具有给定作用域的 Bean,或者使用安全注解(例如 环境变量: 显示更多 |
|
|
指定在端点回调调用期间激活 CDI 会话上下文的策略。默认情况下,仅在需要时才激活会话上下文,即如果端点的依赖树中存在具有给定作用域的 Bean。 环境变量: 显示更多 |
|
|
如果启用,则 WebSocket 打开握手标头将使用与格式 'quarkus-http-upgrade#header-name#header-value' 匹配的 'Sec-WebSocket-Protocol' 子协议进行增强。如果 WebSocket 客户端接口不支持将标头设置为 WebSocket 打开握手,这是一种设置验证用户所需的授权标头的方式。'quarkus-http-upgrade' 子协议将被删除,服务器将从子协议中选择一个受支持的子协议(不要忘记配置 'quarkus.websockets-next.server.supported-subprotocols' 属性)。重要提示:我们强烈建议仅在通过 TLS 加密 HTTP 连接、启用 CORS 来源检查并且已设置自定义 WebSocket 票证系统时才启用此功能。请参阅 Quarkus WebSockets Next 参考以获取更多信息。 环境变量: 显示更多 |
布尔值 |
|
请参阅 WebSocket 协议 环境变量: 显示更多 |
字符串列表 |
|
默认情况下支持 WebSocket 的压缩扩展。 另请参阅 RFC 7692 环境变量: 显示更多 |
布尔值 |
|
压缩级别必须是 0 到 9 之间的值。默认值为 环境变量: 显示更多 |
整数 |
|
消息的最大大小(以字节为单位)。默认值为 环境变量: 显示更多 |
整数 |
|
帧的最大大小(以字节为单位)。默认值为 环境变量: 显示更多 |
整数 |
|
此间隔之后,如果设置了,服务器会自动向连接的客户端发送 ping 消息。 默认情况下不会自动发送 Ping 消息。 环境变量: 显示更多 |
||
当发生错误但没有错误处理程序可以处理失败时使用的策略。 默认情况下,发生未处理的失败时,会记录错误消息并关闭连接。 环境变量: 显示更多 |
|
|
如果由于授权失败而拒绝 HTTP 升级,Quarkus 会将 HTTP 握手请求重定向到此 URL。当您使用标准安全注解保护端点时,此配置属性生效。例如,如果端点类使用 环境变量: 显示更多 |
字符串 |
|
为 Dev UI 连接保留的消息限制。如果小于零,则不会存储消息并将其发送到 Dev UI 视图。 环境变量: 显示更多 |
long |
|
如果设置为 true,则在为记录器 环境变量: 显示更多 |
布尔值 |
|
如果启用了流量日志记录,则将记录文本消息的字符数。永远不会记录二进制消息的有效负载。 环境变量: 显示更多 |
整数 |
|
如果启用了 WebSocket 跟踪的收集。仅当 OpenTelemetry 扩展存在时才适用。 环境变量: 显示更多 |
布尔值 |
|
如果启用了 WebSocket 指标的收集。仅当 Micrometer 扩展存在时才适用。 环境变量: 显示更多 |
布尔值 |
|
默认情况下支持 WebSocket 的压缩扩展。 另请参阅 RFC 7692 环境变量: 显示更多 |
布尔值 |
|
压缩级别必须是 0 到 9 之间的值。默认值为 环境变量: 显示更多 |
整数 |
|
消息的最大大小(以字节为单位)。默认值为 环境变量: 显示更多 |
整数 |
|
帧的最大大小(以字节为单位)。默认值为 环境变量: 显示更多 |
整数 |
|
此间隔之后,如果设置了,客户端会自动向连接的服务器发送 ping 消息。 默认情况下不会自动发送 Ping 消息。 环境变量: 显示更多 |
||
如果设置,则如果在给定的超时时间内未接收或发送任何数据,则将关闭连接。 环境变量: 显示更多 |
||
客户端在发送关闭帧后等待关闭 TCP 连接的时间。任何值都将 环境变量: 显示更多 |
||
当发生错误但没有错误处理程序可以处理失败时使用的策略。 默认情况下,发生未处理的失败时,会记录错误消息。 请注意,客户端不应随意关闭 WebSocket 连接。另请参阅 RFC-6455 第 7.3 节。 环境变量: 显示更多 |
|
|
要使用的 TLS 配置的名称。 如果配置了名称,它将使用来自 默认情况下,不使用默认 TLS 配置。 环境变量: 显示更多 |
字符串 |
|
如果设置为 true,则在为记录器 环境变量: 显示更多 |
布尔值 |
|
如果启用了流量日志记录,则将记录文本消息的字符数。永远不会记录二进制消息的有效负载。 环境变量: 显示更多 |
整数 |
|
如果启用了 WebSocket 跟踪的收集。仅当 OpenTelemetry 扩展存在时才适用。 环境变量: 显示更多 |
布尔值 |
|
如果启用了 WebSocket 指标的收集。仅当 Micrometer 扩展存在时才适用。 环境变量: 显示更多 |
布尔值 |
|
关于 Duration 格式
要写入持续时间值,请使用标准的 您还可以使用简化的格式,以数字开头
在其他情况下,简化格式将被转换为
|