基于netty框架实现的TCP服务端程序
2017-09-06 13:40
609 查看
工程目录结构
代码:NioServer main类
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
/**
* Netty 服务端代码
*
*/
public class NioServer {
/**
* 日志对象
*/
// protected static Logger logger = LoggerFactory.getLogger(NioServer.class);
private static int port = 9999;
public static void main(String args[]) {
// Server服务启动器
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//可以在socket接上来的时候添加很多指定义逻辑
ch.pipeline().addLast("encode",new StringEncoder());
ch.pipeline().addLast("decode",new StringDecoder());
ch.pipeline().addLast("ping", new IdleStateHandler(25, 15, 10,TimeUnit.SECONDS));
ch.pipeline().addLast(new NioServerHandler());
}
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture future = bootstrap.bind(port).sync();
// logger.info("server started ,listen {}" ,port);
System.out.println("开始监听 "+ port);
//启动一个线程 来给客户端发消息
//new Thread(new ServerTask()).run();
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server. 调用实现优雅关机
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
class ServerTask implements Runnable{
/**
* 日志对象
*/
// protected Logger logger = LoggerFactory.getLogger(ServerTask.class);
public void run() {
Channel channel = null;
Map.Entry<String, Channel> entry = null;
while (true) {
Iterator<Entry<String, Channel>> it = GatewayService.map.entrySet().iterator();
while (it.hasNext()) {
entry = it.next();
channel = entry.getValue();
if (channel.isActive() && channel.isWritable()) {
entry.getValue().writeAndFlush(new Date() + "我是测试的服务器端向客户端发数据");
} else {
channel.close();
it.remove();
System.out.println("通道不能连接uid:{ },服务器关闭和删除它 "+ entry.getKey());
// logger.info("channel cannot connect uid : {}, server close and remove it" ,entry.getKey());
}
}
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//GatewayService.java
import io.netty.channel.Channel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class GatewayService {
/**
* 日志对象
*/
// protected static Logger logger = LoggerFactory.getLogger(GatewayService.class);
/**
* 存放客户端连接
*/
public static Map<String,Channel> map = new ConcurrentHashMap<String, Channel>();
/**
* 添加一个客户端连接
* @param uid
* @param socketChannel
*/
public static void add(String uid,Channel channel){
if (map.containsKey(uid)) {
Channel temp =map.get(uid);
if (!(temp.isActive() && temp.isWritable())){
map.remove(uid);
map.put(uid, channel);
}
} else {
map.put(uid, channel);
}
System.out.println("添加一个客户端uid : " +uid + "当前客户端数量 :"+getClientSize());
}
/**
* 移除一个客户端连接
* @param uid
*/
public static void remove(String uid){
Channel channel = map.remove(uid);
channel.close();
System.out.println("删除一个客户端uid : " +uid + " 当前客户端数量 :"+ getClientSize());
}
/**
* 当前客户端数量
* @return
*/
public static int getClientSize(){
return map.size();
}
}
NioServerHandler.java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
/**
* 实际的业务处理类
*
* @liuguodong
*
*/
public class NioServerHandler extends SimpleChannelInboundHandler<String> {
/**
* 日志对象
*/
// protected Logger logger = LoggerFactory.getLogger(NioServerHandler.class);
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
System.out.println("--------服务注册 通道 -------");
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg)
throws Exception {
System.out.println("server--->收到客户端发的消息:" + msg);
ctx.writeAndFlush("Received your message !\n"+msg+'\n');
System.out.println("server--->服务端返回的信息:" + msg+" # ");
GatewayService.add("11", ctx.channel());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// ctx.writeAndFlush("Received your message !\n");
// ctx.writeAndFlush("你好 !\n");
super.channelActive(ctx);
System.out.println("--------通道激活-------");
}
/**
* 响应netty 心跳 这个可以给客户端发送一些特定包 来标识
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
super.userEventTriggered(ctx, evt);
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
//多少秒没有读
if (e.state() == IdleState.READER_IDLE) {
// System.out.println("读超时.....");
} else if (e.state() == IdleState.WRITER_IDLE) {//多少秒没有写
//ctx.close();
// System.out.println("写超时.....");
}else if (e.state() == IdleState.ALL_IDLE) { //总时间
// System.out.println("总超时.....");
//ctx.close();
//可以发包了, 然后发出去后 客户单回应一个 这时就不会进入read了 来判断心跳
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
try {
super.exceptionCaught(ctx, cause);
} catch (Exception e) {
System.out.println("服务异常 ....关闭连接");
ctx.close();
}
}
}
源码地址:http://download.csdn.net/download/liu3364575/9966543
代码:NioServer main类
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
/**
* Netty 服务端代码
*
*/
public class NioServer {
/**
* 日志对象
*/
// protected static Logger logger = LoggerFactory.getLogger(NioServer.class);
private static int port = 9999;
public static void main(String args[]) {
// Server服务启动器
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//可以在socket接上来的时候添加很多指定义逻辑
ch.pipeline().addLast("encode",new StringEncoder());
ch.pipeline().addLast("decode",new StringDecoder());
ch.pipeline().addLast("ping", new IdleStateHandler(25, 15, 10,TimeUnit.SECONDS));
ch.pipeline().addLast(new NioServerHandler());
}
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture future = bootstrap.bind(port).sync();
// logger.info("server started ,listen {}" ,port);
System.out.println("开始监听 "+ port);
//启动一个线程 来给客户端发消息
//new Thread(new ServerTask()).run();
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server. 调用实现优雅关机
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
class ServerTask implements Runnable{
/**
* 日志对象
*/
// protected Logger logger = LoggerFactory.getLogger(ServerTask.class);
public void run() {
Channel channel = null;
Map.Entry<String, Channel> entry = null;
while (true) {
Iterator<Entry<String, Channel>> it = GatewayService.map.entrySet().iterator();
while (it.hasNext()) {
entry = it.next();
channel = entry.getValue();
if (channel.isActive() && channel.isWritable()) {
entry.getValue().writeAndFlush(new Date() + "我是测试的服务器端向客户端发数据");
} else {
channel.close();
it.remove();
System.out.println("通道不能连接uid:{ },服务器关闭和删除它 "+ entry.getKey());
// logger.info("channel cannot connect uid : {}, server close and remove it" ,entry.getKey());
}
}
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//GatewayService.java
import io.netty.channel.Channel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class GatewayService {
/**
* 日志对象
*/
// protected static Logger logger = LoggerFactory.getLogger(GatewayService.class);
/**
* 存放客户端连接
*/
public static Map<String,Channel> map = new ConcurrentHashMap<String, Channel>();
/**
* 添加一个客户端连接
* @param uid
* @param socketChannel
*/
public static void add(String uid,Channel channel){
if (map.containsKey(uid)) {
Channel temp =map.get(uid);
if (!(temp.isActive() && temp.isWritable())){
map.remove(uid);
map.put(uid, channel);
}
} else {
map.put(uid, channel);
}
System.out.println("添加一个客户端uid : " +uid + "当前客户端数量 :"+getClientSize());
}
/**
* 移除一个客户端连接
* @param uid
*/
public static void remove(String uid){
Channel channel = map.remove(uid);
channel.close();
System.out.println("删除一个客户端uid : " +uid + " 当前客户端数量 :"+ getClientSize());
}
/**
* 当前客户端数量
* @return
*/
public static int getClientSize(){
return map.size();
}
}
NioServerHandler.java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
/**
* 实际的业务处理类
*
* @liuguodong
*
*/
public class NioServerHandler extends SimpleChannelInboundHandler<String> {
/**
* 日志对象
*/
// protected Logger logger = LoggerFactory.getLogger(NioServerHandler.class);
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
System.out.println("--------服务注册 通道 -------");
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg)
throws Exception {
System.out.println("server--->收到客户端发的消息:" + msg);
ctx.writeAndFlush("Received your message !\n"+msg+'\n');
System.out.println("server--->服务端返回的信息:" + msg+" # ");
GatewayService.add("11", ctx.channel());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// ctx.writeAndFlush("Received your message !\n");
// ctx.writeAndFlush("你好 !\n");
super.channelActive(ctx);
System.out.println("--------通道激活-------");
}
/**
* 响应netty 心跳 这个可以给客户端发送一些特定包 来标识
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
super.userEventTriggered(ctx, evt);
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
//多少秒没有读
if (e.state() == IdleState.READER_IDLE) {
// System.out.println("读超时.....");
} else if (e.state() == IdleState.WRITER_IDLE) {//多少秒没有写
//ctx.close();
// System.out.println("写超时.....");
}else if (e.state() == IdleState.ALL_IDLE) { //总时间
// System.out.println("总超时.....");
//ctx.close();
//可以发包了, 然后发出去后 客户单回应一个 这时就不会进入read了 来判断心跳
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
try {
super.exceptionCaught(ctx, cause);
} catch (Exception e) {
System.out.println("服务异常 ....关闭连接");
ctx.close();
}
}
}
源码地址:http://download.csdn.net/download/liu3364575/9966543
相关文章推荐
- 基于Netty的RPC简单框架实现(二):RPC服务端
- 自定义基于netty的rpc框架(2)---服务端的实现
- 基于Netty的RPC简单框架实现(三):Kryo实现序列化
- 基于Netty网络通信框架的电子白板,可实现同屏互动功能
- 基于socket通信的,利用MFC实现TCP通信的C/S架构程序
- 基于Netty实现的简易服务端与客户端的信息交流
- Apache+PHP 实现基于Slim的REST框架 调用系统命令或自己开发的程序
- 只需五步,即可基于Netty框架实现Android内网推送功能。
- 基于Socket的TCP长连接(服务端Java+客户端Android),Service配合AIDL实现
- 轻易实现基于linux或win运行的聊天服务端程序
- python+soket实现 TCP 协议的客户/服务端中文(自动回复)聊天程序
- 基于链表会话管理的TCP服务端实现
- TCP-IP学习笔记八:RPC(Netty和Spring)实现webServer框架
- 基于 IOCP 的通用异步 Windows Socket TCP 高性能服务端组件的设计与实现
- [精通WindowsSocket网络开发-基于VC++实现]第三章——WindowsSockets基础—TCP,UDP程序 .
- [精通WindowsSocket网络开发-基于VC++实现]第三章——WindowsSockets基础—TCP,UDP程序
- 基于 IOCP 的通用异步 Windows Socket TCP 高性能服务端组件的设计与实现
- VC面向对象的方式 写一个基于TCP的 客户端服务端程序 (SOCKET)