dubbo容器-客户端调用
2016-03-29 21:53
381 查看
1,在Dubbo使用中,客户端通过Dubbo容器来调用远程服务。在客户端调用中,通过如下代码引入远程Dubbo服务器:
<dubbo:reference id="itemManagement" interface="com.IItemManagemen" version="${items.dubbo.version}" check="false" async="true"></dubbo:reference>
引入后,在Spring中我们就可以像正常的Bean一样申明Dubbo提供的RPC对象,并调用方法获取返回值。
@Resourceprivate
IItemManagement itemManagement
2,客户端调用,简单来说涉及到Bean的注入,注入时使用ReferenceBean来完成注入。具体调用getObject完成,我们跟踪该方法继续查看。具体的注入流程如下:
private void init() {
if (initialized) {
return;
}
initialized = true;
if (interfaceName == null || interfaceName.length() == 0) {
throw new IllegalStateException("<dubbo:reference
interface=\"\" /> interface not allow null!");
}
// 获取消费者全局配置
checkDefault();
appendProperties(this);
if (getGeneric() == null && getConsumer() != null) {
setGeneric(getConsumer().getGeneric());
}
if (ProtocolUtils.isGeneric(getGeneric())) {
interfaceClass = GenericService.class;
} else {
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
checkInterfaceAndMethods(interfaceClass, methods);
}
String resolve = System.getProperty(interfaceName);
String resolveFile = null;
if (resolve == null || resolve.length() == 0) {
resolveFile = System.getProperty("dubbo.resolve.file");
if (resolveFile == null || resolveFile.length() == 0) {
File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
if (userResolveFile.exists()) {
resolveFile = userResolveFile.getAbsolutePath();
}
}
if (resolveFile != null && resolveFile.length() > 0) {
Properties properties = new Properties();
FileInputStream fis = null;
try {
fis = new FileInputStream(new File(resolveFile));
properties.load(fis);
} catch (IOException e) {
throw new IllegalStateException("Unload
" + resolveFile + ", cause: " + e.getMessage(), e);
} finally {
try {
if(null != fis) fis.close();
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
resolve = properties.getProperty(interfaceName);
}
}
if (resolve != null && resolve.length() > 0) {
url = resolve;
if (logger.isWarnEnabled()) {
if (resolveFile != null && resolveFile.length() > 0) {
logger.warn("Using default dubbo resolve file " + resolveFile + "
replace " + interfaceName + "" + resolve + "
to p2p invoke remote service.");
} else {
logger.warn("Using -D" + interfaceName + "=" + resolve + "
to p2p invoke remote service.");
}
}
}
if (consumer != null) {
if (application == null) {
application = consumer.getApplication();
}
if (module == null) {
module = consumer.getModule();
}
if (registries == null) {
registries = consumer.getRegistries();
}
if (monitor == null) {
monitor = consumer.getMonitor();
}
}
if (module != null) {
if (registries == null) {
registries = module.getRegistries();
}
if (monitor == null) {
monitor = module.getMonitor();
}
}
if (application != null) {
if (registries == null) {
registries = application.getRegistries();
}
if (monitor == null) {
monitor = application.getMonitor();
}
}
checkApplication();
checkStubAndMock(interfaceClass);
Map<String, String> map = new HashMap<String, String>();
Map<Object, Object> attributes = new HashMap<Object, Object>();
map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
if (! isGeneric()) {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if(methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put("methods", Constants.ANY_VALUE);
}
else {
map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
map.put(Constants.INTERFACE_KEY, interfaceName);
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, consumer, Constants.DEFAULT_KEY);
appendParameters(map, this);
String prifix = StringUtils.getServiceKey(map);
if (methods != null && methods.size() > 0) {
for (MethodConfig method : methods) {
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
appendAttributes(attributes, method, prifix + "." + method.getName());
checkAndConvertImplicitConfig(method, map, attributes);
}
}
//attributes通过系统context进行存储.
StaticContext.getSystemContext().putAll(attributes);
ref = createProxy(map);
}
具体的生成在createProxy方法中。
如下,我们查看对应通过注册中心配置URL。
} else { // 通过注册中心配置拼装URL
List<URL> us = loadRegistries(false);
if (us != null && us.size() > 0) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls == null || urls.size() == 0) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to
your spring config.");
}
}
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // 用了最后一个registry url
}
}
if (registryURL != null) { // 有 注册中心协议的URL
// 对有注册中心的Cluster 只用 AvailableCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // 不是 注册中心的URL
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true; // default true
}
if (c && ! invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the
url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
// 创建服务代理
return (T) proxyFactory.getProxy(invoker);
以上可以大概看下Dubbo的一些基本生成思路,部分还不是很清楚,暂时先了解下基本实现思路吧,后续有时间可以深入了解。
3,客户端调用过程。
客户端通过Invoke来封装调用的参数,在ReferenceBean的初始化过程中我们可以看到对Invoke的初始化。
public interface Invoker<T> extends Node {
/**
* get service interface.
*
* @return service interface.
*/
Class<T> getInterface();
/**
* invoke.
*
* @param invocation
* @return result
* @throws RpcException
*/
Result invoke(Invocation invocation) throws RpcException;
具体的实现,可以查看MockClusterINvoke类,
public Result invoke(Invocation invocation) throws
RpcException {
Result result = null;
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || value.equalsIgnoreCase("false")){
//no mock
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
if (logger.isWarnEnabled()) {
logger.info("force-mock: " + invocation.getMethodName() + "
force-mock enabled , url : " + directory.getUrl());
}
//force:direct mock
result = doMockInvoke(invocation, null);
} else {
//fail-mock
try {
result = this.invoker.invoke(invocation);
}catch (RpcException e) {
if (e.isBiz()) {
throw e;
} else {
if (logger.isWarnEnabled()) {
logger.info("fail-mock: " + invocation.getMethodName() + "
fail-mock enabled , url : " + directory.getUrl(), e);
}
result = doMockInvoke(invocation, e);
}
}
}
return result;
}
此处对方法请求和返回结果进行了处理。具体的调用交由Invoke对象来实现。
public MockClusterInvoker(Directory<T> directory, Invoker<T> invoker) {
this.directory = directory;
this.invoker = invoker;
}
Invoke的抽象实现在AbstractInvoke类中,
public Result invoke(Invocation inv) throws RpcException {
if(destroyed) {
throw new RpcException("Rpc
invoker for service " + this + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion()
+ " is DESTROYED, can not be invoked any more!");
}
RpcInvocation invocation = (RpcInvocation) inv;
invocation.setInvoker(this);
if (attachment != null && attachment.size() > 0) {
invocation.addAttachmentsIfAbsent(attachment);
}
Map<String, String> context = RpcContext.getContext().getAttachments();
if (context != null) {
invocation.addAttachmentsIfAbsent(context);
}
if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)){
invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
try {
return doInvoke(invocation);
} catch (InvocationTargetException e) { //
biz exception
Throwable te = e.getTargetException();
if (te == null) {
return new RpcResult(e);
} else {
if (te instanceof RpcException) {
((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
}
return new RpcResult(te);
}
} catch (RpcException e) {
if (e.isBiz()) {
return new RpcResult(e);
} else {
throw e;
}
} catch (Throwable e) {
return new RpcResult(e);
}
}
具体的doInvoke交由具体实现来完成。
我们继续查看MockClusterInvoke对构造方法,其中比较重要的另一个参数是Directory。
public interface Directory<T> extends Node {
/**
* get service type.
*
* @return service type.
*/
Class<T> getInterface();
/**
* list invokers.
*
* @return invokers
*/
List<Invoker<T>> list(Invocation
invocation) throws RpcException;
}
该接口可以获取待Invoker的列表,其抽象实现在AbstractDirectory。
/**
* 增加router的Directory
*
* @author chao.liuc
*/
public abstract class AbstractDirectory<T> implements
Directory<T> {
// 日志输出
private static final Logger logger = LoggerFactory.getLogger(AbstractDirectory.class);
private final URL url ;
private volatile boolean destroyed = false;
private volatile URL consumerUrl ;
private volatile List<Router> routers;
public AbstractDirectory(URL url) {
this(url, null);
}
public AbstractDirectory(URL url, List<Router> routers) {
this(url, url, routers);
}
public AbstractDirectory(URL url, URL consumerUrl, List<Router> routers) {
if (url == null)
throw new IllegalArgumentException("url
== null");
this.url = url;
this.consumerUrl = consumerUrl;
setRouters(routers);
}
public List<Invoker<T>> list(Invocation
invocation) throws RpcException {
if (destroyed){
throw new RpcException("Directory
already destroyed .url: "+ getUrl());
}
List<Invoker<T>> invokers = doList(invocation);
List<Router> localRouters = this.routers; //
local reference
if (localRouters != null && localRouters.size() > 0) {
for (Router router: localRouters){
try {
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) {
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ",
cause: " + t.getMessage(), t);
}
}
}
return invokers;
}
其doList方法交由具体的子类RegistryDirectory实现。
public RegistryDirectory(Class<T> serviceType, URL
url) {
super(url);
if(serviceType == null )
throw new IllegalArgumentException("service
type is null.");
if(url.getServiceKey() == null || url.getServiceKey().length() == 0)
throw new IllegalArgumentException("registry
serviceKey is null.");
this.serviceType = serviceType;
this.serviceKey = url.getServiceKey();
this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
this.overrideDirectoryUrl = this.directoryUrl = url.setPath(url.getServiceInterface()).clearParameters().addParameters(queryMap).removeParameter(Constants.MONITOR_KEY);
String group = directoryUrl.getParameter( Constants.GROUP_KEY, "" );
this.multiGroup = group != null && ("*".equals(group) || group.contains( "," ));
String methods = queryMap.get(Constants.METHODS_KEY);
this.serviceMethods = methods == null ? null : Constants.COMMA_SPLIT_PATTERN.split(methods);
}
该类实现了一些客户端调用时的的Invoker对象。
<dubbo:reference id="itemManagement" interface="com.IItemManagemen" version="${items.dubbo.version}" check="false" async="true"></dubbo:reference>
引入后,在Spring中我们就可以像正常的Bean一样申明Dubbo提供的RPC对象,并调用方法获取返回值。
@Resourceprivate
IItemManagement itemManagement
2,客户端调用,简单来说涉及到Bean的注入,注入时使用ReferenceBean来完成注入。具体调用getObject完成,我们跟踪该方法继续查看。具体的注入流程如下:
private void init() {
if (initialized) {
return;
}
initialized = true;
if (interfaceName == null || interfaceName.length() == 0) {
throw new IllegalStateException("<dubbo:reference
interface=\"\" /> interface not allow null!");
}
// 获取消费者全局配置
checkDefault();
appendProperties(this);
if (getGeneric() == null && getConsumer() != null) {
setGeneric(getConsumer().getGeneric());
}
if (ProtocolUtils.isGeneric(getGeneric())) {
interfaceClass = GenericService.class;
} else {
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
checkInterfaceAndMethods(interfaceClass, methods);
}
String resolve = System.getProperty(interfaceName);
String resolveFile = null;
if (resolve == null || resolve.length() == 0) {
resolveFile = System.getProperty("dubbo.resolve.file");
if (resolveFile == null || resolveFile.length() == 0) {
File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
if (userResolveFile.exists()) {
resolveFile = userResolveFile.getAbsolutePath();
}
}
if (resolveFile != null && resolveFile.length() > 0) {
Properties properties = new Properties();
FileInputStream fis = null;
try {
fis = new FileInputStream(new File(resolveFile));
properties.load(fis);
} catch (IOException e) {
throw new IllegalStateException("Unload
" + resolveFile + ", cause: " + e.getMessage(), e);
} finally {
try {
if(null != fis) fis.close();
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
resolve = properties.getProperty(interfaceName);
}
}
if (resolve != null && resolve.length() > 0) {
url = resolve;
if (logger.isWarnEnabled()) {
if (resolveFile != null && resolveFile.length() > 0) {
logger.warn("Using default dubbo resolve file " + resolveFile + "
replace " + interfaceName + "" + resolve + "
to p2p invoke remote service.");
} else {
logger.warn("Using -D" + interfaceName + "=" + resolve + "
to p2p invoke remote service.");
}
}
}
if (consumer != null) {
if (application == null) {
application = consumer.getApplication();
}
if (module == null) {
module = consumer.getModule();
}
if (registries == null) {
registries = consumer.getRegistries();
}
if (monitor == null) {
monitor = consumer.getMonitor();
}
}
if (module != null) {
if (registries == null) {
registries = module.getRegistries();
}
if (monitor == null) {
monitor = module.getMonitor();
}
}
if (application != null) {
if (registries == null) {
registries = application.getRegistries();
}
if (monitor == null) {
monitor = application.getMonitor();
}
}
checkApplication();
checkStubAndMock(interfaceClass);
Map<String, String> map = new HashMap<String, String>();
Map<Object, Object> attributes = new HashMap<Object, Object>();
map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
if (! isGeneric()) {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if(methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put("methods", Constants.ANY_VALUE);
}
else {
map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
map.put(Constants.INTERFACE_KEY, interfaceName);
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, consumer, Constants.DEFAULT_KEY);
appendParameters(map, this);
String prifix = StringUtils.getServiceKey(map);
if (methods != null && methods.size() > 0) {
for (MethodConfig method : methods) {
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
appendAttributes(attributes, method, prifix + "." + method.getName());
checkAndConvertImplicitConfig(method, map, attributes);
}
}
//attributes通过系统context进行存储.
StaticContext.getSystemContext().putAll(attributes);
ref = createProxy(map);
}
具体的生成在createProxy方法中。
如下,我们查看对应通过注册中心配置URL。
} else { // 通过注册中心配置拼装URL
List<URL> us = loadRegistries(false);
if (us != null && us.size() > 0) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls == null || urls.size() == 0) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to
your spring config.");
}
}
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // 用了最后一个registry url
}
}
if (registryURL != null) { // 有 注册中心协议的URL
// 对有注册中心的Cluster 只用 AvailableCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // 不是 注册中心的URL
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true; // default true
}
if (c && ! invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the
url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
// 创建服务代理
return (T) proxyFactory.getProxy(invoker);
以上可以大概看下Dubbo的一些基本生成思路,部分还不是很清楚,暂时先了解下基本实现思路吧,后续有时间可以深入了解。
3,客户端调用过程。
客户端通过Invoke来封装调用的参数,在ReferenceBean的初始化过程中我们可以看到对Invoke的初始化。
public interface Invoker<T> extends Node {
/**
* get service interface.
*
* @return service interface.
*/
Class<T> getInterface();
/**
* invoke.
*
* @param invocation
* @return result
* @throws RpcException
*/
Result invoke(Invocation invocation) throws RpcException;
具体的实现,可以查看MockClusterINvoke类,
public Result invoke(Invocation invocation) throws
RpcException {
Result result = null;
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || value.equalsIgnoreCase("false")){
//no mock
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
if (logger.isWarnEnabled()) {
logger.info("force-mock: " + invocation.getMethodName() + "
force-mock enabled , url : " + directory.getUrl());
}
//force:direct mock
result = doMockInvoke(invocation, null);
} else {
//fail-mock
try {
result = this.invoker.invoke(invocation);
}catch (RpcException e) {
if (e.isBiz()) {
throw e;
} else {
if (logger.isWarnEnabled()) {
logger.info("fail-mock: " + invocation.getMethodName() + "
fail-mock enabled , url : " + directory.getUrl(), e);
}
result = doMockInvoke(invocation, e);
}
}
}
return result;
}
此处对方法请求和返回结果进行了处理。具体的调用交由Invoke对象来实现。
public MockClusterInvoker(Directory<T> directory, Invoker<T> invoker) {
this.directory = directory;
this.invoker = invoker;
}
Invoke的抽象实现在AbstractInvoke类中,
public Result invoke(Invocation inv) throws RpcException {
if(destroyed) {
throw new RpcException("Rpc
invoker for service " + this + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion()
+ " is DESTROYED, can not be invoked any more!");
}
RpcInvocation invocation = (RpcInvocation) inv;
invocation.setInvoker(this);
if (attachment != null && attachment.size() > 0) {
invocation.addAttachmentsIfAbsent(attachment);
}
Map<String, String> context = RpcContext.getContext().getAttachments();
if (context != null) {
invocation.addAttachmentsIfAbsent(context);
}
if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)){
invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
try {
return doInvoke(invocation);
} catch (InvocationTargetException e) { //
biz exception
Throwable te = e.getTargetException();
if (te == null) {
return new RpcResult(e);
} else {
if (te instanceof RpcException) {
((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
}
return new RpcResult(te);
}
} catch (RpcException e) {
if (e.isBiz()) {
return new RpcResult(e);
} else {
throw e;
}
} catch (Throwable e) {
return new RpcResult(e);
}
}
具体的doInvoke交由具体实现来完成。
我们继续查看MockClusterInvoke对构造方法,其中比较重要的另一个参数是Directory。
public interface Directory<T> extends Node {
/**
* get service type.
*
* @return service type.
*/
Class<T> getInterface();
/**
* list invokers.
*
* @return invokers
*/
List<Invoker<T>> list(Invocation
invocation) throws RpcException;
}
该接口可以获取待Invoker的列表,其抽象实现在AbstractDirectory。
/**
* 增加router的Directory
*
* @author chao.liuc
*/
public abstract class AbstractDirectory<T> implements
Directory<T> {
// 日志输出
private static final Logger logger = LoggerFactory.getLogger(AbstractDirectory.class);
private final URL url ;
private volatile boolean destroyed = false;
private volatile URL consumerUrl ;
private volatile List<Router> routers;
public AbstractDirectory(URL url) {
this(url, null);
}
public AbstractDirectory(URL url, List<Router> routers) {
this(url, url, routers);
}
public AbstractDirectory(URL url, URL consumerUrl, List<Router> routers) {
if (url == null)
throw new IllegalArgumentException("url
== null");
this.url = url;
this.consumerUrl = consumerUrl;
setRouters(routers);
}
public List<Invoker<T>> list(Invocation
invocation) throws RpcException {
if (destroyed){
throw new RpcException("Directory
already destroyed .url: "+ getUrl());
}
List<Invoker<T>> invokers = doList(invocation);
List<Router> localRouters = this.routers; //
local reference
if (localRouters != null && localRouters.size() > 0) {
for (Router router: localRouters){
try {
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) {
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ",
cause: " + t.getMessage(), t);
}
}
}
return invokers;
}
其doList方法交由具体的子类RegistryDirectory实现。
public RegistryDirectory(Class<T> serviceType, URL
url) {
super(url);
if(serviceType == null )
throw new IllegalArgumentException("service
type is null.");
if(url.getServiceKey() == null || url.getServiceKey().length() == 0)
throw new IllegalArgumentException("registry
serviceKey is null.");
this.serviceType = serviceType;
this.serviceKey = url.getServiceKey();
this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
this.overrideDirectoryUrl = this.directoryUrl = url.setPath(url.getServiceInterface()).clearParameters().addParameters(queryMap).removeParameter(Constants.MONITOR_KEY);
String group = directoryUrl.getParameter( Constants.GROUP_KEY, "" );
this.multiGroup = group != null && ("*".equals(group) || group.contains( "," ));
String methods = queryMap.get(Constants.METHODS_KEY);
this.serviceMethods = methods == null ? null : Constants.COMMA_SPLIT_PATTERN.split(methods);
}
该类实现了一些客户端调用时的的Invoker对象。
相关文章推荐
- python调用caffe接口进行classify时提示Mean shape incompatible with input shape错误的解决方法
- c++ 笔记explicit
- 向FTP服务器上传下载工具类
- CMakeListx.txt 编辑语法学习
- 数据库的访问实现过程
- C++ 第二次上机实验报告
- jQuery源码浅析2–奇技淫巧
- 【必做2】结对编程—词频统计
- 内存中对象实例化细节
- 软考中高项学员:2016年3月26日作业
- VS 调用 Matlab (混合编程)
- python常见笔试题
- Dir命令
- 前端文本截断
- 复利计算单元测试
- Dubbo容器--Provider
- Sinacloud Cron
- 101.LeetCode Symmetric Tree(easy)[递归 二叉树]
- JSP九大对象
- dubbo容器-注册中心