您的位置:首页 > 其它

Thrift

2015-10-31 10:01 1141 查看

Thrift

目前流行的服务调用方式有很多种,例如基于SOAP消息格式的Web Service,基于JSON消息格式的RESTful服务等。其中所用到的数据传输方式包括XML,JSON等,然而XML相对体积太大,传输效率低,JSON 体积较小,新颖,

但还不够完善。 Facebook 开发的远程服务调用框架 Apache Thrift,它采用接口描述语言定义并创建服务,支持可扩展的跨语言服务开发,所包含的代码生成引擎可以在多种语言中,如 C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, Smalltalk 等创建高效的、无缝的服务,其传输数据采用二进制格式,相对 XML 和 JSON 体积更小,对于高并发、大数据量和多语言的环境更有优势。

首先Thrift是一种RPC,其次其可以做到跨语言调用服务,采用的传输格式可以是二进制,所以效率可以较高。

定义接口描述文件

Thrift采用IDL(Interface Definition Language)来描述接口,之后使用官方提供的编译工具(http://thrift.apache.org/download),可以将接口编译成指定的语言。

Thrift 脚本可定义的数据类型包括以下几种类型:

基本类型:
bool:布尔值,true 或 false
byte:8 位有符号整数
i16:16 位有符号整数
i32:32 位有符号整数
i64:64 位有符号整数
double:64 位浮点数
string:未知编码文本或二进制字符串

结构体类型:
struct:定义公共的对象
容器类型:
list
set
map
异常类型:
exception
服务类型:
service


这里举一个例子:

exception Xception {
1: i32 errorCode,
2: string message
}

service MyService {
string remoteCall(
1: string json_str,
) throws (
1: Xception err,
)

string ping()
}


上述代码定义了一个异常,之后定义一个服务。要注意的是thrift的接口定义的格式是:参数序号,参数类型,参数名称。

之后使用官方提供的thrift编译工具,编译出相应语言的代码文件。

thrift --gen <language> <Thrift filename>


这里编译成python的对应文件:

thrift --gen py my.thrift




主要看一下MyService.py文件

from thrift.Thrift import TType, TMessageType, TException, TApplicationException
import logging
from ttypes import *
from thrift.Thrift import TProcessor
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol, TProtocol
try:
from thrift.protocol import fastbinary
except:
fastbinary = None

#定义接口
class Iface:
def remoteCall(self, json_str):
"""
Parameters:
- json_str
"""
pass

def ping(self):
pass

#定义客户端操作
class Client(Iface):
def __init__(self, iprot, oprot=None):
self._iprot = self._oprot = iprot
if oprot is not None:
self._oprot = oprot
self._seqid = 0

def remoteCall(self, json_str):
"""
Parameters:
- json_str
"""
self.send_remoteCall(json_str)
return self.recv_remoteCall()

def send_remoteCall(self, json_str):
self._oprot.writeMessageBegin('remoteCall', TMessageType.CALL, self._seqid)
args = remoteCall_args()
args.json_str = json_str
args.write(self._oprot)
self._oprot.writeMessageEnd()
self._oprot.trans.flush()

def recv_remoteCall(self):
iprot = self._iprot
(fname, mtype, rseqid) = iprot.readMessageBegin()
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
x.read(iprot)
iprot.readMessageEnd()
raise x
result = remoteCall_result()
result.read(iprot)
iprot.readMessageEnd()
if result.success is not None:
return result.success
if result.err is not None:
raise result.err
raise TApplicationException(TApplicationException.MISSING_RESULT, "remoteCall failed: unknown result")

def ping(self):
self.send_ping()
return self.recv_ping()

def send_ping(self):
self._oprot.writeMessageBegin('ping', TMessageType.CALL, self._seqid)
args = ping_args()
args.write(self._oprot)
self._oprot.writeMessageEnd()
self._oprot.trans.flush()

def recv_ping(self):
iprot = self._iprot
(fname, mtype, rseqid) = iprot.readMessageBegin()
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
x.read(iprot)
iprot.readMessageEnd()
raise x
result = ping_result()
result.read(iprot)
iprot.readMessageEnd()
if result.success is not None:
return result.success
raise TApplicationException(TApplicationException.MISSING_RESULT, "ping failed: unknown result")

#定义服务端处理
class Processor(Iface, TProcessor):
def __init__(self, handler):
self._handler = handler  #这里需要传入一个handler,实际的处理在handler,handler留给使用者进行定义
self._processMap = {}
self._processMap["remoteCall"] = Processor.process_remoteCall
self._processMap["ping"] = Processor.process_ping

def process(self, iprot, oprot):
(name, type, seqid) = iprot.readMessageBegin()
if name not in self._processMap:
iprot.skip(TType.STRUCT)
iprot.readMessageEnd()
x = TApplicationException(TApplicationException.UNKNOWN_METHOD, 'Unknown function %s' % (name))
oprot.writeMessageBegin(name, TMessageType.EXCEPTION, seqid)
x.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
return
else:
self._processMap[name](self, seqid, iprot, oprot)
return True

def process_remoteCall(self, seqid, iprot, oprot):
args = remoteCall_args()
args.read(iprot)
iprot.readMessageEnd()
result = remoteCall_result()
try:
result.success = self._handler.remoteCall(args.json_str)  #服务器端处理
msg_type = TMessageType.REPLY
except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
raise
except Xception as err:
msg_type = TMessageType.REPLY
result.err = err
except Exception as ex:
msg_type = TMessageType.EXCEPTION
logging.exception(ex)
result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
oprot.writeMessageBegin("remoteCall", msg_type, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()

def process_ping(self, seqid, iprot, oprot):
args = ping_args()
args.read(iprot)
iprot.readMessageEnd()
result = ping_result()
try:
result.success = self._handler.ping()
msg_type = TMessageType.REPLY
except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
raise
except Exception as ex:
msg_type = TMessageType.EXCEPTION
logging.exception(ex)
result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
oprot.writeMessageBegin("ping", msg_type, seqid)
result.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()


自定义handler

class MY_Handler:
def ping(self):
return "pong"

def remoteCall(self, msg):
logger.info("Recv: [%s]" % msg)
data = json.loads(msg)
ret = CMD_EXCUTED_FAILED
cmdId = data.get("cmdId",None)
if cmdId:
handle = RPC_CMD_HANDLER_MAP.get(cmdId, None)
if handle:
ret = handle(data)
else:
logger.error("No handle found for cmd: %d" % cmdId)
else:
logger.warning("no cmdId found for this msg:%s" % msg)

logger.info("Ret: [%s]" % ret)
json_ret = {"result":ret}
return json.dumps(json_ret)


服务端代码

handler = MMY_Handler()
processor = MyService.Processor(handler) #自定义processor
transport = TSocket.TServerSocket("localhost", 9090) #链接类型,这里使用了socket
tfactory = TTransport.TBufferedTransportFactory() #传输层——传输层工厂,使用了带缓存传输层
pfactory = TBinaryProtocol.TBinaryProtocolFactory() #协议——传输协议工厂,这里使用了二进制传输协议
server = TServer.TThreadPoolServer(processor, transport, tfactory, pfactory) #sever
server.server()


这里步骤很清晰:需要设置的是链接类型,传输层,协议。

客户端代码

thrift_conf = settings.GOLDFISH['thrift']
transport = TSocket.TSocket(thrift_conf['host'], thrift_conf['port']) #链接
transport = TTransport.TBufferedTransport(transport)    #链接传输层包装
protocol = TBinaryProtocol.TBinaryProtocol(transport)   #协议
client = MyService.Client(protocol)
transport.open()

client.ping()


现在client就可以调用server端的接口了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: