您的位置:首页 > 其它

Mina自定义协议-实现数据交互

2016-06-20 11:12 323 查看
本文主要现实mina的自定义协议,并且实现服务器和客户端的简单数据交互。

 

"mina协议的自定义"可参考本博Mina相关文章。

 

正题,所需要的基础类:

抽象协议类

请求协议

响应协议

(需要定制自己的协议格式)

 

协议编码解码工厂

协议编码

协议解码

 

客户端

客户端Handler

 

服务器

服务器Handler

 

 

Java代码  


/** 

 * 消息协议 

 *  

 * @author Simple 

 *  

 */  

public abstract class JAbsMessageProtocal {  

  

  public abstract byte getTag();// 消息协议类型  请求/响应  

  

  public abstract int getLength();// 消息协议数据长度  

}  

  

/** 

 * 报头: 

 *    short tag:请求/响应 

 *    int   length:数据长度 

 * 报体: 

 *    short   methodCode:功能函数 

 *    byte    resultCode:结果码 

 *    String  content:数据内容 

 */  

 

 

 

Java代码  


/** 

 * 消息协议-请求 

 *  

 * @author Simple 

 *  

 */  

public class JMessageProtocalReq extends JAbsMessageProtocal {  

  

  private short functionCode;// 功能代码  

  

  private String content;// 请求内容  

  

  @Override  

  public int getLength() {  

    return 2 + (content == null ? 0 : content.getBytes().length);  

  }  

  

  @Override  

  public byte getTag() {  

    return JConstant.REQ;  

  }  

  

  public void setFunctionCode(short functionCode) {  

    this.functionCode=functionCode;  

  }  

  

  public short getFunctionCode() {  

    return functionCode;  

  }  

  

  public void setContent(String content) {  

    this.content=content;  

  }  

  

  public String getContent() {  

    return content;  

  }  

  

  @Override  

  public String toString() {  

    return "JMessageProtocalReq [content=" + content + ", functionCode=" + functionCode + ", getLength()=" + getLength()  

      + ", getTag()=" + getTag() + "]";  

  }  

}  

 

 

 

Java代码  


/** 

 * 消息协议-响应 

 *  

 * @author Simple 

 *  

 */  

public class JMessageProtocalRes extends JAbsMessageProtocal {  

  

  private byte resultCode;// 结果码  

  

  private String content;// 响应内容  

  

  @Override  

  public int getLength() {  

    return 1 + (getContent() == null ? 0 : getContent().getBytes().length);  

  }  

  

  @Override  

  public byte getTag() {  

    return JConstant.RES;  

  }  

  

  public void setResultCode(byte resultCode) {  

    this.resultCode=resultCode;  

  }  

  

  public byte getResultCode() {  

    return resultCode;  

  }  

  

  public void setContent(String content) {  

    this.content=content;  

  }  

  

  public String getContent() {  

    return content;  

  }  

  

  @Override  

  public String toString() {  

    return "JMessageProtocalRes [content=" + content + ", resultCode=" + resultCode + ", getLength()=" + getLength()  

      + ", getTag()=" + getTag() + "]";  

  }  

}  

 

Java代码  


/** 

 * JMessageProtocal解码编码工厂 

 *  

 * @author Simple 

 *  

 */  

public class JMessageProtocalCodecFactory implements ProtocolCodecFactory {  

  

  private final JMessageProtocalDecoder decoder;  

  

  private final JMessageProtocalEncoder encoder;  

  

  public JMessageProtocalCodecFactory(Charset charset) {  

    this.decoder=new JMessageProtocalDecoder(charset);  

    this.encoder=new JMessageProtocalEncoder(charset);  

  }  

  

  public ProtocolDecoder getDecoder(IoSession paramIoSession) throws Exception {  

    return decoder;  

  }  

  

  public ProtocolEncoder getEncoder(IoSession paramIoSession) throws Exception {  

    return encoder;  

  }  

}  

 

Java代码  


/** 

 * JMessageProtocal解码 

 * @author Simple 

 * 

 */  

public class JMessageProtocalDecoder extends ProtocolDecoderAdapter {  

  

  private Logger log=Logger.getLogger(JMessageProtocalDecoder.class);  

  

  private Charset charset;  

  

  public JMessageProtocalDecoder(Charset charset) {  

    this.charset=charset;  

  }  

  

  /** 

   * 解码 

   */  

  public void decode(IoSession session, IoBuffer buf, ProtocolDecoderOutput out) throws Exception {  

    JAbsMessageProtocal absMP=null;  

    // 获取协议tag  

    byte tag=buf.get();  

    // 获取协议体长度  

    int length=buf.getInt();  

    // 取出协议体  

    byte[] bodyData=new byte[length];  

    buf.get(bodyData);  

    // 为解析数据做准备  

    // 检测协议  

    IoBuffer tempBuf=IoBuffer.allocate(100).setAutoExpand(true);  

    tempBuf.put(tag);  

    tempBuf.putInt(length);  

    tempBuf.put(bodyData);  

    tempBuf.flip();  

    if(!canDecode(tempBuf)) {  

      return;  

    }  

    // 协议体buf  

    IoBuffer bodyBuf=IoBuffer.allocate(100).setAutoExpand(true);  

    bodyBuf.put(bodyData);  

    bodyBuf.flip();  

    // 整个协议buf  

    IoBuffer allBuf=IoBuffer.allocate(100).setAutoExpand(true);  

    allBuf.put(tag);  

    allBuf.putInt(length);  

    allBuf.put(bodyData);  

    allBuf.flip();  

    //  

    if(tag == JConstant.REQ) {  

      JMessageProtocalReq req=new JMessageProtocalReq();  

      short functionCode=bodyBuf.getShort();  

      String content=bodyBuf.getString(charset.newDecoder());  

      req.setFunctionCode(functionCode);  

      req.setContent(content);  

      absMP=req;  

    } else if(tag == JConstant.RES) {  

      JMessageProtocalRes res=new JMessageProtocalRes();  

      byte resultCode=bodyBuf.get();  

      String content=bodyBuf.getString(charset.newDecoder());  

      res.setResultCode(resultCode);  

      res.setContent(content);  

      absMP=res;  

    } else {  

      log.error("未定义的Tag");  

    }  

    out.write(absMP);  

  }  

  

  // 是否可以解码  

  private boolean canDecode(IoBuffer buf) {  

    int protocalHeadLength=5;// 协议头长度  

    int remaining=buf.remaining();  

    if(remaining < protocalHeadLength) {  

      log.error("错误,协议不完整,协议头长度小于" + protocalHeadLength);  

      return false;  

    } else {  

      log.debug("协议完整");  

      // 获取协议tag  

      byte tag=buf.get();  

      if(tag == JConstant.REQ || tag == JConstant.RES) {  

        log.debug("Tag=" + tag);  

      } else {  

        log.error("错误,未定义的Tag类型");  

        return false;  

      }  

      // 获取协议体长度  

      int length=buf.getInt();  

      if(buf.remaining() < length) {  

        log.error("错误,真实协议体长度小于消息头中取得的值");  

        return false;  

      } else {  

        log.debug("真实协议体长度:" + buf.remaining() + " = 消息头中取得的值:" + length);  

      }  

    }  

    return true;  

  }  

}  

 

Java代码  


/** 

 * JMessageProtocal编码 

 * @author Simple 

 * 

 */  

public class JMessageProtocalEncoder extends ProtocolEncoderAdapter {  

  

  private Charset charset;  

  

  public JMessageProtocalEncoder(Charset charset) {  

    this.charset=charset;  

  }  

  

  /** 

   * 编码 

   */  

  public void encode(IoSession session, Object object, ProtocolEncoderOutput out) throws Exception {  

    // new buf  

    IoBuffer buf=IoBuffer.allocate(2048).setAutoExpand(true);  

    // object --> AbsMP  

    JAbsMessageProtocal absMp=(JAbsMessageProtocal)object;  

    buf.put(absMp.getTag());  

    buf.putInt(absMp.getLength());  

    if(object instanceof JMessageProtocalReq) {// 请求协议  

      JMessageProtocalReq mpReq=(JMessageProtocalReq)object;  

      buf.putShort(mpReq.getFunctionCode());  

      buf.putString(mpReq.getContent(), charset.newEncoder());  

    } else if(object instanceof JMessageProtocalRes) {// 响应协议  

      JMessageProtocalRes mpRes=(JMessageProtocalRes)object;  

      buf.put(mpRes.getResultCode());  

      buf.putString(mpRes.getContent(), charset.newEncoder());  

    }  

    buf.flip();  

    out.write(buf);  

  }  

}  

 

Java代码  


/** 

 * MINA 客户端 

 *  

 * @author Simple 

 *  

 */  

public class MainClient {  

  

  @SuppressWarnings("unused")  

  private static Logger log=Logger.getLogger(MainClient.class);  

  

  private static final int PORT=9999;  

  

  public static void main(String[] args) {  

    NioSocketConnector connector=new NioSocketConnector();  

    DefaultIoFilterChainBuilder chain=connector.getFilterChain();  

    chain.addLast("ProtocolCodecFilter", new ProtocolCodecFilter(new JMessageProtocalCodecFactory(Charset.forName("UTF-8"))));  

    connector.setHandler(new MinaClientHandler());  

    connector.setConnectTimeoutMillis(3000);  

    ConnectFuture cf=connector.connect(new InetSocketAddress("localhost", PORT));  

    cf.awaitUninterruptibly();// 等待连接创建完成  

    JMessageProtocalReq req=new JMessageProtocalReq();  

    req.setFunctionCode((short)1);  

    req.setContent("hello world!!!");  

    cf.getSession().write(req);  

    cf.getSession().getCloseFuture().awaitUninterruptibly();// 等待连接断开  

    connector.dispose();  

  }  

}  

 

Java代码  


/** 

 * MINA 客户端消息处理 

 *  

 * @author Simple 

 *  

 */  

public class MinaClientHandler extends IoHandlerAdapter {  

  

  private Logger log=Logger.getLogger(MinaClientHandler.class);  

  

  @Override  

  public void exceptionCaught(IoSession session, Throwable cause) throws Exception {  

    log.error(String.format("Client产生异常!"));  

  }  

  

  @Override  

  public void messageReceived(IoSession session, Object message) throws Exception {  

    log.debug(String.format("来自Server[%s]的消息:%s", session.getRemoteAddress(), message.toString()));  

  }  

  

  @Override  

  public void messageSent(IoSession session, Object message) throws Exception {  

    log.debug(String.format("向Server[%s]发送消息:%s", session.getRemoteAddress(), message.toString()));  

  }  

  

  @Override  

  public void sessionClosed(IoSession session) throws Exception {  

    log.debug(String.format("与Server[%s]断开连接!", session.getRemoteAddress()));  

  }  

  

  @Override  

  public void sessionCreated(IoSession session) throws Exception {  

    log.debug(String.format("与Server[%s]建立连接!", session.getRemoteAddress()));  

  }  

  

  @Override  

  public void sessionOpened(IoSession session) throws Exception {  

    log.debug(String.format("与Server[%s]打开连接!", session.getRemoteAddress()));  

  }  

  

  @Override  

  public void sessionIdle(IoSession session, IdleStatus status) throws Exception {  

    log.debug(String.format("Client进入空闲状态!"));  

  }  

}  

 

Java代码  


/** 

 * MINA 服务器 

 *  

 * @author Simple 

 *  

 */  

public class MainServer {  

  

  private static Logger log=Logger.getLogger(MainServer.class);  

  

  private static final int PORT=9999;  

  

  public static void main(String[] args) throws Exception {  

    SocketAcceptor acceptor=new NioSocketAcceptor();// tcp/ip 接收者  

    DefaultIoFilterChainBuilder chain=acceptor.getFilterChain();// 过滤管道  

    chain.addLast("ProtocolCodecFilter", new ProtocolCodecFilter(new JMessageProtocalCodecFactory(Charset.forName("UTF-8"))));  

    acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);// 读写通道10s内无操作进入空闲状态  

    acceptor.setHandler(new MinaServerHandler());// 设置handler  

    acceptor.bind(new InetSocketAddress(PORT));// 设置端口  

    log.debug(String.format("Server Listing on %s", PORT));  

  }  

}  

 

Java代码  


/** 

 * MINA 服务器消息处理 

 *  

 * @author Simple 

 *  

 */  

public class MinaServerHandler extends IoHandlerAdapter {  

  

  private Logger log=Logger.getLogger(MinaServerHandler.class);  

  

  @Override  

  public void exceptionCaught(IoSession session, Throwable cause) throws Exception {  

    log.error(String.format("Server产生异常!"));  

  }  

  

  @Override  

  public void messageReceived(IoSession session, Object message) throws Exception {  

    log.debug(String.format("来自Client[%s]的消息:%s", session.getRemoteAddress(), message.toString()));  

  }  

  

  @Override  

  public void messageSent(IoSession session, Object message) throws Exception {  

    log.debug(String.format("向Client[%s]发送消息:%s", session.getRemoteAddress(), message.toString()));  

  }  

  

  @Override  

  public void sessionClosed(IoSession session) throws Exception {  

    log.debug(String.format("Client[%s]与Server断开连接!", session.getRemoteAddress()));  

  }  

  

  @Override  

  public void sessionCreated(IoSession session) throws Exception {  

    log.debug(String.format("Client[%s]与Server建立连接!", session.getRemoteAddress()));  

  }  

  

  @Override  

  public void sessionIdle(IoSession session, IdleStatus status) throws Exception {  

    log.debug(String.format("Server进入空闲状态!"));  

  }  

  

  @Override  

  public void sessionOpened(IoSession session) throws Exception {  

    log.debug(String.format("Client[%s]与Server打开连接!", session.getRemoteAddress()));  

  }  

}  

 

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: