您的位置:首页 > 其它

Elasticsearch新增一个字段并赋值

2018-03-31 18:32 1246 查看
先吐槽几句,最近本博主一直在做数据平台的事,越发觉得做数据平台难,尤其数据量很大的情况下,然而一旦问题解决,又马上觉得峰回路转,蛮有成就感。
下面就介绍一下,在已经存有大量数据的ES索引中(博主处理的大概在1亿7千万条),向一个type中添加一个新字段并赋给一个值。
说明,cimissgcdb是index,agmedays是type。
首先来查看一下原始的mapping:GET /cimissgcdb/agmedays/_mapping 查看一下该表数据量多大,心里有个底:POST cimissgcdb/agmedays/_search/
{
"query": {
"match_all": {}
}
}然后添加一个新字段,并设置字段的数据类型(ES的数据类型比如Integer、String、Double、Date等),博主这里新增了一个date类型的时间字段TimeFormat,并进行了format:PUT cimissgcdb/_mapping/agmedays
{
"properties": {
"TimeFormat": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
}
}
}接下来给新字段赋值,有2种方式:
1. 使用painless

ES 5.0版本后推出painless。何为painless,推荐参考这2篇博文:
elasticsearch painless最强教程
elasticsearch painless最强教程 二
由于博主的ES用的还是低版本2.3.2的,所以第一种方法用不了,那只能使用第二种方式了。
2. 自己通过代码重写,由于博主的新字段值需要通过已经存在字段的值来计算,所以,自己动手写java api来实现。public static void updateHourByScroll(String Type) throws IOException, ExecutionException, InterruptedException {
System.out.println("scroll 模式启动!");
Date begin = new Date();
SearchResponse scrollResponse = client.prepareSearch(Index).setTypes(TYPE)
.setSearchType(SearchType.SCAN).setSize(5000).setScroll(TimeValue.timeValueMinutes(1))
.execute().actionGet();
long count = scrollResponse.getHits().getTotalHits();//第一次不返回数据
for(int i=0,sum=0; sum<count; i++) {
scrollResponse = client.prepareSearchScroll(scrollResponse.getScrollId())
.setScroll(TimeValue.timeValueMinutes(8))
.execute().actionGet();
sum += scrollResponse.getHits().hits().length;

SearchHits searchHits = scrollResponse.getHits();
List<UpdateRequest> list = new ArrayList<UpdateRequest>();
for (SearchHit hit : searchHits) {
String id = hit.getId();
Map<String, Object> source = hit.getSource();
if (source.containsKey("TimeFormat")) { //这个很重要,如果中间过程失败了,在执行时,起到过滤作用,提高效率。
System.out.println("TimeFormat已经存在!");
}else{
Integer year = Integer.valueOf(source.get("Year").toString());
Integer month = Integer.valueOf(source.get("Mon").toString());
Integer day = Integer.valueOf(source.get("Day").toString());
Integer hour = 0;
if(source.containsKey(""Hour"")){ //处理Hour不存在的情况
hour = Integer.valueOf(source.get("Hour").toString());
}else{
hour = 0;
}

String time = getyear_month_day_hour(year, month, day, hour); //这个方法自定义,用来生成新字段TimeFormat的值,按需修改即可。
System.out.println(time);
UpdateRequest uRequest = new UpdateRequest()
.index(Index)
.type(Type)
.id(id)
.doc(jsonBuilder().startObject().field("TimeFormat", time).endObject());
list.add(uRequest);
//client.update(uRequest).get(); //注释上一行,就是单个提交,大数据量效率很低,用一个list来使用bulk,批量提高效率
}
}
// 批量执行
BulkRequestBuilder bulkRequest = client.prepareBulk();
for (UpdateRequest uprequest : list) {
bulkRequest.add(uprequest);
}

BulkResponse bulkResponse = bulkRequest.execute().actionGet();

if (bulkResponse.hasFailures()) {
System.out.println("批量错误!");
}

System.out.println("总量" + count + " 已经查到" + sum);
}
Date end = new Date();
System.out.println("耗时: "+(end.getTime()-begin.getTime()));
}上面方法就对1亿7千万条数据重新查询然后写了一遍,用到Scroll游标查询。
对于游标查询和分页查看文章:Elasticsearch分页查询From&Size VS scroll

这样就完成了对一个已经存在的ES表新增字段并赋值,但是切记,这样的操作只是对表已经存在的历史记录的改变,对于新来的数据,要修改ES写入新字段的过程。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: