您的位置:首页 > 运维架构 > 网站架构

架构师入门笔记十一 Netty5编解码

2017-10-15 11:39 218 查看
架构师入门笔记十一 Netty5编解码

1 基础知识

1.1 什么是编解码技术

编码(Encode)称为序列化(serialization),它将对象序列化为字节数组,用于网络传输、数据持久化或者其它用途。
解码(Decode)称为反序列化(deserialization),把从网络、磁盘等读取的字节数组还原成原始对象(通常是原始对象的拷贝),以方便后续的业务逻辑操作。
进行远程跨进程服务调用时,需要使用特定的编解码技术,对需要进行网络传输的对象做编码或者解码,以便完成远程调用。这样理解:我们通过api调用A平台的接口,A平台返回给我们一个Json格数的数据(或者是xml格式数据)。我们再解析Json数据转换成需要的格式。

1.2 为什么要用编解码技术

因为java的序列化存在很多缺点,比如
1 无法跨语言(最为致命的问题,因为java的序列化是java语言内部的私有协议,其他语言并不支持),
2 序列化后码流太大(采用二进制编解码技术要比java原生的序列化技术强),
3 序列化性能太低等

1.3 有那些主流的编解码框架

1.3.1 google 的 Protobuf 

它由谷歌开源而来。它将数据结构以 .proto 文件进行描述,通过代码生成工具可以生成对应数据结构的POJO对象和Protobuf相关的方法和属性。
特点如下:
1) 结构化数据存储格式(XML,JSON等);
2) 高效的编解码性能;
3) 语言无关、平台无关、扩展性好;
4) 官方支持Java、C++和Python三种语言。

1.3.2 JBoss Marshalling

JBoss Marshalling是一个Java对象的序列化API包,修正了JDK自带的序列化包的很多问题,但又保持跟java.io.Serializable接口的兼容;同时增加了一些可调的参数和附加的特性,并且这些参数和特性可通过工厂类进行配置。
相比于传统的Java序列化机制,它的优点如下:
1) 可插拔的类解析器,提供更加便捷的类加载定制策略,通过一个接口即可实现定制;
2) 可插拔的对象替换技术,不需要通过继承的方式;
3) 可插拔的预定义类缓存表,可以减小序列化的字节数组长度,提升常用类型的对象序列化性能;
4) 无须实现java.io.Serializable接口,即可实现Java序列化;
5) 通过缓存技术提升对象的序列化性能。

1.3.3 MessagePack 框架

MessagePack是一个高效的二进制序列化格式。像JSON一样可以在各种语言之间交换数据。但是它比JSON更快、更小(It's like JSON.but fast and small)。

1.3.4 Kyro

Kryo 是一个快速高效的Java对象图形序列化框架(已经非常成熟了,很多大公司都在用)。主要特点是性能高效和易用。可以用来序列化对象到文件、数据库或者网络中。

2 JBoss Marshalling

2.1 Marshalling简介

JBoss Marshalling 是一个Java 对象序列化包,对 JDK 默认的序列化框架进行了优化,但又保持跟 Java.io.Serializable 接口的兼容,同时增加了一些可调的参数和附件的特性, 这些参数和附加的特性, 这些参数和特性可通过工厂类进行配置。

2.2 代码事例

本章节不再用byteBuf传数据,改用实体类交互数据。ReqData是Client端请求实体类,RespData是服务端返回实体类。客户端向服务端发送两种数据请求,一种是普通的数据交互,另一种是传附件。
import java.io.Serializable;
import java.util.Arrays;

/**
* 发起请求的实体类
* step1 序列化 Serializable
*/
public class ReqData implements Serializable {

private Long id;
private String name;
private String requestMsg;
private byte[] attachment; // 传文件的时候会用到

public Long getId() {
return id;
}

public void setId(Long id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getRequestMsg() {
return requestMsg;
}

public void setRequestMsg(String requestMsg) {
this.requestMsg = requestMsg;
}

public byte[] getAttachment() {
return attachment;
}

public void setAttachment(byte[] attachment) {
this.attachment = attachment;
}

@Override
public String toString() {
return "ReqData [id=" + id + ", name=" + name + ", requestMsg="
+ requestMsg + ", attachment=" + Arrays.toString(attachment)
+ "]";
}

}
import java.io.Serializable;

/**
* 返回数据实体类 step1 序列化 Serializable
*/
public class RespData implements Serializable {

private Long id;
private String name;
private String responseMsg;

public Long getId() {
return id;
}

public void setId(Long id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getResponseMsg() {
return responseMsg;
}

public void setResponseMsg(String responseMsg) {
this.responseMsg = responseMsg;
}

@Override
public String toString() {
return "RespData [id=" + id + ", name=" + name + ", responseMsg="
+ responseMsg + "]";
}

}
核心代码Marshalling工厂类
import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

/**
* Marshalling工厂
*/
public final class MarshallingFactory {

private static final String NAME = "serial"; // 指定值,不可随意修改
private static final int VERSION = 5;
private static final int MAX_OBJECT_SIZE = 1024 * 1024 * 1;

/**
* 创建Jboss Marshalling解码器MarshallingDecoder
* @return MarshallingDecoder
*/
public static MarshallingDecoder buildMarshallingDecoder() {
// step1 通过工具类 Marshalling,获取Marshalling实例对象,参数serial 标识创建的是java序列化工厂对象
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory(NAME);
// step2 初始化Marshalling配置
final MarshallingConfiguration configuration = new MarshallingConfiguration();
// step3 设置Marshalling版本号
configuration.setVersion(VERSION);
// step4 初始化生产者
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
// step5 通过生产者和单个消息序列化后最大长度构建 Netty的MarshallingDecoder
MarshallingDecoder decoder = new MarshallingDecoder(provider, MAX_OBJECT_SIZE);
return decoder;
}

/**
* 创建Jboss Marshalling编码器MarshallingEncoder
* @return MarshallingEncoder
*/
public static MarshallingEncoder builMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory(NAME);
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(VERSION);
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}

}

服务器处理类

import java.io.File;
import java.io.FileOutputStream;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

public class ServerHandler extends ChannelHandlerAdapter{

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Server.......Server");
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
// step1 获取客户端传来的数据
ReqData requestData = (ReqData) msg;
// step2 打印数据
System.out.println("Server : " + requestData.toString());
// step3 设置返回值
RespData responseData = new RespData();
responseData.setId(requestData.getId());
responseData.setName(requestData.getName() + " 不错哦!");
responseData.setResponseMsg(requestData.getRequestMsg() + " 你很棒棒的!");
// 有附件的情况
if (null != requestData.getAttachment()) {
byte[] attachment = GzipUtils.ungzip(requestData.getAttachment());
String path = System.getProperty("user.dir") + File.separatorChar + "receive" +  File.separatorChar + "001.jpg";
FileOutputStream outputStream = new FileOutputStream(path);
outputStream.write(attachment);
outputStream.close();
responseData.setResponseMsg("收到图片了, 图片路径是 : " + path);
}
// step4 把数据返回给客户端
ctx.writeAndFlush(responseData);
} catch (Exception e) {
e.printStackTrace();
} finally {
ReferenceCountUtil.release(msg);
}
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}

}

服务端Netty启动代码

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class Server {

private static final int PORT = 8888; // 监听的端口号

public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO)) // 设置打印日志级别,其他知识点上一章节均有介绍,这里不做重复说明
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(MarshallingFactory.buildMarshallingDecoder()); // 配置解码器
socketChannel.pipeline().addLast(MarshallingFactory.builMarshallingEncoder()); // 配置编码器
socketChannel.pipeline().addLast(new ServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);

ChannelFuture future = bootstrap.bind(PORT).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}

}

客户端处理类

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

public class ClientHandler extends ChannelHandlerAdapter{

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client.......Client");
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
// step1 获取客户端传来的数据
RespData respData = (RespData) msg;
// step2 打印数据
System.out.println("Client : " + respData.toString());
} catch (Exception e) {
e.printStackTrace();
} finally {
ReferenceCountUtil.release(msg);
}
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}

}

客户端启动服务类

import java.io.File;
import java.io.FileInputStream;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client {

private static final int PORT = 8888;
private static final String HOST = "127.0.0.1";

public static void main(String[] args) {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(MarshallingFactory.buildMarshallingDecoder()); // 配置编码器
socketChannel.pipeline().addLast(MarshallingFactory.builMarshallingEncoder()); // 配置解码器
socketChannel.pipeline().addLast(new ClientHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128);

ChannelFuture future = bootstrap.connect(HOST, PORT).sync();

// 普通请求
ReqData reqData = new ReqData();
reqData.setId(1L);
reqData.setName("ITDragon博客");
reqData.setRequestMsg("这是一篇Netty的编解码博客!");
future.channel().writeAndFlush(reqData);

// 传附件
ReqData reqData2 = new ReqData();
reqData2.setId(2L);
reqData2.setName("发送附件案例");
reqData2.setRequestMsg("客户端给服务端发送一张图片");
// 指定附件的路径  System.getProperty("user.dir") --》 当前程序所在目录  File.separatorChar --》会根据操作系统选择自动选择 / 或者 \)
String path = System.getProperty("user.dir") + File.separatorChar + "sources" +  File.separatorChar + "001.jpg";
File file = new File(path);
FileInputStream inputStream = new FileInputStream(file);
byte[] data = new byte[inputStream.available()];
inputStream.read(data);
inputStream.close();
reqData2.setAttachment(GzipUtils.gzip(data));
future.channel().writeAndFlush(reqData2);

future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
}
}

}
文件压缩解压工具类
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipUtils {

public static byte[] gzip(byte[] data) throws Exception{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(bos);
gzip.write(data);
gzip.finish();
gzip.close();
byte[] ret = bos.toByteArray();
bos.close();
return ret;
}

public static byte[] ungzip(byte[] data) throws Exception{
ByteArrayInputStream bis = new ByteArrayInputStream(data);
GZIPInputStream gzip = new GZIPInputStream(bis);
byte[] buf = new byte[1024];
int num = -1;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
while((num = gzip.read(buf, 0 , buf.length)) != -1 ){
bos.write(buf, 0, num);
}
gzip.close();
bis.close();
byte[] ret = bos.toByteArray();
bos.flush();
bos.close();
return ret;
}

}
代码和前几章节大同小异,这里就不做过多的描述。执行结果(删掉了一些太长的打印信息):
[14:16:14] nioEventLoopGroup-0-0 INFO  [] [] [io.netty.handler.logging.LoggingHandler] - [id: 0x2a77484b] REGISTERED
[14:16:14] nioEventLoopGroup-0-0 INFO  [] [] [io.netty.handler.logging.LoggingHandler] - [id: 0x2a77484b] BIND: 0.0.0.0/0.0.0.0:8888
[14:16:14] nioEventLoopGroup-0-0 INFO  [] [] [io.netty.handler.logging.LoggingHandler] - [id: 0x2a77484b, /0:0:0:0:0:0:0:0:8888] ACTIVE
[14:16:20] nioEventLoopGroup-0-2 INFO  [] [] [io.netty.handler.logging.LoggingHandler] - [id: 0x2a77484b, /0:0:0:0:0:0:0:0:8888] RECEIVED: [id: 0x5fda9fbc, /127.0.0.1:59193 => /127.0.0.1:8888]
Server.......Server
Server : ReqData [id=1, name=ITDragon博客, requestMsg=这是一篇Netty的编解码博客!, attachment=null]
Server : ReqData [id=2, name=发送附件案例, requestMsg=客户端给服务端发送一张图片, attachment=[....]]
[14:16:20] main WARN  [] [] [io.netty.bootstrap.Bootstrap] - Unknown channel option: SO_BACKLOG=128
Client.......Client
Client : RespData [id=1, name=ITDragon博客 不错哦!, responseMsg=这是一篇Netty的编解码博客! 你很棒棒的!]
Client : RespData [id=2, name=发送附件案例 不错哦!, responseMsg=收到图片了, 图片路径是 : F:\DayDayUp\...\receive\001.jpg]


3 优质博客链接

Netty系列之Netty编解码框架分析

以上便是Netty的编解码的入门笔记,下一章介绍Netty的通信连接和心跳检测实战应用。如果觉得不错,可以点个赞哦 




内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: