Go官方库RPC开发指南
2017-03-04 00:00
387 查看
Go官方提供了一个RPC库:net/rpc。包rpc提供了通过网络访问一个对象的方法的能力。服务器需要注册对象,通过对象的类型名暴露这个服务。注册后这个对象的输出方法就可以远程调用,这个库封装了底层传输的细节,包括序列化。服务器可以注册多个不同类型的对象,但是注册相同类型的多个对象的时候回出错。
我最近写了一本电子书:GoRPC开发指南,介绍GoRPC开发的相关技术,这是其中的一章,专门介绍官方库net/rpc的使用。
同时,如果对象的方法要能远程访问,它们必须满足一定的条件,否则这个对象的方法回呗忽略。
这些条件是:
方法的类型是可输出的(themethod'stypeisexported)
方法本身也是可输出的(themethodisexported)
方法必须由两个参数,必须是输出类型或者是内建类型(themethodhastwoarguments,bothexportedorbuiltintypes)
方法的第二个参数是指针类型(themethod'ssecondargumentisapointer)
方法返回类型为error(themethodhasreturntypeerror)
所以一个输出方法的格式如下:
这里的T、T1、T2能够被encoding/gob序列化,即使使用其它的序列化框架,将来这个需求可能回被弱化。
这个方法的第一个参数代表调用者(client)提供的参数,
第二个参数代表要返回给调用者的计算结果,
方法的返回值如果不为空,那么它作为一个字符串返回给调用者。
如果返回error,则reply参数不会返回给调用者。
服务器通过调用ServeConn在一个连接上处理请求,更典型地,它可以创建一个networklistener然后accept请求。
对于HTTPlistener来说,可以调用HandleHTTP和http.Serve。细节会在下面介绍。
客户端可以调用Dial和DialHTTP建立连接。客户端有两个方法调用服务:Call和Go,可以同步地或者异步地调用服务。
当然,调用的时候,需要吧服务名、方法名和参数传递给服务器。异步方法调用Go通过Donechannel通知调用结果返回。
除非显示的设置codec,否则这个库默认使用包encoding/gob作为序列化框架。
这个例子中提供了对两个数相乘和相除的两个方法。
第一步你需要定义传入参数和返回参数的数据结构:
第二步定义一个服务对象,这个服务对象可以很简单,比如类型是int或者是interface{},重要的是它输出的方法。
这里我们定义一个算术类型Arith,其实它是一个int类型,但是这个int的值我们在后面方法的实现中也没用到,所以它基本上就起一个辅助的作用。
第三步实现这个类型的两个方法,乘法和除法:
目前为止,我们的准备工作已经完成,喝口茶继续下面的步骤。
第四步实现RPC服务器:
这里我们生成了一个Arith对象,并使用rpc.Register注册这个服务,然后通过HTTP暴露出来。
客户端可以看到服务Arith以及它的两个方法Arith.Multiply和Arith.Divide。
第五步创建一个客户端,建立客户端和服务器端的连接:
然后客户端就可以进行远程调用了。比如同步的方式:
或者异步的方式:
比如不同的监听地址或端口,就需要自己生成Server:
Server有多种Socket监听的方式:
其中,ServeHTTP实现了处理http请求的业务逻辑,它首先处理http的CONNECT请求,接收后就Hijacker这个连接conn,然后调用ServeConn在这个连接上 处理这个客户端的请求。
它其实是实现了http.Handler接口,我们一般不直接调用这个方法。
`Server.HandleHTTP`设置rpc的上下文路径,`rpc.HandleHTTP`使用默认的上下文路径`DefaultRPCPath`、DefaultDebugPath。
这样,当你启动一个httpserver的时候`http.ListenAndServe`,上面设置的上下文将用作RPC传输,这个上下文的请求会教给ServeHTTP来处理。
以上是RPCoverhttp的实现,可以看出net/rpc只是利用httpCONNECT建立连接,这和普通的RESTfulapi还是不一样的。
`Accept`用来处理一个监听器,一直在监听客户端的连接,一旦监听器接收了一个连接,则还是交给ServeConn在另外一个goroutine中去处理:
可以看出,很重要的一个方法就是ServeConn:
连接其实是交给一个ServerCodec去处理,这里默认使用gobServerCodec去处理,这是一个未输出默认的编解码器,你可以使用其它的编解码器,我们下面再介绍,
这里我们可以看看ServeCodec是怎么实现的:
它其实一直从连接中读取请求,然后调用goservice.call在另外的goroutine中处理服务调用。
我们从中可以学到:
对象重用。Request和Response都是可重用的,通过Lock处理竞争。这在大并发的情况下很有效。有兴趣的读者可以参考fasthttp的实现。
使用了大量的goroutine。和Java中的线程不同,你可以创建非常多的goroutine,并发处理非常好。如果使用一定数量的goutine作为worker池去处理这个case,可能还会有些性能的提升,但是更复杂了。使用goroutine已经获得了非常好的性能。
业务处理是异步的,服务的执行不会阻塞其它消息的读取。
注意一个codec实例必然和一个connnection相关,因为它需要从connection中读取request和发送response。
go的rpc官方库的消息(request和response)的定义很简单,就是消息头(header)+内容体(body)。
请求的消息头的定义如下,包括服务的名称和序列号:
消息体就是传入的参数。
返回的消息头的定义如下:
消息体是reply类型的序列化后的值。
Server还提供了两个注册服务的方法:
第二个方法为服务起一个别名,否则服务名已它的类型命名,它们俩底层调用register进行服务的注册。
受限于Go语言的特点,我们不可能在接到客户端的请求的时候,根据反射动态的创建一个对象,就是Java那样,
因此在Go语言中,我们需要预先创建一个服务map这是在编译的时候完成的:
同时每个服务还有一个方法map:map[string]*methodType,通过suitableMethods建立:
这样rpc在读取请求header,通过查找这两个map,就可以得到要调用的服务及它的对应方法了。
方法的调用:
DialHTTP和DialHTTPPath是通过HTTP的方式和服务器建立连接,他俩的区别之在于是否设置上下文路径:
首先发送CONNECT请求,如果连接成功则通过NewClient(conn)创建client。
而Dial则通过TCP直接连接服务器:
根据服务是overHTTP还是overTCP选择合适的连接方式。
NewClient则创建一个缺省codec为glob序列化库的客户端:
如果你想用其它的序列化库,你可以调用NewClientWithCodec方法<:></:>
重要的是input方法,它已一个死循环的方式不断地从连接中读取response,然后调用map中读取等待的Call.Donechannel通知完成。
消息的结构和服务器一致,都是Header+Body的方式。
客户端的调用有两个方法:Go和Call。Go方法是异步的,它返回一个Call指针对象,它的Done是一个channel,如果服务返回,
Done就可以得到返回的对象(实际是Call对象,包含Reply和error信息)。Go是同步的方式调用,它实际是调用Call实现的,
我们可以看看它是怎么实现的,可以了解一下异步变同步的方式:
从一个Channel中读取对象会被阻塞住,直到有对象可以读取,这种实现很简单,也很方便。
其实从服务器端的代码和客户端的代码实现我们还可以学到锁Lock的一种实用方式,也就是尽快的释放锁,而不是defermu.Unlock直到函数执行到最后才释放,那样锁占用的时间太长了。
gob序列化库有个要求,就是对于接口类型的值,你需要注册具体的实现类型:
初次使用rpc的人容易犯这个错误,导致序列化不成功。
Go官方库实现了JSON-RPC1.0。JSON-RPC是一个通过JSON格式进行消息传输的RPC规范,因此可以进行跨语言的调用。
Go的net/rpc/jsonrpc库可以将JSON-RPC的请求转换成自己内部的格式,比如requestheader的处理:
JSON-RPC2.0官方库布支持,但是有第三方开发者提供了实现,比如:
https://github.com/powerman/rpc-codec
https://github.com/dwlnetnl/generpc
一些其它的codec如bsonrpc、messagepack、protobuf等。
如果你使用其它特定的序列化框架,你可以参照这些实现来写一个你自己的rpccodec。
关于Go序列化库的性能的比较你可以参考gosercomp。
deprecatenet/rpc:
Thepackagehasoutstandingbugsthatarehardtofix,andcannotsupportTLSwithoutmajorwork.SoalthoughithasaniceAPIandallowsonetousenativeGotypeswithoutanIDL,itshouldprobablyberetired.
Theproposalistofreezethepackage,retirethemanybugsfiledagainstit,andadddocumentationindicatingthatitisfrozenandthatsuggestsalternativessuchasGRPC.
但我认为net/rpc的设计相当的优秀,性能超好,如果不继续开发就太可惜了。提案中提到的一些bug和TLS并不是不能修复,可能Goteam缺乏相应的资源,或者开发者兴趣不在这里而已。我相信这个提案有很大的反对意见。
目前看来`gRPC`的性能远远逊于net/rpc,不仅仅是吞吐率,还包括CPU的占有率。
更多GoRPC开发技术请阅读:GoRPC开发指南。
https://golang.org/pkg/net/rpc/
https://golang.org/pkg/encoding/gob/
https://golang.org/pkg/net/rpc/jsonrpc/
https://github.com/golang/go/issues/16844
我最近写了一本电子书:
同时,如果对象的方法要能远程访问,它们必须满足一定的条件,否则这个对象的方法回呗忽略。
这些条件是:
方法的类型是可输出的(themethod'stypeisexported)
方法本身也是可输出的(themethodisexported)
方法必须由两个参数,必须是输出类型或者是内建类型(themethodhastwoarguments,bothexportedorbuiltintypes)
方法的第二个参数是指针类型(themethod'ssecondargumentisapointer)
方法返回类型为error(themethodhasreturntypeerror)
所以一个输出方法的格式如下:
这个方法的第一个参数代表调用者(client)提供的参数,
第二个参数代表要返回给调用者的计算结果,
方法的返回值如果不为空,那么它作为一个字符串返回给调用者。
如果返回error,则reply参数不会返回给调用者。
服务器通过调用
对于HTTPlistener来说,可以调用
客户端可以调用
当然,调用的时候,需要吧服务名、方法名和参数传递给服务器。异步方法调用
除非显示的设置
简单例子
首选介绍一个简单的例子。这个例子中提供了对两个数相乘和相除的两个方法。
第一步你需要定义传入参数和返回参数的数据结构:
2 3 4 5 6 7 8 9 | typeArgsstruct{ A,Bint } typeQuotientstruct{ Quo,Remint } |
这里我们定义一个算术类型
2 3 4 5 6 7 8 9 10 11 12 13 14 | *reply=args.A*args.B returnnil } func(t*Arith)Divide(args*Args,quo*Quotient)error{ ifargs.B==0{ returnerrors.New("dividebyzero") } quo.Quo=args.A/args.B quo.Rem=args.A%args.B returnnil } |
第四步实现RPC服务器:
2 3 4 5 6 7 8 9 10 11 | rpc.Register(arith) rpc.HandleHTTP() l,e:=net.Listen("tcp",":1234") ife!=nil{ log.Fatal("listenerror:",e) } gohttp.Serve(l,nil) |
客户端可以看到服务
第五步创建一个客户端,建立客户端和服务器端的连接:
2 3 4 | iferr!=nil{ log.Fatal("dialing:",err) } |
2 3 4 5 6 7 8 9 | varreplyint err=client.Call("Arith.Multiply",args,&reply) iferr!=nil{ log.Fatal("aritherror:",err) } fmt.Printf("Arith:%d*%d=%d",args.A,args.B,reply) |
2 3 4 | divCall:=client.Go("Arith.Divide",args,quotient,nil) replyCall:=<-divCall.Done//willbeequaltodivCall //checkerrors,print,etc. |
服务器代码分析
首先,`net/rpc`定义了一个缺省的Server,所以Server的很多方法你可以直接调用,这对于一个简单的Server的实现更方便,但是你如果需要配置不同的Server,比如不同的监听地址或端口,就需要自己生成Server:
2 3 4 5 6 | func(server*Server)HandleHTTP(rpcPath,debugPathstring) func(server*Server)ServeCodec(codecServerCodec) func(server*Server)ServeConn(connio.ReadWriteCloser) func(server*Server)ServeHTTP(whttp.ResponseWriter,req*http.Request) func(server*Server)ServeRequest(codecServerCodec)error |
它其实是实现了
`Server.HandleHTTP`设置rpc的上下文路径,`rpc.HandleHTTP`使用默认的上下文路径`DefaultRPCPath`、
这样,当你启动一个httpserver的时候`http.ListenAndServe`,上面设置的上下文将用作RPC传输,这个上下文的请求会教给
以上是RPCoverhttp的实现,可以看出
`Accept`用来处理一个监听器,一直在监听客户端的连接,一旦监听器接收了一个连接,则还是交给
2 3 4 5 6 7 8 9 10 11 12 | for{ conn,err:=lis.Accept() iferr!=nil{ log.Print("rpc.Serve:accept:",err.Error()) return } goserver.ServeConn(conn) } } |
2 3 4 5 6 7 8 9 10 11 12 13 | buf:=bufio.NewWriter(conn) srv:=&gobServerCodec{ rwc:conn, dec:gob.NewDecoder(conn), enc:gob.NewEncoder(buf), encBuf:buf, } server.ServeCodec(srv) } |
这里我们可以看看
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | sending:=new(sync.Mutex) for{ service,mtype,req,argv,replyv,keepReading,err:=server.readRequest(codec) iferr!=nil{ ifdebugLog&&err!=io.EOF{ log.Println("rpc:",err) } if!keepReading{ break } //sendaresponseifweactuallymanagedtoreadaheader. ifreq!=nil{ server.sendResponse(sending,req,invalidRequest,codec,err.Error()) server.freeRequest(req) } continue } goservice.call(server,sending,mtype,req,argv,replyv,codec) } codec.Close() } |
我们从中可以学到:
对象重用。Request和Response都是可重用的,通过Lock处理竞争。这在大并发的情况下很有效。有兴趣的读者可以参考
使用了大量的goroutine。和Java中的线程不同,你可以创建非常多的goroutine,并发处理非常好。如果使用一定数量的goutine作为worker池去处理这个case,可能还会有些性能的提升,但是更复杂了。使用goroutine已经获得了非常好的性能。
业务处理是异步的,服务的执行不会阻塞其它消息的读取。
注意一个codec实例必然和一个connnection相关,因为它需要从connection中读取request和发送response。
go的rpc官方库的消息(request和response)的定义很简单,就是消息头(header)+内容体(body)。
请求的消息头的定义如下,包括服务的名称和序列号:
2 3 4 5 6 7 | ServiceMethodstring//format:"Service.Method" Sequint64//sequencenumberchosenbyclient //containsfilteredorunexportedfields } |
返回的消息头的定义如下:
2 3 4 5 6 7 | ServiceMethodstring//echoesthatoftheRequest Sequint64//echoesthatoftherequest Errorstring//error,ifany. //containsfilteredorunexportedfields } |
Server还提供了两个注册服务的方法:
2 | func(server*Server)RegisterName(namestring,rcvrinterface{})error |
因此在Go语言中,我们需要预先创建一个服务map这是在编译的时候完成的:
方法的调用:
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | mtype.Lock() mtype.numCalls++ mtype.Unlock() function:=mtype.method.Func //Invokethemethod,providinganewvalueforthereply. returnValues:=function.Call([]reflect.Value{s.rcvr,argv,replyv}) //Thereturnvalueforthemethodisanerror. errInter:=returnValues[0].Interface() errmsg:="" iferrInter!=nil{ errmsg=errInter.(error).Error() } server.sendResponse(sending,req,replyv.Interface(),codec,errmsg) server.freeRequest(req) } |
客户端代码分析
客户端要建立和服务器的连接,可以有以下几种方式:2 3 4 5 | funcDialHTTP(network,addressstring)(*Client,error) funcDialHTTPPath(network,address,pathstring)(*Client,error) funcNewClient(connio.ReadWriteCloser)*Client funcNewClientWithCodec(codecClientCodec)*Client |
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | varerrerror conn,err:=net.Dial(network,address) iferr!=nil{ returnnil,err } io.WriteString(conn,"CONNECT"+path+"HTTP/1.0\n\n") //RequiresuccessfulHTTPresponse //beforeswitchingtoRPCprotocol. resp,err:=http.ReadResponse(bufio.NewReader(conn),&http.Request{Method:"CONNECT"}) iferr==nil&&resp.Status==connected{ returnNewClient(conn),nil } iferr==nil{ err=errors.New("unexpectedHTTPresponse:"+resp.Status) } conn.Close() returnnil,&net.OpError{ Op:"dial-http", Net:network+""+address, Addr:nil, Err:err, } } |
而
2 3 4 5 6 7 8 9 10 | conn,err:=net.Dial(network,address) iferr!=nil{ returnnil,err } returnNewClient(conn),nil } |
2 3 4 5 | encBuf:=bufio.NewWriter(conn) client:=&gobClientCodec{conn,gob.NewDecoder(conn),gob.NewEncoder(encBuf),encBuf} returnNewClientWithCodec(client) } |
2 3 4 5 6 7 8 9 | client:=&Client{ codec:codec, pending:make(map[uint64]*Call), } goclient.input() returnclient } |
消息的结构和服务器一致,都是Header+Body的方式。
客户端的调用有两个方法:
Done就可以得到返回的对象(实际是Call对象,包含Reply和error信息)。
我们可以看看它是怎么实现的,可以了解一下异步变同步的方式:
2 3 4 | call:=<-client.Go(serviceMethod,args,reply,make(chan*Call,1)).Done returncall.Error } |
其实从服务器端的代码和客户端的代码实现我们还可以学到锁Lock的一种实用方式,也就是尽快的释放锁,而不是
codec/序列化框架
前面我们介绍了rpc框架默认使用gob序列化库,很多情况下我们追求更好的效率的情况下,或者追求更通用的序列化格式,我们可能采用其它的序列化方式,比如protobuf,json,xml等。gob序列化库有个要求,就是对于接口类型的值,你需要注册具体的实现类型:
2 | funcRegisterName(namestring,valueinterface{}) |
Go官方库实现了
Go的
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | c.req.reset() iferr:=c.dec.Decode(&c.req);err!=nil{ returnerr } r.ServiceMethod=c.req.Method c.mutex.Lock() c.seq++ c.pending[c.seq]=c.req.Id c.req.Id=nil r.Seq=c.seq c.mutex.Unlock() returnnil } |
一些其它的codec如
如果你使用其它特定的序列化框架,你可以参照这些实现来写一个你自己的rpccodec。
关于Go序列化库的性能的比较你可以参考
其它
有一个提案Thepackagehasoutstandingbugsthatarehardtofix,andcannotsupportTLSwithoutmajorwork.SoalthoughithasaniceAPIandallowsonetousenativeGotypeswithoutanIDL,itshouldprobablyberetired.
Theproposalistofreezethepackage,retirethemanybugsfiledagainstit,andadddocumentationindicatingthatitisfrozenandthatsuggestsalternativessuchasGRPC.
但我认为net/rpc的设计相当的优秀,性能超好,如果不继续开发就太可惜了。提案中提到的一些bug和TLS并不是不能修复,可能Goteam缺乏相应的资源,或者开发者兴趣不在这里而已。我相信这个提案有很大的反对意见。
目前看来`gRPC`的性能远远逊于
更多GoRPC开发技术请阅读:
参考文档
相关文章推荐
- Quartz.net官方开发指南 第一课:使用Quartz.net
- Quartz.net官方开发指南 第二课:Jobs And Triggers
- Quartz.net官方开发指南 第七课 : TriggerListeners和JobListeners
- Quartz.net官方开发指南 第八课:SchedulerListeners
- Quartz.net官方开发指南 第七课 : TriggerListeners和JobListeners
- MM官方的CFMX代码规范和Machii开发指南更新
- Quartz.net官方开发指南 第九课: JobStore
- Quartz.net官方开发指南 第六课 : CronTrigger
- Quartz.net官方开发指南 第六课 : CronTrigger
- Quartz.net官方开发指南 第十课: 配置、资源使用以及SchedulerFactory
- Quartz.net官方开发指南 第三课:更多关于Jobs和JobDetails
- Quartz.net官方开发指南 第四课:关于Triggers更多内容
- Quartz.net官方开发指南 第三课:更多关于Jobs和JobDetails
- Quartz.net官方开发指南 第九课: JobStore
- Quartz.net官方开发指南 第五课: SimpleTrigger
- Quartz.net官方开发指南 第一课:使用Quartz.net 推荐
- Quartz.net官方开发指南 第三课:更多关于Jobs和JobDetails
- Quartz.net官方开发指南 第四课:关于Triggers更多内容
- Quartz.net官方开发指南 第十课: 配置、资源使用以及SchedulerFactory
- Quartz.net官方开发指南 第十课: 配置、资源使用以及SchedulerFactory