您的位置:首页 > 理论基础 > 计算机网络

Netty5.x如何用TCP/IP发送大文件到服务器

2016-03-08 18:48 591 查看

什么是Netty点击打开链接

Netty 拥有强大的并发处理能力,并且基于 Java NIO 的异步事件驱动模型。扯远了,下面进入正题。

首先是服务器的建立

package com.main.t;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Entry point of the demo server: starts a Netty NIO TCP server and installs a
 * {@code MainManager} on every accepted channel.
 */
public class Tserver {
    /**
     * Binds the server to the given port and blocks until the listening channel is closed.
     * Both event loop groups are shut down when this method returns or throws.
     *
     * @param port TCP port to listen on
     * @throws InterruptedException if the calling thread is interrupted while waiting for the
     *     bind to finish or for the server channel to close
     */
    public void BindPort(int port) throws InterruptedException {
        // Event loop groups passed to ServerBootstrap.group(parent, child).
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // Arguments are (minimum, initial, maximum) receive-buffer sizes in bytes.
                    .childOption(ChannelOption.RCVBUF_ALLOCATOR,
                            new AdaptiveRecvByteBufAllocator(512, 40960, 102400))
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // A single handler parses the header; it later swaps itself
                            // for the handler that receives the file data.
                            ChannelPipeline p = ch.pipeline();
                            p.addLast("main_manage", new MainManager());
                        }
                    });
            // Bind the port and wait synchronously until the bind succeeds.
            ChannelFuture f = b.bind(port).sync();
            System.out.println("服务器开启成功");
            // Block until the server's listening channel is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Graceful shutdown: release the event loop thread pools.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Starts the server on port 8081.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        try {
            new Tserver().BindPort(8081);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
可以看到我只用了一个MainManager来处理数据:
p.addLast("main_manage",new MainManager());

下面是MainManager.class
/**
 * Inbound handler that reads the length-prefixed JSON header from the first message on a
 * channel and then replaces itself in the pipeline with a {@code ServerHandler}, handing over
 * the parsed header and the remainder of the buffer.
 */
public class MainManager extends ChannelHandlerAdapter {
    /** Charset used to decode the JSON header bytes. */
    private final Charset charset;

    /** Creates a handler that decodes headers as UTF-8. */
    public MainManager() {
        charset = Charset.forName("utf-8");
    }

    /** Intentionally does nothing when the channel becomes active. */
    public void channelActive(ChannelHandlerContext ctx) throws JSONException {

    }

    /**
     * Parses {@code [int length][length bytes of JSON]} from the front of {@code msg} and
     * switches the pipeline over to a {@code ServerHandler}.
     *
     * <p>NOTE(review): this assumes the 4-byte length and the complete JSON header arrive in
     * the same read. TCP does not guarantee that; a frame decoder in front of this handler
     * would be the robust solution.
     *
     * @param ctx context of this handler
     * @param msg the received message; must be a {@code ByteBuf}
     * @throws IllegalArgumentException if the length prefix is negative
     * @throws IndexOutOfBoundsException if the buffer holds fewer bytes than the length prefix
     *     announces
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf byteBuf = (ByteBuf) msg;
        int jsonlen = byteBuf.readInt();

        // The length comes straight off the wire: validate it BEFORE allocating, otherwise a
        // bogus prefix (e.g. Integer.MAX_VALUE) would trigger a huge allocation first and
        // only fail afterwards when the bytes turn out to be missing.
        if (jsonlen < 0) {
            throw new IllegalArgumentException("negative JSON header length: " + jsonlen);
        }
        if (jsonlen > byteBuf.readableBytes()) {
            throw new IndexOutOfBoundsException("JSON header length " + jsonlen
                    + " exceeds readable bytes " + byteBuf.readableBytes());
        }

        // Copy exactly jsonlen bytes (the capacity of the target buffer) out of the message.
        ByteBuf jsonbyte = Unpooled.buffer(jsonlen);
        byteBuf.readBytes(jsonbyte);
        JSONObject jsObject = ReadHead.readHead(charset.decode(jsonbyte.nioBuffer()).toString());
        try {
            // Hand the header and the rest of the buffer to the data handler, then step aside.
            ctx.pipeline().addLast("receive_data", new ServerHandler(charset, jsObject, byteBuf));
            ctx.pipeline().remove(this);
        } catch (Exception e) {
            // NOTE(review): on failure this handler stays in the pipeline and the connection
            // stays open; consider closing the channel here.
            e.printStackTrace();
        }
    }

    /**
     * Prints the failure and closes the channel.
     *
     * @param ctx context of this handler
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

}
紧接着它把数据交给了ServerHandler.class来处理,在这里你可以添加自己的处理方式
/**
 * Inbound handler that receives file data and appends it to a fixed output file.
 *
 * <p>As read from the parsing code, each frame is {@code [int jsonLength][JSON header][data]},
 * where the header field {@code _size} is the number of data bytes in the frame and
 * {@code _all_size} is the total file size. Bytes that cannot be processed yet are parked in
 * {@link #listData} and prepended to the next incoming buffer.
 *
 * <p>NOTE(review): buffers received here are never released; confirm whether that leaks
 * reference-counted buffers with the allocator in use.
 */
public class ServerHandler extends ChannelHandlerAdapter {
    /** Destination of the received file; hard-coded by this demo. */
    private static final String OUTPUT_PATH = "D:\\277550.jpg";
    /** Handle used to query how many bytes have been written so far. */
    private static final File OUTPUT_FILE = new File(OUTPUT_PATH);

    /** Header of the frame currently being processed; null means "read a header first". */
    private JSONObject jsObject = null;
    /** Only assigned in {@link #channelActive}; null until then. */
    private ChannelHandlerContext ctx;
    /** Leftover bytes of an incomplete frame, kept until the next read. */
    private LinkedList<byte[]> listData = null;
    private Charset charset;
    FileOutputStream fos = null;
    /** Buffer currently being consumed; null means "take the next incoming message". */
    private ByteBuf byteBuf = null;

    /**
     * Opens the output file and immediately processes the data that followed the first header.
     *
     * <p>NOTE(review): the initial {@code channelRead} call passes the {@code ctx} field, which
     * is still null at this point. If the whole file is contained in this first buffer, the
     * completion branch dereferences it and throws a NullPointerException. Fixing that needs a
     * coordinated change in the caller.
     *
     * @param charset charset used to decode subsequent JSON headers
     * @param jsObject already parsed header of the first frame
     * @param byteBuf buffer positioned right after the first header
     * @throws Exception if opening the file or processing the first buffer fails
     */
    public ServerHandler(Charset charset, JSONObject jsObject, ByteBuf byteBuf) throws Exception {
        this.charset = charset;
        listData = new LinkedList<byte[]>();
        this.jsObject = jsObject;
        fos = new FileOutputStream(OUTPUT_PATH);
        this.byteBuf = byteBuf;
        try {
            this.channelRead(ctx, byteBuf);
        } catch (Exception e) {
            // The half-constructed handler is discarded by the caller, so nobody else could
            // close the stream: release it before propagating the failure.
            closeOutput();
            throw e;
        }
    }

    /**
     * Remembers the context.
     *
     * <p>NOTE(review): this handler is added to the pipeline while a read is being handled, so
     * this callback presumably never fires for it — confirm.
     */
    public void channelActive(ChannelHandlerContext ctx) {
        this.ctx = ctx;
    }

    /**
     * Closes the output file if the connection goes away in the middle of a transfer.
     *
     * @param ctx context of this handler
     * @throws Exception never thrown by this implementation itself
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        closeOutput();
        ctx.fireChannelInactive();
    }

    /**
     * Consumes one frame from the current buffer, writes its data to the output file and
     * decides whether to finish, recurse for another frame, or park the leftover bytes.
     *
     * @param ctx context of this handler (null when invoked from the constructor)
     * @param msg incoming {@code ByteBuf}; only used when no buffer is currently held
     * @throws Exception on I/O failure or when the buffer holds fewer bytes than announced
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        if (byteBuf == null) {
            byteBuf = (ByteBuf) msg;
        }
        int jsonlen = 0;
        if (listData.size() != 0) {
            // Prepend the bytes parked by the previous read to the new data.
            System.out.println("添加保留数据");
            ByteBuf buf = Unpooled.buffer(byteBuf.readableBytes() + listData.getFirst().length);
            buf.writeBytes(listData.getFirst());
            buf.writeBytes(byteBuf);
            byteBuf = buf;
            listData.removeFirst();
        }

        if (jsObject == null) {
            // No header pending: parse [int length][JSON] from the buffer.
            jsonlen = byteBuf.readInt();
            System.out.println(jsonlen + "----------");
            ByteBuf jsonbyte = Unpooled.buffer(jsonlen);
            byteBuf.readBytes(jsonbyte);
            jsObject = ReadHead.readHead(charset.decode(jsonbyte.nioBuffer()).toString());
        } else {
            // NOTE(review): character count of the re-serialized JSON, which may differ from
            // the byte length that was on the wire.
            jsonlen = jsObject.toString().length();
        }
        int datasize = jsObject.getInt("_size");
        int fileLen = jsObject.getInt("_all_size");
        System.out.println("-----" + (jsonlen + datasize + 8) + "------");
        System.out.println("json大小:" + jsonlen);

        System.out.println("json数据:" + jsObject.toString());

        System.out.println("数据大小:" + datasize);
        byte[] data = new byte[datasize];
        byteBuf.readBytes(data);
        System.out.println("已经读取大小:" + data.length);

        fos.write(data, 0, data.length);
        fos.flush();
        if (fileLen == OUTPUT_FILE.length()) {
            // Whole file received: close it, acknowledge, and hand the channel back to a
            // fresh MainManager.
            fos.close();
            fos = null;
            System.out.println("文件接收完成");
            byte[] ack = "re_over".getBytes();
            ByteBuf buf = Unpooled.buffer(ack.length);
            ctx.write(buf.writeBytes(ack));
            ctx.flush();
            ctx.pipeline().addLast("main_manage", new MainManager());
            ctx.pipeline().remove(this);
            return;
        } else {
            // NOTE(review): the meaning of the extra 8 bytes in this threshold is not
            // visible here — presumably protocol overhead; confirm.
            if ((byteBuf.readableBytes() > 0) && byteBuf.readableBytes() >= (jsonlen + datasize + 8)) {
                // Enough bytes for another frame of the same size: process it now.
                System.out.println("有剩余数据:" + byteBuf.readableBytes());
                jsObject = null;
                this.channelRead(ctx, byteBuf);
            } else {
                System.out.println("剩余文件大小:" + byteBuf.nioBuffer().remaining());
                if (byteBuf.readableBytes() > (fileLen - OUTPUT_FILE.length())) {
                    // More bytes buffered than the file still needs: parse one more frame.
                    System.out.println("执行最后一次调用");
                    jsObject = null;
                    this.channelRead(ctx, byteBuf);
                } else {
                    // Incomplete frame: park the bytes and wait for the next read.
                    byte[] saveData = new byte[byteBuf.readableBytes()];
                    byteBuf.readBytes(saveData);
                    listData.add(saveData);
                    byteBuf = null;
                    jsObject = null;
                }
            }
            System.out.println("还差多少数据完整:" + (fileLen - OUTPUT_FILE.length()));
        }

    }

    /**
     * Closes the output file, prints the failure and closes the channel.
     *
     * @param ctx context of this handler
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        closeOutput();
        cause.printStackTrace();
        ctx.close();
    }

    /** Closes the output stream if it is still open; a failure to close is only printed. */
    private void closeOutput() {
        if (fos != null) {
            try {
                fos.close();
            } catch (java.io.IOException e) {
                e.printStackTrace();
            }
            fos = null;
        }
    }
}
ServerHandler 建立了一个链式 list,用于存储上一次未处理完的半包数据,等到下一次数据到来时合并为一条有效的数据。当然这只是初级版本,你可以添加一些类,用于断点续传。

下面是服务器控制台消息


如果大家看了还是不懂,可以下载demo研究 http://download.csdn.net/detail/li491093957/9455856
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  服务器 nio java netty5.0