Thrift
2015-10-31 10:01
1141 查看
Thrift
目前流行的服务调用方式有很多种,例如基于SOAP消息格式的Web Service,基于JSON消息格式的RESTful服务等。其中所用到的数据传输方式包括XML,JSON等,然而XML相对体积太大,传输效率低,JSON 体积较小,新颖,但还不够完善。 Facebook 开发的远程服务调用框架 Apache Thrift,它采用接口描述语言定义并创建服务,支持可扩展的跨语言服务开发,所包含的代码生成引擎可以在多种语言中,如 C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, Smalltalk 等创建高效的、无缝的服务,其传输数据采用二进制格式,相对 XML 和 JSON 体积更小,对于高并发、大数据量和多语言的环境更有优势。
首先Thrift是一种RPC,其次其可以做到跨语言调用服务,采用的传输格式可以是二进制,所以效率可以较高。
定义接口描述文件
Thrift采用IDL(Interface Definition Language)来描述接口,之后使用官方提供的编译工具(http://thrift.apache.org/download),可以将接口编译成指定的语言。Thrift 脚本可定义的数据类型包括以下几种类型:
基本类型: bool:布尔值,true 或 false byte:8 位有符号整数 i16:16 位有符号整数 i32:32 位有符号整数 i64:64 位有符号整数 double:64 位浮点数 string:未知编码文本或二进制字符串 结构体类型: struct:定义公共的对象 容器类型: list set map 异常类型: exception 服务类型: service
这里举一个例子:
exception Xception { 1: i32 errorCode, 2: string message } service MyService { string remoteCall( 1: string json_str, ) throws ( 1: Xception err, ) string ping() }
上述代码定义了一个异常,之后定义一个服务。要注意的是thrift的接口定义的格式是:参数序号,参数类型,参数名称。
之后使用官方提供的thrift编译工具,编译出相应语言的代码文件。
thrift --gen <language> <Thrift filename>
这里编译成python的对应文件:
thrift --gen py my.thrift
主要看一下MyService.py文件
from thrift.Thrift import TType, TMessageType, TException, TApplicationException import logging from ttypes import * from thrift.Thrift import TProcessor from thrift.transport import TTransport from thrift.protocol import TBinaryProtocol, TProtocol try: from thrift.protocol import fastbinary except: fastbinary = None #定义接口 class Iface: def remoteCall(self, json_str): """ Parameters: - json_str """ pass def ping(self): pass #定义客户端操作 class Client(Iface): def __init__(self, iprot, oprot=None): self._iprot = self._oprot = iprot if oprot is not None: self._oprot = oprot self._seqid = 0 def remoteCall(self, json_str): """ Parameters: - json_str """ self.send_remoteCall(json_str) return self.recv_remoteCall() def send_remoteCall(self, json_str): self._oprot.writeMessageBegin('remoteCall', TMessageType.CALL, self._seqid) args = remoteCall_args() args.json_str = json_str args.write(self._oprot) self._oprot.writeMessageEnd() self._oprot.trans.flush() def recv_remoteCall(self): iprot = self._iprot (fname, mtype, rseqid) = iprot.readMessageBegin() if mtype == TMessageType.EXCEPTION: x = TApplicationException() x.read(iprot) iprot.readMessageEnd() raise x result = remoteCall_result() result.read(iprot) iprot.readMessageEnd() if result.success is not None: return result.success if result.err is not None: raise result.err raise TApplicationException(TApplicationException.MISSING_RESULT, "remoteCall failed: unknown result") def ping(self): self.send_ping() return self.recv_ping() def send_ping(self): self._oprot.writeMessageBegin('ping', TMessageType.CALL, self._seqid) args = ping_args() args.write(self._oprot) self._oprot.writeMessageEnd() self._oprot.trans.flush() def recv_ping(self): iprot = self._iprot (fname, mtype, rseqid) = iprot.readMessageBegin() if mtype == TMessageType.EXCEPTION: x = TApplicationException() x.read(iprot) iprot.readMessageEnd() raise x result = ping_result() result.read(iprot) iprot.readMessageEnd() if result.success is not None: return result.success raise TApplicationException(TApplicationException.MISSING_RESULT, "ping failed: unknown result") #定义服务端处理 class Processor(Iface, TProcessor): def __init__(self, handler): self._handler = handler #这里需要传入一个handler,实际的处理在handler,handler留给使用者进行定义 self._processMap = {} self._processMap["remoteCall"] = Processor.process_remoteCall self._processMap["ping"] = Processor.process_ping def process(self, iprot, oprot): (name, type, seqid) = iprot.readMessageBegin() if name not in self._processMap: iprot.skip(TType.STRUCT) iprot.readMessageEnd() x = TApplicationException(TApplicationException.UNKNOWN_METHOD, 'Unknown function %s' % (name)) oprot.writeMessageBegin(name, TMessageType.EXCEPTION, seqid) x.write(oprot) oprot.writeMessageEnd() oprot.trans.flush() return else: self._processMap[name](self, seqid, iprot, oprot) return True def process_remoteCall(self, seqid, iprot, oprot): args = remoteCall_args() args.read(iprot) iprot.readMessageEnd() result = remoteCall_result() try: result.success = self._handler.remoteCall(args.json_str) #服务器端处理 msg_type = TMessageType.REPLY except (TTransport.TTransportException, KeyboardInterrupt, SystemExit): raise except Xception as err: msg_type = TMessageType.REPLY result.err = err except Exception as ex: msg_type = TMessageType.EXCEPTION logging.exception(ex) result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error') oprot.writeMessageBegin("remoteCall", msg_type, seqid) result.write(oprot) oprot.writeMessageEnd() oprot.trans.flush() def process_ping(self, seqid, iprot, oprot): args = ping_args() args.read(iprot) iprot.readMessageEnd() result = ping_result() try: result.success = self._handler.ping() msg_type = TMessageType.REPLY except (TTransport.TTransportException, KeyboardInterrupt, SystemExit): raise except Exception as ex: msg_type = TMessageType.EXCEPTION logging.exception(ex) result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error') oprot.writeMessageBegin("ping", msg_type, seqid) result.write(oprot) oprot.writeMessageEnd() oprot.trans.flush()
自定义handler
class MY_Handler: def ping(self): return "pong" def remoteCall(self, msg): logger.info("Recv: [%s]" % msg) data = json.loads(msg) ret = CMD_EXCUTED_FAILED cmdId = data.get("cmdId",None) if cmdId: handle = RPC_CMD_HANDLER_MAP.get(cmdId, None) if handle: ret = handle(data) else: logger.error("No handle found for cmd: %d" % cmdId) else: logger.warning("no cmdId found for this msg:%s" % msg) logger.info("Ret: [%s]" % ret) json_ret = {"result":ret} return json.dumps(json_ret)
服务端代码
handler = MMY_Handler() processor = MyService.Processor(handler) #自定义processor transport = TSocket.TServerSocket("localhost", 9090) #链接类型,这里使用了socket tfactory = TTransport.TBufferedTransportFactory() #传输层——传输层工厂,使用了带缓存传输层 pfactory = TBinaryProtocol.TBinaryProtocolFactory() #协议——传输协议工厂,这里使用了二进制传输协议 server = TServer.TThreadPoolServer(processor, transport, tfactory, pfactory) #sever server.server()
这里步骤很清晰:需要设置的是链接类型,传输层,协议。
客户端代码
thrift_conf = settings.GOLDFISH['thrift'] transport = TSocket.TSocket(thrift_conf['host'], thrift_conf['port']) #链接 transport = TTransport.TBufferedTransport(transport) #链接传输层包装 protocol = TBinaryProtocol.TBinaryProtocol(transport) #协议 client = MyService.Client(protocol) transport.open() client.ping()
现在client就可以调用server端的接口了。
相关文章推荐
- poj 3080 后缀树应用
- SIFT中的尺度空间和传统图像金字塔【转】
- JVM抛出OutOfMemory 的集中情况的解析
- 新浪微博 授权机制研究
- 禁止鼠标全选住图片
- [实战]MVC5+EF6+MySql企业网盘实战(10)——新建文件夹
- PHP开发提高效率技巧
- mysql 查看死锁和去除死锁
- [LeetCode]48. Word Pattern匹配模式
- SQL Server 2014 版 DISTINCT和Order BY的错误
- Django RestFramework源码剖析(2)———Viwe是自带分页的
- 进击的KFC:OC概述、面向对象编程、类和对象、实例变量操作
- useradd:警告:此主目录已经存在。
- UVA11384-Help is needed for Dexter-水题/贪心
- Django RestFramework源码剖析(1)——设定不同的serializer
- 关于VS2013中To disable deprecation, use _CRT_SECURE_NO_WARNINGS. See online help for details.
- 高精度小数
- 解决Mysql ERROR 1 (HY000): Can't create/write to file 问题
- 超实用的JavaScript代码段 Item1 --倒计时效果
- JavaScript中的delete,typeof,instanceof运算符