您的位置:首页 > 运维架构

Storm Topology 提交到集群

2015-07-01 17:28 316 查看
问题:当完成Topology各个组件的定义之后(写好了**Spout.java 和 **Bolt.java)如何将Topology提交到集群中去?

参考:/article/4893401.html

1,在**Topology.java中的main方法 setSpout、setBolt 之后通过TopologyBuilder.createTopology()创建Topology对象,在**Topology的main方法中通过下面代码进行提交:

StormSubmitter.submitTopology(args[0], conf, builder.createTopology());


2,在StormSubmitter.java的submitTopology方法中,先生成一个NimbusClient对象,然后再执行submitJar(, ,)方法进行提交

NimbusClient client = NimbusClient.getConfiguredClient(conf);

submitJar(conf, progressListener);


3,在submitJar方法(只有两个参数)中,先获得待提交的jar文件的地址(string 对象表示),再调用submitJar(, , ,)提交。在这里先判断该Topology是否已经提交

private static void submitJar(Map conf, ProgressListener listener) {
if(submittedJar==null) {
LOG.info("Jar not uploaded to master yet. Submitting jar...");
String localJar = System.getProperty("storm.jar");
submittedJar = submitJar(conf, localJar, listener);
} else {
LOG.info("Jar already uploaded to master. Not submitting jar.");
}
}

4,在submitJar(, , ,)中,首先生成一个NimbusClient用来获得提交jar文件的目标地址。StormSubmitter是个Thrift Client,Nimbus是Thrift Server,通过以下三步将jar文件通过RPC发送给Nimubs

方法执行完后返回String submittedJar,标记该Topology是否已经提交过。该参数作为第5步中的submitTopologyWithOpts()参数

String uploadLocation = client.getClient().beginFileUpload();
client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
client.getClient().finishFileUpload(uploadLocation);

其中用到了 backtype.storm.utils工具包中的类BufferFileInputStream.java

5,此时返回到StormSubmitter.java的submitTopology方法,执行

if(opts!=null) {
client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);

} else {
// this is for backwards compatibility
client.getClient().submitTopology(name, submittedJar, serConf, topology);


再来看看第5步中调用的方法,public class NimbusClient extends ThriftClient说明其实提交过程都是由Thrift封装好了的。

参考:/article/4893399.html

public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift.TException
{
send_submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, options);
recv_submitTopologyWithOpts();
}

public void send_submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws org.apache.thrift.TException
{
submitTopologyWithOpts_args args = new submitTopologyWithOpts_args();
args.set_name(name);
args.set_uploadedJarLocation(uploadedJarLocation);
args.set_jsonConf(jsonConf);
args.set_topology(topology);
args.set_options(options);
sendBase("submitTopologyWithOpts", args);
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: