您的位置:首页 > 其它

Storm Trident groupBy

2015-08-23 14:39 375 查看
import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Map.Entry;

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.generated.StormTopology;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

import storm.trident.TridentTopology;

import storm.trident.operation.BaseAggregator;

import storm.trident.operation.BaseFilter;

import storm.trident.operation.BaseFunction;

import storm.trident.operation.Filter;

import storm.trident.operation.TridentCollector;

import storm.trident.testing.FixedBatchSpout;

import storm.trident.tuple.TridentTuple;

/**
 * Local-mode Storm Trident example: a fixed batch of single-letter values is
 * grouped on the {@code "sentence"} field, counted by a custom aggregator, and
 * the resulting count maps are printed by a terminal function.
 */
public class TridentLocalPologyGroupBy {

    /**
     * Trident function that prints the first field of each tuple to stderr and
     * emits nothing. It is not used by the topology assembled in
     * {@link TridentLocalPologyGroupBy#main(String[])}.
     */
    public static class SumBolt extends BaseFunction {

        /**
         * Prints the string in field 0 of {@code tuple} to stderr.
         *
         * @param tuple     incoming tuple; field 0 is read as a string
         * @param collector unused, nothing is emitted
         */
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String value = tuple.getString(0);
            System.err.println("value=" + value);
        }
    }

    /**
     * Aggregator whose state is a word-to-count map. Each aggregated tuple
     * increments the count stored under its first field, and the whole map is
     * emitted as a single value on completion.
     */
    public static class MyAgg extends BaseAggregator<Map<String, Integer>> {

        /**
         * Creates the empty count map used as the aggregation state.
         *
         * @return a new, empty mutable map
         */
        @Override
        public Map<String, Integer> init(Object arg0, TridentCollector arg1) {
            return new HashMap<String, Integer>();
        }

        /**
         * Increments the count for the word in field 0 of {@code tuple} and
         * dumps the current state to stderr.
         *
         * @param val       state map returned by {@link #init(Object, TridentCollector)}
         * @param tuple     incoming tuple; field 0 is the word to count
         * @param collector unused here, results are emitted in {@code complete}
         */
        @Override
        public void aggregate(Map<String, Integer> val, TridentTuple tuple,
                TridentCollector collector) {
            String word = tuple.getString(0);
            Integer previous = val.get(word);

            // Store the updated count in the state map (a missing word counts as 0).
            val.put(word, previous == null ? 1 : previous + 1);

            // Print the running state after every tuple.
            System.err.println("===============================");
            for (Entry<String, Integer> entry : val.entrySet()) {
                System.err.println(entry);
            }
        }

        /**
         * Emits the accumulated count map as a single-value tuple.
         *
         * @param val       final state map
         * @param collector collector that receives the map
         */
        @Override
        public void complete(Map<String, Integer> val, TridentCollector collector) {
            collector.emit(new Values(val));
        }
    }

    /**
     * Trident function that prints every entry of the count map carried in
     * field 0 of the incoming tuple to stdout. It emits nothing.
     */
    public static class MyBolt extends BaseFunction {

        /**
         * Prints each entry of the map in field 0 of {@code tuple}.
         *
         * @param tuple     incoming tuple; field 0 is expected to hold the
         *                  {@code Map<String, Integer>} emitted by {@link MyAgg}
         * @param collector unused, nothing is emitted
         */
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            // getValue returns Object; the upstream aggregator emits a
            // Map<String, Integer>, so the cast is safe for this topology.
            @SuppressWarnings("unchecked")
            Map<String, Integer> counts = (Map<String, Integer>) tuple.getValue(0);

            for (Entry<String, Integer> entry : counts.entrySet()) {
                System.out.println("entry===" + entry);
            }
        }
    }

    /**
     * Builds the topology and submits it to an in-process {@link LocalCluster}
     * under the name {@code "trident"}.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // The spout's output field is "sentence"; batch size is 4 and the
        // sample values are a, b, a, c.
        FixedBatchSpout spout = new FixedBatchSpout(
                new Fields("sentence"),
                4,
                new Values("a"), new Values("b"), new Values("a"), new Values("c"));

        // Cycling is disabled; use setCycle(true) to have the spout keep
        // re-emitting the same values (see FixedBatchSpout).
        spout.setCycle(false);

        TridentTopology tridentTopology = new TridentTopology();
        tridentTopology.newStream("spout1", spout)
                .groupBy(new Fields("sentence"))
                .aggregate(new Fields("sentence"), new MyAgg(), new Fields("Map"))
                // MyBolt never emits, so it declares no output fields.
                .each(new Fields("Map"), new MyBolt(), new Fields());

        // NOTE(review): the cluster is never shut down here; presumably the
        // process is meant to be stopped manually once the output is seen.
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("trident", new Config(), tridentTopology.build());
    }
}

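执行结果为(以 `===` 分隔的行是 MyAgg.aggregate 每处理一个 tuple 后通过 System.err 打印的当前 map;以 `entry===` 开头的行是 MyBolt 通过 System.out 打印的最终结果):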

===============================

a=1

===============================

b=1

===============================

a=2

===============================

c=1

entry===b=1

entry===a=2

entry===c=1
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: