您的位置:首页 > 编程语言

[Storm]用代码提交拓扑,自动从jar包中扫描拓扑实现接口

2016-11-22 15:48 567 查看

简化提交流程,用代码提交拓扑

1.为了规范下storm的jar包,先写了一个接口,必须实现这个接口的jar包才能自动提交。

/**
* Created by Mario on 2016/10/21 0021.
* 可提交拓扑接口
* 如果要使用client提交拓扑,请实现此接口
*/
@SuppressWarnings("unused")
public interface TopologySubmitable {
/**
* 拓扑名字
* @return 拓扑名字
*/
String name();
/**
* 创建拓扑
* @return 拓扑
*/
StormTopology createTopology();
}


代码提交拓扑的工具类storm已经提供了(org.apache.storm.utils.NimbusClient),只需要名字和具体的拓扑实例即可。

2.假定jar包已经实现了这个接口,我们也拿到了这个jar包,现在我们需要从jar中扫描出接口的实现类,来取得我们提交需要用的参数。

扫描类的代码取自:http://blog.csdn.net/sodino/article/details/19048493,感谢分享。

用这里的方法可以取到当前程序集加载的所有类,包括自己的包,引用的第三方包,以及JDK下的包。但并不能取到其他未引用的jar包。

这里我们需要重新设置下当前的Classloader,代码如下(完成扫描一共用了两个类,完整代码末尾会贴出)

URL[] urls = new URL[1];
urls[0] = new URL("file:/"+extraJarPath);
ClassLoader classloader = URLClassLoader.newInstance(urls);


extraJarPath即stormjar的路径,给个例子:

"E:/repo/Storm/storm/push/target/ppp-1.0.0-SNAPSHOT.jar"


就是将文件路径转为url,并以此创建新的URLClassLoader。最后:

Thread.currentThread().setContextClassLoader(classloader);


这样,classloader.getResources(…)就可以遍历到指定jar包中的类了。扫描出需要的类之后,提交过程就很简单了,下面附上完整代码

TopologySubmitClient

/**
* Created by Mario on 2016/10/21 0021.
* 拓扑提交客户端
*/
@SuppressWarnings("unused unchecked")
public class TopologySubmitClient {

/**
* 反射找到TopologySubmitable实现类
* 从jar包中加载,不使用Spring
*/

public static void submitTopology(String jarPath) {
try {
/**
* 从Factory获取实现类,有多个拓扑时,多次提交
*/
TopologySubmitFactory topologySubmitFactory = new TopologySubmitFactory(jarPath);
List<Class<TopologySubmitable>> list = topologySubmitFactory.scanImplOfInterface(TopologySubmitable.class);
list.stream().forEach(obj -> {
try {
TopologySubmitable ts = obj.newInstance();

/**
* 集群中的storm.yaml配置文件必须在classpath下(src/main/resources)
* 根据配置初始化一个Nimbus.Client
*/
Map defaultConf = Utils.readStormConfig();
NimbusClient nimbus = NimbusClient.getConfiguredClient(defaultConf);
/**
* 先关闭拓扑
*/
KillOptions killOptions = new KillOptions();
killOptions.set_wait_secs(5);
nimbus.getClient().killTopologyWithOpts(ts.name(), killOptions);

Thread.sleep(5000);

/**
* 配置Worker数目为1
* 获取submittedJar路径
* 配置文件转化为json
*/
Config topologyConf = new Config();
topologyConf.setNumWorkers(1);
defaultConf.putAll(topologyConf);
String submittedJar = StormSubmitter.submitJar(defaultConf, jarPath);
String jsonConfig = JSONValue.toJSONString(defaultConf);
/**
* 提交
*/
nimbus.getClient().submitTopology(ts.name(), submittedJar, jsonConfig, ts.createTopology());
} catch (Exception e) {
e.printStackTrace();
}
topologySubmitFactory.close();
});

} catch (Exception e) {
e.printStackTrace();
}
}
}


TopologySubmitFactory

/**
* Created by Mario on 2016/10/21 0021.
* 拓扑提交工厂类
*/
public class TopologySubmitFactory {

private String packageRoot;
private ClassLoader classloader;
private ClassLoader oldLoader;
/**
* 构造拓扑提交工厂类
* @param extraJarPath 需要额外加载的Jar包
* @throws MalformedURLException
* @throws ClassNotFoundException
* @throws IllegalAccessException
* @throws InstantiationException
*/
public TopologySubmitFactory(String... extraJarPath) throws MalformedURLException, ClassNotFoundException, IllegalAccessException, InstantiationException {
//保留原loader
this.oldLoader = Thread.currentThread().getContextClassLoader();
this.packageRoot = "com.mario";
URL[] urls = new URL[extraJarPath.length];
for(int i=0;i<extraJarPath.length;i++){
urls[i] = new URL("file:/"+extraJarPath[i]);
}
classloader = URLClassLoader.newInstance(urls);
Thread.currentThread().setContextClassLoader(classloader);
}

/**
* 取得指定接口下所有实现类
*
* @param api 指定interface变量
* @param <T> 指定interface类型
* @return 所有实现类
* @throws Exception
*/
public <T> List<Class<T>> scanImplOfInterface(Class<T> api) throws Exception {
List<Class<T>> result = new ArrayList<>();
getClasses().stream().filter(c -> api.isAssignableFrom(c)
&& !Modifier.isAbstract(c.getModifiers())
&& !Modifier.isInterface(c.getModifiers()))
.forEach(c -> result.add((Class<T>) c));
return result;
}

/**
* 取得所有类
*
* @return class list
* @throws IOException
* @throws ClassNotFoundException
*/
public List<Class<?>> getClasses() throws IOException,
ClassNotFoundException {
Enumeration<URL> urls = classloader.getResources(this.packageRoot.replaceAll("\\.", "/"));

List<Class<?>> result = new ArrayList<>();

while (urls.hasMoreElements()) {
URL url = urls.nextElement();
String protocol = url.getProtocol();
if ("file".equals(protocol)) {
//本地自己可见的代码
result.addAll(getClasses(new File(url.getFile()), packageRoot));
} else if ("jar".equals(protocol)) {
//引用第三方jar的代码
result.addAll(getClassesFromJar(url, packageRoot));
}
}
return result;
}

/**
* 查找jar包中的类
*
* @param url         url
* @param packageRoot pk
* @return class list
*/
private List<? extends Class<?>> getClassesFromJar(URL url, String packageRoot) throws IOException, ClassNotFoundException {

List<Class<?>> result = new ArrayList<>();

JarURLConnection jarURLConnection = (JarURLConnection) url.openConnection();
JarFile jarFile = jarURLConnection.getJarFile();
Enumeration<JarEntry> jarEntries = jarFile.entries();
while (jarEntries.hasMoreElements()) {
JarEntry jarEntry = jarEntries.nextElement();
String jarEntryName = jarEntry.getName();
if(!jarEntry.isDirectory() && jarEntryName.endsWith(".class")){
String clazzName = jarEntryName.replace("/", ".").replaceAll("\\.class","");
if(clazzName.startsWith(packageRoot)){
Class<?> clazz = classloader.loadClass(clazzName);
result.add(clazz);
}
}
}

return result;
}

/**
* 获取所有本地类
*
* @param dir 目录
* @param pk  包名
* @return class list
* @throws ClassNotFoundException
*/
private List<Class<?>> getClasses(File dir, String pk) throws ClassNotFoundException {
List<Class<?>> classes = new ArrayList<>();
if (!dir.exists()) {
return classes;
}
for (File f : dir.listFiles()) {
if (f.isDirectory()) {
if (!pk.equals("")) {
classes.addAll(getClasses(f, pk + "." + f.getName()));
} else {
classes.addAll(getClasses(f, f.getName()));
}
}
String name = f.getName();
if (name.endsWith(".class")) {
classes.add(Class.forName(pk + "." + name.substring(0, name.length() - 6)));
}
}
return classes;
}

/**
* 关闭工程,恢复classloader
*/
public void close(){
Thread.currentThread().setContextClassLoader(this.oldLoader);
}
}


测试的小例子

/**
* Created by Mario on 2016/10/21 0021.
* test
*/
public class Main {
public static void main(String[] args) throws Exception {
TopologySubmitClient.submitTopology("E:/repo/Storm/storm/push/target/push-1.0.0-SNAPSHOT.jar");
System.out.println("OVER !!");
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息