您的位置:首页 > 编程语言 > Java开发

Elasticsearch java api(五) Bulk批量索引

2017-05-19 17:30 351 查看

一、Bulk API

使用bulk命令时,REST API以
_bulk
结尾,批量操作写在json文件中,官网给出的语法格式:

action_and_meta_data\n
optional_source\n
action_and_meta_data\n
optional_source\n
....
action_and_meta_data\n
optional_source\n

也就是说每一个操作都有2行数据组成,末尾要回车换行。第一行用来说明操作命令和原数据、第二行是自定义的选项.举个例子,同时执行插入2条数据、删除一条数据, 新建bulkdata.json,写入如下内容:

{ "create" : { "_index" : "blog", "_type" : "article", "_id" : "3" }}
{ "title":"title1","posttime":"2016-07-02","content":"内容一" }

{ "create" : { "_index" : "blog", "_type" : "article", "_id" : "4" }}
{ "title":"title2","posttime":"2016-07-03","content":"内容2" }

{ "delete":{"_index" : "blog", "_type" : "article", "_id" : "1" }}

执行:

$ curl -XPOST "http://localhost:9200/_bulk?pretty" --data-binary @bulkAdd.json
{
"took" : 11,
"errors" : false,
"items" : [ {
"create" : {
"_index" : "blog",
"_type" : "article",
"_id" : "13",
"_version" : 1,
"_shards" : {
"total" : 1,
"successful" : 1,
"failed" : 0
},
"status" : 201
}
} ]
}

注意:行末要回车换行,不然会因为命令不能识别而出现错误.

$ curl -XPOST "http://localhost:9200/_bulk?pretty" --data-binary @bulkAdd.json
{
"error" : {
"root_cause" : [ {
"type" : "action_request_validation_exception",
"reason" : "Validation Failed: 1: no requests added;"
} ],
"type" : "action_request_validation_exception",
"reason" : "Validation Failed: 1: no requests added;"
},
"status" : 400
}

二、批量导出

下面的例子是把索引库中的文档以json格式批量导出到文件中,其中集群名称为”bropen”,索引库名为”blog”,type为”article”,项目根目录下新建files/bulk.txt,索引内容写入bulk.txt中:

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHits;

public class ElasticSearchBulkOut {

public static void main(String[] args) {

try {
Settings settings = Settings.settingsBuilder()
.put("cluster.name", "bropen").build();// cluster.name在elasticsearch.yml

Client client = TransportClient.builder().settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress(
InetAddress.getByName("127.0.0.1"), 9300));

QueryBuilder qb = QueryBuilders.matchAllQuery();
SearchResponse response = client.prepareSearch("blog")
.setTypes("article").setQuery(QueryBuilders.matchAllQuery())
.execute().actionGet();
SearchHits resultHits = response.getHits();

File article = new File("files/bulk.txt");
FileWriter fw = new FileWriter(article);
BufferedWriter bfw = new BufferedWriter(fw);

if (resultHits.getHits().length == 0) {
System.out.println("查到0条数据!");

} else {
for (int i = 0; i < resultHits.getHits().length; i++) {
String jsonStr = resultHits.getHits()[i]
.getSourceAsString();
System.out.println(jsonStr);
bfw.write(jsonStr);
bfw.write("\n");
}
}
bfw.close();
fw.close();

} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}

}

}




三、批量导入

从刚才导出的bulk.txt文件中按行读取,然后bulk导入。首先通过调用
client.prepareBulk()
实例化一个BulkRequestBuilder对象,调用BulkRequestBuilder对象的add方法添加数据。实现代码:

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class ElasticSearchBulkIn {

public static void main(String[] args) {

try {

Settings settings = Settings.settingsBuilder()
.put("cluster.name", "bropen").build();// cluster.name在elasticsearch.yml中配置

Client client = TransportClient.builder().settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress(
InetAddress.getByName("127.0.0.1"), 9300));

File article = new File("files/bulk.txt");
FileReader fr=new FileReader(article);
BufferedReader bfr=new BufferedReader(fr);
String line=null;
BulkRequestBuilder bulkRequest=client.prepareBulk();
int count=0;
while((line=bfr.readLine())!=null){
bulkRequest.add(client.prepareIndex("test","article").setSource(line));
if (count%10==0) {
bulkRequest.execute().actionGet();
}
count++;
//System.out.println(line);
}
bulkRequest.execute().actionGet();

bfr.close();
fr.close();
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}

}

}

批量导入:

Settings settings = Settings.settingsBuilder().put("cluster.name", "es1").build();
TransportClient client = TransportClient.builder().settings(settings).build();
client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("bigdata2"), 9300));

Long count = 100000L;
String index = "bigdata";
String type = "student1";
BulkRequestBuilder bulkRequest = client.prepareBulk();
for (int i = 0; i < count; i++) {
Map<String, Object> ret = new HashMap<String, Object>();
ret.put("recordtime", 11);
ret.put("area", 22);
ret.put("usertype", 33);
ret.put("count", 44);
bulkRequest.add(client.prepareIndex(index, type).setSource(ret));
// 每10000条提交一次
if (i % 10000 == 0) {
bulkRequest.execute().actionGet();
}
}
参考文档:

Elasticsearch Reference [2.3] » Document APIs » Bulk API
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: