ElasticSearch Java API
2016-10-09 17:03
369 查看
通过 Java API 实现对 Elasticsearch(ES)的增删改查。
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
public class Test {
public static void main(String[] args)
throws ClassNotFoundException, SQLException, IOException, InterruptedException {
// 准备数据
long start = System.currentTimeMillis();
List> data = prepareData();
long end = System.currentTimeMillis();
System.out.printf("准备数据用时:%s秒\n", (end - start) / 1000);
start = System.currentTimeMillis();
index(data);
end = System.currentTimeMillis();
System.out.printf("单条索引:%s秒\n", (end - start) / 1000);
start = System.currentTimeMillis();
update(data);
end = System.currentTimeMillis();
System.out.printf("单条更新:%s秒\n", (end - start) / 1000);
start = System.currentTimeMillis();
delete(data);
end = System.currentTimeMillis();
System.out.printf("单条删除:%s秒\n", (end - start) / 1000);
start = System.currentTimeMillis();
bulk(data);
end = System.currentTimeMillis();
System.out.printf("批量索引:%s秒\n", (end - start) / 1000);
start = System.currentTimeMillis();
get(data);
end = System.currentTimeMillis();
System.out.printf("ID查询:%s秒\n", (end - start) / 1000);
start = System.currentTimeMillis();
bulkProcessor(data);
end = System.currentTimeMillis();
System.out.printf("bulkProcessor处理:%s秒\n", (end - start) / 1000);
}
private static List> prepareData()
throws ClassNotFoundException, SQLException, UnknownHostException, InterruptedException {
List> data = new ArrayList>();
Class.forName("com.mysql.jdbc.Driver");
Connection con = DriverManager.getConnection("jdbc:mysql://172.16.1.25/songod", "root", "123456");
ResultSet rs = con.createStatement().executeQuery("select * from sod_song_ksc limit 20000");
ResultSetMetaData metadata = rs.getMetaData();
int columnCount = metadata.getColumnCount();
while (rs.next()) {
Map object = new HashMap();
for (int i = 1; i <= columnCount; i++) {
object.put(metadata.getColumnName(i), rs.getObject(i));
}
data.add(object);
}
rs.close();
con.close();
return data;
}
private static void index(List> data) throws UnknownHostException {
Client client = getClient();
for (Map map : data) {
client.prepareIndex("songod", "sod_song_ksc").setSource(map).setId(map.get("SongID").toString())
.setRouting(map.get("SongID").toString()).execute().actionGet();
}
client.close();
}
private static void bulk(List> data) throws UnknownHostException {
Client client = getClient();
BulkRequestBuilder bulkRequest = client.prepareBulk();
for (Map map : data) {
bulkRequest.add(client.prepareIndex("songod", "sod_song_ksc").setSource(map)
.setRouting(map.get("SongID").toString()).setId(map.get("SongID").toString()));
}
BulkResponse bulkResponse = bulkRequest.get();
client.close();
}
private static void get(List> data) throws UnknownHostException {
Client client = getClient();
for (Map map : data) {
GetResponse response = client.prepareGet("songod", "sod_song_ksc", map.get("SongID").toString())
.setRouting(map.get("SongID").toString()).get();
String str = response.getSourceAsString();
// System.out.println(str);
}
client.close();
}
private static void delete(List> data) throws UnknownHostException {
Client client = getClient();
for (Map map : data) {
client.prepareDelete("songod", "sod_song_ksc", map.get("SongID").toString())
.setRouting(map.get("SongID").toString()).get();
}
client.close();
}
private static void update(List> data) throws IOException {
Client client = getClient();
for (Map map : data) {
client.prepareUpdate().setIndex("songod").setType("sod_song_ksc")
.setId(map.get("SongID").toString()).setDoc(jsonBuilder().startObject()
.field("Name", "Hello World!!" + map.get("Name").toString()).endObject())
.setRouting(map.get("SongID").toString()).get();
}
client.close();
}
private static void bulkProcessor(List> data) throws IOException {
BulkProcessor bulkProcessor = BulkProcessor.builder(getClient(), new BulkProcessor.Listener() {
public void afterBulk(long arg0, BulkRequest arg1, BulkResponse arg2) {
// TODO Auto-generated method stub
}
public void afterBulk(long arg0, BulkRequest arg1, Throwable arg2) {
// TODO Auto-generated method stub
}
public void beforeBulk(long arg0, BulkRequest arg1) {
// TODO Auto-generated method stub
}
}).setBulkActions(1000).setBulkSize(new ByteSizeValue(3, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(5)).setConcurrentRequests(1)
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)).build();
// .setRouting(map.get("SongID").toString())
for (Map map : data) {
UpdateRequest updateRequest = new UpdateRequest("songod", "sod_song_ksc", map.get("SongID").toString());
updateRequest.routing(map.get("SongID").toString());
updateRequest.id(map.get("SongID").toString());
updateRequest.doc(jsonBuilder().startObject().field("Name", "Hello World!!" + map.get("Name").toString())
.endObject());
bulkProcessor.add(updateRequest);
}
bulkProcessor.close();
}
private static Client getClient() throws UnknownHostException {
Settings settings = Settings.settingsBuilder().put("cluster.name", "es_cluster").build();
Client client = TransportClient.builder().settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.16.114"), 9300));
return client;
}
}
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
public class Test {
public static void main(String[] args)
throws ClassNotFoundException, SQLException, IOException, InterruptedException {
// 准备数据
long start = System.currentTimeMillis();
List> data = prepareData();
long end = System.currentTimeMillis();
System.out.printf("准备数据用时:%s秒\n", (end - start) / 1000);
start = System.currentTimeMillis();
index(data);
end = System.currentTimeMillis();
System.out.printf("单条索引:%s秒\n", (end - start) / 1000);
start = System.currentTimeMillis();
update(data);
end = System.currentTimeMillis();
System.out.printf("单条更新:%s秒\n", (end - start) / 1000);
start = System.currentTimeMillis();
delete(data);
end = System.currentTimeMillis();
System.out.printf("单条删除:%s秒\n", (end - start) / 1000);
start = System.currentTimeMillis();
bulk(data);
end = System.currentTimeMillis();
System.out.printf("批量索引:%s秒\n", (end - start) / 1000);
start = System.currentTimeMillis();
get(data);
end = System.currentTimeMillis();
System.out.printf("ID查询:%s秒\n", (end - start) / 1000);
start = System.currentTimeMillis();
bulkProcessor(data);
end = System.currentTimeMillis();
System.out.printf("bulkProcessor处理:%s秒\n", (end - start) / 1000);
}
private static List> prepareData()
throws ClassNotFoundException, SQLException, UnknownHostException, InterruptedException {
List> data = new ArrayList>();
Class.forName("com.mysql.jdbc.Driver");
Connection con = DriverManager.getConnection("jdbc:mysql://172.16.1.25/songod", "root", "123456");
ResultSet rs = con.createStatement().executeQuery("select * from sod_song_ksc limit 20000");
ResultSetMetaData metadata = rs.getMetaData();
int columnCount = metadata.getColumnCount();
while (rs.next()) {
Map object = new HashMap();
for (int i = 1; i <= columnCount; i++) {
object.put(metadata.getColumnName(i), rs.getObject(i));
}
data.add(object);
}
rs.close();
con.close();
return data;
}
private static void index(List> data) throws UnknownHostException {
Client client = getClient();
for (Map map : data) {
client.prepareIndex("songod", "sod_song_ksc").setSource(map).setId(map.get("SongID").toString())
.setRouting(map.get("SongID").toString()).execute().actionGet();
}
client.close();
}
private static void bulk(List> data) throws UnknownHostException {
Client client = getClient();
BulkRequestBuilder bulkRequest = client.prepareBulk();
for (Map map : data) {
bulkRequest.add(client.prepareIndex("songod", "sod_song_ksc").setSource(map)
.setRouting(map.get("SongID").toString()).setId(map.get("SongID").toString()));
}
BulkResponse bulkResponse = bulkRequest.get();
client.close();
}
private static void get(List> data) throws UnknownHostException {
Client client = getClient();
for (Map map : data) {
GetResponse response = client.prepareGet("songod", "sod_song_ksc", map.get("SongID").toString())
.setRouting(map.get("SongID").toString()).get();
String str = response.getSourceAsString();
// System.out.println(str);
}
client.close();
}
private static void delete(List> data) throws UnknownHostException {
Client client = getClient();
for (Map map : data) {
client.prepareDelete("songod", "sod_song_ksc", map.get("SongID").toString())
.setRouting(map.get("SongID").toString()).get();
}
client.close();
}
private static void update(List> data) throws IOException {
Client client = getClient();
for (Map map : data) {
client.prepareUpdate().setIndex("songod").setType("sod_song_ksc")
.setId(map.get("SongID").toString()).setDoc(jsonBuilder().startObject()
.field("Name", "Hello World!!" + map.get("Name").toString()).endObject())
.setRouting(map.get("SongID").toString()).get();
}
client.close();
}
private static void bulkProcessor(List> data) throws IOException {
BulkProcessor bulkProcessor = BulkProcessor.builder(getClient(), new BulkProcessor.Listener() {
public void afterBulk(long arg0, BulkRequest arg1, BulkResponse arg2) {
// TODO Auto-generated method stub
}
public void afterBulk(long arg0, BulkRequest arg1, Throwable arg2) {
// TODO Auto-generated method stub
}
public void beforeBulk(long arg0, BulkRequest arg1) {
// TODO Auto-generated method stub
}
}).setBulkActions(1000).setBulkSize(new ByteSizeValue(3, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(5)).setConcurrentRequests(1)
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)).build();
// .setRouting(map.get("SongID").toString())
for (Map map : data) {
UpdateRequest updateRequest = new UpdateRequest("songod", "sod_song_ksc", map.get("SongID").toString());
updateRequest.routing(map.get("SongID").toString());
updateRequest.id(map.get("SongID").toString());
updateRequest.doc(jsonBuilder().startObject().field("Name", "Hello World!!" + map.get("Name").toString())
.endObject());
bulkProcessor.add(updateRequest);
}
bulkProcessor.close();
}
private static Client getClient() throws UnknownHostException {
Settings settings = Settings.settingsBuilder().put("cluster.name", "es_cluster").build();
Client client = TransportClient.builder().settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.16.114"), 9300));
return client;
}
}
相关文章推荐
- Elasticsearch java API (13)Search API 使用聚合
- ElasticSearch Java api 详解_V1.0
- Elasticsearch java API (3)
- Elasticsearch java API (8)删除API
- Elasticsearch java API (7)GET API
- Elasticsearch java API (14)Search API Terminate After 终止后
- Elasticsearch java API (10)批量处理 API
- ElasticSearch java API (1)
- ElasticSearch Java Api -检索索引库
- Elasticsearch java API (10)Multi Get API
- Elasticsearch java API (16)Aggregations 构建聚合
- Elasticsearch java API (21)查询 DSL 复合查询
- Elasticsearch java API (19)Percolate API
- Elasticsearch java API (2)
- Elasticsearch java API (12)Search API MultiSearch API
- Elasticsearch java API (17)Aggregations 聚合 函数
- Elasticsearch java API (20)查询 DSL
- Elasticsearch java API (21)查询 DSL 项级别查询
- Elasticsearch java API (5)Transport Client
- Elasticsearch java API (11)Bulk API