您的位置:首页 > 运维架构 > Apache

Apache Mina使用手记(四)

2014-10-26 21:07 190 查看
上一篇中,我们介绍了如何在mina中编写自己的日志过滤器,这一篇我们自己实现一个编解器。

实际应用当,很多应用系统应用的都不是标准的web service或XML等,比如象中国移动/联通/电信的短信网关程序,都有自己不同的协议实现,并且都是基于TCP/IP的字节流。Mina自带的编解码器实现了TextLineEncoder和TextLineDecoder,可以进行按行的字符串处理,对于象短信网关程序,就要自己实现编解码过滤器了。

我们定义一个简单的基于TCP/IP字节流的协议,实现在客户端和服务端之间的数据包传输。数据包MyProtocalPack有消息头和消息体组成,消息头包括:length(消息包的总长度,数据类型int),flag(消息包标志位,数据类型byte),消息体content是一个字符串,实际实现的时候按byte流处理。源代码如下:

[java] view
plaincopy

package com.gftech.mytool.mina;

import com.gftech.util.GFCommon;

public class MyProtocalPack {

private int length;

private byte flag;

private String content;

public MyProtocalPack(){

}

public MyProtocalPack(byte flag,String content){

this.flag=flag;

this.content=content;

int len1=content==null?0:content.getBytes().length;

this.length=5+len1;

}

public MyProtocalPack(byte[] bs){

if(bs!=null && bs.length>=5){

length=GFCommon.bytes2int(GFCommon.bytesCopy(bs, 0, 4));

flag=bs[4];

content=new String(GFCommon.bytesCopy(bs, 5, length-5));

}

}

public int getLength() {

return length;

}

public void setLength(int length) {

this.length = length;

}

public byte getFlag() {

return flag;

}

public void setFlag(byte flag) {

this.flag = flag;

}

public String getContent() {

return content;

}

public void setContent(String content) {

this.content = content;

}

public String toString(){

StringBuffer sb=new StringBuffer();

sb.append(" Len:").append(length);

sb.append(" flag:").append(flag);

sb.append(" content:").append(content);

return sb.toString();

}

}

回过头来,我们先看一下在MinaTimeServer中,如何使用一个文本的编解码过滤器,它是在过滤器链中添加了一个叫ProtocalCodecFilter的类,其中它调用 了一个工厂方法TextLineCodecFactory的工厂类,创建具休的TextLineEncoder和TextLineDecoder编码和解 码器。我们看一下具体的源代码:

[java] view
plaincopy

acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("GBK"))));

[java] view
plaincopy

package org.apache.mina.filter.codec.textline;

import java.nio.charset.Charset;

import org.apache.mina.core.buffer.BufferDataException;

import org.apache.mina.core.session.IoSession;

import org.apache.mina.filter.codec.ProtocolCodecFactory;

import org.apache.mina.filter.codec.ProtocolDecoder;

import org.apache.mina.filter.codec.ProtocolEncoder;

/**

* A {@link ProtocolCodecFactory} that performs encoding and decoding between

* a text line data and a Java string object. This codec is useful especially

* when you work with a text-based protocols such as SMTP and IMAP.

*

* @author The Apache MINA Project (dev@mina.apache.org)

* @version $Rev$, $Date$

*/

public class TextLineCodecFactory implements ProtocolCodecFactory {

private final TextLineEncoder encoder;

private final TextLineDecoder decoder;

/**

* Creates a new instance with the current default {@link Charset}.

*/

public TextLineCodecFactory() {

this(Charset.defaultCharset());

}

/**

* Creates a new instance with the specified {@link Charset}. The

* encoder uses a UNIX {@link LineDelimiter} and the decoder uses

* the AUTO {@link LineDelimiter}.

*

* @param charset

* The charset to use in the encoding and decoding

*/

public TextLineCodecFactory(Charset charset) {

encoder = new TextLineEncoder(charset, LineDelimiter.UNIX);

decoder = new TextLineDecoder(charset, LineDelimiter.AUTO);

}

/**

* Creates a new instance of TextLineCodecFactory. This constructor

* provides more flexibility for the developer.

*

* @param charset

* The charset to use in the encoding and decoding

* @param encodingDelimiter

* The line delimeter for the encoder

* @param decodingDelimiter

* The line delimeter for the decoder

*/

public TextLineCodecFactory(Charset charset,

String encodingDelimiter, String decodingDelimiter) {

encoder = new TextLineEncoder(charset, encodingDelimiter);

decoder = new TextLineDecoder(charset, decodingDelimiter);

}

/**

* Creates a new instance of TextLineCodecFactory. This constructor

* provides more flexibility for the developer.

*

* @param charset

* The charset to use in the encoding and decoding

* @param encodingDelimiter

* The line delimeter for the encoder

* @param decodingDelimiter

* The line delimeter for the decoder

*/

public TextLineCodecFactory(Charset charset,

LineDelimiter encodingDelimiter, LineDelimiter decodingDelimiter) {

encoder = new TextLineEncoder(charset, encodingDelimiter);

decoder = new TextLineDecoder(charset, decodingDelimiter);

}

public ProtocolEncoder getEncoder(IoSession session) {

return encoder;

}

public ProtocolDecoder getDecoder(IoSession session) {

return decoder;

}

/**

* Returns the allowed maximum size of the encoded line.

* If the size of the encoded line exceeds this value, the encoder

* will throw a {@link IllegalArgumentException}. The default value

* is {@link Integer#MAX_VALUE}.

* <p>

* This method does the same job with {@link TextLineEncoder#getMaxLineLength()}.

*/

public int getEncoderMaxLineLength() {

return encoder.getMaxLineLength();

}

/**

* Sets the allowed maximum size of the encoded line.

* If the size of the encoded line exceeds this value, the encoder

* will throw a {@link IllegalArgumentException}. The default value

* is {@link Integer#MAX_VALUE}.

* <p>

* This method does the same job with {@link TextLineEncoder#setMaxLineLength(int)}.

*/

public void setEncoderMaxLineLength(int maxLineLength) {

encoder.setMaxLineLength(maxLineLength);

}

/**

* Returns the allowed maximum size of the line to be decoded.

* If the size of the line to be decoded exceeds this value, the

* decoder will throw a {@link BufferDataException}. The default

* value is <tt>1024</tt> (1KB).

* <p>

* This method does the same job with {@link TextLineDecoder#getMaxLineLength()}.

*/

public int getDecoderMaxLineLength() {

return decoder.getMaxLineLength();

}

/**

* Sets the allowed maximum size of the line to be decoded.

* If the size of the line to be decoded exceeds this value, the

* decoder will throw a {@link BufferDataException}. The default

* value is <tt>1024</tt> (1KB).

* <p>

* This method does the same job with {@link TextLineDecoder#setMaxLineLength(int)}.

*/

public void setDecoderMaxLineLength(int maxLineLength) {

decoder.setMaxLineLength(maxLineLength);

}

}

TextLineFactory实现了ProtocalCodecFactory接口,该接口主要有一个编码的方法getEncoder()和一个解码的方法getDecoder():

[java] view
plaincopy

package org.apache.mina.filter.codec;

import org.apache.mina.core.session.IoSession;

/**

* Provides {@link ProtocolEncoder} and {@link ProtocolDecoder} which translates

* binary or protocol specific data into message object and vice versa.

* <p>

* Please refer to

* <a href="../../../../../xref-examples/org/apache/mina/examples/reverser/ReverseProtocolProvider.html" mce_href="xref-examples/org/apache/mina/examples/reverser/ReverseProtocolProvider.html"><code>ReverserProtocolProvider</code></a>

* example.

*

* @author The Apache MINA Project (dev@mina.apache.org)

* @version $Rev$, $Date$

*/

public interface ProtocolCodecFactory {

/**

* Returns a new (or reusable) instance of {@link ProtocolEncoder} which

* encodes message objects into binary or protocol-specific data.

*/

ProtocolEncoder getEncoder(IoSession session) throws Exception;

/**

* Returns a new (or reusable) instance of {@link ProtocolDecoder} which

* decodes binary or protocol-specific data into message objects.

*/

ProtocolDecoder getDecoder(IoSession session) throws Exception;

}

我们主要是仿照TextLineEncoder实现其中的encode()方法,仿照TextLineDecoder实现其中的decode()即可,它们分别实现了ProtocalEncoder和ProtocalDecoder接口。我们要编写三个类分别是:MyProtocalCodecFactory,MyProtocalEncoder,MyProtocalDecoder对应TextLineCodecFactory,TextLineEncoder,TextLineDecoder。

[java] view
plaincopy

package com.gftech.mytool.mina;

import java.nio.charset.Charset;

import org.apache.mina.core.session.IoSession;

import org.apache.mina.filter.codec.ProtocolCodecFactory;

import org.apache.mina.filter.codec.ProtocolDecoder;

import org.apache.mina.filter.codec.ProtocolEncoder;

public class MyProtocalCodecFactory implements ProtocolCodecFactory {

private final MyProtocalEncoder encoder;

private final MyProtocalDecoder decoder;

public MyProtocalCodecFactory(Charset charset) {

encoder=new MyProtocalEncoder(charset);

decoder=new MyProtocalDecoder(charset);

}

public ProtocolEncoder getEncoder(IoSession session) {

return encoder;

}

public ProtocolDecoder getDecoder(IoSession session) {

return decoder;

}

}

[java] view
plaincopy

package com.gftech.mytool.mina;

import java.nio.charset.Charset;

import org.apache.mina.core.buffer.IoBuffer;

import org.apache.mina.core.session.IoSession;

import org.apache.mina.filter.codec.ProtocolEncoderAdapter;

import org.apache.mina.filter.codec.ProtocolEncoderOutput;

public class MyProtocalEncoder extends ProtocolEncoderAdapter {

private final Charset charset;

public MyProtocalEncoder(Charset charset) {

this.charset = charset;

}

//在此处实现对MyProtocalPack包的编码工作,并把它写入输出流中

public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {

MyProtocalPack value = (MyProtocalPack) message;

IoBuffer buf = IoBuffer.allocate(value.getLength());

buf.setAutoExpand(true);

buf.putInt(value.getLength());

buf.put(value.getFlag());

if (value.getContent() != null)

buf.put(value.getContent().getBytes());

buf.flip();

out.write(buf);

}

public void dispose() throws Exception {

}

}

[java] view
plaincopy

package com.gftech.mytool.mina;

import java.nio.charset.Charset;

import java.nio.charset.CharsetDecoder;

import org.apache.mina.core.buffer.IoBuffer;

import org.apache.mina.core.session.AttributeKey;

import org.apache.mina.core.session.IoSession;

import org.apache.mina.filter.codec.ProtocolDecoder;

import org.apache.mina.filter.codec.ProtocolDecoderOutput;

public class MyProtocalDecoder implements ProtocolDecoder {

private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context");

private final Charset charset;

private int maxPackLength = 100;

public MyProtocalDecoder() {

this(Charset.defaultCharset());

}

public MyProtocalDecoder(Charset charset) {

this.charset = charset;

}

public int getMaxLineLength() {

return maxPackLength;

}

public void setMaxLineLength(int maxLineLength) {

if (maxLineLength <= 0) {

throw new IllegalArgumentException("maxLineLength: " + maxLineLength);

}

this.maxPackLength = maxLineLength;

}

private Context getContext(IoSession session) {

Context ctx;

ctx = (Context) session.getAttribute(CONTEXT);

if (ctx == null) {

ctx = new Context();

session.setAttribute(CONTEXT, ctx);

}

return ctx;

}

public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {

final int packHeadLength = 5;

//先获取上次的处理上下文,其中可能有未处理完的数据

Context ctx = getContext(session);

// 先把当前buffer中的数据追加到Context的buffer当中

ctx.append(in);

//把position指向0位置,把limit指向原来的position位置

IoBuffer buf = ctx.getBuffer();

buf.flip();

// 然后按数据包的协议进行读取

while (buf.remaining() >= packHeadLength) {

buf.mark();

// 读取消息头部分

int length = buf.getInt();

byte flag = buf.get();

//检查读取的包头是否正常,不正常的话清空buffer

if (length<0 ||length > maxPackLength) {

buf.clear();

break;

}

//读取正常的消息包,并写入输出流中,以便IoHandler进行处理

else if (length >= packHeadLength && length - packHeadLength <= buf.remaining()) {

int oldLimit2 = buf.limit();

buf.limit(buf.position() + length - packHeadLength);

String content = buf.getString(ctx.getDecoder());

buf.limit(oldLimit2);

MyProtocalPack pack = new MyProtocalPack(flag, content);

out.write(pack);

} else {

// 如果消息包不完整

// 将指针重新移动消息头的起始位置

buf.reset();

break;

}

}

if (buf.hasRemaining()) {

// 将数据移到buffer的最前面

IoBuffer temp = IoBuffer.allocate(maxPackLength).setAutoExpand(true);

temp.put(buf);

temp.flip();

buf.clear();

buf.put(temp);

} else {// 如果数据已经处理完毕,进行清空

buf.clear();

}

}

public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {

}

public void dispose(IoSession session) throws Exception {

Context ctx = (Context) session.getAttribute(CONTEXT);

if (ctx != null) {

session.removeAttribute(CONTEXT);

}

}

//记录上下文,因为数据触发没有规模,很可能只收到数据包的一半

//所以,需要上下文拼起来才能完整的处理

private class Context {

private final CharsetDecoder decoder;

private IoBuffer buf;

private int matchCount = 0;

private int overflowPosition = 0;

private Context() {

decoder = charset.newDecoder();

buf = IoBuffer.allocate(80).setAutoExpand(true);

}

public CharsetDecoder getDecoder() {

return decoder;

}

public IoBuffer getBuffer() {

return buf;

}

public int getOverflowPosition() {

return overflowPosition;

}

public int getMatchCount() {

return matchCount;

}

public void setMatchCount(int matchCount) {

this.matchCount = matchCount;

}

public void reset() {

overflowPosition = 0;

matchCount = 0;

decoder.reset();

}

public void append(IoBuffer in) {

getBuffer().put(in);

}

}

}

在MyProtocalServer中,添加自己实现的Log4jFilter和编解码过滤器:

[java] view
plaincopy

package com.gftech.mytool.mina;

import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.charset.Charset;

import org.apache.log4j.Logger;

import org.apache.log4j.PropertyConfigurator;

import org.apache.mina.core.service.IoAcceptor;

import org.apache.mina.core.service.IoHandlerAdapter;

import org.apache.mina.core.session.IdleStatus;

import org.apache.mina.core.session.IoSession;

import org.apache.mina.filter.codec.ProtocolCodecFilter;

import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

public class MyProtocalServer {

private static final int PORT = 2500;

static Logger logger = Logger.getLogger(MyProtocalServer.class);

public static void main(String[] args) throws IOException {

PropertyConfigurator.configure("conf//log4j.properties");

IoAcceptor acceptor = new NioSocketAcceptor();

Log4jFilter lf = new Log4jFilter(logger);

acceptor.getFilterChain().addLast("logger", lf);

acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyProtocalCodecFactory(Charset.forName("GBK"))));

acceptor.getSessionConfig().setReadBufferSize(1024);

acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);

acceptor.setHandler(new MyHandler());

acceptor.bind(new InetSocketAddress(PORT));

System.out.println("start server ...");

}

}

class MyHandler extends IoHandlerAdapter {

static Logger logger = Logger.getLogger(MyHandler.class);

@Override

public void exceptionCaught(IoSession session, Throwable cause) throws Exception {

cause.printStackTrace();

}

@Override

public void messageReceived(IoSession session, Object message) throws Exception {

MyProtocalPack pack=(MyProtocalPack)message;

logger.debug("Rec:" + pack);

}

@Override

public void sessionIdle(IoSession session, IdleStatus status) throws Exception {

logger.debug("IDLE " + session.getIdleCount(status));

}

}

编写一个客户端程序进行测试:

[java] view
plaincopy

package com.gftech.mytool.mina;

import java.io.DataOutputStream;

import java.net.Socket;

public class MyProtocalClient {

public static void main(String[] args) {

try {

Socket socket = new Socket("127.0.0.1", 2500);

DataOutputStream out = new DataOutputStream( socket.getOutputStream() ) ;

for (int i = 0; i < 1000; i++) {

MyProtocalPack pack=new MyProtocalPack((byte)i,i+"测试MyProtocalaaaaaaaaaaaaaa");

out.writeInt(pack.getLength());

out.write(pack.getFlag());

out.write(pack.getContent().getBytes());

out.flush();

System.out.println(i + " sended");

}

Thread.sleep(1000 );

} catch (Exception e) {

e.printStackTrace();

}

}

}

也可以用IoConnector实现自己的客户端:

[java] view
plaincopy

package com.gftech.mytool.mina;

import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.charset.Charset;

import org.apache.mina.core.future.ConnectFuture;

import org.apache.mina.core.future.IoFutureListener;

import org.apache.mina.core.service.IoConnector;

import org.apache.mina.core.service.IoHandlerAdapter;

import org.apache.mina.core.session.IdleStatus;

import org.apache.mina.core.session.IoSession;

import org.apache.mina.filter.codec.ProtocolCodecFilter;

import org.apache.mina.transport.socket.nio.NioSocketConnector;

public class MyProtocalClient2 {

private static final String HOST = "192.168.10.8";

private static final int PORT = 2500;

static long counter = 0;

final static int FC1 = 100;

static long start = 0;

/**

* 使用Mina的框架结构进行测试

*

* @param args

*/

public static void main(String[] args) throws IOException {

start = System.currentTimeMillis();

IoConnector connector = new NioSocketConnector();

connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyProtocalCodecFactory(Charset.forName("GBK"))));

connector.setHandler(new TimeClientHandler2());

connector.getSessionConfig().setReadBufferSize(100);

connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);

ConnectFuture connFuture = connector.connect(new InetSocketAddress(HOST, PORT));

connFuture.addListener(new IoFutureListener<ConnectFuture>() {

public void operationComplete(ConnectFuture future) {

try {

if (future.isConnected()) {

IoSession session = future.getSession();

sendData(session);

} else {

System.out.println("连接不存在 ");

}

} catch (Exception e) {

e.printStackTrace();

}

}

});

System.out.println("start client ...");

}

public static void sendData(IoSession session) throws IOException {

for (int i = 0; i < FC1; i++) {

String content = "afdjkdafk张新波测试" + i;

MyProtocalPack pack = new MyProtocalPack((byte) i, content);

session.write(pack);

System.out.println("send data:" + pack);

}

}

}

class TimeClientHandler2 extends IoHandlerAdapter {

@Override

public void sessionOpened(IoSession session) {

// Set reader idle time to 10 seconds.

// sessionIdle(...) method will be invoked when no data is read

// for 10 seconds.

session.getConfig().setIdleTime(IdleStatus.READER_IDLE, 60);

}

@Override

public void sessionClosed(IoSession session) {

// Print out total number of bytes read from the remote peer.

System.err.println("Total " + session.getReadBytes() + " byte(s)");

}

@Override

public void sessionIdle(IoSession session, IdleStatus status) {

// Close the connection if reader is idle.

if (status == IdleStatus.READER_IDLE) {

session.close(true);

}

}

@Override

public void messageReceived(IoSession session, Object message) {

MyProtocalPack pack = (MyProtocalPack) message;

System.out.println("rec:" + pack);

}

}

转载地址:
http://blog.csdn.net/sinboy/article/details/3988642
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: