您好,登錄后才能下訂單哦!
本篇內容介紹了“Elasticsearch Multi Get、 Bulk API的原理是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
本文將詳細介紹批量獲取API(Multi Get API)與Bulk API。
1、Multi Get API
public final MultiGetResponse mget(MultiGetRequest multiGetRequest, RequestOptions options) throws IOException
public final void mgetAsync(MultiGetRequest multiGetRequest, RequestOptions options, ActionListener<MultiGetResponse> listener)
其核心需要關注MultiGetRequest 。
從上面所知,mget及批量獲取文檔,通過add方法添加多個Item,每一個item代表一個文件獲取請求,其相關字段已在get API中詳細介紹,這里就不做過多詳解。
Mget API使用示例
public static void testMget() { RestHighLevelClient client = EsClient.getClient(); try { MultiGetRequest request = new MultiGetRequest(); request.add("twitter", "_doc", "10"); request.add("twitter", "_doc", "11"); request.add("twitter", "_doc", "12"); request.add("gisdemo", "_doc", "10"); MultiGetResponse result = client.mget(request, RequestOptions.DEFAULT); System.out.println(result); } catch (Throwable e) { e.printStackTrace(); } finally { EsClient.close(client); } }
返回的結果其本質是一個 GetResponse的數組,不會因為其中一個失敗,整個請求失敗,但其結果中會標明每一個是否成功。其返回結果類圖如下:
其字段過濾(Source filtering)、路由等機制與Get API相同,故不重復講解。
2、Bluk API詳解
Bulk API可以在一次API調用中包含多個索引操作,例如更新索引,刪除索引等。其API定義如下:
public final BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException
public final void bulkAsync(BulkRequest bulkRequest, RequestOptions options, ActionListener<BulkResponse> listener)
其核心需要關注BulkRequest。
2.1BulkRequest詳解
List<DocWriteRequest> requests:單個命令容器,DocWriteRequest的子類包括:IndexRequest、UpdateRequest、DeleteRequest。
private final Set<String> indices:requests涉及到的索引。
List<Object> payloads :有效載荷,6.4.0版本,貌似該字段意義不大,通常命令的請求體(負載數據)存放在DocWriteRequest對象中,例如IndexRequest的source字段。
protected TimeValue timeout:timeout機制,針對一個Bulk請求生效。
ActiveShardCount waitForActiveShards:針對整個Bulk請求有效。
private RefreshPolicy refreshPolicy = RefreshPolicy.NONE:刷新策略。
private long sizeInBytes = 0:整個Bulk請求的大小。
通過add api為BulkRequest添加一個請求。
2.2 Bulk API請求格式詳解
Bulk Rest請求協議基于如下格式:
POST _bulk { "index" : { "_index" : "test", "_type" : "_doc", "_id" : "1" } } { "field1" : "value1" } { "delete" : { "_index" : "test", "_type" : "_doc", "_id" : "2" } } { "create" : { "_index" : "test", "_type" : "_doc", "_id" : "3" } } { "field1" : "value3" } { "update" : {"_id" : "1", "_type" : "_doc", "_index" : "test"} } { "doc" : {"field2" : "value2"} }
其請求格式定義如下(restfull):
POST請求,其Content-Type為application/x-ndjson。
每一個命令占用兩行,每行的結束字符為\r\n。
第一行為元數據,"opType" : {元數據}。
第二行為有效載體(非必選),例如Index操作,其有效載荷為IndexRequest#source字段。
opType可選值 index、create、update、delete。
公用元數據(index、create、update、delete)如下
1)_index :索引名
2)_type:類型名
3)_id:文檔ID
4)routing:路由值
5)parent
6)version:數據版本號
7)version_type:版本類型
各操作特有元數據
1、index | create
1)pipeline
2、update
1)retry_on_conflict :更新沖突時重試次數。
2)_source:字段過濾。
有效載荷說明
1、index | create
其有效載荷為_source字段。
2、update
其有效載荷為:partial doc, upsert and script。
3、delete
沒有有效載荷。
對請求格式為什么要設計成metdata+有效載體的方式,主要是為了在接受端節點(所謂的接受端節點是指收到命令的第一節點),只需解析metadata,然后將請求直接轉發給對應的數據節點。
2.3 bulk API通用特性分析
2.3.1 版本管理
每一個Bulk條目擁有獨自的version,存在于請求條目的item的元數據中。
2.3.2 路由
每一個Bulk條目各自生效。
2.3.3 Wait For Active Shards
通常可以設置BulkRequest#waitForActiveShards來要求Bulk批量執行之前要求處于激活的最小副本數。
2.3.4 Bulk Demo
public static final void testBulk() { RestHighLevelClient client = EsClient.getClient(); try { IndexRequest indexRequest = new IndexRequest("twitter", "_doc", "12") .source(buildTwitter("dingw", "2009-11-18T14:12:12", "test bulk")); UpdateRequest updateRequest = new UpdateRequest("twitter", "_doc", "11") .doc(new IndexRequest("twitter", "_doc", "11") .source(buildTwitter("dingw", "2009-11-18T14:12:12", "test bulk update"))); BulkRequest request = new BulkRequest(); request.add(indexRequest); request.add(updateRequest); BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT); for (BulkItemResponse bulkItemResponse : bulkResponse) { if (bulkItemResponse.isFailed()) { BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); System.out.println(failure); continue; } DocWriteResponse itemResponse = bulkItemResponse.getResponse(); if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { IndexResponse indexResponse = (IndexResponse) itemResponse; System.out.println(indexRequest); } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) { UpdateResponse updateResponse = (UpdateResponse) itemResponse; System.out.println(updateRequest); } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { DeleteResponse deleteResponse = (DeleteResponse) itemResponse; System.out.println(deleteResponse); } } } catch (Exception e) { e.printStackTrace(); } finally { EsClient.close(client); } }
“Elasticsearch Multi Get、 Bulk API的原理是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。