Netty5.x如何用TCP/IP发送大文件到服务器
2016-03-08 18:48
591 查看
什么是Netty点击打开链接
Netty 拥有强大的并发处理能力,并且基于 Java NIO 的异步线程模型……扯远了,下面进入正题。
首先是服务器的建立
package com.main.t; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.AdaptiveRecvByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class Tserver { public void BindPort(int port) throws InterruptedException{ // 配置服务端的NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024). childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(512, 40960, 102400)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p=ch.pipeline(); p.addLast("main_manage",new MainManager()); // p.addLast("msg",new ServerHandler()); } }); // 绑定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); System.out.println("服务器开启成功"); // 等待服务端监听端口关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[]args){ try { new Tserver().BindPort(8081); } catch (InterruptedException e) { e.printStackTrace(); } } }可以看到我只用了一个MainMnager()来处理数据
// Excerpt from Tserver.initChannel above: MainManager is the only handler added to the pipeline.
p.addLast("main_manage",new MainManager());
下面是MainManager.class
public class MainManager extends ChannelHandlerAdapter{ private Charset charset=null; public MainManager(){ charset=Charset.forName("utf-8"); } public void channelActive(ChannelHandlerContext ctx) throws JSONException { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg){ ByteBuf byteBuf=(ByteBuf)msg; int jsonlen=byteBuf.readInt(); ByteBuf jsonbyte=Unpooled.buffer(jsonlen); byteBuf.readBytes(jsonbyte); JSONObject jsObject=ReadHead.readHead(charset.decode(jsonbyte.nioBuffer()).toString()); try { ctx.pipeline().addLast("receive_data",new ServerHandler(charset, jsObject,byteBuf)); ctx.pipeline().remove(this); } catch (Exception e) { e.printStackTrace(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 释放资源 cause.printStackTrace(); ctx.close(); } }紧接着他把数据交给了ServerHandler.class来处理,在这里你可以添加自己的处理方式
public class ServerHandler extends ChannelHandlerAdapter{ private JSONObject jsObject=null; private ChannelHandlerContext ctx; private LinkedList<byte[]>listData=null; private Charset charset; FileOutputStream fos=null; private ByteBuf byteBuf=null; public ServerHandler(Charset charset,JSONObject jsObject,ByteBuf byteBuf) throws Exception{ this.charset=charset; listData=new LinkedList<byte[]>(); this.jsObject=jsObject; fos=new FileOutputStream("D:\\277550.jpg"); this.byteBuf=byteBuf; this.channelRead(ctx, byteBuf); } public void channelActive(ChannelHandlerContext ctx) { this.ctx=ctx; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(byteBuf==null){ byteBuf=(ByteBuf)msg; } int jsonlen=0; if(listData.size()!=0){ System.out.println("添加保留数据"); ByteBuf buf=Unpooled.buffer(byteBuf.readableBytes()+listData.getFirst().length); buf.writeBytes(listData.getFirst()); buf.writeBytes(byteBuf); byteBuf=buf; listData.removeFirst(); }; if(jsObject==null){ jsonlen=byteBuf.readInt(); System.out.println(jsonlen+"----------"); ByteBuf jsonbyte=Unpooled.buffer(jsonlen); byteBuf.readBytes(jsonbyte); jsObject=ReadHead.readHead(charset.decode(jsonbyte.nioBuffer()).toString()); }else{ jsonlen=jsObject.toString().length(); } int datasize=jsObject.getInt("_size"); int fileLen=jsObject.getInt("_all_size"); System.out.println("-----"+(jsonlen+datasize+8)+"------"); System.out.println("json大小:"+jsonlen); System.out.println("json数据:"+jsObject.toString()); System.out.println("数据大小:"+datasize); byte[]data=new byte[datasize]; byteBuf.readBytes(data); System.out.println("已经读取大小:"+data.length); fos.write(data, 0, data.length); fos.flush(); if(fileLen==new File("D:\\277550.jpg").length()){ fos.close(); fos=null; System.out.println("文件接收完成"); ByteBuf buf=Unpooled.buffer("re_over".getBytes().length); ctx.write(buf.writeBytes("re_over".getBytes())); ctx.flush(); ctx.pipeline().addLast("main_manage",new MainManager()); ctx.pipeline().remove(this); return; 
}else{ if((byteBuf.readableBytes()>0)&&byteBuf.readableBytes()>=(jsonlen+datasize+8)){ System.out.println("有剩余数据:"+byteBuf.readableBytes()); // byteBuf=null; jsObject=null; this.channelRead(ctx, byteBuf); }else{ System.out.println("剩余文件大小:"+byteBuf.nioBuffer().remaining()); if(byteBuf.readableBytes()>(fileLen-new File("D:\\277550.jpg").length())){ System.out.println("执行最后一次调用"); // byteBuf=null; jsObject=null; this.channelRead(ctx, byteBuf); }else{ byte[] saveData=new byte[byteBuf.readableBytes()]; byteBuf.readBytes(saveData); listData.add(saveData); byteBuf=null; jsObject=null; } } System.out.println("还差多少数据完整:"+(fileLen-new File("D:\\277550.jpg").length())); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 释放资源 cause.printStackTrace(); ctx.close(); } }ServerHandler,建立一个链式list用于存储上一次为处理完的半包程序,等到下一次数据的到来合并为一条有效的数据,当然这只是初级版本,你可以添加一些类,用于断点续传。
下面是服务器控制台消息
如果大家看了还是不懂,可以下载demo研究 http://download.csdn.net/detail/li491093957/9455856
相关文章推荐
- java对世界各个时区(TimeZone)的通用转换处理方法(转载)
- java-注解annotation
- java-模拟tomcat服务器
- java-用HttpURLConnection发送Http请求.
- java-WEB中的监听器Lisener
- 小心服务器内存居高不下的元凶--WebAPI服务
- Android IPC进程间通讯机制
- Android Native 绘图方法
- Android java 与 javascript互访(相互调用)的方法例子
- 介绍一款信息管理系统的开源框架---jeecg
- 聚类算法之kmeans算法java版本
- java实现 PageRank算法
- 运维入门
- PropertyChangeListener简单理解
- spymemcached源码中Reactor模式分析
- c++11 + SDL2 + ffmpeg +OpenAL + java = Android播放器
- 插入排序
- 冒泡排序