您的位置:首页 > 其它

dubbo容器-客户端调用

2016-03-29 21:53 381 查看
1,在Dubbo使用中,客户端通过Dubbo容器来调用远程服务。在客户端调用中,通过如下代码引入远程Dubbo服务器:

<dubbo:reference id="itemManagement" interface="com.IItemManagemen" version="${items.dubbo.version}" check="false" async="true"></dubbo:reference>

引入后,在Spring中我们就可以像正常的Bean一样申明Dubbo提供的RPC对象,并调用方法获取返回值。

@Resourceprivate

 IItemManagement itemManagement

2,客户端调用,简单来说涉及到Bean的注入,注入时使用ReferenceBean来完成注入。具体调用getObject完成,我们跟踪该方法继续查看。具体的注入流程如下:

private void init() {

        if (initialized) {

            return;

        }

        initialized = true;

        if (interfaceName == null || interfaceName.length() == 0) {

            throw new IllegalStateException("<dubbo:reference
interface=\"\" /> interface not allow null!");

        }

        // 获取消费者全局配置

        checkDefault();

        appendProperties(this);

        if (getGeneric() == null && getConsumer() != null) {

            setGeneric(getConsumer().getGeneric());

        }

        if (ProtocolUtils.isGeneric(getGeneric())) {

            interfaceClass = GenericService.class;

        } else {

            try {

                interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()

                        .getContextClassLoader());

            } catch (ClassNotFoundException e) {

                throw new IllegalStateException(e.getMessage(), e);

            }

            checkInterfaceAndMethods(interfaceClass, methods);

        }

        String resolve = System.getProperty(interfaceName);

        String resolveFile = null;

        if (resolve == null || resolve.length() == 0) {

            resolveFile = System.getProperty("dubbo.resolve.file");

            if (resolveFile == null || resolveFile.length() == 0) {

                File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");

                if (userResolveFile.exists()) {

                    resolveFile = userResolveFile.getAbsolutePath();

                }

            }

            if (resolveFile != null && resolveFile.length() > 0) {

                Properties properties = new Properties();

                FileInputStream fis = null;

                try {

                    fis = new FileInputStream(new File(resolveFile));

                    properties.load(fis);

                } catch (IOException e) {

                    throw new IllegalStateException("Unload
" + resolveFile + ", cause: " + e.getMessage(), e);

                } finally {

                    try {

                        if(null != fis) fis.close();

                    } catch (IOException e) {

                        logger.warn(e.getMessage(), e);

                    }

                }

                resolve = properties.getProperty(interfaceName);

            }

        }

        if (resolve != null && resolve.length() > 0) {

            url = resolve;

            if (logger.isWarnEnabled()) {

                if (resolveFile != null && resolveFile.length() > 0) {

                    logger.warn("Using default dubbo resolve file " + resolveFile + "
replace " + interfaceName + "" + resolve + "
to p2p invoke remote service.");

                } else {

                    logger.warn("Using -D" + interfaceName + "=" + resolve + "
to p2p invoke remote service.");

                }

            }

        }

        if (consumer != null) {

            if (application == null) {

                application = consumer.getApplication();

            }

            if (module == null) {

                module = consumer.getModule();

            }

            if (registries == null) {

                registries = consumer.getRegistries();

            }

            if (monitor == null) {

                monitor = consumer.getMonitor();

            }

        }

        if (module != null) {

            if (registries == null) {

                registries = module.getRegistries();

            }

            if (monitor == null) {

                monitor = module.getMonitor();

            }

        }

        if (application != null) {

            if (registries == null) {

                registries = application.getRegistries();

            }

            if (monitor == null) {

                monitor = application.getMonitor();

            }

        }

        checkApplication();

        checkStubAndMock(interfaceClass);

        Map<String, String> map = new HashMap<String, String>();

        Map<Object, Object> attributes = new HashMap<Object, Object>();

        map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);

        map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());

        map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));

        if (ConfigUtils.getPid() > 0) {

            map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));

        }

        if (! isGeneric()) {

            String revision = Version.getVersion(interfaceClass, version);

            if (revision != null && revision.length() > 0) {

                map.put("revision", revision);

            }

            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();

            if(methods.length == 0) {

                logger.warn("NO method found in service interface " + interfaceClass.getName());

                map.put("methods", Constants.ANY_VALUE);

            }

            else {

                map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));

            }

        }

        map.put(Constants.INTERFACE_KEY, interfaceName);

        appendParameters(map, application);

        appendParameters(map, module);

        appendParameters(map, consumer, Constants.DEFAULT_KEY);

        appendParameters(map, this);

        String prifix = StringUtils.getServiceKey(map);

        if (methods != null && methods.size() > 0) {

            for (MethodConfig method : methods) {

                appendParameters(map, method, method.getName());

                String retryKey = method.getName() + ".retry";

                if (map.containsKey(retryKey)) {

                    String retryValue = map.remove(retryKey);

                    if ("false".equals(retryValue)) {

                        map.put(method.getName() + ".retries", "0");

                    }

                }

                appendAttributes(attributes, method, prifix + "." + method.getName());

                checkAndConvertImplicitConfig(method, map, attributes);

            }

        }

        //attributes通过系统context进行存储.

        StaticContext.getSystemContext().putAll(attributes);

        ref = createProxy(map);

    }

具体的生成在createProxy方法中。

如下,我们查看对应通过注册中心配置URL。

} else { // 通过注册中心配置拼装URL

                List<URL> us = loadRegistries(false);

                if (us != null && us.size() > 0) {

                    for (URL u : us) {

                        URL monitorUrl = loadMonitor(u);

                        if (monitorUrl != null) {

                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));

                        }

                        urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));

                    }

                }

                if (urls == null || urls.size() == 0) {

                    throw new IllegalStateException("No such any registry to reference " + interfaceName  + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to
your spring config.");

                }

            }

            if (urls.size() == 1) {

                invoker = refprotocol.refer(interfaceClass, urls.get(0));

            } else {

                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();

                URL registryURL = null;

                for (URL url : urls) {

                    invokers.add(refprotocol.refer(interfaceClass, url));

                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {

                        registryURL = url; // 用了最后一个registry url

                    }

                }

                if (registryURL != null) { // 有 注册中心协议的URL

                    // 对有注册中心的Cluster 只用 AvailableCluster

                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); 

                    invoker = cluster.join(new StaticDirectory(u, invokers));

                }  else { // 不是 注册中心的URL

                    invoker = cluster.join(new StaticDirectory(invokers));

                }

            }

        }

        Boolean c = check;

        if (c == null && consumer != null) {

            c = consumer.isCheck();

        }

        if (c == null) {

            c = true; // default true

        }

        if (c && ! invoker.isAvailable()) {

            throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the
url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());

        }

        if (logger.isInfoEnabled()) {

            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());

        }

        // 创建服务代理

        return (T) proxyFactory.getProxy(invoker);

以上可以大概看下Dubbo的一些基本生成思路,部分还不是很清楚,暂时先了解下基本实现思路吧,后续有时间可以深入了解。

3,客户端调用过程。

客户端通过Invoke来封装调用的参数,在ReferenceBean的初始化过程中我们可以看到对Invoke的初始化。

public interface Invoker<T> extends Node {

    /**

     * get service interface.

     * 

     * @return service interface.

     */

    Class<T> getInterface();

    /**

     * invoke.

     * 

     * @param invocation

     * @return result

     * @throws RpcException

     */

    Result invoke(Invocation invocation) throws RpcException;

具体的实现,可以查看MockClusterINvoke类,

public Result invoke(Invocation invocation) throws
RpcException {

        Result result = null;

        

        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); 

        if (value.length() == 0 || value.equalsIgnoreCase("false")){

            //no mock

            result = this.invoker.invoke(invocation);

        } else if (value.startsWith("force")) {

            if (logger.isWarnEnabled()) {

                logger.info("force-mock: " + invocation.getMethodName() + "
force-mock enabled , url : " +  directory.getUrl());

            }

            //force:direct mock

            result = doMockInvoke(invocation, null);

        } else {

            //fail-mock

            try {

                result = this.invoker.invoke(invocation);

            }catch (RpcException e) {

                if (e.isBiz()) {

                    throw e;

                } else {

                    if (logger.isWarnEnabled()) {

                        logger.info("fail-mock: " + invocation.getMethodName() + "
fail-mock enabled , url : " +  directory.getUrl(), e);

                    }

                    result = doMockInvoke(invocation, e);

                }

            }

        }

        return result;

    }

此处对方法请求和返回结果进行了处理。具体的调用交由Invoke对象来实现。

 public MockClusterInvoker(Directory<T> directory, Invoker<T> invoker) {

        this.directory = directory;

        this.invoker = invoker;

    }

Invoke的抽象实现在AbstractInvoke类中,

 public Result invoke(Invocation inv) throws RpcException {

        if(destroyed) {

            throw new RpcException("Rpc
invoker for service " + this + " on consumer " + NetUtils.getLocalHost() 

                                            + " use dubbo version " + Version.getVersion()

                                            + " is DESTROYED, can not be invoked any more!");

        }

        RpcInvocation invocation = (RpcInvocation) inv;

        invocation.setInvoker(this);

        if (attachment != null && attachment.size() > 0) {

            invocation.addAttachmentsIfAbsent(attachment);

        }

        Map<String, String> context = RpcContext.getContext().getAttachments();

        if (context != null) {

            invocation.addAttachmentsIfAbsent(context);

        }

        if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)){

            invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());

        }

        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        

        

        try {

            return doInvoke(invocation);

        } catch (InvocationTargetException e) { //
biz exception

            Throwable te = e.getTargetException();

            if (te == null) {

                return new RpcResult(e);

            } else {

                if (te instanceof RpcException) {

                    ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);

                }

                return new RpcResult(te);

            }

        } catch (RpcException e) {

            if (e.isBiz()) {

                return new RpcResult(e);

            } else {

                throw e;

            }

        } catch (Throwable e) {

            return new RpcResult(e);

        }

    }

具体的doInvoke交由具体实现来完成。

我们继续查看MockClusterInvoke对构造方法,其中比较重要的另一个参数是Directory。

public interface Directory<T> extends Node {

    

    /**

     * get service type.

     * 

     * @return service type.

     */

    Class<T> getInterface();

    /**

     * list invokers.

     * 

     * @return invokers

     */

    List<Invoker<T>> list(Invocation
invocation) throws RpcException;

    
}

该接口可以获取待Invoker的列表,其抽象实现在AbstractDirectory。

/**

 * 增加router的Directory

 * 

 * @author chao.liuc

 */
public abstract class AbstractDirectory<T> implements
Directory<T> {

    // 日志输出

    private static final Logger logger = LoggerFactory.getLogger(AbstractDirectory.class);

    private final URL url ;

    

    private volatile boolean destroyed = false;

    private volatile URL consumerUrl ;

    

    private volatile List<Router> routers;

    

    public AbstractDirectory(URL url) {

        this(url, null);

    }

    

    public AbstractDirectory(URL url, List<Router> routers) {

        this(url, url, routers);

    }

    

    public AbstractDirectory(URL url, URL consumerUrl, List<Router> routers) {

        if (url == null)

            throw new IllegalArgumentException("url
== null");

        this.url = url;

        this.consumerUrl = consumerUrl;

        setRouters(routers);

    }

    

    public List<Invoker<T>> list(Invocation
invocation) throws RpcException {

        if (destroyed){

            throw new RpcException("Directory
already destroyed .url: "+ getUrl());

        }

        List<Invoker<T>> invokers = doList(invocation);

        List<Router> localRouters = this.routers; //
local reference

        if (localRouters != null && localRouters.size() > 0) {

            for (Router router: localRouters){

                try {

                    if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) {

                        invokers = router.route(invokers, getConsumerUrl(), invocation);

                    }

                } catch (Throwable t) {

                    logger.error("Failed to execute router: " + getUrl() + ",
cause: " + t.getMessage(), t);

                }

            }

        }

        return invokers;

    }

其doList方法交由具体的子类RegistryDirectory实现。

public RegistryDirectory(Class<T> serviceType, URL
url) {

        super(url);

        if(serviceType == null )

            throw new IllegalArgumentException("service
type is null.");

        if(url.getServiceKey() == null || url.getServiceKey().length() == 0)

            throw new IllegalArgumentException("registry
serviceKey is null.");

        this.serviceType = serviceType;

        this.serviceKey = url.getServiceKey();

        this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));

        this.overrideDirectoryUrl = this.directoryUrl = url.setPath(url.getServiceInterface()).clearParameters().addParameters(queryMap).removeParameter(Constants.MONITOR_KEY);

        String group = directoryUrl.getParameter( Constants.GROUP_KEY, "" );

        this.multiGroup = group != null && ("*".equals(group) || group.contains( "," ));

        String methods = queryMap.get(Constants.METHODS_KEY);

        this.serviceMethods = methods == null ? null : Constants.COMMA_SPLIT_PATTERN.split(methods);

    }

该类实现了一些客户端调用时的的Invoker对象。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: