您的位置:首页 > 编程语言 > Java开发

dubbo源码解析(二): dubbo服务的启动

2017-05-24 11:27 519 查看
在上一篇文章中http://blog.csdn.net/zhangw1236/article/details/65630952 给大家分析了dubbo的扩展点加载机制,在dubbo整体的框架中,扩展点加载是他的核心部分。本篇文章将带着大家看一下dubbo服务的启动过程,是怎么一步步将每个人写的不同的业务逻辑暴露出去的。这里我们使用dubbo源码自带的dubbo-demo-provider和dubbo-demo-consumer作为例子。

dubbo-demo-provider的业务代码只有一个接口和一个实现

public interface DemoService {
String sayHello(String name);
}

public class DemoServiceImpl implements DemoService {
public String sayHello(String name) {
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress());
return "Hello " + name + ", response form provider: " + RpcContext.getContext().getLocalAddress();
}
}


作为服务提供方,也就是server端,逻辑非常简单:收到服务消费方也就是client端的消息后,打印消息,并回复response。这部分就是业务代码,剩下的只需要提供一些配置,dubbo就可以帮你把这个服务暴露出去。

下面我们来看看main函数:

private static final ExtensionLoader<Container> loader = ExtensionLoader.getExtensionLoader(Container.class);

public static void main(String[] args) {
try {
if (args == null || args.length == 0) {
//读取配置
String config = ConfigUtils.getProperty(CONTAINER_KEY, loader.getDefaultExtensionName());
args = Constants.COMMA_SPLIT_PATTERN.split(config);
}

final List<Container> containers = new ArrayList<Container>();
//根据container名称,获取指定的扩展点
for (int i = 0; i < args.length; i ++) {
containers.add(loader.getExtension(args[i]));
}
logger.info("Use container type(" + Arrays.toString(args) + ") to run dubbo serivce.");
//程序退出时,关闭container
if ("true".equals(System.getProperty(SHUTDOWN_HOOK_KEY))) {
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
for (Container container : containers) {
try {
container.stop();
logger.info("Dubbo " + container.getClass().getSimpleName() + " stopped!");
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
synchronized (Main.class) {
running = false;
Main.class.notify();
}
}
}
});
}
//启动具体的container
for (Container container : containers) {
System.out.println("Dubbo " + container.getClass().getSimpleName() + " starting...");
container.start();
logger.info("Dubbo " + container.getClass().getSimpleName() + " started!");
}
System.out.println(new SimpleDateFormat("[yyyy-MM-dd HH:mm:ss]").format(new Date()) + " Dubbo service server started!");
} catch (RuntimeException e) {
e.printStackTrace();
logger.error(e.getMessage(), e);
System.exit(1);
}
synchronized (Main.class) {
while (running) {
try {
Main.class.wait();
} catch (Throwable e) {
}
}
}
}


配置文件:

dubbo.container=log4j,spring
dubbo.application.name=demo-provider
dubbo.application.owner=
dubbo.registry.address=zookeeper://127.0.0.1:2181
dubbo.monitor.protocol=registry
dubbo.protocol.name=dubbo
dubbo.protocol.port=20880
dubbo.service.loadbalance=roundrobin
dubbo.log4j.file=logs/dubbo-demo-provider.log
dubbo.log4j.level=INFO


dubbo的启动是包含在容器(container)中的,这里配置的容器是spring,从配置文件中获取具体的container后,从ExtensionLoader中加载指定的扩展点,之后调用start方法启动container。我们来看一下spring container的start函数。

public void start() {
String configPath = ConfigUtils.getProperty(SPRING_CONFIG);
if (configPath == null || configPath.length() == 0) {
configPath = DEFAULT_SPRING_CONFIG;
}
context = new ClassPathXmlApplicationContext(configPath.split("[,\\s]+"));
context.start();
}


其实就是启动一个spring的上下文,之后就进入spring的内容了,因为涉及到了spring自定义schemas相关的知识点,在这里也顺带提一下。先来看看spring的配置文件dubbo-demo-provider.xml

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd"> 
<bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl" />

<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" />

</beans>


dubbo自定义的标签dubbo:service,该标签的定义在dubbo.xsd中,对于自定义的spring xsd文件,需要实现一些接口去让spring解析具体的schemas,具体来说需要实现NamespaceHandlerSupport这个接口:

public class DubboNamespaceHandler extends NamespaceHandlerSupport {
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}

public void init() {
//实现标签和对应的parser的映射
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
}
}


以及一个Bean的解析接口,这里实现的是BeanDefinitionParser,具体的实现类为DubboBeanDefinitionParser

同时提供两个配置文件:

spring.handlers:

http\://code.alibabatech.com/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler


spring.schemas:

http\://code.alibabatech.com/schema/dubbo/dubbo.xsd=META-INF/dubbo.xsd


这样spring就会知道dubbo.xsd对应的handler为DubboNamespaceHandler

我们现在回到spring container的start函数,spring启动后加载配置文件调用DubboNamespaceHandler的init函数,由于配置文件中只定义了dubbo:service,所以这里只会解析service

registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));


之后调用DubboBeanDefinitionParser的parse方法:

BeanDefinition parse(Element element, ParserContext parserContext, Class<?> beanClass, boolean required);


这里只截取解析ServiceBean的部分

if (ServiceBean.class.equals(beanClass)) {
String className = element.getAttribute("class");
if(className != null && className.length() > 0) {
RootBeanDefinition classDefinition = new RootBeanDefinition();
classDefinition.setBeanClass(ReflectUtils.forName(className));
classDefinition.setLazyInit(false);
parseProperties(element.getChildNodes(), classDefinition);
beanDefinition.getPropertyValues().addPropertyValue("ref", new BeanDefinitionHolder(classDefinition, id + "Impl"));
}
}


当beanDefinition被返回后,spring就会加载ServiceBean并注入从spring的配置文件读取的service属性,即本文最开始的com.alibaba.dubbo.demo.DemoService。ServiceBean这个class中有三个比较重要的属性:

// 接口类型
private String              interfaceName;
// 接口class对象
private Class<?>            interfaceClass;
// 接口实现类引用
private T                   ref;


对应本文的例子,interfaceName为com.alibaba.dubbo.demo.DemoService,interfaceClass为DemoService的class对象,ref为DemoServiceImpl对象。这些值都是通过parse()函数注入到ServiceBean中的

我们也顺便来看一下spring初始化一个bean的步骤,下面这张图来自于《spring实战》



具体步骤如下:

1.Spring对bean进行实例化;

2.Spring将值和bean的引用注入到bean对应的属性中;

3.如果bean实现了BeanNameAware接口,Spring将bean的ID传递给setBeanName()方法;

4.如果bean实现了BeanFactoryAware接口,Spring将调用setBeanFactory()方法,将

BeanFactory容器实例传入;

5.如果bean实现了ApplicationContextAware接口,Spring将调

用setApplicationContext()方法,将bean所在的应用上下文的引用传入进来;

6.如果bean实现了BeanPostProcessor接口,Spring将调用它们的post-

ProcessBeforeInitialization()方法;

7.如果bean实现了InitializingBean接口,Spring将调用它们的after

PropertiesSet()方法。类似地,如果bean使用init-method声明了初始化方法,该方法

也会被调用;

8.如果bean实现了BeanPostProcessor接口,Spring将调用它们的post

ProcessAfterInitialization()方法;

9.此时,bean已经准备就绪,可以被应用程序使用了,它们将一直驻留在应用上下文中,直到

该应用上下文被销毁;

10.如果bean实现了DisposableBean接口,Spring将调用它的destroy()接口方法。同样,

如果bean使用destroy-method声明了销毁方法,该方法也会被调用。

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware


ServiceBean实现了BeanNameAware、ApplicationContextAware和InitializingBean接口,将依次调用setBeanName()、setApplicationContext()和afterPropertiesSet()方法,我们重点看一下afterPropertiesSet()方法,该方法的主要功能是查找provider(服务提供方)、application(应用配置)、module(模块配置)、registries(注册中心)、monitor(监控中心)和protocols(RPC协议)等配置信息。由于在dubbo-demo-provider这个例子的spring配置文件中只配置了service信息,没有包含上述的配置,因此所有的判null条件均为false,这样就直接进入了export()函数。

该demo采用的是dubbo.properties配置,此外在dubbo的官网中也提供了spring的配置方法,大家也可以了解一下。感觉在实际应用的时候,用得比较多的还是spring的配置,这也是dubbo官网推荐的配置方式。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans        http://www.springframework.org/schema/beans/spring-beans.xsd        http://code.alibabatech.com/schema/dubbo        http://code.alibabatech.com/schema/dubbo/dubbo.xsd"> 
<!-- 提供方应用信息,用于计算依赖关系 -->
<dubbo:application name="hello-world-app"  />

<!-- 使用multicast广播注册中心暴露服务地址 -->
<dubbo:registry address="multicast://224.5.6.7:1234" />

<!-- 用dubbo协议在20880端口暴露服务 -->
<dubbo:protocol name="dubbo" port="20880" />

<!-- 声明需要暴露的服务接口 -->
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" />

<!-- 和本地bean一样实现服务 -->
<bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl" />

</beans>


这时,afterPropertiesSet()就会从spring的bean中获取配置信息,由于afterPropertiesSet()函数代码过长,这里仅贴出加载application的代码重点分析一下,其他配置的加载与其类似。

if (getApplication() == null
&& (getProvider() == null || getProvider().getApplication() == null)) {
Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);
if (applicationConfigMap != null && applicationConfigMap.size() > 0) {
ApplicationConfig applicationConfig = null;
for (ApplicationConfig config : applicationConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
if (applicationConfig != null) {
throw new IllegalStateException("Duplicate application configs: " + applicationConfig + " and " + config);
}
applicationConfig = config;
}
}
if (applicationConfig != null) {
setApplication(applicationConfig);
}
}
}


这里调用了BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);从spring的bean容器中找到相应的配置。

我们接着上文,如果采用的是配置文件dubbo.properties,那么直接进入export()函数,在export()函数中,由于provider的配置还未加载,于是直接进入doExport()函数。

在doExport()函数中,会调用一系列的checkDefault()、checkApplication()、checkRegistry()和checkProtocol()从配置文件中加载provider、application、registry和protocol的配置信息,并注入到ServiceBean中。之后生成业务逻辑的class对象

try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
checkInterfaceAndMethods(interfaceClass, methods);


interfaceName即为之前注入的com.alibaba.dubbo.demo.DemoService,最后调用doExportUrls()

private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}


doExportUrlsFor1Protocol()函数开始真正地创建server,该函数的代码也很长,具体分为以下几个步骤:

1、获取本机地址,当获取到的地址是本地地址时,发起与注册中心的socket连接,之后获得真实地址

String host = protocolConfig.getHost();
if (provider != null && (host == null || host.length() == 0)) {
host = provider.getHost();
}
boolean anyhost = false;
//此时host为null
if (NetUtils.isInvalidLocalHost(host)) {
anyhost = true;
try {
host = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
logger.warn(e.getMessage(), e);
}
//此时获取到的地址为127.0.0.1
if (NetUtils.isInvalidLocalHost(host)) {
if (registryURLs != null && registryURLs.size() > 0) {
for (URL registryURL : registryURLs) {
try {
Socket socket = new Socket();
try {
SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort());
socket.connect(addr, 1000);
//这里获得真实的地址
host = socket.getLocalAddress().getHostAddress();
break;
} finally {
try {
socket.close();
} catch (Throwable e) {}
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
}
if (NetUtils.isInvalidLocalHost(host)) {
host = NetUtils.getLocalHost();
}
}
}


2、获取server的port,这里直接取配置里的20880,如果没有配置也会有默认的端口

3、创建一个map,存储配置信息,将之前获取到的application,module,provider和protocol等信息存入map,供后面生成URL使用

Map<String, String> map = new HashMap<String, String>();
if (anyhost) {
map.put(Constants.ANYHOST_KEY, "true");
}
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, protocolConfig);
appendParameters(map, this);


4、处理method标签,这里给出dubbo官网对method的解释:

dubbo:method

方法级配置:

配置类:com.alibaba.dubbo.config.MethodConfig

说明:该标签为或的子标签,用于控制到方法级

就是为某个具体的方式设置一些特有属性,包括超时时间、重试次数等等,由于这里没有配置method属性,此步骤跳过。

5、将rpc的具体方法放入配置map中

//获取方法名
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if(methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put("methods", Constants.ANY_VALUE);
}
else {
//将方法名放入map
map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}


6、将map里的参数生成url

URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);


完整的url的值为:dubbo://172.28.114.45:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.5.3&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&pid=2164&revision=2.5.3&side=provider×tamp=1495791242011

该url会被注册到注册中心(这里是zookeeper)的节点上。

7、暴露服务

//如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务)
if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && registryURLs.size() > 0
&& url.getParameter("register", true)) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
//加载monitor的url
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
//生成invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
//生成exporter
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
}


我们来重点说一下invoker和exporter,此处调用的是proxyFactory的getInvoker,proxyFactory的值为:

private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();


看过前一篇文章的朋友们应该知道,这里获取的是ProxyFactory的自适应扩展点,由于ProxyFactory没有带有Adaptive注解的扩展类,所以自动生成了一个AdaptiveExtension,来看一下AdaptiveExtension的getInvoker函数:

public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1,
com.alibaba.dubbo.common.URL arg2) throws java.lang.Object {
if (arg2 == null)
throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg2;
//default的值为javassist
String extName = url.getParameter("proxy", "javassist");
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url("
+ url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader
.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getInvoker(arg0, arg1, arg2);
}


可以看到自适应的扩展点其实是调用了javassist扩展点getInvoker,JavassistProxyFactory的getInvoker函数生成了一个invoker:

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper类不能正确处理带$的类名
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}


引用dubbo官网的关于invoker的解释:

Invoker是实体域,它是Dubbo的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起invoke调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。

通俗了说,就是包装了你的业务代码的一个调用接口。

最后就是调用protocol的export()函数生成一个exporter,这里protocol的实现类为DubboProtocol,至此一个服务就被暴露出去可以被远程调用了。关于DubboProtocol类中export的细节,也就是socket server的启动,包括handler的处理过程,我们将在后续的文章中继续介绍。

这里介绍的是provider的启动过程,consumer的启动配置的dubbo标签为reference,从ReferenceBean开始启动流程。

最后,贴一下dubbo官网上给出的启动流程图,对照着流程图看代码,思路会更加清晰一些。



服务提供方服务暴露时序图,本文只介绍到了protocol的export,后续将会继续介绍后面的部分。



服务消费方引用服务时序图。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  源码 dubbo java