Netty 3.x 简例
2016-04-04 21:06
399 查看
Netty是一个异步的、事件驱动的网络应用框架,可以用来快速开发高性的客户端、服务端程序。示例使用Netty 3.10.5
![](https://img-blog.csdn.net/20160404211136454?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center)
首先是Server部分的代码:
Server端主程序:
PipelineFactory:
各个Handler:
然后是Client部分的代码:
Client端主程序:
各个Handler:
运行结果如下:
Server端后台日志:
Client端后台日志:
首先是Server部分的代码:
Server端主程序:
package com.sean.server; import java.net.InetSocketAddress; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.ChannelGroupFuture; import org.jboss.netty.channel.group.DefaultChannelGroup; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; public class Server { private ChannelFactory factory; public static ChannelGroup channelGroup = new DefaultChannelGroup(); public void start(){ // NioServerSocketChannelFactory用于创建基于NIO的服务端 // ServerSocketChannel。本身包含2种线程,boss线程和worker线程。 // 每个ServerSocketChannel会都会拥有自己的boss线程, // 当一个连接被服务端接受(accepted), // boss线程就会将接收到的Channel传递给一个worker线程处理, // 而worker线程以非阻塞的方式为一个或多个Channel提供非阻塞的读写 factory = new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), // boss线程池 Executors.newCachedThreadPool(), // worker线程池 8); // worker线程数 // ServerBootstrap用于帮助服务器启动 ServerBootstrap bootstrap = new ServerBootstrap(factory); // 没有child.前缀,则该选项是为ServerSocketChannel设置 bootstrap.setOption("reuseAddress", true); // 有child.前缀,则该选项是为Channel设置 // bootstrap.setOption("child.tcpNoDelay", true); // bootstrap.setOption("child.keepAlive", true); // 对每一个连接(channel),server都会调用 // ChannelPipelineFactory为该连接创建一个ChannelPipeline ServerChannelPiplineFactory channelPiplineFactory = new ServerChannelPiplineFactory(); bootstrap.setPipelineFactory(channelPiplineFactory); // 这里绑定服务端监听的IP和端口 Channel channel = bootstrap.bind(new InetSocketAddress("127.0.0.1", 8000)); Server.channelGroup.add(channel); System.out.println("Server is started..."); } public void stop(){ // ChannelGroup为其管理的Channels提供一系列的批量操作 // 关闭的Channel会自动从ChannelGroup中移除 ChannelGroupFuture channelGroupFuture = Server.channelGroup.close(); channelGroupFuture.awaitUninterruptibly(); factory.releaseExternalResources(); System.out.println("Server is stopped."); } public static void main(String[] args) throws Exception { Server server = new Server(); server.start(); Thread.sleep(30*1000); server.stop(); } }
PipelineFactory:
package com.sean.server; import java.util.concurrent.Executor; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor; import com.sean.server.handler.ServerExecutionHandler; import com.sean.server.handler.ServerLogicHandler; import com.sean.server.handler.ServerReadDecoder; import com.sean.server.handler.ServerWriteEncoder; public class ServerChannelPiplineFactory implements ChannelPipelineFactory { @Override public ChannelPipeline getPipeline() throws Exception { ServerReadDecoder serverReadDecoder = new ServerReadDecoder(); ServerWriteEncoder serverWriteEncoder = new ServerWriteEncoder(); Executor executor = new OrderedMemoryAwareThreadPoolExecutor(4, 200, 200); ServerExecutionHandler serverExecutionHandler = new ServerExecutionHandler(executor); ServerLogicHandler serverLogicHandler = new ServerLogicHandler(); // ChannelPipeline的源码中的javadoc介绍的非常详细,很有必要看一下 // ChannelPipeline是一个处理ChannelEvent的handler链 // 如果为读操作,ChannelEvent事件会从前到后依次被 // Upstream的handler处理 // serverReadDecoder -> serverLogicHandler // 如果为写操作,ChannelEvent事件会从后至前依次被 // Downstream的handler处理 // serverLogicHandler -> serverWriteEncoder ChannelPipeline channelPipeline = Channels.pipeline(); channelPipeline.addLast("1", serverReadDecoder); channelPipeline.addLast("2", serverWriteEncoder); channelPipeline.addLast("3", serverExecutionHandler); channelPipeline.addLast("4", serverLogicHandler); System.out.println(channelPipeline.hashCode()); return channelPipeline; } }
各个Handler:
package com.sean.server.handler; import java.util.concurrent.Executor; import org.jboss.netty.handler.execution.ExecutionHandler; // 提供一个线程池 public class ServerExecutionHandler extends ExecutionHandler{ public ServerExecutionHandler(Executor executor) { super(executor); } }
package com.sean.server.handler; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; import com.sean.server.Server; // SimpleChannelHandler提供了很多基本的handler方法用来重写 // 通常情况下足够使用了 public class ServerLogicHandler extends SimpleChannelHandler { @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("######channelConnected"); // channel group is thread safe Server.channelGroup.add(e.getChannel()); System.out.println(e.getChannel().toString()); } @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { System.out.println("######messageReceived"); // 经过了ServerReadDecoder的处理,这里可以直接得到String类型的message String msg = (String)e.getMessage(); System.out.println("The message sent by client is : " + msg); Channel ch = e.getChannel(); String str = "Hi, Client."; // 由于IO操作是异步的,当方法返回时并不能保证IO操作一定完成了 // 因此返回一个ChannelFuture对象实例 // 该实例中保存了IO操作的状态信息 ChannelFuture cf = ch.write(str); // 为ChannelFuture对象实例添加监听,如果数据发送完毕则关闭连接 cf.addListener(new ChannelFutureListener(){ @Override public void operationComplete(ChannelFuture future) throws Exception { Channel ch = future.getChannel(); ch.close(); } }); System.out.println("The message has sent to client."); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { e.getCause().printStackTrace(); Channel ch = e.getChannel(); ch.close(); } }
package com.sean.server.handler; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.string.StringDecoder; import org.jboss.netty.util.CharsetUtil; // 解决接收流数据时,数据出现碎片化的问题 public class ServerReadDecoder extends StringDecoder{ @Override protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { System.out.println("######ServerReadDecoder"); // 从msg中取出的数据类型是ChannelBuffer的 byte[] buffer = ((ChannelBuffer)msg).array(); byte last = buffer[buffer.length - 1]; // 46 is '.' if(last == 46) // 并将ChannelBuffer转为String return new String(buffer, CharsetUtil.UTF_8); return null; } }
package com.sean.server.handler; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.string.StringEncoder; import org.jboss.netty.util.CharsetUtil; public class ServerWriteEncoder extends StringEncoder{ @Override protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { System.out.println("######ServerWriteEncoder"); String str = (String)msg; // 通过ChannelBuffers工具,为指定编码的指定字符串分配缓存空间 // 并将String转为ChannelBuffer ChannelBuffer channelBuffer = ChannelBuffers.copiedBuffer(str, CharsetUtil.UTF_8); return channelBuffer; } }
然后是Client部分的代码:
Client端主程序:
package com.sean.client; import java.net.InetSocketAddress; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import com.sean.client.handler.ClientLogicHandler; import com.sean.client.handler.ClientReadDecoder; import com.sean.client.handler.ClientWriteEncoder; public class Client { public static void main(String[] args){ // 同服务端相同,只是这里使用的是NioClientSocketChannelFactory final ChannelFactory factory = new NioClientSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), 8); // ClientBootstrap用于帮助客户端启动 ClientBootstrap bootstrap = new ClientBootstrap(factory); // 由于客户端不包含ServerSocketChannel,所以参数名不能带有child.前缀 bootstrap.setOption("tcpNoDelay", true); // bootstrap.setOption("keepAlive", true); bootstrap.setPipelineFactory(new ChannelPipelineFactory(){ @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline channelPipeline = Channels.pipeline(new ClientReadDecoder(), new ClientWriteEncoder(), new ClientLogicHandler()); System.out.println(channelPipeline.hashCode()); return channelPipeline; } }); // 这里连接服务端绑定的IP和端口 bootstrap.connect(new InetSocketAddress("127.0.0.1", 8000)); System.out.println("Client is started..."); } }
各个Handler:
package com.sean.client.handler; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; import org.jboss.netty.channel.WriteCompletionEvent; public class ClientLogicHandler extends SimpleChannelHandler { @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("######channelConnected"); Channel ch = e.getChannel(); String msg = "Hi, Server."; ch.write(msg); } @Override public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception { System.out.println("######writeComplete"); } @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { System.out.println("######messageReceived"); String msg = (String)e.getMessage(); System.out.println("The message gotten from server is : " + msg); ChannelFuture channelFuture = e.getChannel().close(); channelFuture.addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { e.getCause().printStackTrace(); Channel ch = e.getChannel(); ch.close(); } }
package com.sean.client.handler; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.string.StringDecoder; import org.jboss.netty.util.CharsetUtil; public class ClientReadDecoder extends StringDecoder{ @Override protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { System.out.println("######ClientReadDecoder"); byte[] buffer = ((ChannelBuffer)msg).array(); byte last = buffer[buffer.length - 1]; // 46 is '.' if(last == 46) return new String(buffer, CharsetUtil.UTF_8); return null; } }
package com.sean.client.handler; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.string.StringEncoder; import org.jboss.netty.util.CharsetUtil; public class ClientWriteEncoder extends StringEncoder{ @Override protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { System.out.println("######ClientWriteEncoder"); String str = (String)msg; ChannelBuffer channelBuffer = ChannelBuffers.copiedBuffer(str, CharsetUtil.UTF_8); return channelBuffer; } }
运行结果如下:
Server端后台日志:
Server is started... 1257526899 ######channelConnected [id: 0x88120865, /127.0.0.1:58887 => /127.0.0.1:8000] ######ServerReadDecoder ######messageReceived The message sent by client is : Hi, Server. ######ServerWriteEncoder The message has sent to client. Server is stopped.
Client端后台日志:
1767582956 Client is started... ######channelConnected ######ClientWriteEncoder ######writeComplete ######ClientReadDecoder ######messageReceived The message gotten from server is : Hi, Client.
相关文章推荐
- spymemcached源码中Reactor模式分析
- Java IO与NIO的一些文件拷贝测试
- Java NIO工作原理的全面分析
- java的nio的使用示例分享
- Java NIO和IO的区别
- Netty使用Http上传文件
- tomcat、netty以及nodejs的helloworld性能对比 3ff8
- Netty入门-client/server
- flatbuffers 和netty的结合使用
- netty 处理远程主机强制关闭一个连接
- Netty 源码分析(三):服务器端的初始化和注册过程
- 轻量级分布式 RPC 框架
- spark总体概况
- java十分钟速懂知识点——NIO
- Java IO/NIO学习总结
- (IO密集型事务)同步,异步与CPU使用率关系
- 再说异步调用和NIO
- Netty系列之Netty百万级推送服务设计要点
- Java NIO:NIO概述
- Netty初步