您的位置:首页 > 编程语言 > Java开发

ElasticSearch Java API

2016-10-09 17:03 267 查看
通过 Java API 实现对 Elasticsearch(ES)文档的增删改查。

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

public class Test {

public static void main(String[] args)
throws ClassNotFoundException, SQLException, IOException, InterruptedException {
// 准备数据
long start = System.currentTimeMillis();
List> data = prepareData();
long end = System.currentTimeMillis();
System.out.printf("准备数据用时:%s秒\n", (end - start) / 1000);

start = System.currentTimeMillis();
index(data);
end = System.currentTimeMillis();
System.out.printf("单条索引:%s秒\n", (end - start) / 1000);

start = System.currentTimeMillis();
update(data);
end = System.currentTimeMillis();
System.out.printf("单条更新:%s秒\n", (end - start) / 1000);

start = System.currentTimeMillis();
delete(data);
end = System.currentTimeMillis();
System.out.printf("单条删除:%s秒\n", (end - start) / 1000);

start = System.currentTimeMillis();
bulk(data);
end = System.currentTimeMillis();
System.out.printf("批量索引:%s秒\n", (end - start) / 1000);

start = System.currentTimeMillis();
get(data);
end = System.currentTimeMillis();
System.out.printf("ID查询:%s秒\n", (end - start) / 1000);

start = System.currentTimeMillis();
bulkProcessor(data);
end = System.currentTimeMillis();
System.out.printf("bulkProcessor处理:%s秒\n", (end - start) / 1000);
}

private static List> prepareData()
throws ClassNotFoundException, SQLException, UnknownHostException, InterruptedException {
List> data = new ArrayList>();
Class.forName("com.mysql.jdbc.Driver");
Connection con = DriverManager.getConnection("jdbc:mysql://172.16.1.25/songod", "root", "123456");
ResultSet rs = con.createStatement().executeQuery("select * from sod_song_ksc limit 20000");
ResultSetMetaData metadata = rs.getMetaData();
int columnCount = metadata.getColumnCount();

while (rs.next()) {
Map object = new HashMap();
for (int i = 1; i <= columnCount; i++) {
object.put(metadata.getColumnName(i), rs.getObject(i));
}
data.add(object);
}
rs.close();
con.close();

return data;
}

private static void index(List> data) throws UnknownHostException {

Client client = getClient();
for (Map map : data) {
client.prepareIndex("songod", "sod_song_ksc").setSource(map).setId(map.get("SongID").toString())
.setRouting(map.get("SongID").toString()).execute().actionGet();
}
client.close();
}

private static void bulk(List> data) throws UnknownHostException {
Client client = getClient();
BulkRequestBuilder bulkRequest = client.prepareBulk();

for (Map map : data) {
bulkRequest.add(client.prepareIndex("songod", "sod_song_ksc").setSource(map)
.setRouting(map.get("SongID").toString()).setId(map.get("SongID").toString()));
}
BulkResponse bulkResponse = bulkRequest.get();
client.close();
}

private static void get(List> data) throws UnknownHostException {
Client client = getClient();
for (Map map : data) {
GetResponse response = client.prepareGet("songod", "sod_song_ksc", map.get("SongID").toString())
.setRouting(map.get("SongID").toString()).get();
String str = response.getSourceAsString();
// System.out.println(str);
}
client.close();
}

private static void delete(List> data) throws UnknownHostException {
Client client = getClient();
for (Map map : data) {
client.prepareDelete("songod", "sod_song_ksc", map.get("SongID").toString())
.setRouting(map.get("SongID").toString()).get();
}
client.close();

}

private static void update(List> data) throws IOException {
Client client = getClient();
for (Map map : data) {
client.prepareUpdate().setIndex("songod").setType("sod_song_ksc")
.setId(map.get("SongID").toString()).setDoc(jsonBuilder().startObject()
.field("Name", "Hello World!!" + map.get("Name").toString()).endObject())
.setRouting(map.get("SongID").toString()).get();
}
client.close();
}

private static void bulkProcessor(List> data) throws IOException {
BulkProcessor bulkProcessor = BulkProcessor.builder(getClient(), new BulkProcessor.Listener() {

public void afterBulk(long arg0, BulkRequest arg1, BulkResponse arg2) {
// TODO Auto-generated method stub

}

public void afterBulk(long arg0, BulkRequest arg1, Throwable arg2) {
// TODO Auto-generated method stub

}

public void beforeBulk(long arg0, BulkRequest arg1) {
// TODO Auto-generated method stub

}

}).setBulkActions(1000).setBulkSize(new ByteSizeValue(3, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(5)).setConcurrentRequests(1)
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)).build();
// .setRouting(map.get("SongID").toString())
for (Map map : data) {
UpdateRequest updateRequest = new UpdateRequest("songod", "sod_song_ksc", map.get("SongID").toString());
updateRequest.routing(map.get("SongID").toString());
updateRequest.id(map.get("SongID").toString());
updateRequest.doc(jsonBuilder().startObject().field("Name", "Hello World!!" + map.get("Name").toString())
.endObject());
bulkProcessor.add(updateRequest);
}
bulkProcessor.close();
}

private static Client getClient() throws UnknownHostException {
Settings settings = Settings.settingsBuilder().put("cluster.name", "es_cluster").build();
Client client = TransportClient.builder().settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.16.114"), 9300));

return client;
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: