Apache Mina使用手记(四)
2014-10-26 21:07
190 查看
上一篇中,我们介绍了如何在mina中编写自己的日志过滤器,这一篇我们自己实现一个编解器。
实际应用当,很多应用系统应用的都不是标准的web service或XML等,比如象中国移动/联通/电信的短信网关程序,都有自己不同的协议实现,并且都是基于TCP/IP的字节流。Mina自带的编解码器实现了TextLineEncoder和TextLineDecoder,可以进行按行的字符串处理,对于象短信网关程序,就要自己实现编解码过滤器了。
我们定义一个简单的基于TCP/IP字节流的协议,实现在客户端和服务端之间的数据包传输。数据包MyProtocalPack有消息头和消息体组成,消息头包括:length(消息包的总长度,数据类型int),flag(消息包标志位,数据类型byte),消息体content是一个字符串,实际实现的时候按byte流处理。源代码如下:
[java] view
plaincopy
package com.gftech.mytool.mina;
import com.gftech.util.GFCommon;
public class MyProtocalPack {
private int length;
private byte flag;
private String content;
public MyProtocalPack(){
}
public MyProtocalPack(byte flag,String content){
this.flag=flag;
this.content=content;
int len1=content==null?0:content.getBytes().length;
this.length=5+len1;
}
public MyProtocalPack(byte[] bs){
if(bs!=null && bs.length>=5){
length=GFCommon.bytes2int(GFCommon.bytesCopy(bs, 0, 4));
flag=bs[4];
content=new String(GFCommon.bytesCopy(bs, 5, length-5));
}
}
public int getLength() {
return length;
}
public void setLength(int length) {
this.length = length;
}
public byte getFlag() {
return flag;
}
public void setFlag(byte flag) {
this.flag = flag;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public String toString(){
StringBuffer sb=new StringBuffer();
sb.append(" Len:").append(length);
sb.append(" flag:").append(flag);
sb.append(" content:").append(content);
return sb.toString();
}
}
回过头来,我们先看一下在MinaTimeServer中,如何使用一个文本的编解码过滤器,它是在过滤器链中添加了一个叫ProtocalCodecFilter的类,其中它调用 了一个工厂方法TextLineCodecFactory的工厂类,创建具休的TextLineEncoder和TextLineDecoder编码和解 码器。我们看一下具体的源代码:
[java] view
plaincopy
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("GBK"))));
[java] view
plaincopy
package org.apache.mina.filter.codec.textline;
import java.nio.charset.Charset;
import org.apache.mina.core.buffer.BufferDataException;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;
/**
* A {@link ProtocolCodecFactory} that performs encoding and decoding between
* a text line data and a Java string object. This codec is useful especially
* when you work with a text-based protocols such as SMTP and IMAP.
*
* @author The Apache MINA Project (dev@mina.apache.org)
* @version $Rev$, $Date$
*/
public class TextLineCodecFactory implements ProtocolCodecFactory {
private final TextLineEncoder encoder;
private final TextLineDecoder decoder;
/**
* Creates a new instance with the current default {@link Charset}.
*/
public TextLineCodecFactory() {
this(Charset.defaultCharset());
}
/**
* Creates a new instance with the specified {@link Charset}. The
* encoder uses a UNIX {@link LineDelimiter} and the decoder uses
* the AUTO {@link LineDelimiter}.
*
* @param charset
* The charset to use in the encoding and decoding
*/
public TextLineCodecFactory(Charset charset) {
encoder = new TextLineEncoder(charset, LineDelimiter.UNIX);
decoder = new TextLineDecoder(charset, LineDelimiter.AUTO);
}
/**
* Creates a new instance of TextLineCodecFactory. This constructor
* provides more flexibility for the developer.
*
* @param charset
* The charset to use in the encoding and decoding
* @param encodingDelimiter
* The line delimeter for the encoder
* @param decodingDelimiter
* The line delimeter for the decoder
*/
public TextLineCodecFactory(Charset charset,
String encodingDelimiter, String decodingDelimiter) {
encoder = new TextLineEncoder(charset, encodingDelimiter);
decoder = new TextLineDecoder(charset, decodingDelimiter);
}
/**
* Creates a new instance of TextLineCodecFactory. This constructor
* provides more flexibility for the developer.
*
* @param charset
* The charset to use in the encoding and decoding
* @param encodingDelimiter
* The line delimeter for the encoder
* @param decodingDelimiter
* The line delimeter for the decoder
*/
public TextLineCodecFactory(Charset charset,
LineDelimiter encodingDelimiter, LineDelimiter decodingDelimiter) {
encoder = new TextLineEncoder(charset, encodingDelimiter);
decoder = new TextLineDecoder(charset, decodingDelimiter);
}
public ProtocolEncoder getEncoder(IoSession session) {
return encoder;
}
public ProtocolDecoder getDecoder(IoSession session) {
return decoder;
}
/**
* Returns the allowed maximum size of the encoded line.
* If the size of the encoded line exceeds this value, the encoder
* will throw a {@link IllegalArgumentException}. The default value
* is {@link Integer#MAX_VALUE}.
* <p>
* This method does the same job with {@link TextLineEncoder#getMaxLineLength()}.
*/
public int getEncoderMaxLineLength() {
return encoder.getMaxLineLength();
}
/**
* Sets the allowed maximum size of the encoded line.
* If the size of the encoded line exceeds this value, the encoder
* will throw a {@link IllegalArgumentException}. The default value
* is {@link Integer#MAX_VALUE}.
* <p>
* This method does the same job with {@link TextLineEncoder#setMaxLineLength(int)}.
*/
public void setEncoderMaxLineLength(int maxLineLength) {
encoder.setMaxLineLength(maxLineLength);
}
/**
* Returns the allowed maximum size of the line to be decoded.
* If the size of the line to be decoded exceeds this value, the
* decoder will throw a {@link BufferDataException}. The default
* value is <tt>1024</tt> (1KB).
* <p>
* This method does the same job with {@link TextLineDecoder#getMaxLineLength()}.
*/
public int getDecoderMaxLineLength() {
return decoder.getMaxLineLength();
}
/**
* Sets the allowed maximum size of the line to be decoded.
* If the size of the line to be decoded exceeds this value, the
* decoder will throw a {@link BufferDataException}. The default
* value is <tt>1024</tt> (1KB).
* <p>
* This method does the same job with {@link TextLineDecoder#setMaxLineLength(int)}.
*/
public void setDecoderMaxLineLength(int maxLineLength) {
decoder.setMaxLineLength(maxLineLength);
}
}
TextLineFactory实现了ProtocalCodecFactory接口,该接口主要有一个编码的方法getEncoder()和一个解码的方法getDecoder():
[java] view
plaincopy
package org.apache.mina.filter.codec;
import org.apache.mina.core.session.IoSession;
/**
* Provides {@link ProtocolEncoder} and {@link ProtocolDecoder} which translates
* binary or protocol specific data into message object and vice versa.
* <p>
* Please refer to
* <a href="../../../../../xref-examples/org/apache/mina/examples/reverser/ReverseProtocolProvider.html" mce_href="xref-examples/org/apache/mina/examples/reverser/ReverseProtocolProvider.html"><code>ReverserProtocolProvider</code></a>
* example.
*
* @author The Apache MINA Project (dev@mina.apache.org)
* @version $Rev$, $Date$
*/
public interface ProtocolCodecFactory {
/**
* Returns a new (or reusable) instance of {@link ProtocolEncoder} which
* encodes message objects into binary or protocol-specific data.
*/
ProtocolEncoder getEncoder(IoSession session) throws Exception;
/**
* Returns a new (or reusable) instance of {@link ProtocolDecoder} which
* decodes binary or protocol-specific data into message objects.
*/
ProtocolDecoder getDecoder(IoSession session) throws Exception;
}
我们主要是仿照TextLineEncoder实现其中的encode()方法,仿照TextLineDecoder实现其中的decode()即可,它们分别实现了ProtocalEncoder和ProtocalDecoder接口。我们要编写三个类分别是:MyProtocalCodecFactory,MyProtocalEncoder,MyProtocalDecoder对应TextLineCodecFactory,TextLineEncoder,TextLineDecoder。
[java] view
plaincopy
package com.gftech.mytool.mina;
import java.nio.charset.Charset;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;
public class MyProtocalCodecFactory implements ProtocolCodecFactory {
private final MyProtocalEncoder encoder;
private final MyProtocalDecoder decoder;
public MyProtocalCodecFactory(Charset charset) {
encoder=new MyProtocalEncoder(charset);
decoder=new MyProtocalDecoder(charset);
}
public ProtocolEncoder getEncoder(IoSession session) {
return encoder;
}
public ProtocolDecoder getDecoder(IoSession session) {
return decoder;
}
}
[java] view
plaincopy
package com.gftech.mytool.mina;
import java.nio.charset.Charset;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
public class MyProtocalEncoder extends ProtocolEncoderAdapter {
private final Charset charset;
public MyProtocalEncoder(Charset charset) {
this.charset = charset;
}
//在此处实现对MyProtocalPack包的编码工作,并把它写入输出流中
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
MyProtocalPack value = (MyProtocalPack) message;
IoBuffer buf = IoBuffer.allocate(value.getLength());
buf.setAutoExpand(true);
buf.putInt(value.getLength());
buf.put(value.getFlag());
if (value.getContent() != null)
buf.put(value.getContent().getBytes());
buf.flip();
out.write(buf);
}
public void dispose() throws Exception {
}
}
[java] view
plaincopy
package com.gftech.mytool.mina;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.AttributeKey;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
public class MyProtocalDecoder implements ProtocolDecoder {
private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context");
private final Charset charset;
private int maxPackLength = 100;
public MyProtocalDecoder() {
this(Charset.defaultCharset());
}
public MyProtocalDecoder(Charset charset) {
this.charset = charset;
}
public int getMaxLineLength() {
return maxPackLength;
}
public void setMaxLineLength(int maxLineLength) {
if (maxLineLength <= 0) {
throw new IllegalArgumentException("maxLineLength: " + maxLineLength);
}
this.maxPackLength = maxLineLength;
}
private Context getContext(IoSession session) {
Context ctx;
ctx = (Context) session.getAttribute(CONTEXT);
if (ctx == null) {
ctx = new Context();
session.setAttribute(CONTEXT, ctx);
}
return ctx;
}
public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
final int packHeadLength = 5;
//先获取上次的处理上下文,其中可能有未处理完的数据
Context ctx = getContext(session);
// 先把当前buffer中的数据追加到Context的buffer当中
ctx.append(in);
//把position指向0位置,把limit指向原来的position位置
IoBuffer buf = ctx.getBuffer();
buf.flip();
// 然后按数据包的协议进行读取
while (buf.remaining() >= packHeadLength) {
buf.mark();
// 读取消息头部分
int length = buf.getInt();
byte flag = buf.get();
//检查读取的包头是否正常,不正常的话清空buffer
if (length<0 ||length > maxPackLength) {
buf.clear();
break;
}
//读取正常的消息包,并写入输出流中,以便IoHandler进行处理
else if (length >= packHeadLength && length - packHeadLength <= buf.remaining()) {
int oldLimit2 = buf.limit();
buf.limit(buf.position() + length - packHeadLength);
String content = buf.getString(ctx.getDecoder());
buf.limit(oldLimit2);
MyProtocalPack pack = new MyProtocalPack(flag, content);
out.write(pack);
} else {
// 如果消息包不完整
// 将指针重新移动消息头的起始位置
buf.reset();
break;
}
}
if (buf.hasRemaining()) {
// 将数据移到buffer的最前面
IoBuffer temp = IoBuffer.allocate(maxPackLength).setAutoExpand(true);
temp.put(buf);
temp.flip();
buf.clear();
buf.put(temp);
} else {// 如果数据已经处理完毕,进行清空
buf.clear();
}
}
public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
}
public void dispose(IoSession session) throws Exception {
Context ctx = (Context) session.getAttribute(CONTEXT);
if (ctx != null) {
session.removeAttribute(CONTEXT);
}
}
//记录上下文,因为数据触发没有规模,很可能只收到数据包的一半
//所以,需要上下文拼起来才能完整的处理
private class Context {
private final CharsetDecoder decoder;
private IoBuffer buf;
private int matchCount = 0;
private int overflowPosition = 0;
private Context() {
decoder = charset.newDecoder();
buf = IoBuffer.allocate(80).setAutoExpand(true);
}
public CharsetDecoder getDecoder() {
return decoder;
}
public IoBuffer getBuffer() {
return buf;
}
public int getOverflowPosition() {
return overflowPosition;
}
public int getMatchCount() {
return matchCount;
}
public void setMatchCount(int matchCount) {
this.matchCount = matchCount;
}
public void reset() {
overflowPosition = 0;
matchCount = 0;
decoder.reset();
}
public void append(IoBuffer in) {
getBuffer().put(in);
}
}
}
在MyProtocalServer中,添加自己实现的Log4jFilter和编解码过滤器:
[java] view
plaincopy
package com.gftech.mytool.mina;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
public class MyProtocalServer {
private static final int PORT = 2500;
static Logger logger = Logger.getLogger(MyProtocalServer.class);
public static void main(String[] args) throws IOException {
PropertyConfigurator.configure("conf//log4j.properties");
IoAcceptor acceptor = new NioSocketAcceptor();
Log4jFilter lf = new Log4jFilter(logger);
acceptor.getFilterChain().addLast("logger", lf);
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyProtocalCodecFactory(Charset.forName("GBK"))));
acceptor.getSessionConfig().setReadBufferSize(1024);
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
acceptor.setHandler(new MyHandler());
acceptor.bind(new InetSocketAddress(PORT));
System.out.println("start server ...");
}
}
class MyHandler extends IoHandlerAdapter {
static Logger logger = Logger.getLogger(MyHandler.class);
@Override
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
cause.printStackTrace();
}
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
MyProtocalPack pack=(MyProtocalPack)message;
logger.debug("Rec:" + pack);
}
@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
logger.debug("IDLE " + session.getIdleCount(status));
}
}
编写一个客户端程序进行测试:
[java] view
plaincopy
package com.gftech.mytool.mina;
import java.io.DataOutputStream;
import java.net.Socket;
public class MyProtocalClient {
public static void main(String[] args) {
try {
Socket socket = new Socket("127.0.0.1", 2500);
DataOutputStream out = new DataOutputStream( socket.getOutputStream() ) ;
for (int i = 0; i < 1000; i++) {
MyProtocalPack pack=new MyProtocalPack((byte)i,i+"测试MyProtocalaaaaaaaaaaaaaa");
out.writeInt(pack.getLength());
out.write(pack.getFlag());
out.write(pack.getContent().getBytes());
out.flush();
System.out.println(i + " sended");
}
Thread.sleep(1000 );
} catch (Exception e) {
e.printStackTrace();
}
}
}
也可以用IoConnector实现自己的客户端:
[java] view
plaincopy
package com.gftech.mytool.mina;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
public class MyProtocalClient2 {
private static final String HOST = "192.168.10.8";
private static final int PORT = 2500;
static long counter = 0;
final static int FC1 = 100;
static long start = 0;
/**
* 使用Mina的框架结构进行测试
*
* @param args
*/
public static void main(String[] args) throws IOException {
start = System.currentTimeMillis();
IoConnector connector = new NioSocketConnector();
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyProtocalCodecFactory(Charset.forName("GBK"))));
connector.setHandler(new TimeClientHandler2());
connector.getSessionConfig().setReadBufferSize(100);
connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
ConnectFuture connFuture = connector.connect(new InetSocketAddress(HOST, PORT));
connFuture.addListener(new IoFutureListener<ConnectFuture>() {
public void operationComplete(ConnectFuture future) {
try {
if (future.isConnected()) {
IoSession session = future.getSession();
sendData(session);
} else {
System.out.println("连接不存在 ");
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
System.out.println("start client ...");
}
public static void sendData(IoSession session) throws IOException {
for (int i = 0; i < FC1; i++) {
String content = "afdjkdafk张新波测试" + i;
MyProtocalPack pack = new MyProtocalPack((byte) i, content);
session.write(pack);
System.out.println("send data:" + pack);
}
}
}
class TimeClientHandler2 extends IoHandlerAdapter {
@Override
public void sessionOpened(IoSession session) {
// Set reader idle time to 10 seconds.
// sessionIdle(...) method will be invoked when no data is read
// for 10 seconds.
session.getConfig().setIdleTime(IdleStatus.READER_IDLE, 60);
}
@Override
public void sessionClosed(IoSession session) {
// Print out total number of bytes read from the remote peer.
System.err.println("Total " + session.getReadBytes() + " byte(s)");
}
@Override
public void sessionIdle(IoSession session, IdleStatus status) {
// Close the connection if reader is idle.
if (status == IdleStatus.READER_IDLE) {
session.close(true);
}
}
@Override
public void messageReceived(IoSession session, Object message) {
MyProtocalPack pack = (MyProtocalPack) message;
System.out.println("rec:" + pack);
}
}
转载地址:
http://blog.csdn.net/sinboy/article/details/3988642
实际应用当,很多应用系统应用的都不是标准的web service或XML等,比如象中国移动/联通/电信的短信网关程序,都有自己不同的协议实现,并且都是基于TCP/IP的字节流。Mina自带的编解码器实现了TextLineEncoder和TextLineDecoder,可以进行按行的字符串处理,对于象短信网关程序,就要自己实现编解码过滤器了。
我们定义一个简单的基于TCP/IP字节流的协议,实现在客户端和服务端之间的数据包传输。数据包MyProtocalPack有消息头和消息体组成,消息头包括:length(消息包的总长度,数据类型int),flag(消息包标志位,数据类型byte),消息体content是一个字符串,实际实现的时候按byte流处理。源代码如下:
[java] view
plaincopy
package com.gftech.mytool.mina;
import com.gftech.util.GFCommon;
public class MyProtocalPack {
private int length;
private byte flag;
private String content;
public MyProtocalPack(){
}
public MyProtocalPack(byte flag,String content){
this.flag=flag;
this.content=content;
int len1=content==null?0:content.getBytes().length;
this.length=5+len1;
}
public MyProtocalPack(byte[] bs){
if(bs!=null && bs.length>=5){
length=GFCommon.bytes2int(GFCommon.bytesCopy(bs, 0, 4));
flag=bs[4];
content=new String(GFCommon.bytesCopy(bs, 5, length-5));
}
}
public int getLength() {
return length;
}
public void setLength(int length) {
this.length = length;
}
public byte getFlag() {
return flag;
}
public void setFlag(byte flag) {
this.flag = flag;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public String toString(){
StringBuffer sb=new StringBuffer();
sb.append(" Len:").append(length);
sb.append(" flag:").append(flag);
sb.append(" content:").append(content);
return sb.toString();
}
}
回过头来,我们先看一下在MinaTimeServer中,如何使用一个文本的编解码过滤器,它是在过滤器链中添加了一个叫ProtocalCodecFilter的类,其中它调用 了一个工厂方法TextLineCodecFactory的工厂类,创建具休的TextLineEncoder和TextLineDecoder编码和解 码器。我们看一下具体的源代码:
[java] view
plaincopy
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("GBK"))));
[java] view
plaincopy
package org.apache.mina.filter.codec.textline;
import java.nio.charset.Charset;
import org.apache.mina.core.buffer.BufferDataException;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;
/**
* A {@link ProtocolCodecFactory} that performs encoding and decoding between
* a text line data and a Java string object. This codec is useful especially
* when you work with a text-based protocols such as SMTP and IMAP.
*
* @author The Apache MINA Project (dev@mina.apache.org)
* @version $Rev$, $Date$
*/
public class TextLineCodecFactory implements ProtocolCodecFactory {
private final TextLineEncoder encoder;
private final TextLineDecoder decoder;
/**
* Creates a new instance with the current default {@link Charset}.
*/
public TextLineCodecFactory() {
this(Charset.defaultCharset());
}
/**
* Creates a new instance with the specified {@link Charset}. The
* encoder uses a UNIX {@link LineDelimiter} and the decoder uses
* the AUTO {@link LineDelimiter}.
*
* @param charset
* The charset to use in the encoding and decoding
*/
public TextLineCodecFactory(Charset charset) {
encoder = new TextLineEncoder(charset, LineDelimiter.UNIX);
decoder = new TextLineDecoder(charset, LineDelimiter.AUTO);
}
/**
* Creates a new instance of TextLineCodecFactory. This constructor
* provides more flexibility for the developer.
*
* @param charset
* The charset to use in the encoding and decoding
* @param encodingDelimiter
* The line delimeter for the encoder
* @param decodingDelimiter
* The line delimeter for the decoder
*/
public TextLineCodecFactory(Charset charset,
String encodingDelimiter, String decodingDelimiter) {
encoder = new TextLineEncoder(charset, encodingDelimiter);
decoder = new TextLineDecoder(charset, decodingDelimiter);
}
/**
* Creates a new instance of TextLineCodecFactory. This constructor
* provides more flexibility for the developer.
*
* @param charset
* The charset to use in the encoding and decoding
* @param encodingDelimiter
* The line delimeter for the encoder
* @param decodingDelimiter
* The line delimeter for the decoder
*/
public TextLineCodecFactory(Charset charset,
LineDelimiter encodingDelimiter, LineDelimiter decodingDelimiter) {
encoder = new TextLineEncoder(charset, encodingDelimiter);
decoder = new TextLineDecoder(charset, decodingDelimiter);
}
public ProtocolEncoder getEncoder(IoSession session) {
return encoder;
}
public ProtocolDecoder getDecoder(IoSession session) {
return decoder;
}
/**
* Returns the allowed maximum size of the encoded line.
* If the size of the encoded line exceeds this value, the encoder
* will throw a {@link IllegalArgumentException}. The default value
* is {@link Integer#MAX_VALUE}.
* <p>
* This method does the same job with {@link TextLineEncoder#getMaxLineLength()}.
*/
public int getEncoderMaxLineLength() {
return encoder.getMaxLineLength();
}
/**
* Sets the allowed maximum size of the encoded line.
* If the size of the encoded line exceeds this value, the encoder
* will throw a {@link IllegalArgumentException}. The default value
* is {@link Integer#MAX_VALUE}.
* <p>
* This method does the same job with {@link TextLineEncoder#setMaxLineLength(int)}.
*/
public void setEncoderMaxLineLength(int maxLineLength) {
encoder.setMaxLineLength(maxLineLength);
}
/**
* Returns the allowed maximum size of the line to be decoded.
* If the size of the line to be decoded exceeds this value, the
* decoder will throw a {@link BufferDataException}. The default
* value is <tt>1024</tt> (1KB).
* <p>
* This method does the same job with {@link TextLineDecoder#getMaxLineLength()}.
*/
public int getDecoderMaxLineLength() {
return decoder.getMaxLineLength();
}
/**
* Sets the allowed maximum size of the line to be decoded.
* If the size of the line to be decoded exceeds this value, the
* decoder will throw a {@link BufferDataException}. The default
* value is <tt>1024</tt> (1KB).
* <p>
* This method does the same job with {@link TextLineDecoder#setMaxLineLength(int)}.
*/
public void setDecoderMaxLineLength(int maxLineLength) {
decoder.setMaxLineLength(maxLineLength);
}
}
TextLineFactory实现了ProtocalCodecFactory接口,该接口主要有一个编码的方法getEncoder()和一个解码的方法getDecoder():
[java] view
plaincopy
package org.apache.mina.filter.codec;
import org.apache.mina.core.session.IoSession;
/**
* Provides {@link ProtocolEncoder} and {@link ProtocolDecoder} which translates
* binary or protocol specific data into message object and vice versa.
* <p>
* Please refer to
* <a href="../../../../../xref-examples/org/apache/mina/examples/reverser/ReverseProtocolProvider.html" mce_href="xref-examples/org/apache/mina/examples/reverser/ReverseProtocolProvider.html"><code>ReverserProtocolProvider</code></a>
* example.
*
* @author The Apache MINA Project (dev@mina.apache.org)
* @version $Rev$, $Date$
*/
public interface ProtocolCodecFactory {
/**
* Returns a new (or reusable) instance of {@link ProtocolEncoder} which
* encodes message objects into binary or protocol-specific data.
*/
ProtocolEncoder getEncoder(IoSession session) throws Exception;
/**
* Returns a new (or reusable) instance of {@link ProtocolDecoder} which
* decodes binary or protocol-specific data into message objects.
*/
ProtocolDecoder getDecoder(IoSession session) throws Exception;
}
我们主要是仿照TextLineEncoder实现其中的encode()方法,仿照TextLineDecoder实现其中的decode()即可,它们分别实现了ProtocalEncoder和ProtocalDecoder接口。我们要编写三个类分别是:MyProtocalCodecFactory,MyProtocalEncoder,MyProtocalDecoder对应TextLineCodecFactory,TextLineEncoder,TextLineDecoder。
[java] view
plaincopy
package com.gftech.mytool.mina;
import java.nio.charset.Charset;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;
public class MyProtocalCodecFactory implements ProtocolCodecFactory {
private final MyProtocalEncoder encoder;
private final MyProtocalDecoder decoder;
public MyProtocalCodecFactory(Charset charset) {
encoder=new MyProtocalEncoder(charset);
decoder=new MyProtocalDecoder(charset);
}
public ProtocolEncoder getEncoder(IoSession session) {
return encoder;
}
public ProtocolDecoder getDecoder(IoSession session) {
return decoder;
}
}
[java] view
plaincopy
package com.gftech.mytool.mina;
import java.nio.charset.Charset;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
public class MyProtocalEncoder extends ProtocolEncoderAdapter {
private final Charset charset;
public MyProtocalEncoder(Charset charset) {
this.charset = charset;
}
//在此处实现对MyProtocalPack包的编码工作,并把它写入输出流中
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
MyProtocalPack value = (MyProtocalPack) message;
IoBuffer buf = IoBuffer.allocate(value.getLength());
buf.setAutoExpand(true);
buf.putInt(value.getLength());
buf.put(value.getFlag());
if (value.getContent() != null)
buf.put(value.getContent().getBytes());
buf.flip();
out.write(buf);
}
public void dispose() throws Exception {
}
}
[java] view
plaincopy
package com.gftech.mytool.mina;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.AttributeKey;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
public class MyProtocalDecoder implements ProtocolDecoder {
private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context");
private final Charset charset;
private int maxPackLength = 100;
public MyProtocalDecoder() {
this(Charset.defaultCharset());
}
public MyProtocalDecoder(Charset charset) {
this.charset = charset;
}
public int getMaxLineLength() {
return maxPackLength;
}
public void setMaxLineLength(int maxLineLength) {
if (maxLineLength <= 0) {
throw new IllegalArgumentException("maxLineLength: " + maxLineLength);
}
this.maxPackLength = maxLineLength;
}
private Context getContext(IoSession session) {
Context ctx;
ctx = (Context) session.getAttribute(CONTEXT);
if (ctx == null) {
ctx = new Context();
session.setAttribute(CONTEXT, ctx);
}
return ctx;
}
public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
final int packHeadLength = 5;
//先获取上次的处理上下文,其中可能有未处理完的数据
Context ctx = getContext(session);
// 先把当前buffer中的数据追加到Context的buffer当中
ctx.append(in);
//把position指向0位置,把limit指向原来的position位置
IoBuffer buf = ctx.getBuffer();
buf.flip();
// 然后按数据包的协议进行读取
while (buf.remaining() >= packHeadLength) {
buf.mark();
// 读取消息头部分
int length = buf.getInt();
byte flag = buf.get();
//检查读取的包头是否正常,不正常的话清空buffer
if (length<0 ||length > maxPackLength) {
buf.clear();
break;
}
//读取正常的消息包,并写入输出流中,以便IoHandler进行处理
else if (length >= packHeadLength && length - packHeadLength <= buf.remaining()) {
int oldLimit2 = buf.limit();
buf.limit(buf.position() + length - packHeadLength);
String content = buf.getString(ctx.getDecoder());
buf.limit(oldLimit2);
MyProtocalPack pack = new MyProtocalPack(flag, content);
out.write(pack);
} else {
// 如果消息包不完整
// 将指针重新移动消息头的起始位置
buf.reset();
break;
}
}
if (buf.hasRemaining()) {
// 将数据移到buffer的最前面
IoBuffer temp = IoBuffer.allocate(maxPackLength).setAutoExpand(true);
temp.put(buf);
temp.flip();
buf.clear();
buf.put(temp);
} else {// 如果数据已经处理完毕,进行清空
buf.clear();
}
}
public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
}
public void dispose(IoSession session) throws Exception {
Context ctx = (Context) session.getAttribute(CONTEXT);
if (ctx != null) {
session.removeAttribute(CONTEXT);
}
}
//记录上下文,因为数据触发没有规模,很可能只收到数据包的一半
//所以,需要上下文拼起来才能完整的处理
private class Context {
private final CharsetDecoder decoder;
private IoBuffer buf;
private int matchCount = 0;
private int overflowPosition = 0;
private Context() {
decoder = charset.newDecoder();
buf = IoBuffer.allocate(80).setAutoExpand(true);
}
public CharsetDecoder getDecoder() {
return decoder;
}
public IoBuffer getBuffer() {
return buf;
}
public int getOverflowPosition() {
return overflowPosition;
}
public int getMatchCount() {
return matchCount;
}
public void setMatchCount(int matchCount) {
this.matchCount = matchCount;
}
public void reset() {
overflowPosition = 0;
matchCount = 0;
decoder.reset();
}
public void append(IoBuffer in) {
getBuffer().put(in);
}
}
}
在MyProtocalServer中,添加自己实现的Log4jFilter和编解码过滤器:
[java] view
plaincopy
package com.gftech.mytool.mina;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
public class MyProtocalServer {
private static final int PORT = 2500;
static Logger logger = Logger.getLogger(MyProtocalServer.class);
public static void main(String[] args) throws IOException {
PropertyConfigurator.configure("conf//log4j.properties");
IoAcceptor acceptor = new NioSocketAcceptor();
Log4jFilter lf = new Log4jFilter(logger);
acceptor.getFilterChain().addLast("logger", lf);
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyProtocalCodecFactory(Charset.forName("GBK"))));
acceptor.getSessionConfig().setReadBufferSize(1024);
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
acceptor.setHandler(new MyHandler());
acceptor.bind(new InetSocketAddress(PORT));
System.out.println("start server ...");
}
}
class MyHandler extends IoHandlerAdapter {
static Logger logger = Logger.getLogger(MyHandler.class);
@Override
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
cause.printStackTrace();
}
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
MyProtocalPack pack=(MyProtocalPack)message;
logger.debug("Rec:" + pack);
}
@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
logger.debug("IDLE " + session.getIdleCount(status));
}
}
编写一个客户端程序进行测试:
[java] view
plaincopy
package com.gftech.mytool.mina;
import java.io.DataOutputStream;
import java.net.Socket;
public class MyProtocalClient {
public static void main(String[] args) {
try {
Socket socket = new Socket("127.0.0.1", 2500);
DataOutputStream out = new DataOutputStream( socket.getOutputStream() ) ;
for (int i = 0; i < 1000; i++) {
MyProtocalPack pack=new MyProtocalPack((byte)i,i+"测试MyProtocalaaaaaaaaaaaaaa");
out.writeInt(pack.getLength());
out.write(pack.getFlag());
out.write(pack.getContent().getBytes());
out.flush();
System.out.println(i + " sended");
}
Thread.sleep(1000 );
} catch (Exception e) {
e.printStackTrace();
}
}
}
也可以用IoConnector实现自己的客户端:
[java] view
plaincopy
package com.gftech.mytool.mina;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
public class MyProtocalClient2 {
private static final String HOST = "192.168.10.8";
private static final int PORT = 2500;
static long counter = 0;
final static int FC1 = 100;
static long start = 0;
/**
* 使用Mina的框架结构进行测试
*
* @param args
*/
public static void main(String[] args) throws IOException {
start = System.currentTimeMillis();
IoConnector connector = new NioSocketConnector();
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyProtocalCodecFactory(Charset.forName("GBK"))));
connector.setHandler(new TimeClientHandler2());
connector.getSessionConfig().setReadBufferSize(100);
connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
ConnectFuture connFuture = connector.connect(new InetSocketAddress(HOST, PORT));
connFuture.addListener(new IoFutureListener<ConnectFuture>() {
public void operationComplete(ConnectFuture future) {
try {
if (future.isConnected()) {
IoSession session = future.getSession();
sendData(session);
} else {
System.out.println("连接不存在 ");
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
System.out.println("start client ...");
}
public static void sendData(IoSession session) throws IOException {
for (int i = 0; i < FC1; i++) {
String content = "afdjkdafk张新波测试" + i;
MyProtocalPack pack = new MyProtocalPack((byte) i, content);
session.write(pack);
System.out.println("send data:" + pack);
}
}
}
class TimeClientHandler2 extends IoHandlerAdapter {
@Override
public void sessionOpened(IoSession session) {
// Set reader idle time to 10 seconds.
// sessionIdle(...) method will be invoked when no data is read
// for 10 seconds.
session.getConfig().setIdleTime(IdleStatus.READER_IDLE, 60);
}
@Override
public void sessionClosed(IoSession session) {
// Print out total number of bytes read from the remote peer.
System.err.println("Total " + session.getReadBytes() + " byte(s)");
}
@Override
public void sessionIdle(IoSession session, IdleStatus status) {
// Close the connection if reader is idle.
if (status == IdleStatus.READER_IDLE) {
session.close(true);
}
}
@Override
public void messageReceived(IoSession session, Object message) {
MyProtocalPack pack = (MyProtocalPack) message;
System.out.println("rec:" + pack);
}
}
转载地址:
http://blog.csdn.net/sinboy/article/details/3988642
相关文章推荐
- Apache Mina使用手记(三)
- Apache Mina使用手记(五)
- Apache Mina使用手记(一)
- Apache Mina使用手记(一)
- Apache Mina使用手记(四)
- Apache Mina使用手记(四)
- Apache Mina使用手记(三)
- Apache Mina使用手记(二)
- Apache Mina使用手记(二)
- smarty类使用手记
- PowerDesign使用手记
- WinCVS与CVSNT使用手记
- Fedora Core 4使用手记(二) 添加软件仓库:
- Fedora Core 4 使用手记(六) 安装Reaplayer
- VMWare GSX Server for Linux使用手记
- Fedora Core 4 使用手记(三) ADSL上网
- Subversion使用手记
- WinCVS与CVSNT使用手记
- dotnetcharting 使用手记
- eVC使用手记