
Copying an index from one Elasticsearch cluster to another

2016-12-15 16:36
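Many Elasticsearch users eventually need to move an index from one cluster into another. There are two ways to do it. The first is to copy the index files directly. It is simple and brute-force, but it has real drawbacks: very large files are slow and cumbersome to copy, the copy is easy to get wrong, and the cluster has to be restarted afterwards, so it is not flexible. This post covers the second approach: reading the source index with the scroll API and writing the documents into the destination cluster.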

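The program needs seven parameters: the source index, source cluster name, source IP, destination index, destination cluster name, destination IP, and document type. It builds one Settings object for the source cluster and one for the destination cluster, and opens a transport client against each.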
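As a minimal sketch of how those seven values can be passed and checked, the hypothetical class below (`CopyArgs` is not part of the original program) extracts the value after `-argInfo` and insists on exactly seven non-empty, semicolon-separated fields. A malformed argument then fails with a clear message instead of a `NullPointerException` or `ArrayIndexOutOfBoundsException`, which is what a bare split-and-index does.

```java
import java.util.Arrays;

/**
 * Hypothetical holder for the seven -argInfo values:
 * sourceIdx;sourceCluster;sourceIp;destIdx;destCluster;destIp;type
 */
final class CopyArgs {
    final String sourceIndex, sourceCluster, sourceIp;
    final String destIndex, destCluster, destIp;
    final String docType;

    private CopyArgs(String[] f) {
        sourceIndex = f[0]; sourceCluster = f[1]; sourceIp = f[2];
        destIndex = f[3]; destCluster = f[4]; destIp = f[5];
        docType = f[6];
    }

    /** Finds the value after "-argInfo" and splits it into exactly seven non-empty fields. */
    static CopyArgs fromCommandLine(String[] args) {
        String argInfo = null;
        for (int i = 0; i + 1 < args.length; i++) {
            if ("-argInfo".equals(args[i])) {
                argInfo = args[i + 1];
            }
        }
        if (argInfo == null) {
            throw new IllegalArgumentException(
                    "missing -argInfo \"srcIdx;srcCluster;srcIp;dstIdx;dstCluster;dstIp;type\"");
        }
        // limit -1 keeps trailing empty fields, so "a;b;c;d;e;f;" is rejected instead of shortened
        String[] fields = argInfo.split(";", -1);
        if (fields.length != 7 || Arrays.stream(fields).anyMatch(String::isEmpty)) {
            throw new IllegalArgumentException("expected 7 non-empty fields, got: " + argInfo);
        }
        return new CopyArgs(fields);
    }
}
```

For example, `CopyArgs.fromCommandLine(new String[]{"-argInfo", "a;c1;10.0.0.1;b;c2;10.0.0.2;doc"})` yields `sourceIndex` `"a"` and `destIp` `"10.0.0.2"`; the IP addresses are placeholders.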

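The code below targets Elasticsearch 2.x on both the source and destination side. For other versions it should mostly be a matter of adjusting the Elasticsearch-specific class names and builders wherever they have changed.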

In my tests the throughput was reasonable: roughly 50 million documents per hour, and faster still when the data volume is smaller.
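To put that figure in perspective, the small calculation below converts it into per-second rates. It assumes the 50 million documents per hour quoted above and the 1000 hits per scroll page (`setSize(1000)`) used in the code, where each page becomes one bulk request; both constants are inputs to adjust, not new measurements, and `ThroughputEstimate` is a hypothetical helper.

```java
/**
 * Back-of-the-envelope conversion of the reported throughput (hypothetical helper).
 * Both constants come from the text; they are assumptions, not measurements.
 */
final class ThroughputEstimate {
    static final long DOCS_PER_HOUR = 50_000_000L; // reported: about 50 million docs per hour
    static final int BATCH_SIZE = 1000;             // setSize(1000): one scroll page = one bulk

    static double docsPerSecond() {
        return DOCS_PER_HOUR / 3600.0;
    }

    static double bulksPerSecond() {
        return docsPerSecond() / BATCH_SIZE;
    }

    /** Estimated hours to copy docCount documents at the reported rate. */
    static double hoursFor(long docCount) {
        return (double) docCount / DOCS_PER_HOUR;
    }

    public static void main(String[] args) {
        System.out.printf("%.0f docs/s, %.1f bulk requests/s%n", docsPerSecond(), bulksPerSecond());
        System.out.printf("200 million docs: about %.1f h%n", hoursFor(200_000_000L));
    }
}
```

At those numbers the copier sustains roughly 13,900 documents per second, or about 14 bulk requests of 1000 documents each, and an index of 200 million documents would take about four hours.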

Enough talk, here is the code:

package org.elasticsearch;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;

/**
 * Copies every document of one index into an index on another cluster (Elasticsearch 2.x).
 *
 * java -cp "/root/es/lib/*" org.elasticsearch.ScrollCreateIndex
 *      -argInfo "sourceIdx;sourceCluster;sourceIp;destIdx;destCluster;destIp;type"
 */
public class ScrollCreateIndex {

    public static void main(String[] args) throws Exception {
        String argInfos = null;
        for (int i = 0; i + 1 < args.length; i++) {
            if (args[i].equals("-argInfo")) {
                argInfos = args[i + 1];
            }
        }
        if (argInfos == null) {
            System.err.println("usage: -argInfo \"sourceIdx;sourceCluster;sourceIp;destIdx;destCluster;destIp;type\"");
            System.exit(1);
        }
        String[] argInfoArr = argInfos.split(";");
        if (argInfoArr.length != 7) {
            System.err.println("expected 7 ';'-separated values, got " + argInfoArr.length);
            System.exit(1);
        }
        executeScrollCreate(argInfoArr[0], argInfoArr[1], argInfoArr[2],
                argInfoArr[3], argInfoArr[4], argInfoArr[5], argInfoArr[6]);
    }

    /** Builds a transport client for one cluster (9300 is the default transport port). */
    private static TransportClient buildClient(String clusterName, String ip) {
        Settings settings = Settings.settingsBuilder()
                .put("cluster.name", clusterName)
                .put("client.transport.sniff", true)
                .put("client.transport.ping_timeout", "30s")
                .put("client.transport.nodes_sampler_interval", "30s")
                .build();
        TransportClient client = TransportClient.builder().settings(settings).build();
        client.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(ip, 9300)));
        return client;
    }

    public static void executeScrollCreate(String indexName, String clustName, String sourceIp,
            String destiIndexName, String destiClustName, String destiIp, String idxType) throws Exception {
        TransportClient client = buildClient(clustName, sourceIp);            // source cluster
        TransportClient destiClient = buildClient(destiClustName, destiIp);   // destination cluster

        // open a scroll over the source index: 1000 hits per page, context kept alive 90 s between pages
        SearchResponse scrollResp = client.prepareSearch(indexName)
                .setScroll(new TimeValue(90000)).setSize(1000).execute().actionGet();

        // 5 writer threads; each waits for its bulk response, so at most 5 bulk requests are
        // in flight at once (further batches wait in the pool's queue)
        ExecutorService executor = Executors.newFixedThreadPool(5);
        while (scrollResp.getHits().getHits().length > 0) {
            final BulkRequestBuilder bulk = destiClient.prepareBulk();
            for (SearchHit hit : scrollResp.getHits().getHits()) {
                // keep the source _id so a re-run overwrites documents instead of duplicating them
                IndexRequest req = destiClient.prepareIndex(destiIndexName, idxType, hit.getId())
                        .setSource(hit.getSourceAsString()).request();
                bulk.add(req);
            }
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    BulkResponse resp = bulk.execute().actionGet();
                    if (resp.hasFailures()) {
                        System.err.println(resp.buildFailureMessage());
                    }
                }
            });

            Thread.sleep(10); // light throttle between scroll pages

            scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
                    .setScroll(new TimeValue(90000)).execute().actionGet();
        }

        // wait for the queued bulks, then release resources so the JVM exits by itself
        executor.shutdown();
        if (!executor.awaitTermination(1, TimeUnit.HOURS)) {
            System.err.println("timed out waiting for bulk requests to finish");
        }
        client.prepareClearScroll().addScrollId(scrollResp.getScrollId()).execute().actionGet();
        client.close();
        destiClient.close();
    }
}
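The core of the program is a producer/consumer loop: read one scroll page, hand it to a small pool of writer threads, and repeat until a page comes back empty. The sketch below reproduces that pattern with plain JDK classes so it can be run without a cluster. `BoundedCopyPipeline` is hypothetical, the `pages` list stands in for scroll responses, and the `writer` callback stands in for a blocking `bulk.execute().actionGet()`. It swaps in a `Semaphore` to cap how many batches are held in memory (the original relies on a `Thread.sleep(10)` between pages), and it shuts the pool down at the end, because a fixed thread pool's non-daemon threads otherwise keep the JVM running after the copy finishes.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Hypothetical, cluster-free sketch of the scroll -> bulk pipeline. */
final class BoundedCopyPipeline {

    /** Feeds each page to writer on a fixed pool; returns the number of documents written. */
    static long copy(List<List<String>> pages, Consumer<List<String>> writer, int threads)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        // at most 2 * threads batches held at once: the reader blocks instead of queueing without limit
        Semaphore inFlight = new Semaphore(threads * 2);
        AtomicLong written = new AtomicLong();
        for (List<String> page : pages) {
            if (page.isEmpty()) {
                break; // an empty page means the scroll is exhausted
            }
            inFlight.acquire();
            pool.execute(() -> {
                try {
                    writer.accept(page); // blocking write, like bulk.execute().actionGet()
                    written.addAndGet(page.size());
                } finally {
                    inFlight.release();
                }
            });
        }
        // without shutdown() the pool's non-daemon threads keep the JVM alive after the copy
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS);
        return written.get();
    }
}
```

The cap (twice the thread count here) is a design choice: `Executors.newFixedThreadPool` uses an unbounded queue, so without it a fast reader can pile up pages faster than the destination cluster indexes them.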
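The program can be run directly with the java -cp command in its header comment, but it is more convenient to start it in the background from a script, which also makes sure the Java process is stopped when the copy is done. That is how I run it myself, so here is the script in case it is useful. It takes the same seven values as positional arguments, in the same order: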

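#!/bin/bash

source_idx=$1
source_clustername=$2
source_ip=$3
desti_idx=$4
desti_clustername=$5
desti_ip=$6
idx_type=$7

# create the destination index; create_index_action does not appear to be a stock Elasticsearch
# endpoint (presumably specific to the author's setup). On a stock cluster the equivalent is:
#   curl -sXPUT "http://$desti_ip:9200/$desti_idx?master_timeout=2m"
curl -sXPUT "http://$desti_ip:9200/create_index_action/$desti_idx/?master_timeout=2m"

# wait for the destination cluster to turn green (column 4 of _cat/health), polling every 5 s
while true ; do
  stat=$(curl -sXGET "http://$desti_ip:9200/_cat/health" | awk '{print $4}')
  if [ "$stat" = "green" ] ; then
    break
  fi
  sleep 5
done

java -cp "/root/es/root/lib/*" org.elasticsearch.ScrollCreateIndex -argInfo "$1;$2;$3;$4;$5;$6;$7" &
pid="$!"

# column 6 of _cat/indices is docs.count
source_docnum=$(curl -sXGET "http://$source_ip:9200/_cat/indices/$source_idx" | awk '{print $6}')
temp_docnum=0
same_num=0
# poll every 100 s for as long as the copier is still running
while kill -0 "$pid" 2>/dev/null ; do
  desti_docnum=$(curl -sXGET "http://$desti_ip:9200/_cat/indices/$desti_idx" | awk '{print $6}')
  [[ -z "$desti_docnum" ]] && desti_docnum=0
  if [ "$desti_docnum" = "$temp_docnum" ] ; then
    same_num=$((same_num + 1))
  else
    same_num=0
  fi
  # destination count unchanged on more than 3 consecutive polls: treat the copy as stuck
  if [ "$same_num" -gt 3 ] ; then
    kill "$pid"
    break
  fi
  temp_docnum=$desti_docnum
  sleep 100
done
wait "$pid" 2>/dev/null
desti_docnum=$(curl -sXGET "http://$desti_ip:9200/_cat/indices/$desti_idx" | awk '{print $6}')
echo "finish: source=$source_docnum docs, destination=$desti_docnum docs"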
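The script's stop condition (the destination docs.count unchanged on more than three consecutive polls) can be isolated into a small function that is easy to test. The `StallDetector` class below is a hypothetical Java rendering of that shell logic, starting from a previous count of 0 just as `temp_docnum` does; it is an illustration, not part of the original tooling.

```java
/**
 * Hypothetical Java rendering of the script's stall check: reports true once the polled
 * destination doc count has been unchanged on more than maxSamePolls consecutive polls.
 */
final class StallDetector {
    private final int maxSamePolls;
    private long lastCount = 0; // the script also starts temp_docnum at 0
    private int samePolls = 0;

    StallDetector(int maxSamePolls) {
        this.maxSamePolls = maxSamePolls; // 3 matches `-gt 3` in the script
    }

    /** Records one polled docs.count and returns whether the copy looks stuck. */
    boolean observe(long destCount) {
        samePolls = (destCount == lastCount) ? samePolls + 1 : 0;
        lastCount = destCount;
        return samePolls > maxSamePolls;
    }
}
```

With the script's 100-second poll interval and a threshold of 3, the copy is declared stuck only after about 400 seconds without progress.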
That is the whole procedure. I hope it helps anyone who needs it, and if you run into problems, feel free to discuss them.