Netty中队列(部分)的处理机制
2014-09-18 16:28
274 查看
Netty中队列(部分)的处理机制
概述
本意是探究netty框架中的队列实现机制,为以后的大数据量并发做前期准备,或者可通过修改其队列实现机制来增大netty框架的并发量,以满足项目需求,然而,netty框架中的队列并未放置在单独的一个包中处理,有针对具体情况做的具体分析,此文档只分析了queue包中的队列实现机制。
以下是包结构图:
类功能分析
BlockingReadHandler
此类仅仅是模拟的阻塞式读取操作,它将接受到的message存储起来并放到一个阻塞队列BlockingQueue中,当调用read(),read(long,TimeUnit),readEvent(long,TimeUnit)方法时,将此消息返回。但这个类仅适用于有极少数的连接(very small number of connections),诸如测试或者简单的客户端应用开发,并且在此类调用后,其他的handler就接收不到messageReceived事件、exceptionCaught事件和channelClosed事件,所以,这个handler应该放置到pipeFactory的最后面。
调用示例:
BlockingReadHandler<ChannelBuffer> reader =
new BlockingReadHandler<ChannelBuffer>();
ChannelPipeline p = ...;
p.addLast("reader", reader);
注意:黑体字放置到最后。
以阻塞方式调用(Read a message from a channel in a blocking manner.):
try {
ChannelBuffer buf = reader.read(60, TimeUnit.SECONDS);
if (buf == null) {
// Connection closed.
} else {
// Handle the received message here.
}
} catch (BlockingReadTimeoutException e) {
// Read timed out.
} catch (IOException e) {
// Other read errors
}
类中比较重要的方法分析:
@Override
public
void
messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
getQueue().put(e);
}
@Override
public
void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
getQueue().put(e);
}
@Override
public
void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
closed = true;
getQueue().put(e);
}
这三个方法均调用了队列的put操作,即在这个类之后的handler同样的操作不会接收到数据。
有队列就有对死锁的检测机制,此类中的死锁检测机制如下:
private
static
void
detectDeadLock() {
if (DeadLockProofWorker.PARENT.get() != null) {
throw
new IllegalStateException(
"read*(...) in I/O thread causes a dead lock or " +
"sudden performance drop. Implement a state machine or " +
"call read*() from a different thread.");
}
}
此类中的死锁检测调用了DeadLockProofWorker类,此类中有一个ThreadLocal<Executor>类型的全局变量PARENT,一旦发生死锁,线程的Executor就会将此Executor放到此变量中,从而实现死锁检测。
BufferedWriteHandler
模拟缓冲的写操作,此handler会将所有的write请求放到一个无界队列中,当调用flush()操作时,统一将请求flush到downstream。
调用示例如下:
在MyPipelineFactory中设置
BufferedWriteHandler bufferedWriter = new BufferedWriteHandler();
ChannelPipeline p = ...;
p.addFirst("buffer", bufferedWriter);
调用时:
Channel ch = ...;
// msg1, 2, and 3 are stored in the queue of bufferedWriter.
ch.write(msg1);
ch.write(msg2);
ch.write(msg3);
// and will be flushed on request.
bufferedWriter.flush();
调用flush()时再将消息发送出去。
关于自动flush(auto-flush):
BlockingReadTimeoutException
示例
概述
本意是探究netty框架中的队列实现机制,为以后的大数据量并发做前期准备,或者可通过修改其队列实现机制来增大netty框架的并发量,以满足项目需求,然而,netty框架中的队列并未放置在单独的一个包中处理,有针对具体情况做的具体分析,此文档只分析了queue包中的队列实现机制。以下是包结构图:
类功能分析
BlockingReadHandler
此类仅仅是模拟的阻塞式读取操作,它将接受到的message存储起来并放到一个阻塞队列BlockingQueue中,当调用read(),read(long,TimeUnit),readEvent(long,TimeUnit)方法时,将此消息返回。但这个类仅适用于有极少数的连接(very small number of connections),诸如测试或者简单的客户端应用开发,并且在此类调用后,其他的handler就接收不到messageReceived事件、exceptionCaught事件和channelClosed事件,所以,这个handler应该放置到pipeFactory的最后面。调用示例:
BlockingReadHandler<ChannelBuffer> reader =
new BlockingReadHandler<ChannelBuffer>();
ChannelPipeline p = ...;
p.addLast("reader", reader);
注意:黑体字放置到最后。
以阻塞方式调用(Read a message from a channel in a blocking manner.):
try {
ChannelBuffer buf = reader.read(60, TimeUnit.SECONDS);
if (buf == null) {
// Connection closed.
} else {
// Handle the received message here.
}
} catch (BlockingReadTimeoutException e) {
// Read timed out.
} catch (IOException e) {
// Other read errors
}
类中比较重要的方法分析:
@Override
public
void
messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
getQueue().put(e);
}
@Override
public
void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
getQueue().put(e);
}
@Override
public
void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
closed = true;
getQueue().put(e);
}
这三个方法均调用了队列的put操作,即在这个类之后的handler同样的操作不会接收到数据。
有队列就有对死锁的检测机制,此类中的死锁检测机制如下:
private
static
void
detectDeadLock() {
if (DeadLockProofWorker.PARENT.get() != null) {
throw
new IllegalStateException(
"read*(...) in I/O thread causes a dead lock or " +
"sudden performance drop. Implement a state machine or " +
"call read*() from a different thread.");
}
}
此类中的死锁检测调用了DeadLockProofWorker类,此类中有一个ThreadLocal<Executor>类型的全局变量PARENT,一旦发生死锁,线程的Executor就会将此Executor放到此变量中,从而实现死锁检测。
BufferedWriteHandler
模拟缓冲的写操作,此handler会将所有的write请求放到一个无界队列中,当调用flush()操作时,统一将请求flush到downstream。调用示例如下:
在MyPipelineFactory中设置
BufferedWriteHandler bufferedWriter = new BufferedWriteHandler();
ChannelPipeline p = ...;
p.addFirst("buffer", bufferedWriter);
调用时:
Channel ch = ...;
// msg1, 2, and 3 are stored in the queue of bufferedWriter.
ch.write(msg1);
ch.write(msg2);
ch.write(msg3);
// and will be flushed on request.
bufferedWriter.flush();
调用flush()时再将消息发送出去。
关于自动flush(auto-flush):
BlockingReadTimeoutException
示例
相关文章推荐
- Looper中的消息队列处理机制
- 中断处理的tasklet(小任务)机制和workqueue(工作队列)机制
- netty 数据分包、组包、粘包处理机制
- 中断处理机制与工作队列
- 中断处理的工作队列机制
- Linux2.6中断下半部分的三种实现机制---工作队列
- lua 协程 | 协程实现消息机制(事件队列轮询处理机制)
- netty 数据分包、组包、粘包处理机制(一)
- 【转】【译】JavaScript魔法揭秘--探索当前流行框架中部分功能的处理机制
- 中断处理的tasklet(小任务)机制和workqueue(工作队列)机制
- 中断处理的工作队列机制
- Looper中的消息队列处理机制
- Looper中的消息队列处理机制
- netty 数据分包、组包、粘包处理机制(二)
- netty学习笔记(一)—结合reactor模式探索netty对网络io的处理机制
- 中断处理的工作队列机制-原来如此
- Linux2.6中断下半部分的三种实现机制---软中断/tasklet/工作队列
- MFC的消息处理机制由两部分组成:CCmdTarget类和消息映射表。
- Openstack的消息队列机制及其部分代码解析(非oslo.message)
- netty学习笔记(一)—结合reactor模式探索netty对网络io的处理机制