您的位置:首页 > 其它

Netty的超时机制

2015-07-06 17:56 211 查看

技术点描述

ReadTimeoutHandler读取数据超时处理
WriteTimeoutHandler写数据超时处理

IdleStateHandler状态空闲处理
 

通过以上三种方式,可以轻松实现SOCKET长连接的心跳包机制。

另要注意的是这里所说的超时是指逻辑上的超时,而非TCP连接超时。

实现方案

ReadTimeoutHandler,WriteTimeoutHandler, IdleStateHandler这三种方式都是通过在数据处理管道ChannelPipelineFactory的实现类里配置相关代码来实现。

一般使用根据需求使用一种方式即可。

 

它主要使用Timer定时器用自定义的时间,去定时检测相应的状态,检测到后会根据选择的处理方式,是报错或回调指定的接口实现类处理。

 

参考源码包



主要包

IdleStateHandler 空闲状态的处理代码
 

源码:

/**

* Creates a new instance.

*

@param timer

* the Timer that
is used to trigger the scheduled event.

* The recommended Timer implementation
is HashedWheelTimer.

@param readerIdleTimeSeconds

* an IdleStateEvent whose
state is IdleState.READER_IDLE

* will be triggered when no read was performed for the specified

* period of time. Specify 0 to disable.

@param writerIdleTimeSeconds

* an IdleStateEvent whose
state is IdleState.WRITER_IDLE

* will be triggered when no write was performed for the specified

* period of time. Specify {@code 0} to disable.

@param allIdleTimeSeconds

* an IdleStateEvent whose
state is IdleState.ALL_IDLE

* will be triggered when neither read nor write was performed

* for the specified period of time. Specify 0 to disable.

*/

public IdleStateHandler(

Timer timer,

int readerIdleTimeSeconds,

int writerIdleTimeSeconds,

int allIdleTimeSeconds)
{

 

this(timer,
readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds, TimeUnit.SECONDS);

}

 

ReadTimeoutHandler 读取数据超时的处理代码
 
源码:

/**

* Creates a new instance.

*

@param timer

* the Timer that
is used to trigger the scheduled event.

* The recommended Timer implementation
is HashedWheelTimer.

@param timeoutSeconds

* read timeout in seconds

*/

public ReadTimeoutHandler(Timer
timer, int timeoutSeconds) {

this(timer,
timeoutSeconds, TimeUnit.SECONDS);

}

 

WriteTimeoutHandler 写数据超时的处理代码
 
源码:

/**

* Creates a new instance.

*

@param timer

* the Timer that
is used to trigger the scheduled event.

* The recommended Timer implementation
is HashedWheelTimer.

@param timeoutSeconds

* write timeout in seconds

*/

public WriteTimeoutHandler(Timer
timer, int timeoutSeconds) {

this(timer,
timeoutSeconds, TimeUnit.SECONDS);

}

 

辅助包

IdleStateEvent 空闲事件的接口类
 
源码:
/**
* A ChannelEvent that
is triggered when a Channel has been idle
* for a while.
@apiviz.landmark
@apiviz.has org.jboss.netty.handler.timeout.IdleState
oneway - -
*/
public interface IdleStateEvent extends ChannelEvent
{
/**
* Returns the detailed idle state.
*/
IdleState getState();
 
/**
* Returns the last time when I/O occurred in
milliseconds.
*/
long getLastActivityTimeMillis();
}
 

DefaultIdleStateEvent空闲事件的实现类

IdleStateAwareChannelHandler 处理空闲事件的接口.
 
源码:
/**
* An extended SimpleChannelHandler that
adds the handler method for
* an IdleStateEvent.
@apiviz.uses org.jboss.netty.handler.timeout.IdleStateEvent
*/
public class IdleStateAwareChannelHandler extendsSimpleChannelHandler
{
 
/**
* Creates a new instance.
*/
public IdleStateAwareChannelHandler()
{
super();
}
 
@Override
public void handleUpstream(ChannelHandlerContext
ctx, ChannelEvent e)throws Exception {
if (e instanceof IdleStateEvent)
{
channelIdle(ctx, (IdleStateEvent) e);
else {
super.handleUpstream(ctx,
e);
}
}
 
/**
* Invoked when a Channel has
been idle for a while.
*/
public void channelIdle(ChannelHandlerContext
ctx, IdleStateEvent e)throws Exception {
ctx.sendUpstream(e);
}
}
 

IdleStateAwareChannelUpstreamHandler处理空闲事件的接口.

IdleState 定义了三种空闲状态的枚举类
 
源码:
/**
* An Enum that
represents the idle state of a Channel.
*/
public enum IdleState
{
/**
* No data was received for a while.
*/
READER_IDLE,
/**
* No data was sent for a while.
*/
WRITER_IDLE,
/**
* No data was either received or sent for a
while.
*/
ALL_IDLE;
}
 

ReadTimeoutException 读取超时异常类
WriteTimeoutException 写数据超时异常类
TimeoutException 读取数据和写数据异常类的父类
 

Demo实现

以下是关于三种方式的关键代码实现讲解。

 

IdleStateHandler

 

//第一步,在服务启动类中设置数据处理管道.

...

bootstrap.setPipelineFactory(new TCPServerPipelineFactory());

...

 

//第二步,在数据处理管道实现类里配置空闲状态处理代码.

public class TCPServerPipelineFactory implements ChannelPipelineFactory
{

    

    @Override

    public ChannelPipeline
getPipeline() throws Exception {

        // Create a default pipeline implementation.

        ChannelPipeline pipeline = Channels.pipeline();

        //设置空闲状态处理操作

pipeline.addLast("idlehandler", new IdleStateHandler(new HashedWheelTimer(),
10, 5 , 0));

 

pipeline.addLast("hearbeat", new Heartbeat());

        pipeline.addLast("handler", new TCPServerHandler());

        return pipeline;

    }

 

//第三步,实现方式1:自定义IdleStateAwareChannelHandler的实现类,

//当Channel处理空闲状态时,会触发此方法.

public class Heartbeat extends IdleStateAwareChannelHandler {

    

    @Override

public void channelIdle(ChannelHandlerContext
ctx, IdleStateEvent e) throwsException {

        super.channelIdle(ctx,
e);        

        if (e.getState()
== IdleState.READER_IDLE){

             byte[]
test = " ac0bce0490007050006".getBytes();

ChannelBuffer channelBuffer = ChannelBuffers.buffer(test.length);

            channelBuffer.writeBytes(upData);

            //发送超时数据到终端.

            e.getChannel().write(channelBuffer);

        }

}

 

//第三步, 实现方式2:在扩展SimpleChannelHandler的handler类里,如下操作:

//记得注释掉第二步中的代码pipeline.addLast("hearbeat",
new Heartbeat());

@Override

public void handleUpstream(ChannelHandlerContext
ctx, ChannelEvent e) throwsException {

if (e instanceof IdleStateEvent)
{

     IdleStateEvent ise = (IdleStateEvent)e;

     if (ise.getState()
== IdleState.READER_IDLE){

     byte[]
test = "超时 ...".getBytes();

ChannelBuffer channelBuffer = ChannelBuffers.buffer(test.length);

     channelBuffer.writeBytes(test);

         //发送超时数据到终端.

     ctx.getChannel().write(channelBuffer);

     }    

}

super.handleUpstream(ctx,
e);

}

 

ReadTimeoutHandler

 

//第一步,在服务启动类中设置数据处理管道.

...

bootstrap.setPipelineFactory(new TCPServerPipelineFactory());

...

 

//第二步,在数据处理管道实现类里配置空闲状态处理代码.

public class TCPServerPipelineFactory implements ChannelPipelineFactory
{

    

    @Override

    public ChannelPipeline
getPipeline() throws Exception {

        // Create a default pipeline implementation.

        ChannelPipeline pipeline = Channels.pipeline();

        //设置读取数据超时处理

pipeline.addLast("readTimeOut",new ReadTimeoutHandler(new HashedWheelTimer(),10));

        pipeline.addLast("handler", new TCPServerHandler());

        return pipeline;

    }

 

//第三步, 在扩展SimpleChannelHandler的handler类里,如下操作:

 

@Override

public void exceptionCaught(ChannelHandlerContext
ctx, ExceptionEvent e) {

    //对读取超时异常进行判断

    if (e.getCause() instanceof ReadTimeoutException)
{

     byte[]
test = "超时 ...".getBytes();

ChannelBuffer channelBuffer = ChannelBuffers.buffer(test.length);

     channelBuffer.writeBytes(test);

     ctx.getChannel().write(channelBuffer);

    } else {

logger.log(Level.WARN, "Unexpected
exception from downstream.",e.getCause());

    }

}

WriteTimeoutHandler

略,原理同ReadTimeoutHandler。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: