庖丁解牛之仿《闪传》实现文件传输(下)
2016-04-24 10:05
411 查看
转载请标明出处
本文出自[HCY的微博]
本文中介绍四种角色的实现,分别如下:TCP服务端、TCP客户端、TCP消息发送者、TCP消息接收者。其中TCP服务端和TCP客户端都可以作为TCP消息发送者和TCP消息接收者这两种角色。
上述代码都在TcpUdpLibrary(https://github.com/Money888/TcpUdpLibrary)中。
本文出自[HCY的微博]
一、概述
通过上文庖丁解牛之仿《闪传》实现文件传输(中),客户端接到了服务端的IP地址和端口号,那么连接到服务端,传输链路建立完毕,开始真正的文件收发功能实现。就是那么自然。本文中介绍四种角色的实现,分别如下:TCP服务端、TCP客户端、TCP消息发送者、TCP消息接收者。其中TCP服务端和TCP客户端都可以作为TCP消息发送者和TCP消息接收者这两种角色。
二、TCP服务端
它的主要功能就是通过accept方法接收客户端的连接,然后可以将这些连接的客户端保存到一个队列中,并且将其构造成TCP消息发送者和TCP消息接收者。这样服务端就可以向客户端发送和接收消息了。public class TcpServer { private static TcpServer instance = null; private ServerSocket serverSocket; private OnAcceptClientSocketListener acceptClientSocketListener; private MonitorClientSocketThread monitorClientSocketThread; public static TcpServer getInstance() { if (instance == null) { synchronized (TcpServer.class) { if (instance == null) { instance = new TcpServer(); return instance; } } } return instance; } private TcpServer() { } /** * 设置客户端连接监听器 * * @param l * @param port * @throws IOException */ public synchronized void setOnAcceptClientSocketListener( OnAcceptClientSocketListener l, int port) throws IOException { if (serverSocket == null) { serverSocket = new ServerSocket(); serverSocket.setReuseAddress(true); serverSocket.setSoTimeout(0); serverSocket.bind(new InetSocketAddress(port)); monitorClientSocketThread = new MonitorClientSocketThread(); monitorClientSocketThread.start(); } this.acceptClientSocketListener = l; } /** * 停止服务 * * @throws IOException */ public synchronized void stopAcceptClientSocket() throws IOException { if (monitorClientSocketThread != null) { monitorClientSocketThread.exit(); monitorClientSocketThread = null; serverSocket.close(); serverSocket = null; } } /** * 监听客户端连接的线程 */ private class MonitorClientSocketThread extends BaseThread { @Override public void run() { while (!isExit) { try { Socket socket = serverSocket.accept(); onAcceptClientSocket(socket); } catch (Exception e) { e.printStackTrace(); } } } } private void onAcceptClientSocket(Socket socket) { if (acceptClientSocketListener != null) { acceptClientSocketListener.onAcceptClientSocket(socket); } } }
三、TCP客户端
主要功能就是连接上服务端,然后通过保存的socket对象分别创建TCP消息发送者和TCP消息接收者,这样客户端就可以向服务端发送消息,并接收来自服务端的消息。public class TcpClient { private static TcpClient instance = null; private Socket clientSocket; public static TcpClient getInstance() throws IOException { if (instance == null) { synchronized (TcpClient.class) { if (instance == null) { instance = new TcpClient(); return instance; } } } return instance; } private TcpClient() throws IOException { clientSocket = new Socket(); } /** * 连接到服务端 * * @param serverHost * 服务端IP * @param port * 服务端监听连接的端口 * @param timeout * 超时时间,单位毫秒 * @param l * 连接监听器 */ public void connnetToServer(final String serverHost, final int port, final int timeout, final OnTcpClientConnectListener l) { new Thread() { @Override public void run() { try { InetAddress address = InetAddress.getByName(serverHost); SocketAddress remoteAddr = new InetSocketAddress(address, port); clientSocket.connect(remoteAddr, timeout); if (l != null) { l.onConnectSuccess(); } } catch (Exception e) { e.printStackTrace(); if (l != null) { l.onConnectFailure(); } } } }.start(); } }
四、TCP消息发送者
主要实现的功能是从消息队列中取出消息并进行发送public class TcpSender { private TcpMessagePool messagePool; private Socket socket; private OutputStream outputStream; private OnTcpMessageSendListener onTcpMessageSendListener; private TcpMessageSendThread messageSenderThread; public TcpSender(Socket socket) throws IOException { messagePool = new TcpMessagePool(); this.socket = socket; this.outputStream = this.socket.getOutputStream(); } /** * 发送消息,往消息池里面添加消息 * * @param message */ public synchronized void sendMessage(byte[] message) { if (messagePool != null) { messagePool.addMessage(message); } } /** * 设置消息发送监听器 * * @param l */ public void setOnTcpMessageSendListener(OnTcpMessageSendListener l) { this.onTcpMessageSendListener = l; } /** * 开始发送消息,从消息池里面取出消息进行发送 */ public synchronized void startSendMessage() { if (messageSenderThread == null) { messageSenderThread = new TcpMessageSendThread(); messageSenderThread.start(); } } /** * 停止消息发送 */ public synchronized void stopSendMessage() { if (messageSenderThread != null) { messageSenderThread.exit(); messageSenderThread = null; } } /** * 发送消息内部实现 * * @param message * @throws IOException */ private void sendMessageInternal(byte[] message) throws IOException { if (message == null) { return; } if (outputStream != null && socket.isConnected()) { outputStream.write(message, 0, message.length); outputStream.flush(); } else { if (onTcpMessageSendListener != null) { onTcpMessageSendListener.onSocketBreak(); } throw new IOException(); } } /** * 消息发送线程 */ private class TcpMessageSendThread extends BaseThread { @Override public void run() { while (!isExit) { if (messagePool != null) { byte[] message = messagePool.getMessage(); if (message != null) { boolean isSendSuccess = false; try { sendMessageInternal(message); isSendSuccess = true; } catch (Exception e) { e.printStackTrace(); isSendSuccess = false; } if (onTcpMessageSendListener != null) { onTcpMessageSendListener .onTcpMessageSend(isSendSuccess); } } } } } } }
五、TCP消息接收者
这里抽象了一个消息接收者,readMessage需要子类实现,因为可能不同应用场景解析消息的方式不相同。public abstract class TcpReceiver { private Socket socket; private InputStream inputStream; private byte[] recvBuf; private OnTcpMessageReceiveListener onTcpMessageReceiveListener; private TcpMessageReceiveThread tcpMessageReceiveThread; public TcpReceiver(Socket socket, int recvBufSize) throws IOException { this.socket = socket; this.inputStream = this.socket.getInputStream(); recvBuf = new byte[recvBufSize]; } /** * 设置数据接收监听器 * * @param l */ public void setOnTcpMessageReceiveListener(OnTcpMessageReceiveListener l) { this.onTcpMessageReceiveListener = l; } /** * 开始接收数据 */ public synchronized void startReceiveMessage() { if (tcpMessageReceiveThread == null) { tcpMessageReceiveThread = new TcpMessageReceiveThread(); tcpMessageReceiveThread.start(); } } /** * 停止消息接收 */ public synchronized void stopReceiveMessage() { if (tcpMessageReceiveThread != null) { tcpMessageReceiveThread.exit(); tcpMessageReceiveThread = null; } } /** * TCP消息接收线程 */ private class TcpMessageReceiveThread extends BaseThread { @Override public void run() { while (!isExit) { try { if (inputStream != null && inputStream.available() > 0) { int len = readMessage(inputStream, recvBuf); if (len > 0) { byte[] message = new byte[len]; System.arraycopy(recvBuf, 0, message, 0, len); if (onTcpMessageReceiveListener != null) { onTcpMessageReceiveListener .onTcpMessageReceive(message); } } } } catch (IOException e) { e.printStackTrace(); } } } } /** * 读取消息,读取一个数据包 * * @param inputStream * 输入流 * @param recvBuf * 接收缓冲区 * @return 读取的字节长度 */ protected abstract int readMessage(InputStream inputStream, byte[] recvBuf); }
六、处理粘包和半包的问题
在TCP消息的发送过程中,接收方可能会出现粘包和半包的问题。所以需要一种验证方式来保证数据的正确接收,本文通过TLV消息模型处理这类问题。继承消息接收抽象类,构造出TcpTLVReiver。public class TcpTLVReiver extends TcpReceiver { public TcpTLVReiver(Socket socket, int recvBufSize) throws IOException { super(socket, recvBufSize); } @Override protected int readMessage(InputStream inputStream, byte[] recvBuf) { // TLV编码,第一、第二个字节为TAG,第三、第四个字节为Length,后面Length个字节为数据 int countRead; int offset; int remaining; int messageLength = -1; offset = 0; remaining = 4; try { // 读取前四个字节,取出数据长度 do { countRead = inputStream.read(recvBuf, offset, remaining); if (countRead < 0) { return -1; } offset += countRead; remaining -= countRead; } while (remaining > 0); messageLength = ((recvBuf[2] & 0xff) << 8) | (recvBuf[3] & 0xff); if (messageLength < 0) { return 0; } // 读取数据到缓冲区 offset = 4; remaining = messageLength; do { countRead = inputStream.read(recvBuf, offset, remaining); if (countRead < 0) { return -1; } offset += countRead; remaining -= countRead; } while (remaining > 0); } catch (Exception e) { e.printStackTrace(); return -1; } return messageLength + 4; } }
七、连接保活
在TCP通信过程中,可能会因为网络原因导致建立的连接断开。那么怎么样才能知道连接断开了呢?答案是通过发送心跳包实现掉线的检测,客户端可以每隔一段时间发送一个心跳包,服务端接受到心跳包之后,应答客户端。如果客户端收到了应答信息,表示连接没有断开,否则表示连接已经断开,需要重新建立连接。上述代码都在TcpUdpLibrary(https://github.com/Money888/TcpUdpLibrary)中。
相关文章推荐
- hdu4722 Good Numbers(数位dp)
- Oracle之深入浅出(二)--SQL原理、解释计划与执行计划
- 微软面试题:五个囚犯抓绿豆
- Vim
- 期中总结
- Linux下四款Web服务器压力测试工具(http_load、webbench、ab、siege)介绍
- 使用C#调用windows API(从其它地方总结来的,以备查询)
- 当当的面经
- Leetcode——047
- android图形之setClickable,setEnable,setFocusable区别
- 微信运营案例分析
- 记一次酷派尚锋Y75刷机
- 再说最后一次!关于不再更新SkySRS的理由!
- OnTouchListener的使用
- 摄像机码流的计算
- 微信开发
- hdu1170Balloon Comes!
- java的覆盖重写隐藏和C#中的不同
- Cool Edit Pro 2.0详细教程(转)
- Mac下OpenGL环境搭建以及OpenGL解析渲染OBJ模型