您的位置:首页 > 其它

Mina2.0框架源码剖析(四)

2009-11-25 14:51 447 查看
前面几篇介绍完了org.apache.mina.core.service这个包,现在进入org.apache.mina.core.session,这个包主要是围绕IoSession展开的,包括会话的方方面面。

IoSession接口与底层的传输层类型无关(也就是不管是TCP还是UDP),它表示通信双端的连接。它提供用户自定义属性,可以用于在过滤器和处理器之间交换用户自定义协议相关的信息。

每个会话都有一个Service为之提供服务,同时有一个Handler负责此会话的I/O事件处理。最重要的两个方法是read和write,这两个方
法都是异步执行,若要真正完成必须在其返回结果上进行等待。关闭会话的方法close是异步执行的,也就是应当等待返回的CloseFuture,此外,
还有另一种关闭方式closeOnFlush,它和close的区别是会先flush掉写请求队列中的请求数据,再关闭会话,但同样是异步的。会话的读写
类型是可配置的,在运行中可设置此端是否可读写。

一个会话主要包含两个方面的数据,属性映射图,写请求队列,在这里作者使用了工厂模式来为新创建的会话提供这些数据结构。

Java代码

public

interface
IoSessionDataStructureFactory

{

IoSessionAttributeMap getAttributeMap(IoSession session) throws
Exception;

WriteRequestQueue getWriteRequestQueue(IoSession session) throws
Exception;

}

public interface IoSessionDataStructureFactory
{
IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception;
WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception;
}


IoSessionConfig接口用于表示会话的配置信息,主要包括:读缓冲区大小,会话数据吞吐量,计算吞吐量时间间隔,指定会话端的空闲时间,写请求操作超时时间。在这个接口中有一个方法值得注意

Java代码

void
setUseReadOperation(
boolean
useReadOperation);

void setUseReadOperation(boolean useReadOperation);


通过它来设置IoSession的read方法是否启用,若启用的话,则所有接收到的消息都会存储在内部的一个阻塞队列中,好处在于可以更方便用户对信息的处理,但对于某些应用来说并不管用,而且还会造成内存泄露,因此默认情况下这个选项是不开启的。

IoSessionInitializer接口定义了一个回调函数,这在AbstractIoService这个类中的finishSessionInitialization方法中已经见识过它的使用了,用于把用户自定义的会话初始化行为剥离出来。

Java代码

public

interface
IoSessionInitializer<T
extends
IoFuture>

{

void
initializeSession(IoSession session, T future);

}

public interface IoSessionInitializer<T extends IoFuture>
{
void initializeSession(IoSession session, T future);
}


IoSessionRecycler接口为一个无连接的传输服务提供回收现有会话的服务,主要的方法是:

Java代码

IoSession recycle(SocketAddress localAddress, SocketAddress remoteAddress);
IoSession recycle(SocketAddress localAddress, SocketAddress remoteAddress);


一个会话的读写能力控制通过TrafficMask类来描述,主要是SelectionKey.OP_READ和SelectionKey.OP_WRITE结合。此类使用单例模式实现,还提供了与,或,非,异或等位操作来动态控制会话读写能力。

Mina中的I/O事件类型如下:

Java代码

public

enum
IoEventType {

SESSION_CREATED,//会话创建

SESSION_OPENED,//会话打开

SESSION_CLOSED,//会话关闭

MESSAGE_RECEIVED,//接收到消息

MESSAGE_SENT,//发送消息

SESSION_IDLE,//空闲

EXCEPTION_CAUGHT,//异常捕获

WRITE,

CLOSE,

SET_TRAFFIC_MASK,//设置读写能力

}

public enum IoEventType {
SESSION_CREATED,//会话创建
SESSION_OPENED,//会话打开
SESSION_CLOSED,//会话关闭
MESSAGE_RECEIVED,//接收到消息
MESSAGE_SENT,//发送消息
SESSION_IDLE,//空闲
EXCEPTION_CAUGHT,//异常捕获
WRITE,
CLOSE,
SET_TRAFFIC_MASK,//设置读写能力
}


IoEvent类实现了Runnable接口,表示一个I/O事件或一个I/O请求,包括事件类型,所属的会话,事件参数值。最重要的方法就是fire,根据事件类型向会话的过滤器链上的众多监听者发出事件到来的信号。

Java代码

public

void
fire() {

switch
(getType()) {

case
MESSAGE_RECEIVED:

getSession().getFilterChain().fireMessageReceived(getParameter());

break
;

case
MESSAGE_SENT:

getSession().getFilterChain().fireMessageSent((WriteRequest) getParameter());

break
;

case
WRITE:

getSession().getFilterChain().fireFilterWrite((WriteRequest) getParameter());

break
;

case
SET_TRAFFIC_MASK:

getSession().getFilterChain().fireFilterSetTrafficMask((TrafficMask) getParameter());

break
;

case
CLOSE:

getSession().getFilterChain().fireFilterClose();

break
;

case
EXCEPTION_CAUGHT:

getSession().getFilterChain().fireExceptionCaught((Throwable) getParameter());

break
;

case
SESSION_IDLE:

getSession().getFilterChain().fireSessionIdle((IdleStatus) getParameter());

break
;

case
SESSION_OPENED:

getSession().getFilterChain().fireSessionOpened();

break
;

case
SESSION_CREATED:

getSession().getFilterChain().fireSessionCreated();

break
;

case
SESSION_CLOSED:

getSession().getFilterChain().fireSessionClosed();

break
;

default
:

throw

new
IllegalArgumentException(
"Unknown event type: "
+ getType());

}

}

public void fire() {
switch (getType()) {
case MESSAGE_RECEIVED:
getSession().getFilterChain().fireMessageReceived(getParameter());
break;
case MESSAGE_SENT:
getSession().getFilterChain().fireMessageSent((WriteRequest) getParameter());
break;
case WRITE:
getSession().getFilterChain().fireFilterWrite((WriteRequest) getParameter());
break;
case SET_TRAFFIC_MASK:
getSession().getFilterChain().fireFilterSetTrafficMask((TrafficMask) getParameter());
break;
case CLOSE:
getSession().getFilterChain().fireFilterClose();
break;
case EXCEPTION_CAUGHT:
getSession().getFilterChain().fireExceptionCaught((Throwable) getParameter());
break;
case SESSION_IDLE:
getSession().getFilterChain().fireSessionIdle((IdleStatus) getParameter());
break;
case SESSION_OPENED:
getSession().getFilterChain().fireSessionOpened();
break;
case SESSION_CREATED:
getSession().getFilterChain().fireSessionCreated();
break;
case SESSION_CLOSED:
getSession().getFilterChain().fireSessionClosed();
break;
default:
throw new IllegalArgumentException("Unknown event type: " + getType());
}
}


Mina的会话中,有三种类型的闲置状态:1)READER_IDLE
,这表示从远端没有数据到来,读端空闲。2)WRITER_IDLE ,这表示写端没有在写数据。3)BOTH_IDLE,读端和写端都空闲。
为了节约会话资源,可以让用户设置当空闲超过一定时间后关闭此会话,因为此会话可能在某一端出问题了,从而导致另一端空闲超过太长时间。这可以通过使用
IoSessionConfig.setIdleTime(IdleStatus,int)来完成,空闲时间阀值在会话配
(IoSessionConfig)中设置。

前面介绍过IoSessionDataStructureFactor接口为会话提供所需要的数据结
构,DefaultIoSessionDataStructureFactory是其一个默认实现类。它提供的写请求队列内部是一个初始大小为16的循环
队列,并且在插入队列尾部和从队列头部取数据时都必须满足互斥同步。

Java代码

private

static

class
DefaultWriteRequestQueue
implements
WriteRequestQueue {

private

final
Queue<WriteRequest> q =
new
CircularQueue<WriteRequest>(
16
);

public

void
dispose(IoSession session) {

}

public

void
clear(IoSession session) {

q.clear();

}

public

synchronized

boolean
isEmpty(IoSession session) {

return
q.isEmpty();

}

public

synchronized

void
offer(IoSession session, WriteRequest writeRequest) {

q.offer(writeRequest);

}

public

synchronized
WriteRequest poll(IoSession session) {

return
q.poll();

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: