016-Storm借助tick消息定时器统计统计周期性业务
2016-02-23 00:41
453 查看
在实际业务中,经常需要定时做一些业务逻辑,如每1分钟做一些统计数值。普通业务做法是启动一个Timer线程或者使用Quartz来做定时触发。在Storm中,可以通过让Topology的系统组件定时发送tick消息,Bolt接收到消息后,触发相应的逻辑来完成
使用Storm组件的定时器需要为bolt重写下面的方法:
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>();
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);//每60s持久化一次数据
return conf;
}
其中:Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS 定时消息发送的频率,单位为秒。
我们判断是否为tick消息。可以使用TupleHelpers类中的isTickTuple方法,具体代码:
public static boolean isTickTuple(Tuple tuple) {
return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(
Constants.SYSTEM_TICK_STREAM_ID);
}
下面通过统计uv来看一下tick消息的使用,通过tick消息,同时也解决每个bolt中map数据过大的问题。
1、业务需求:实时统计每分钟访客数,页面效果如图所示:
2. 平台架构:主要包括日志的传输和采集部分、数据传输和计算平台、持久化数据服务以及在线数据服务部分
2.1日志数据源一般包括以下几个日志数据:
用户浏览网页点击行为和鼠标悬停等会触发相应的日志数据,并实时传递给后端的服务器。
APP框架中内涵所有页面安妮、页面滑动以及页面切换等的埋点,只要用户有相应的操作,就会记录日志,批量发送日志服务器
2.2 storm是实时平台的核心
2.3 Topology中的bolt计算的结果数据和中间交换数据根据业务需求存放到redis、hbase、或者mysql中
2.4 数据持久化到相应的数据库中后,由RPC服务器提供对外统一的访问服务,用户不用关心数据存储的细节
3. 系统架构一般如图所示:
4. 统计UV逻辑计算如图所示:
其中: DeepVisitUVBolt、AggreatorUVBolt通过map来保持中间结果,通过Storm的组件tick定时处理消息。
5.storm程序代码
5.1 Topoloy代码
5.2 数据源采用kafka和storm集成的类:KafkaSpout
5.3 多线程日志解析
5.4 多线程建立每分钟的uv数
5.5 汇总统计
5.6 持久化存储
使用Storm组件的定时器需要为bolt重写下面的方法:
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>();
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);//每60s持久化一次数据
return conf;
}
其中:Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS 定时消息发送的频率,单位为秒。
我们判断是否为tick消息。可以使用TupleHelpers类中的isTickTuple方法,具体代码:
public static boolean isTickTuple(Tuple tuple) {
return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(
Constants.SYSTEM_TICK_STREAM_ID);
}
下面通过统计uv来看一下tick消息的使用,通过tick消息,同时也解决每个bolt中map数据过大的问题。
1、业务需求:实时统计每分钟访客数,页面效果如图所示:
2. 平台架构:主要包括日志的传输和采集部分、数据传输和计算平台、持久化数据服务以及在线数据服务部分
2.1日志数据源一般包括以下几个日志数据:
用户浏览网页点击行为和鼠标悬停等会触发相应的日志数据,并实时传递给后端的服务器。
APP框架中内涵所有页面安妮、页面滑动以及页面切换等的埋点,只要用户有相应的操作,就会记录日志,批量发送日志服务器
2.2 storm是实时平台的核心
2.3 Topology中的bolt计算的结果数据和中间交换数据根据业务需求存放到redis、hbase、或者mysql中
2.4 数据持久化到相应的数据库中后,由RPC服务器提供对外统一的访问服务,用户不用关心数据存储的细节
3. 系统架构一般如图所示:
4. 统计UV逻辑计算如图所示:
其中: DeepVisitUVBolt、AggreatorUVBolt通过map来保持中间结果,通过Storm的组件tick定时处理消息。
5.storm程序代码
5.1 Topoloy代码
public class PortalUVTopology { private static final String KAFKA_SPOUT_ID = "CustomQueueSpout"; private static final String LOG_PARSER_ID = "LogParserBolt"; private static final String DEEP_VISIT_UV_ID = "DeepVisitUVBolt"; private static final String AGGREGATOR_UV_ID = "AggregatorUVBolt"; private static final String PERSISTENCE_UV_ID = "PersistenceBoltUVBolt"; public static void main(String[] args) throws Exception, Exception { TopologyBuilder builder = new TopologyBuilder(); // 读取kafka消息,单线程 KafkaSpout kafkaSpout = KafkaUtils.getKafkaSpout(); builder.setSpout(KAFKA_SPOUT_ID, kafkaSpout, 1); //builder.setSpout(KAFKA_SPOUT_ID,new CustomQueueSpout(), 1); // 多线程解析消息,输出: yyyyMMddHHmm uid ,即: 201602221411 1000 builder.setBolt(LOG_PARSER_ID, new LogParserBolt(), 3).shuffleGrouping(KAFKA_SPOUT_ID); // 多线程统计每分钟每个uid的uv数,即: yyyyMMddHHmm_uid 4 builder.setBolt(DEEP_VISIT_UV_ID, new DeepVisitUVBolt(), 3).fieldsGrouping(LOG_PARSER_ID, new Fields("date", "uid")); // 汇总统计 builder.setBolt(AGGREGATOR_UV_ID, new AggregatorUVBolt(), 1).noneGrouping(DEEP_VISIT_UV_ID); // 持久化存储 builder.setBolt(PERSISTENCE_UV_ID, new PersistenceUVBolt(),2).shuffleGrouping( AGGREGATOR_UV_ID); Config conf = new Config(); conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8); conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32); conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384); conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384); //集群模式 //StormSubmitter.submitTopology("PortalUV", conf , builder.createTopology()); //本地模式 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("PortalUV" + System.currentTimeMillis(), conf ,builder.createTopology()); } }
5.2 数据源采用kafka和storm集成的类:KafkaSpout
5.3 多线程日志解析
public class LogParserBolt extends BaseRichBolt { /** * */ private static final long serialVersionUID = 1L; OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { String data_spout = tuple.getStringByField("data_spout"); String[] split = data_spout.split("\t"); String date = split[0]; String uid = split[1]; SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm"); try { date = sdf.format(new Date(sdf.parse(date).getTime())); System.out.println("发送的数据为:" + date + "\t" + uid); this.collector.emit(new Values(date, uid)); this.collector.ack(tuple); } catch (Exception e) { this.collector.fail(tuple); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("date", "uid")); } }
5.4 多线程建立每分钟的uv数
public class DeepVisitUVBolt extends BaseRichBolt { private static final transient Log logger = LogFactory.getLog(DeepVisitUVBolt.class); Map<String, Integer> counters = new HashMap<String, Integer>(); /** * */ private static final long serialVersionUID = 1L; OutputCollector collector; private List<Tuple> anchors = new ArrayList<Tuple>(); @SuppressWarnings("rawtypes") @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { if (TupleHelpers.isTickTuple(tuple)) { logger.debug("Received tick tuple, triggering emit of current window counts"); for (Entry<String, Integer> entry : counters.entrySet()) { this.collector.emit(anchors, new Values(entry.getKey(), entry.getValue())); } anchors.clear(); counters.clear(); return; } String date = tuple.getStringByField("date"); String uid = tuple.getStringByField("uid"); String key = date + "_" + uid; Integer value = counters.get(key); if (value == null) { value = 0; } value++; counters.put(key, value); anchors.add(tuple); this.collector.ack(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("date_uid", "count")); } @Override public Map<String, Object> getComponentConfiguration() { Map<String, Object> conf = new HashMap<String, Object>(); conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,10);// 每10s发送一次特殊的tuple return conf; } }
5.5 汇总统计
public class AggregatorUVBolt extends BaseRichBolt { private static final transient Log logger = LogFactory.getLog(AggregatorUVBolt.class); /** * */ private static final long serialVersionUID = 1L; OutputCollector collector; Map<String, Integer> counters = new HashMap<String,Integer>(); @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { if(TupleHelpers.isTickTuple(tuple)){ logger.debug("AggregatorUVBolt Received tick tuple, triggering emit of current window counts"); for (Entry<String, Integer> entry : counters.entrySet()) { System.out.println("AggregatorUVBolt:" + entry.getKey() + "\t" + entry.getValue()); this.collector.emit(new Values(entry.getKey(),entry.getValue())); } counters.clear(); return; } String date_uid = tuple.getString(0); Integer count = tuple.getInteger(1); String[] dates = date_uid.split("_"); Integer value = counters.get(dates[0]); if(value == null){ value = 0; } int totalCount = count + value; counters.put(dates[0], totalCount); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("date","count")); } @Override public Map<String, Object> getComponentConfiguration() { Map<String, Object> conf = new HashMap<String, Object>(); conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);//每60s持久化一次数据 return conf; } }
5.6 持久化存储
public class PersistenceUVBolt extends BaseRichBolt { /** * */ private static final long serialVersionUID = 1L; OutputCollector collector; HBaseUtils hbase; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; this.hbase = new HBaseUtils(); } @Override public void execute(Tuple tuple) { String date = tuple.getStringByField("date"); Integer count = tuple.getIntegerByField("count"); System.out.println("-------------PersistenceBoltUVBolt---------------:" +date + "\t" + count); hbase.put("uv_info", date, "info", "date", date); hbase.put("uv_info", date, "info", "count", String.valueOf(count)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
相关文章推荐
- 【慕课笔记】第四章 JAVA中的集合框架(上) 第6节 学生选课—课程查询
- windows7_64+vs2013_32位配置PCL_1.6.0
- Codeforces 620D Lipshitz Sequence RMQ+二分
- 3.7 代码注释和编码规范
- form提交前验证的几种方法
- nodejs爬取页面内容(utf8编码)
- 图像分割简单处理
- 多点移动电子地图定位
- php分享二十一:mysql语句
- #HDU 1016 Prime Ring Problem 【DFS+溯回求组数】
- UVALive 5873 - Tree Inspections 【模拟】
- jquery动态改变form属性提交表单
- android studio导入SlidingMenu的方法
- yum安装mysql和mysql源
- 写给自己
- cocos2dx学习之路----第五篇(切换场景的另一种方式:使用popScene与pushScene)
- UVALive 5878 - Shortest Leash 【计算几何】
- #HDU 1372 Knight Moves 【BFS】
- SpringMvc如何传递获取Date类型数据
- 嵌入式开发的基本环境配置大全