连接到 Elasticsearch 集群
Elasticsearch 是一个著名的全文搜索引擎和 NoSQL 数据存储。
在本指南中,我们将了解如何让您的 REST 服务与 Elasticsearch 集群交互。
Quarkus 提供了两种访问 Elasticsearch 的方式
-
较低级别的 REST 客户端
-
Elasticsearch Java 客户端
曾经存在用于“高级 REST 客户端”的第三个 Quarkus 扩展,但由于该客户端已被 Elastic 弃用并且存在一些许可问题而被删除。 |
先决条件
要完成本指南,您需要
-
大约 15 分钟
-
一个 IDE
-
已安装 JDK 17+ 并正确配置了
JAVA_HOME
-
Apache Maven 3.9.9
-
如果您想使用它,可以选择 Quarkus CLI
-
如果您想构建本机可执行文件(或者如果您使用本机容器构建,则为 Docker),可以选择安装 Mandrel 或 GraalVM 并进行适当的配置
-
已安装 Elasticsearch 或已安装 Docker
创建 Maven 项目
首先,我们需要一个新项目。使用以下命令创建一个新项目
对于 Windows 用户
-
如果使用 cmd,(不要使用反斜杠
\
并将所有内容放在同一行上) -
如果使用 Powershell,请将
-D
参数包含在双引号中,例如"-DprojectArtifactId=elasticsearch-quickstart"
此命令生成一个 Maven 结构,导入 Quarkus REST(以前的 RESTEasy Reactive)、Jackson 和 Elasticsearch 低级别 REST 客户端扩展。
Elasticsearch 低级别 REST 客户端随 quarkus-elasticsearch-rest-client
扩展一起提供,该扩展已添加到您的构建文件中。
如果要改用 Elasticsearch Java 客户端,请将 quarkus-elasticsearch-rest-client
扩展替换为 quarkus-elasticsearch-java-client
扩展。
我们在此处使用 |
要将扩展添加到现有项目,请按照以下说明操作。
对于 Elasticsearch 低级别 REST 客户端,请将以下依赖项添加到您的构建文件中
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-elasticsearch-rest-client</artifactId>
</dependency>
implementation("io.quarkus:quarkus-elasticsearch-rest-client")
对于 Elasticsearch Java 客户端,请将以下依赖项添加到您的构建文件中
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-elasticsearch-java-client</artifactId>
</dependency>
implementation("io.quarkus:quarkus-elasticsearch-java-client")
创建您的第一个 JSON REST 服务
在此示例中,我们将创建一个应用程序来管理水果列表。
首先,让我们创建 Fruit
bean 如下
package org.acme.elasticsearch;
public class Fruit {
public String id;
public String name;
public String color;
}
没什么特别的。需要注意的重要一点是,JSON 序列化层需要一个默认构造函数。
现在创建一个 org.acme.elasticsearch.FruitService
,它将成为我们应用程序的业务层,并将水果存储/加载到 Elasticsearch 实例中。 在这里我们使用低级别 REST 客户端,如果要改用 Java API 客户端,请按照 使用 Elasticsearch Java 客户端 段落中的说明进行操作。
package org.acme.elasticsearch;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
@ApplicationScoped
public class FruitService {
@Inject
RestClient restClient; (1)
public void index(Fruit fruit) throws IOException {
Request request = new Request(
"PUT",
"/fruits/_doc/" + fruit.id); (2)
request.setJsonEntity(JsonObject.mapFrom(fruit).toString()); (3)
restClient.performRequest(request); (4)
}
public void index(List<Fruit> list) throws IOException {
var entityList = new ArrayList<JsonObject>();
for (var fruit : list) {
entityList.add(new JsonObject().put("index", new JsonObject()(5)
.put("_index", "fruits").put("_id", fruit.id)));
entityList.add(JsonObject.mapFrom(fruit));
}
Request request = new Request(
"POST", "fruits/_bulk?pretty");
request.setEntity(new StringEntity(
toNdJsonString(entityList),(6)
ContentType.create("application/x-ndjson")));(7)
restClient.performRequest(request);
}
public void delete(List<String> identityList) throws IOException {
var entityList = new ArrayList<JsonObject>();
for (var id : identityList) {
entityList.add(new JsonObject().put("delete",
new JsonObject().put("_index", "fruits").put("_id", id)));(8)
}
Request request = new Request(
"POST", "fruits/_bulk?pretty");
request.setEntity(new StringEntity(
toNdJsonString(entityList),
ContentType.create("application/x-ndjson")));
restClient.performRequest(request);
}
public Fruit get(String id) throws IOException {
Request request = new Request(
"GET",
"/fruits/_doc/" + id);
Response response = restClient.performRequest(request);
String responseBody = EntityUtils.toString(response.getEntity());
JsonObject json = new JsonObject(responseBody); (9)
return json.getJsonObject("_source").mapTo(Fruit.class);
}
public List<Fruit> searchByColor(String color) throws IOException {
return search("color", color);
}
public List<Fruit> searchByName(String name) throws IOException {
return search("name", name);
}
private List<Fruit> search(String term, String match) throws IOException {
Request request = new Request(
"GET",
"/fruits/_search");
//construct a JSON query like {"query": {"match": {"<term>": "<match"}}
JsonObject termJson = new JsonObject().put(term, match);
JsonObject matchJson = new JsonObject().put("match", termJson);
JsonObject queryJson = new JsonObject().put("query", matchJson);
request.setJsonEntity(queryJson.encode());
Response response = restClient.performRequest(request);
String responseBody = EntityUtils.toString(response.getEntity());
JsonObject json = new JsonObject(responseBody);
JsonArray hits = json.getJsonObject("hits").getJsonArray("hits");
List<Fruit> results = new ArrayList<>(hits.size());
for (int i = 0; i < hits.size(); i++) {
JsonObject hit = hits.getJsonObject(i);
Fruit fruit = hit.getJsonObject("_source").mapTo(Fruit.class);
results.add(fruit);
}
return results;
}
private static String toNdJsonString(List<JsonObject> objects) {
return objects.stream()
.map(JsonObject::encode)
.collect(Collectors.joining("\n", "", "\n"));
}
}
1 | 我们将 Elasticsearch 低级别 RestClient 注入到我们的服务中。 |
2 | 我们创建一个 Elasticsearch 请求。 |
3 | 我们使用 Vert.x JsonObject 在将对象发送到 Elasticsearch 之前对其进行序列化,您可以使用任何您想要的方式将对象序列化为 JSON。 |
4 | 我们将请求(此处的索引请求)发送到 Elasticsearch。 |
5 | 当我们 index 对象集合时,我们应该使用 index 、create 或 update 操作。 |
6 | 我们使用 toNdJsonString(entityList) 调用来生成如下输出
|
7 | 传递搜索后端批量请求所需的 content type。 |
8 | 批量 API 的删除操作 JSON 已经包含所有必需的信息; 因此,在此操作之后,批量 API 请求正文中没有请求主体。
|
9 | 为了从 Elasticsearch 反序列化对象,我们再次使用 Vert.x JsonObject。 |
现在,创建 org.acme.elasticsearch.FruitResource
类,如下所示
package org.acme.elasticsearch;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.UUID;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.BadRequestException;
@Path("/fruits")
public class FruitResource {
@Inject
FruitService fruitService;
@POST
public Response index(Fruit fruit) throws IOException {
if (fruit.id == null) {
fruit.id = UUID.randomUUID().toString();
}
fruitService.index(fruit);
return Response.created(URI.create("/fruits/" + fruit.id)).build();
}
@Path("bulk")
@DELETE
public Response delete(List<String> identityList) throws IOException {
fruitService.delete(identityList);
return Response.ok().build();
}
@Path("bulk")
@POST
public Response index(List<Fruit> list) throws IOException {
fruitService.index(list);
return Response.ok().build();
}
@GET
@Path("/{id}")
public Fruit get(String id) throws IOException {
return fruitService.get(id);
}
@GET
@Path("/search")
public List<Fruit> search(@RestQuery String name, @RestQuery String color) throws IOException {
if (name != null) {
return fruitService.searchByName(name);
} else if (color != null) {
return fruitService.searchByColor(color);
} else {
throw new BadRequestException("Should provide name or color query parameter");
}
}
}
实现非常简单,您只需使用 Jakarta REST 注释定义您的端点,并使用 FruitService
列出/添加新水果。
配置 Elasticsearch
要配置的主要属性是连接到 Elasticsearch 集群的 URL。
对于典型的集群化 Elasticsearch 服务,一个示例配置如下所示
# configure the Elasticsearch client for a cluster of two nodes
quarkus.elasticsearch.hosts = elasticsearch1:9200,elasticsearch2:9200
在我们的例子中,我们使用的是在 localhost 上运行的单个实例
# configure the Elasticsearch client for a single instance on localhost
quarkus.elasticsearch.hosts = localhost:9200
如果您需要更高级的配置,您可以在本指南末尾找到支持的配置属性的完整列表。
开发服务
Quarkus 支持一项名为 Dev Services 的功能,允许您启动各种容器而无需任何配置。 对于 Elasticsearch,此支持扩展到默认的 Elasticsearch 连接。 实际上,这意味着如果您没有配置 quarkus.elasticsearch.hosts
,Quarkus 将在运行测试或开发模式时自动启动一个 Elasticsearch 容器,并自动配置连接。
运行应用程序的生产版本时,需要像往常一样配置 Elasticsearch 连接,因此如果您想在 application.properties
中包含生产数据库配置并继续使用 Dev Services,我们建议您使用 %prod.
配置文件来定义您的 Elasticsearch 设置。
有关更多信息,您可以阅读 Elasticsearch 开发服务指南。
以编程方式配置 Elasticsearch
除了参数化配置之外,您还可以通过实现 RestClientBuilder.HttpClientConfigCallback
并使用 ElasticsearchClientConfig
注释它,以编程方式将其他配置应用于客户端。 您可以提供多个实现,并且每个实现提供的配置将以随机排序的级联方式应用。
例如,当访问在 HTTP 层上设置为 TLS 的 Elasticsearch 集群时,客户端需要信任 Elasticsearch 正在使用的证书。 以下示例演示了如何设置客户端以信任签署 Elasticsearch 正在使用的证书的 CA,前提是该 CA 证书在 PKCS#12 密钥库中可用。
import io.quarkus.elasticsearch.restclient.lowlevel.ElasticsearchClientConfig;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.client.RestClientBuilder;
import jakarta.enterprise.context.Dependent;
import javax.net.ssl.SSLContext;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyStore;
@ElasticsearchClientConfig
public class SSLContextConfigurator implements RestClientBuilder.HttpClientConfigCallback {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
try {
String keyStorePass = "password-for-keystore";
Path trustStorePath = Paths.get("/path/to/truststore.p12");
KeyStore truststore = KeyStore.getInstance("pkcs12");
try (InputStream is = Files.newInputStream(trustStorePath)) {
truststore.load(is, keyStorePass.toCharArray());
}
SSLContextBuilder sslBuilder = SSLContexts.custom()
.loadTrustMaterial(truststore, null);
SSLContext sslContext = sslBuilder.build();
httpClientBuilder.setSSLContext(sslContext);
} catch (Exception e) {
throw new RuntimeException(e);
}
return httpClientBuilder;
}
}
有关此特定示例的更多详细信息,请参见 Elasticsearch 文档。
默认情况下,带有 |
运行 Elasticsearch 集群
由于默认情况下,Elasticsearch 客户端配置为访问端口 9200 上的本地 Elasticsearch 集群(默认的 Elasticsearch 端口),因此如果您在此端口上有一个本地运行的实例,则在能够对其进行测试之前无需执行任何其他操作!
如果要使用 Docker 运行 Elasticsearch 实例,可以使用以下命令启动一个
docker run --name elasticsearch -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms512m -Xmx512m"\
-e "cluster.routing.allocation.disk.threshold_enabled=false" -e "xpack.security.enabled=false"\
--rm -p 9200:9200 docker.io/elastic/elasticsearch:9.0.2
运行应用程序
让我们在开发模式下启动我们的应用程序
quarkus dev
./mvnw quarkus:dev
./gradlew --console=plain quarkusDev
您可以通过以下 curl 命令将新水果添加到列表中
curl localhost:8080/fruits -d '{"name": "bananas", "color": "yellow"}' -H "Content-Type: application/json"
并通过以下 curl 命令按名称或颜色搜索水果
curl localhost:8080/fruits/search?color=yellow
使用 Elasticsearch Java 客户端
这是一个 FruitService
的版本,它使用 Elasticsearch Java 客户端而不是低级别客户端
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import java.util.stream.Collectors;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import co.elastic.clients.elasticsearch._types.FieldValue;
import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.elasticsearch.core.search.HitsMetadata;
import co.elastic.clients.elasticsearch.core.GetRequest;
import co.elastic.clients.elasticsearch.core.GetResponse;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
@ApplicationScoped
public class FruitService {
@Inject
ElasticsearchClient client; (1)
public void index(Fruit fruit) throws IOException {
IndexRequest<Fruit> request = IndexRequest.of( (2)
b -> b.index("fruits")
.id(fruit.id)
.document(fruit)); (3)
client.index(request); (4)
}
public void index(List<Fruit> list) throws IOException {
BulkRequest.Builder br = new BulkRequest.Builder();
for (var fruit : list) {
br.operations(op -> op
.index(idx -> idx.index("fruits").id(fruit.id).document(fruit)));
}
BulkResponse result = client.bulk(br.build());
if (result.errors()) {
throw new RuntimeException("The indexing operation encountered errors.");
}
}
public void delete(List<String> list) throws IOException {
BulkRequest.Builder br = new BulkRequest.Builder();
for (var id : list) {
br.operations(op -> op.delete(idx -> idx.index("fruits").id(id)));
}
BulkResponse result = client.bulk(br.build());
if (result.errors()) {
throw new RuntimeException("The indexing operation encountered errors.");
}
}
public Fruit get(String id) throws IOException {
GetRequest getRequest = GetRequest.of(
b -> b.index("fruits")
.id(id));
GetResponse<Fruit> getResponse = client.get(getRequest, Fruit.class);
if (getResponse.found()) {
return getResponse.source();
}
return null;
}
public List<Fruit> searchByColor(String color) throws IOException {
return search("color", color);
}
public List<Fruit> searchByName(String name) throws IOException {
return search("name", name);
}
private List<Fruit> search(String term, String match) throws IOException {
SearchRequest searchRequest = SearchRequest.of(
b -> b.index("fruits")
.query(QueryBuilders.match().field(term).query(FieldValue.of(match)).build()._toQuery()));
SearchResponse<Fruit> searchResponse = client.search(searchRequest, Fruit.class);
HitsMetadata<Fruit> hits = searchResponse.hits();
return hits.hits().stream().map(hit -> hit.source()).collect(Collectors.toList());
}
}
1 | 我们在服务内部注入一个 ElasticsearchClient 。 |
2 | 我们使用构建器创建一个 Elasticsearch 索引请求。 |
3 | 我们直接将对象传递给请求,因为 Java API 客户端具有序列化层。 |
4 | 我们将请求发送到 Elasticsearch。 |
Hibernate Search Elasticsearch
Quarkus 通过 quarkus-hibernate-search-orm-elasticsearch
扩展支持带有 Elasticsearch 的 Hibernate Search。
Hibernate Search Elasticsearch 允许您将 Jakarta Persistence 实体同步到 Elasticsearch 集群,并提供了一种使用 Hibernate Search API 查询 Elasticsearch 集群的方法。
如果您对此感兴趣,请查阅 Hibernate Search with Elasticsearch 指南。
集群健康检查
如果您使用的是 quarkus-smallrye-health
扩展,则两个扩展都会自动添加就绪状态检查,以验证集群的运行状况。
因此,当您访问应用程序的 /q/health/ready
端点时,您将获得有关集群状态的信息。 它使用集群运行状况端点,如果集群的状态为红色或集群不可用,则检查将关闭。
可以通过在 application.properties
中将 quarkus.elasticsearch.health.enabled
属性设置为 false
来禁用此行为。
构建本机可执行文件
您可以在本机可执行文件中使用这两个客户端。
您可以使用常用命令构建本机可执行文件
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
运行它就像执行 ./target/elasticsearch-low-level-client-quickstart-1.0.0-SNAPSHOT-runner
一样简单。
然后,您可以将您的浏览器指向 https://:8080/fruits.html
并使用您的应用程序。
结论
使用 Quarkus 从低级别 REST 客户端或 Elasticsearch Java 客户端访问 Elasticsearch 集群非常容易,因为它提供了简单的配置、CDI 集成以及对它的本机支持。
配置参考
构建时固定的配置属性 - 所有其他配置属性都可以在运行时覆盖
配置属性 |
类型 |
默认 |
||
---|---|---|---|---|
如果 smallrye-health 扩展存在,是否发布健康检查。 环境变量: 显示更多 |
布尔值 |
|
||
主机:端口 列表 |
|
|||
联系 Elasticsearch 服务器时使用的协议。 设置为 "https" 以启用 SSL/TLS。 环境变量: 显示更多 |
字符串 |
|
||
字符串 |
||||
字符串 |
||||
|
||||
|
||||
到所有 Elasticsearch 服务器的最大连接数。 环境变量: 显示更多 |
整数 |
|
||
每个 Elasticsearch 服务器的最大连接数。 环境变量: 显示更多 |
整数 |
|
||
IO 线程数。 默认情况下,这是本地检测到的处理器数量。 线程数高于处理器数量可能没有必要,因为 I/O 线程依赖于非阻塞操作,但您可能希望使用低于处理器数量的线程数。 环境变量: 显示更多 |
整数 |
|||
定义是否启用自动发现。 环境变量: 显示更多 |
布尔值 |
|
||
节点列表的刷新间隔。 环境变量: 显示更多 |
|
|||
类型 |
默认 |
|||
此开发服务是否应在开发模式或测试中随应用程序一起启动。 除非显式设置连接配置(例如, 环境变量: 显示更多 |
布尔值 |
|||
开发服务将侦听的可选固定端口。 如果未定义,将随机选择端口。 环境变量: 显示更多 |
整数 |
|||
要使用的 Elasticsearch 发行版。 默认为从显式配置的 环境变量: 显示更多 |
|
|||
要使用的 Elasticsearch 容器映像。 默认值取决于配置的
环境变量: 显示更多 |
字符串 |
|||
ES_JAVA_OPTS 环境变量的值。 环境变量: 显示更多 |
字符串 |
|
||
Quarkus 开发服务管理的 Elasticsearch 服务器是否共享。 共享时,Quarkus 会使用基于标签的服务发现来查找正在运行的容器。 如果找到匹配的容器,则会使用该容器,因此不会启动第二个容器。 否则,Elasticsearch 的开发服务将启动一个新容器。 发现使用 容器共享仅在开发模式下使用。 环境变量: 显示更多 |
布尔值 |
|
||
附加到启动的容器的 当 当您需要多个共享 Elasticsearch 服务器时,将使用此属性。 环境变量: 显示更多 |
字符串 |
|
||
传递给容器的环境变量。 环境变量: 显示更多 |
Map<String,String> |
|||
是否在开发模式会话或测试套件执行之后保持开发服务容器运行,以便在下一个开发模式会话或测试套件执行中重用它们。 在开发模式会话或测试套件执行中,只要配置(用户名、密码、环境、端口绑定等)未更改,Quarkus 将始终重用开发服务。 此功能专门用于在 Quarkus 未运行时保持容器运行,以便在多次运行中重用它们。
此配置属性默认设置为 环境变量: 显示更多 |
布尔值 |
|
关于 Duration 格式
要写入 duration 值,请使用标准的 您还可以使用简化的格式,以数字开头
在其他情况下,简化格式将被转换为
|