关于项目中使用的关于nio client的一些小结
2016-08-30 17:53
555 查看
之前项目使用的都是tcp的同步操作,比如之前的接收数据是写到发送数据的后面的,这样就导致了只能够发送数据之后才能够接收到数据。然后经过一些参考资料以及同事的帮助下,把同步换成了异步。具体关于nio和bio 大家有不明白的请自行百度。
当然啦,首先去github上找下有没有大神写的demo赛。我自己认为比较好的demo,传送门,本文代码是在其基础上修改而来,简单滴说就是copy改,请不要喷我。
下面说下基本步骤吧。
使用SocketChannel不阻塞的Socket替代阻塞的Socket
打开选择器Selector
注册Selector选择器到信道
通过Selector.selectedKeys()获取到SelectionKey,然后通过SelectionKey.channel()获取到 SocketChannel,从而可以利用Socket的write和read去读写数据。具体信息可以查看Java NIO系列教程(六) Selector。
大致思路都有了下面上下代码断,主要有3个类
最后是
关于这里用到
注释应该很清楚了,这个就当是一个小小的笔记吧。。。。。
当然啦,首先去github上找下有没有大神写的demo赛。我自己认为比较好的demo,传送门,本文代码是在其基础上修改而来,简单滴说就是copy改,请不要喷我。
下面说下基本步骤吧。
使用SocketChannel不阻塞的Socket替代阻塞的Socket
打开选择器Selector
注册Selector选择器到信道
通过Selector.selectedKeys()获取到SelectionKey,然后通过SelectionKey.channel()获取到 SocketChannel,从而可以利用Socket的write和read去读写数据。具体信息可以查看Java NIO系列教程(六) Selector。
大致思路都有了下面上下代码断,主要有3个类
NioClient,
CenterControlStatusListener,
RecvData。
CenterControlStatusListener如下:
package com.pd.plugin.pd.led.listener; /** * 中控状态监听器,主要用于{@link com.pd.plugin.pd.led.protocol.thread.TcpHandleThread}向{@link com.pd.plugin.pd.led.LedLightApplication}里面传递数据 <br> * Created by Tangxb on 2016/8/16. */ public interface CenterControlStatusListener { /** * tcp重连失败,需要开启udp发送状态 * * @param deviceIp */ void handleReconnByTcpAfter(String deviceIp); /** * 将中控状态保存起来 * * @param deviceIp 中控的IP * @param value true:在线 false:离线 */ void updateStatusMapByDeviceIp(String deviceIp, boolean value); }
RecvData如下:
package com.pd.plugin.pd.led.protocol.thread; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.util.ArrayList; import java.util.List; /** * 缓存数据,使用了{@link java.nio.channels.SelectionKey#attach(Object)}来管理同一管道的缓存数据,这样可以缓存分包发送的数据 * 参考{@link NioClient#connect()}里面的<code>key.attach(data);</code>以及参考{@link com.pd.plugin.pd.led.protocol.thread.NioClient.ClientThread#readMessage(SelectionKey)} * 里面的<code>RecvData data = (RecvData) sk.attachment();</code>和<code>sk.attach(data);</code> */ public class RecvData { // 调试下,采用system.out.println打印关键信息 private boolean bDebug = true; // 当前接受的长度 private int iRecvLen = 0; // 一个完整包的长度 private int iTotalLen = 0; // 建议接受缓存大小 private int default_buffer_size = 1024; private String uuid = ""; // 数据缓冲区 加上default_buffer_size 为接收时的缓存大小 ByteBuffer buffer = ByteBuffer.allocate(65535 + default_buffer_size); public int getDefault_buffer_size() { return default_buffer_size; } public String getUuid() { return uuid; } public void setUuid(String uuid) { this.uuid = uuid; } /** * 获取一个完整的包 当且仅当收取完成后调用 * * @return */ public List<byte[]> getData() { List<byte[]> list = new ArrayList<byte[]>(); // 没有接收到完整包返回 if (iRecvLen < iTotalLen || iRecvLen == 0 || iTotalLen == 0) { return list; } int iPostion = 0; // 能够到达这里说明肯定存在至少一个完整的包 // 用int变量 主要是放置死循环 也许存在未考虑到的地方 int iCount = 0; while (iCount < buffer.capacity()) { iCount++; //多次处理过后,存在遗留一个byte的情况 if (iRecvLen < 2) { break; } int iDataSize = bytesToShort(buffer.array(), iPostion); iTotalLen = iDataSize; // 协议长度 3 < iRecvLen包的长度4 RunLog("getData: iRecvLen = " + iRecvLen + " , iTotalLen = " + iTotalLen); // 数据长度大于现有收取数据 if (iDataSize > iRecvLen) { // 首次执行while不处理 if (iPostion != 0) { // 将剩余的移动到buffer中 byte[] b = new byte[iRecvLen]; System.arraycopy(buffer.array(), iPostion, b, 0, iRecvLen); buffer.flip(); buffer.clear(); buffer.put(b); } break; } else if (iDataSize == iRecvLen) { // 仅有一个包啦 byte[] b = new byte[iDataSize]; System.arraycopy(buffer.array(), iPostion, b, 0, iDataSize); list.add(b); iRecvLen = 0; iTotalLen = 0; buffer.flip(); buffer.clear(); break; } else if (iDataSize < iRecvLen) { // 收取一个包然后设置收取长度/总长等 byte[] b = new byte[iDataSize]; System.arraycopy(buffer.array(), iPostion, b, 0, iDataSize); // 移动postion iPostion += iDataSize; iRecvLen = iRecvLen - iDataSize; iTotalLen = 0; list.add(b); } } return list; } /** * 每次接收到tcp数据,通过该函数放入缓冲区中 * * @param b */ public void setData(byte[] b) { if (b == null || b.length == 0) { return; } buffer.put(b); iRecvLen += b.length; // 判断是否该包没有长度 if (iTotalLen == 0 && iRecvLen >= 2) { iTotalLen = bytesToShort(buffer.array(), 0); } RunLog("setData: iRecvLen = " + iRecvLen + " , iTotalLen = " + iTotalLen); } private short bytesToShort(byte[] b, int pos) { if (b == null || b.length == 0) { return 0; } short sLen = 0; short sLen1 = (short) (b[pos + 0] & 0x00ff); // 最低位 short sLen2 = (short) (b[pos + 1] & 0x00ff); sLen1 <<= 8 * 0; sLen2 <<= 8 * 1; sLen = (short) (sLen2 | sLen1); return sLen; } private void RunLog(String s) { if (bDebug) { System.err.println("UUID:" + uuid + " >> " + s); } } }
最后是
NioClient:
package com.pd.plugin.pd.led.protocol.thread; import android.support.annotation.Nullable; import android.util.Log; import com.pd.plugin.pd.led.listener.CenterControlStatusListener; import com.pd.plugin.pd.led.protocol.DataHelper; import com.pd.plugin.pd.led.protocol.ProtocolEntity; import com.pd.plugin.pd.led.util.DebugToolUtils; import org.greenrobot.eventbus.EventBus; import java.io.IOException; import java.lang.ref.WeakReference; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.concurrent.LinkedBlockingQueue; public class NioClient { // Debugging private static final String TAG = "NetworkClient"; private static final boolean D = true; public interface IClientListener { // 接收数据后调用此接口 void onReceived(InetSocketAddress remoteAddress, byte[] data); // 数据发送后调用此接口 void onSent(InetSocketAddress remoteAddress, int sentCount); /** * 连接是否成功 * * @param flag */ void onConnResult(boolean flag); // 数据发送失败后调用此接口 void onSendFailed(InetSocketAddress remoteAddress); } public static final int STATE_NONE = 0; //未连接 public static final int STATE_CONNECTING = 1; //正在链接 public static final int STATE_CONNECTED = 2; //已连接 private volatile int mState = STATE_NONE; private InetSocketAddress mRemoteAddress; private IClientListener mClientListener; private Thread mClientThread = null; private Selector mSelector; // 发送队列 private LinkedBlockingQueue<ByteBuffer> mSendQueue = new LinkedBlockingQueue<>(); // 接受缓冲(2kb) private ByteBuffer mReceiveBuffer = ByteBuffer.allocate(2 * 1024); private String mNeedIp; private int mNeedPort; private WeakReference<CenterControlStatusListener> mListener; public NioClient(String serverIp, int port, @Nullable IClientListener clientListener, CenterControlStatusListener listener) { this(new InetSocketAddress(serverIp, port), clientListener); mNeedIp = serverIp; mNeedPort = port; mListener = new WeakReference<>(listener); } /** * 测试的时候使用 * * @param remoteAddress * @param clientListener */ public NioClient(InetSocketAddress remoteAddress, @Nullable IClientListener clientListener) { mRemoteAddress = remoteAddress; mClientListener = clientListener; mState = STATE_NONE; } public InetSocketAddress getRemoteAddress() { return mRemoteAddress; } private void setState(int state) { mState = state; if (D) { switch (mState) { case STATE_CONNECTING: Log.d(TAG, "connecting"); break; case STATE_CONNECTED: Log.d(TAG, "connected"); break; default: Log.d(TAG, "not connected"); break; } } } public int getState() { return mState; } public void connect() { if (mState == STATE_NONE) { //TODO 检查网络状态 mClientThread = new ClientThread(); mClientThread.start(); } } /** * 适用于一次性操作 * * @param data */ public synchronized void sendBeforeClear(byte[] data) { mSendQueue.clear(); send(ByteBuffer.wrap(data)); } public void send(byte[] data) { send(ByteBuffer.wrap(data)); } public void send(ByteBuffer buffer) { mSendQueue.offer(buffer); //检查是否连接 connect(); //已连接状态则唤醒Selector进行发讯,其他状态只加入发讯队列即可 if (mState == STATE_CONNECTED) { mSelector.wakeup(); } } public void close() { if (mState == STATE_NONE) return; // 停止线程运行方式一:使用interrupt() if (mClientThread != null && mClientThread.isAlive()) { mClientThread.interrupt(); } mClientThread = null; // 停止线程运行方式二:设置volatile条件变量 mState = STATE_NONE; } public class ClientThread extends Thread { @Override public void run() { if (D) Log.d(TAG, String.format("Thread[%d] start RUN.", Thread.currentThread().getId())); SocketChannel socketChannel = null; try { setState(STATE_CONNECTING); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); mSelector = Selector.open(); socketChannel.register(mSelector, SelectionKey.OP_CONNECT); if (D) Log.i(TAG, "Connect to " + mRemoteAddress.toString()); // 这里并没有真正连接,只有socketChannel.finishConnect()返回为true才是连接成功了 socketChannel.connect(mRemoteAddress); while (mState != STATE_NONE) { mSelector.select(); //当调用Thread.interrupt()进行中断线程时,上面的Selector的阻塞操作会马上返回,在此处立马检查线程状态 if (Thread.interrupted()) { throw new InterruptedException(String.format(Locale.getDefault(), "Thread[%d] has been Interrupted.", Thread.currentThread().getId())); } Iterator<SelectionKey> it = mSelector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); if (!key.isValid()) continue; if (key.isConnectable()) { connect(key); } else if (key.isReadable()) { readMessage(key); } else if (key.isWritable()) { write(key); } } if (!mSendQueue.isEmpty()) { SelectionKey key = socketChannel.keyFor(mSelector); key.interestOps(SelectionKey.OP_WRITE); } } } catch (IOException e) { if (D) Log.e(TAG, "IOException occur. " + e.getMessage()); } catch (InterruptedException e) { if (D) Log.e(TAG, "InterruptedException occur. " + e.getMessage()); } finally { if (socketChannel != null) { SelectionKey key = socketChannel.keyFor(mSelector); key.cancel(); try { socketChannel.close(); //关闭Socket } catch (IOException e) { e.printStackTrace(); } } mSendQueue.clear(); setState(STATE_NONE); if (D) Log.d(TAG, String.format("Thread[%d] END", Thread.currentThread().getId())); } } public void readMessage(SelectionKey sk) throws IOException { SocketChannel curSc = (SocketChannel) sk.channel(); RecvData data = (RecvData) sk.attachment(); ByteBuffer buffer = ByteBuffer.allocate(data.getDefault_buffer_size()); while (curSc.read(buffer) > 0) { sk.interestOps(SelectionKey.OP_READ); byte[] tempBuffer = new byte[buffer.position()]; System.arraycopy(buffer.array(), 0, tempBuffer, 0, tempBuffer.length); buffer.flip(); buffer.clear(); data.setData(tempBuffer); List<byte[]> list = data.getData(); if (list != null && list.size() > 0) { for (byte[] b : list) { receiveData(b); } } } sk.attach(data); } private void receiveData(byte[] buffer) { if (buffer.length < ProtocolEntity.SIZE - 1) { return; } byte[] tempBytes = new byte[ProtocolEntity.SIZE - 1]; System.arraycopy(buffer, 0, tempBytes, 0, tempBytes.length); // 检查特征码 String checkCodeStr = DataHelper.byteArrayToHexString(tempBytes, tempBytes.length - 2, 2); // 特征码检查通过 if (checkCodeStr.equalsIgnoreCase(ProtocolEntity.CHECKCODE)) { int totalLen = DataHelper.bytesToShort(tempBytes, 0); int remainLen = totalLen - tempBytes.length; byte[] totalBytes = new byte[totalLen]; // 拷贝第一次的数组 System.arraycopy(buffer, 0, totalBytes, 0, tempBytes.length); int readCount = 0; // 已经成功读取的字节的个数 int iLen = remainLen - readCount; // 拷贝第二次的数组 System.arraycopy(buffer, readCount + tempBytes.length, totalBytes, readCount + tempBytes.length, iLen); readCount += iLen; // 判断第2次接收的数据长度对不对(防止出现接收数据不完全的情况) if (readCount == remainLen) { // 返回成功 if (mListener != null && mListener.get() != null) { mListener.get().updateStatusMapByDeviceIp(mNeedIp, true); } ProtocolEntity entity = new ProtocolEntity(); entity.setBytes(totalBytes, 0); EventBus.getDefault().post(entity); DebugToolUtils.e(getClass(), "receive$$$$$cmd===" + entity.getCmd() + ",getSubCmd===" + entity.getSubCmd() + ",data=====" + DataHelper.byteArrayToHexString(entity.getBytes())); } } } private void connect(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); if (socketChannel.isConnectionPending()) { try { if (socketChannel.finishConnect()) { key.interestOps(SelectionKey.OP_READ); setState(STATE_CONNECTED); if (mClientListener != null) { mClientListener.onConnResult(true); } // 返回成功 if (mListener != null && mListener.get() != null) { mListener.get().updateStatusMapByDeviceIp(mNeedIp, true); } } } catch (IOException e) { if (mClientListener != null) { mClientListener.onConnResult(false); } // 返回失败 if (mListener != null && mListener.get() != null) { mListener.get().updateStatusMapByDeviceIp(mNeedIp, false); mListener.get().handleReconnByTcpAfter(mNeedIp); } throw e; } } RecvData data = new RecvData(); key.attach(data); } private void read(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); mReceiveBuffer.clear(); int count = socketChannel.read(mReceiveBuffer); if (count <= 0) { throw new IOException(String.format(Locale.getDefault(), "Thread[%d] read error:%d", Thread.currentThread().getId(), count)); } else { byte[] data = new byte[count]; System.arraycopy(mReceiveBuffer.array(), 0, data, 0, count); if (mClientListener != null) { mClientListener.onReceived(mRemoteAddress, data); } } } private void write(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); while (!mSendQueue.isEmpty()) { ByteBuffer buffer = mSendQueue.poll(); int count; try { count = socketChannel.write(buffer); } catch (IOException e) { if (mClientListener != null) { mClientListener.onSendFailed(mRemoteAddress); } throw e; } if (mClientListener != null) { mClientListener.onSent(mRemoteAddress, count); } } key.interestOps(SelectionKey.OP_READ); } } }
关于这里用到
receiveData(byte[] buffer)方法可以去掉。
注释应该很清楚了,这个就当是一个小小的笔记吧。。。。。
相关文章推荐
- 关于C++项目中的一些LINK错误小结
- 使用Angular CLI快速创建Angular项目的一些基本概念和写法小结
- 关于Apache 工具包的一些记录,希望以后项目可以多多使用。
- 关于移动iscroll项目中使用的一些坑
- 关于项目中要使用thymeleaf的一些想法
- 关于jeesite开源项目的一些使用总结
- C#使用HttpWebRequest进行HTTP请求发送和接收的一些小结。(新增修复.NET4.0以下关于cookie的bug)
- 关于CListCtrl 实际项目中的一些简单使用
- 关于android控件属性的一些使用小结
- 又是一点关于过年前做的项目的一些想法!
- 关于何种情况下使用DataGrid、DataList或Repeater的一些讨论
- 关于Editplus的一些使用方法
- 关于何种情况下使用DataGrid、DataList或Repeater的一些讨论
- 关于何种情况下使用DataGrid、DataList或Repeater的一些讨论
- COM中关于使用DLL的一些知识点
- 关于使用commons-betwixt组件实现xml信息-->Java Bean的转化的一些感受
- 关于何种情况下使用DataGrid、DataList或Repeater的一些讨论
- 关于项目方面的几点经典小结
- 关于最近使用stl的一些感受
- 关于在.net环境下通过使用LDAP来访问AD,LDAPServer的一些问题?