您的位置:首页 > 理论基础 > 计算机网络

基于netty框架实现的TCP服务端程序

2017-09-06 13:40 609 查看
工程目录结构

代码:NioServer main类

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.Channel;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.codec.string.StringDecoder;

import io.netty.handler.codec.string.StringEncoder;

import io.netty.handler.timeout.IdleStateHandler;

import java.util.Date;

import java.util.Iterator;

import java.util.Map;

import java.util.Map.Entry;

import java.util.concurrent.TimeUnit;

/** 

 * Netty 服务端代码 

 *  

 */  

public class NioServer {  

  
/**
* 日志对象
*/

// protected static Logger logger = LoggerFactory.getLogger(NioServer.class);
private static int port = 9999;

    public static void main(String args[])  {  

   

        // Server服务启动器  

        ServerBootstrap bootstrap = new ServerBootstrap();  

        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)

        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {

        bootstrap.group(bossGroup, workerGroup)

        .channel(NioServerSocketChannel.class)

        .childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel  ch) throws Exception {
//可以在socket接上来的时候添加很多指定义逻辑
ch.pipeline().addLast("encode",new StringEncoder());      
ch.pipeline().addLast("decode",new StringDecoder());  
ch.pipeline().addLast("ping", new IdleStateHandler(25, 15, 10,TimeUnit.SECONDS));
ch.pipeline().addLast(new  NioServerHandler());
}
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

        // Bind and start to accept incoming connections. 

        ChannelFuture future = bootstrap.bind(port).sync();

//         logger.info("server started ,listen {}" ,port);

        System.out.println("开始监听 "+ port);

        //启动一个线程 来给客户端发消息

        //new Thread(new ServerTask()).run();

           // Wait until the server socket is closed.

           // In this example, this does not happen, but you can do that to gracefully

           // shut down your server. 调用实现优雅关机

           future.channel().closeFuture().sync();

    } catch (InterruptedException e) {
e.printStackTrace();
} finally {

    bossGroup.shutdownGracefully();

    workerGroup.shutdownGracefully();

    }

    }  

}  

class ServerTask implements Runnable{
/**
* 日志对象
*/

// protected Logger logger = LoggerFactory.getLogger(ServerTask.class);
public void run() {
Channel channel = null;
Map.Entry<String, Channel>  entry = null;
while (true) {
Iterator<Entry<String, Channel>> it = GatewayService.map.entrySet().iterator();
while (it.hasNext()) {
entry = it.next();
channel = entry.getValue();
if (channel.isActive() && channel.isWritable()) {
entry.getValue().writeAndFlush(new Date() + "我是测试的服务器端向客户端发数据");
} else {
channel.close();
it.remove();
System.out.println("通道不能连接uid:{ },服务器关闭和删除它 "+ entry.getKey());

// logger.info("channel cannot connect uid : {}, server close and remove it" ,entry.getKey());
}
}
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

}

 //GatewayService.java

import io.netty.channel.Channel;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

public class GatewayService {
/**
* 日志对象
*/

// protected static Logger logger = LoggerFactory.getLogger(GatewayService.class);
/**
* 存放客户端连接
*/
public static Map<String,Channel> map = new ConcurrentHashMap<String, Channel>();
/**
* 添加一个客户端连接
* @param uid
* @param socketChannel
*/
public static void add(String uid,Channel channel){
if (map.containsKey(uid)) {
Channel temp =map.get(uid);
if (!(temp.isActive() && temp.isWritable())){
map.remove(uid);
map.put(uid, channel);
}
} else {
map.put(uid, channel);
}
  System.out.println("添加一个客户端uid : " +uid + "当前客户端数量  :"+getClientSize());
 
}
/**
* 移除一个客户端连接
* @param uid
*/
public static void remove(String uid){
Channel channel = map.remove(uid);
channel.close();
 
 System.out.println("删除一个客户端uid : " +uid + "  当前客户端数量  :"+ getClientSize());
}
/**
* 当前客户端数量
* @return
*/
public static int getClientSize(){
return map.size();
}

}

NioServerHandler.java

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import io.netty.handler.timeout.IdleState;

import io.netty.handler.timeout.IdleStateEvent;

/**

 * 实际的业务处理类

 * 

 * @liuguodong

 *

 */

public class NioServerHandler extends SimpleChannelInboundHandler<String> {

/**
* 日志对象
*/

// protected Logger logger = LoggerFactory.getLogger(NioServerHandler.class);

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {

super.channelRegistered(ctx);

System.out.println("--------服务注册 通道 -------");
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg)
throws Exception {

System.out.println("server--->收到客户端发的消息:" + msg);
ctx.writeAndFlush("Received your message !\n"+msg+'\n');
System.out.println("server--->服务端返回的信息:" + msg+" # ");
GatewayService.add("11", ctx.channel());
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {

//   ctx.writeAndFlush("Received your message !\n");

//   ctx.writeAndFlush("你好 !\n");
super.channelActive(ctx);

 System.out.println("--------通道激活-------");
}

/**
* 响应netty 心跳  这个可以给客户端发送一些特定包 来标识
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
super.userEventTriggered(ctx, evt);
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
//多少秒没有读
if (e.state() == IdleState.READER_IDLE) {

// System.out.println("读超时.....");
} else if (e.state() == IdleState.WRITER_IDLE) {//多少秒没有写
//ctx.close();

// System.out.println("写超时.....");
}else if (e.state() == IdleState.ALL_IDLE) { //总时间

          

//                System.out.println("总超时.....");

                //ctx.close();

                //可以发包了, 然后发出去后  客户单回应一个 这时就不会进入read了 来判断心跳

            }
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
try {
super.exceptionCaught(ctx, cause);
} catch (Exception e) {
  System.out.println("服务异常 ....关闭连接");
 
ctx.close();
}
}

}

源码地址:http://download.csdn.net/download/liu3364575/9966543
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: