storm trident groupby
2015-08-23 14:39
375 查看
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentTopology;
import storm.trident.operation.BaseAggregator;
import storm.trident.operation.BaseFilter;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.Filter;
import storm.trident.operation.TridentCollector;
import storm.trident.testing.FixedBatchSpout;
import storm.trident.tuple.TridentTuple;
public class TridentLocalPologyGroupBy {
public static class SumBolt extends BaseFunction{
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
String value = tuple.getString(0);
System.err.println("value="+value);
}
}
public static class MyAgg extends BaseAggregator<Map<String, Integer>>{
@Override
public void aggregate(Map<String, Integer> val, TridentTuple tuple,
TridentCollector collector) {
String word = tuple.getString(0);
Integer value = val.get(word);
if(value==null){
value=0;
}
value++;
//把数据保存到一个map对象中
val.put(word, value);
//把结果写出去
System.err.println("===============================");
for (Entry<String, Integer> entry : val.entrySet()) {
System.err.println(entry);
}
}
@Override
public void complete(Map<String, Integer> val, TridentCollector collector) {
collector.emit(new Values(val));
}
@Override
public Map<String, Integer> init(Object arg0, TridentCollector arg1) {
return new HashMap<String, Integer>();
}
}
public static class MyBolt extends BaseFunction{
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
//因为返回值是map 强制转换为map类型
Map<String, Integer> value = (Map)tuple.getValue(0);
for (Entry<String, Integer> entry : value.entrySet()) {
System.out.println("entry==="+entry);
}
}
}
public static void main(String[] args) {
//输出为new Fields("sentence")
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 4, new Values("a"),new Values("b"),new Values("a"),new Values("c"));
//spout.setCycle(true);
spout.setCycle(false);
TridentTopology tridentTopology = new TridentTopology();
tridentTopology.newStream("spout1", spout)
.groupBy(new Fields("sentence"))
.aggregate(new Fields("sentence"), new MyAgg(), new Fields("Map"))
.each( new Fields("Map"), new MyBolt(), new Fields(""));
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("trident", new Config(), tridentTopology.build());
}
}
执行结果为:
===============================
a=1
===============================
b=1
===============================
a=2
===============================
c=1
entry===b=1
entry===a=2
entry===c=1
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentTopology;
import storm.trident.operation.BaseAggregator;
import storm.trident.operation.BaseFilter;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.Filter;
import storm.trident.operation.TridentCollector;
import storm.trident.testing.FixedBatchSpout;
import storm.trident.tuple.TridentTuple;
public class TridentLocalPologyGroupBy {
public static class SumBolt extends BaseFunction{
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
String value = tuple.getString(0);
System.err.println("value="+value);
}
}
public static class MyAgg extends BaseAggregator<Map<String, Integer>>{
@Override
public void aggregate(Map<String, Integer> val, TridentTuple tuple,
TridentCollector collector) {
String word = tuple.getString(0);
Integer value = val.get(word);
if(value==null){
value=0;
}
value++;
//把数据保存到一个map对象中
val.put(word, value);
//把结果写出去
System.err.println("===============================");
for (Entry<String, Integer> entry : val.entrySet()) {
System.err.println(entry);
}
}
@Override
public void complete(Map<String, Integer> val, TridentCollector collector) {
collector.emit(new Values(val));
}
@Override
public Map<String, Integer> init(Object arg0, TridentCollector arg1) {
return new HashMap<String, Integer>();
}
}
public static class MyBolt extends BaseFunction{
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
//因为返回值是map 强制转换为map类型
Map<String, Integer> value = (Map)tuple.getValue(0);
for (Entry<String, Integer> entry : value.entrySet()) {
System.out.println("entry==="+entry);
}
}
}
public static void main(String[] args) {
//输出为new Fields("sentence")
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 4, new Values("a"),new Values("b"),new Values("a"),new Values("c"));
//spout.setCycle(true);
spout.setCycle(false);
TridentTopology tridentTopology = new TridentTopology();
tridentTopology.newStream("spout1", spout)
.groupBy(new Fields("sentence"))
.aggregate(new Fields("sentence"), new MyAgg(), new Fields("Map"))
.each( new Fields("Map"), new MyBolt(), new Fields(""));
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("trident", new Config(), tridentTopology.build());
}
}
执行结果为:
===============================
a=1
===============================
b=1
===============================
a=2
===============================
c=1
entry===b=1
entry===a=2
entry===c=1
相关文章推荐
- RPC协议及其python实例
- java1.8环境配置(win8)
- 编写高质量代码改善C#程序的157个建议——建议132:考虑用类名作为属性名
- 怎么防止sql注入
- 匈牙利算法
- 线程和进程
- greenDAO系列7--关系
- greenDAO系列6--会话
- KM算法
- 数据结构之栈的链表实现
- web.xml整理
- Java实现简单的二叉树
- U盘装win7/win8 32位&64位系统
- 【算法结构】一些经典的算法和数据结构的问题
- 基金投资方法札记
- Spring + JdbcTemplate + JdbcDaoSupport examples
- greenDAO系列5--查询
- Enums in JavaScript
- hibernate关系映射--双向一对一@OneToOne
- Spring Boot——开发新一代Spring应用