【实时日志分析系列之】--------Storm本地Hello入门篇
2018-01-27 18:08
453 查看
原由
由于现在的运维工作越来越繁重,无论是哪种类型的公司,但凡是开发出来的系统随着时间的积累,后台日志会不断的积累,庞大,臃肿,而如何有效的利用好日志做好业务层运维,或者技术层运维逐渐被人们关注。在这样的业务背景下,本项目组在处长的提议下,也开始着手于这样的想法,本着做出一套实时日志业务分析系统来更高效的协助业务运维人员。最近已经搞了半个月相关系统的知识,所以在此记录下,每个框架的demo入门篇。
若有错误,还望指出。。。。
环境介绍
本篇作为storm的初级入门篇,主体讲解storm本地环境跑起来hello world。。。。环境如下:
- jdk1.8
- apache-storm 1.1.1
- IDEA
开始之前的准备工作
由于storm属于大数据方向的开源软件,所以个人认为一切从官网上学习是最权威的,当然像概念的理解国内已经有很多优秀的博客,大家可以自行查阅相关资料。下面给出后续用到的名字解释以及对应的官网:中文官网文档:
http://storm.apachecn.org/releases/cn/1.1.0/
英文官网文档:
http://storm.apache.org/
Storm的概念一共有如下几点(最好是记住英文,中文记住了也就是知道啥意思,并不是那么好理解):
1.Topologies(拓扑) 2.Streams(流) 3.Spouts (管口、 喷水 、也有人说有龙卷风的意思,这个是storm中的组件,代码角度是指数据的来源) 4.Bolts (也有人说雷云的意思,这个是storm中的组件,代码角度是指数据的处理) 5.Stream groupings(流分组) 6.Reliability(可靠性) 7.Tasks (每个 Task 对应一个 execution 的线程) 8.Workers (每个 Worker 进程是一个物理的 JVM, 执行 topology(拓扑) Tasks 中的一个子集.)
这里在网上看到一个比较形象的例子,可以这么理解:
storm里有着拓扑,而拓扑相当于小区中的各种水龙头及水连接成的一个网状图,来个自己理解画的图吧:
这样看其实很简单,Spout就是你们家水龙头,它是水的来源,而你要用这个水去处理什么事情,这个过程就是bolt所做的,而水源传递的过程中形成的便是Stream,水流。
上面的概念都理解了,接下来开始编码吧!
Storm HelloWorld 初体验
先说maven的依赖,我是用idea搭建的项目,而本地想要跑storm需要引入的jar包就需要一个即可。pom.xml:
<!-- 切忌,provided这个属性在本地跑的时候要注释掉,否则会报错!--> <dependencies> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.1.1</version> <!--<scope>provided</scope>--> </dependency> </dependencies>
这里先说下,provided这个属性在本地跑的时候要注释掉,否则会报错!贴上我的图:
然后看下工程的目录结构:
每部分代码都有注释的详细解释,可以看下:
/** * @author sy * @version 1.0.0 * @ClassName RandomSentenceSpout * @Description ① 第一个写的是Spout * spout,数据源的源头,会将【"谁":"说了什么"】这样的格式发射到blot中 * open方法主要由storm框架传入SpoutOutputCollector * nextTuple方法主要是发射数据源 * @Date 2018/1/12 17:22 */ public class RandomSentenceSpout extends BaseRichSpout { /** * 用来收集Spout输出的tuple */ private SpoutOutputCollector collector; private Random random; private static String[] sentences = new String[] {"edi:I'm happy", "marry:I'm angry", "john:I'm sad", "ted:I'm excited", "laden:I'm dangerous" ,"suyu:I'm Hello World"}; /** * 该方法调用一次,主要由storm框架传入SpoutOutputCollector * @param conf 具体不详 * @param context 拓扑上下文 * @param collector 数据源的收集器 */ @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; this.random = new Random(); } /** * 此方法是从ISpout接口实现的。 * 当此方法被调用时,storm会主动请求soupt发射元组给输出的collector,这个方法应该是非阻塞的, * 所以如果spout没有元组发射,这个方法应该进行返回。 * When this method is called, Storm is requesting that the Spout emit tuples to the * output collector. This method should be non-blocking, so if the Spout has no tuples * to emit, this method should return. nextTuple, ack, and fail are all called in a tight * loop in a single thread in the spout task. When there are no tuples to emit, it is courteous * to have nextTuple sleep for a short amount of time (like a single millisecond) * so as not to waste too much CPU. */ @Override public void nextTuple() { //从自定义数组中随机取出每次说话的句子 String toSay = sentences[random.nextInt(sentences.length)]; //将其发射出去,这里的values是继承了ArrayList this.collector.emit(new Values(toSay)); } /** * 该方法用来给我们发射的value在整个Stream中定义一个别名。可以理解为key。该值必须在整个topology定义中唯一。 * 声明输出视图给此拓扑的所有流 * Declare the output schema for all the streams of this topology. * @param declarer */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sentence")); } }
/** * @ClassName AddSomeThingBolt * @Description ②第二个写的类是处理元数据的blot类 * @author suyu * @Date 2018/1/12 18:05 * @version 1.0.0 */ public class AddSomeThingBolt extends BaseBasicBolt { private int indexId; /** * blot的执行方法,实现了IBasicBolt接口, * 进程输入元组,选择性的提交新的元组基于输入的元组(个人翻译) * Process the input tuple and optionally emit new tuples based on the input tuple. * All acking is managed for you. Throw a FailedException if you want to fail the tuple. * @param tuple 从spout中发射过来的句子,个人理解 * @param collector 类似spout 中的收集器,也是起到发射的作用 */ @Override public void execute(Tuple tuple, BasicOutputCollector collector) { //拿到tuple中的第一个元素 String sentence = (String) tuple.getValue(0); //进行逻辑拼接 String out = sentence + "!!!! yoyoyo!!~~"; //继续将其进行发射给下一个blot collector.emit(new Values(out)); System.err.println(String.format("--------------AddSomeThingBolt[%d] ------------------",this.getIndexId())); } /** * 类似spout中的方法,该方法用来给我们发射的value在整个Stream中定义一个别名。可以理解为key。该值必须在整个topology定义中唯一。 * @param declarer */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("add_sentence")); } public int getIndexId() { this.indexId = (int) Thread.currentThread().getId(); return indexId; } }
/** * @author sy * @version 1.0.0 * @ClassName PrintBolt * @Description ③第二个处理blot节点,打印blot,第三个写的类 * @Date 2018/1/12 18:12 */ public class PrintBolt extends BaseBasicBolt { private int indexId; /** * 打印blot * @param tuple * @param collector */ @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String printBlot = tuple.getString(0); System.err.println(String.format("Bolt[%d] String recieved: %s",this.getIndexId(), printBlot)); // System.out.println("接收到的【打印blot】:" + printBlot); } /** * 不做处理了 * @param declarer */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } public int getIndexId() { this.indexId = (int) Thread.currentThread().getId(); return indexId; } }
/** * @ClassName TopologyMain * @Description ④ 第四个写的类,创建之前的所有节点连线,此类为拓扑类 * @author sy * @Date 2018/1/12 17:20 * @version 1.0.0 */ public class TopologyMain { public static void main(String[] args) throws Exception { //1.创建拓扑builder TopologyBuilder builder = new TopologyBuilder(); //2.创建连线节点,把spout放入topology中 builder.setSpout("spout",new RandomSentenceSpout()); //3.创建第一个bolt节点放入topology中,shuffleGrouping,Storm按照何种策略将tuple分配到后续的bolt去。 builder.setBolt("addBlot", new AddSomeThingBolt(),2).shuffleGrouping("spout"); //4.创建第二个bolt节点放入topology中 builder.setBolt("printBlot", new PrintBolt(),3).shuffleGrouping("addBlot"); Config conf = new Config(); conf.setDebug(false); //若有参数,则执行storm集群提交,若没有本地模拟集群提交topology if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", conf, builder.createTopology()); Utils.sleep(100000); cluster.killTopology("test"); cluster.shutdown(); } } }
运行结果
一旦运行成功,控制台就开始疯狂的输出啦!O(∩_∩)O哈哈~这里给出github地址:
点我啦
https://github.com/unlimitbladeworks/Storm-HelloWorld
相关文章推荐
- ELK学习10_ELK系列--实时日志分析系统ELK 部署与运行中的问题汇总
- ELK系列--实时日志分析系统ELK 部署与运行中的问题汇总
- ELK学习10_ELK系列--实时日志分析系统ELK 部署与运行中的问题汇总
- Storm实时日志分析实战
- ELK 开源实时日志分析平台部署安装
- 用Kibana和logstash快速搭建实时日志查询、收集与分析系统
- 用Kibana和logstash快速搭建实时日志查询、收集与分析系统
- 通过实时日志分析_进行访问日志的快速统计
- 通过实时日志分析_进行访问日志的快速统计
- 用Kibana和logstash快速搭建实时日志查询、收集与分析系统
- ELK(ElasticSearch, Logstash, Kibana)+ SuperVisor + Springboot + Logback 搭建实时日志分析平台
- Cloudera Hadoop 4系列实战课程(电商业日志流量分析项目)
- ELK(ElasticSearch, Logstash, Kibana)+ SuperVisor + Springboot + Logback 搭建实时日志分析平台
- ELK 日志收集实时分析大数据平台(简介)
- 开源实时日志分析ELK平台部署
- Kafka项目实战-用户日志上报实时统计之分析与设计
- 实时日志收集-查询-分析系统(Flume+ElasticSearch+Kibana)
- ELK(ElasticSearch, Logstash, Kibana)搭建实时日志分析平台
- 实时日志分析平台搭建笔记(二)
- log4net 框架系列:Log4net源码分析 日志输出机制