您的位置:首页 > 移动开发 > Android开发

关于项目中使用的关于nio client的一些小结

2016-08-30 17:53 555 查看
之前项目使用的都是tcp的同步操作,比如之前的接收数据是写到发送数据的后面的,这样就导致了只能够发送数据之后才能够接收到数据。然后经过一些参考资料以及同事的帮助下,把同步换成了异步。具体关于nio和bio 大家有不明白的请自行百度。

当然啦,首先去github上找下有没有大神写的demo赛。我自己认为比较好的demo,传送门,本文代码是在其基础上修改而来,简单滴说就是copy改,请不要喷我。

下面说下基本步骤吧。

使用SocketChannel不阻塞的Socket替代阻塞的Socket

打开选择器Selector

注册Selector选择器到信道



通过Selector.selectedKeys()获取到SelectionKey,然后通过SelectionKey.channel()获取到 SocketChannel,从而可以利用Socket的write和read去读写数据。具体信息可以查看Java NIO系列教程(六) Selector

大致思路都有了下面上下代码断,主要有3个类
NioClient
,
CenterControlStatusListener
,
RecvData


CenterControlStatusListener
如下:

package com.pd.plugin.pd.led.listener;

/**
* 中控状态监听器,主要用于{@link com.pd.plugin.pd.led.protocol.thread.TcpHandleThread}向{@link com.pd.plugin.pd.led.LedLightApplication}里面传递数据 <br>
* Created by Tangxb on 2016/8/16.
*/
public interface CenterControlStatusListener {
/**
* tcp重连失败,需要开启udp发送状态
*
* @param deviceIp
*/
void handleReconnByTcpAfter(String deviceIp);

/**
* 将中控状态保存起来
*
* @param deviceIp 中控的IP
* @param value    true:在线 false:离线
*/
void updateStatusMapByDeviceIp(String deviceIp, boolean value);
}


RecvData
如下:

package com.pd.plugin.pd.led.protocol.thread;

import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

/**
* 缓存数据,使用了{@link java.nio.channels.SelectionKey#attach(Object)}来管理同一管道的缓存数据,这样可以缓存分包发送的数据
* 参考{@link NioClient#connect()}里面的<code>key.attach(data);</code>以及参考{@link com.pd.plugin.pd.led.protocol.thread.NioClient.ClientThread#readMessage(SelectionKey)}
* 里面的<code>RecvData data = (RecvData) sk.attachment();</code>和<code>sk.attach(data);</code>
*/
public class RecvData {
// 调试下,采用system.out.println打印关键信息
private boolean bDebug = true;

// 当前接受的长度
private int iRecvLen = 0;

// 一个完整包的长度
private int iTotalLen = 0;

// 建议接受缓存大小
private int default_buffer_size = 1024;

private String uuid = "";

// 数据缓冲区 加上default_buffer_size 为接收时的缓存大小
ByteBuffer buffer = ByteBuffer.allocate(65535 + default_buffer_size);

public int getDefault_buffer_size() {
return default_buffer_size;
}

public String getUuid() {
return uuid;
}

public void setUuid(String uuid) {
this.uuid = uuid;
}

/**
* 获取一个完整的包 当且仅当收取完成后调用
*
* @return
*/
public List<byte[]> getData() {
List<byte[]> list = new ArrayList<byte[]>();

// 没有接收到完整包返回
if (iRecvLen < iTotalLen || iRecvLen == 0 || iTotalLen == 0) {
return list;
}

int iPostion = 0;

// 能够到达这里说明肯定存在至少一个完整的包
// 用int变量 主要是放置死循环 也许存在未考虑到的地方
int iCount = 0;

while (iCount < buffer.capacity()) {
iCount++;

//多次处理过后,存在遗留一个byte的情况
if (iRecvLen < 2) {
break;
}

int iDataSize = bytesToShort(buffer.array(), iPostion);

iTotalLen = iDataSize;

// 协议长度 3 < iRecvLen包的长度4
RunLog("getData: iRecvLen = " + iRecvLen + " , iTotalLen = " + iTotalLen);

// 数据长度大于现有收取数据
if (iDataSize > iRecvLen) {
// 首次执行while不处理
if (iPostion != 0) {
// 将剩余的移动到buffer中
byte[] b = new byte[iRecvLen];

System.arraycopy(buffer.array(), iPostion, b, 0, iRecvLen);

buffer.flip();

buffer.clear();

buffer.put(b);
}

break;
} else if (iDataSize == iRecvLen) {
// 仅有一个包啦
byte[] b = new byte[iDataSize];

System.arraycopy(buffer.array(), iPostion, b, 0, iDataSize);

list.add(b);

iRecvLen = 0;

iTotalLen = 0;

buffer.flip();

buffer.clear();

break;
} else if (iDataSize < iRecvLen) {
// 收取一个包然后设置收取长度/总长等
byte[] b = new byte[iDataSize];

System.arraycopy(buffer.array(), iPostion, b, 0, iDataSize);

// 移动postion
iPostion += iDataSize;

iRecvLen = iRecvLen - iDataSize;

iTotalLen = 0;

list.add(b);
}
}
return list;
}

/**
* 每次接收到tcp数据,通过该函数放入缓冲区中
*
* @param b
*/
public void setData(byte[] b) {
if (b == null || b.length == 0) {
return;
}
buffer.put(b);

iRecvLen += b.length;

// 判断是否该包没有长度
if (iTotalLen == 0 && iRecvLen >= 2) {
iTotalLen = bytesToShort(buffer.array(), 0);
}

RunLog("setData: iRecvLen = " + iRecvLen + " , iTotalLen = " + iTotalLen);
}

private short bytesToShort(byte[] b, int pos) {
if (b == null || b.length == 0) {
return 0;
}

short sLen = 0;

short sLen1 = (short) (b[pos + 0] & 0x00ff); // 最低位
short sLen2 = (short) (b[pos + 1] & 0x00ff);
sLen1 <<= 8 * 0;
sLen2 <<= 8 * 1;

sLen = (short) (sLen2 | sLen1);

return sLen;
}

private void RunLog(String s) {
if (bDebug) {
System.err.println("UUID:" + uuid + " >> " + s);
}
}
}


最后是
NioClient
:

package com.pd.plugin.pd.led.protocol.thread;

import android.support.annotation.Nullable;
import android.util.Log;

import com.pd.plugin.pd.led.listener.CenterControlStatusListener;
import com.pd.plugin.pd.led.protocol.DataHelper;
import com.pd.plugin.pd.led.protocol.ProtocolEntity;
import com.pd.plugin.pd.led.util.DebugToolUtils;

import org.greenrobot.eventbus.EventBus;

import java.io.IOException;
import java.lang.ref.WeakReference;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.LinkedBlockingQueue;

public class NioClient {
// Debugging
private static final String TAG = "NetworkClient";
private static final boolean D = true;

public interface IClientListener {
// 接收数据后调用此接口
void onReceived(InetSocketAddress remoteAddress, byte[] data);

// 数据发送后调用此接口
void onSent(InetSocketAddress remoteAddress, int sentCount);

/**
* 连接是否成功
*
* @param flag
*/
void onConnResult(boolean flag);

// 数据发送失败后调用此接口
void onSendFailed(InetSocketAddress remoteAddress);
}

public static final int STATE_NONE = 0;        //未连接
public static final int STATE_CONNECTING = 1;  //正在链接
public static final int STATE_CONNECTED = 2;   //已连接
private volatile int mState = STATE_NONE;

private InetSocketAddress mRemoteAddress;
private IClientListener mClientListener;

private Thread mClientThread = null;
private Selector mSelector;

// 发送队列
private LinkedBlockingQueue<ByteBuffer> mSendQueue = new LinkedBlockingQueue<>();
// 接受缓冲(2kb)
private ByteBuffer mReceiveBuffer = ByteBuffer.allocate(2 * 1024);
private String mNeedIp;
private int mNeedPort;
private WeakReference<CenterControlStatusListener> mListener;

public NioClient(String serverIp, int port, @Nullable IClientListener clientListener, CenterControlStatusListener listener) {
this(new InetSocketAddress(serverIp, port), clientListener);
mNeedIp = serverIp;
mNeedPort = port;
mListener = new WeakReference<>(listener);
}

/**
* 测试的时候使用
*
* @param remoteAddress
* @param clientListener
*/
public NioClient(InetSocketAddress remoteAddress, @Nullable IClientListener clientListener) {
mRemoteAddress = remoteAddress;
mClientListener = clientListener;
mState = STATE_NONE;
}

public InetSocketAddress getRemoteAddress() {
return mRemoteAddress;
}

private void setState(int state) {
mState = state;
if (D) {
switch (mState) {
case STATE_CONNECTING:
Log.d(TAG, "connecting");
break;
case STATE_CONNECTED:
Log.d(TAG, "connected");
break;
default:
Log.d(TAG, "not connected");
break;
}
}
}

public int getState() {
return mState;
}

public void connect() {
if (mState == STATE_NONE) {
//TODO 检查网络状态

mClientThread = new ClientThread();
mClientThread.start();
}
}

/**
* 适用于一次性操作
*
* @param data
*/
public synchronized void sendBeforeClear(byte[] data) {
mSendQueue.clear();
send(ByteBuffer.wrap(data));
}

public void send(byte[] data) {
send(ByteBuffer.wrap(data));
}

public void send(ByteBuffer buffer) {
mSendQueue.offer(buffer);

//检查是否连接
connect();

//已连接状态则唤醒Selector进行发讯,其他状态只加入发讯队列即可
if (mState == STATE_CONNECTED) {
mSelector.wakeup();
}
}

public void close() {
if (mState == STATE_NONE) return;

// 停止线程运行方式一:使用interrupt()
if (mClientThread != null && mClientThread.isAlive()) {
mClientThread.interrupt();
}
mClientThread = null;
// 停止线程运行方式二:设置volatile条件变量
mState = STATE_NONE;
}

public class ClientThread extends Thread {
@Override
public void run() {
if (D)
Log.d(TAG, String.format("Thread[%d] start RUN.", Thread.currentThread().getId()));
SocketChannel socketChannel = null;
try {
setState(STATE_CONNECTING);
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
mSelector = Selector.open();
socketChannel.register(mSelector, SelectionKey.OP_CONNECT);
if (D) Log.i(TAG, "Connect to " + mRemoteAddress.toString());
// 这里并没有真正连接,只有socketChannel.finishConnect()返回为true才是连接成功了
socketChannel.connect(mRemoteAddress);

while (mState != STATE_NONE) {
mSelector.select();

//当调用Thread.interrupt()进行中断线程时,上面的Selector的阻塞操作会马上返回,在此处立马检查线程状态
if (Thread.interrupted()) {
throw new InterruptedException(String.format(Locale.getDefault(),
"Thread[%d] has been Interrupted.", Thread.currentThread().getId()));
}

Iterator<SelectionKey> it = mSelector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();

if (!key.isValid()) continue;

if (key.isConnectable()) {
connect(key);
} else if (key.isReadable()) {
readMessage(key);
} else if (key.isWritable()) {
write(key);
}
}

if (!mSendQueue.isEmpty()) {
SelectionKey key = socketChannel.keyFor(mSelector);
key.interestOps(SelectionKey.OP_WRITE);
}
}
} catch (IOException e) {
if (D) Log.e(TAG, "IOException occur. " + e.getMessage());
} catch (InterruptedException e) {
if (D) Log.e(TAG, "InterruptedException occur. " + e.getMessage());
} finally {
if (socketChannel != null) {
SelectionKey key = socketChannel.keyFor(mSelector);
key.cancel();
try {
socketChannel.close();  //关闭Socket
} catch (IOException e) {
e.printStackTrace();
}
}
mSendQueue.clear();
setState(STATE_NONE);
if (D) Log.d(TAG, String.format("Thread[%d] END", Thread.currentThread().getId()));
}
}

public void readMessage(SelectionKey sk) throws IOException {
SocketChannel curSc = (SocketChannel) sk.channel();
RecvData data = (RecvData) sk.attachment();
ByteBuffer buffer = ByteBuffer.allocate(data.getDefault_buffer_size());
while (curSc.read(buffer) > 0) {
sk.interestOps(SelectionKey.OP_READ);
byte[] tempBuffer = new byte[buffer.position()];
System.arraycopy(buffer.array(), 0, tempBuffer, 0, tempBuffer.length);
buffer.flip();
buffer.clear();
data.setData(tempBuffer);
List<byte[]> list = data.getData();
if (list != null && list.size() > 0) {
for (byte[] b : list) {
receiveData(b);
}
}
}
sk.attach(data);
}

private void receiveData(byte[] buffer) {
if (buffer.length < ProtocolEntity.SIZE - 1) {
return;
}
byte[] tempBytes = new byte[ProtocolEntity.SIZE - 1];
System.arraycopy(buffer, 0, tempBytes, 0, tempBytes.length);
// 检查特征码
String checkCodeStr = DataHelper.byteArrayToHexString(tempBytes, tempBytes.length - 2, 2);
// 特征码检查通过
if (checkCodeStr.equalsIgnoreCase(ProtocolEntity.CHECKCODE)) {
int totalLen = DataHelper.bytesToShort(tempBytes, 0);
int remainLen = totalLen - tempBytes.length;
byte[] totalBytes = new byte[totalLen];
// 拷贝第一次的数组
System.arraycopy(buffer, 0, totalBytes, 0, tempBytes.length);
int readCount = 0; // 已经成功读取的字节的个数
int iLen = remainLen - readCount;
// 拷贝第二次的数组
System.arraycopy(buffer, readCount + tempBytes.length, totalBytes, readCount + tempBytes.length, iLen);
readCount += iLen;
// 判断第2次接收的数据长度对不对(防止出现接收数据不完全的情况)
if (readCount == remainLen) {
// 返回成功
if (mListener != null && mListener.get() != null) {
mListener.get().updateStatusMapByDeviceIp(mNeedIp, true);
}
ProtocolEntity entity = new ProtocolEntity();
entity.setBytes(totalBytes, 0);
EventBus.getDefault().post(entity);
DebugToolUtils.e(getClass(), "receive$$$$$cmd===" + entity.getCmd() + ",getSubCmd===" + entity.getSubCmd() + ",data=====" + DataHelper.byteArrayToHexString(entity.getBytes()));
}
}
}

private void connect(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
if (socketChannel.isConnectionPending()) {
try {
if (socketChannel.finishConnect()) {
key.interestOps(SelectionKey.OP_READ);
setState(STATE_CONNECTED);
if (mClientListener != null) {
mClientListener.onConnResult(true);
}
// 返回成功
if (mListener != null && mListener.get() != null) {
mListener.get().updateStatusMapByDeviceIp(mNeedIp, true);
}
}
} catch (IOException e) {
if (mClientListener != null) {
mClientListener.onConnResult(false);
}
// 返回失败
if (mListener != null && mListener.get() != null) {
mListener.get().updateStatusMapByDeviceIp(mNeedIp, false);
mListener.get().handleReconnByTcpAfter(mNeedIp);
}
throw e;
}
}
RecvData data = new RecvData();
key.attach(data);
}

private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
mReceiveBuffer.clear();
int count = socketChannel.read(mReceiveBuffer);
if (count <= 0) {
throw new IOException(String.format(Locale.getDefault(),
"Thread[%d] read error:%d", Thread.currentThread().getId(), count));
} else {
byte[] data = new byte[count];
System.arraycopy(mReceiveBuffer.array(), 0, data, 0, count);
if (mClientListener != null) {
mClientListener.onReceived(mRemoteAddress, data);
}
}
}

private void write(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();

while (!mSendQueue.isEmpty()) {
ByteBuffer buffer = mSendQueue.poll();
int count;
try {
count = socketChannel.write(buffer);
} catch (IOException e) {
if (mClientListener != null) {
mClientListener.onSendFailed(mRemoteAddress);
}
throw e;
}
if (mClientListener != null) {
mClientListener.onSent(mRemoteAddress, count);
}
}

key.interestOps(SelectionKey.OP_READ);
}
}
}


关于这里用到
receiveData(byte[] buffer)
方法可以去掉。

注释应该很清楚了,这个就当是一个小小的笔记吧。。。。。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  nio android