您的位置:首页 > 大数据 > 人工智能

Socket通信之BIO(同步阻塞IO)、PAIO(伪异步阻塞IO)、NIO(异步非阻塞IO)、AIO(异步非阻塞IO)、netty5之IO

2015-07-01 10:45 846 查看
参考源:

http://ifeve.com/netty5-user-guide/

书籍 netty权威指南

对于这本书 很多人褒贬不一 但是对于新手 我觉得还是比较好的 记录下学习的笔记

* 使用BIO方式(同步阻塞IO)
* 使用ServerSocket绑定IP地址,启动端口,使用Socket进行握手连接,连接成功后,双方通过输入输出流进行同步阻塞式通信
* 每当有客户端的请求后,即启动一个线程进行处理

使用PAIO(同步阻塞IO)
* 使用线程池进行处理客户端的请求
* 因为在进行read或者write的时候 都是阻塞IO的 所以还是同步阻塞IO  只是线程的资源交由线程池进行控制

使用java.nio包的NIO方式(异步非阻塞IO)
* 使用NIO方式 异步IO
* 使用多路复用器关联通道 当通道中有事件时 即通知处理 不用阻塞IO

使用AIO方式(异步非阻塞IO)
* 使用AIO方式,异步非阻塞IO
* 相当于NIO的升级版 编码思路上相比于NIO而言更加的简单明了

* 使用netty框架进行编程
* 步骤
* 1、构建事件处理池
* 2、使用引导程序关联事件处理池、通道、事件处理器
* 3、绑定端口服务
* 4、等待操作完成
* 5、关闭事件处理池




图片来源于 netty权威指南

上面是5种方式的大致介绍 详细的注释都在代码里面

代码有点多 放在github上了 地址 https://github.com/undergrowthlinear/netty.git

附:

google的protobuf协议 netty编解码

* 使用google的protobuf协议进行通信
* //netty编程5步骤
//1、创建NIO线程池,接收连接、处理连接
//2、使用引导类管理线程池、NIO通道、事件处理器、参数配置
//3、绑定端口,同步等待操作完成
//4、同步等待关闭通道,防止main进程退出
//5、关闭线程池资源


服务端:

/**
*
*/
package com.undergrowth.netty.protobuf;

import com.undergrowth.netty.protobuf.message.MessageReqProto;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

/**
* 使用google的protobuf协议进行通信 * //netty编程5步骤 //1、创建NIO线程池,接收连接、处理连接 //2、使用引导类管理线程池、NIO通道、事件处理器、参数配置 //3、绑定端口,同步等待操作完成 //4、同步等待关闭通道,防止main进程退出 //5、关闭线程池资源
*
* @author u1
* @Date 2015-7-1
*/
public class ProtoTimeServer {

private int port;

public static void main(String[] args) {
new ProtoTimeServer(6666).run();
}

public ProtoTimeServer(int port) {
this.port = port;
}

public void run() {
NioEventLoopGroup boss = null, work = null;
try {
// netty编程5步骤
// 1、创建线程池
boss = new NioEventLoopGroup();
work = new NioEventLoopGroup();
// 2、使用引导器关联线程池、通道、通道处理器、设置执行参数
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, work).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.option(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch)
throws Exception {
// TODO Auto-generated method stub
// 设置解码处理器和半包处理器
/*ch.pipeline().addLast(
new ProtobufVarint32FrameDecoder());*/
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1048576, 0, 4,0,4));
ch.pipeline().addLast(
new ProtobufDecoder( //设置解码器的目标类型
MessageReqProto.MessageReq
.getDefaultInstance()));
// 设置编码器和半包处理器
/*ch.pipeline().addLast(
new ProtobufVarint32LengthFieldPrepender());*/
ch.pipeline().addLast(new LengthFieldPrepender(4));
ch.pipeline().addLast(new ProtobufEncoder());
// 设置处理器
ch.pipeline().addLast(new ProtoTimeServerHandler());
}
});
// 3、绑定端口同步操作
ChannelFuture future = bootstrap.bind(port).sync();
// 4、监听端口关闭
future.channel().closeFuture().sync();
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
} finally {
// 5、释放资源
boss.shutdownGracefully();
work.shutdownGracefully();
}

}
}


服务端处理器

/**
*
*/
package com.undergrowth.netty.protobuf;

import com.undergrowth.netty.protobuf.message.MessageReqProto;
import com.undergrowth.netty.protobuf.message.MessageReqProto.MessageReq;
import com.undergrowth.netty.protobuf.message.MessageRespProto;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
* @author u1
* @Date  2015-7-1
*/
public class ProtoTimeServerHandler extends ChannelHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
// TODO Auto-generated method stub
//
MessageReqProto.MessageReq messageReq=(MessageReq) msg;
System.out.println("服务端接收到的消息为:"+messageReq);
//会写客户端信息
ctx.writeAndFlush(resp(messageReq.getReqId()));
}

/**
*
* @param reqId
* @return
*/
private MessageRespProto.MessageResp resp(int reqId) {
// TODO Auto-generated method stub
MessageRespProto.MessageResp.Builder builder=MessageRespProto.MessageResp.newBuilder();
builder.setReqId(reqId);
builder.setRespCode(0);
builder.setDesc("服务器接收到客户端的消息");
return builder.build();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
// TODO Auto-generated method stub
cause.printStackTrace();
ctx.close();
}

}


客户端:

/**
*
*/
package com.undergrowth.netty.protobuf;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

import java.net.InetSocketAddress;

import com.undergrowth.netty.protobuf.message.MessageReqProto;
import com.undergrowth.netty.protobuf.message.MessageRespProto;

/**
* @author u1
* @Date 2015-7-1
*/
public class ProtoTimeClient {

/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
new ProtoTimeClient(6666).run();
}

private int port;

public ProtoTimeClient(int port) {
this.port = port;
}

public void run() {
// netty编程5步骤
NioEventLoopGroup work = null;
try {
// 1、创建线程池
work = new NioEventLoopGroup();
// 2、使用引导器关联线程池、通道、通达处理器、设置执行参数
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(work).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch)
throws Exception {
// TODO Auto-generated method stub
// 设置解码处理器和半包处理器
/*ch.pipeline().addLast(
new ProtobufVarint32FrameDecoder());*/
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1048576, 0, 4,0,4));
ch.pipeline().addLast(
new ProtobufDecoder( //设置解码器的目标类型
MessageRespProto.MessageResp.getDefaultInstance()));
// 设置编码器和半包处理器
/*ch.pipeline().addLast(
new ProtobufVarint32LengthFieldPrepender());*/
ch.pipeline().addLast(new LengthFieldPrepender(4));
ch.pipeline().addLast(new ProtobufEncoder());
// 设置处理器
ch.pipeline().addLast(new ProtoTimeClientHandler());
}

});
// 3、绑定端口同步操作
ChannelFuture future = bootstrap.connect(
new InetSocketAddress(port)).sync();
// 4、监听端口关闭
future.channel().closeFuture().sync();
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
} finally {
// 5、释放资源
work.shutdownGracefully();
}

}

}


客户端处理器:

/**
*
*/
package com.undergrowth.netty.protobuf;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import com.undergrowth.netty.protobuf.message.MessageReqProto;

/**
* @author u1
* @Date  2015-7-1
*/
public class ProtoTimeClientHandler extends ChannelHandlerAdapter {

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
ctx.writeAndFlush(req());
}

/**
*
* @return
*/
private MessageReqProto.MessageReq req() {
// TODO Auto-generated method stub
MessageReqProto.MessageReq.Builder builder=MessageReqProto.MessageReq.newBuilder();
builder.setReqId(1);
builder.setUserName("undergrowth");
builder.setAddress("广州天河区");
return builder.build();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
// TODO Auto-generated method stub
//客户端接收服务器信息
System.out.println("接收到服务器的消息为:"+msg);
}

@Override
@Skip
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
super.channelReadComplete(ctx);
}

@Override
@Skip
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
// TODO Auto-generated method stub
cause.printStackTrace();
ctx.close();
}

}


MessageReqProto和MessageRespProto都是使用protoc生成的 很简单 就不说了
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: