您的位置:首页 > 编程语言 > Java开发

SQL to Elasticsearch java code

2017-05-08 17:43 357 查看
Elasticsearch虽然定位为Search Engine,但是因其可以持久化数据,很多时候,我们把Elasticsearch当成Database用,但是Elasticsearch不支持SQL,就需要把SQL逻辑转换成代码实现对应的功能。

以下列举了一些常用的SQL转换成对应的Java代码。

1.按某个field group by查询count

SELECT
fieldA, COUNT(fieldA)
from table
WHERE fieldC = "hoge"
AND fieldD = "huga"
AND fieldB > 10
AND fieldB < 100
group by fieldA;


对应的java code:

SearchRequestBuilder searchReq = client.prepareSearch("sample_index");
searchReq.setTypes("sample_types");
TermsBuilder termsb = AggregationBuilders.terms("my_fieldA").field("fieldA").size(100);

BoolFilterBuilder bf = FilterBuilders.boolFilter();
TermFilterBuilder tf_fieldC = FilterBuilders.termFilter("fieldC","hoge");
TermFilterBuilder tf_fieldD = FilterBuilders.termFilter("fieldD","huga");
bf.must(tf_fieldC);
bf.must(tf_fieldD);

RangeFilterBuilder rangefieldBFilter = FilterBuilders.rangeFilter("fieldB")
.gt(10)
.lt(100);

searchReq.setQuery(QueryBuilders.filteredQuery(QueryBuilders.matchAllQuery(),
FilterBuilders.andFilter(bf, rangefieldBFilter))).addAggregation(
termsb);
SearchResponse searchRes = searchReq.execute().actionGet();

Terms fieldATerms = searchRes.getAggregations().get("my_fieldA");
for (Terms.Bucket filedABucket : fieldATerms.getBuckets()) {
//fieldA
String fieldAValue = filedABucket.getKey();

//COUNT(fieldA)
long fieldACount = filedABucket.getDocCount();
}


2. 按某个field 和 date group by 并查询另一个filed的sum,时间统计图,时间间隔是1天。

SELECT
DATE(create_at), fieldA, SUM(fieldB)
from table
group by DATE(create_at), fieldA;


对应的java code:

SearchRequestBuilder searchReq = client.prepareSearch("sample_index");
searchReq.setTypes("sample_types");
DateHistogramBuilder dhb = AggregationBuilders.dateHistogram("my_datehistogram").field("create_at").interval(DateHistogram.Interval.days(1));
TermsBuilder termsb_fa = AggregationBuilders.terms("my_fieldA").field("fieldA").size(100);
termsb_fa.subAggregation(AggregationBuilders.sum("my_sum_fieldB").field("fieldB"));
dhb.subAggregation(termsb_fa)

searchReq.setQuery(QueryBuilders.matchAllQuery()).addAggregation(dhb);
SearchResponse searchRes = searchReq.execute().actionGet();

DateHistogram dateHist = searchRes.getAggregations().get("my_datehistogram");
for (DateHistogram.Bucket dateBucket : dateHist.getBuckets()) {
//DATE(create_at)
String create_at = dateentry.getKey();
Terms fieldATerms = dateBucket.getAggregations().get("my_fieldA");
for (Terms.Bucket filedABucket : fieldATerms.getBuckets()) {
//fieldA
String fieldAValue = filedABucket.getKey();

//SUM(fieldB)
Sum sumagg = filedABucket.getAggregations().get("my_sum_fieldB");
long sumFieldB = (long)sumagg.getValues();
}
}


3. 按两个field group by并查询第三个filed的sum

SELECT
fieldA, fieldC, SUM(fieldB)
from table
group by fieldA, fieldC;


对应的java code:

SearchRequestBuilder searchReq = client.prepareSearch("sample_index");
searchReq.setTypes("sample_types");

TermsBuilder termsb_fa = AggregationBuilders.terms("my_fieldA").field("fieldA").size(100);
TermsBuilder termsb_fc = AggregationBuilders.terms("my_fieldC").field("fieldC").size(50);

termsb_fc.subAggregation(AggregationBuilders.sum("my_sum_fieldB").field("fieldB"));
termsb_fa.subAggregation(termsb_fc)

searchReq.setQuery(QueryBuilders.matchAllQuery()).addAggregation(termsb_fa);
SearchResponse searchRes = searchReq.execute().actionGet();

Terms fieldATerms = searchRes.getAggregations().get("my_fieldA");
for (Terms.Bucket filedABucket : fieldATerms.getBuckets()) {
//fieldA
String fieldAValue = filedABucket.getKey();
Terms fieldCTerms = filedABucket.getAggregations().get("my_fieldC");
for (Terms.Bucket filedCBucket : fieldCTerms.getBuckets()) {
//fieldC
String fieldCValue = filedCBucket.getKey();

//SUM(fieldB)
Sum sumagg = filedCBucket.getAggregations().get("my_sum_fieldB");
long sumFieldB = (long)sumagg.getValues();
}
}


4. 按某个filed group by 并查询count、sum 和 average

SELECT
fieldA, COUNT(fieldA), SUM(fieldB), AVG(fieldB)
from table
group by fieldA;


对应的java code:

SearchRequestBuilder searchReq = client.prepareSearch("sample_index");
searchReq.setTypes("sample_types");

TermsBuilder termsb = AggregationBuilders.terms("my_fieldA").field("fieldA").size(100);
termsb.subAggregation(AggregationBuilders.sum("my_sum_fieldB").field("fieldB"));
termsb.subAggregation(AggregationBuilders.avg("my_avg_fieldB").field("fieldB"));

searchReq.setQuery(QueryBuilders.matchAllQuery()).addAggregation(termsb);
SearchResponse searchRes = searchReq.execute().actionGet();
Terms fieldATerms = searchRes.getAggregations().get("my_fieldA");
for (Terms.Bucket filedABucket : fieldATerms.getBuckets()) {
//fieldA
String fieldAValue = filedABucket.getKey();

//COUNT(fieldA)
long fieldACount = filedABucket.getDocCount();

//SUM(fieldB)
Sum sumagg = filedABucket.getAggregations().get("my_sum_fieldB");
long sumFieldB = (long)sumagg.getValues();

//AVG(fieldB)
Avg avgagg = filedABucket.getAggregations().get("my_avg_fieldB");
double avgFieldB = avgagg.getValues();
}


5. 按某个field group by 并按另一个filed的Sum排序,获取前10

SELECT
fieldA, SUM(fieldB)
from table
WHERE fieldC = "hoge"
group by fieldA
order by SUM(fieldB) DESC
limit 10;


对应的java code:

QueryBuilder termsc = QueryBuilders.termQuery("fieldC","hoge");
QueryBuilder queryBuilder = QueryBuilders.boolQuery().must(termsc);
TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("my_fieldA").field("fieldA").size(10);
aggregationBuilder.subAggregation(AggregationBuilders.sum("my_sum_fieldB").field("fieldB"));
aggregationBuilder.order(Order.aggregation("my_sum_fieldB", false));
SearchResponse searchResponse = client.prepareSearch("sample_index").setQuery(queryBuilder).addAggregation(aggregationBuilder).execute().actionGet();
Terms terms = searchResponse.getAggregations().get("my_fieldA");
for (Terms.Bucket entry : terms.getBuckets()) {
String fieldAValue = entry.getKey().toString();

Sum sumagg = entry.getAggregations().get("my_sum_fieldB");
double fieldValue = sumagg.getValue();
}


代码在GitHub上:https://github.com/luxiaoxun/Code4Java
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐