
016 - Storm: Periodic Statistics with the Tick-Tuple Timer

2016-02-23 00:41
In real business scenarios you often need to run some logic on a schedule, e.g. computing statistics every minute. The usual approach is to start a Timer thread or schedule the job with Quartz. In Storm, you can instead have the topology's system component emit tick tuples at a fixed interval; when a bolt receives such a tuple, it triggers the corresponding logic.

To use Storm's component timer, override the following method in the bolt:

public Map<String, Object> getComponentConfiguration() {
    Map<String, Object> conf = new HashMap<String, Object>();
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60); // persist data every 60 s
    return conf;
}

Here Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS sets how often tick tuples are sent, in seconds.

To tell whether a tuple is a tick tuple, use the isTickTuple method of the TupleHelpers class:

public static boolean isTickTuple(Tuple tuple) {
    return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
            && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
}

Let's look at tick tuples in action by computing UV (unique visitor) statistics; the tick also keeps the map held in each bolt from growing without bound.
1. Business requirement: count visitors per minute in real time; the page looks as shown in the figure:



2. Platform architecture: it mainly comprises log transport and collection, the data transport and computation platform, persistent data services, and the online data service.

2.1 Log data sources generally include the following:
When a user clicks or hovers on a web page, the corresponding log data is generated and sent to the backend servers in real time.
The APP framework embeds tracking for all page buttons, page swipes, page switches, and so on; whenever the user performs one of these actions, a log entry is recorded and sent to the log server in batches.

2.2 Storm is the core of the real-time platform.
2.3 The result data and intermediate data computed by the topology's bolts are stored in Redis, HBase, or MySQL according to business needs.
2.4 Once the data is persisted to the appropriate database, an RPC server exposes a unified access service, so consumers need not care about the storage details.

3. A typical system architecture is shown in the figure:



4. The UV computation flow is shown in the figure:



Here DeepVisitUVBolt and AggregatorUVBolt keep intermediate results in a map and process them periodically via Storm's component tick tuples.
5. Storm code
5.1 Topology code
public class PortalUVTopology {

    private static final String KAFKA_SPOUT_ID = "CustomQueueSpout";
    private static final String LOG_PARSER_ID = "LogParserBolt";
    private static final String DEEP_VISIT_UV_ID = "DeepVisitUVBolt";
    private static final String AGGREGATOR_UV_ID = "AggregatorUVBolt";
    private static final String PERSISTENCE_UV_ID = "PersistenceBoltUVBolt";

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // read messages from Kafka, single-threaded
        KafkaSpout kafkaSpout = KafkaUtils.getKafkaSpout();
        builder.setSpout(KAFKA_SPOUT_ID, kafkaSpout, 1);
        //builder.setSpout(KAFKA_SPOUT_ID, new CustomQueueSpout(), 1);
        // parse messages in parallel; output: yyyyMMddHHmm uid, e.g. 201602221411 1000
        builder.setBolt(LOG_PARSER_ID, new LogParserBolt(), 3).shuffleGrouping(KAFKA_SPOUT_ID);
        // count visits per uid per minute in parallel, i.e. yyyyMMddHHmm_uid 4
        builder.setBolt(DEEP_VISIT_UV_ID, new DeepVisitUVBolt(), 3).fieldsGrouping(LOG_PARSER_ID,
                new Fields("date", "uid"));
        // aggregate the counts
        builder.setBolt(AGGREGATOR_UV_ID, new AggregatorUVBolt(), 1).noneGrouping(DEEP_VISIT_UV_ID);
        // persistent storage
        builder.setBolt(PERSISTENCE_UV_ID, new PersistenceUVBolt(), 2).shuffleGrouping(
                AGGREGATOR_UV_ID);

        Config conf = new Config();
        conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
        conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);
        conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
        conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
        // cluster mode
        //StormSubmitter.submitTopology("PortalUV", conf, builder.createTopology());
        // local mode
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("PortalUV" + System.currentTimeMillis(), conf, builder.createTopology());
    }
}


5.2 The data source uses KafkaSpout, the class that integrates Kafka with Storm.
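KafkaUtils.getKafkaSpout() itself is not shown in the post. Below is a plausible sketch using the storm-kafka integration of that era (backtype.storm.kafka). The ZooKeeper address, topic name, ZK root, and consumer id are all assumptions, not taken from the original:

```java
// Hypothetical sketch of KafkaUtils.getKafkaSpout(); the host, topic,
// ZK root and consumer id below are assumed values.
import backtype.storm.kafka.BrokerHosts;
import backtype.storm.kafka.KafkaSpout;
import backtype.storm.kafka.SpoutConfig;
import backtype.storm.kafka.StringScheme;
import backtype.storm.kafka.ZkHosts;
import backtype.storm.spout.SchemeAsMultiScheme;

public class KafkaUtils {
    public static KafkaSpout getKafkaSpout() {
        BrokerHosts hosts = new ZkHosts("zkserver:2181");       // assumed ZK address
        SpoutConfig cfg = new SpoutConfig(hosts, "portal_log",  // assumed topic
                "/kafka_storm", "portal_uv_spout");             // assumed ZK root + id
        // decode each Kafka message as a single string field
        cfg.scheme = new SchemeAsMultiScheme(new StringScheme());
        return new KafkaSpout(cfg);
    }
}
```

Note that StringScheme's output field is named "str", while LogParserBolt reads a field named "data_spout", so the original presumably used a small custom Scheme that declares that field name.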

5.3 Parallel log parsing

public class LogParserBolt extends BaseRichBolt {

    /**
     *
     */
    private static final long serialVersionUID = 1L;
    OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String data_spout = tuple.getStringByField("data_spout");
        String[] split = data_spout.split("\t");
        String date = split[0];
        String uid = split[1];
        // SimpleDateFormat is not thread-safe, so create it per call
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
        try {
            // round-trip through the formatter so malformed dates throw and fail the tuple
            date = sdf.format(new Date(sdf.parse(date).getTime()));

            System.out.println("emitting: " + date + "\t" + uid);
            this.collector.emit(new Values(date, uid));
            this.collector.ack(tuple);
        } catch (Exception e) {
            this.collector.fail(tuple);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date", "uid"));
    }
}
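The parse-and-reformat step in execute() can be exercised without Storm. A minimal plain-Java sketch (the class name and the unchecked exception are my choices, not the original's):

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class LogLineDemo {
    // Parses a tab-separated log line "yyyyMMddHHmm<TAB>uid" and round-trips
    // the timestamp through SimpleDateFormat, as the bolt does, so that a
    // malformed date throws and the tuple can be failed.
    static String[] parse(String line) {
        String[] split = line.split("\t");
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
        try {
            String date = sdf.format(new Date(sdf.parse(split[0]).getTime()));
            return new String[] { date, split[1] };
        } catch (ParseException e) {
            // the bolt would call collector.fail(tuple) here
            throw new IllegalArgumentException("bad log line: " + line, e);
        }
    }

    public static void main(String[] args) {
        String[] out = parse("201602221411\t1000");
        System.out.println(out[0] + " " + out[1]); // 201602221411 1000
    }
}
```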


5.4 Building per-minute visit counts in parallel
public class DeepVisitUVBolt extends BaseRichBolt {

    private static final transient Log logger = LogFactory.getLog(DeepVisitUVBolt.class);
    Map<String, Integer> counters = new HashMap<String, Integer>();
    /**
     *
     */
    private static final long serialVersionUID = 1L;
    OutputCollector collector;
    private List<Tuple> anchors = new ArrayList<Tuple>();

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {

        if (TupleHelpers.isTickTuple(tuple)) {
            logger.debug("Received tick tuple, triggering emit of current window counts");
            for (Entry<String, Integer> entry : counters.entrySet()) {
                this.collector.emit(anchors, new Values(entry.getKey(), entry.getValue()));
            }
            anchors.clear();
            counters.clear();
            return;
        }
        String date = tuple.getStringByField("date");
        String uid = tuple.getStringByField("uid");
        String key = date + "_" + uid;
        Integer value = counters.get(key);
        if (value == null) {
            value = 0;
        }
        value++;
        counters.put(key, value);
        anchors.add(tuple);
        // note: the tuple is acked here but also kept as an anchor for the
        // tick-time emit; for strict at-least-once semantics the ack would
        // have to be deferred until the anchored emit happens
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date_uid", "count"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<String, Object>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10); // send a tick tuple every 10 s
        return conf;
    }
}
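DeepVisitUVBolt's bookkeeping boils down to "increment a map per event, drain it on each tick". A plain-Java sketch of that pattern (class and method names are mine, not from the original):

```java
import java.util.HashMap;
import java.util.Map;

public class WindowCounter {
    private final Map<String, Integer> counters = new HashMap<String, Integer>();

    // Called once per data tuple: count one visit for (minute, uid).
    public void record(String date, String uid) {
        String key = date + "_" + uid;
        Integer value = counters.get(key);
        counters.put(key, value == null ? 1 : value + 1);
    }

    // Called on each tick tuple: hand back the window's counts and reset,
    // which is what keeps the in-memory map from growing without bound.
    public Map<String, Integer> drain() {
        Map<String, Integer> snapshot = new HashMap<String, Integer>(counters);
        counters.clear();
        return snapshot;
    }
}
```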


5.5 Aggregation

public class AggregatorUVBolt extends BaseRichBolt {
    private static final transient Log logger = LogFactory.getLog(AggregatorUVBolt.class);
    /**
     *
     */
    private static final long serialVersionUID = 1L;
    OutputCollector collector;
    Map<String, Integer> counters = new HashMap<String,Integer>();

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {

        if(TupleHelpers.isTickTuple(tuple)){
            logger.debug("AggregatorUVBolt Received tick tuple, triggering emit of current window counts");
            for (Entry<String, Integer> entry : counters.entrySet()) {

                System.out.println("AggregatorUVBolt:" + entry.getKey() + "\t" + entry.getValue());
                this.collector.emit(new Values(entry.getKey(),entry.getValue()));
            }
            counters.clear();
            return;
        }
        String date_uid = tuple.getString(0);
        Integer count = tuple.getInteger(1);
        String[] dates = date_uid.split("_");
        Integer value = counters.get(dates[0]);
        if (value == null) {
            value = 0;
        }
        // sums the per-uid visit counts into a per-minute total; note that
        // this yields total visits (PV), whereas adding 1 per distinct
        // date_uid key would yield unique visitors (UV)
        int totalCount = count + value;
        counters.put(dates[0], totalCount);
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date","count"));
    }
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<String, Object>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60); // flush the aggregates every 60 s
        return conf;
    }
}
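The minute roll-up in AggregatorUVBolt can be exercised the same way. The sketch below (plain Java, not from the original) counts each distinct date_uid key as one visitor, which gives UV; summing the carried counts instead, as the bolt above does, gives total visits:

```java
import java.util.HashMap;
import java.util.Map;

public class UVRollup {
    // input: keyed counts from the upstream bolt, "yyyyMMddHHmm_uid" -> visits
    // output: "yyyyMMddHHmm" -> number of distinct uids seen in that minute
    public static Map<String, Integer> rollup(Map<String, Integer> perUid) {
        Map<String, Integer> uvPerMinute = new HashMap<String, Integer>();
        for (Map.Entry<String, Integer> e : perUid.entrySet()) {
            String minute = e.getKey().split("_")[0];
            Integer uv = uvPerMinute.get(minute);
            // each distinct key represents one visitor in this minute
            uvPerMinute.put(minute, uv == null ? 1 : uv + 1);
        }
        return uvPerMinute;
    }
}
```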


5.6 Persistence

public class PersistenceUVBolt extends BaseRichBolt {

    /**
     *
     */
    private static final long serialVersionUID = 1L;
    OutputCollector collector;
    HBaseUtils hbase;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.hbase = new HBaseUtils();
    }

    @Override
    public void execute(Tuple tuple) {

        String date = tuple.getStringByField("date");
        Integer count = tuple.getIntegerByField("count");
        System.out.println("-------------PersistenceBoltUVBolt---------------:" +date + "\t" + count);
        hbase.put("uv_info", date, "info", "date", date);
        hbase.put("uv_info", date, "info", "count", String.valueOf(count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}