您的位置:首页 > 理论基础 > 计算机网络

NIO Socket实现非阻塞通信示例

2016-11-01 18:01 232 查看

NIO Socket实现非阻塞通信示例

这两天学习了下NIO,写了个练习的demo,如下:

服务端步骤

通过ServerSocketChannel.open()方法得到一个ServerSocketChannel通道对象

设置通道为非阻塞模式(通道注册到选择器之前必须处于非阻塞模式;只有在不使用选择器时,服务端才可以使用阻塞模式)

通过Selector.open()得到一个选择器

在选择器上注册通道,并设置感兴趣的事件

绑定本地端口并启动服务端

等待处理事件

客户端步骤

获取通道和选择器,注册感兴趣的事件

连接服务端

等待连接成功后发送数据

完整代码

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

import org.junit.Test;

public class OneThreadHandleAll
{
/** Address the server channel binds to: port 7777 on the wildcard address. */
InetSocketAddress isaServer = new InetSocketAddress(7777);

/** Address the client connects to: the server's port 7777 on localhost. */
InetSocketAddress serverServer = new InetSocketAddress("localhost", 7777);

/** Selector polled by the {@code Server} thread; created in {@code mainTest()}. */
Selector serverSelector;

/** Selector polled by the {@code Client} thread; created in {@code mainTest()}. */
Selector clientSelector;

/** Charset used to decode the bytes received by the server. */
Charset charset = Charset.forName("utf-8");

/**
 * Wires up the demo: opens a non-blocking server channel and a non-blocking
 * client channel, registers each with its own selector, starts the
 * {@code Server} and {@code Client} threads, and then keeps the test thread
 * alive so the two event loops can run.
 *
 * @throws IOException          if a channel or selector cannot be opened, bound or connected
 * @throws InterruptedException if the test thread is interrupted while sleeping
 */
@Test
public void mainTest() throws IOException, InterruptedException
{
    // Server side: non-blocking listening channel, interested in incoming connections.
    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.socket().setReuseAddress(true);
    ssc.configureBlocking(false);
    serverSelector = Selector.open();
    ssc.register(serverSelector, SelectionKey.OP_ACCEPT);
    ssc.bind(isaServer);
    Server server = new Server();
    server.start();

    // Client side: non-blocking channel, initially interested in connect completion.
    clientSelector = Selector.open();
    SocketChannel sc = SocketChannel.open();
    sc.socket().setReuseAddress(true);
    sc.configureBlocking(false);
    SelectionKey sk = sc.register(clientSelector, SelectionKey.OP_CONNECT);
    System.out.println(Integer.toBinaryString(sk.interestOps()));
    if (sc.connect(serverServer))
    {
        // A non-blocking connect to a local address may complete immediately.
        // In that case no OP_CONNECT event will be delivered, so switch straight
        // to the read/write interest the client uses once it is connected.
        sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    }

    Client client = new Client();
    client.start();

    // Keep the test method from returning while the worker threads run.
    Thread.sleep(2000000);
}

/**
 * Client event loop. Waits on {@code clientSelector}, completes the pending
 * connection when it becomes connectable, and sends numbered text lines to the
 * server whenever the channel is writable.
 */
class Client extends Thread
{
    /** Number of the next line to send. */
    int i = 1;

    /** Bytes of the current line that a previous write could not send yet, or null. */
    private ByteBuffer pending;

    /**
     * Runs the select loop until {@code select()} reports no ready keys.
     *
     * @throws IOException          if selecting, connecting or writing fails
     * @throws InterruptedException if the thread is interrupted during the pacing sleep
     */
    void init() throws IOException, InterruptedException
    {
        while (clientSelector.select() > 0)
        {
            Iterator<SelectionKey> ready = clientSelector.selectedKeys().iterator();
            while (ready.hasNext())
            {
                SelectionKey selectionKey = ready.next();
                // Print the operations this key is currently interested in.
                System.out.println(Integer.toBinaryString(selectionKey.interestOps()));
                // Drop the key from the selected set so it is not handled again.
                // Removal must go through the iterator: removing from the set
                // directly while iterating it throws ConcurrentModificationException.
                ready.remove();
                if (selectionKey.isConnectable())
                {
                    System.out.println("Client isConnectable");
                    SocketChannel sc = (SocketChannel) selectionKey.channel();
                    if (sc.finishConnect())
                    {
                        // Connected: from now on watch for read and write readiness.
                        selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                    }
                }
                else if (selectionKey.isReadable())
                {
                    System.out.println("Client isReadable");
                }
                else if (selectionKey.isWritable())
                {
                    System.out.println("Client isWritable");
                    // Send data.
                    handleWritable(selectionKey);
                }
            }
            // Pause so the console output stays readable and the server has
            // time to receive and print what was sent.
            Thread.sleep(2000);
        }
    }

    /**
     * Sends lines until the line counter reaches the next multiple of three.
     * Every line whose number is a multiple of three gets a trailing "bye".
     * If the channel accepts only part of a line, the rest is kept and sent on
     * the next writable event instead of being dropped.
     *
     * @param selectionKey key whose channel is ready for writing
     * @throws IOException if writing to the channel fails
     */
    private void handleWritable(SelectionKey selectionKey) throws IOException
    {
        System.out.println("client writing...");
        SocketChannel sc = (SocketChannel) selectionKey.channel();
        while (true)
        {
            if (pending == null)
            {
                String content = "第" + i + "行数据\n";
                if (i % 3 == 0)
                {
                    content += "bye";
                }
                // Encode with the same charset the server decodes with.
                pending = ByteBuffer.wrap(content.getBytes(charset));
            }
            sc.write(pending);
            if (pending.hasRemaining())
            {
                // Non-blocking write took only part of the buffer; retry later.
                return;
            }
            pending = null;
            i++;
            if (i % 3 == 0)
            {
                break;
            }
        }
    }

    /** Thread entry point: runs the select loop and reports any failure. */
    @Override
    public void run()
    {
        try
        {
            init();
        }
        catch (InterruptedException e)
        {
            // Preserve the interrupt status for whoever inspects this thread.
            Thread.currentThread().interrupt();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }

}

class Server extends Thread
{
void init() throws IOException, InterruptedException
{
while (serverSelector.select() > 0)
{
Set<SelectionKey> keys = serverSelector.selectedKeys();
for (SelectionKey selectionKey : keys)
{
keys.remove(selectionKey);
if (selectionKey.isAcceptable())
{
System.out.println("Server isAcceptable");
handleAcceptable(selectionKey);
}
else if (selectionKey.isReadable())
{
System.out.println("Server isReadable");
handleRreadable(selectionKey);
}
else if (selectionKey.isWritable())
{
System.out.println("Server isWritable");
}
}
Thread.sleep(2000);
}
}

private void handleRreadable(SelectionKey selectionKey) throws IOException
{
SocketChannel sc = (SocketChannel) selectionKey.channel();

ByteBuffer rst = ByteBuffer.allocate(512);
while (selectionKey.isReadable())
{
sc.read(rst);
if (rst.remaining() == rst.capacity())
{
System.out.println("server find no data!");
break;
}
rst.flip();

String rec = charset.decode(rst).toString();
System.out.println("server接收到数据:" + rec);
rst.cle
a716
ar();
if (rec.endsWith("bye"))
{
break;
}
}
}

private void handleAcceptable(SelectionKey selectionKey) throws IOException
{
ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(serverSelector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}

@Override
public void run()
{
try
{
init();
}
catch (Exception e)
{
e.printStackTrace();
}
}
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息