您的位置:首页 > 编程语言 > Java开发

Thrift-client与Spring集成,并使用连接池提高性能

2016-06-01 15:02 537 查看
参考:  http://shift-alt-ctrl.iteye.com/blog/1990030?utm_source=tuicool&utm_medium=referral
    Thrift-client作为服务消费端,由于thrift使用socket通讯,因此它需要面对几个问题:

    1) client端需要知道server端的IP + port,如果是分布式部署,还需要知道所有server的IP + port列表.  

    2) client为了提升性能,不可能只使用一个socket来处理并发请求,当然也不能每个请求都创建一个socket;我们需要使用连接池方案.

    3) 对于java开发工程师而言,基于spring配置thrift服务,可以提供很多的便利.

    4) 基于zookeeper配置管理,那么client端就不需要"硬编码"的配置server的ip + port,可以使用zookeeper来推送每个service的服务地址.

    5) 因为thrift-client端不使用连接池的话,将不能有效地提高并发能力,本文重点描述如何使用thrift-client连接池。

 

1. pom.xml

XML代码


<dependencies>
    <!-- Spring container (FactoryBean / InitializingBean used by the client proxy factory) -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>3.0.7.RELEASE</version>
    </dependency>

    <!-- ZooKeeper client -->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.5</version>
        <!-- Disabled: exclusion of the transitive log4j dependency.
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
        -->
    </dependency>

    <!-- Disabled: zkclient is not needed, Curator is used instead.
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.4</version>
    </dependency>
    -->

    <!-- Thrift runtime (TSocket, TBinaryProtocol, TServiceClient) -->
    <dependency>
        <groupId>org.apache.thrift</groupId>
        <artifactId>libthrift</artifactId>
        <version>0.9.1</version>
    </dependency>

    <!-- Curator recipes (PathChildrenCache) on top of ZooKeeper -->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.3.0</version>
    </dependency>

    <!-- Commons Pool 1.x (GenericObjectPool) backing the client connection pool -->
    <dependency>
        <groupId>commons-pool</groupId>
        <artifactId>commons-pool</artifactId>
        <version>1.6</version>
    </dependency>
</dependencies>

2. spring-thrift-client.xml

    其中zookeeper作为可选项,开发者也可以通过配置serverAddress的方式指定server的地址.

XML代码


<!-- Option 1 (disabled): fixed server address, no ZooKeeper involved. -->
<!--
<bean id="userService" class="com.demo.thrift.ThriftServiceClientProxyFactory">
    <property name="service" value="com.demo.service.UserService"/>
    <property name="serverAddress" value="127.0.0.1:9090:2"/>
    <property name="maxActive" value="5"/>
    <property name="idleTime" value="10000"/>
</bean>
-->

<!-- Option 2: server addresses are discovered through ZooKeeper. -->
<!-- ZooKeeper client shared by the address providers; closed when the context shuts down. -->
<bean id="thriftZookeeper" class="com.demo.thrift.zookeeper.ZookeeperFactory" destroy-method="close">
    <property name="connectString" value="127.0.0.1:2181"/>
    <property name="namespace" value="demo/thrift-service"/>
</bean>

<!-- Pooled client proxy for UserService; idleTime is in milliseconds (30 minutes here). -->
<bean id="userService" class="com.demo.thrift.ThriftServiceClientProxyFactory" destroy-method="close">
    <property name="service" value="com.demo.service.UserService"/>
    <property name="maxActive" value="5"/>
    <property name="idleTime" value="1800000"/>
    <property name="addressProvider">
        <!-- configPath is the ZooKeeper path whose children carry the server addresses -->
        <bean class="com.demo.thrift.support.impl.DynamicAddressProvider">
            <property name="configPath" value="UserServiceImpl"/>
            <property name="zookeeper" ref="thriftZookeeper"/>
        </bean>
    </property>
</bean>

3. ThriftServiceClientProxyFactory.java

    因为我们要在client端使用连接池方案,所以需要对client的方法调用过程进行代理。这个类维护了一个"Client"代理对象:在方法调用时从"对象池"中取出一个"Client"对象,并在方法实际调用结束后将其归还给"对象池".

Java代码  


@SuppressWarnings("rawtypes")  

public class ThriftServiceClientProxyFactory implements FactoryBean,InitializingBean {  

  

    private String service;  

  

    private String serverAddress;  

      

    private Integer maxActive = 32;//最大活跃连接数  

      

    ////ms,default 3 min,链接空闲时间  

    //-1,关闭空闲检测  

    private Integer idleTime = 180000;  

    private ThriftServerAddressProvider addressProvider;  

  

    private Object proxyClient;  

      

  

    public void setMaxActive(Integer maxActive) {  

        this.maxActive = maxActive;  

    }  

  

  

    public void setIdleTime(Integer idleTime) {  

        this.idleTime = idleTime;  

    }  

  

  

    public void setService(String service) {  

        this.service = service;  

    }  

  

  

    public void setServerAddress(String serverAddress) {  

        this.serverAddress = serverAddress;  

    }  

  

  

    public void setAddressProvider(ThriftServerAddressProvider addressProvider) {  

        this.addressProvider = addressProvider;  

    }  

  

    private Class objectClass;  

      

    private GenericObjectPool<TServiceClient> pool;  

      

    private PoolOperationCallBack callback = new PoolOperationCallBack() {  

          

        @Override  

        public void make(TServiceClient client) {  

            System.out.println("create");  

              

        }  

          

        @Override  

        public void destroy(TServiceClient client) {  

            System.out.println("destroy");  

              

        }  

    };  

  

    @Override  

    public void afterPropertiesSet() throws Exception {  

        if(serverAddress != null){  

            addressProvider = new FixedAddressProvider(serverAddress);  

        }  

        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();  

        //加载Iface接口  

        objectClass = classLoader.loadClass(service + "$Iface");  

        //加载Client.Factory类  

        Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>)classLoader.loadClass(service + "$Client$Factory");  

        TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();  

        ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(addressProvider, clientFactory,callback);  

        GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();  

        poolConfig.maxActive = maxActive;  

        poolConfig.minIdle = 0;  

        poolConfig.minEvictableIdleTimeMillis = idleTime;  

        poolConfig.timeBetweenEvictionRunsMillis = idleTime/2L;  

        pool = new GenericObjectPool<TServiceClient>(clientPool,poolConfig);  

        proxyClient = Proxy.newProxyInstance(classLoader,new Class[]{objectClass},new InvocationHandler() {  

            @Override  

            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {  

                //  

                TServiceClient client = pool.borrowObject();  

                try{  

                    return method.invoke(client, args);  

                }catch(Exception e){  

                    throw e;  

                }finally{  

                    pool.returnObject(client);  

                }  

            }  

        });  

    }  

  

    @Override  

    public Object getObject() throws Exception {  

        return proxyClient;  

    }  

  

    @Override  

    public Class<?> getObjectType() {  

        return objectClass;  

    }  

  

    @Override  

    public boolean isSingleton() {  

        return true;  //To change body of implemented methods use File | Settings | File Templates.  

    }  

      

    public void close(){  

        if(addressProvider != null){  

            addressProvider.close();  

        }  

    }  

}  

4. ThriftClientPoolFactory.java

    "Client"对象池,对象池中是已经实例化的Client对象,Client对象负责与Thrift server通信.

Java代码  


/** 

 * 连接池,thrift-client for spring 

 */  

public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient>{  

  

    private final ThriftServerAddressProvider addressProvider;  

      

    private final TServiceClientFactory<TServiceClient> clientFactory;  

      

    private PoolOperationCallBack callback;  

  

    protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider,TServiceClientFactory<TServiceClient> clientFactory) throws Exception {  

        this.addressProvider = addressProvider;  

        this.clientFactory = clientFactory;  

    }  

      

    protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider,TServiceClientFactory<TServiceClient> clientFactory,PoolOperationCallBack callback) throws Exception {  

        this.addressProvider = addressProvider;  

        this.clientFactory = clientFactory;  

        this.callback = callback;  

    }  

  

  

  

    @Override  

    public TServiceClient makeObject() throws Exception {  

        InetSocketAddress address = addressProvider.selector();  

        TSocket tsocket = new TSocket(address.getHostName(),address.getPort());  

        TProtocol protocol = new TBinaryProtocol(tsocket);  

        TServiceClient client = this.clientFactory.getClient(protocol);  

        tsocket.open();  

        if(callback != null){  

            try{  

                callback.make(client);  

            }catch(Exception e){  

                //  

            }  

        }  

        return client;  

    }  

  

    public void destroyObject(TServiceClient client) throws Exception {  

        if(callback != null){  

            try{  

                callback.destroy(client);  

            }catch(Exception e){  

                //  

            }  

        }  

        TTransport pin = client.getInputProtocol().getTransport();  

        pin.close();  

    }  

  

    public boolean validateObject(TServiceClient client) {  

        TTransport pin = client.getInputProtocol().getTransport();  

        return pin.isOpen();  

    }  

      

    static interface PoolOperationCallBack {  

        //销毁client之前执行  

        void destroy(TServiceClient client);  

        //创建成功是执行  

        void make(TServiceClient client);  

    }  

  

}  

5. DynamicAddressProvider.java

    将zookeeper作为server地址的提供者,这样客户端就不需要在配置文件中指定一堆ip + port,而且当server服务有更新时,也不需要client端重新配置.

Java代码  


/** 

 * 可以动态获取address地址,方案设计参考 

 * 1) 可以间歇性的调用一个web-service来获取地址 

 * 2) 可以使用事件监听机制,被动的接收消息,来获取最新的地址(比如基于MQ,nio等) 

 * 3) 可以基于zookeeper-watcher机制,获取最新地址 

 * <p/> 

 * 本实例,使用zookeeper作为"config"中心,使用apache-curator方法库来简化zookeeper开发 

 * 如下实现,仅供参考 

 */  

public class DynamicAddressProvider implements ThriftServerAddressProvider, InitializingBean {  

  

    private String configPath;  

  

    private PathChildrenCache cachedPath;  

  

    private CuratorFramework zookeeper;  

      

    //用来保存当前provider所接触过的地址记录  

    //当zookeeper集群故障时,可以使用trace中地址,作为"备份"  

    private Set<String> trace = new HashSet<String>();  

  

    private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();  

  

    private Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();  

      

    private Object lock = new Object();  

      

    private static final Integer DEFAULT_PRIORITY = 1;  

  

    public void setConfigPath(String configPath) {  

        this.configPath = configPath;  

    }  

  

    public void setZookeeper(CuratorFramework zookeeper) {  

        this.zookeeper = zookeeper;  

    }  

  

    @Override  

    public void afterPropertiesSet() throws Exception {  

        //如果zk尚未启动,则启动  

        if(zookeeper.getState() == CuratorFrameworkState.LATENT){  

            zookeeper.start();  

        }  

        buildPathChildrenCache(zookeeper, configPath, true);  

        cachedPath.start(StartMode.POST_INITIALIZED_EVENT);  

    }  

  

    private void buildPathChildrenCache(CuratorFramework client, String path, Boolean cacheData) throws Exception {  

        cachedPath = new PathChildrenCache(client, path, cacheData);  

        cachedPath.getListenable().addListener(new PathChildrenCacheListener() {  

            @Override  

            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {  

                PathChildrenCacheEvent.Type eventType = event.getType();  

                switch (eventType) {  

//                    case CONNECTION_RECONNECTED:  

//                          

//                        break;  

                    case CONNECTION_SUSPENDED:  

                    case CONNECTION_LOST:  

                        System.out.println("Connection error,waiting...");  

                        return;  

                    default:  

                        //  

                }  

                //任何节点的时机数据变动,都会rebuild,此处为一个"简单的"做法.  

                cachedPath.rebuild();  

                rebuild();  

            }  

              

            protected void rebuild() throws Exception {  

                List<ChildData> children = cachedPath.getCurrentData();  

                if (children == null || children.isEmpty()) {  

                    //有可能所有的thrift server都与zookeeper断开了链接  

                    //但是,有可能,thrift client与thrift server之间的网络是良好的  

                    //因此此处是否需要清空container,是需要多方面考虑的.  

                    container.clear();  

                    System.out.println("thrift server-cluster error....");  

                    return;  

                }  

                List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();  

                for (ChildData data : children) {  

                    String address = new String(data.getData(), "utf-8");  

                    current.addAll(transfer(address));  

                    trace.add(address);  

                }  

                Collections.shuffle(current);  

                synchronized (lock) {  

                    container.clear();  

                    container.addAll(current);  

                    inner.clear();  

                    inner.addAll(current);  

                      

                }  

            }  

        });  

    }  

      

      

      

    private List<InetSocketAddress> transfer(String address){  

        String[] hostname = address.split(":");  

        Integer priority = DEFAULT_PRIORITY;  

        if (hostname.length == 3) {  

            priority = Integer.valueOf(hostname[2]);  

        }  

        String ip = hostname[0];  

        Integer port = Integer.valueOf(hostname[1]);  

        List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();  

        for (int i = 0; i < priority; i++) {  

            result.add(new InetSocketAddress(ip, port));  

        }  

        return result;  

    }  

  

  

    @Override  

    public List<InetSocketAddress> getAll() {  

        return Collections.unmodifiableList(container);  

    }  

  

    @Override  

    public synchronized InetSocketAddress selector() {  

        if (inner.isEmpty()) {  

            if(!container.isEmpty()){  

                inner.addAll(container);  

            }else if(!trace.isEmpty()){  

                synchronized (lock) {  

                    for(String hostname : trace){  

                        container.addAll(transfer(hostname));  

                    }  

                    Collections.shuffle(container);  

                    inner.addAll(container);  

                }  

            }  

        }  

        return inner.poll();//null  

    }  

  

  

    @Override  

    public void close() {  

        try {  

            cachedPath.close();  

            zookeeper.close();  

        } catch (Exception e) {  

            //  

        }  

    }  

}  

    到此为止,我们的Thrift基本上就可以顺利运行起来了.更多代码,参见附件.

    Thrift-server端开发与配置,参见[Thrift-server] 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: