Funqy Knative 事件绑定
Quarkus Funqy Knative 事件构建于 Funqy HTTP 扩展之上,允许您在 Funqy 函数中路由和处理 Knative 事件。
本指南将引导您完成快速入门代码,向您展示如何使用 Knative 事件部署和调用 Funqy 函数。
先决条件
要完成本指南,您需要
-
大约 1 小时
-
一个 IDE
-
已安装 JDK 17+ 并正确配置了
JAVA_HOME
-
Apache Maven 3.9.9
-
如果您想使用它,可以选择 Quarkus CLI
-
阅读关于 Funqy 基础。 这是一篇简短的阅读!
-
已经完成了 Knative 教程,特别是 Broker 和 Trigger
设置 Knative
在 Minikube 环境中本地设置 Knative 超出了本指南的范围。建议遵循 Red Hat 整理的 此 Knative 教程。它将引导您如何在本地环境中在 Minikube 或 OpenShift 上设置 Knative。
特别是,您应该运行 Broker 和 Trigger 教程,因为本指南要求您可以调用 Broker 来触发快速入门代码。 |
阅读关于 Cloud Events
Cloud Event 规范是一个很好的阅读材料,可以让您更深入地了解 Knative 事件。
快速入门
克隆 Git 仓库:git clone https://github.com/quarkusio/quarkus-quickstarts.git
,或者下载一个 存档。
解决方案位于 funqy-knative-events-quickstart
目录中。
快速入门流程
快速入门的工作原理是使用 curl
手动发送包含 Cloud Event 的 HTTP 请求到 Knative Broker。 Knative Broker 接收请求并触发快速入门构建的 Funqy 容器的启动。该事件触发一系列 Funqy 函数的调用。一个函数的输出触发另一个 Funqy 函数的调用。
Funqy 和 Cloud Events
当在 Knative 事件环境中运行时,Funqy 函数由特定的 Cloud Event 类型触发。 您可以在单个应用程序/部署中拥有多个 Funqy 函数,但它们必须由特定的 Cloud Event 类型触发。 此规则的例外情况是,如果应用程序中只有一个 Funqy 函数。 在这种情况下,无论 Cloud Event 类型如何,该事件都会被推送到该函数。
目前,Funqy 只能消耗基于 JSON 的数据。 它支持二进制和结构化执行模式,但 Cloud Event 消息的数据组件必须是 JSON。 此 JSON 还必须可序列化为函数的 Java 参数和返回类型,以及从函数的 Java 参数和返回类型反序列化。
代码
让我们开始查看我们的快速入门代码,以便您可以了解 Knative 事件如何映射到 Funqy。 打开 SimpleFunctionChain.java
我们将看到的第一个函数是 defaultChain
。
import io.quarkus.funqy.Funq;
public class SimpleFunctionChain {
@Funq
public String defaultChain(String input) {
log.info("*** defaultChain ***");
return input + "::" + "defaultChain";
}
与往常一样,Funqy 函数具有默认的 Cloud Event 映射。 默认情况下,Cloud Event 类型必须与函数名称匹配,函数才能触发。 如果函数返回输出,则响应将转换为 Cloud Event 并返回给 Broker 以路由到其他触发器。 此响应的默认 Cloud Event 类型是函数名称 + .output
。 默认 Cloud Event 源是函数名称。
因此,对于 defaultChain
函数,触发该函数的 Cloud Event 类型是 defaultChain
。 它生成一个响应,该响应触发一个新的 Cloud Event,其类型为 defaultChain.output
,事件源为 defaultChain
。
虽然默认映射很简单,但它可能并不总是可行的。 您可以通过配置更改此默认映射。 让我们看看下一个函数
import io.quarkus.funqy.Funq;
public class SimpleFunctionChain {
@Funq
public String configChain(String input) {
log.info("*** configChain ***");
return input + "::" + "configChain";
}
configChain
函数的 Cloud Event 映射已通过 application.properties 中的配置进行更改。
quarkus.funqy.knative-events.mapping.configChain.trigger=defaultChain.output
quarkus.funqy.knative-events.mapping.configChain.response-type=annotated
quarkus.funqy.knative-events.mapping.configChain.response-source=configChain
在这种情况下,配置将传入的 Cloud Event 类型 defaultChain.output
映射到 configChain
函数。 configChain
函数将其响应映射到 annotated
Cloud Event 类型和 Cloud Event 源 configChain
。
-
quarkus.funqy.knative-events.mapping.{function name}.trigger
设置触发特定函数的 Cloud Event 类型。 可以使用特殊值*
作为捕获所有值的特殊值。 在这种情况下,该函数将用于所有事件类型。 -
quarkus.funqy.knative-events.mapping.{function name}.response-type
设置响应的 Cloud Event 类型 -
quarkus.funqy.knative-events.mapping.{function name}.resource-source
设置响应的 Cloud Event 源
Funqy Knative Events 扩展还具有注释来执行此 Cloud Event 到您函数的映射。 看一下 annotatedChain
方法
import io.quarkus.funqy.Funq;
import io.quarkus.funqy.knative.events.CloudEventMapping;
public class SimpleFunctionChain {
@Funq
@CloudEventMapping(trigger = "annotated", responseSource = "annotated", responseType = "lastChainLink")
public String annotatedChain(String input) {
log.info("*** annotatedChain ***");
return input + "::" + "annotatedChain";
}
如果在您的函数上使用 @CloudEventMapping
注释,您可以映射 Cloud Event 类型触发器和 Cloud Event 响应。 在此示例中,annotatedChain
函数将由 annotated
Cloud Event 类型触发,响应将映射到 lastChainLink
类型和 annotated
Cloud Event 源。
因此,如果您查看 SimpleFunctionChain
中定义的所有函数,您会注意到一个函数触发下一个函数。 触发的最后一个函数是 lastChainLink
。
import io.quarkus.funqy.Context;
import io.quarkus.funqy.Funq;
public class SimpleFunctionChain {
@Funq
public void lastChainLink(String input, @Context CloudEvent event) {
log.info("*** lastChainLink ***");
log.info(input + "::" + "lastChainLink");
}
}
关于此函数,有两件事需要注意。 首先,它没有输出。 您的函数不需要返回输出。 其次,该函数还有一个额外的 event
参数。
如果您想了解有关传入 Cloud Event 的其他信息,您可以使用 Funqy @Context
注释注入 io.quarkus.funqy.knative.events.CloudEvent
接口。 CloudEvent
接口公开有关触发事件的信息。
public interface CloudEvent {
String id();
String specVersion();
String source();
String subject();
OffsetDateTime time();
}
Maven
如果您查看 POM,您会看到它是一个典型的 Quarkus POM,它引入了一个 Funqy 依赖项
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-funqy-knative-events</artifactId>
</dependency>
开发模式和测试
Funqy Knative Events 支持使用 RestAssured 的开发模式和单元测试。 您可以使用与 Funqy HTTP 相同的调用模型来调用 Funqy Knative Events 函数,使用普通的 HTTP 请求、Cloud Event 二进制模式或结构化模式。 所有调用模式都同时支持。
因此,如果您打开 FunqyTest.java 中的单元测试代码,您会看到它只是使用 RestAssured 进行 HTTP 调用来测试函数。
Funqy 也适用于 Quarkus 开发模式!
构建项目
首先构建 Java 工件
quarkus build
./mvnw install
接下来,Knative 需要一个 Docker 镜像,因此您需要接下来构建它
docker build -f src/main/docker/Dockerfile.jvm -t yourAccountName/funqy-knative-events-quickstart .
运行 docker build
时,请确保将 yourAccountName
替换为您的 docker 或 quay 帐户名。 Dockerfile 是一个标准的 Quarkus dockerfile。 没有特殊的 Knative 魔术。
将您的镜像推送到 docker hub 或 quay
docker push yourAccountName/funqy-knative-events-quickstart
再次,运行 docker push
时,请确保将 yourAccountName
替换为您的 docker 或 quay 帐户名。
部署到 Kubernetes/OpenShift
第一步是在我们的命名空间中设置 broker。 以下是 Knative cli 中的示例命令。
kn broker create default \
--namespace knativetutorial
我们创建的 broker 称为 default
,此 broker 将接收云事件。 Broker 也在函数 YAML 文件中被引用。
第二步是定义一个 Kubernetes/OpenShift 服务来指向您在构建期间创建和推送的 Docker 镜像。 看一下 funqy-service.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: funqy-knative-events-quickstart
spec:
template:
metadata:
name: funqy-knative-events-quickstart-v1
annotations:
autoscaling.knative.dev/target: "1"
spec:
containers:
- image: docker.io/yourAccountName/funqy-knative-events-quickstart
这是一个标准的 Kubernetes 服务定义 YAML 文件。
确保更改镜像 URL 以指向您之前构建和推送的镜像! |
对于我们的快速入门,一个 Kubernetes 服务将包含所有函数。 没有理由不能将此快速入门分解为多个不同的项目,并为每个函数部署一个服务。 为了简单起见,并表明您不必为每个函数部署一个服务,快速入门将所有内容合并到一个项目、镜像和服务中。
部署服务
kubectl apply -n knativetutorial -f src/main/k8s/funqy-service.yaml
下一步是为每个事件类型部署 Knative 事件触发器。 如代码部分所述,每个 Funqy 函数都映射到特定的 Cloud Event 类型。 您必须创建 Knative 事件触发器,该触发器映射 Cloud Event 并将其路由到特定的 Kubernetes 服务。 我们有 4 个不同的触发器。
apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
name: defaultchain
spec:
broker: default
filter:
attributes:
type: defaultChain
subscriber:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: funqy-knative-events-quickstart
spec:filter:attributes:type
将 Cloud Event 类型映射到 spec:subscriber:ref
中定义的 Kubernetes 服务。 当 Cloud Event 被推送到 Broker 时,它将触发映射到该事件的服务的启动。
每个我们的 4 个 Funqy 函数都有一个触发器 YAML 文件。 全部部署它们
kubectl apply -n knativetutorial -f src/main/k8s/defaultChain-trigger.yaml
kubectl apply -n knativetutorial -f src/main/k8s/configChain-trigger.yaml
kubectl apply -n knativetutorial -f src/main/k8s/annotatedChain-trigger.yaml
kubectl apply -n knativetutorial -f src/main/k8s/lastChainLink-trigger.yaml
运行演示
您需要两个不同的终端窗口。 一个用于向 Broker 发出 curl 请求,另一个用于监视 pod 日志文件,以便您可以看到消息流经 Funqy 函数事件链。
确保您已安装 stern
工具。 有关信息,请参见 Knative 教程设置。 运行 stern 以查找我们的 Funqy 部署输出的日志
stern funq user-container
打开一个单独的终端。 您首先需要了解 broker 的 URL。 执行此命令以找到它。
kubectl get broker default -o jsonpath='{.status.address.url}'
这将为您提供类似于 e.g.: http://broker-ingress.knative-eventing.svc.cluster.local/knativetutorial/default
的 URL。 记住这个 URL。
接下来我们需要做的是 ssh 进入我们的 Kubernetes 集群,以便我们可以向我们的 broker 发送 curl 请求。 以下命令将创建一个简单的 OS pod,以便我们可以 curl 进入我们的函数。
kubectl -n knativetutorial apply -f src/main/k8s/curler.yaml
您可能需要等待几秒钟,直到 curler pod 启动。 运行以下命令以获取对 curler pod 的 bash 访问权限
kubectl -n knativetutorial exec -it curler -- /bin/bash
您现在将在 Kubernetes 集群中的 shell 中。 在 shell 中,执行此 curl 命令,broker 地址是一个示例,可能因您的项目或 broker 名称而异。
curl -v "http://default-broker.knativetutorial.svc.cluster.local" \
-X POST \
-H "Ce-Id: 1234" \
-H "Ce-Specversion: 1.0" \
-H "Ce-Type: defaultChain" \
-H "Ce-Source: curl" \
-H "Content-Type: application/json" \
-d '"Start"'
这将向 broker 发布一个 Knative 事件,该事件将触发 defaultChain
函数。 如前所述,defaultChain
的输出触发一个事件,该事件被发布到 configChain
,后者触发一个事件,该事件被发布到 annotatedChain
,最后到 lastChainLink
函数。 您可以在 stern
窗口中看到此流程。 应该输出类似这样的内容。
funqy-knative-events-quickstart-v1-deployment-59bb88bcf4-9jwdx user-container 2020-05-12 13:44:02,256 INFO [org.acm.fun.SimpleFunctionChain] (executor-thread-1) *** defaultChain ***
funqy-knative-events-quickstart-v1-deployment-59bb88bcf4-9jwdx user-container 2020-05-12 13:44:02,365 INFO [org.acm.fun.SimpleFunctionChain] (executor-thread-2) *** configChain ***
funqy-knative-events-quickstart-v1-deployment-59bb88bcf4-9jwdx user-container 2020-05-12 13:44:02,394 INFO [org.acm.fun.SimpleFunctionChain] (executor-thread-1) *** annotatedChain ***
funqy-knative-events-quickstart-v1-deployment-59bb88bcf4-9jwdx user-container 2020-05-12 13:44:02,466 INFO [org.acm.fun.SimpleFunctionChain] (executor-thread-2) *** lastChainLink ***
funqy-knative-events-quickstart-v1-deployment-59bb88bcf4-9jwdx user-container 2020-05-12 13:44:02,467 INFO [org.acm.fun.SimpleFunctionChain] (executor-thread-2) Start::defaultChain::configChain::annotatedChain::lastChainLink