Elasticsearch[2.0] ☞ Java Client API
2015-11-07 10:06
771 查看
Elasticsearch 2.0 ☞ Java Client API
Elasticsearch 20 Java Client APIPreface 前言
Maven Repository Maven资料库
Deploying in JBoss EAP6 module 部署在JBoss EAP6 模块
Client 客户端
Node Client 节点客户端
Transport Client 传输客户端
Document APIs 文档API
Index API
生成JSON 文档
Index document
GET API
Delete API
UPDATE API
Update by script 通过脚本更新
更新合并到现有document中
插入
Multi Get API 多维查询
Bulk API 批操作
Using Bulk Processor
Search APIs 搜索API
Using scrolls in Java
Using Aggreations 聚合查询
Terminate After
Count API Count API
Aggregations 聚合API
Percolate API 过滤API
Query DSL 查询DSL
Indexde Scripts API 索引脚本API
迷惑的词语:
noise:噪音。这个应该在elasticsearch里面咋理解啊
hop :跳跃。这个又应该咋理解?是连接的意思吗
Preface (前言)
这章节介绍了使用Java API作为elasticsearch客户端。所有的elasticsearch操作使用的都是客户端对象执行。
所有的操作本质上都是异步的(无论是接受一个监听器或者在返回以后)。
此外,客户端还提供了批量执行的操作。
注意:所有的API本质上都是通过Java API执行的(因为elasticsearch内部执行的时候都会转换成java 客户端的)
Maven Repository (Maven资料库)
Deploying in JBoss EAP6 module 部署在JBoss EAP6 模块
Client 客户端
你可以使用Java 客户端可以在如下多个方面:在现有集群上执行标准的index、get、delete和search操作
在一个正在运行的集群上面进行
全部节点都启动时候当你想要在你的APP中调用elasticsearch时候或者你想进行单元和集成测试时候
获取elasticsearch 客户端是非常简单的,获取一个客户端最常见的方法使:
创建一个节点来充当你集群里面的节点
然后从这个节点发起求情来获取一个客户端
另一种方法是创建一个TransportClient来连接集群
注意
官方建议使用和集群想匹配的客户端,不然你有可能会碰到一些不兼容的问题。
Node Client 节点客户端
实例化一个节点的客户端是获取一个能够对elasticsearch执行操作的客户端的最简单办法。[code]import static org.elasticsearch.node.NodeBuilder.*; //on startup Node node = nodeBuilder().node(); Client client = node.client(); //on shutdown node.close();
当你启动一个Node时候,它已经加入了一个elasticsearch集群。你可以通过简单的设置cluster.name参数来达到连接不同的集群,或者你也可以通过clusterName方法直接创建
两种配置方式:
配置文件的方式
在你项目的/src/main/resources / elasticsearch.yml文件中定义cluster.name属性
cluster.name: yourclustername
在Java中配置
Node node = nodeBuilder().clusterName(“yourclustername”).node();
Client client = node.client();
使用客户端的好处是操作是自动路由到需要进行操作的节点上面进行的,不执行“double hop”。例如,在执行index操作的时候它是在shard上自动执行的。
当你启动一个Node时候,最重要的是判断是否应该保存数据。换句话说就是,shard和indices是否应该被分配给它。很多户时候我们仅仅是希望客户端只是一个客户端,并没有shard分配给它们。这可以通过将node.data简单的设置为false或者node.client配置为true即可(the NodeBuilder respective helper methods on it)。
[code]import static org.elasticsearch.node.NodeBuilder.*; // on startup // Embedded node clients behave just like standalone nodes, // which means that they will leave the HTTP port open! Node node = nodeBuilder() .settings(Settings.settingsBuilder().put("http.enabled", false)) .client(true) .node(); Client client = node.client(); // on shutdown node.close();
另外一种场景是当我们在进行单元或者集成测试时候启动一个节点。在这种情况下我们只想启动一个“local”节点(与“local”的discovery and transpor)。再次,这也是一个简单的办法,在节点启动的时候设置为local。注意,“local”在这里是指本地的JVM(实际的类装载器),这意味着如果本地的JVM上同时开启了两个服务将组成一个集群。
[code]import static org.elasticsearch.node.NodeBuilder.*; // on startup Node node = nodeBuilder().local(true).node(); Client client = node.client(); // on shutdown node.close();
节点客户端的缺点
嵌入一个节点客户端到你的应用程序中这种最简单的方法连接elasticsearch集群会带来一些负面的影响:
频繁的启动和停止一个或多个节点夸集群创建时候会产生不必要的噪音(unnecessary noise)
嵌入式节点的客户端将响应外部的请求,就像任何其他的客户端一样。
你几乎总是要禁用HTTP为嵌入式节点的客户端
Transport Client 传输客户端
TransportClient使用transport模块连接远程的Elasticsearch集群。它不会加入集群,但只会有一个或多个初始传输地址与集群以流的方式对每个操作进行交互(虽然大多数动作可能是“two hop”操作)[code]// on startup Client client = TransportClient.builder().build() .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300)) .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300)); // on shutdown client.close();
现在你就可以设置你的集群名称了,如果你使用集群不是默认的“elasticsearch”
[code]Settings settings = Settings.settingsBuilder() .put("cluster.name", "myClusterName").build(); Client client = TransportClient.builder().settings(settings).build(); //Add transport addresses and do something with the client...
或者使用 elasticsearch.yml文件,具体查看上一节Node Client
客户端也可以嗅探集群,客户端可以将集群的其他数据节点添加到连接列表中供自己使用。在这种情况下,请注意,将会使用上面addTransportAddress中填写的地址去嗅探其他地址(就是“publish”的地址)。为了启用这个功能我们需要将client.transport.sniff设置为true。
[code]Settings settings = Settings.settingsBuilder() .put("client.transport.sniff", true).build(); TransportClient client = TransportClient.builder().settings(settings).build();
其他的一些transport client级别设置参数还包括:
Parameter | Description |
---|---|
client.transport.ignore_cluster_name | Set to true to ignore cluster name validation of connected nodes. (since 0.19.4)设置为true则忽略集群名称验证 |
client.transport.ping_timeout | The time to wait for a ping response from a node. Defaults to 5s.连接超时 |
client.transport.nodes_sampler_interval | How often to sample / ping the nodes listed and connected. Defaults to 5s.设置多久去操作节点取样和ping一次,默认是5秒 |
Document APIs 文档API
这部分文档主要描述CRUD APIs部分Index API
Get API
Delete API
Update API
多种文件API(Multi-document APIs)
Multi Get APIs
Bulk API
注意:
所有的CRUD APIs都是单个索引API(single-index)。这个索引参数接受单个的索引名称或者analias指向单个的索引。
Index API
index API允许一个JSON类型文档的索引转换成为一个具体的指标,并且使其可以搜索。生成JSON 文档
生成一个JSON文件有以下几种不同的方法:手动使用原生的byte[]或者一个字符串
使用map将会自动的转换成一个等价的JSON
使用一个第三方的序列化库,例如Jackson
使用 elasticsearch内置XContentFactory.jsonBuilder()
在内部,以上每种方式最终都会被转换成
byte[](所以一个String会被转换为一个byte[]) 。因此,如果对象已经是这种形式了那么就可以直接使用它。该jsonBuilder是高度优化的JSON生成器,直接构造一个byte[]
Do it Yourself(不需要手动,其实就是使用字符串自己拼接JSON)
这里没有什么难的,注意的就是日期编码格式。
[code]String json = "{" + "\"user\":\"kimchy\"," + "\"postDate\":\"2013-01-30\"," + "\"message\":\"trying out Elasticsearch\"" + "}";
使用map
map 本身就是一个key:value的形式,它就代表了一个JSON格式:
[code]Map < String , Object > json = new HashMap < String , Object >(); json . put ( "user" , "kimchy" ); json . put ( "postDate" , new Date ()); json . put ( "message" , "trying出Elasticsearch“);
序列化的beans
Elasticsearch已经使用了Jackson。所以你能够用它序列化你的bean成为一个JSON。
[code]import com.fasterxml.jackson.databind.*; // instance a json mapper ObjectMapper mapper = new ObjectMapper(); // create once, reuse // generate json byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
Use Elasticsearch helpers 使用 elasticsearch助手
elasticsearch提供了内部的辅助函数来生成JSON内容。
[code]import static org.elasticsearch.common.xcontent.XContentFactory.*; XContentBuilder builder = jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject()
请注意。你也可以添加数组(startArray(String)和endArray())到该方法。顺便说一下这个field方法可以接受很多种对象类型。你可以直接通过使用数字、日期甚至其他XContentBuilder对象。如果你需要查看生成的JSON字符串你可以使用string()方法。
String json = builder.string();
Index document
在下面的例子中索引的JSON文档将装换成所谓的twitter,在一个名为tweet的索引,ID为1:[code]import static org.elasticsearch.common.xcontent.XContentFactory.*; IndexResponse response = client.prepareIndex("twitter", "tweet", "1") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject() ) .get();
请注意,你也可以索引你的文档为一个JSON字符串,并且你不需要给定一个ID:
[code]String json = "{" + "\"user\":\"kimchy\"," + "\"postDate\":\"2013-01-30\"," + "\"message\":\"trying out Elasticsearch\"" + "}";
IndexResponse response = client.prepareIndex("twitter", "tweet")
.setSource(json)
.get();
IndexResponse会给你一个报告:
[code]// Index name String _index = response.getIndex(); // Type name String _type = response.getType(); // Document ID (generated or not) String _id = response.getId(); // Version (if it's the first time you index this document, you will get: 1) long _version = response.getVersion(); // isCreated() is true if the document is a new one, false if it has been updated boolean created = response.isCreated();
更多的信息查看REST index文档
Operation Threading操作线程
索引API允许你设置线程执行的时候将实际操作的API在同一个节点上执行(即,API执行shard时候分配在同一台机器上)。
选择一个不同的线程上执行操作,或在调用线程上执行它(注意,API任然是异步的 )。默认情况下operationThreaded设置为True(将会在不同的线程上执行)
GET API
GET API允许我们从索引根据ID获取一个JSON格式的document。在下面的例子中从索引twitter获取一个type为tweet,ID等于1的索引:GetResponse response = client.prepareGet(“twitter”, “tweet”, “1”).get();
关于GET API更多的信息请查看REST GET
Operation Threading
在执行GET API的时候线程模型允许我们在同一个节点上面执行API的实际操作(也就是GET API是被分配在同一个服务器上面执行shard的)
默认operationThreaded是true也就是默认是多线程执行(在不同节点上面执行shard的),下面是一个将operationThreaded设置为false的例子:
[code]GetResponse response = client.prepareGet("twitter", "tweet", "1") .setOperationThreaded(false) .get();
Delete API
delete API允许我们在特定索引中通过id删除JSON document。DeleteResponse response = client.prepareDelete(“twitter”, “tweet”, “1”).get();
For more information on the delete operation, check out the delete API docs.
Operation Threading
和GET API一样,默认是true。
[code]DeleteResponse response = client.prepareDelete("twitter", "tweet", "1") .setOperationThreaded(false) .get();
UPDATE API
你可以创建一个UpdateRequest发送给客户端[code]UpdateRequest updateRequest = new UpdateRequest(); updateRequest.index("index"); updateRequest.type("type"); updateRequest.id("1"); updateRequest.doc(jsonBuilder() .startObject() .field("gender", "male") .endObject()); client.update(updateRequest).get();
或者使用prepareUpdate()方法:
[code]client.prepareUpdate("ttl", "doc", "1") //1 .setScript(new Script("ctx._source.gender = \"male\"" , ScriptService.ScriptType.INLINE, null, null)) .get(); client.prepareUpdate("ttl", "doc", "1") //2 .setDoc(jsonBuilder() .startObject() .field("gender", "male") .endObject()) .get();
脚本也可以是一个本地文件名称,这时候将使用如下这个参数
ScriptService.ScriptType.FILE
第二个种写法表示将被合并到现有的文档之中。
Note that you can’t provide both script and doc.
Update by script 通过脚本更新
[code]UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1") .script(new Script("ctx._source.gender = \"male\"")); client.update(updateRequest).get();
更新合并到现有document中
The update API also support passing a partial document, which will be merged into the existing document (simple recursive merge, inner merging of objects, replacing core “keys/values” and arrays). For example:通过简单的内部递归合并到现有文档,核心概念是“keys/values”和数组
[code]UpdateRequest updateRequest = new UpdateRequest("index", "type", "1") .doc(jsonBuilder() .startObject() .field("gender", "male") .endObject()); client.update(updateRequest).get();
插入
也支持插入操作,如果文件已经存在,插入元素的内容将作为document的新元素加入到doc中:[code]IndexRequest indexRequest = new IndexRequest("index", "type", "1") .source(jsonBuilder() .startObject() .field("name", "Joe Smith") .field("gender", "male") .endObject()); UpdateRequest updateRequest = new UpdateRequest("index", "type", "1") .doc(jsonBuilder() .startObject() .field("gender", "male") .endObject()) .upsert(indexRequest); client.update(updateRequest).get();
如果文档不存在则indexRequest执行添加操作
If the document index/type/1 already exists, we will have after this operation a document like:
如果文档index/type/1已经存在,我们将会将原有文档更新成如下:
[code]{ "name" : "Joe Dalton", "gender": "male" }
This field is added by the update request
如果上面文件不存在那我们将会新建一个document:
[code]{ "name" : "Joe Smith", "gender": "male" }
Multi Get API 多维查询
The multi get API allows to get a list of documents based on their index, type and id:多维查询允许获取一个基于index、type和id的的列表
[code]MultiGetResponse multiGetItemResponses = client.prepareMultiGet() .add("twitter", "tweet", "1") .add("twitter", "tweet", "2", "3", "4") .add("another", "type", "foo") .get(); for (MultiGetItemResponse itemResponse : multiGetItemResponses) { GetResponse response = itemResponse.getResponse(); if (response.isExists()) { String json = response.getSourceAsString(); } }
get by a single id
or by a list of ids for the same index / type
you can also get from another index
iterate over the result set
you can check if the document exists
access to the _source field
Bulk API 批操作
[code]import static org.elasticsearch.common.xcontent.XContentFactory.*; BulkRequestBuilder bulkRequest = client.prepareBulk(); // either use client#prepare, or use Requests# to directly build index/delete requests bulkRequest.add(client.prepareIndex("twitter", "tweet", "1") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject() ) ); bulkRequest.add(client.prepareIndex("twitter", "tweet", "2") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "another post") .endObject() ) ); BulkResponse bulkResponse = bulkRequest.get(); if (bulkResponse.hasFailures()) { // process failures by iterating through each bulk response item }
Using Bulk Processor
BulkProcessor类提供了一个简单的接口,自动刷新基于数量(number)或大小(size)请求的批操作,或者在给定时期。[code]import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; BulkProcessor bulkProcessor = BulkProcessor.builder( client, //1 new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { ... } //2 @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { ... } //3 @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { ... } //4 }) .setBulkActions(10000) //5 .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) //6 .setFlushInterval(TimeValue.timeValueSeconds(5)) //7 .setConcurrentRequests(1) //8 .build();
解释:
1. Add your elasticsearch client
2. This method is called just before bulk is executed. You can for example see the numberOfActions with request.numberOfActions()
3. This method is called after bulk execution. You can for example check if there was some failing requests with response.hasFailures()
4. This method is called when the bulk failed and raised a Throwable
5. We want to execute the bulk every 10 000 requests
6. We want to flush the bulk every 1gb
7. We want to flush the bulk every 5 seconds whatever the number of requests
8. Set the number of concurrent requests. A value of 0 means that only a single request will be allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed while accumulating new bulk requests.
Then you can simply add your requests to the BulkProcessor:
bulkProcessor.add(new IndexRequest(“twitter”, “tweet”, “1”).source(/* your doc here */));
bulkProcessor.add(new DeleteRequest(“twitter”, “tweet”, “2”));
By default, BulkProcessor:
- sets bulkActions to 1000
- sets bulkSize to 5mb
- does not set flushInterval
- sets concurrentRequests to 1
When all documents are loaded to the BulkProcessor it can be closed by using awaitClose or close methods:
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
or
bulkProcessor.close();
Both methods flush any remaining documents and disable all other scheduled flushes if they were scheduled by setting flushInterval. If concurrent requests were enabled the awaitClose method waits for up to the specified timeout for all bulk requests to complete then returns true, if the specified waiting time elapses before all bulk requests complete, false is returned. The close method doesn’t wait for any remaining bulk requests to complete and exists immediately.
Search APIs 搜索API
The search API allows one to execute a search query and get back search hits that match the query. It can be executed across one or more indices and across one or more types. The query can provided using the query Java API. The body of the search request is built using the SearchSourceBuilder. Here is an example:搜索API允许执行一个查询并且返回相关匹配到的结果。它可以跨越一个或者多个indices或者多个types。查询可以使用Java的查询API。搜索请求的主体(body)使用SearchSourceBuilder。这是一个例子:
[code]import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.index.query.QueryBuilders.*;
[code]SearchResponse response = client.prepareSearch("index1", "index2") .setTypes("type1", "type2") .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) .setQuery(QueryBuilders.termQuery("multi", "test")) // Query .setPostFilter(QueryBuilders.rangeQuery("age").from(12).to(18)) // Filter .setFrom(0).setSize(60).setExplain(true) .execute() .actionGet();
请注意,所有参数都是可选的,如果是最简单的搜索你可以这么写:
[code]// MatchAll on the whole cluster with all default options SearchResponse response = client.prepareSearch().execute().actionGet();
* Note *
Although the Java API defines the additional search types QUERY_AND_FETCH and DFS_QUERY_AND_FETCH, these modes are internal optimizations and should not be specified explicitly by users of the API.
For more information on the search operation, check out the REST search docs.
Using scrolls in Java
Read the scroll documentation first![code]import static org.elasticsearch.index.query.QueryBuilders.*; QueryBuilder qb = termQuery("multi", "test"); SearchResponse scrollResp = client.prepareSearch(test) .setSearchType(SearchType.SCAN) .setScroll(new TimeValue(60000)) .setQuery(qb) .setSize(100).execute().actionGet(); //100 hits per shard will be returned for each scroll //Scroll until no hits are returned while (true) { for (SearchHit hit : scrollResp.getHits().getHits()) { //Handle the hit... } scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); //Break condition: No hits are returned if (scrollResp.getHits().getHits().length == 0) { break; } }
### MultiSearch API ###
多条件查询
See MultiSearch API Query documentation
[code]SearchRequestBuilder srb1 = node.client() .prepareSearch().setQuery(QueryBuilders.queryStringQuery("elasticsearch")).setSize(1); SearchRequestBuilder srb2 = node.client() .prepareSearch().setQuery(QueryBuilders.matchQuery("name", "kimchy")).setSize(1); MultiSearchResponse sr = node.client().prepareMultiSearch() .add(srb1) .add(srb2) .execute().actionGet(); // You will get all individual responses from MultiSearchResponse#getResponses() long nbHits = 0; for (MultiSearchResponse.Item item : sr.getResponses()) { SearchResponse response = item.getResponse(); nbHits += response.getHits().getTotalHits(); }
Using Aggreations 聚合查询
The following code shows how to add two aggregations within your search:[code]SearchResponse sr = node.client().prepareSearch() .setQuery(QueryBuilders.matchAllQuery()) .addAggregation( AggregationBuilders.terms("agg1").field("field") ) .addAggregation( AggregationBuilders.dateHistogram("agg2") .field("birth") .interval(DateHistogramInterval.YEAR) ) .execute().actionGet(); // Get your facet results Terms agg1 = sr.getAggregations().get("agg1"); DateHistogram agg2 = sr.getAggregations().get("agg2");
See Aggregations Java API documentation for details.
Terminate After
The maximum number of documents to collect for each shard, upon reaching which the query execution will terminate early. If set, you will be able to check if the operation terminated early by asking for isTerminatedEarly() in the SearchResponse onject:设置当查询达到最多多少记录时候可以终止查询。如果你设置了,那么你将能够检查这个操作在达到isTerminatedEarly()时候终止SearchResponse操作。
[code]SearchResponse sr = client.prepareSearch(INDEX) .setTerminateAfter(1000) .get(); if (sr.isTerminatedEarly()) { // We finished early }
以上代码表示在1000行以后将提前结束
Count API Count API
Aggregations 聚合API
Percolate API 过滤API
Query DSL 查询DSL
Indexde Scripts API 索引脚本API
相关文章推荐
- Java中的多重继承
- myeclipse一直停留在Loading workbench界面上的处理办法
- JAVA环境变量配置
- eclipse快捷键
- (转)Struts2+JFreeChart 环境搭建个基本用法!
- 初步学习Java中线程的实现与生命周期
- java synchronized/wait/notify/互斥/同步
- 【Java EE 学习 56】【酒店会员管理系统技术点总结】
- 【Java EE 学习 55】【酒店会员管理系统项目总结】
- [深入理解Java虚拟机]第十二章 Java内存模型与线程-硬件的效率与一致性
- SPRING4.X HIBERNATE4.X 整合 EHCACHE 注解 ANNOTATE
- 走进Java(四)JSP的Model1和Model2
- Struts2学习感悟2015-11-17
- java中接口interface用法小结
- Controlling Access in Java
- java单链表实现
- Java任意同类型对象的复制
- springmvc的Ajax提交问题
- 面向对象
- Java之随机数