Elasticsearch新增一个字段并赋值
2018-03-31 18:32
1246 查看
先吐槽几句,最近本博主一直在做数据平台的事,越发觉得做数据平台难,尤其数据量很大的情况下,然而一旦问题解决,又马上觉得峰回路转,蛮有成就感。
下面就介绍一下,在已经存有大量数据的ES索引中(博主处理的大概在1亿7千万条),向一个type中添加一个新字段并赋给一个值。
说明,cimissgcdb是index,agmedays是type。
首先来查看一下原始的mapping:GET /cimissgcdb/agmedays/_mapping 查看一下该表数据量多大,心里有个底:POST cimissgcdb/agmedays/_search/
{
"query": {
"match_all": {}
}
}然后添加一个新字段,并设置字段的数据类型(ES的数据类型比如Integer、String、Double、Date等),博主这里新增了一个date类型的时间字段TimeFormat,并进行了format:PUT cimissgcdb/_mapping/agmedays
{
"properties": {
"TimeFormat": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
}
}
}接下来给新字段赋值,有2种方式:
1. 使用painless
ES 5.0版本后推出painless。何为painless,推荐参考这2篇博文:
elasticsearch painless最强教程
elasticsearch painless最强教程 二
由于博主的ES用的还是低版本2.3.2的,所以第一种方法用不了,那只能使用第二种方式了。
2. 自己通过代码重写,由于博主的新字段值需要通过已经存在字段的值来计算,所以,自己动手写java api来实现。public static void updateHourByScroll(String Type) throws IOException, ExecutionException, InterruptedException {
System.out.println("scroll 模式启动!");
Date begin = new Date();
SearchResponse scrollResponse = client.prepareSearch(Index).setTypes(TYPE)
.setSearchType(SearchType.SCAN).setSize(5000).setScroll(TimeValue.timeValueMinutes(1))
.execute().actionGet();
long count = scrollResponse.getHits().getTotalHits();//第一次不返回数据
for(int i=0,sum=0; sum<count; i++) {
scrollResponse = client.prepareSearchScroll(scrollResponse.getScrollId())
.setScroll(TimeValue.timeValueMinutes(8))
.execute().actionGet();
sum += scrollResponse.getHits().hits().length;
SearchHits searchHits = scrollResponse.getHits();
List<UpdateRequest> list = new ArrayList<UpdateRequest>();
for (SearchHit hit : searchHits) {
String id = hit.getId();
Map<String, Object> source = hit.getSource();
if (source.containsKey("TimeFormat")) { //这个很重要,如果中间过程失败了,在执行时,起到过滤作用,提高效率。
System.out.println("TimeFormat已经存在!");
}else{
Integer year = Integer.valueOf(source.get("Year").toString());
Integer month = Integer.valueOf(source.get("Mon").toString());
Integer day = Integer.valueOf(source.get("Day").toString());
Integer hour = 0;
if(source.containsKey(""Hour"")){ //处理Hour不存在的情况
hour = Integer.valueOf(source.get("Hour").toString());
}else{
hour = 0;
}
String time = getyear_month_day_hour(year, month, day, hour); //这个方法自定义,用来生成新字段TimeFormat的值,按需修改即可。
System.out.println(time);
UpdateRequest uRequest = new UpdateRequest()
.index(Index)
.type(Type)
.id(id)
.doc(jsonBuilder().startObject().field("TimeFormat", time).endObject());
list.add(uRequest);
//client.update(uRequest).get(); //注释上一行,就是单个提交,大数据量效率很低,用一个list来使用bulk,批量提高效率
}
}
// 批量执行
BulkRequestBuilder bulkRequest = client.prepareBulk();
for (UpdateRequest uprequest : list) {
bulkRequest.add(uprequest);
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
System.out.println("批量错误!");
}
System.out.println("总量" + count + " 已经查到" + sum);
}
Date end = new Date();
System.out.println("耗时: "+(end.getTime()-begin.getTime()));
}上面方法就对1亿7千万条数据重新查询然后写了一遍,用到Scroll游标查询。
对于游标查询和分页查看文章:Elasticsearch分页查询From&Size VS scroll
这样就完成了对一个已经存在的ES表新增字段并赋值,但是切记,这样的操作只是对表已经存在的历史记录的改变,对于新来的数据,要修改ES写入新字段的过程。
下面就介绍一下,在已经存有大量数据的ES索引中(博主处理的大概在1亿7千万条),向一个type中添加一个新字段并赋给一个值。
说明,cimissgcdb是index,agmedays是type。
首先来查看一下原始的mapping:GET /cimissgcdb/agmedays/_mapping 查看一下该表数据量多大,心里有个底:POST cimissgcdb/agmedays/_search/
{
"query": {
"match_all": {}
}
}然后添加一个新字段,并设置字段的数据类型(ES的数据类型比如Integer、String、Double、Date等),博主这里新增了一个date类型的时间字段TimeFormat,并进行了format:PUT cimissgcdb/_mapping/agmedays
{
"properties": {
"TimeFormat": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
}
}
}接下来给新字段赋值,有2种方式:
1. 使用painless
ES 5.0版本后推出painless。何为painless,推荐参考这2篇博文:
elasticsearch painless最强教程
elasticsearch painless最强教程 二
由于博主的ES用的还是低版本2.3.2的,所以第一种方法用不了,那只能使用第二种方式了。
2. 自己通过代码重写,由于博主的新字段值需要通过已经存在字段的值来计算,所以,自己动手写java api来实现。public static void updateHourByScroll(String Type) throws IOException, ExecutionException, InterruptedException {
System.out.println("scroll 模式启动!");
Date begin = new Date();
SearchResponse scrollResponse = client.prepareSearch(Index).setTypes(TYPE)
.setSearchType(SearchType.SCAN).setSize(5000).setScroll(TimeValue.timeValueMinutes(1))
.execute().actionGet();
long count = scrollResponse.getHits().getTotalHits();//第一次不返回数据
for(int i=0,sum=0; sum<count; i++) {
scrollResponse = client.prepareSearchScroll(scrollResponse.getScrollId())
.setScroll(TimeValue.timeValueMinutes(8))
.execute().actionGet();
sum += scrollResponse.getHits().hits().length;
SearchHits searchHits = scrollResponse.getHits();
List<UpdateRequest> list = new ArrayList<UpdateRequest>();
for (SearchHit hit : searchHits) {
String id = hit.getId();
Map<String, Object> source = hit.getSource();
if (source.containsKey("TimeFormat")) { //这个很重要,如果中间过程失败了,在执行时,起到过滤作用,提高效率。
System.out.println("TimeFormat已经存在!");
}else{
Integer year = Integer.valueOf(source.get("Year").toString());
Integer month = Integer.valueOf(source.get("Mon").toString());
Integer day = Integer.valueOf(source.get("Day").toString());
Integer hour = 0;
if(source.containsKey(""Hour"")){ //处理Hour不存在的情况
hour = Integer.valueOf(source.get("Hour").toString());
}else{
hour = 0;
}
String time = getyear_month_day_hour(year, month, day, hour); //这个方法自定义,用来生成新字段TimeFormat的值,按需修改即可。
System.out.println(time);
UpdateRequest uRequest = new UpdateRequest()
.index(Index)
.type(Type)
.id(id)
.doc(jsonBuilder().startObject().field("TimeFormat", time).endObject());
list.add(uRequest);
//client.update(uRequest).get(); //注释上一行,就是单个提交,大数据量效率很低,用一个list来使用bulk,批量提高效率
}
}
// 批量执行
BulkRequestBuilder bulkRequest = client.prepareBulk();
for (UpdateRequest uprequest : list) {
bulkRequest.add(uprequest);
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
System.out.println("批量错误!");
}
System.out.println("总量" + count + " 已经查到" + sum);
}
Date end = new Date();
System.out.println("耗时: "+(end.getTime()-begin.getTime()));
}上面方法就对1亿7千万条数据重新查询然后写了一遍,用到Scroll游标查询。
对于游标查询和分页查看文章:Elasticsearch分页查询From&Size VS scroll
这样就完成了对一个已经存在的ES表新增字段并赋值,但是切记,这样的操作只是对表已经存在的历史记录的改变,对于新来的数据,要修改ES写入新字段的过程。
相关文章推荐
- oracle对没有主键表的新增主键修改表数据操作(没有主键字段,则新增一个主键字段,然后赋值,然后再添加主键)
- oracle批量新增字段 数据赋值应用
- 将一个实体中相同字段的值赋值给另一个实体
- Oracle新增自增一的主键字段和赋值代码
- 关于将表中自增长字段赋值给另外一个字段的方法
- oracle将一个表中字段的值赋值到另一个表中字段(批量)
- Elasticsearch 插件head 端增加一条记录和增加一个字段
- oracle赋值问题(将同一表中某一字段赋值给另外一个字段的语句)
- 给某个 collection 的所有记录的某个字段随机一个数字赋值
- oracle 如何将一个表的某个字段赋值给另一张表的某个字段
- MySQL给一个字段递增赋值
- Oracle新增自增一的主键字段和赋值代码
- MySQL给一个字段递增赋值
- 创建一个触发器新增字段的时候设置某个字段的值
- Oracle新增自增一的主键字段和赋值代码
- Elasticsearch 一个字段精确和模糊搜索,一个字段多种分词器的设置
- Elasticsearch 索引新增字段并且设置mappping
- T-SQL从DB中取出多个字段赋值给多个变量/一个字段给单个变量赋值
- mybatis插入一个对象后获取表中自增的主键Id并且传入到插入的的对象中,方便将对象中其他属性赋值给其他以前表主键Id作为非空字段的表
- Oracle新增自增一的主键字段和赋值代码