您的位置:首页 > 其它

netty5 (1)初识

2016-01-27 15:50 239 查看
看完netty4 ,因为好奇看了一下netty 5,虽然没出正式版,但是netty 5的新特性的,比如重构了HTTP多部分的编解码,ByteBuf被重写了...,然后带着好奇看了看netty 5的教程 http://netty.io/wiki/user-guide-for-5.x.html 发现netty 4和netty 5入门编码上的变化倒不是很大。于是吧之前netty 4的例子改造一下换成netty 5,然后跑起来了。有人说

netty5 只支持jdk7,在这里备注一下,笔者亲测netty 5在jdk 7和jdk 8上都能跑。

先看服务端:

public class NServer {

private static final Logger logger = LogManager.getLogger(NServer.class);

private final static String host = "127.0.0.1";
private final static Integer port = 8898;

@Test
public void testServer() {
/***
* ·NioEventLoopGroup 实际上是个连接池,NioEventLoopGroup在后台启动了n个NioEventLoop
* 来处理Channel事件,每个NioEventLoop负责m个Channel
* ·NioEventLoopGroup从NioEventLoop数组集中挨个取出NioEventLoop用以处理Channel
*/
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// NIO服务器端的辅助启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
// 设置 nio 类型的 channel
serverBootstrap.channel(NioServerSocketChannel.class);
/***
* 此处在写一个TCP/IP 的服务端,因此我们被允许设置 socket 的参数选项比如tcpNoDelay 和 keepAlive。
*/
serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
/***
* option() 是提供NioServerSocketChannel用来接收进来的连接。 childOption()
* 是提供父管道ServerChannel接收到的 连接(此例是 NioServerSocketChannel)。
*/
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
/***
* 绑定I/O事件的处理类 处理网络IO事件
*/
serverBootstrap.childHandler(new NServerInitializer());
/***
* 服务器启动后 绑定监听端口 同步等待成功 ,异步操作的通知回调 回调处理用的 ChildChannelHandler
*
*/
ChannelFuture channelFuture = serverBootstrap.bind(host, port).sync();
logger.debug("NServer启动");
// 监听服务器关闭监听(应用程序等待直到channel关闭)
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 关闭EventLoopGroup释放资源包
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
logger.debug("服务端释放了线程资源...");
}
}
}
服务端的事件处理,收到客户端请求,返回“ok”:

/***
* server端网络IO事件处理
*
* @author shiky
*
*/
public class NServerHandler extends ChannelHandlerAdapter {

private static final Logger logger = LogManager.getLogger(NServerHandler.class);

/***
* 读取客户消息
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.debug("服务器读取到客户端请求...");
if (null != msg) {
try {
StringBuffer sbf = new StringBuffer("收到客户端->");
sbf.append(ctx.channel().remoteAddress());
sbf.append("的消息:");
sbf.append(msg);
logger.debug(sbf);
System.out.println(sbf);
// 服务端响应消息
ctx.writeAndFlush("ok");
ctx.close();
} finally {
ReferenceCountUtil.release(msg);
}

}
}

/***
* 服务端监听到客户端活动
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.debug("channelActive>>>> Client:" + ctx.channel().remoteAddress() + "在线");
ctx.fireChannelActive();
}

/****
* 响应处理
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
logger.debug("服务端readComplete 响应完成");
ctx.fireChannelReadComplete();
}

/***
* 监听客户端掉线
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// Client:"+ctx.channel().remoteAddress()+"掉线");
super.channelInactive(ctx);
}

/***
* 异常信息 (根据需要,选择是否关闭)
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.debug("服务端异常" + cause.getMessage());
ctx.fireExceptionCaught(cause);
// ctx.close();
}
}


服务端对于编码和粘包拆包的处理:

/***
*
* @author: shiky
*
*
*/
public class NServerInitializer extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
/***
* 个地方的 必须和服务端对应上。否则无法正常解码和编码
* 设置包长,解决,粘包问题
*/
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 8, 0, 8));
pipeline.addLast(new LengthFieldPrepender(8));
// 当 read 的时候
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
// 当 send 的时候
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
// 服务端逻辑
pipeline.addLast(new NServerHandler());
}

}


客户端:

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.Test;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

/****
*
* @author shiky
* @Describe 客户端
*/
public class NClient {

private static final Logger logger = LogManager.getLogger(NClient.class);
private final static String host = "127.0.0.1";
private final static Integer port = 8898;

@Test
public void testClient() {
// 配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
// 客户端辅助启动类 对客户端配置
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.handler(new NClinetlInitializer());
// 异步链接服务器 同步等待链接
Channel ch = bootstrap.connect(host, port).sync().channel();
ch.writeAndFlush("发送一条指令:我的小鱼你醒了,还认识早晨吗?" + Thread.currentThread().getId());
ChannelFuture channelFuture = ch.closeFuture().sync();
// 监听服务器关闭监听(应用程序等待直到channel关闭)
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
logger.debug("客户端的释放了线程资源...");
}
}

}


客户端事件处理:

/***
*
* @author: shiky
*
*/
public class NClientHandler extends ChannelHandlerAdapter {

private static final Logger logger = LogManager.getLogger(NClientHandler.class);

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(ctx);
// logger.debug("客户端 active");
}

/***
* 处理服务端响应数据
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (null != msg) {
try {
logger.debug("客户端收到服务器响应数据:" + msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
logger.debug("客户端收到服务器响应数据处理完成");
}

/***
* 处理异常,根据需要选择要不要关闭连接
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// downstream:"+cause.getMessage());
// ctx.close();
logger.warn("客户端异常:" + cause.getMessage());
}
}


客户端编码解码处理:

/***
*
* @author shiky
*
*/
public class NClinetlInitializer extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
/****
* 个地方的 必须和服务端对应上。否则无法正常解码和编码
* 设置包长,解决,粘包问题
*/
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 8, 0, 8));
pipeline.addLast("frameEncoder", new LengthFieldPrepender(8));
// 当 read 的时候
pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
// 当 send 的时候
pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
// 客户端的逻辑
pipeline.addLast("handler", new NClientHandler());
}

}


跑一下,先启动服务端,然后启动客户端,服务端打印:


客户端打印:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: