Storm Topology 提交到集群
2015-07-01 17:28
316 查看
问题:当完成Topology各个组件的定义之后(写好了**Spout.java 和 **Bolt.java)如何将Topology提交到集群中去?
参考:/article/4893401.html
1,在**Topology.java中的main方法 setSpout、setBolt 之后通过TopologyBuilder.createTopology()创建Topology对象,在**Topology的main方法中通过下面代码进行提交:
2,在StormSubmitter.java的submitTopology方法中,先生成一个NimbusClient对象,然后再执行submitJar(, ,)方法进行提交
3,在submitJar方法(只有两个参数)中,先获得待提交的jar文件的地址(string 对象表示),再调用submitJar(, , ,)提交。在这里先判断该Topology是否已经提交
private static void submitJar(Map conf, ProgressListener listener) {
if(submittedJar==null) {
LOG.info("Jar not uploaded to master yet. Submitting jar...");
String localJar = System.getProperty("storm.jar");
submittedJar = submitJar(conf, localJar, listener);
} else {
LOG.info("Jar already uploaded to master. Not submitting jar.");
}
}
4,在submitJar(, , ,)中,首先生成一个NimbusClient用来获得提交jar文件的目标地址。StormSubmitter是个Thrift Client,Nimbus是Thrift Server,通过以下三步将jar文件通过RPC发送给Nimubs
方法执行完后返回String submittedJar,标记该Topology是否已经提交过。该参数作为第5步中的submitTopologyWithOpts()参数
其中用到了 backtype.storm.utils工具包中的类BufferFileInputStream.java
5,此时返回到StormSubmitter.java的submitTopology方法,执行
再来看看第5步中调用的方法,public class NimbusClient extends ThriftClient说明其实提交过程都是由Thrift封装好了的。
参考:/article/4893399.html
参考:/article/4893401.html
1,在**Topology.java中的main方法 setSpout、setBolt 之后通过TopologyBuilder.createTopology()创建Topology对象,在**Topology的main方法中通过下面代码进行提交:
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
2,在StormSubmitter.java的submitTopology方法中,先生成一个NimbusClient对象,然后再执行submitJar(, ,)方法进行提交
NimbusClient client = NimbusClient.getConfiguredClient(conf); submitJar(conf, progressListener);
3,在submitJar方法(只有两个参数)中,先获得待提交的jar文件的地址(string 对象表示),再调用submitJar(, , ,)提交。在这里先判断该Topology是否已经提交
private static void submitJar(Map conf, ProgressListener listener) {
if(submittedJar==null) {
LOG.info("Jar not uploaded to master yet. Submitting jar...");
String localJar = System.getProperty("storm.jar");
submittedJar = submitJar(conf, localJar, listener);
} else {
LOG.info("Jar already uploaded to master. Not submitting jar.");
}
}
4,在submitJar(, , ,)中,首先生成一个NimbusClient用来获得提交jar文件的目标地址。StormSubmitter是个Thrift Client,Nimbus是Thrift Server,通过以下三步将jar文件通过RPC发送给Nimubs
方法执行完后返回String submittedJar,标记该Topology是否已经提交过。该参数作为第5步中的submitTopologyWithOpts()参数
String uploadLocation = client.getClient().beginFileUpload();
client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
client.getClient().finishFileUpload(uploadLocation);
其中用到了 backtype.storm.utils工具包中的类BufferFileInputStream.java
5,此时返回到StormSubmitter.java的submitTopology方法,执行
if(opts!=null) { client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
} else { // this is for backwards compatibility client.getClient().submitTopology(name, submittedJar, serConf, topology);
}
再来看看第5步中调用的方法,public class NimbusClient extends ThriftClient说明其实提交过程都是由Thrift封装好了的。
参考:/article/4893399.html
public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift.TException { send_submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, options); recv_submitTopologyWithOpts(); }
public void send_submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws org.apache.thrift.TException { submitTopologyWithOpts_args args = new submitTopologyWithOpts_args(); args.set_name(name); args.set_uploadedJarLocation(uploadedJarLocation); args.set_jsonConf(jsonConf); args.set_topology(topology); args.set_options(options); sendBase("submitTopologyWithOpts", args); }
相关文章推荐
- Linux/UNXI系统设置环境变量
- 理解本真的REST架构风格
- Linux给指定用户或全部用户(已登录)发送消息
- Xcode7.0beta真机调试出现"Could not find Developer Disk Image"提示
- Linux下p2p的聊天功能实现
- linux知识点回顾
- Linux下Java安装与配置
- CentOS_7 LNMP环境源码安装
- Atomic api copy 记录
- Nginx负载均衡策略
- linux大杂烩
- NGINX引入线程池 性能提升9倍
- OpenCV实现视频播放和进度条
- hadoop基础知识
- 给openwrt移植一个其他项目的问题记录
- linux 下 格式化 sd卡为ext4
- open vSwitch学习资料整理
- linux swap详解
- linux服务管理---独立服务和基于xinetd服务的管理
- lvs、nginx、haproxy中转模式总结