您的位置:首页 > 编程语言 > Java开发

Java 实现ES批量索引

2016-05-11 17:10 417 查看
    项目中有部分数据经过 Hadoop 离线分析之后,需要对外提供检索功能,于是考虑搭建一套简单的索引服务。Lucene 比较灵活,可以自定义较多的规则(比如分词、排序等),但除了建索引之外,服务化方面还需要额外编写不少代码。本着少开发、易维护的原则,最终采用 ES 来搭建索引服务。ES 的 REST 接口比较容易上手:把数据写入 ES 之后,第三方可以直接调用 ES 的接口进行检索,目前数据量在百万级别时性能还是比较不错的。以下是一个简单的 demo,没有使用多线程,只实现了批量建索引。
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.google.common.io.LineProcessor;
import com.zuoye.crm.model.Example;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.JestResult;
import io.searchbox.client.config.HttpClientConfig;
import io.searchbox.core.Bulk;
import io.searchbox.core.Index;
import io.searchbox.core.Search;
import io.searchbox.core.SearchResult;
import io.searchbox.indices.CreateIndex;
import io.searchbox.indices.DeleteIndex;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.List;

/**
* Created with IntelliJ IDEA.
* User: hotallen
* Date: 2016/5/11
* Time: 16:50
* To change this template use File | Settings | File Templates.
*/
public class DemoIndexer {

private static final Logger LOGGER = LoggerFactory.getLogger(CrmTeacherIndexer.class);

private static final String indexName = "demo_v1";
private static final String type = "main";
private static JestClientFactory factory;
private static final int maxRetry = 3; //retry times when failed
private List<Example> buffers;
private static final int batches = 100; //batch size

public DemoIndexer() {
String uri = System.getProperty("es.uri");
Preconditions.checkNotNull(uri, "es.uri shold be defined!");
factory = new JestClientFactory();
factory.setHttpClientConfig(new HttpClientConfig
.Builder(uri)
.multiThreaded(true)
.build());
buffers = Lists.newArrayList();
}

public void buidIndex() {
JestClient client = factory.getObject();
try {
client.execute(new CreateIndex.Builder(indexName).build());
} catch (Exception e) {
LOGGER.warn("buildIndex error! e = {}", e.getMessage());
}
}

public void dropIndex() {
JestClient client = factory.getObject();
try {
client.execute(new DeleteIndex.Builder(indexName).build());
} catch (Exception e) {
LOGGER.warn("dropIndex error! e = {}", e.getMessage());
}
}

public void dropIndex(String indexName, String indexType) {
JestClient client = factory.getObject();
try {
client.execute(new DeleteIndex.Builder(indexName).type(indexType).build());
} catch (Exception e) {
LOGGER.warn("dropIndex error! name = {}, type = {}, e = {}", indexName, e.getMessage());
}
}

/**
* index by single
* @param example
*/
public void indexExample(Example example) {
String json = JsonUtil.obj2JsonData(example);
JestClient client = factory.getObject();
int retry = 0;
while (retry < maxRetry) {
try {
String tId = example.getTeacherId().toString();
Index index = new Index.Builder(json).index(indexName).type(type).id(tId).build();
client.execute(index);
break;
} catch (Exception e) {
e.printStackTrace();
LOGGER.warn("indexExample exception! e = {}, retry = {}", e.getMessage(), retry);
retry++;
}
}
}

/**
* index by batch
* @param examples
*/
public void indexExamples(List<Example> examples) {
Bulk.Builder bulkBuilder = new Bulk.Builder();
for (Example example : examples) {
String teacherId = example.getTeacherId().toString();
String json = JsonUtil.obj2JsonData(example);
Index index = new Index.Builder(json).index(indexName).type(type).id(teacherId).build();
bulkBuilder.addAction(index);
}
int retry = 0;
while (retry < maxRetry) {
try {
JestClient client = factory.getObject();
client.execute(bulkBuilder.build());
break;
} catch (Exception e) {
e.printStackTrace();
LOGGER.warn("indexExample exception! e = {}, retry = {}", e.getMessage(), retry);
retry++;
}
}
}

/**
* index from file
* @param path
*/
public void indexFromFile(String path) {
Preconditions.checkNotNull(path, "路径不为空!");
File source = new File(path);
try {
IndexerLine indexerLine = new IndexerLine();
Files.readLines(source, Charsets.UTF_8, indexerLine);
if (!buffers.isEmpty()) {
indexExamples(buffers);
}
LOGGER.info("indexed done! count = {}", indexerLine.getResult());
} catch (Exception e) {
e.printStackTrace();
LOGGER.warn("indexFrom File error! e = {}, line", e.getMessage());
}
}

/**
* for test
* @param queryBuilder
* @return
*/
public List<string> doSearch(QueryBuilder queryBuilder) {
List<string> list = Lists.newArrayList();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(queryBuilder);
Search search = new Search.Builder(searchSourceBuilder.toString()).addIndex(indexName).addType(type).build();
JestClient client = factory.getObject();
try {
SearchResult result = client.execute(search);
List<SearchResult.Hit<jsonobject void="">> hits = result.getHits(JSONObject.class);
for (SearchResult.Hit hit : hits) {
list.add(hit.source.toString());
}
} catch (Exception e) {}
return list;
}

class IndexerLine implements LineProcessor<integer> {
private int rows = 0;
@Override
public boolean processLine(String line) throws IOException {
rows++;
try {
Example example = JSON.parseObject(line.trim(), Example.class);
if (example != null) {
buffers.add(example);
}
} catch (Exception e) {
LOGGER.warn("parse Example error! e = {} , line = {}", e, line);
}
if (rows % batches == 0) {
indexTeachers(buffers);
buffers.clear();
}
return true;
}

@Override
public Integer getResult() {
return rows;
}
}

public static void main(String[] args) {
Options opts = new Options();
opts.addOption("h", "help", false, "usage: -f index path");
opts.addOption("f", "file", true, "index file path");
BasicParser parser = new BasicParser();
try {
CommandLine cl = parser.parse(opts, args);
HelpFormatter formatter = new HelpFormatter();
if (cl.getOptions().length > 0) {
if (cl.hasOption("h")) {
formatter.printHelp("Options", opts);
} else {
String file = cl.getOptionValue("f");
DemoIndexer demoIndexer = new DemoIndexer();
demoIndexer.indexFromFile(file);
}
} else {
formatter.printHelp("Options", opts);
}
} catch (Exception e) {
e.printStackTrace();
}
}

}

<


    需要注意的是,需要预先在 ES 服务器上建好索引的 schema(mapping)。另外,文档有 ID 的时候最好在建索引时显式指定文档 ID,这样更新时会按 ID 覆盖已有文档。如果不指定 ID,ES 每次写入都会自动生成一个新的 ID,等于新增一个文档;之前有个版本在修改数据后,索引中的文档数量不断增长,估计就是这个原因。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java 索引