您的位置:首页 > 理论基础 > 计算机网络

Mina、Netty、Twisted一起学习(三):TCP前缀固定大小的消息(Header)

2015-10-17 20:13 656 查看
以前的博文于,有介绍切割消息换行的方法。

但是有一个小问题,这样的方法,设消息中本身就包括换行符,那将会将这条消息切割成两条。结果就不正确了。

本文介绍第二种消息切割方式,即上一篇博文中讲的第2条:use a fixed length header that indicates the length of the body。用一个固定字节数的Header前缀来指定Body的字节数,以此来切割消息。



上面图中Header固定为4字节,Header中保存的是一个4字节(32位)的整数,比如12即为0x0000000C。这个整数用来指定Body的长度(字节数)。当读完这么多字节的Body之后,又是下一条消息的Header。

以下分别用MINA、Netty、Twisted来实现对这样的消息的切合和解码。

MINA:

MINA提供了PrefixedStringCodecFactory来对这样的类型的消息进行编码解码。PrefixedStringCodecFactory默认Header的大小是4字节。当然也能够指定成1或2。

public class TcpServer {

public static void main(String[] args) throws IOException {
IoAcceptor acceptor = new NioSocketAcceptor();

// 4字节的Header指定Body的字节数。对这样的消息的处理
acceptor.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new PrefixedStringCodecFactory(Charset.forName("UTF-8"))));

acceptor.setHandler(new TcpServerHandle());
acceptor.bind(new InetSocketAddress(8080));
}

}

class TcpServerHandle extends IoHandlerAdapter {

@Override
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception {
cause.printStackTrace();
}

// 接收到新的数据
@Override
public void messageReceived(IoSession session, Object message)
throws Exception {

String msg = (String) message;
System.out.println("messageReceived:" + msg);

}

@Override
public void sessionCreated(IoSession session) throws Exception {
System.out.println("sessionCreated");
}

@Override
public void sessionClosed(IoSession session) throws Exception {
System.out.println("sessionClosed");
}
}

Netty:

Netty使用LengthFieldBasedFrameDecoder来处理这样的消息。

以下代码中的new LengthFieldBasedFrameDecoder(80, 0, 4, 0, 4)中包括5个參数,各自是int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip。maxFrameLength为消息的最大长度。lengthFieldOffset为Header的位置。lengthFieldLength为Header的长度,lengthAdjustment为长度调整(默认Header中的值表示Body的长度。并不包括Header自己),initialBytesToStrip为去掉字节数(默认解码后返回Header+Body的所有内容。这里设为4表示去掉4字节的Header。仅仅留下Body)。

public class TcpServer {

public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ChannelPipeline pipeline = ch.pipeline();

// LengthFieldBasedFrameDecoder按行切割消息,取出body
pipeline.addLast(new LengthFieldBasedFrameDecoder(80, 0, 4, 0, 4));
// 再按UTF-8编码转成字符串
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));

pipeline.addLast(new TcpServerHandler());
}
});
ChannelFuture f = b.bind(8080).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}

}

class TcpServerHandler extends ChannelInboundHandlerAdapter {

// 接收到新的数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {

String message = (String) msg;
System.out.println("channelRead:" + message);
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("channelActive");
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
System.out.println("channelInactive");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

Twisted:

在Twisted中须要继承Int32StringReceiver,不再继承Protocol。

Int32StringReceiver表示固定32位(4字节)的Header,另外还有Int16StringReceiver、Int8StringReceiver等。而须要实现的接受数据事件的方法不再是dataReceived,也不是lineReceived。而是stringReceived。

# -*- coding:utf-8 –*-

from twisted.protocols.basic import Int32StringReceiver
from twisted.internet.protocol import Factory
from twisted.internet import reactor

class TcpServerHandle(Int32StringReceiver):

# 新的连接建立
def connectionMade(self):
print 'connectionMade'

# 连接断开
def connectionLost(self, reason):
print 'connectionLost'

# 接收到新的数据
def stringReceived(self, data):
print 'stringReceived:' + data

factory = Factory()
factory.protocol = TcpServerHandle
reactor.listenTCP(8080, factory)
reactor.run()

以下是Java编写的一个client測试程序:

public class TcpClient {

public static void main(String[] args) throws IOException {

Socket socket = null;
DataOutputStream out = null;

try {

socket = new Socket("localhost", 8080);
out = new DataOutputStream(socket.getOutputStream());

// 请求server
String data1 = "牛顿";
byte[] outputBytes1 = data1.getBytes("UTF-8");
out.writeInt(outputBytes1.length); // write header
out.write(outputBytes1); // write body

String data2 = "爱因斯坦";
byte[] outputBytes2 = data2.getBytes("UTF-8");
out.writeInt(outputBytes2.length); // write header
out.write(outputBytes2); // write body

out.flush();

} finally {
// 关闭连接
out.close();
socket.close();
}

}

}


MINAserver输出结果:

sessionCreated
messageReceived:牛顿
messageReceived:爱因斯坦
sessionClosed

Nettyserver输出结果:

channelActive
channelRead:牛顿
channelRead:爱因斯坦
channelInactive

Twistedserver输出结果:

connectionMade
stringReceived:牛顿
stringReceived:爱因斯坦
connectionLost

作者:叉叉哥 转载请注明出处:/article/1375427.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: