Mina自定义协议-实现数据交互
2016-06-20 11:12
323 查看
本文主要实现mina的自定义协议,并且实现服务器和客户端的简单数据交互。
"mina协议的自定义"可参考本博Mina相关文章。
正题,所需要的基础类:
抽象协议类
请求协议
响应协议
(需要定制自己的协议格式)
协议编码解码工厂
协议编码
协议解码
客户端
客户端Handler
服务器
服务器Handler
Java代码
/**
 * Base type of every message exchanged through the custom protocol.
 *
 * @author Simple
 */
public abstract class JAbsMessageProtocal {

    /**
     * Reports the data length of this message.
     *
     * @return the length of the message data
     */
    public abstract int getLength();

    /**
     * Reports the kind of this message (request or response).
     *
     * @return the tag identifying the message kind
     */
    public abstract byte getTag();
}
/**
* 报头:
* byte tag:请求/响应
* int length:报体数据长度(字节数)
* 报体:
* short functionCode:功能代码(仅请求协议)
* byte resultCode:结果码(仅响应协议)
* String content:数据内容
*/
Java代码
/**
 * Request message of the custom protocol.
 *
 * @author Simple
 */
public class JMessageProtocalReq extends JAbsMessageProtocal {

    /** Function code carried by the request. */
    private short functionCode;

    /** Request content; may be {@code null}. */
    private String content;

    public short getFunctionCode() {
        return functionCode;
    }

    public void setFunctionCode(short functionCode) {
        this.functionCode = functionCode;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    /**
     * Returns 2 (the size of the short function code) plus the number of bytes
     * of the content as produced by {@link String#getBytes()}, i.e. using the
     * platform default charset. A {@code null} content contributes 0.
     */
    @Override
    public int getLength() {
        if (content == null) {
            return 2;
        }
        return 2 + content.getBytes().length;
    }

    /** Always reports the request tag {@code JConstant.REQ}. */
    @Override
    public byte getTag() {
        return JConstant.REQ;
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("JMessageProtocalReq [content=");
        text.append(content);
        text.append(", functionCode=").append(functionCode);
        text.append(", getLength()=").append(getLength());
        text.append(", getTag()=").append(getTag());
        text.append("]");
        return text.toString();
    }
}
Java代码
/**
 * Response message of the custom protocol.
 *
 * @author Simple
 */
public class JMessageProtocalRes extends JAbsMessageProtocal {

    /** Result code carried by the response. */
    private byte resultCode;

    /** Response content; may be {@code null}. */
    private String content;

    public byte getResultCode() {
        return resultCode;
    }

    public void setResultCode(byte resultCode) {
        this.resultCode = resultCode;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    /**
     * Returns 1 (the size of the result code byte) plus the number of bytes of
     * the content as produced by {@link String#getBytes()}, i.e. using the
     * platform default charset. A {@code null} content contributes 0.
     */
    @Override
    public int getLength() {
        if (getContent() == null) {
            return 1;
        }
        return 1 + getContent().getBytes().length;
    }

    /** Always reports the response tag {@code JConstant.RES}. */
    @Override
    public byte getTag() {
        return JConstant.RES;
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("JMessageProtocalRes [content=");
        text.append(content);
        text.append(", resultCode=").append(resultCode);
        text.append(", getLength()=").append(getLength());
        text.append(", getTag()=").append(getTag());
        text.append("]");
        return text.toString();
    }
}
Java代码
/**
 * Codec factory that hands out the JMessageProtocal encoder and decoder.
 * <p>
 * One encoder and one decoder are created in the constructor and the same
 * instances are returned for every session.
 *
 * @author Simple
 */
public class JMessageProtocalCodecFactory implements ProtocolCodecFactory {

    private final JMessageProtocalEncoder encoder;
    private final JMessageProtocalDecoder decoder;

    /**
     * @param charset charset passed on to both the encoder and the decoder
     */
    public JMessageProtocalCodecFactory(Charset charset) {
        encoder = new JMessageProtocalEncoder(charset);
        decoder = new JMessageProtocalDecoder(charset);
    }

    /** Returns the encoder created in the constructor; the session is not consulted. */
    public ProtocolEncoder getEncoder(IoSession paramIoSession) throws Exception {
        return this.encoder;
    }

    /** Returns the decoder created in the constructor; the session is not consulted. */
    public ProtocolDecoder getDecoder(IoSession paramIoSession) throws Exception {
        return this.decoder;
    }
}
Java代码
/**
 * Decoder that turns one protocol frame into a {@link JMessageProtocalReq} or
 * a {@link JMessageProtocalRes}.
 * <p>
 * Frame layout: 1-byte tag, 4-byte body length, then the body. A request body
 * is a short function code followed by the content string; a response body is
 * a 1-byte result code followed by the content string.
 * <p>
 * The frame is validated <em>before</em> any byte is consumed, so a truncated
 * header or an invalid/oversized length is logged and discarded instead of
 * failing inside the read calls or allocating an array from an unchecked
 * length.
 * <p>
 * NOTE(review): this decoder keeps no state between calls, so a frame that is
 * split over several reads is treated as incomplete and dropped. Supporting
 * that needs a decoder that accumulates partial input (MINA ships
 * CumulativeProtocolDecoder for this) - confirm whether it is required.
 *
 * @author Simple
 */
public class JMessageProtocalDecoder extends ProtocolDecoderAdapter {

    /** Header size in bytes: 1-byte tag plus 4-byte body length. */
    private static final int HEAD_LENGTH=5;

    private Logger log=Logger.getLogger(JMessageProtocalDecoder.class);

    private Charset charset;

    /**
     * @param charset charset used to decode the content string of a message
     */
    public JMessageProtocalDecoder(Charset charset) {
        this.charset=charset;
    }

    /**
     * Decodes a single frame from {@code buf} and writes the resulting message
     * to {@code out}.
     * <p>
     * Nothing is written when the input is incomplete/invalid or when the tag
     * is neither {@code JConstant.REQ} nor {@code JConstant.RES}.
     *
     * @param session the session the data belongs to (not used)
     * @param buf     the received bytes, positioned at the start of a frame
     * @param out     receives the decoded message
     * @throws Exception if the body cannot be parsed (e.g. it is shorter than
     *                   the fixed fields or the content is not valid text)
     */
    public void decode(IoSession session, IoBuffer buf, ProtocolDecoderOutput out) throws Exception {
        if(!canDecode(buf)) {
            // There is no way to find the next frame boundary, so drop what is
            // left; leaving it unread could have the same bytes offered again.
            buf.position(buf.limit());
            return;
        }
        // Header and body are known to be fully present from here on.
        byte tag=buf.get();
        int length=buf.getInt();
        byte[] bodyData=new byte[length];
        buf.get(bodyData);
        if(tag != JConstant.REQ && tag != JConstant.RES) {
            // The frame has been consumed, so a following frame can still be decoded.
            log.error("错误,未定义的Tag类型");
            return;
        }
        log.debug("Tag=" + tag);
        IoBuffer bodyBuf=IoBuffer.wrap(bodyData);
        JAbsMessageProtocal absMP;
        if(tag == JConstant.REQ) {
            JMessageProtocalReq req=new JMessageProtocalReq();
            req.setFunctionCode(bodyBuf.getShort());
            req.setContent(bodyBuf.getString(charset.newDecoder()));
            absMP=req;
        } else {
            JMessageProtocalRes res=new JMessageProtocalRes();
            res.setResultCode(bodyBuf.get());
            res.setContent(bodyBuf.getString(charset.newDecoder()));
            absMP=res;
        }
        out.write(absMP);
    }

    /**
     * Checks, without moving the buffer position, that {@code buf} holds a
     * complete header and at least as many body bytes as the header announces.
     *
     * @param buf the received bytes, positioned at the start of a frame
     * @return {@code true} if a whole frame can be read from {@code buf}
     */
    private boolean canDecode(IoBuffer buf) {
        if(buf.remaining() < HEAD_LENGTH) {
            log.error("错误,协议不完整,协议头长度小于" + HEAD_LENGTH);
            return false;
        }
        log.debug("协议完整");
        // Absolute read: the length field follows the 1-byte tag.
        int length=buf.getInt(buf.position() + 1);
        int bodyAvailable=buf.remaining() - HEAD_LENGTH;
        // A negative length can only come from corrupt or hostile input.
        if(length < 0 || bodyAvailable < length) {
            log.error("错误,真实协议体长度小于消息头中取得的值");
            return false;
        }
        log.debug("真实协议体长度:" + bodyAvailable + " = 消息头中取得的值:" + length);
        return true;
    }
}
Java代码
/**
 * Encoder that serializes a {@link JAbsMessageProtocal} into one frame:
 * 1-byte tag, 4-byte body length, then the body.
 * <p>
 * The length written to the header is the size of the body bytes that were
 * actually produced with the configured charset. It is deliberately not taken
 * from {@code getLength()}, which measures the content with the platform
 * default charset and can therefore disagree with the encoded body.
 *
 * @author Simple
 */
public class JMessageProtocalEncoder extends ProtocolEncoderAdapter {

    private Charset charset;

    /**
     * @param charset charset used to encode the content string of a message
     */
    public JMessageProtocalEncoder(Charset charset) {
        this.charset=charset;
    }

    /**
     * Encodes {@code object} and writes the resulting frame to {@code out}.
     * <p>
     * A message that is neither a request nor a response is written as a
     * header with an empty body.
     *
     * @param session the session the message is sent on (not used)
     * @param object  the message; must be a {@link JAbsMessageProtocal}
     * @param out     receives the encoded frame
     * @throws Exception if the content cannot be encoded with the charset
     */
    public void encode(IoSession session, Object object, ProtocolEncoderOutput out) throws Exception {
        // object --> AbsMP
        JAbsMessageProtocal absMp=(JAbsMessageProtocal)object;
        // Build the body first so that its real size is known for the header.
        IoBuffer body=IoBuffer.allocate(2048).setAutoExpand(true);
        if(object instanceof JMessageProtocalReq) {// request
            JMessageProtocalReq mpReq=(JMessageProtocalReq)object;
            body.putShort(mpReq.getFunctionCode());
            putContent(body, mpReq.getContent());
        } else if(object instanceof JMessageProtocalRes) {// response
            JMessageProtocalRes mpRes=(JMessageProtocalRes)object;
            body.put(mpRes.getResultCode());
            putContent(body, mpRes.getContent());
        }
        body.flip();
        // 1-byte tag + 4-byte length + body
        IoBuffer buf=IoBuffer.allocate(5 + body.remaining());
        buf.put(absMp.getTag());
        buf.putInt(body.remaining());
        buf.put(body);
        buf.flip();
        out.write(buf);
    }

    /** Appends {@code content} using the configured charset; {@code null} adds nothing. */
    private void putContent(IoBuffer target, String content) throws Exception {
        if(content != null) {
            target.putString(content, charset.newEncoder());
        }
    }
}
Java代码
/**
 * MINA client: connects to the server on localhost, sends a single request and
 * then waits until the connection is closed.
 *
 * @author Simple
 */
public class MainClient {

    private static Logger log=Logger.getLogger(MainClient.class);

    private static final int PORT=9999;

    /**
     * Runs the client.
     *
     * @param args not used
     */
    public static void main(String[] args) {
        NioSocketConnector connector=new NioSocketConnector();
        try {
            DefaultIoFilterChainBuilder chain=connector.getFilterChain();
            chain.addLast("ProtocolCodecFilter", new ProtocolCodecFilter(new JMessageProtocalCodecFactory(Charset.forName("UTF-8"))));
            connector.setHandler(new MinaClientHandler());
            connector.setConnectTimeoutMillis(3000);
            ConnectFuture cf=connector.connect(new InetSocketAddress("localhost", PORT));
            cf.awaitUninterruptibly();// wait until the connect attempt has finished
            if(!cf.isConnected()) {
                // Asking a failed future for its session would throw; report the cause instead.
                log.error(String.format("Failed to connect to Server[localhost:%s]", PORT), cf.getException());
                return;
            }
            JMessageProtocalReq req=new JMessageProtocalReq();
            req.setFunctionCode((short)1);
            req.setContent("hello world!!!");
            cf.getSession().write(req);
            cf.getSession().getCloseFuture().awaitUninterruptibly();// wait until the connection is closed
        } finally {
            // Release the connector's resources on every exit path.
            connector.dispose();
        }
    }
}
Java代码
/**
 * MINA client-side handler that logs every session event.
 *
 * @author Simple
 */
public class MinaClientHandler extends IoHandlerAdapter {

    private static final Logger log=Logger.getLogger(MinaClientHandler.class);

    /** Logs the failure together with its cause so the stack trace is not lost. */
    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        log.error("Client产生异常!", cause);
    }

    /** Logs a message received from the server. */
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        log.debug(String.format("来自Server[%s]的消息:%s", session.getRemoteAddress(), message.toString()));
    }

    /** Logs a message that has been sent to the server. */
    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        log.debug(String.format("向Server[%s]发送消息:%s", session.getRemoteAddress(), message.toString()));
    }

    /** Logs that the connection to the server was closed. */
    @Override
    public void sessionClosed(IoSession session) throws Exception {
        log.debug(String.format("与Server[%s]断开连接!", session.getRemoteAddress()));
    }

    /** Logs that the session was created. */
    @Override
    public void sessionCreated(IoSession session) throws Exception {
        log.debug(String.format("与Server[%s]建立连接!", session.getRemoteAddress()));
    }

    /** Logs that the session was opened. */
    @Override
    public void sessionOpened(IoSession session) throws Exception {
        log.debug(String.format("与Server[%s]打开连接!", session.getRemoteAddress()));
    }

    /** Logs that the session became idle. */
    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        log.debug("Client进入空闲状态!");
    }
}
Java代码
/**
 * MINA server: installs the protocol codec and the handler, then listens on
 * the configured port.
 *
 * @author Simple
 */
public class MainServer {

    private static Logger log=Logger.getLogger(MainServer.class);

    private static final int PORT=9999;

    /**
     * Starts the server.
     *
     * @param args not used
     * @throws Exception if the acceptor cannot be set up or bound
     */
    public static void main(String[] args) throws Exception {
        // TCP/IP acceptor
        SocketAcceptor server=new NioSocketAcceptor();
        // codec filter appended to the acceptor's filter chain
        ProtocolCodecFilter codec=new ProtocolCodecFilter(new JMessageProtocalCodecFactory(Charset.forName("UTF-8")));
        server.getFilterChain().addLast("ProtocolCodecFilter", codec);
        // handler receiving the session events
        server.setHandler(new MinaServerHandler());
        // a session with no read or write activity for 10s becomes idle
        server.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
        // start listening on the port
        server.bind(new InetSocketAddress(PORT));
        log.debug(String.format("Server Listing on %s", PORT));
    }
}
Java代码
/**
 * MINA server-side handler that logs every session event.
 *
 * @author Simple
 */
public class MinaServerHandler extends IoHandlerAdapter {

    private static final Logger log=Logger.getLogger(MinaServerHandler.class);

    /** Logs the failure together with its cause so the stack trace is not lost. */
    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        log.error("Server产生异常!", cause);
    }

    /** Logs a message received from a client. */
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        log.debug(String.format("来自Client[%s]的消息:%s", session.getRemoteAddress(), message.toString()));
    }

    /** Logs a message that has been sent to a client. */
    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        log.debug(String.format("向Client[%s]发送消息:%s", session.getRemoteAddress(), message.toString()));
    }

    /** Logs that a client connection was closed. */
    @Override
    public void sessionClosed(IoSession session) throws Exception {
        log.debug(String.format("Client[%s]与Server断开连接!", session.getRemoteAddress()));
    }

    /** Logs that a session was created. */
    @Override
    public void sessionCreated(IoSession session) throws Exception {
        log.debug(String.format("Client[%s]与Server建立连接!", session.getRemoteAddress()));
    }

    /** Logs that a session became idle. */
    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        log.debug("Server进入空闲状态!");
    }

    /** Logs that a session was opened. */
    @Override
    public void sessionOpened(IoSession session) throws Exception {
        log.debug(String.format("Client[%s]与Server打开连接!", session.getRemoteAddress()));
    }
}
"mina协议的自定义"可参考本博Mina相关文章。
正题,所需要的基础类:
抽象协议类
请求协议
响应协议
(需要定制自己的协议格式)
协议编码解码工厂
协议编码
协议解码
客户端
客户端Handler
服务器
服务器Handler
Java代码
/**
 * Base type of every message exchanged through the custom protocol.
 *
 * @author Simple
 */
public abstract class JAbsMessageProtocal {

    /**
     * Reports the data length of this message.
     *
     * @return the length of the message data
     */
    public abstract int getLength();

    /**
     * Reports the kind of this message (request or response).
     *
     * @return the tag identifying the message kind
     */
    public abstract byte getTag();
}
/**
* 报头:
* byte tag:请求/响应
* int length:报体数据长度(字节数)
* 报体:
* short functionCode:功能代码(仅请求协议)
* byte resultCode:结果码(仅响应协议)
* String content:数据内容
*/
Java代码
/**
 * Request message of the custom protocol.
 *
 * @author Simple
 */
public class JMessageProtocalReq extends JAbsMessageProtocal {

    /** Function code carried by the request. */
    private short functionCode;

    /** Request content; may be {@code null}. */
    private String content;

    public short getFunctionCode() {
        return functionCode;
    }

    public void setFunctionCode(short functionCode) {
        this.functionCode = functionCode;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    /**
     * Returns 2 (the size of the short function code) plus the number of bytes
     * of the content as produced by {@link String#getBytes()}, i.e. using the
     * platform default charset. A {@code null} content contributes 0.
     */
    @Override
    public int getLength() {
        if (content == null) {
            return 2;
        }
        return 2 + content.getBytes().length;
    }

    /** Always reports the request tag {@code JConstant.REQ}. */
    @Override
    public byte getTag() {
        return JConstant.REQ;
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("JMessageProtocalReq [content=");
        text.append(content);
        text.append(", functionCode=").append(functionCode);
        text.append(", getLength()=").append(getLength());
        text.append(", getTag()=").append(getTag());
        text.append("]");
        return text.toString();
    }
}
Java代码
/**
 * Response message of the custom protocol.
 *
 * @author Simple
 */
public class JMessageProtocalRes extends JAbsMessageProtocal {

    /** Result code carried by the response. */
    private byte resultCode;

    /** Response content; may be {@code null}. */
    private String content;

    public byte getResultCode() {
        return resultCode;
    }

    public void setResultCode(byte resultCode) {
        this.resultCode = resultCode;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    /**
     * Returns 1 (the size of the result code byte) plus the number of bytes of
     * the content as produced by {@link String#getBytes()}, i.e. using the
     * platform default charset. A {@code null} content contributes 0.
     */
    @Override
    public int getLength() {
        if (getContent() == null) {
            return 1;
        }
        return 1 + getContent().getBytes().length;
    }

    /** Always reports the response tag {@code JConstant.RES}. */
    @Override
    public byte getTag() {
        return JConstant.RES;
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("JMessageProtocalRes [content=");
        text.append(content);
        text.append(", resultCode=").append(resultCode);
        text.append(", getLength()=").append(getLength());
        text.append(", getTag()=").append(getTag());
        text.append("]");
        return text.toString();
    }
}
Java代码
/**
 * Codec factory that hands out the JMessageProtocal encoder and decoder.
 * <p>
 * One encoder and one decoder are created in the constructor and the same
 * instances are returned for every session.
 *
 * @author Simple
 */
public class JMessageProtocalCodecFactory implements ProtocolCodecFactory {

    private final JMessageProtocalEncoder encoder;
    private final JMessageProtocalDecoder decoder;

    /**
     * @param charset charset passed on to both the encoder and the decoder
     */
    public JMessageProtocalCodecFactory(Charset charset) {
        encoder = new JMessageProtocalEncoder(charset);
        decoder = new JMessageProtocalDecoder(charset);
    }

    /** Returns the encoder created in the constructor; the session is not consulted. */
    public ProtocolEncoder getEncoder(IoSession paramIoSession) throws Exception {
        return this.encoder;
    }

    /** Returns the decoder created in the constructor; the session is not consulted. */
    public ProtocolDecoder getDecoder(IoSession paramIoSession) throws Exception {
        return this.decoder;
    }
}
Java代码
/**
 * Decoder that turns one protocol frame into a {@link JMessageProtocalReq} or
 * a {@link JMessageProtocalRes}.
 * <p>
 * Frame layout: 1-byte tag, 4-byte body length, then the body. A request body
 * is a short function code followed by the content string; a response body is
 * a 1-byte result code followed by the content string.
 * <p>
 * The frame is validated <em>before</em> any byte is consumed, so a truncated
 * header or an invalid/oversized length is logged and discarded instead of
 * failing inside the read calls or allocating an array from an unchecked
 * length.
 * <p>
 * NOTE(review): this decoder keeps no state between calls, so a frame that is
 * split over several reads is treated as incomplete and dropped. Supporting
 * that needs a decoder that accumulates partial input (MINA ships
 * CumulativeProtocolDecoder for this) - confirm whether it is required.
 *
 * @author Simple
 */
public class JMessageProtocalDecoder extends ProtocolDecoderAdapter {

    /** Header size in bytes: 1-byte tag plus 4-byte body length. */
    private static final int HEAD_LENGTH=5;

    private Logger log=Logger.getLogger(JMessageProtocalDecoder.class);

    private Charset charset;

    /**
     * @param charset charset used to decode the content string of a message
     */
    public JMessageProtocalDecoder(Charset charset) {
        this.charset=charset;
    }

    /**
     * Decodes a single frame from {@code buf} and writes the resulting message
     * to {@code out}.
     * <p>
     * Nothing is written when the input is incomplete/invalid or when the tag
     * is neither {@code JConstant.REQ} nor {@code JConstant.RES}.
     *
     * @param session the session the data belongs to (not used)
     * @param buf     the received bytes, positioned at the start of a frame
     * @param out     receives the decoded message
     * @throws Exception if the body cannot be parsed (e.g. it is shorter than
     *                   the fixed fields or the content is not valid text)
     */
    public void decode(IoSession session, IoBuffer buf, ProtocolDecoderOutput out) throws Exception {
        if(!canDecode(buf)) {
            // There is no way to find the next frame boundary, so drop what is
            // left; leaving it unread could have the same bytes offered again.
            buf.position(buf.limit());
            return;
        }
        // Header and body are known to be fully present from here on.
        byte tag=buf.get();
        int length=buf.getInt();
        byte[] bodyData=new byte[length];
        buf.get(bodyData);
        if(tag != JConstant.REQ && tag != JConstant.RES) {
            // The frame has been consumed, so a following frame can still be decoded.
            log.error("错误,未定义的Tag类型");
            return;
        }
        log.debug("Tag=" + tag);
        IoBuffer bodyBuf=IoBuffer.wrap(bodyData);
        JAbsMessageProtocal absMP;
        if(tag == JConstant.REQ) {
            JMessageProtocalReq req=new JMessageProtocalReq();
            req.setFunctionCode(bodyBuf.getShort());
            req.setContent(bodyBuf.getString(charset.newDecoder()));
            absMP=req;
        } else {
            JMessageProtocalRes res=new JMessageProtocalRes();
            res.setResultCode(bodyBuf.get());
            res.setContent(bodyBuf.getString(charset.newDecoder()));
            absMP=res;
        }
        out.write(absMP);
    }

    /**
     * Checks, without moving the buffer position, that {@code buf} holds a
     * complete header and at least as many body bytes as the header announces.
     *
     * @param buf the received bytes, positioned at the start of a frame
     * @return {@code true} if a whole frame can be read from {@code buf}
     */
    private boolean canDecode(IoBuffer buf) {
        if(buf.remaining() < HEAD_LENGTH) {
            log.error("错误,协议不完整,协议头长度小于" + HEAD_LENGTH);
            return false;
        }
        log.debug("协议完整");
        // Absolute read: the length field follows the 1-byte tag.
        int length=buf.getInt(buf.position() + 1);
        int bodyAvailable=buf.remaining() - HEAD_LENGTH;
        // A negative length can only come from corrupt or hostile input.
        if(length < 0 || bodyAvailable < length) {
            log.error("错误,真实协议体长度小于消息头中取得的值");
            return false;
        }
        log.debug("真实协议体长度:" + bodyAvailable + " = 消息头中取得的值:" + length);
        return true;
    }
}
Java代码
/**
 * Encoder that serializes a {@link JAbsMessageProtocal} into one frame:
 * 1-byte tag, 4-byte body length, then the body.
 * <p>
 * The length written to the header is the size of the body bytes that were
 * actually produced with the configured charset. It is deliberately not taken
 * from {@code getLength()}, which measures the content with the platform
 * default charset and can therefore disagree with the encoded body.
 *
 * @author Simple
 */
public class JMessageProtocalEncoder extends ProtocolEncoderAdapter {

    private Charset charset;

    /**
     * @param charset charset used to encode the content string of a message
     */
    public JMessageProtocalEncoder(Charset charset) {
        this.charset=charset;
    }

    /**
     * Encodes {@code object} and writes the resulting frame to {@code out}.
     * <p>
     * A message that is neither a request nor a response is written as a
     * header with an empty body.
     *
     * @param session the session the message is sent on (not used)
     * @param object  the message; must be a {@link JAbsMessageProtocal}
     * @param out     receives the encoded frame
     * @throws Exception if the content cannot be encoded with the charset
     */
    public void encode(IoSession session, Object object, ProtocolEncoderOutput out) throws Exception {
        // object --> AbsMP
        JAbsMessageProtocal absMp=(JAbsMessageProtocal)object;
        // Build the body first so that its real size is known for the header.
        IoBuffer body=IoBuffer.allocate(2048).setAutoExpand(true);
        if(object instanceof JMessageProtocalReq) {// request
            JMessageProtocalReq mpReq=(JMessageProtocalReq)object;
            body.putShort(mpReq.getFunctionCode());
            putContent(body, mpReq.getContent());
        } else if(object instanceof JMessageProtocalRes) {// response
            JMessageProtocalRes mpRes=(JMessageProtocalRes)object;
            body.put(mpRes.getResultCode());
            putContent(body, mpRes.getContent());
        }
        body.flip();
        // 1-byte tag + 4-byte length + body
        IoBuffer buf=IoBuffer.allocate(5 + body.remaining());
        buf.put(absMp.getTag());
        buf.putInt(body.remaining());
        buf.put(body);
        buf.flip();
        out.write(buf);
    }

    /** Appends {@code content} using the configured charset; {@code null} adds nothing. */
    private void putContent(IoBuffer target, String content) throws Exception {
        if(content != null) {
            target.putString(content, charset.newEncoder());
        }
    }
}
Java代码
/**
 * MINA client: connects to the server on localhost, sends a single request and
 * then waits until the connection is closed.
 *
 * @author Simple
 */
public class MainClient {

    private static Logger log=Logger.getLogger(MainClient.class);

    private static final int PORT=9999;

    /**
     * Runs the client.
     *
     * @param args not used
     */
    public static void main(String[] args) {
        NioSocketConnector connector=new NioSocketConnector();
        try {
            DefaultIoFilterChainBuilder chain=connector.getFilterChain();
            chain.addLast("ProtocolCodecFilter", new ProtocolCodecFilter(new JMessageProtocalCodecFactory(Charset.forName("UTF-8"))));
            connector.setHandler(new MinaClientHandler());
            connector.setConnectTimeoutMillis(3000);
            ConnectFuture cf=connector.connect(new InetSocketAddress("localhost", PORT));
            cf.awaitUninterruptibly();// wait until the connect attempt has finished
            if(!cf.isConnected()) {
                // Asking a failed future for its session would throw; report the cause instead.
                log.error(String.format("Failed to connect to Server[localhost:%s]", PORT), cf.getException());
                return;
            }
            JMessageProtocalReq req=new JMessageProtocalReq();
            req.setFunctionCode((short)1);
            req.setContent("hello world!!!");
            cf.getSession().write(req);
            cf.getSession().getCloseFuture().awaitUninterruptibly();// wait until the connection is closed
        } finally {
            // Release the connector's resources on every exit path.
            connector.dispose();
        }
    }
}
Java代码
/**
 * MINA client-side handler that logs every session event.
 *
 * @author Simple
 */
public class MinaClientHandler extends IoHandlerAdapter {

    private static final Logger log=Logger.getLogger(MinaClientHandler.class);

    /** Logs the failure together with its cause so the stack trace is not lost. */
    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        log.error("Client产生异常!", cause);
    }

    /** Logs a message received from the server. */
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        log.debug(String.format("来自Server[%s]的消息:%s", session.getRemoteAddress(), message.toString()));
    }

    /** Logs a message that has been sent to the server. */
    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        log.debug(String.format("向Server[%s]发送消息:%s", session.getRemoteAddress(), message.toString()));
    }

    /** Logs that the connection to the server was closed. */
    @Override
    public void sessionClosed(IoSession session) throws Exception {
        log.debug(String.format("与Server[%s]断开连接!", session.getRemoteAddress()));
    }

    /** Logs that the session was created. */
    @Override
    public void sessionCreated(IoSession session) throws Exception {
        log.debug(String.format("与Server[%s]建立连接!", session.getRemoteAddress()));
    }

    /** Logs that the session was opened. */
    @Override
    public void sessionOpened(IoSession session) throws Exception {
        log.debug(String.format("与Server[%s]打开连接!", session.getRemoteAddress()));
    }

    /** Logs that the session became idle. */
    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        log.debug("Client进入空闲状态!");
    }
}
Java代码
/**
 * MINA server: installs the protocol codec and the handler, then listens on
 * the configured port.
 *
 * @author Simple
 */
public class MainServer {

    private static Logger log=Logger.getLogger(MainServer.class);

    private static final int PORT=9999;

    /**
     * Starts the server.
     *
     * @param args not used
     * @throws Exception if the acceptor cannot be set up or bound
     */
    public static void main(String[] args) throws Exception {
        // TCP/IP acceptor
        SocketAcceptor server=new NioSocketAcceptor();
        // codec filter appended to the acceptor's filter chain
        ProtocolCodecFilter codec=new ProtocolCodecFilter(new JMessageProtocalCodecFactory(Charset.forName("UTF-8")));
        server.getFilterChain().addLast("ProtocolCodecFilter", codec);
        // handler receiving the session events
        server.setHandler(new MinaServerHandler());
        // a session with no read or write activity for 10s becomes idle
        server.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
        // start listening on the port
        server.bind(new InetSocketAddress(PORT));
        log.debug(String.format("Server Listing on %s", PORT));
    }
}
Java代码
/**
 * MINA server-side handler that logs every session event.
 *
 * @author Simple
 */
public class MinaServerHandler extends IoHandlerAdapter {

    private static final Logger log=Logger.getLogger(MinaServerHandler.class);

    /** Logs the failure together with its cause so the stack trace is not lost. */
    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        log.error("Server产生异常!", cause);
    }

    /** Logs a message received from a client. */
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        log.debug(String.format("来自Client[%s]的消息:%s", session.getRemoteAddress(), message.toString()));
    }

    /** Logs a message that has been sent to a client. */
    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        log.debug(String.format("向Client[%s]发送消息:%s", session.getRemoteAddress(), message.toString()));
    }

    /** Logs that a client connection was closed. */
    @Override
    public void sessionClosed(IoSession session) throws Exception {
        log.debug(String.format("Client[%s]与Server断开连接!", session.getRemoteAddress()));
    }

    /** Logs that a session was created. */
    @Override
    public void sessionCreated(IoSession session) throws Exception {
        log.debug(String.format("Client[%s]与Server建立连接!", session.getRemoteAddress()));
    }

    /** Logs that a session became idle. */
    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        log.debug("Server进入空闲状态!");
    }

    /** Logs that a session was opened. */
    @Override
    public void sessionOpened(IoSession session) throws Exception {
        log.debug(String.format("Client[%s]与Server打开连接!", session.getRemoteAddress()));
    }
}
相关文章推荐
- 自定义Lucene分词器示例
- Spark 应用程序调优
- Fragment中使用地图切换时会闪一下的处理方案
- 二叉树------由前序和中序求后序
- jQuery获取多种input值的简单实现方法
- DevExpress Grid中实现点击Detail获得Master rowHandle的方法
- C++合成默认构造函数的真相
- 从今日起开始在CSDN上编写自己的博客
- hadoop1.x 和 hadoop 2.x 区别, 2.5 2.6 2.7之间的区别
- 【QTP】如何改进QTP性能
- Rails 5 有什么新特性?
- Akka-remote使用入门
- C# Repeater 嵌套
- android studio无法关联源码
- 1、java面试
- rsync+inotify
- selenium webdriver 杂记 - 自定义Table操作类(代码展示)
- 自定义View之常用工具类汇总
- RPC框架
- jQuery中prop()和attr()的区别