[Storm]用代码提交拓扑,自动从jar包中扫描拓扑实现接口
2016-11-22 15:48
567 查看
简化提交流程,用代码提交拓扑
1.为了规范下storm的jar包,先写了一个接口,必须实现这个接口的jar包才能自动提交。/** * Created by Mario on 2016/10/21 0021. * 可提交拓扑接口 * 如果要使用client提交拓扑,请实现此接口 */ @SuppressWarnings("unused") public interface TopologySubmitable { /** * 拓扑名字 * @return 拓扑名字 */ String name(); /** * 创建拓扑 * @return 拓扑 */ StormTopology createTopology(); }
代码提交拓扑的工具类storm已经提供了(org.apache.storm.utils.NimbusClient),只需要名字和具体的拓扑实例即可。
2.假定jar包已经实现了这个接口,我们也拿到了这个jar包,现在我们需要从jar中扫描出接口的实现类,来取得我们提交需要用的参数。
扫描类的代码取自:http://blog.csdn.net/sodino/article/details/19048493,感谢分享。
用这里的方法可以取到当前程序集加载的所有类,包括自己的包,引用的第三方包,以及JDK下的包。但并不能取到其他未引用的jar包。
这里我们需要重新设置下当前的Classloader,代码如下(完成扫描一共用了两个类,完整代码末尾会贴出)
URL[] urls = new URL[1]; urls[0] = new URL("file:/"+extraJarPath); ClassLoader classloader = URLClassLoader.newInstance(urls);
extraJarPath即stormjar的路径,给个例子:
"E:/repo/Storm/storm/push/target/ppp-1.0.0-SNAPSHOT.jar"
就是将文件路径转为url,并以此创建新的URLClassLoader。最后:
Thread.currentThread().setContextClassLoader(classloader);
这样,classloader.getResources(…)就可以遍历到指定jar包中的类了。扫描出需要的类之后,提交过程就很简单了,下面附上完整代码。
TopologySubmitClient
/** * Created by Mario on 2016/10/21 0021. * 拓扑提交客户端 */ @SuppressWarnings("unused unchecked") public class TopologySubmitClient { /** * 反射找到TopologySubmitable实现类 * 从jar包中加载,不使用Spring */ public static void submitTopology(String jarPath) { try { /** * 从Factory获取实现类,有多个拓扑时,多次提交 */ TopologySubmitFactory topologySubmitFactory = new TopologySubmitFactory(jarPath); List<Class<TopologySubmitable>> list = topologySubmitFactory.scanImplOfInterface(TopologySubmitable.class); list.stream().forEach(obj -> { try { TopologySubmitable ts = obj.newInstance(); /** * 集群中的storm.yaml配置文件必须在classpath下(src/main/resources) * 根据配置初始化一个Nimbus.Client */ Map defaultConf = Utils.readStormConfig(); NimbusClient nimbus = NimbusClient.getConfiguredClient(defaultConf); /** * 先关闭拓扑 */ KillOptions killOptions = new KillOptions(); killOptions.set_wait_secs(5); nimbus.getClient().killTopologyWithOpts(ts.name(), killOptions); Thread.sleep(5000); /** * 配置Worker数目为1 * 获取submittedJar路径 * 配置文件转化为json */ Config topologyConf = new Config(); topologyConf.setNumWorkers(1); defaultConf.putAll(topologyConf); String submittedJar = StormSubmitter.submitJar(defaultConf, jarPath); String jsonConfig = JSONValue.toJSONString(defaultConf); /** * 提交 */ nimbus.getClient().submitTopology(ts.name(), submittedJar, jsonConfig, ts.createTopology()); } catch (Exception e) { e.printStackTrace(); } topologySubmitFactory.close(); }); } catch (Exception e) { e.printStackTrace(); } } }
TopologySubmitFactory
/** * Created by Mario on 2016/10/21 0021. * 拓扑提交工厂类 */ public class TopologySubmitFactory { private String packageRoot; private ClassLoader classloader; private ClassLoader oldLoader; /** * 构造拓扑提交工厂类 * @param extraJarPath 需要额外加载的Jar包 * @throws MalformedURLException * @throws ClassNotFoundException * @throws IllegalAccessException * @throws InstantiationException */ public TopologySubmitFactory(String... extraJarPath) throws MalformedURLException, ClassNotFoundException, IllegalAccessException, InstantiationException { //保留原loader this.oldLoader = Thread.currentThread().getContextClassLoader(); this.packageRoot = "com.mario"; URL[] urls = new URL[extraJarPath.length]; for(int i=0;i<extraJarPath.length;i++){ urls[i] = new URL("file:/"+extraJarPath[i]); } classloader = URLClassLoader.newInstance(urls); Thread.currentThread().setContextClassLoader(classloader); } /** * 取得指定接口下所有实现类 * * @param api 指定interface变量 * @param <T> 指定interface类型 * @return 所有实现类 * @throws Exception */ public <T> List<Class<T>> scanImplOfInterface(Class<T> api) throws Exception { List<Class<T>> result = new ArrayList<>(); getClasses().stream().filter(c -> api.isAssignableFrom(c) && !Modifier.isAbstract(c.getModifiers()) && !Modifier.isInterface(c.getModifiers())) .forEach(c -> result.add((Class<T>) c)); return result; } /** * 取得所有类 * * @return class list * @throws IOException * @throws ClassNotFoundException */ public List<Class<?>> getClasses() throws IOException, ClassNotFoundException { Enumeration<URL> urls = classloader.getResources(this.packageRoot.replaceAll("\\.", "/")); List<Class<?>> result = new ArrayList<>(); while (urls.hasMoreElements()) { URL url = urls.nextElement(); String protocol = url.getProtocol(); if ("file".equals(protocol)) { //本地自己可见的代码 result.addAll(getClasses(new File(url.getFile()), packageRoot)); } else if ("jar".equals(protocol)) { //引用第三方jar的代码 result.addAll(getClassesFromJar(url, packageRoot)); } } return result; } /** * 查找jar包中的类 * * @param url url * @param packageRoot pk * @return class list */ private List<? extends Class<?>> getClassesFromJar(URL url, String packageRoot) throws IOException, ClassNotFoundException { List<Class<?>> result = new ArrayList<>(); JarURLConnection jarURLConnection = (JarURLConnection) url.openConnection(); JarFile jarFile = jarURLConnection.getJarFile(); Enumeration<JarEntry> jarEntries = jarFile.entries(); while (jarEntries.hasMoreElements()) { JarEntry jarEntry = jarEntries.nextElement(); String jarEntryName = jarEntry.getName(); if(!jarEntry.isDirectory() && jarEntryName.endsWith(".class")){ String clazzName = jarEntryName.replace("/", ".").replaceAll("\\.class",""); if(clazzName.startsWith(packageRoot)){ Class<?> clazz = classloader.loadClass(clazzName); result.add(clazz); } } } return result; } /** * 获取所有本地类 * * @param dir 目录 * @param pk 包名 * @return class list * @throws ClassNotFoundException */ private List<Class<?>> getClasses(File dir, String pk) throws ClassNotFoundException { List<Class<?>> classes = new ArrayList<>(); if (!dir.exists()) { return classes; } for (File f : dir.listFiles()) { if (f.isDirectory()) { if (!pk.equals("")) { classes.addAll(getClasses(f, pk + "." + f.getName())); } else { classes.addAll(getClasses(f, f.getName())); } } String name = f.getName(); if (name.endsWith(".class")) { classes.add(Class.forName(pk + "." + name.substring(0, name.length() - 6))); } } return classes; } /** * 关闭工程,恢复classloader */ public void close(){ Thread.currentThread().setContextClassLoader(this.oldLoader); } }
测试的小例子
/** * Created by Mario on 2016/10/21 0021. * test */ public class Main { public static void main(String[] args) throws Exception { TopologySubmitClient.submitTopology("E:/repo/Storm/storm/push/target/push-1.0.0-SNAPSHOT.jar"); System.out.println("OVER !!"); } }
相关文章推荐
- Android studio gradle依赖,依赖爆红,fileTree一句代码实现项目lib下的所有jar包全部自动依赖
- Ubuntu Server 中实现自动部署提交到SVN的代码
- Java传入用户名和密码并自动提交表单实现登录到其他系统的实例代码
- django ajax提交评论并自动刷新功能的实现代码
- storm学习之接口实现及代码发布集群
- [ci] 基于1 上文实现拉取代码后能自动触发sonar-runner实现代码扫描评测,job1完成
- Storm之——如何提交拓扑或Jar包到集群
- 禁用Enter键表单自动提交实现代码
- jsp实现后台提交编程代码自动生成器
- 禁用Enter键表单自动提交实现代码
- storm IDEA maven 打相关依赖jar包---用于提交拓扑
- Ubuntu Server 中实现自动部署提交到SVN的代码
- 实现提交代码github自动通知jenkins编译部署
- gitlab实现提交后线上自动更新代码
- git实现代码提交自动部署到相应的web服务器
- Java传入用户名和密码并自动提交表单实现登录到其他系统的实例代码
- webhook实现提交代码自动触发Jenkins构建任务(三)
- 在winfrom下利用c#代码,实现kindEditor的JavaScript方法:editor.html(),实现上报窗体的自动提交。
- 获取指定包名下继承或者实现某接口的所有类(扫描文件目录和所有jar)
- expect 实现 自动打包和提交代码到github