elasticsearch将某个集群的索引导入到另外一个集群中
2016-12-15 16:36
381 查看
在使用elasticsearch的过程中,肯定有很多朋友有过这种需求,就是将某个集群中的索引导入另外一个集群中去,这种情况呢,有两种办法,一种就是直接拷贝文件,简单粗暴,但是有很多弊端,比如说文件非常大的话拷贝起来也很麻烦,另外拷贝的过程中也容易乱,比较低端,最后拷贝完之后还得重启集群不是很灵活;今天我主要说的就是第二种方式,通过scroll的方式进行处理。
处理的时候需要的参数,分别为:源索引,源集群名称,源ip,目的索引,目的集群名称,目的ip,索引类型;通过构建一个源settings,一个目的settings去处理。
具体参见下面的代码(此代码适用于2.x到2.x版本,如果有其他的版本,只需要修改下es相关的类名称即可适用,如果有改变的话)
另外通过测试,效率也还可以,一个小时大概能导出5千万左右的数据,如果数据量不大,还会更快
行了不废话了,直接上代码:
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
/**
* java -cp "/root/es/lib/*" org.elasticsearch.ScrollCreateIndex
* -argInfo "a;c1;ip;b;c2;ip;ddd"
*
*/
public class ScrollCreateIndex {
public static void main(String[] args) throws Exception {
String sourceIdx = null;
String sourceClustName = null;
String sourceIp = null;
String destiIdx = null;
String destiClustName = null;
String destiIp = null;
String idxType = null;
String argInfos = null;
for (int i = 0; i < args.length; i++) {
if (args[i].equals("-argInfo")) {
argInfos = args[(i + 1)];
}
}
String[] arg_info_arr = argInfos.split(";");
sourceIdx = arg_info_arr[0];
sourceClustName = arg_info_arr[1];
sourceIp = arg_info_arr[2];
destiIdx = arg_info_arr[3];
destiClustName = arg_info_arr[4];
destiIp = arg_info_arr[5];
idxType = arg_info_arr[6];
executeScrollCreate(sourceIdx,sourceClustName,sourceIp,destiIdx,destiClustName,destiIp,idxType);
}
public static void executeScrollCreate(String indexName,String clustName,String sourceIp,
String destiIndexName,String destiClustName,String destiIp,String idxType) throws Exception{
//build source settings
Settings settings = Settings.settingsBuilder()
.put("cluster.name", clustName).put("client.transport.sniff", true)
.put("client.transport.ping_timeout", "30s")
.put("client.transport.nodes_sampler_interval", "30s").build();
TransportClient client = TransportClient.builder().settings(settings).build();
client.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(sourceIp, 9300)));
//build destination settings
Settings destiSettings = Settings.settingsBuilder()
.put("cluster.name", destiClustName).put("client.transport.sniff", true)
.put("client.transport.ping_timeout", "30s")
.put("client.transport.nodes_sampler_interval", "30s").build();
TransportClient destiClient = TransportClient.builder().settings(destiSettings).build();
destiClient.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(destiIp, 9300)));
SearchResponse scrollResp = client.prepareSearch(indexName)
.setScroll(new TimeValue(90000)).setSize(1000).execute().actionGet();
//build destination bulk
BulkRequestBuilder bulk = destiClient.prepareBulk();
ExecutorService executor = Executors.newFixedThreadPool(5);
while(true){
bulk = destiClient.prepareBulk();
final BulkRequestBuilder bulk_new = bulk;
for(SearchHit hit : scrollResp.getHits().getHits()){
IndexRequest req = destiClient.prepareIndex().setIndex(destiIndexName).setType(idxType).setSource(hit.getSourceAsString()).request();
bulk_new.add(req);
}
executor.execute(new Runnable() {
@Override
public void run() {
bulk_new.execute();
}
});
Thread.sleep(10);
scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
.setScroll(new TimeValue(90000)).execute().actionGet();
if(scrollResp.getHits().getHits().length == 0){
break;
}
}
}
}
上面的代码可以直接通过注释中的java -cp命令直接执行,不过还是通过脚本调用在后台执行比较好,结束后可以自动kill掉java进程,本人在使用的过程中就是通过脚本调用的,在这里也把脚本给贴出来,希望能有用:
#!/bin/bash
source_idx=$1
source_clustername=$2
source_ip=$3
desti_idx=$4
desti_clustername=$5
desti_ip=$6
idx_type=$7
curl -sXPUT "http://$desti_ip:9200/create_index_action/$desti_idx/?master_timeout=2m"
while true ; do
stat=$(curl -sXGET http://$desti_ip:9200/_cat/health | awk '{print $4}')
if [ "$stat" = "green" ] ; then
break
fi
done
java -cp "/root/es/root/lib/*" org.elasticsearch.ScrollCreateIndex -argInfo "$1;$2;$3;$4;$5;$6;$7" &
pid="$!"
source_docnum=$(curl -sXGET http://$source_ip:9200/_cat/indices/$source_idx | awk '{print $6}')
temp_docnum=0
same_num=0
while true ; do
desti_docnum=$(curl -sXGET http://$desti_ip:9200/_cat/indices/$desti_idx | awk '{print $6}')
[[ ! "$desti_docnum" ]] && desti_docnum=0
if [ "$desti_docnum" = "$temp_docnum" ] ; then
same_num=$(expr $same_num + 1)
else
same_num=0
fi
if [ "$same_num" -gt 3 ] ; then
break
fi
temp_docnum=$desti_docnum
dif_value=$(expr $source_docnum - $desti_docnum)
if [[ -n "$dif_value" && "$dif_value" -lt 10000 ]] ; then
sleep 10
kill -9 $pid
break
fi
sleep 100
done
echo "finish"
以上就是完整的过程了,希望能帮助到需要的朋友们,当然,有问题的话,大家可以一起讨论啊。。
处理的时候需要的参数,分别为:源索引,源集群名称,源ip,目的索引,目的集群名称,目的ip,索引类型;通过构建一个源settings,一个目的settings去处理。
具体参见下面的代码(此代码适用于2.x到2.x版本,如果有其他的版本,只需要修改下es相关的类名称即可适用,如果有改变的话)
另外通过测试,效率也还可以,一个小时大概能导出5千万左右的数据,如果数据量不大,还会更快
行了不废话了,直接上代码:
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
/**
* java -cp "/root/es/lib/*" org.elasticsearch.ScrollCreateIndex
* -argInfo "a;c1;ip;b;c2;ip;ddd"
*
*/
public class ScrollCreateIndex {
public static void main(String[] args) throws Exception {
String sourceIdx = null;
String sourceClustName = null;
String sourceIp = null;
String destiIdx = null;
String destiClustName = null;
String destiIp = null;
String idxType = null;
String argInfos = null;
for (int i = 0; i < args.length; i++) {
if (args[i].equals("-argInfo")) {
argInfos = args[(i + 1)];
}
}
String[] arg_info_arr = argInfos.split(";");
sourceIdx = arg_info_arr[0];
sourceClustName = arg_info_arr[1];
sourceIp = arg_info_arr[2];
destiIdx = arg_info_arr[3];
destiClustName = arg_info_arr[4];
destiIp = arg_info_arr[5];
idxType = arg_info_arr[6];
executeScrollCreate(sourceIdx,sourceClustName,sourceIp,destiIdx,destiClustName,destiIp,idxType);
}
public static void executeScrollCreate(String indexName,String clustName,String sourceIp,
String destiIndexName,String destiClustName,String destiIp,String idxType) throws Exception{
//build source settings
Settings settings = Settings.settingsBuilder()
.put("cluster.name", clustName).put("client.transport.sniff", true)
.put("client.transport.ping_timeout", "30s")
.put("client.transport.nodes_sampler_interval", "30s").build();
TransportClient client = TransportClient.builder().settings(settings).build();
client.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(sourceIp, 9300)));
//build destination settings
Settings destiSettings = Settings.settingsBuilder()
.put("cluster.name", destiClustName).put("client.transport.sniff", true)
.put("client.transport.ping_timeout", "30s")
.put("client.transport.nodes_sampler_interval", "30s").build();
TransportClient destiClient = TransportClient.builder().settings(destiSettings).build();
destiClient.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(destiIp, 9300)));
SearchResponse scrollResp = client.prepareSearch(indexName)
.setScroll(new TimeValue(90000)).setSize(1000).execute().actionGet();
//build destination bulk
BulkRequestBuilder bulk = destiClient.prepareBulk();
ExecutorService executor = Executors.newFixedThreadPool(5);
while(true){
bulk = destiClient.prepareBulk();
final BulkRequestBuilder bulk_new = bulk;
for(SearchHit hit : scrollResp.getHits().getHits()){
IndexRequest req = destiClient.prepareIndex().setIndex(destiIndexName).setType(idxType).setSource(hit.getSourceAsString()).request();
bulk_new.add(req);
}
executor.execute(new Runnable() {
@Override
public void run() {
bulk_new.execute();
}
});
Thread.sleep(10);
scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
.setScroll(new TimeValue(90000)).execute().actionGet();
if(scrollResp.getHits().getHits().length == 0){
break;
}
}
}
}
上面的代码可以直接通过注释中的java -cp命令直接执行,不过还是通过脚本调用在后台执行比较好,结束后可以自动kill掉java进程,本人在使用的过程中就是通过脚本调用的,在这里也把脚本给贴出来,希望能有用:
#!/bin/bash
source_idx=$1
source_clustername=$2
source_ip=$3
desti_idx=$4
desti_clustername=$5
desti_ip=$6
idx_type=$7
curl -sXPUT "http://$desti_ip:9200/create_index_action/$desti_idx/?master_timeout=2m"
while true ; do
stat=$(curl -sXGET http://$desti_ip:9200/_cat/health | awk '{print $4}')
if [ "$stat" = "green" ] ; then
break
fi
done
java -cp "/root/es/root/lib/*" org.elasticsearch.ScrollCreateIndex -argInfo "$1;$2;$3;$4;$5;$6;$7" &
pid="$!"
source_docnum=$(curl -sXGET http://$source_ip:9200/_cat/indices/$source_idx | awk '{print $6}')
temp_docnum=0
same_num=0
while true ; do
desti_docnum=$(curl -sXGET http://$desti_ip:9200/_cat/indices/$desti_idx | awk '{print $6}')
[[ ! "$desti_docnum" ]] && desti_docnum=0
if [ "$desti_docnum" = "$temp_docnum" ] ; then
same_num=$(expr $same_num + 1)
else
same_num=0
fi
if [ "$same_num" -gt 3 ] ; then
break
fi
temp_docnum=$desti_docnum
dif_value=$(expr $source_docnum - $desti_docnum)
if [[ -n "$dif_value" && "$dif_value" -lt 10000 ]] ; then
sleep 10
kill -9 $pid
break
fi
sleep 100
done
echo "finish"
以上就是完整的过程了,希望能帮助到需要的朋友们,当然,有问题的话,大家可以一起讨论啊。。
相关文章推荐
- 从EXCEL 导入记录后,如何释放?否则另外打开一个EXCEL文件得重启电脑.
- oracle imp命令,将数据从一个用户导出的dmp数据,导入到另外一个用户下面。
- 声明:本博客之前从另外我一个博客系统导入了大量博文,
- Android studio 导入另外一个项目作为依赖包问题集锦!!!
- 两种方法将oracle数据库中的一张表的数据导入到另外一个oracle数据库中
- 一个DataTable的数据如何导入另外一个DataTable中!!
- 把一个SVN项目的目录结构 导入到另外一个空白的SVN项目里
- LuceneDemo类的演示案例:建立索引、建立搜索,另外建立一个测试类TestLucene(用到JUnit)
- 一个Jsp页面导入另外一个Jsp页面的方法
- 从别的库中的表导入到另外一个库中表内容的语句
- Oracle 中将一个表中数据导入到另外一个表的方法
- Mysql把一个表的记录导入到另外一张表
- [导入]新近上线了一个小说索引站点,喜欢看小说的同志们可以试用下
- WinCE下将SQLite数据库导入到另外一个SQLite数据库
- 如何创建一个与已知的一个表完全相同结构的新表(包括主键、外键、索引),同时将数据导入新表
- sql DTS 导入导出 使用一个表 替换另外一个表
- Liferay 集群中在一个节点上上传照片另外一个节点无法看到的问题的解决
- android 项目引用另外一个库项目,如何导入该库项目
- apach hadoop2.6 集群利用Phoenix 4.6-hbase 批量导入并自动创建索引
- Android开发-将一个项目作为lib导入到另外一个项目中