您的位置:首页 > 其它

Netty中队列(部分)的处理机制

2014-09-18 16:28 274 查看
Netty中队列(部分)的处理机制


概述

本意是探究netty框架中的队列实现机制,为以后的大数据量并发做前期准备,或者可通过修改其队列实现机制来增大netty框架的并发量,以满足项目需求,然而,netty框架中的队列并未放置在单独的一个包中处理,有针对具体情况做的具体分析,此文档只分析了queue包中的队列实现机制。

以下是包结构图:



类功能分析

BlockingReadHandler

此类仅仅是模拟的阻塞式读取操作,它将接受到的message存储起来并放到一个阻塞队列BlockingQueue中,当调用read(),read(long,TimeUnit),readEvent(long,TimeUnit)方法时,将此消息返回。但这个类仅适用于有极少数的连接(very small number of connections),诸如测试或者简单的客户端应用开发,并且在此类调用后,其他的handler就接收不到messageReceived事件、exceptionCaught事件和channelClosed事件,所以,这个handler应该放置到pipeFactory的最后面。

调用示例:

BlockingReadHandler<ChannelBuffer> reader =

new BlockingReadHandler<ChannelBuffer>();

ChannelPipeline p = ...;

p.addLast("reader", reader);


注意:黑体字放置到最后。

以阻塞方式调用(Read a message from a channel in a blocking manner.):

try {

ChannelBuffer buf = reader.read(60, TimeUnit.SECONDS);

if (buf == null) {

// Connection closed.

} else {

// Handle the received message here.

}

} catch (BlockingReadTimeoutException e) {

// Read timed out.

} catch (IOException e) {

// Other read errors

}

类中比较重要的方法分析:

@Override

public
void
messageReceived(ChannelHandlerContext ctx, MessageEvent e)

throws Exception {

getQueue().put(e);

}

@Override

public
void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)

throws Exception {

getQueue().put(e);

}

@Override

public
void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)

throws Exception {

closed = true;

getQueue().put(e);

}

这三个方法均调用了队列的put操作,即在这个类之后的handler同样的操作不会接收到数据。

有队列就有对死锁的检测机制,此类中的死锁检测机制如下:

private
static
void
detectDeadLock() {

if (DeadLockProofWorker.PARENT.get() != null) {

throw
new IllegalStateException(

"read*(...) in I/O thread causes a dead lock or " +

"sudden performance drop. Implement a state machine or " +

"call read*() from a different thread.");

}

}

此类中的死锁检测调用了DeadLockProofWorker类,此类中有一个ThreadLocal<Executor>类型的全局变量PARENT,一旦发生死锁,线程的Executor就会将此Executor放到此变量中,从而实现死锁检测。

BufferedWriteHandler

模拟缓冲的写操作,此handler会将所有的write请求放到一个无界队列中,当调用flush()操作时,统一将请求flush到downstream。

调用示例如下:

在MyPipelineFactory中设置

BufferedWriteHandler bufferedWriter = new BufferedWriteHandler();

ChannelPipeline p = ...;

p.addFirst("buffer", bufferedWriter);

调用时:

Channel ch = ...;

// msg1, 2, and 3 are stored in the queue of bufferedWriter.

ch.write(msg1);

ch.write(msg2);

ch.write(msg3);

// and will be flushed on request.

bufferedWriter.flush();

调用flush()时再将消息发送出去。

关于自动flush(auto-flush):

BlockingReadTimeoutException

示例

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: