您的位置:首页 > 其它

Thrift连接池实现

2016-01-03 21:15 357 查看
摘要: Thrift连接池实现

简介

Thrift是Facebook的核心框架之一,使不同的开发语言开发的系统可以通过该框架实现彼此的通信,类似于webservice,但是Thrift提供了近乎变态的效率和开发的方便性,是webservice所不能比拟的。给分布式开发带来了极大的方便。但是这柄利器也有一些不完美。

问题

首先文档相当的少,只有一个wiki网站提供相应的帮助。这对于Thrift的推广极为不利。

其次框架本身实现有一些缺陷,就Thrift的java部分来说,没有提供连接池的支持,对RPC的调用效率有所影响。

对于文档稀少的问题,只能通过Thrift的开发者和使用者多贡献一些自己的心得来解决,这需要一个过程。而连接池的问题则可以解决得更快一些。

提到池,一般做过Java开发的肯定会想到ObjectPool。Apache Commons项目确实给我们的开发带来了很大的便利性,其中的pool项目正是我们实现thrift连接池的基础,当然也少不了神器spring framework。

实现

一,定义thrift连接池接口

复制代码

ConnectionProvider

Code highlighting produced by Actipro CodeHighlighter (freeware)http://www.CodeHighlighter.com/-->/*

* @(#)ConnectionProvider.java 0.1 05/11/17

*

* Copyright 2010 QISI, Inc. All rights reserved.

* QISI PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.

*/

package com.qidea.thrift.pool;

import org.apache.thrift.transport.TSocket;

/**
 * Contract for a pool that hands out Thrift {@link TSocket} connections.
 *
 * @author sunwei
 * @version 2010-8-6
 * @since JDK1.5
 */
public interface ConnectionProvider
{
    /**
     * Takes one connection out of the pool.
     *
     * @return a socket obtained from the pool
     */
    TSocket getConnection();

    /**
     * Gives a connection back to the pool.
     *
     * @param socket the socket being handed back
     */
    void returnCon(TSocket socket);
}

复制代码

二,实现连接池

复制代码

GenericConnectionProvider

Code highlighting produced by Actipro CodeHighlighter (freeware)http://www.CodeHighlighter.com/-->/*

* @(#)DefaultConnectionProviderImpl.java 0.1 05/11/17

*

* Copyright 2010 QISI, Inc. All rights reserved.

* QISI PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.

*/

package com.qidea.thrift.pool;

import org.apache.commons.pool.ObjectPool;

import org.apache.commons.pool.impl.GenericObjectPool;

import org.apache.thrift.transport.TSocket;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.DisposableBean;

import org.springframework.beans.factory.InitializingBean;

/**
 * {@link ConnectionProvider} backed by a commons-pool {@link GenericObjectPool}
 * whose objects are created by {@link ThriftPoolableObjectFactory}.
 *
 * <p>The pool is built from the bean properties in {@link #afterPropertiesSet()}
 * and closed in {@link #destroy()}.
 *
 * @author sunwei
 * @version 2010-8-10
 * @since JDK1.5
 */
public class GenericConnectionProvider implements ConnectionProvider,
        InitializingBean, DisposableBean
{
    /** Logger. */
    public static final Logger logger = LoggerFactory
            .getLogger(GenericConnectionProvider.class);

    /** IP address of the service. */
    private String serviceIP;

    /** Port of the service. */
    private int servicePort;

    /** Connection timeout; handed unchanged to the object factory. */
    private int conTimeOut;

    /** Maximum number of objects that may be allocated from the pool. */
    private int maxActive = GenericObjectPool.DEFAULT_MAX_ACTIVE;

    /** Maximum number of idle objects kept in the pool. */
    private int maxIdle = GenericObjectPool.DEFAULT_MAX_IDLE;

    /** Minimum number of idle objects kept in the pool. */
    private int minIdle = GenericObjectPool.DEFAULT_MIN_IDLE;

    /**
     * Value passed to {@code GenericObjectPool.setMaxWait}: how long a borrow may
     * block when the pool is exhausted (a wait time, not a count).
     */
    private long maxWait = GenericObjectPool.DEFAULT_MAX_WAIT;

    /** Whether PoolableObjectFactory.validateObject is run when an object is borrowed. */
    private boolean testOnBorrow = GenericObjectPool.DEFAULT_TEST_ON_BORROW;

    /** Value passed to {@code GenericObjectPool.setTestOnReturn}. */
    private boolean testOnReturn = GenericObjectPool.DEFAULT_TEST_ON_RETURN;

    /** Value passed to {@code GenericObjectPool.setTestWhileIdle}. */
    private boolean testWhileIdle = GenericObjectPool.DEFAULT_TEST_WHILE_IDLE;

    /** The underlying object pool; {@code null} until initialised. */
    private ObjectPool objectPool = null;

    /**
     * Builds and configures the object pool from the current property values.
     * The pool blocks callers when exhausted.
     *
     * @throws Exception declared by {@link InitializingBean}
     */
    @Override
    public void afterPropertiesSet() throws Exception
    {
        ThriftPoolableObjectFactory factory = new ThriftPoolableObjectFactory(
                serviceIP, servicePort, conTimeOut);

        // Configure through a typed local so no repeated casts are needed, and
        // publish the pool only once it is fully set up.
        GenericObjectPool pool = new GenericObjectPool(factory);
        pool.setMaxActive(maxActive);
        pool.setMaxIdle(maxIdle);
        pool.setMinIdle(minIdle);
        pool.setMaxWait(maxWait);
        pool.setTestOnBorrow(testOnBorrow);
        pool.setTestOnReturn(testOnReturn);
        pool.setTestWhileIdle(testWhileIdle);
        pool.setWhenExhaustedAction(GenericObjectPool.WHEN_EXHAUSTED_BLOCK);

        objectPool = pool;
    }

    /**
     * Closes the pool. Does nothing when the pool was never created.
     *
     * @throws RuntimeException if closing the pool fails
     */
    @Override
    public void destroy()
    {
        if (objectPool == null)
        {
            // Nothing was initialised, so there is nothing to release.
            return;
        }
        try
        {
            objectPool.close();
        }
        catch (Exception e)
        {
            throw new RuntimeException("error destroy()", e);
        }
    }

    /**
     * Borrows a socket from the pool.
     *
     * @return the borrowed socket
     * @throws RuntimeException if the pool cannot supply a socket
     */
    @Override
    public TSocket getConnection()
    {
        try
        {
            return (TSocket) objectPool.borrowObject();
        }
        catch (Exception e)
        {
            throw new RuntimeException("error getConnection()", e);
        }
    }

    /**
     * Returns a previously borrowed socket to the pool.
     *
     * @param socket the socket to return; {@code null} is ignored because it was
     *               never borrowed
     * @throws RuntimeException if the pool rejects the socket
     */
    @Override
    public void returnCon(TSocket socket)
    {
        if (socket == null)
        {
            return;
        }
        try
        {
            objectPool.returnObject(socket);
        }
        catch (Exception e)
        {
            throw new RuntimeException("error returnCon()", e);
        }
    }

    public String getServiceIP()
    {
        return serviceIP;
    }

    public void setServiceIP(String serviceIP)
    {
        this.serviceIP = serviceIP;
    }

    public int getServicePort()
    {
        return servicePort;
    }

    public void setServicePort(int servicePort)
    {
        this.servicePort = servicePort;
    }

    public int getConTimeOut()
    {
        return conTimeOut;
    }

    public void setConTimeOut(int conTimeOut)
    {
        this.conTimeOut = conTimeOut;
    }

    public int getMaxActive()
    {
        return maxActive;
    }

    public void setMaxActive(int maxActive)
    {
        this.maxActive = maxActive;
    }

    public int getMaxIdle()
    {
        return maxIdle;
    }

    public void setMaxIdle(int maxIdle)
    {
        this.maxIdle = maxIdle;
    }

    public int getMinIdle()
    {
        return minIdle;
    }

    public void setMinIdle(int minIdle)
    {
        this.minIdle = minIdle;
    }

    public long getMaxWait()
    {
        return maxWait;
    }

    public void setMaxWait(long maxWait)
    {
        this.maxWait = maxWait;
    }

    public boolean isTestOnBorrow()
    {
        return testOnBorrow;
    }

    public void setTestOnBorrow(boolean testOnBorrow)
    {
        this.testOnBorrow = testOnBorrow;
    }

    public boolean isTestOnReturn()
    {
        return testOnReturn;
    }

    public void setTestOnReturn(boolean testOnReturn)
    {
        this.testOnReturn = testOnReturn;
    }

    public boolean isTestWhileIdle()
    {
        return testWhileIdle;
    }

    public void setTestWhileIdle(boolean testWhileIdle)
    {
        this.testWhileIdle = testWhileIdle;
    }

    public ObjectPool getObjectPool()
    {
        return objectPool;
    }

    public void setObjectPool(ObjectPool objectPool)
    {
        this.objectPool = objectPool;
    }
}

复制代码

复制代码

ThriftPoolableObjectFactory

Code highlighting produced by Actipro CodeHighlighter (freeware)http://www.CodeHighlighter.com/-->/*

* @(#)ThriftPoolableObjectFactory.java 0.1 05/11/17

*

* Copyright 2010 QISI, Inc. All rights reserved.

* QISI PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.

*/

package com.qidea.thrift.pool;

import org.apache.commons.pool.PoolableObjectFactory;

import org.apache.thrift.transport.TSocket;

import org.apache.thrift.transport.TTransport;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

/**
 * Object factory used by the pool: opens, checks and closes {@link TSocket}
 * connections to a single service endpoint.
 *
 * @author sunwei
 * @version 2010-8-10
 * @since JDK1.5
 */
public class ThriftPoolableObjectFactory implements PoolableObjectFactory
{
    /** Logger. */
    public static final Logger logger = LoggerFactory
            .getLogger(ThriftPoolableObjectFactory.class);

    /** IP of the service. */
    private String serviceIP;

    /** Port of the service. */
    private int servicePort;

    /** Timeout setting handed to the {@link TSocket} constructor. */
    private int timeOut;

    /**
     * Creates a factory for the given endpoint.
     *
     * @param serviceIP   IP of the service
     * @param servicePort port of the service
     * @param timeOut     timeout handed to each new socket
     */
    public ThriftPoolableObjectFactory(String serviceIP, int servicePort,
            int timeOut)
    {
        this.serviceIP = serviceIP;
        this.servicePort = servicePort;
        this.timeOut = timeOut;
    }

    /**
     * Closes the given object if it is a {@link TSocket} that is still open;
     * anything else is left untouched.
     *
     * @param arg0 the pooled object being discarded
     */
    @Override
    public void destroyObject(Object arg0) throws Exception
    {
        if (!(arg0 instanceof TSocket))
        {
            return;
        }
        TSocket pooled = (TSocket) arg0;
        if (pooled.isOpen())
        {
            pooled.close();
        }
    }

    /**
     * Creates a new socket for the configured endpoint and opens it.
     *
     * @return the opened transport
     * @throws RuntimeException wrapping any failure while creating or opening
     *                          the socket (the failure is logged first)
     */
    @Override
    public Object makeObject() throws Exception
    {
        try
        {
            TTransport connection = new TSocket(serviceIP, servicePort, timeOut);
            connection.open();
            return connection;
        }
        catch (Exception e)
        {
            logger.error("error ThriftPoolableObjectFactory()", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Reports whether the given object is a {@link TSocket} that is open.
     *
     * @param arg0 the pooled object to check
     * @return {@code true} only for an open {@link TSocket}; {@code false} for any
     *         other object or if the check itself throws
     */
    @Override
    public boolean validateObject(Object arg0)
    {
        try
        {
            return arg0 instanceof TSocket && ((TSocket) arg0).isOpen();
        }
        catch (Exception e)
        {
            return false;
        }
    }

    /** No work is done when an object goes back to the pool. */
    @Override
    public void passivateObject(Object arg0) throws Exception
    {
        // DO NOTHING
    }

    /** No work is done when an object is taken from the pool. */
    @Override
    public void activateObject(Object arg0) throws Exception
    {
        // DO NOTHING
    }

    public String getServiceIP()
    {
        return serviceIP;
    }

    public void setServiceIP(String serviceIP)
    {
        this.serviceIP = serviceIP;
    }

    public int getServicePort()
    {
        return servicePort;
    }

    public void setServicePort(int servicePort)
    {
        this.servicePort = servicePort;
    }

    public int getTimeOut()
    {
        return timeOut;
    }

    public void setTimeOut(int timeOut)
    {
        this.timeOut = timeOut;
    }
}

复制代码

  三,定义连接的管理类 

复制代码

ConnectionManager

Code highlighting produced by Actipro CodeHighlighter (freeware)http://www.CodeHighlighter.com/-->/*

* @(#)ConnectionManager.java 0.1 05/11/17

*

* Copyright 2010 QISI, Inc. All rights reserved.

* QISI PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.

*/

package com.qidea.thrift.pool;

import org.aopalliance.intercept.MethodInterceptor;

import org.aopalliance.intercept.MethodInvocation;

import org.apache.thrift.transport.TSocket;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

/**
 * Method interceptor that borrows a socket from the {@link ConnectionProvider}
 * before the intercepted method runs, exposes it to that method through
 * {@link #getSocket()}, and hands it back afterwards.
 *
 * @author sunwei
 * @version 2010-8-10
 * @since JDK1.5
 */
public class ConnectionManager implements MethodInterceptor
{
    /** Logger. */
    public Logger logger = LoggerFactory.getLogger(ConnectionManager.class);

    /** Socket borrowed for the invocation currently running on this thread. */
    ThreadLocal<TSocket> socketThreadSafe = new ThreadLocal<TSocket>();

    /** Pool that supplies the connections. */
    public ConnectionProvider connectionProvider;

    /**
     * Runs the intercepted method with a borrowed socket bound to the current
     * thread.
     *
     * @param arg0 the intercepted invocation
     * @return whatever the intercepted method returns
     * @throws Throwable an {@link Exception} wrapping any {@link Exception}
     *                   raised while borrowing the socket or running the method
     */
    @Override
    public Object invoke(MethodInvocation arg0) throws Throwable
    {
        // Remember any socket already bound by an enclosing intercepted call on
        // this thread so it can be put back when this call finishes.
        TSocket outer = socketThreadSafe.get();
        TSocket socket = null;
        try
        {
            socket = connectionProvider.getConnection();
            socketThreadSafe.set(socket);
            return arg0.proceed();
        }
        catch (Exception e)
        {
            logger.error("error ConnectionManager.invoke()", e);
            // NOTE(review): wrapping in a plain Exception hides the original type
            // from callers; kept as-is so existing callers see the same behavior.
            throw new Exception(e);
        }
        finally
        {
            try
            {
                // socket is still null when borrowing itself failed; returning
                // null would raise a second error that masks the real one.
                if (socket != null)
                {
                    connectionProvider.returnCon(socket);
                }
            }
            finally
            {
                // Always unbind, even if returning the socket failed, so no
                // stale socket stays attached to the thread.
                if (outer != null)
                {
                    socketThreadSafe.set(outer);
                }
                else
                {
                    socketThreadSafe.remove();
                }
            }
        }
    }

    /**
     * Gets the socket bound to the current thread.
     *
     * @return the socket of the invocation in progress on this thread, or
     *         {@code null} when none is bound
     */
    public TSocket getSocket()
    {
        return socketThreadSafe.get();
    }

    public ConnectionProvider getConnectionProvider()
    {
        return connectionProvider;
    }

    public void setConnectionProvider(ConnectionProvider connectionProvider)
    {
        this.connectionProvider = connectionProvider;
    }
}

复制代码

四,定义spring配置,对受管的bean提供thrift连接

复制代码

Thrift连接池spring配置

Code highlighting produced by Actipro CodeHighlighter (freeware)http://www.CodeHighlighter.com/--><?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jee="http://www.springframework.org/schema/jee"

xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"

xmlns:flex="http://www.springframework.org/schema/flex" xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-2.5.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
http://www.springframework.org/schema/flex http://www.springframework.org/schema/flex/spring-flex-1.0.xsd">

<!-- Thrift connection pool: service endpoint, pool sizing, validation flags and timeout -->

<bean id="connectionProvider" class="com.qidea.thrift.pool.GenericConnectionProvider">

<property name="serviceIP" value="localhost" />

<property name="servicePort" value="9090" />

<property name="maxActive" value="10" />

<property name="maxIdle" value="10" />

<property name="testOnBorrow" value="true" />

<property name="testOnReturn" value="true" />

<property name="testWhileIdle" value="true" />

<property name="conTimeOut" value="2000" />

</bean>

<!-- Thrift connection manager: the interceptor that borrows and returns pooled connections -->

<bean id="connectionManager" class="com.qidea.thrift.pool.ConnectionManager">

<property name="connectionProvider" ref="connectionProvider" />

</bean>

<!-- Client bean that receives the connection manager.
     NOTE(review): the client class listed in section 5 is
     com.qidea.pushserver.rpc.PushRPCClient; confirm that this class name and the
     pointcut package below refer to the intended client. -->

<bean class="com.qidea.pushserver.rpc.client.PushServiceClient">

<property name="connectionManager" ref="connectionManager" />

</bean>

<!-- AOP wiring: apply the connection manager as advice around the client methods -->

<aop:config proxy-target-class="true">

<aop:pointcut id="clientMethods"

expression="execution(* com.qidea.pushserver.rpc.client.*.*(..))" />

<aop:advisor advice-ref="connectionManager" pointcut-ref="clientMethods" />

</aop:config>

</beans>

复制代码

五,使用连接池

复制代码

PushRPCClient

Code highlighting produced by Actipro CodeHighlighter (freeware)http://www.CodeHighlighter.com/-->/*

* @(#)PushRPCClient.java 0.1 05/11/17

*

* Copyright 2010 QISI, Inc. All rights reserved.

* QISI PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.

*/

package com.qidea.pushserver.rpc;

import java.util.ArrayList;

import java.util.List;

import org.apache.thrift.TException;

import org.apache.thrift.protocol.TBinaryProtocol;

import org.apache.thrift.protocol.TProtocol;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.qidea.pushserver.ServiceException;

import com.qidea.thrift.pool.ConnectionManager;

/**
 * Client for the push RPC service. Every call builds a {@code PushRPCService.Client}
 * over the socket currently exposed by the {@link ConnectionManager}.
 *
 * @author sunwei
 * @version 2010-8-11
 * @since JDK1.5
 */
public class PushRPCClient
{
    public static Logger logger = LoggerFactory.getLogger(PushRPCClient.class);

    private ConnectionManager connectionManager;

    /**
     * Fetches the list of online players.
     *
     * @param roleIdList role ids to send to the service
     * @return the list returned by the service, or an empty list when the call
     *         fails with a {@link TException} (the failure is logged)
     */
    public List<Long> getOnLineRoleIdList(List<Long> roleIdList)
    {
        PushRPCService.Client rpc = newClient();
        List<Long> online;
        try
        {
            online = rpc.getOnLineRoleIdList(roleIdList);
        }
        catch (TException e)
        {
            logger.error("error getOnLineRoleIdList()", e);
            online = new ArrayList<Long>();
        }
        return online;
    }

    /**
     * Dismisses an alliance. A {@link TException} is logged and not rethrown.
     *
     * @param allianceId id of the alliance
     */
    public void dismissAlliance(long allianceId)
    {
        PushRPCService.Client rpc = newClient();
        try
        {
            rpc.dismissAlliance(allianceId);
        }
        catch (TException e)
        {
            logger.error("error dismissAlliance()", e);
        }
    }

    /**
     * Joins an alliance. A {@link TException} is logged and not rethrown.
     *
     * @param roleId     id of the role
     * @param allianceId id of the alliance
     */
    public void joinAlliance(long roleId, long allianceId)
    {
        PushRPCService.Client rpc = newClient();
        try
        {
            rpc.joinAlliance(roleId, allianceId);
        }
        catch (TException e)
        {
            logger.error("error joinAlliance()", e);
        }
    }

    /**
     * Invokes {@code getOutAlliance} on the service (presumably the role leaving
     * the alliance; verify against the service definition). Any
     * {@link Exception} is logged and not rethrown.
     *
     * @param roleId     id of the role
     * @param allianceId id of the alliance
     */
    public void getOutAlliance(long roleId, long allianceId)
    {
        PushRPCService.Client rpc = newClient();
        try
        {
            rpc.getOutAlliance(roleId, allianceId);
        }
        catch (Exception e)
        {
            logger.error("error getOutAlliance()", e);
        }
    }

    public ConnectionManager getConnectionManager()
    {
        return connectionManager;
    }

    public void setConnectionManager(ConnectionManager connectionManager)
    {
        this.connectionManager = connectionManager;
    }

    /** Builds a service client speaking the binary protocol over the manager's current socket. */
    private PushRPCService.Client newClient()
    {
        TProtocol binary = new TBinaryProtocol(connectionManager.getSocket());
        return new PushRPCService.Client(binary);
    }
}

复制代码
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: