您的位置:首页 > 其它

庖丁解牛之仿《闪传》实现文件传输(下)

2016-04-24 10:05 411 查看
转载请标明出处

本文出自[HCY的微博]

一、概述

通过上文庖丁解牛之仿《闪传》实现文件传输(中),客户端接到了服务端的IP地址和端口号,那么连接到服务端,传输链路建立完毕,开始真正的文件收发功能实现。就是那么自然。

本文中介绍四种角色的实现,分别如下:TCP服务端、TCP客户端、TCP消息发送者、TCP消息接收者。其中TCP服务端和TCP客户端都可以作为TCP消息发送者和TCP消息接收者这两种角色。

二、TCP服务端

它的主要功能就是通过accept方法接收客户端的连接,然后可以将这些连接的客户端保存到一个队列中,并且将其构造成TCP消息发送者和TCP消息接收者。这样服务端就可以向客户端发送和接收消息了。

public class TcpServer {
private static TcpServer instance = null;
private ServerSocket serverSocket;
private OnAcceptClientSocketListener acceptClientSocketListener;
private MonitorClientSocketThread monitorClientSocketThread;

public static TcpServer getInstance() {
if (instance == null) {
synchronized (TcpServer.class) {
if (instance == null) {
instance = new TcpServer();
return instance;
}
}
}
return instance;
}

private TcpServer() {

}

/**
* 设置客户端连接监听器
*
* @param l
* @param port
* @throws IOException
*/
public synchronized void setOnAcceptClientSocketListener(
OnAcceptClientSocketListener l, int port) throws IOException {
if (serverSocket == null) {
serverSocket = new ServerSocket();
serverSocket.setReuseAddress(true);
serverSocket.setSoTimeout(0);
serverSocket.bind(new InetSocketAddress(port));
monitorClientSocketThread = new MonitorClientSocketThread();
monitorClientSocketThread.start();
}
this.acceptClientSocketListener = l;
}

/**
* 停止服务
*
* @throws IOException
*/
public synchronized void stopAcceptClientSocket() throws IOException {
if (monitorClientSocketThread != null) {
monitorClientSocketThread.exit();
monitorClientSocketThread = null;
serverSocket.close();
serverSocket = null;
}
}

/**
* 监听客户端连接的线程
*/
private class MonitorClientSocketThread extends BaseThread {

@Override
public void run() {
while (!isExit) {
try {
Socket socket = serverSocket.accept();
onAcceptClientSocket(socket);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

private void onAcceptClientSocket(Socket socket) {
if (acceptClientSocketListener != null) {
acceptClientSocketListener.onAcceptClientSocket(socket);
}
}
}


三、TCP客户端

主要功能就是连接上服务端,然后通过保存的socket对象分别创建TCP消息发送者和TCP消息接收者,这样客户端就可以向服务端发送消息,并接收来自服务端的消息。

public class TcpClient {
private static TcpClient instance = null;
private Socket clientSocket;

public static TcpClient getInstance() throws IOException {
if (instance == null) {
synchronized (TcpClient.class) {
if (instance == null) {
instance = new TcpClient();
return instance;
}
}
}
return instance;
}

private TcpClient() throws IOException {
clientSocket = new Socket();
}

/**
* 连接到服务端
*
* @param serverHost
*            服务端IP
* @param port
*            服务端监听连接的端口
* @param timeout
*            超时时间,单位毫秒
* @param l
*            连接监听器
*/
public void connnetToServer(final String serverHost, final int port,
final int timeout, final OnTcpClientConnectListener l) {
new Thread() {
@Override
public void run() {
try {
InetAddress address = InetAddress.getByName(serverHost);
SocketAddress remoteAddr = new InetSocketAddress(address,
port);
clientSocket.connect(remoteAddr, timeout);
if (l != null) {
l.onConnectSuccess();
}
} catch (Exception e) {
e.printStackTrace();
if (l != null) {
l.onConnectFailure();
}
}
}
}.start();
}
}


四、TCP消息发送者

主要实现的功能是从消息队列中取出消息并进行发送

public class TcpSender {
private TcpMessagePool messagePool;
private Socket socket;
private OutputStream outputStream;
private OnTcpMessageSendListener onTcpMessageSendListener;
private TcpMessageSendThread messageSenderThread;

public TcpSender(Socket socket) throws IOException {
messagePool = new TcpMessagePool();
this.socket = socket;
this.outputStream = this.socket.getOutputStream();

}

/**
* 发送消息,往消息池里面添加消息
*
* @param message
*/
public synchronized void sendMessage(byte[] message) {
if (messagePool != null) {
messagePool.addMessage(message);
}

}

/**
* 设置消息发送监听器
*
* @param l
*/
public void setOnTcpMessageSendListener(OnTcpMessageSendListener l) {
this.onTcpMessageSendListener = l;
}

/**
* 开始发送消息,从消息池里面取出消息进行发送
*/
public synchronized void startSendMessage() {
if (messageSenderThread == null) {
messageSenderThread = new TcpMessageSendThread();
messageSenderThread.start();
}
}

/**
* 停止消息发送
*/
public synchronized void stopSendMessage() {
if (messageSenderThread != null) {
messageSenderThread.exit();
messageSenderThread = null;
}
}

/**
* 发送消息内部实现
*
* @param message
* @throws IOException
*/
private void sendMessageInternal(byte[] message) throws IOException {
if (message == null) {
return;
}
if (outputStream != null && socket.isConnected()) {
outputStream.write(message, 0, message.length);
outputStream.flush();
} else {
if (onTcpMessageSendListener != null) {
onTcpMessageSendListener.onSocketBreak();
}
throw new IOException();
}
}

/**
* 消息发送线程
*/
private class TcpMessageSendThread extends BaseThread {

@Override
public void run() {
while (!isExit) {
if (messagePool != null) {
byte[] message = messagePool.getMessage();
if (message != null) {
boolean isSendSuccess = false;
try {
sendMessageInternal(message);
isSendSuccess = true;
} catch (Exception e) {
e.printStackTrace();
isSendSuccess = false;
}
if (onTcpMessageSendListener != null) {
onTcpMessageSendListener
.onTcpMessageSend(isSendSuccess);
}
}
}
}
}

}
}


五、TCP消息接收者

这里抽象了一个消息接收者,readMessage需要子类实现,因为可能不同应用场景解析消息的方式不相同。

public abstract class TcpReceiver {
private Socket socket;
private InputStream inputStream;
private byte[] recvBuf;
private OnTcpMessageReceiveListener onTcpMessageReceiveListener;
private TcpMessageReceiveThread tcpMessageReceiveThread;

public TcpReceiver(Socket socket, int recvBufSize) throws IOException {
this.socket = socket;
this.inputStream = this.socket.getInputStream();
recvBuf = new byte[recvBufSize];
}

/**
* 设置数据接收监听器
*
* @param l
*/
public void setOnTcpMessageReceiveListener(OnTcpMessageReceiveListener l) {
this.onTcpMessageReceiveListener = l;
}

/**
* 开始接收数据
*/
public synchronized void startReceiveMessage() {
if (tcpMessageReceiveThread == null) {
tcpMessageReceiveThread = new TcpMessageReceiveThread();
tcpMessageReceiveThread.start();
}
}

/**
* 停止消息接收
*/
public synchronized void stopReceiveMessage() {
if (tcpMessageReceiveThread != null) {
tcpMessageReceiveThread.exit();
tcpMessageReceiveThread = null;
}
}

/**
* TCP消息接收线程
*/
private class TcpMessageReceiveThread extends BaseThread {
@Override
public void run() {
while (!isExit) {
try {
if (inputStream != null && inputStream.available() > 0) {
int len = readMessage(inputStream, recvBuf);
if (len > 0) {
byte[] message = new byte[len];
System.arraycopy(recvBuf, 0, message, 0, len);
if (onTcpMessageReceiveListener != null) {
onTcpMessageReceiveListener
.onTcpMessageReceive(message);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

}

/**
* 读取消息,读取一个数据包
*
* @param inputStream
*            输入流
* @param recvBuf
*            接收缓冲区
* @return 读取的字节长度
*/
protected abstract int readMessage(InputStream inputStream, byte[] recvBuf);
}


六、处理粘包和半包的问题

在TCP消息的发送过程中,接收方可能会出现粘包和半包的问题。所以需要一种验证方式来保证数据的正确接收,本文通过TLV消息模型处理这类问题。继承消息接收抽象类,构造出TcpTLVReiver。

public class TcpTLVReiver extends TcpReceiver {

public TcpTLVReiver(Socket socket, int recvBufSize) throws IOException {
super(socket, recvBufSize);
}

@Override
protected int readMessage(InputStream inputStream, byte[] recvBuf) {
// TLV编码,第一、第二个字节为TAG,第三、第四个字节为Length,后面Length个字节为数据
int countRead;
int offset;
int remaining;
int messageLength = -1;
offset = 0;
remaining = 4;
try {
// 读取前四个字节,取出数据长度
do {
countRead = inputStream.read(recvBuf, offset, remaining);
if (countRead < 0) {
return -1;
}
offset += countRead;
remaining -= countRead;
} while (remaining > 0);
messageLength = ((recvBuf[2] & 0xff) << 8) | (recvBuf[3] & 0xff);
if (messageLength < 0) {
return 0;
}
// 读取数据到缓冲区
offset = 4;
remaining = messageLength;
do {
countRead = inputStream.read(recvBuf, offset, remaining);
if (countRead < 0) {
return -1;
}
offset += countRead;
remaining -= countRead;
} while (remaining > 0);
} catch (Exception e) {
e.printStackTrace();
return -1;
}
return messageLength + 4;
}

}


七、连接保活

在TCP通信过程中,可能会因为网络原因导致建立的连接断开。那么怎么样才能知道连接断开了呢?答案是通过发送心跳包实现掉线的检测,客户端可以每隔一段时间发送一个心跳包,服务端接受到心跳包之后,应答客户端。如果客户端收到了应答信息,表示连接没有断开,否则表示连接已经断开,需要重新建立连接。

上述代码都在TcpUdpLibrary(https://github.com/Money888/TcpUdpLibrary)中。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: