您的位置:首页 > 编程语言 > Java开发

Java NIO 实现进程通讯,解决用户自定义数据的组包和拆分粘包的问题

2013-08-08 17:04 603 查看
TCP通讯过程中,由于网络原因或者其他原因(TCP 是面向字节流的协议,本身不保留消息边界),经常出现粘包和半包现象,所以在具体编程中需要考虑如何处理。

下面的 Java 代码是用 NIO 实现的一个 Server 端,消息的通讯格式为:

4字节 int 类型(大端字节序)[包头] + 包体。

包头描述出包体的长度。

package com.sof.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Single-threaded event loop built on a {@link Selector}.
 *
 * <p>The listening channel is registered for {@code OP_ACCEPT} with an
 * {@link Acceptor} attached. Every selected key is dispatched by running the
 * {@link Runnable} attached to it.
 */
public class Reactor implements Runnable
{

    private static Logger logger = LoggerFactory.getLogger(Reactor.class);

    final Selector selector;
    final ServerSocketChannel serverSocket;

    /**
     * Opens the selector and the non-blocking listening socket.
     *
     * @param ip   local address to bind to; {@code null} or blank binds the
     *             wildcard address (all local interfaces)
     * @param port local TCP port to listen on
     * @throws IOException if the selector or the server socket cannot be set up
     */
    public Reactor(String ip, int port) throws IOException
    {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        // Honour the requested bind address; previously the parameter was ignored.
        InetSocketAddress address;
        if (ip == null || ip.trim().length() == 0)
        {
            address = new InetSocketAddress(port);
        }
        else
        {
            address = new InetSocketAddress(ip.trim(), port);
        }
        serverSocket.socket().bind(address);
        serverSocket.configureBlocking(false);
        SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        sk.attach(new Acceptor());
    }

    /**
     * Runs the select/dispatch loop until the current thread is interrupted
     * or the selector fails with an {@link IOException}.
     */
    public void run()
    {
        try
        {
            while (!Thread.interrupted())
            {
                logger.debug("selector is waitting  event....");
                selector.select();
                Set<SelectionKey> keys = selector.selectedKeys();
                if (keys.size() == 0)
                {
                    logger.debug("nothing happened");
                    continue;
                }

                for (SelectionKey key : keys)
                {
                    // The isXxx() probes throw CancelledKeyException on a cancelled key.
                    if (!key.isValid())
                    {
                        continue;
                    }
                    if (key.isAcceptable())
                    {
                        logger.debug("Acceptable event happened");
                    }
                    else if (key.isReadable())
                    {
                        logger.debug("Readable event happened");
                    }
                    else if (key.isWritable())
                    {
                        logger.debug("Writeable event happened");
                    }
                    else
                    {
                        logger.debug("others event happened");
                    }
                    dispatch(key);
                }
                // The selected-key set is never cleared by the selector itself.
                keys.clear();
            }
        }
        catch (IOException ex)
        {
            logger.error(ex.getMessage(), ex);
        }
    }

    /**
     * Runs the {@link Runnable} attached to the key, if there is one.
     *
     * @param k a selected key
     */
    void dispatch(SelectionKey k)
    {
        Runnable r = (Runnable) (k.attachment());
        if (r != null)
        {
            r.run();
        }
    }

    /**
     * Accepts one pending connection and hands it to a new {@link Handler}.
     */
    public class Acceptor implements Runnable
    {
        public synchronized void run()
        {
            SocketChannel c = null;
            try
            {
                c = serverSocket.accept();
                // In non-blocking mode accept() returns null when nothing is pending.
                if (c == null)
                {
                    return;
                }
                logger.info("got a new connection from:  " + c.socket().toString());
                new Handler(selector, c);
            }
            catch (IOException ex)
            {
                logger.error(ex.getMessage(), ex);
                // Do not leak the accepted channel if the handler could not be set up.
                if (c != null)
                {
                    try
                    {
                        c.close();
                    }
                    catch (IOException closeEx)
                    {
                        logger.warn("failed to close accepted channel", closeEx);
                    }
                }
            }
        }
    }
}


package com.sof.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sof.bas.Bytes2util;
import com.sof.bas.Util2Bytes;

/**
 * Per-connection handler that reassembles length-prefixed messages.
 *
 * <p>Wire format: a 4-byte header decoded with
 * {@code Util2Bytes.bytes2bigint}, giving the body length, followed by that
 * many body bytes. Bytes that do not yet form a complete header or body are
 * kept in {@link #input} between calls, so messages split across several
 * reads (half packets) and several messages arriving in one read (sticky
 * packets) are both handled.
 */
public final class Handler implements Runnable
{
    private static Logger logger = LoggerFactory.getLogger(Handler.class);
    final SocketChannel socket;
    final SelectionKey sk;

    static final int MESSAGE_LENGTH_HEAD = 4;

    /** Largest body accepted from a peer; larger or negative lengths are treated as a protocol error. */
    static final int MAX_BODY_LENGTH = 16 * 1024 * 1024;

    /** Header bytes of the message currently being assembled. */
    byte[] head = new byte[MESSAGE_LENGTH_HEAD];

    /** Body length of the message being assembled, or -1 while no complete header has been read. */
    int bodylen = -1;

    /** Accumulates unread bytes across calls to read(); left in write mode between calls. */
    private ByteBuffer input = ByteBuffer.allocate(1024);

    /**
     * Switches the channel to non-blocking mode and registers it for reads
     * with this handler attached to the key.
     *
     * @param selector selector to register with
     * @param socket   the accepted connection
     * @throws IOException if the channel cannot be configured or registered
     */
    Handler(Selector selector, SocketChannel socket) throws IOException
    {
        this.socket = socket;
        socket.configureBlocking(false);
        sk = socket.register(selector, 0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    /**
     * Reads whatever is available; on any I/O failure (including the peer
     * closing the connection or a malformed header) closes the socket and
     * cancels the selection key.
     */
    public void run()
    {
        try
        {
            read();
        }
        catch (IOException ex)
        {
            logger.debug("closing connection: " + ex.getMessage());
            try
            {
                socket.close();
            }
            catch (IOException e)
            {
                logger.warn("failed to close socket", e);
            }
            logger.info("got a disconnect from " + socket.socket().toString());
            sk.cancel();
        }
    }

    /**
     * Reads available bytes and emits every complete message found in the
     * accumulated data through {@code Bytes2util.outputHex}.
     *
     * @throws IOException if the read fails, the peer has closed the
     *                     connection, or a header announces a negative or
     *                     oversized body length
     */
    public synchronized void read() throws IOException
    {
        // read() returns -1 at end-of-stream; without this check a closed
        // connection would never be released.
        if (socket.read(input) < 0)
        {
            throw new IOException("peer closed the connection");
        }
        input.flip();

        logger.debug("1: remain=" + input.remaining() + " bodylen=" + bodylen);
        while (true)
        {
            if (bodylen < 0) // no complete header yet for the current message
            {
                if (input.remaining() < MESSAGE_LENGTH_HEAD)
                {
                    // Not enough bytes for a header: keep them and wait for more.
                    logger.debug("3: remain=" + input.remaining() + " bodylen=" + bodylen);
                    break;
                }
                input.get(head, 0, MESSAGE_LENGTH_HEAD);
                int declared = Util2Bytes.bytes2bigint(head);
                // A negative value would be mistaken for the "no header" state,
                // and an unbounded one would let a peer force a huge allocation.
                if (declared < 0 || declared > MAX_BODY_LENGTH)
                {
                    throw new IOException("invalid body length in header: " + declared);
                }
                bodylen = declared;
                logger.debug("2: remain=" + input.remaining() + " bodylen=" + bodylen);
            }

            if (input.remaining() < bodylen)
            {
                // Body incomplete: keep what we have and wait for the next read.
                logger.debug("5: remain=" + input.remaining() + " bodylen=" + bodylen);
                break;
            }

            // Header and body are complete (the body may be empty).
            byte[] headandbody = new byte[MESSAGE_LENGTH_HEAD + bodylen];
            System.arraycopy(head, 0, headandbody, 0, MESSAGE_LENGTH_HEAD);
            input.get(headandbody, MESSAGE_LENGTH_HEAD, bodylen);
            bodylen = -1;
            logger.debug("4: remain=" + input.remaining() + " bodylen=" + bodylen);
            Bytes2util.outputHex(headandbody, 16);
        }

        if (bodylen > input.capacity())
        {
            // The pending body cannot fit in the current buffer: move the
            // leftover bytes into one that is large enough.
            ByteBuffer bigger = ByteBuffer.allocate(bodylen);
            bigger.put(input);
            input = bigger;
        }
        else
        {
            // Move unread bytes to the front and switch back to write mode.
            input.compact();
        }
    }
}


package com.sof.bas;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Converts {@code int} and {@code short} values to byte arrays in either
 * byte order and dumps byte arrays as hex to the debug log.
 */
public class Bytes2util {
    private static Logger logger = LoggerFactory.getLogger(Bytes2util.class);

    private static final char[] HEX_DIGITS = "0123456789ABCDEF".toCharArray();

    /**
     * Encodes {@code value} as 4 bytes, most significant byte first.
     *
     * @param value the value to encode
     * @return a new 4-byte array
     */
    public static byte[] biginttobytes(int value) {
        byte[] stream = new byte[4];
        for (int i = 0; i < 4; i++) {
            stream[i] = (byte) (value >>> ((3 - i) * 8));
        }
        outputHex(stream, 16);
        return stream;
    }

    /**
     * Encodes {@code value} as 2 bytes, most significant byte first.
     *
     * @param value the value to encode
     * @return a new 2-byte array
     */
    public static byte[] bigshorttobytes(short value) {
        byte[] stream = new byte[2];
        for (int i = 0; i < 2; i++) {
            stream[i] = (byte) (value >>> ((1 - i) * 8));
        }
        outputHex(stream, 16);
        return stream;
    }

    /**
     * Encodes {@code value} as 4 bytes, least significant byte first.
     *
     * @param value the value to encode
     * @return a new 4-byte array
     */
    public static byte[] smallinttobytes(int value) {
        byte[] stream = new byte[4];
        for (int i = 0; i < 4; i++) {
            stream[i] = (byte) (value >>> (i * 8));
        }
        outputHex(stream, 16);
        return stream;
    }

    /**
     * Encodes {@code value} as 2 bytes, least significant byte first.
     *
     * @param value the value to encode
     * @return a new 2-byte array
     */
    public static byte[] smallshorttobytes(short value) {
        byte[] stream = new byte[2];
        for (int i = 0; i < 2; i++) {
            stream[i] = (byte) (value >>> (i * 8));
        }
        outputHex(stream, 16);
        return stream;
    }

    /**
     * Logs {@code stream} at debug level as upper-case hex pairs separated
     * by spaces, starting a new line after every {@code number} bytes.
     *
     * @param stream the bytes to dump
     * @param number bytes per line; {@code 0} disables line wrapping
     */
    public static void outputHex(byte[] stream, int number) {
        // Skip building the dump entirely when it would not be logged.
        if (!logger.isDebugEnabled()) {
            return;
        }
        StringBuilder content = new StringBuilder(32 + stream.length * 3);
        content.append("stream display, length=").append(stream.length).append('\n');
        for (int i = 0; i < stream.length; i++) {
            // number == 0 used to raise ArithmeticException here.
            if (number != 0 && i != 0 && i % number == 0) {
                content.append('\n');
            }
            int unsigned = stream[i] & 0xFF;
            content.append(HEX_DIGITS[unsigned >>> 4])
                   .append(HEX_DIGITS[unsigned & 0x0F])
                   .append(' ');
        }
        logger.debug(content.toString());
    }
}


package com.sof.bas;

/**
 * Decodes {@code int} and {@code short} values from the leading bytes of a
 * byte array, in either byte order. Each byte is treated as unsigned.
 */
public class Util2Bytes
{
    /**
     * Decodes the first 4 bytes as an int, least significant byte first.
     *
     * @param stream array holding at least 4 bytes
     * @return the decoded value
     */
    public static int bytes2smallint(byte[] stream)
    {
        int result = 0;
        for (int idx = 3; idx >= 0; idx--)
        {
            result |= (stream[idx] & 0xFF) << (idx * 8);
        }
        return result;
    }

    /**
     * Decodes the first 2 bytes as a short, least significant byte first.
     *
     * @param stream array holding at least 2 bytes
     * @return the decoded value
     */
    public static short bytes2smallshort(byte[] stream)
    {
        int result = 0;
        for (int idx = 1; idx >= 0; idx--)
        {
            result |= (stream[idx] & 0xFF) << (idx * 8);
        }
        return (short) result;
    }

    /**
     * Decodes the first 4 bytes as an int, most significant byte first.
     *
     * @param stream array holding at least 4 bytes
     * @return the decoded value
     */
    public static int bytes2bigint(byte[] stream)
    {
        int result = 0;
        for (int idx = 0, shift = 24; idx < 4; idx++, shift -= 8)
        {
            result |= (stream[idx] & 0xFF) << shift;
        }
        return result;
    }

    /**
     * Decodes the first 2 bytes as a short, most significant byte first.
     *
     * @param stream array holding at least 2 bytes
     * @return the decoded value
     */
    public static short bytes2bigshort(byte[] stream)
    {
        int result = 0;
        for (int idx = 0, shift = 8; idx < 2; idx++, shift -= 8)
        {
            result |= (stream[idx] & 0xFF) << shift;
        }
        return (short) result;
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐