Mina2.0框架源码剖析(三)
2009-11-25 14:49
369 查看
AbstractIoAcceptor类继承自AbstractIoService基类,并实现了IoAcceptor接口,它主要的成员变量是本地绑定地址。
Java代码
private
final
List<SocketAddress> defaultLocalAddresses =
new
ArrayList<SocketAddress>();
private
final
List<SocketAddress> unmodifiableDefaultLocalAddresses =
Collections.unmodifiableList(defaultLocalAddresses);
private
final
Set<SocketAddress> boundAddresses =
new
HashSet<SocketAddress>();
在调用bind或unbind方法时需要先获取绑定锁bindLock,具体的绑定操作还是在bind0这个方法中实现的。一旦绑定成功后,就会
向服务监听者发出服务激活的事件(ServiceActivated),同理,解除绑定也是在unbind0这个方法中具体实现的。一旦解除绑定成功后,
就会向服务监听者发出服务激活的事件(ServiceDeActivated)。
AbstractIoConnector类继承自AbstractIoService基类,并实现了IoConnect接口,连接超时检查间隔时间默认是
50毫秒,超时时间默认为1分钟,用户可以自行配置。此类中重要的方法就是connect方法,其中调用了具体的连接逻辑实现connect0,
Java代码
protected
abstract
ConnectFuture connect0(SocketAddress remoteAddress,
SocketAddress localAddress, IoSessionInitializer<? extends
ConnectFuture> sessionInitializer);
AbstractIoConnector在AbstractIoService的基础上,在会话初始化结束时增加了一个功能,就是加入了一个监听者,当连接请求被取消时立即结束此会话。
Java代码
protected
final
void
finishSessionInitialization0(
final
IoSession session, IoFuture future) {
// In case that ConnectFuture.cancel() is invoked before
// setSession() is invoked, add a listener that closes the
// connection immediately on cancellation.
future.addListener(new
IoFutureListener<ConnectFuture>() {
public
void
operationComplete(ConnectFuture future) {
if
(future.isCanceled()) {
session.close();
}
}
});
}
下面再来看一个IoProcessor接口的基本实现类SimpleIoProcessorPool,它的泛型参数是AbstractIoSession
的子类,表示此Processor管理的具体会话类型。并且这个类还实现了池化,它会将多个IoSession分布到多个IoProcessor上去管
理。下面是文档中给出的一个示例:
Java代码
// Create a shared pool.
SimpleIoProcessorPool<NioSession> pool =
new
SimpleIoProcessorPool<NioSession>(NioProcessor.
class
,
16
);
// Create two services that share the same pool.
SocketAcceptor acceptor = new
NioSocketAcceptor(pool);
SocketConnector connector = new
NioSocketConnector(pool);
// Release related resources.
connector.dispose();
acceptor.dispose();
pool.dispose();
与Processor池有关的包括如下这些成员变量:
Java代码
private
static
final
int
DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() +
1
;
//处理池大小,默认是处理器数+1, 便于多核分布处理
private
final
IoProcessor<T>[] pool;
//IoProcessor池
private
final
AtomicInteger processorDistributor =
new
AtomicInteger();
Processor池的构造过程,其中有三种构造函数供选择来构造一个Processor :
1. 带参数 ExecutorService 的构造函数.
2. 带参数为 Executor的构造函数.
3. 默认构造函数
Java代码
pool =
new
IoProcessor[size];
//构建池
boolean
success =
false
;
try
{
for
(
int
i =
0
; i < pool.length; i ++) {
IoProcessor<T> processor = null
;
/有三种构造函数供选择来构造一个Processor
try
{
try
{
processor = processorType.getConstructor(ExecutorService.class
).newInstance(executor);
} catch
(NoSuchMethodException e) {
// To the next step
}
if
(processor ==
null
) {
try
{
processor = processorType.getConstructor(Executor.class
).newInstance(executor);
} catch
(NoSuchMethodException e) {
// To the next step
}
}
if
(processor ==
null
) {
try
{
processor = processorType.getConstructor().newInstance();
} catch
(NoSuchMethodException e) {
// To the next step
}
}
} catch
(RuntimeException e) {
throw
e;
} catch
(Exception e) {
throw
new
RuntimeIoException(
"Failed to create a new instance of "
+ processorType.getName(), e);
}
pool[i] = processor;
}
success = true
;
} finally
{
if
(!success) {
dispose();
}
}
从Processor池中分配一个processor的过程,注意一个processor是可以同时管理多个session的。
Java代码
private
IoProcessor<T> getProcessor(T session)
{//返回session所在的processor,若没分配,则为之分配一个
IoProcessor<T> p = (IoProcessor<T>) session.getAttribute(PROCESSOR);//看session的属性中是否保存对应的Processor
if
(p ==
null
)
{//还没为此session分配processor
p = nextProcessor();//从池中取一个processor
IoProcessor<T> oldp =
(IoProcessor<T>) session.setAttributeIfAbsent(PROCESSOR, p);
if
(oldp !=
null
)
{//原来的processor
p = oldp;
}
}
return
p;
}
private
IoProcessor<T> nextProcessor()
{//从池中分配一个Processor
checkDisposal();
return
pool[Math.abs(processorDistributor.getAndIncrement()) % pool.length];
}
Java代码
private
final
List<SocketAddress> defaultLocalAddresses =
new
ArrayList<SocketAddress>();
private
final
List<SocketAddress> unmodifiableDefaultLocalAddresses =
Collections.unmodifiableList(defaultLocalAddresses);
private
final
Set<SocketAddress> boundAddresses =
new
HashSet<SocketAddress>();
private final List<SocketAddress> defaultLocalAddresses = new ArrayList<SocketAddress>(); private final List<SocketAddress> unmodifiableDefaultLocalAddresses = Collections.unmodifiableList(defaultLocalAddresses); private final Set<SocketAddress> boundAddresses = new HashSet<SocketAddress>();
在调用bind或unbind方法时需要先获取绑定锁bindLock,具体的绑定操作还是在bind0这个方法中实现的。一旦绑定成功后,就会
向服务监听者发出服务激活的事件(ServiceActivated),同理,解除绑定也是在unbind0这个方法中具体实现的。一旦解除绑定成功后,
就会向服务监听者发出服务激活的事件(ServiceDeActivated)。
AbstractIoConnector类继承自AbstractIoService基类,并实现了IoConnect接口,连接超时检查间隔时间默认是
50毫秒,超时时间默认为1分钟,用户可以自行配置。此类中重要的方法就是connect方法,其中调用了具体的连接逻辑实现connect0,
Java代码
protected
abstract
ConnectFuture connect0(SocketAddress remoteAddress,
SocketAddress localAddress, IoSessionInitializer<? extends
ConnectFuture> sessionInitializer);
protected abstract ConnectFuture connect0(SocketAddress remoteAddress, SocketAddress localAddress, IoSessionInitializer<? extends ConnectFuture> sessionInitializer);
AbstractIoConnector在AbstractIoService的基础上,在会话初始化结束时增加了一个功能,就是加入了一个监听者,当连接请求被取消时立即结束此会话。
Java代码
protected
final
void
finishSessionInitialization0(
final
IoSession session, IoFuture future) {
// In case that ConnectFuture.cancel() is invoked before
// setSession() is invoked, add a listener that closes the
// connection immediately on cancellation.
future.addListener(new
IoFutureListener<ConnectFuture>() {
public
void
operationComplete(ConnectFuture future) {
if
(future.isCanceled()) {
session.close();
}
}
});
}
protected final void finishSessionInitialization0( final IoSession session, IoFuture future) { // In case that ConnectFuture.cancel() is invoked before // setSession() is invoked, add a listener that closes the // connection immediately on cancellation. future.addListener(new IoFutureListener<ConnectFuture>() { public void operationComplete(ConnectFuture future) { if (future.isCanceled()) { session.close(); } } }); }
下面再来看一个IoProcessor接口的基本实现类SimpleIoProcessorPool,它的泛型参数是AbstractIoSession
的子类,表示此Processor管理的具体会话类型。并且这个类还实现了池化,它会将多个IoSession分布到多个IoProcessor上去管
理。下面是文档中给出的一个示例:
Java代码
// Create a shared pool.
SimpleIoProcessorPool<NioSession> pool =
new
SimpleIoProcessorPool<NioSession>(NioProcessor.
class
,
16
);
// Create two services that share the same pool.
SocketAcceptor acceptor = new
NioSocketAcceptor(pool);
SocketConnector connector = new
NioSocketConnector(pool);
// Release related resources.
connector.dispose();
acceptor.dispose();
pool.dispose();
// Create a shared pool. SimpleIoProcessorPool<NioSession> pool = new SimpleIoProcessorPool<NioSession>(NioProcessor.class, 16); // Create two services that share the same pool. SocketAcceptor acceptor = new NioSocketAcceptor(pool); SocketConnector connector = new NioSocketConnector(pool); // Release related resources. connector.dispose(); acceptor.dispose(); pool.dispose();
与Processor池有关的包括如下这些成员变量:
Java代码
private
static
final
int
DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() +
1
;
//处理池大小,默认是处理器数+1, 便于多核分布处理
private
final
IoProcessor<T>[] pool;
//IoProcessor池
private
final
AtomicInteger processorDistributor =
new
AtomicInteger();
private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;//处理池大小,默认是处理器数+1, 便于多核分布处理 private final IoProcessor<T>[] pool;//IoProcessor池 private final AtomicInteger processorDistributor = new AtomicInteger();
Processor池的构造过程,其中有三种构造函数供选择来构造一个Processor :
1. 带参数 ExecutorService 的构造函数.
2. 带参数为 Executor的构造函数.
3. 默认构造函数
Java代码
pool =
new
IoProcessor[size];
//构建池
boolean
success =
false
;
try
{
for
(
int
i =
0
; i < pool.length; i ++) {
IoProcessor<T> processor = null
;
/有三种构造函数供选择来构造一个Processor
try
{
try
{
processor = processorType.getConstructor(ExecutorService.class
).newInstance(executor);
} catch
(NoSuchMethodException e) {
// To the next step
}
if
(processor ==
null
) {
try
{
processor = processorType.getConstructor(Executor.class
).newInstance(executor);
} catch
(NoSuchMethodException e) {
// To the next step
}
}
if
(processor ==
null
) {
try
{
processor = processorType.getConstructor().newInstance();
} catch
(NoSuchMethodException e) {
// To the next step
}
}
} catch
(RuntimeException e) {
throw
e;
} catch
(Exception e) {
throw
new
RuntimeIoException(
"Failed to create a new instance of "
+ processorType.getName(), e);
}
pool[i] = processor;
}
success = true
;
} finally
{
if
(!success) {
dispose();
}
}
pool = new IoProcessor[size];//构建池 boolean success = false; try { for (int i = 0; i < pool.length; i ++) { IoProcessor<T> processor = null; //有三种构造函数供选择来构造一个Processor try { try { processor = processorType.getConstructor(ExecutorService.class).newInstance(executor); } catch (NoSuchMethodException e) { // To the next step } if (processor == null) { try { processor = processorType.getConstructor(Executor.class).newInstance(executor); } catch (NoSuchMethodException e) { // To the next step } } if (processor == null) { try { processor = processorType.getConstructor().newInstance(); } catch (NoSuchMethodException e) { // To the next step } } } catch (RuntimeException e) { throw e; } catch (Exception e) { throw new RuntimeIoException( "Failed to create a new instance of " + processorType.getName(), e); } pool[i] = processor; } success = true; } finally { if (!success) { dispose(); } }
从Processor池中分配一个processor的过程,注意一个processor是可以同时管理多个session的。
Java代码
private
IoProcessor<T> getProcessor(T session)
{//返回session所在的processor,若没分配,则为之分配一个
IoProcessor<T> p = (IoProcessor<T>) session.getAttribute(PROCESSOR);//看session的属性中是否保存对应的Processor
if
(p ==
null
)
{//还没为此session分配processor
p = nextProcessor();//从池中取一个processor
IoProcessor<T> oldp =
(IoProcessor<T>) session.setAttributeIfAbsent(PROCESSOR, p);
if
(oldp !=
null
)
{//原来的processor
p = oldp;
}
}
return
p;
}
private
IoProcessor<T> nextProcessor()
{//从池中分配一个Processor
checkDisposal();
return
pool[Math.abs(processorDistributor.getAndIncrement()) % pool.length];
}
相关文章推荐
- 转:Mina2.0框架源码剖析(四)
- Mina2.0框架源码剖析(四)
- Mina2.0框架源码剖析(七)
- Mina2.0框架源码剖析(二)
- Mina2.0框架源码剖析(六)
- 转:Mina2.0框架源码剖析(六)
- Mina2.0框架源码剖析(五)
- Mina2.0框架源码剖析(五)
- Mina2.0框架源码剖析(八)
- Mina2.0框架源码剖析(一)
- 转:Mina2.0框架源码剖析(五)
- Mina2.0框架源码剖析(三)
- 转:Mina2.0框架源码剖析(七)
- Mina2.0框架源码剖析(六)
- Mina2.0框架源码剖析(三)
- Mina2.0框架源码剖析(八)
- 转:Mina2.0框架源码剖析(八)
- Mina2.0框架源码剖析(一)
- Mina2.0框架源码剖析(五)
- Mina2.0框架源码剖析(二)