RPC框架protobuf-rpc-pro 阻塞和非阻塞实例
2015-03-05 22:02
537 查看
接着上一节,我们来先实现阻塞RPC的使用。
上一节我们生成了Message.java,其中包含RpcService和ReplyService类,其中BlockingInterface为阻塞接口,Interface为非阻塞接口。下面我们来实现一下这两个接口。
RpcService阻塞接口实现,用于RPC的调用。
BlockRpcService.java
ReplyService阻塞接口,用于调用过程中消息反馈。
BlockReplyService.java
BlockServer.java
BlockClient.java
至于非阻塞很简单,主要使用异步回调RpcCallback实现,使用异步接口注册服务类,如下:
实例代码:http://download.csdn.net/detail/tianwei7518/8476361
上一节我们生成了Message.java,其中包含RpcService和ReplyService类,其中BlockingInterface为阻塞接口,Interface为非阻塞接口。下面我们来实现一下这两个接口。
RpcService阻塞接口实现,用于RPC的调用。
BlockRpcService.java
package cn.slimsmart.protoc.demo.rpc.rpc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cn.slimsmart.protoc.demo.rpc.Message.Msg; import cn.slimsmart.protoc.demo.rpc.Message.ReplyService; import cn.slimsmart.protoc.demo.rpc.Message.Request; import cn.slimsmart.protoc.demo.rpc.Message.Response; import cn.slimsmart.protoc.demo.rpc.Message.RpcService; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import com.googlecode.protobuf.pro.duplex.ClientRpcController; import com.googlecode.protobuf.pro.duplex.RpcClientChannel; import com.googlecode.protobuf.pro.duplex.execute.ServerRpcController; /** * 阻塞接口实现 */ public class BlockRpcService implements RpcService.BlockingInterface{ private Logger log = LoggerFactory.getLogger(getClass()); @Override public Response call(RpcController controller, Request request) throws ServiceException { if ( controller.isCanceled() ) { return null; } log.info("接收到数据:"); log.info("serviceName : "+request.getServiceName()); log.info("methodName : "+request.getMethodName()); log.info("params : "+request.getParams()); RpcClientChannel channel = ServerRpcController.getRpcChannel(controller); ReplyService.BlockingInterface clientService = ReplyService.newBlockingStub(channel); ClientRpcController clientController = channel.newRpcController(); clientController.setTimeoutMs(3000); //调用过程反馈消息 Msg msg = Msg.newBuilder().setContent("success.").build(); clientService.call(clientController, msg); Response response = Response.newBuilder().setCode(0).setMsg("处理完成").setData("server hello").build(); return response; } }
ReplyService阻塞接口,用于调用过程中消息反馈。
BlockReplyService.java
package cn.slimsmart.protoc.demo.rpc.rpc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cn.slimsmart.protoc.demo.rpc.Message.Msg; import cn.slimsmart.protoc.demo.rpc.Message.ReplyService; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; /** * 阻塞反馈服务实现 */ public class BlockReplyService implements ReplyService.BlockingInterface{ private Logger log = LoggerFactory.getLogger(getClass()); @Override public Msg call(RpcController controller, Msg request) throws ServiceException { log.debug("接收反馈消息:"+request.getContent()); if ( controller.isCanceled() ) { return null; } return Msg.newBuilder().setContent("收到反馈成功.").build(); } }服务端注册RPC服务,并等待监听。
BlockServer.java
package cn.slimsmart.protoc.demo.rpc.rpc; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.util.List; import java.util.concurrent.Executors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cn.slimsmart.protoc.demo.rpc.Message; import cn.slimsmart.protoc.demo.rpc.Message.RpcService; import com.google.protobuf.BlockingService; import com.google.protobuf.ExtensionRegistry; import com.googlecode.protobuf.pro.duplex.CleanShutdownHandler; import com.googlecode.protobuf.pro.duplex.PeerInfo; import com.googlecode.protobuf.pro.duplex.RpcClientChannel; import com.googlecode.protobuf.pro.duplex.RpcConnectionEventNotifier; import com.googlecode.protobuf.pro.duplex.execute.RpcServerCallExecutor; import com.googlecode.protobuf.pro.duplex.execute.ThreadPoolCallExecutor; import com.googlecode.protobuf.pro.duplex.listener.RpcConnectionEventListener; import com.googlecode.protobuf.pro.duplex.logging.CategoryPerServiceLogger; import com.googlecode.protobuf.pro.duplex.server.DuplexTcpServerPipelineFactory; import com.googlecode.protobuf.pro.duplex.util.RenamingThreadFactoryProxy; public class BlockServer { private static Logger log = LoggerFactory.getLogger(BlockServer.class); public static void main(String[] args) { PeerInfo serverInfo = new PeerInfo("127.0.0.1", 12345); // RPC payloads are uncompressed when logged - so reduce logging CategoryPerServiceLogger logger = new CategoryPerServiceLogger(); logger.setLogRequestProto(false); logger.setLogResponseProto(false); // Configure the server. DuplexTcpServerPipelineFactory serverFactory = new DuplexTcpServerPipelineFactory(serverInfo); //扩展 ExtensionRegistry r = ExtensionRegistry.newInstance(); Message.registerAllExtensions(r); serverFactory.setExtensionRegistry(r); RpcServerCallExecutor rpcExecutor = new ThreadPoolCallExecutor(10, 10); serverFactory.setRpcServerCallExecutor(rpcExecutor); serverFactory.setLogger(logger); // setup a RPC event listener - it just logs what happens RpcConnectionEventNotifier rpcEventNotifier = new RpcConnectionEventNotifier(); RpcConnectionEventListener listener = new RpcConnectionEventListener() { @Override public void connectionReestablished(RpcClientChannel clientChannel) { log.info("connectionReestablished " + clientChannel); } @Override public void connectionOpened(RpcClientChannel clientChannel) { log.info("connectionOpened " + clientChannel); } @Override public void connectionLost(RpcClientChannel clientChannel) { log.info("connectionLost " + clientChannel); } @Override public void connectionChanged(RpcClientChannel clientChannel) { log.info("connectionChanged " + clientChannel); } }; rpcEventNotifier.setEventListener(listener); serverFactory.registerConnectionEventListener(rpcEventNotifier); //注册服务 阻塞RPC服务 BlockingService blockingService = RpcService.newReflectiveBlockingService(new BlockRpcService()); serverFactory.getRpcServiceRegistry().registerService(true, blockingService); ServerBootstrap bootstrap = new ServerBootstrap(); EventLoopGroup boss = new NioEventLoopGroup(2,new RenamingThreadFactoryProxy("boss", Executors.defaultThreadFactory())); EventLoopGroup workers = new NioEventLoopGroup(2,new RenamingThreadFactoryProxy("worker", Executors.defaultThreadFactory())); bootstrap.group(boss,workers); bootstrap.channel(NioServerSocketChannel.class); bootstrap.option(ChannelOption.SO_SNDBUF, 1048576); bootstrap.option(ChannelOption.SO_RCVBUF, 1048576); bootstrap.childOption(ChannelOption.SO_RCVBUF, 1048576); bootstrap.childOption(ChannelOption.SO_SNDBUF, 1048576); bootstrap.option(ChannelOption.TCP_NODELAY, true); bootstrap.childHandler(serverFactory); bootstrap.localAddress(serverInfo.getPort()); CleanShutdownHandler shutdownHandler = new CleanShutdownHandler(); shutdownHandler.addResource(boss); shutdownHandler.addResource(workers); shutdownHandler.addResource(rpcExecutor); // Bind and start to accept incoming connections. bootstrap.bind(); log.info("Serving " + bootstrap); while ( true ) { List<RpcClientChannel> clients = serverFactory.getRpcClientRegistry().getAllClients(); log.info("Number of clients="+ clients.size()); try { Thread.sleep(50000); } catch (InterruptedException e) { e.printStackTrace(); } } } }客户端注册反馈服务,并调用服务端RPC
BlockClient.java
package cn.slimsmart.protoc.demo.rpc.rpc; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import java.util.concurrent.Executors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cn.slimsmart.protoc.demo.rpc.Message; import cn.slimsmart.protoc.demo.rpc.Message.Params; import cn.slimsmart.protoc.demo.rpc.Message.ReplyService; import cn.slimsmart.protoc.demo.rpc.Message.Request; import cn.slimsmart.protoc.demo.rpc.Message.RpcService; import com.google.protobuf.BlockingService; import com.google.protobuf.ExtensionRegistry; import com.googlecode.protobuf.pro.duplex.CleanShutdownHandler; import com.googlecode.protobuf.pro.duplex.ClientRpcController; import com.googlecode.protobuf.pro.duplex.PeerInfo; import com.googlecode.protobuf.pro.duplex.RpcClientChannel; import com.googlecode.protobuf.pro.duplex.RpcConnectionEventNotifier; import com.googlecode.protobuf.pro.duplex.client.DuplexTcpClientPipelineFactory; import com.googlecode.protobuf.pro.duplex.client.RpcClientConnectionWatchdog; import com.googlecode.protobuf.pro.duplex.execute.RpcServerCallExecutor; import com.googlecode.protobuf.pro.duplex.execute.ThreadPoolCallExecutor; import com.googlecode.protobuf.pro.duplex.listener.RpcConnectionEventListener; import com.googlecode.protobuf.pro.duplex.logging.CategoryPerServiceLogger; import com.googlecode.protobuf.pro.duplex.util.RenamingThreadFactoryProxy; public class BlockClient { private static RpcClientChannel channel = null; private static Logger log = LoggerFactory.getLogger(BlockClient.class); public static void main(String[] args) throws Exception { PeerInfo client = new PeerInfo("127.0.0.1", 54321); PeerInfo server = new PeerInfo("127.0.0.1", 12345); DuplexTcpClientPipelineFactory clientFactory = new DuplexTcpClientPipelineFactory(); // force the use of a local port // - normally you don't need this clientFactory.setClientInfo(client); ExtensionRegistry r = ExtensionRegistry.newInstance(); Message.registerAllExtensions(r); clientFactory.setExtensionRegistry(r); clientFactory.setConnectResponseTimeoutMillis(10000); RpcServerCallExecutor rpcExecutor = new ThreadPoolCallExecutor(3, 10); clientFactory.setRpcServerCallExecutor(rpcExecutor); // RPC payloads are uncompressed when logged - so reduce logging CategoryPerServiceLogger logger = new CategoryPerServiceLogger(); logger.setLogRequestProto(false); logger.setLogResponseProto(false); clientFactory.setRpcLogger(logger); // Set up the event pipeline factory. // setup a RPC event listener - it just logs what happens RpcConnectionEventNotifier rpcEventNotifier = new RpcConnectionEventNotifier(); final RpcConnectionEventListener listener = new RpcConnectionEventListener() { @Override public void connectionReestablished(RpcClientChannel clientChannel) { log.info("connectionReestablished " + clientChannel); channel = clientChannel; } @Override public void connectionOpened(RpcClientChannel clientChannel) { log.info("connectionOpened " + clientChannel); channel = clientChannel; } @Override public void connectionLost(RpcClientChannel clientChannel) { log.info("connectionLost " + clientChannel); } @Override public void connectionChanged(RpcClientChannel clientChannel) { log.info("connectionChanged " + clientChannel); channel = clientChannel; } }; rpcEventNotifier.addEventListener(listener); clientFactory.registerConnectionEventListener(rpcEventNotifier); //注册服务 reply阻塞服务,用于反馈 BlockingService blockingReplyService = ReplyService.newReflectiveBlockingService(new BlockReplyService()); clientFactory.getRpcServiceRegistry().registerService(blockingReplyService); Bootstrap bootstrap = new Bootstrap(); EventLoopGroup workers = new NioEventLoopGroup(16, new RenamingThreadFactoryProxy("workers", Executors.defaultThreadFactory())); bootstrap.group(workers); bootstrap.handler(clientFactory); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.TCP_NODELAY, true); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000); bootstrap.option(ChannelOption.SO_SNDBUF, 1048576); bootstrap.option(ChannelOption.SO_RCVBUF, 1048576); RpcClientConnectionWatchdog watchdog = new RpcClientConnectionWatchdog(clientFactory, bootstrap); rpcEventNotifier.addEventListener(watchdog); watchdog.start(); CleanShutdownHandler shutdownHandler = new CleanShutdownHandler(); shutdownHandler.addResource(workers); shutdownHandler.addResource(rpcExecutor); clientFactory.peerWith(server, bootstrap); while (true && channel != null) { RpcService.BlockingInterface blockingService = RpcService.newBlockingStub(channel); final ClientRpcController controller = channel.newRpcController(); controller.setTimeoutMs(0); Params params = Params.newBuilder().setKey("name").setValue("jack").build(); Request request = Request.newBuilder().setServiceName("UserService").setMethodName("insert").setParams(params).build(); //阻塞调用 blockingService.call(controller, request); Thread.sleep(100000); } } }运行服务端和客户端测试一下吧。
至于非阻塞很简单,主要使用异步回调RpcCallback实现,使用异步接口注册服务类,如下:
//注册服务非 阻塞RPC服务 Service nbService = RpcService.newReflectiveService(new NonBlockRpcService()); serverFactory.getRpcServiceRegistry().registerService(true, nbService);这里我就不贴代码了,详情下面实例代码查看。
实例代码:http://download.csdn.net/detail/tianwei7518/8476361
相关文章推荐
- 谷歌发布的首款基于HTTP/2和protobuf的RPC框架:GRPC
- rpc框架: thrift/avro/protobuf 之maven插件生成java类
- 基于HTTP/2和protobuf的RPC框架:GRPC
- gRPC:Google开源的基于HTTP/2和ProtoBuf的通用RPC框架
- 基于Protobuf的分布式高性能RPC框架——Navi-Pbrpc
- 高性能RPC over MINA&google protobuf 代码&实例 (一)
- gRPC:Google开源的基于HTTP/2和ProtoBuf的通用RPC框架
- 简单介绍google protobuf rpc框架使用方法
- 谷歌发布的首款基于HTTP/2和protobuf的RPC框架:GRPC
- grpc+protobuf 的C++ service 实例解析
- 集成libevent,google protobuf的RPC框架
- 谷歌发布的首款基于HTTP/2和protobuf的RPC框架:GRPC
- RPC框架protobuf-rpc-pro 实例
- 高性能RPC over MINA&google protobuf 代码&实例 (一)
- 高性能RPC over MINA&google protobuf 代码&实例 (二)
- 以小见大——那些基于 protobuf 的五花八门的 RPC(5 完)
- 以小见大——那些基于 protobuf 的五花八门的 RPC(5 完)
- protobuf+RPC的几个C++实现
- 以小见大——那些基于 protobuf 的五花八门的 RPC(1)
- 以小见大——那些基于 protobuf 的五花八门的 RPC(4)