您的位置:首页 > 其它

Netty就是这么回事(七)

2017-01-16 00:00 302 查看
这一章,主要介绍下Netty的心跳处理,心跳处理在通信开发中是最常用的,服务端通过心跳可以监控客户端的链接状态,进行相应的处理。

记得,之前用NIO做了一个客户端和服务端通信的项目,客户端并不是用java写的,而且一个嵌入式的设备,走的lwapp协议栈,有时候嵌入式设备点击复位或者直接掉电后,服务端还没有反应过来,还认为链接是连接状态,资源也就是一直没有得到释放。早在BIO的时候通过检测返回值是否是-1,异常捕获,setSoTimeout(超时时间)来确定客户端是否连接有效。到了nio只能自己实现一个心跳检测,非常的麻烦。好在Netty为我们提供了IdleStateHandler类来完成心跳检测功能,它非常简单,只有三个参数:public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) 读超时时间,写超时时间,读写超时时间,然后实现用户事件触发监听userEventTriggered这个方法,在这个方法里做相应的处理就可以了,是不是非常的方便!

下面来看一下服务端的代码:

package com.dlb.note.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;

/**
* 功能:心跳时间服务器
* 版本:1.0
* 日期:2016/12/13 10:51
* 作者:馟苏
*/
public class IdleTimerServer {
/**
* 主函数
*/
public static void main(String []args) {
// 配置服务端的NIO线程池
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// 当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer() {
protected void initChannel(Channel channel) throws Exception {
// 添加心跳处理器 5s读,5s写,10s读写
channel.pipeline().addLast(new IdleStateHandler(5, 5, 10));
channel.pipeline().addLast(new IdleTimerServerHandler());
}
});

// 绑定端口,同步等待成功
ChannelFuture future = serverBootstrap.bind(8888).sync();
System.out.println("服务器在8888端口监听hello");

// 等待服务端监听端口关闭
future.channel().closeFuture().sync();
System.out.println("服务器关闭bye");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 优雅退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

class IdleTimerServerHandler extends ChannelHandlerAdapter {
// 可读
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 读数据
ByteBuf buf = (ByteBuf) msg;

byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);

String body = new String(req, "UTF-8");
System.out.println("receive:" + body);

// 写数据
ByteBuf res = Unpooled.copiedBuffer("hello,client!".getBytes());
ctx.write(res);
ctx.flush();
}

/**
* 用户事件触发
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){ // 接受心跳事件
IdleStateEvent event = (IdleStateEvent)evt;

if(event.state() == IdleState.ALL_IDLE){ // 读和写状态
System.out.println("心跳结束");
//清除超时会话
ByteBuf res = Unpooled.copiedBuffer("you will close!".getBytes());
ChannelFuture writeAndFlush = ctx.writeAndFlush(res);
// 监听结果
writeAndFlush.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture channelFuture) throws Exception {
ctx.channel().close();
}
});
}
}
}

// 连接
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client come,ip:" + ctx.channel().remoteAddress());
}

// 关闭
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client close,ip:" + ctx.channel().remoteAddress());
ctx.close();
}

// 异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(cause.toString());
ctx.close();
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: