您的位置:首页 > 其它

Netty与Marshalling结合发送对象—Netty学习二

2016-12-19 14:58 281 查看
之前的博客是Netty简单的学习,我们可以传递一个字符串,那么如果我们想要在Netty中传递一个对象该怎么办呢 ?那么这个时候我们可以结合Marshalling来传递。

首先需要导入两个Marshalling的依赖包

jboss-marshalling-1.3.0.CR9.jar

jboss-marshalling-serial-1.3.0.CR9.jar

注意:我开始学习的时候只导入了第一个jar包,没有导入第二个,结果是不报错,但是客户端和服务端之间传递不了消息。所以两个包一定要都导入才行。

MarshallingCodeCFactory工具类

public class MarshallingCodeCFactory {

public static MarshallingDecoder buildMarshallingDecoder() {
final MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(factory, configuration);
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024*1024);
return decoder;
}

public static MarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
MarshallerProvider provider = new DefaultMarshallerProvider(factory, configuration);
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}
}
server端

public class Server {

public static void main(String[] args) throws InterruptedException {
//1.第一个线程组是用于接收Client端连接的
EventLoopGroup bossGroup = new NioEventLoopGroup();
//2.第二个线程组是用于实际的业务处理的
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);//绑定两个线程池
b.channel(NioServerSocketChannel.class);//指定NIO的模式,如果是客户端就是NioSocketChannel
b.option(ChannelOption.SO_BACKLOG, 1024);//TCP的缓冲区设置
b.option(ChannelOption.SO_SNDBUF, 32*1024);//设置发送缓冲的大小
b.option(ChannelOption.SO_RCVBUF, 32*1024);//设置接收缓冲区大小
b.option(ChannelOption.SO_KEEPALIVE, true);//保持连续
b.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
//设置Marshalling的编码和解码
ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
ch.pipeline().addLast(new ServertHandler());
}
});
ChannelFuture future = b.bind(8765).sync();//绑定端口
future.channel().closeFuture().sync();//等待关闭(程序阻塞在这里等待客户端请求)
bossGroup.shutdownGracefully();//关闭线程
workerGroup.shutdownGracefully();//关闭线程
}

}ServerHandler处理类
public class ServertHandler extends ChannelHandlerAdapter {

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
Send send = (Send) msg;
System.out.println("client发送:"+send);

Receive receive = new Receive();
receive.setId(send.getId());
receive.setMessage(send.getMessage());
receive.setName(send.getName());
ctx.writeAndFlush(receive);
}

}

由于我们已经在Server端和Client端定义了传递的类型又Marshalling工厂处理,所以此时我们接收的时候直接转成发送的对象类型就行了。

Client端
public class Client {

public static void main(String[] args) throws InterruptedException {
EventLoopGroup worker = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(worker)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
//sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,buf));
//sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture f=b.connect("127.0.0.1",8765).sync();
for(int i=1;i<=5;i++){
Send send = new Send();
send.setId(i);
send.setMessage("message"+i);
send.setName("name"+i);
f.channel().writeAndFlush(send);
}
f.channel().closeFuture().sync();
worker.shutdownGracefully();
}
}ClientHandler端
public class ClientHandler extends ChannelHandlerAdapter{
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
Receive receive = (Receive) msg;
System.out.println("server反馈:"+receive);
}
}
send类
public class Send implements Serializable {

/**
* serialVersionUID:TODO(用一句话描述这个变量表示什么)
*
* @since 1.0.0
*/

private static final long serialVersionUID = 1L;

private Integer id;
private String name;
private String message;

public Integer getId() {
return id;
}

public void setId(Integer id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}

@Override
public String toString() {
return "Send [id=" + id + ", name=" + name + ", message=" + message + "]";
}

}
Receive类
public class Receive implements Serializable{

/**
* serialVersionUID:TODO(用一句话描述这个变量表示什么)
* @since 1.0.0
*/

private static final long serialVersionUID = 1L;
private Integer id;
private String name;
private String message;
private byte[] sss;

public byte[] getSss() {
return sss;
}
public void setSss(byte[] sss) {
this.sss = sss;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
@Override
public String toString() {
return "Receive [id=" + id + ", name=" + name + ", message=" + message + ", sss=" + Arrays.toString(sss) + "]";
}

}


注意:send类和receive这两个类,我们再真实环境开发的时候服务
4000
器和客户端往往是两个web应用程序,在这里我们要注意服务端和客户端之间的两个类类名和包名在两端要完全相同。

java架构师视频教程,高并发集群分布式视频教程,高并发处理方式,大数据视频教程出售QQ:694042039
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Netty和Marshalling