您的位置:首页 > 运维架构 > Apache

Java - Apache Mina 自定义协议通信

2014-06-30 23:13 357 查看
一、定义协议实体

import java.nio.charset.Charset;

/**
* 自定义协议的消息体
*/
public class MyMsg {
/**
* 消息长度
*/
private Integer lenth;

/**
* 发送人
*/
private Long sender;

/**
* 接收人
*/
private Long receiver;

/**
* 消息内容
*/
private String content;

public MyMsg() {

}

public Long getSender() {
return sender;
}

public void setSender(Long sender) {
this.sender = sender;
}

public Long getReceiver() {
return receiver;
}

public void setReceiver(Long receiver) {
this.receiver = receiver;
}

public String getContent() {
return content;
}

public void setContent(String content) {
this.content = content;
}

/**
* 先计算长度,再返回。这里长度包含长度本身的字节
*/
public Integer getLenth() {

this.lenth = 4 + 8*2 + this.content.getBytes(Charset.forName("utf-8")).length;

return lenth;
}

public MyMsg(Long sender, Long receiver, String content) {
this.sender = sender;
this.receiver = receiver;
this.content = content;
}

@Override
public String toString() {
return "MyMsg [lenth=" + this.getLenth() + ", sender=" + sender + ", receiver="
+ receiver + ", content=" + content + "]";
}

}
二、定义编解码器

    1、编码器

import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;

/**
*	编码器
*/
public class MyEncoder extends ProtocolEncoderAdapter {

@Override
public void encode(IoSession session, Object message, ProtocolEncoderOutput encoderOutput)
throws Exception {

CharsetEncoder ce = Charset.forName("utf-8").newEncoder();

MyMsg msg = (MyMsg) message;

//	Mina IoBuffer
IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);

buffer.putInt(msg.getLenth());
buffer.putLong(msg.getSender());
buffer.putLong(msg.getReceiver());

//	有多个可变长度的属性时,可约定通过定义可变属性的最大长度(多余截取不足补齐)或put之前put其长度等方式处理
buffer.putString(msg.getContent(), ce);

buffer.flip();

encoderOutput.write(buffer);

}

}
    2、解码器

import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;

/**
*	解码器可继承ProtocolEncoderAdapter,但它不利于处理粘包的情况
*/
public class MyDecoder extends CumulativeProtocolDecoder {

@Override
protected boolean doDecode(IoSession session, IoBuffer buffer, ProtocolDecoderOutput decoderOutput)
throws Exception {
CharsetDecoder de = Charset.forName("utf-8").newDecoder();

/**
* 这里,如果长度不够20字节,就认为这条消息还没有累积完成
*/
if(buffer.remaining() < 20){
/**
*		停止调用decode,但如果还有数据没有读取完,将含有剩余数据的IoBuffer保存到IoSession中,
*	当有新数据包来的时候再和新的合并后再调用decoder解码
*		注意:没有消费任何数据时不能返回true,否则会抛出异常
*/
return false;

}else{

int length = buffer.getInt();

MyMsg msg = new MyMsg();

msg.setSender(buffer.getLong());
msg.setReceiver(buffer.getLong());

//	注意:20 = 消息长度的字节 + 发送人和接收人的字节
msg.setContent(buffer.getString(length - 20, de));

decoderOutput.write(msg);

/**
*	CumulativeProtocolDecoder会再次调用decoder,并把剩余的数据发下来继续解码
*/
return true;
}
}

}
    3、编解码工厂

import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;

/**
*	编解码工厂
*/
public class MyCoderFactory implements ProtocolCodecFactory{

private MyDecoder decoder;
private MyEncoder encoder;

public MyCoderFactory() {
this.decoder = new MyDecoder();
this.encoder = new MyEncoder();
}

@Override
public ProtocolDecoder getDecoder(IoSession session) throws Exception {
return decoder;
}

@Override
public ProtocolEncoder getEncoder(IoSession session) throws Exception {
return encoder;
}

}
三、服务器端

    1、运行类

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

public class ServerMain {

public static void main(String[] args) throws IOException {

//	在服务器端创建一个监听连接的接收器 基于tcp/ip
IoAcceptor acceptor = new NioSocketAcceptor();

//	绑定的端口
SocketAddress address = new InetSocketAddress("localhost", 8888);

//	 获取过滤器链
DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();

//	添加日志过滤器
chain.addLast("logger", new LoggingFilter());

// 配置自定义的编解码器 
chain.addLast("mycodec", new ProtocolCodecFilter(new MyCoderFactory()));

//	添加数据处理的处理器
acceptor.setHandler(new ServerHandler());

//	进行配置信息的设置
acceptor.getSessionConfig().setReadBufferSize(100);
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);

//	绑定服务器端口 
acceptor.bind(address);

System.out.println("服务器开始在 8888 端口监听.......");

}
}
    2、Handler

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;

public class ServerHandler extends IoHandlerAdapter{

/**
*  接受消息
*/
public void messageReceived(IoSession session, Object message)
throws Exception {
MyMsg msg = (MyMsg) message;
System.out.println("服务器端接收到的消息:" + msg);

//	返回一个消息给客户端
msg = new MyMsg(10000L, 10001L, "你好,这是来自服务器的应答!");
session.write(msg);
}

/**
*  发送消息
*/
public void messageSent(IoSession session, Object message) throws Exception {
System.out.println("服务器端开始发送消息...");
super.messageSent(session, message);
}

/**
*  会话开启
*/
public void sessionOpened(IoSession session) throws Exception {
System.out.println("服务端会话已打开...");
super.sessionOpened(session);
}

/**
* 	异常处理
*/
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception {
System.out.println("服务端发生异常了...");
cause.printStackTrace();
}

/**
* 	会话关闭
*/
public void sessionClosed(IoSession session) throws Exception {
System.out.println("服务端会话已关闭...");
super.sessionClosed(session);
}

/**
* 	会话创建
*/
public void sessionCreated(IoSession session) throws Exception {
System.out.println("服务端会话已创建...");
super.sessionCreated(session);
}

/**
* 	连接空闲
*/
public void sessionIdle(IoSession session, IdleStatus status)
throws Exception {
System.out.println("服务端连接空闲中...");
super.sessionIdle(session, status);
}
}
四、客户端

    1、连接类

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

public class ClientMain {
public static void main(String[] args) {

//	创建客户端连接器 基于tcp/ip
NioSocketConnector connector = new NioSocketConnector();

//	连接的地址和端口
SocketAddress address = new InetSocketAddress("localhost",8888);

//	获取过滤器链
DefaultIoFilterChainBuilder chain = connector.getFilterChain();

//	配置日志过滤器和自定义编解码器
chain.addLast("logger", new LoggingFilter());
chain.addLast("mycodec",new ProtocolCodecFilter(new MyCoderFactory()));

//	添加处理器
connector.setHandler(new ClientHandler());

// 连接到服务器 
ConnectFuture future = connector.connect(address);

//	等待连接创建完成
future.awaitUninterruptibly();

//	会话创建后发送消息到服务器
MyMsg msg = new MyMsg(10001L, 10000L, "你好,这是来自客户端的请求!");
future.getSession().write(msg);

//	等待28000毫秒后连接断开
future.getSession().getCloseFuture().awaitUninterruptibly(28000);

//	关闭连接
connector.dispose();

}
}
    2、Handler

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;

public class ClientHandler extends IoHandlerAdapter{

/**
* 	接收消息
*/
public void messageReceived(IoSession session, Object message)
throws Exception {
System.out.println("客户端接收到的消息:" + (MyMsg)message);
}

/**
* 	发送消息
*/
public void messageSent(IoSession session, Object message) throws Exception {
System.out.println("客户端发送消息...");
super.messageSent(session, message);

}

/**
* 	会话创建
*/
public void sessionOpened(IoSession session) throws Exception {

System.out.println("客户端已经连接到了服务器...");

}

/**
* 	会话关闭
*/
public void sessionClosed(IoSession session) throws Exception {
System.out.println("连接关闭...");
super.sessionClosed(session);
}

}

五、结果

    1、服务器端

        


    2、客户端

        


六、资源

    http://download.csdn.net/detail/u013379717/7574675

 

 

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: