您的位置:首页 > 其它

Mina2.0框架源码剖析(三)

2008-12-04 14:51 453 查看
AbstractIoAcceptor类继承自AbstractIoService基类,并实现了IoAcceptor接口,它主要的成员变量是本地绑定地址。

private final List<SocketAddress> defaultLocalAddresses =

new ArrayList<SocketAddress>();

private final List<SocketAddress> unmodifiableDefaultLocalAddresses =

Collections.unmodifiableList(defaultLocalAddresses);

private final Set<SocketAddress> boundAddresses =

new HashSet<SocketAddress>();

在调用bind或unbind方法时需要先获取绑定锁bindLock,具体的绑定操作还是在bind0这个方法中实现的。一旦绑定成功后,就会向服务监听者发出服务激活的事件(ServiceActivated),同理,解除绑定也是在unbind0这个方法中具体实现的。一旦解除绑定成功后,就会向服务监听者发出服务激活的事件(ServiceDeActivated)。

AbstractIoConnector类继承自AbstractIoService基类,并实现了IoConnect接口,连接超时检查间隔时间默认是50毫秒,超时时间默认为1分钟,用户可以自行配置。此类中重要的方法就是connect方法,其中调用了具体的连接逻辑实现connect0,

protected abstract ConnectFuture connect0(SocketAddress remoteAddress,

SocketAddress localAddress, IoSessionInitializer<? extends ConnectFuture> sessionInitializer);

AbstractIoConnector在AbstractIoService的基础上,在会话初始化结束时增加了一个功能,就是加入了一个监听者,当连接请求被取消时立即结束此会话。

protected final void finishSessionInitialization0(

final IoSession session, IoFuture future) {

// In case that ConnectFuture.cancel() is invoked before

// setSession() is invoked, add a listener that closes the

// connection immediately on cancellation.

future.addListener(new IoFutureListener<ConnectFuture>() {

public void operationComplete(ConnectFuture future) {

if (future.isCanceled()) {

session.close();

}

}

});

}

下面再来看一个IoProcessor接口的基本实现类SimpleIoProcessorPool,它的泛型参数是AbstractIoSession的子类,表示此Processor管理的具体会话类型。并且这个类还实现了池化,它会将多个IoSession分布到多个IoProcessor上去管理。下面是文档中给出的一个示例:

// Create a shared pool.

SimpleIoProcessorPool<NioSession> pool =

new SimpleIoProcessorPool<NioSession>(NioProcessor.class, 16);

// Create two services that share the same pool.

SocketAcceptor acceptor = new NioSocketAcceptor(pool);

SocketConnector connector = new NioSocketConnector(pool);

// Release related resources.

connector.dispose();

acceptor.dispose();

pool.dispose();

与Processor池有关的包括如下这些成员变量:

private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;//处理池大小,默认是处理器数+1, 便于多核分布处理

private final IoProcessor<T>[] pool;//IoProcessor池

private final AtomicInteger processorDistributor = new AtomicInteger();

Processor池的构造过程,其中有三种构造函数供选择来构造一个Processor :

带参数 ExecutorService 的构造函数.

带参数为 Executor的构造函数.

默认构造函数

pool = new IoProcessor[size];//构建池

boolean success = false;

try {

for (int i = 0; i < pool.length; i ++) {

IoProcessor<T> processor = null;

//有三种构造函数供选择来构造一个Processor

try {

try {

processor = processorType.getConstructor(ExecutorService.class).newInstance(executor);

} catch (NoSuchMethodException e) {

// To the next step

}

if (processor == null) {

try {

processor = processorType.getConstructor(Executor.class).newInstance(executor);

} catch (NoSuchMethodException e) {

// To the next step

}

}

if (processor == null) {

try {

processor = processorType.getConstructor().newInstance();

} catch (NoSuchMethodException e) {

// To the next step

}

}

} catch (RuntimeException e) {

throw e;

} catch (Exception e) {

throw new RuntimeIoException(

"Failed to create a new instance of " + processorType.getName(), e);

}

pool[i] = processor;

}

success = true;

} finally {

if (!success) {

dispose();

}

}

从Processor池中分配一个processor的过程,注意一个processor是可以同时管理多个session

private IoProcessor<T> getProcessor(T session)

{//返回session所在的processor,若没分配,则为之分配一个

IoProcessor<T> p = (IoProcessor<T>) session.getAttribute(PROCESSOR);//看session的属性中是否保存对应的Processor

if (p == null)

{//还没为此session分配processor

p = nextProcessor();//从池中取一个processor

IoProcessor<T> oldp =

(IoProcessor<T>) session.setAttributeIfAbsent(PROCESSOR, p);

if (oldp != null)

{//原来的processor

p = oldp;

}

}

return p;

}

private IoProcessor<T> nextProcessor()

{//从池中分配一个Processor

checkDisposal();

return pool[Math.abs(processorDistributor.getAndIncrement()) % pool.length];

}

作者:phinecos(洞庭散人)

出处:http://phinecos.cnblogs.com/

本文版权归作者和博客园共有,欢迎转载,但请保留此段声明,并在文章页面明显位置给出原文连接。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: