您的位置:首页 > 其它

【实时日志分析系列之】--------Storm本地Hello入门篇

2018-01-27 18:08 453 查看

原由

  由于现在的运维工作越来越繁重,无论是哪种类型的公司,但凡是开发出来的系统随着时间的积累,后台日志会不断的积累,庞大,臃肿,而如何有效的利用好日志做好业务层运维,或者技术层运维逐渐被人们关注。在这样的业务背景下,本项目组在处长的提议下,也开始着手于这样的想法,本着做出一套实时日志业务分析系统来更高效的协助业务运维人员。

  最近已经搞了半个月相关系统的知识,所以在此记录下,每个框架的demo入门篇。

  若有错误,还望指出。。。。

环境介绍

 本篇作为storm的初级入门篇,主体讲解storm本地环境跑起来hello world。。。。

 

环境如下:

- jdk1.8

- apache-storm 1.1.1

- IDEA

开始之前的准备工作

由于storm属于大数据方向的开源软件,所以个人认为一切从官网上学习是最权威的,当然像概念的理解国内已经有很多优秀的博客,大家可以自行查阅相关资料。下面给出后续用到的名字解释以及对应的官网:

中文官网文档:

http://storm.apachecn.org/releases/cn/1.1.0/

英文官网文档:

http://storm.apache.org/

Storm的概念一共有如下几点(最好是记住英文,中文记住了也就是知道啥意思,并不是那么好理解):

1.Topologies(拓扑)
2.Streams(流)
3.Spouts (管口、 喷水 、也有人说有龙卷风的意思,这个是storm中的组件,代码角度是指数据的来源)
4.Bolts  (也有人说雷云的意思,这个是storm中的组件,代码角度是指数据的处理)
5.Stream groupings(流分组)
6.Reliability(可靠性)
7.Tasks (每个 Task 对应一个 execution 的线程)
8.Workers (每个 Worker 进程是一个物理的 JVM, 执行 topology(拓扑) Tasks 中的一个子集.)


这里在网上看到一个比较形象的例子,可以这么理解:

storm里有着拓扑,而拓扑相当于小区中的各种水龙头及水连接成的一个网状图,来个自己理解画的图吧:



这样看其实很简单,Spout就是你们家水龙头,它是水的来源,而你要用这个水去处理什么事情,这个过程就是bolt所做的,而水源传递的过程中形成的便是Stream,水流。

上面的概念都理解了,接下来开始编码吧!

Storm HelloWorld 初体验

先说maven的依赖,我是用idea搭建的项目,而本地想要跑storm需要引入的jar包就需要一个即可。

pom.xml:

<!-- 切忌,provided这个属性在本地跑的时候要注释掉,否则会报错!-->
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.1</version>
<!--<scope>provided</scope>-->
</dependency>
</dependencies>


这里先说下,provided这个属性在本地跑的时候要注释掉,否则会报错!贴上我的图:



然后看下工程的目录结构:



每部分代码都有注释的详细解释,可以看下:

/**
* @author sy
* @version 1.0.0
* @ClassName RandomSentenceSpout
* @Description ① 第一个写的是Spout
*              spout,数据源的源头,会将【"谁":"说了什么"】这样的格式发射到blot中
*              open方法主要由storm框架传入SpoutOutputCollector
*              nextTuple方法主要是发射数据源
* @Date 2018/1/12 17:22
*/
public class RandomSentenceSpout extends BaseRichSpout {
/**
* 用来收集Spout输出的tuple
*/
private SpoutOutputCollector collector;
private Random random;
private static String[] sentences = new String[] {"edi:I'm happy",
"marry:I'm angry", "john:I'm sad", "ted:I'm excited", "laden:I'm dangerous" ,"suyu:I'm Hello World"};

/**
* 该方法调用一次,主要由storm框架传入SpoutOutputCollector
* @param conf 具体不详
* @param context 拓扑上下文
* @param collector 数据源的收集器
*/
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
this.random = new Random();
}

/**
* 此方法是从ISpout接口实现的。
* 当此方法被调用时,storm会主动请求soupt发射元组给输出的collector,这个方法应该是非阻塞的,
* 所以如果spout没有元组发射,这个方法应该进行返回。
* When this method is called, Storm is requesting that the Spout emit tuples to the
* output collector. This method should be non-blocking, so if the Spout has no tuples
* to emit, this method should return. nextTuple, ack, and fail are all called in a tight
* loop in a single thread in the spout task. When there are no tuples to emit, it is courteous
* to have nextTuple sleep for a short amount of time (like a single millisecond)
* so as not to waste too much CPU.
*/
@Override
public void nextTuple() {
//从自定义数组中随机取出每次说话的句子
String toSay = sentences[random.nextInt(sentences.length)];
//将其发射出去,这里的values是继承了ArrayList
this.collector.emit(new Values(toSay));
}

/**
* 该方法用来给我们发射的value在整个Stream中定义一个别名。可以理解为key。该值必须在整个topology定义中唯一。
* 声明输出视图给此拓扑的所有流
* Declare the output schema for all the streams of this topology.
* @param declarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}
}


/**
* @ClassName AddSomeThingBolt
* @Description ②第二个写的类是处理元数据的blot类
* @author suyu
* @Date 2018/1/12 18:05
* @version 1.0.0
*/
public class AddSomeThingBolt extends BaseBasicBolt {

private int indexId;

/**
* blot的执行方法,实现了IBasicBolt接口,
* 进程输入元组,选择性的提交新的元组基于输入的元组(个人翻译)
* Process the input tuple and optionally emit new tuples based on the input tuple.
* All acking is managed for you. Throw a FailedException if you want to fail the tuple.
* @param tuple  从spout中发射过来的句子,个人理解
* @param collector 类似spout 中的收集器,也是起到发射的作用
*/
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
//拿到tuple中的第一个元素
String sentence = (String) tuple.getValue(0);
//进行逻辑拼接
String out = sentence + "!!!! yoyoyo!!~~";
//继续将其进行发射给下一个blot
collector.emit(new Values(out));
System.err.println(String.format("--------------AddSomeThingBolt[%d] ------------------",this.getIndexId()));
}

/**
* 类似spout中的方法,该方法用来给我们发射的value在整个Stream中定义一个别名。可以理解为key。该值必须在整个topology定义中唯一。
* @param declarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("add_sentence"));
}

public int getIndexId() {
this.indexId = (int) Thread.currentThread().getId();
return indexId;
}
}


/**
* @author sy
* @version 1.0.0
* @ClassName PrintBolt
* @Description ③第二个处理blot节点,打印blot,第三个写的类
* @Date 2018/1/12 18:12
*/
public class PrintBolt extends BaseBasicBolt {

private int indexId;

/**
* 打印blot
* @param tuple
* @param collector
*/
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String printBlot = tuple.getString(0);
System.err.println(String.format("Bolt[%d] String recieved: %s",this.getIndexId(), printBlot));
//        System.out.println("接收到的【打印blot】:" + printBlot);
}

/**
* 不做处理了
* @param declarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {

}

public int getIndexId() {
this.indexId = (int) Thread.currentThread().getId();
return indexId;
}
}


/**
* @ClassName TopologyMain
* @Description ④ 第四个写的类,创建之前的所有节点连线,此类为拓扑类
* @author sy
* @Date 2018/1/12 17:20
* @version 1.0.0
*/
public class TopologyMain {

public static void main(String[] args) throws Exception {
//1.创建拓扑builder
TopologyBuilder builder = new TopologyBuilder();
//2.创建连线节点,把spout放入topology中
builder.setSpout("spout",new RandomSentenceSpout());
//3.创建第一个bolt节点放入topology中,shuffleGrouping,Storm按照何种策略将tuple分配到后续的bolt去。
builder.setBolt("addBlot", new AddSomeThingBolt(),2).shuffleGrouping("spout");
//4.创建第二个bolt节点放入topology中
builder.setBolt("printBlot", new PrintBolt(),3).shuffleGrouping("addBlot");

Config conf = new Config();
conf.setDebug(false);
//若有参数,则执行storm集群提交,若没有本地模拟集群提交topology
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(100000);
cluster.killTopology("test");
cluster.shutdown();
}
}
}


运行结果

一旦运行成功,控制台就开始疯狂的输出啦!O(∩_∩)O哈哈~



这里给出github地址:

点我啦

https://github.com/unlimitbladeworks/Storm-HelloWorld
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: