您的位置:首页 > 其它

MINA 2.0.9: 用户定义的事件是如何触发的

2015-03-27 14:48 330 查看
比如典型的代码是

private void read(S session) {//session就是一个连接相关的所有信息
        IoSessionConfig config = session.getConfig();//获取配置信息
        int bufferSize = config.getReadBufferSize();//获取读缓冲区大小
        IoBuffer buf = IoBuffer.allocate(bufferSize);//分配缓冲区,难道每次都重新分配缓冲区再释放?
                                                                                                                                    //太奢侈了吧 

        final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
                     //是否是TCP模式

 
        try {
            int readBytes = 0;//刚开始什么都没读,自然是0
            int ret;
 
            try {
                if (hasFragmentation) {//TCP模式,字节流
 
                    while ((ret = read(session, buf)) > 0) {//读到了数据
                        readBytes += ret;//累积读取的总字节数
 
                        if (!buf.hasRemaining()) {//缓冲区满了
                            break;//本轮结束
                        }
                    }
                } else {
                    ret = read(session, buf);//UDP模式,不做分析
 
                    if (ret > 0) {
                        readBytes = ret;
                    }
                }
            } finally {
                buf.flip();//为读做准备,读操作可以直接利用里面的若干指示器
            }
 
            if (readBytes > 0) {//如果总读取的字节数大于0
                IoFilterChain filterChain = session.getFilterChain();//本文章要关注的重点
                filterChain.fireMessageReceived(buf);
                buf = null;//究竟是不是每次都申请然后在这里释放?
 
                if (hasFragmentation) {//动态调整读取缓冲区
                    if (readBytes << 1 < config.getReadBufferSize()) {
                        session.decreaseReadBufferSize();
                    } else if (readBytes == config.getReadBufferSize()) {
                        session.increaseReadBufferSize();
                    }
                }
            }
 
            if (ret < 0) {//流关闭了
                // scheduleRemove(session);
                IoFilterChain filterChain = session.getFilterChain();
                filterChain.fireInputClosed();
            }
        } catch (Exception e) {//异常发生
            if (e instanceof IOException) {
                if (!(e instanceof PortUnreachableException)
                        || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
                        || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
                    scheduleRemove(session);
                }
            }
 
            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireExceptionCaught(e);
        }
    }

----------------------------

本文主要是分析 上面两行 橙色 文字。

 

public void fireMessageReceived(Object message) {
        if (message instanceof IoBuffer) {//信息统计用,不解释
            session.increaseReadBytes(((IoBuffer) message).remaining(), System.currentTimeMillis());
        }
 
        callNextMessageReceived(head, session, message);
    }

那么黑色粗体做了什么事情呢?

--------------------------

函数体如下所示:

 

private void callNextMessageReceived(Entry entry, IoSession session, Object message) {
        try {
            IoFilter filter = entry.getFilter();//当前入口的filter
            NextFilter nextFilter = entry.getNextFilter();//下一个filter,以便一个一个传递
            filter.messageReceived(nextFilter, session, message);//调用其信息接收方法

        } catch (Exception e) {
            fireExceptionCaught(e);
        } catch (Error e) {
            fireExceptionCaught(e);
            throw e;
        }
    }

------------------------

上面的方法实际上是执行

nextFilter.messageReceived(session, message);

因为代码是这样的

 

/**
     * {@inheritDoc}
     */
    public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
        nextFilter.messageReceived(session, message);
    }

可见这里不考虑具体细节的话,就是一个链,然后多次调用每个节点的messageReceived.

注意,已经初始化实现了一个head节点和tail节点,这两个节点的方法就是什么都不做,直接传给下一个节点处理。

-----------------------

那么问题来了,如何在我们自己的代码里加入filterChain呢?以便可以获取消息。

首先找到这个变量

 

/** The FilterChain created for this session */
    private final IoFilterChain filterChain;

所在类为

 

package org.apache.mina.transport.socket.nio;

public abstract class NioSession extends AbstractIoSession {

而且初始化的时候

 

protected NioSession(IoProcessor<NioSession> processor, IoService service, Channel channel) {
        super(service);
        this.channel = channel;
        this.processor = processor;
        filterChain = new DefaultIoFilterChain(this);
    }

看,这里是默认的一个IoFilterChain.

------------问题怎么破?

所有的问题如下:

1 如何触发各种自定义事件

2 读写操作如何实现。

3 总体的网络IO框架

弄懂了这3个问题,基本上MINA就弄清楚了。

这样,Netty也就基本懂了一大半了。

后面再开几篇文章来讲解Netty vs MINA .
http://my.oschina.net/qiangzigege/blog/391083
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: