您的位置:首页 > 其它

storm学习笔记

2016-04-19 11:14 399 查看
Storm集群组件:
集群角色:
Nimbus:集群主节点,主要负责任务分配、响应客户端提交topology请求以及任务失败的调度。
Supervisor : 集群从节点,主要负责启动、停止业务逻辑组件的进程。
Storm编程模型:
Topology : 业务处理模型
Spout : 数据源组件,用于获取数据,可以通过文件或消息队列【kafka、activeMQ】中获取数据
Bolt :逻辑处理组件

示例:
public class RandomSpout extends BaseRichSpout{
SpoutOutputCollector collector=null;
String []goods={"iphone","xiaomi","meizu","zhongxing","huawei","moto","sumsung","simens"};

/*
* 获取消息并发送到下一个组件的方法,会被storm不断地调用从goods数组中随机获取一个商品名称封装
*到tuple中去
*/
@override
public void nextTuple(){
Random random=new Random();
String good=goods[random.nextInt(goods.length)];
//封装到tuple中发送给下一个组件
collector.emit(new Values(good));
}

//进行初始化,只在开始时调用一次
@override
public void open(Map conf,TopologyContext context,SpoutOutputCollector collector){
this.collector=collector;
}

//定义tuple的schema
@override
public void declareOutputFields(OutputFieldsDeclarer declarer){
declarer.declarer(new Fields("src_word"));
}
}

public class UpperBolt extends BaseBasicBolt{
//每来一个消息组tuple,都会执行一次改方法
@Override
public void execute(Tuple tuple,BasicOutputCollector collector){
//从tuple中拿到数据--原始商品名
String src_word=tuple.getString(0);
String upper=src_word.toUpperCase();
//发送出去
collector.emit(new Values(upper));
}
//给消息申明一个字段名
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer){
declarer.declare(new Fields("upper"));
}
}

pulic class SuffixBolt extends BaseBasicBolt{
FileWriter file=null;
@Override
public void prepare(Map stormConf,TopologyContext context){
try{
file=new FileWriter("D://eclipse_plugin"+UUID.randomUUID());
}catch(IOException e){
e.printStackTrace();
}
}
//每一次执行都去new一个writer,应该在调用execute之前先把writer初始化好
@Override
public void execute(Tuple tuple,BasicOutputCollector collector){
//从消息元组中拿到上一个组件发送过来的数据
String upper=tuple.getString(0);
String result=upper+"_suffix";
try{
file.append(result);
file.append("/n");
}catch(IOException e){
e.printStackTrace();
}
}
//声明该组件要发送出去的tuple的字段定义
@Override
public void declareOutputFields(OutputFieldsDeclarer declare){
}
}

public class TopMain{
public static void main(String []args)throws AlreadyAliveException,InvalidTopologyException{
ToplogyBuilder builder=new TopologyBuilder();
//设置消息源组件,4表示spout进程个数
builder.setSpout("randomSpout",new RandomSpout(),4);

//设置逻辑处理组件
//shuffleGrouping 指定接收哪个组件传过来的消息
builder.setBolt("upper",new UpperBolt(),4).shuffleGrouping("randomSpout");
builder.setBolt("result",new SuffixBolt(),4).shuffleGrouping("upper");

//创建一个topology
StormT
}
}

====================================================================
storm 实时计算:流操作入门编程实践:-----

storm是一个分布式的实时计算系统,他设计了一种对流和计算的抽象,概念比较简单,实践编程开发起来相对容易,下面介绍下storm中的几个概念:
topology stom中topology的概念类似于hadoop中的mapreduce job,是一个用来编排,容纳一组计算逻辑组件,spout bolt的对象,hadoop mapreduce中一个job包含一组map task,reduce task ,这一组计算组件可以按照dag图的方式编排起来,从而组合成一个计算逻辑更加负责的对象,那就是topology。一个toplogy运行之后就不能停止,他会无限的运行下去,除非手动干预或意外故障让他终止。
spout中spout是一个topology的消息生产的源头,spout应该是一个持续不断生产消息的组件,例如,他可以是一个socket server在坚挺外部client链接并发送消息,可以是一个消息队列mq的消费者,可以是用来接收flume agent的sink所发送消息的服务,等等,spout生产的消息在sotrm中被抽象为tuple,在整个topology的多个计算组件之间都是根据需要抽象构建的tuple消息来金系你给链接,从而形成流。
storm中消息的处理逻辑被封装到bolt组件中,任务处理逻辑都可以在bolt里面执行,处理过程和普通计算应用程序没什么区别,只是需要根据storm的计算予以来合理设置一下组件之间的消息流的声明,分发,链接即可,bolt可以接受来一个一个或多个spout的tuple消息,也可以来自多个其他bolt的tuple消息,也可能是spout和其他bolt组合发送的tuple消息。
stream grouping:
storm中用来定义哥哥计算组件spout bolt之间流的链接,分组,分发关系,storm定义了如下7种分发策咯,shuffle grouping 随机分组,fields grouping 按字段分组,all grouping 广播分组,global grouping 全局分组,non grouping 部分组,direct grouping直接分组,local
or shuffile grouping 本地随机分组,各种策咯的具体含义不同。

下面简单介绍几种开发中常用的流操作处理方式的实现:

storm组件简单串行: 这种方式是简单最直观的,只要我们将storm的组件 spout bolt创兴起来就可以实现。
produceRecordSpout 类是一个spout组件,用来产生消息,我们这里模拟发送一些英文句子,实际应用中可以制定任务数据源,如数据库,消息中间件,socket链接,rpc调用等等。

构造一个produceRecordsputd对象是,传入一个字符串数组,然后随机选择其中一个句子,emit到下游的wordsplitterbolt组件,只声明了一个field,wordsplitterbolt组件可以根据声明的field,接受到emit的消息。
在execute方法中,传入的参数是一个tuple,该tuple就包含了上有组件producerecordsput所emit的树家居,直接俄取出数据进行处理,我们将取出的数据,按照空格进行的split,得到一个一个单词,然后在emit到下一个组件,声明的输出sch进行统计次品的组件为word
counterbolt。
storm组合多种流操作,storm支持流聚合操作,将多个组件emit数据,汇聚到同一个处理组件来统一处理,可以实现对多个spout组件通过流聚合到一个bolt组件,也可以实现对多个bolt通过聚合到另一个bolt组件,实际,这里面有俩种主要的操作,一种是类似工作流中的fork,另一种是类似工作流中国的join。
下面看看toplogy是如何进行创建的。代码如下:
public static void main()
topologyBuider builder =new TopologyBuilder();
builder.setSput('spout-number',new produceRecordSpout(Type,NuMBEr,new String));
builder.setSput('spout-string');
builder.setSput('spout-sign');

一个splitrecordbolt组件从3个不同类型的produceRecordSpout接受数据,这是一个多spout流聚合,splitRecordBlot将处理后的数据发送给distributeWrdByTypeBolt组件,然后根绝收到的数据的类型惊醒一个分发处理,这里用了fieldGrouping操作,也就是splitRecordBolt发送的数据会按照类型发soon个到不同的distributeWordByTypeBolt任务,每个tast收到的一定是同一个类型的数据,,如果直接使用shufflgrouping操作也没有问题,只不过每个task可能受到任务类型的数据,在distributewordbuytypebolt内容进行流向控制,distributewordtypebolt组件中定义了多个stream,根据类型来分组发送给不同类型的savedatabbolt组件,

无论接受的tuple是什么类型的数据,都进行split,然后在emit的时候,仍然将类型星系传递给下一个bolt组件,
distributewordbytypebolt组件只是用来分发tuple,用过定义stream,将接受到的tuple发送到指定下游bolt组件进行吹,用过splitrocordbolt组件emit的tuple包含的类型信息,所有在distributewordbytypebolt中根据类型进行分发。

总结,storm中最核心的计算组件的抽象就是spout,bolt以及stream grouping 其他高级的功能,trident,drpc,他们或者基于这些基础组件以及streaming grouping分发策咯来实现的,屏蔽了底层的分发计算处理逻辑以及更高层的编程抽象面向开发者,减轻了开发人员对底层复杂机制的处理,获得是为了方便使用storm极端服务而增加的计算衍生物,如批量食物处理,rpc等。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: