[Pinned] Storm Basics (Trident API: Aggregation)
2017-09-01 12:44
Basic Introduction
Aggregation is one of Trident's core APIs; its purpose is to aggregate the tuples of a batch. The aggregator below counts how many times each word appears.

package com.storm.trident;
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;
public class WordAggregat extends BaseAggregator<Map<String, Integer>> {

    @Override
    public Map<String, Integer> init(Object batchId, TridentCollector collector) {
        // Called once at the start of each batch; the returned map is the per-batch state.
        return new HashMap<String, Integer>();
    }

    @Override
    public void aggregate(Map<String, Integer> val, TridentTuple tuple, TridentCollector collector) {
        // Called once per tuple in the batch; increment the count for this word.
        String word = tuple.getString(0);
        Integer count = val.get(word);
        if (count == null) {
            count = 0;
        }
        val.put(word, count + 1);
    }

    @Override
    public void complete(Map<String, Integer> val, TridentCollector collector) {
        // Called once when the batch ends; emit the accumulated counts.
        for (String key : val.keySet()) {
            System.out.println("key= " + key + " and value= " + val.get(key));
        }
        collector.emit(new Values(val));
    }
}
(1) init is called once at the start of each batch and creates the batch's state.
(2) aggregate is called repeatedly, once for every tuple in the batch.
(3) complete is called once when the batch finishes.
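The three-step lifecycle above can be simulated in plain Java without a Storm cluster. This is a minimal sketch; the class and its driver loop are illustrative stand-ins, not part of the Storm API:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal stand-in for the BaseAggregator lifecycle: init -> aggregate* -> complete.
public class AggregatorLifecycleDemo {

    // init: create fresh per-batch state.
    static Map<String, Integer> init() {
        return new HashMap<String, Integer>();
    }

    // aggregate: called once per tuple; here the "tuple" is just a word.
    static void aggregate(Map<String, Integer> state, String word) {
        Integer count = state.get(word);
        state.put(word, count == null ? 1 : count + 1);
    }

    // complete: called once at the end of the batch; hand back the result.
    static Map<String, Integer> complete(Map<String, Integer> state) {
        return state;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("the", "cow", "jumped", "over", "the", "moon");
        Map<String, Integer> state = init();
        for (String word : batch) {
            aggregate(state, word);
        }
        Map<String, Integer> result = complete(state);
        System.out.println(result.get("the"));  // "the" appears twice in this batch
        System.out.println(result.get("cow"));  // "cow" appears once
    }
}
```

Note how the state map is created fresh in init for every batch; this is why the fixed aggregator above accumulates into the val parameter rather than a static field shared across batches.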
Building the Trident Topology
package com.storm.topology;

import com.storm.spout.FixedBatchSpout;
import com.storm.trident.Split;
import com.storm.trident.WordAggregat;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class TTopology {
    public static void main(String[] args) {
        TridentTopology topology = new TridentTopology();
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 1,
                new Values("the cow jumped over the moon"),
                new Values("the man went to the store and bought some candy"),
                new Values("four score and seven years ago"),
                new Values("how many apples can you eat"));
        spout.setCycle(true);
        topology.newStream("batch-spout", spout).parallelismHint(2)
                .each(new Fields("sentence"), new Split(), new Fields("word"))
                .project(new Fields("word"))   // project forwards only the "word" field
                .aggregate(new Fields("word"), new WordAggregat(), new Fields("agg"));
        StormTopology stormTopology = topology.build();
        LocalCluster cluster = new LocalCluster();
        Config conf = new Config();
        conf.setDebug(true);
        cluster.submitTopology("soc", conf, stormTopology);
    }
}
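The Split function imported above is not listed in this post. Its core logic is just tokenizing a sentence into words, which can be sketched in plain Java; the Storm BaseFunction wrapper is omitted here so the snippet runs standalone, and the class name is illustrative:

```java
import java.util.Arrays;
import java.util.List;

public class SplitDemo {
    // Core of a typical Split function: break a sentence into word tokens.
    static List<String> split(String sentence) {
        return Arrays.asList(sentence.split(" "));
    }

    public static void main(String[] args) {
        System.out.println(split("the cow jumped over the moon"));
        // [the, cow, jumped, over, the, moon]
    }
}
```

In the real topology, each word token is emitted as its own tuple in the "word" field, which is what the aggregator then receives.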
The project method discards all other fields in the tuple and forwards only the single word field downstream.
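A rough way to picture what project does, as a plain-Java sketch; representing a tuple as a map and the helper below are illustrative assumptions, since Trident's actual tuples are not HashMaps:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class ProjectDemo {

    // Keep only the requested fields of a tuple, dropping everything else.
    static Map<String, Object> project(Map<String, Object> tuple, String... fields) {
        Map<String, Object> out = new LinkedHashMap<String, Object>();
        for (String field : fields) {
            out.put(field, tuple.get(field));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new HashMap<String, Object>();
        tuple.put("sentence", "the cow jumped over the moon");
        tuple.put("word", "cow");
        // After project(new Fields("word")), only the word field travels downstream.
        System.out.println(project(tuple, "word"));  // {word=cow}
    }
}
```

Projecting early keeps tuples small, so downstream operations like the aggregator only pay to serialize the fields they actually use.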
Result
The output shows how many times each word appeared.