您的位置:首页 > 大数据

streamsets Data Collecotor启动流程分析

2016-12-16 11:24 429 查看
exec ${JAVA}

       -classpath  ${BOOTSTRAP_CLASSPATH} 

       ${SDC_JAVA_OPTS}

       com.streamsets.pipeline.BootstrapMain \

       -mainClass

      ${SDC_MAIN_CLASS}

      -apiClasspath "${API_CLASSPATH}"

      -containerClasspath "${CONTAINER_CLASSPATH}" \

      -streamsetsLibrariesDir "${STREAMSETS_LIBRARIES_DIR}"

      -userLibrariesDir "${USER_LIBRARIES_DIR}"

      -configDir ${SDC_CONF} \

      -libsCommonLibDir "${LIBS_COMMON_LIB_DIR}" ${EXTRA_OPTIONS}

注意这里SDC_JAVA_OPTS

SDC_JAVA_OPTS=${SDC_JAVA_OPTS:="-Xmx1024m"}

SDC_MAIN_CLASS=${SDC_MAIN_CLASS:="com.streamsets.datacollector.main.DataCollectorMain"}

SDC_SECURITY_MANAGER_ENABLED=${SDC_SECURITY_MANAGER_ENABLED:="true"}

SDC_JAVA_OPTS="${SDC_JAVA_OPTS} -Dsdc.dist.dir=${SDC_DIST}"

SDC_JAVA_OPTS="${SDC_JAVA_OPTS} -Dsdc.resources.dir=${SDC_RESOURCES}"

SDC_JAVA_OPTS="${SDC_JAVA_OPTS} -Dsdc.hostname=${SDC_HOSTNAME}"

SDC_JAVA_OPTS="${SDC_JAVA_OPTS} -Dsdc.conf.dir=${SDC_CONF}"

SDC_JAVA_OPTS="${SDC_JAVA_OPTS} -Dsdc.data.dir=${SDC_DATA}"

SDC_JAVA_OPTS="${SDC_JAVA_OPTS} -Dsdc.log.dir=${SDC_LOG}"

BOOTSTRAP_JAR="${SDC_DIST}/libexec/bootstrap-libs/main/streamsets-datacollector-bootstrap-2.2.0.0.jar"

SDC_JAVA_OPTS="${SDC_JAVA_OPTS} -javaagent:${BOOTSTRAP_JAR}"

使用了-javaagent选项,关于javaAgent的介绍,这里有篇文章http://blog.csdn.net/ykdsg/article/details/12080071

大意是指你应用程序的入口类的Main方法执行之前,先执行javaAgent参数制定的jar包里MANIFEST.MF中敲定的类的premain方法

AgentJAR即BOOTSTRAP_JAR="${SDC_DIST}/libexec/bootstrap-libs/main/streamsets-datacollector-bootstrap-2.2.0.0.jar"

MANIFEST.MF的内容如下: 

Manifest-Version: 1.0
Premain-Class: com.streamsets.pipeline.BootstrapMain
Archiver-Version: Plexus Archiver
Built-By: jenkins
Created-By: Apache Maven 3.3.9
Build-Jdk: 1.8.0_112

可以看到,agent类和应用程序类都是同一个com.streamsets.pipeline.BootstrapMain

当在命令行启动该代理jar时,VM会根据manifest中指定的代理类,使用于main类相同的系统类加载器(即ClassLoader.getSystemClassLoader()获得的加载器)加载代理类。在执行main方法前执行premain()方法。

public static void premain(String args, Instrumentation instrumentation) {
if (BootstrapMain.instrumentation == null) {
BootstrapMain.instrumentation = instrumentation;
} else {
throw new IllegalStateException("Premain method cannot be called twice (" + BootstrapMain.instrumentation + ")");
}
}
其中方法参数args即命令中的options,类型为String(注意不是String[]),因此如果需要多个参数,需要在方法中自己处理(比如用";"分割多个参数之类);inst是运行时由VM自动传入的Instrumentation实例,可以用于获取VM信息

那么这里只是简单的将instrumentation对象hold住,并没有做其它操作,因为BootstrapMain只是负责bootstrap的任务,那么真正的应用入口类是通过-mainClass指定的,

mainClass即  com.streamsets.datacollector.main.DataCollectorMain ,instrumentation是BootstrapMain通过反射传递给主类的setContext方法,

public static void setContext(ClassLoader apiCL, ClassLoader containerCL,
List<? extends ClassLoader> moduleCLs, Instrumentation instrumentation) {
MemoryUsageCollector.initialize(instrumentation);
RuntimeModule.setStageLibraryClassLoaders(moduleCLs);
}

如果继续追踪代码,会发现主要使用
instrumentation.getObjectSize(obj)方法获取对象的内存占用情况,以便精确控制pipeline内存占用情况。 javaAgent参数的使用,目的简单说就是为了获取到instrumentation对象,然后用这个对象做一些事情。

bootstrapMain除了充当javaAgent,那么它还负责把各种classpath汇总好,作为参数传递给DataCollectorMain,因为dataCollector下面有很多的jar包,而streamsets支持hbase,hadoop,kafka等系统多种版本的客户端,而这种客户端都是运行在同一个jvm里面,开发同学都知道jar包依赖冲突的问题,跟nifi一样,streamsets给pipeline上面没一个stage配置了一个classloader,具体是怎么做的呢?

String apiClasspath = null;
String containerClasspath = null;
String streamsetsLibrariesDir = null;
String streamsetsLibrariesExtraDir = null;
String userLibrariesDir = null;
String configDir = null;
String libsCommonLibDir = null;

那么主要分上面几类路径,里面分别存放了各个阶段需要的jar包
List<URL> apiUrls = getClasspathUrls(apiClasspath);
List<URL> containerUrls = getClasspathUrls(containerClasspath);

上面首先获取到api和container的所有jar包的位置集合

├── api-lib

│   ├── annotations-java5-15.0.jar

│   ├── log4j-1.2.17.jar

│   ├── metrics-core-3.1.0.jar

│   ├── slf4j-api-1.7.7.jar

│   ├── slf4j-log4j12-1.7.7.jar

│   └── streamsets-datacollector-api-2.2.0.0.jar



├── container-lib

│   ├── activation-1.1.jar

│   ├── annotations-2.0.1.jar

│   ├── aopalliance-repackaged-2.4.0-b34.jar

│   ├── asm-5.0.3.jar

│   ├── commons-beanutils-1.9.3.jar

│   ├── commons-codec-1.10.jar

│   ├── commons-collections-3.2.2.jar

│   ├── commons-compress-1.10.jar

│   ├── commons-configuration2-2.0.jar

│   ├── commons-el-1.0.jar

│   ├── commons-io-2.4.jar

│   ├── commons-lang3-3.3.2.jar

│   ├── commons-logging-1.2.jar

│   ├── commons-pool2-2.4.1.jar

│   ├── dagger-1.2.2.jar

│   ├── google-http-client-1.22.0.jar

│   ├── google-http-client-jackson2-1.22.0.jar

│   ├── guava-18.0.jar

│   ├── hk2-api-2.4.0-b34.jar

│   ├── hk2-locator-2.4.0-b34.jar

│   ├── hk2-utils-2.4.0-b34.jar

│   ├── httpclient-4.5.2.jar

│   ├── httpcore-4.4.4.jar

│   ├── jackson-annotations-2.4.3.jar

│   ├── jackson-core-2.4.3.jar

│   ├── jackson-databind-2.4.3.jar

│   ├── jackson-dataformat-xml-2.4.3.jar

│   ├── jackson-dataformat-yaml-2.4.5.jar

│   ├── jackson-datatype-joda-2.4.3.jar

│   ├── jackson-jaxrs-base-2.5.4.jar

│   ├── jackson-jaxrs-json-provider-2.5.4.jar

│   ├── jackson-module-jaxb-annotations-2.4.3.jar

│   ├── javassist-3.19.0-GA.jar

│   ├── javax.annotation-api-1.2.jar

│   ├── javax.inject-1.jar

│   ├── javax.inject-2.4.0-b34.jar

│   ├── javax.servlet-api-3.1.0.jar

│   ├── javax.ws.rs-api-2.0.1.jar

│   ├── jersey-client-2.22.2.jar

│   ├── jersey-common-2.22.2.jar

│   ├── jersey-container-servlet-2.22.2.jar

│   ├── jersey-container-servlet-core-2.22.2.jar

│   ├── jersey-entity-filtering-2.22.2.jar

│   ├── jersey-guava-2.22.2.jar

│   ├── jersey-media-jaxb-2.22.2.jar

│   ├── jersey-media-json-jackson-2.22.2.jar

│   ├── jersey-media-multipart-2.22.2.jar

│   ├── jersey-server-2.22.2.jar

│   ├── jetty-client-9.2.17.v20160517.jar

│   ├── jetty-continuation-9.2.17.v20160517.jar

│   ├── jetty-http-9.2.17.v20160517.jar

│   ├── jetty-io-9.2.17.v20160517.jar

│   ├── jetty-jaas-9.2.17.v20160517.jar

│   ├── jetty-rewrite-9.2.17.v20160517.jar

│   ├── jetty-security-9.2.17.v20160517.jar

│   ├── jetty-server-9.2.17.v20160517.jar

│   ├── jetty-servlet-9.2.17.v20160517.jar

│   ├── jetty-servlets-9.2.17.v20160517.jar

│   ├── jetty-util-9.2.17.v20160517.jar

│   ├── jetty-webapp-9.2.17.v20160517.jar

│   ├── jetty-xml-9.2.17.v20160517.jar

│   ├── joda-time-2.3.jar

│   ├── jsp-api-2.0.jar

│   ├── jsr305-3.0.0.jar

│   ├── jtar-2.2.jar

│   ├── kryo-3.0.1.jar

│   ├── mail-1.4.7.jar

│   ├── metrics-json-3.1.0.jar

│   ├── metrics-jvm-3.1.0.jar

│   ├── mimepull-1.9.6.jar

│   ├── minlog-1.3.0.jar

│   ├── objenesis-2.1.jar

│   ├── osgi-resource-locator-1.0.1.jar

│   ├── reflectasm-1.10.1.jar

│   ├── reflections-0.9.9.jar

│   ├── selma-0.13.jar

│   ├── snakeyaml-1.12.jar

│   ├── snappy-0.4.jar

│   ├── stax2-api-3.1.4.jar

│   ├── streamsets-datacollector-common-2.2.0.0.jar

│   ├── streamsets-datacollector-container-2.2.0.0.jar

│   ├── streamsets-datacollector-container-common-2.2.0.0.jar

│   ├── streamsets-datacollector-json-dto-2.2.0.0.jar

│   ├── streamsets-datacollector-messaging-client-2.2.0.0.jar

│   ├── streamsets-sso-2.2.0.0.jar

│   ├── streamsets-utils-2.2.0.0.jar

│   ├── swagger-annotations-1.5.3.jar

│   ├── swagger-core-1.5.3.jar

│   ├── swagger-jaxrs-1.5.3.jar

│   ├── swagger-jersey2-jaxrs-1.5.3.jar

│   ├── swagger-models-1.5.3.jar

│   ├── validation-api-1.1.0.Final.jar

│   ├── websocket-api-9.2.17.v20160517.jar

│   ├── websocket-client-9.2.17.v20160517.jar

│   ├── websocket-common-9.2.17.v20160517.jar

│   ├── websocket-server-9.2.17.v20160517.jar

│   └── websocket-servlet-9.2.17.v20160517.jar

下面加载user和system stageLibs

这里是有白名单控制的:

在etc的sdc.properties中配置了黑白名单,注意黑白名单不能共存,一般配置黑名单,即从制定目录扫描到的全部libs中排除掉黑名单好了

#system.stagelibs.whitelist=
system.stagelibs.blacklist=\
streamsets-datacollector-mapr_5_0-lib,\
streamsets-datacollector-mapr_5_1-lib,\
streamsets-datacollector-mapr_5_2-lib,\
streamsets-datacollector-apache-solr_6_1_0-lib
#
#user.stagelibs.whitelist=
#user.stagelibs.blacklist=

steamsets里面可以运行很多的pipeline,整个jvm其实是当作一个容器,那么containerClassLoader就负责整个容器的启动,相应的其它classloader的作用顾名思义

// Bootstrap container
Thread.currentThread().setContextClassLoader(containerCL);
Class klass = containerCL.loadClass(mainClass);
Method method = klass.getMethod(SET_CONTEXT_METHOD, ClassLoader.class, ClassLoader.class, List.class,
Instrumentation.class);
method.invoke(null, apiCL, containerCL, stageLibrariesCLs, instrumentation);
method = klass.getMethod(MAIN_METHOD, String[].class);
method.invoke(null, new Object[]{new String[]{}});
apiCL, containerCL, stageLibrariesCLs
这三个通过setContext方法完成初始化,传递给DataCollectorMain

不过,从执行方法看, public static void setContext(ClassLoader apiCL, ClassLoader containerCL,
List<? extends ClassLoader> moduleCLs, Instrumentation instrumentation) {
MemoryUsageCollector.initialize(instrumentation);
RuntimeModule.setStageLibraryClassLoaders(moduleCLs);
}

似乎,apiClassLoader和containerClassLoader什么也没干,直接扔了
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息