您的位置:首页 > 编程语言 > Java开发

Java简易RPC框架学习(二)---传输层

2016-12-04 15:46 387 查看
众做周知七层协议中的传输层的作用,这里的传输层的作用也类似,即封装了tcp,ip协议传输的实现,使用者不需要关系底层传输细节的实现,只需要调用接口来传输所需要的数据。为了实现这一层,可以使用最基本的套接字接口,java现有版本提供了OIO,NIO,BIO等技术,方便开发者创建自己的网络传输应用。但是,本文的重点不在于造轮子,没必要自己写网络传输系统。而且java的OIO(Old IO),和NIO(New
IO)风格迥异,编写起来代码不统一,而且在编写网络传输系统的时候会涉及到阻塞IO、非阻塞IO、同步、异步,事件模型(select、poll、epoll)等等,这里的学问就更深了。那么前面balabala这么多的目的其实是为了偷懒,这也是我在dubbo这个大系统中学到的一点——不要重复造轮子。那么dubbo使用的底层通信框架是啥了?Netty和Mina。
传输层的实现代码在我的GitHub主页上,其中的netty包里面包含了传输层的代码。

1、Netty

久仰Netty的大名不是一天两天了,之前写过的一个《java pipeline并发模式》就是在参考了Netty的消息处理机制后模仿出来的。netty中比较重要的几个概念是Server、Client、ChannelHandler、PipeLine。

2、Server端

由于GitHub主页上有全部源代码,下面就只介绍少量关键代码。

2.1、启动Server端

EventLoopGroup boss= new NioEventLoopGroup();
EventLoopGroup worker=new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap=new ServerBootstrap().group(boss,worker).channel(NioServerSocketChannel.class)
.localAddress(bindAddress).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline=socketChannel.pipeline();
pipeline.addLast("frameDecoder",new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
pipeline.addLast("encoder",new ObjectEncoder());
pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));//inbound
pipeline.addLast(new NettyHandler2(NettyServer.this));
pipeline.addLast(new NettyHandler(NettyServer.this));
}
}).option(ChannelOption.SO_BACKLOG,128).childOption(ChannelOption.SO_KEEPALIVE,true);
ChannelFuture future=serverBootstrap.bind(url.getPort()).sync();
future.channel().closeFuture().sync();


启动客户端的过程很简单,可定制化得内容就在new ChannelInitializer<SocketChannel>()的重载方法initChannel里面。需要在该方法体中初始化pipline的channelhandler构成。这里的的initChannel其实相当于一种回调机制,回调机制的实现可以参考之前的文章《Java
异步回调》。然后使用serverBootStrap绑定端口,监听事件即可。
下面重点介绍一下netty的pipeline以及其ChannelHandler。

2.2 netty之ChannelHandler

netty的ChannelHandler已经被很多人研究过,如果希望阅读更加详细的介绍,我推荐《netty in action》此博主的博客对netty的研究非常细致。尽管别人介绍得很详细,我还是要介绍一下自己的理解。ChannelHandler可以分为两种,Inbound和outbound,如下图所示:
 


 在pipeline中添加上诉顺序的handler的代码为:
pipeline.addLast(handler1);
pipeline.addLast(handler2);
pipeline.addLast(handler3);
pipeline.addLast(handler4);
pipeline.addLast(handler5);
服务器的数据流分为上行和下行,下行为入站的流量,在pipeline中入站和出站流量通过handler的顺序不一样,对于入站流量,其顺序为1->2->4,对于出站流量,其顺序为5->3。于是服务器收到一条消息,走完整个pipeline流程通过的handler顺序为1->2->4->5->3.
再次分析2.1中handler的顺序,可以看到Decoder总是inbound类型,encoder总是outbound类型,而且顺序上freamDecoder先于ObjectDecoder,最后才是自定义的上层处理handler。
pipeline.addLast("frameDecoder",new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));//inbound
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));//outbound
pipeline.addLast("encoder",new ObjectEncoder());//outbound
pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));//inbound
pipeline.addLast(new NettyHandler2(NettyServer.this));//outbound
pipeline.addLast(new NettyHandler(NettyServer.this));//outbound
自定义的handler需要注意一些事项,那就是在处理玩数据之后要使用上下文的write、flush方法来清空缓存,并使用super.channelRead(ctx,msg)来传递到下一个handler。

3、Client端

Client端的创建过程和Server端类似,这就是netty简洁之处。
group=new NioEventLoopGroup();
try{
bootstrap=new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline=socketChannel.pipeline();
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
pipeline.addLast("encoder", new ObjectEncoder());//outbound
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));//inbound
pipeline.addLast(new ResultHandler());
}
});

}catch (Exception e){
System.out.println(e);
}
初始化handler的过程类似,然后就是使用客户端发消息的代码:
public void sent(Object message) {
try {
channel.writeAndFlush(message);
channel.closeFuture();
}finally {}
}

4、门面类

dubbo作为一个可拓展的框架,不会只依赖netty一个通信框架,于是需要门面模式来对外提供不可见的服务。如下所示为一个门面类,门面提供了bind和connect接口,分别实现服务器绑定和客户端连接,返回的EndPoin是一个接口,该接口提供了一些通用的数据通信的方法。通过该门面类,隐藏了底层的实现细节,使用者不必关系通信框架是netty还是其他框架。大家可能注意到bind接口需要提供一个ExhangeHandler对象,这个对象其实是用户定义的操作方法用来替代handler的作用,大家可能想到,在使用netty的时候,处理数据需要在pipeline中添加handler,如果我们换了通信框架,那么服务提供者是不是需要相应地更改呢?这样做的可拓展性当然是不好的,于是这里利用了回调的思路,将一个接口类传入,用户需要在上层定义具体的实现,就不必再传输层再做具体的操作了。该ExchangeHandler将会在下一个章节中具体介绍,属于传输层以上上层的实现了。
//单例模式
public class Transporters {
private static Transporter transporter;
public static Transporter getTranspoter(){
if(transporter==null) {
synchronized (Transporters.class) {
if(transporter==null){
transporter=new NettyTransporter();
}
}
}
return transporter;
}
public static EndPoint bind(URL url,ExhangeHandler dispatcher){
return getTranspoter().bind(url,dispatcher);
}
public static EndPoint connect(URL url){
return getTranspoter().connect(url);
};
}

门面类还是一个单利工厂模式,维护了一个Transporter对象,通过更改transporter对象就可以获得不同通信框架的支持。本文只实现了netty的通信框架,于是netty的Transporter实现如下:
public class NettyTransporter implements Transporter{
@Override
public EndPoint bind(URL url, ExhangeHandler dispatcher) {
return new NettyServer(url,dispatcher);
}

@Override
public EndPoint connect(URL url) {
return new NettyClient(url);
}
}

5、测试

本章测试将和下一个章节的测试合并,按规矩是应该做单元测试的,但是题主比较懒,请多多包涵。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  PRC java dubbo netty