您的位置:首页 > 其它

Netty维持长连接 消息推送及心跳机制

2016-04-28 10:25 211 查看
ps: 以下是单机的处理方案

多机器只要把连接map内容维护到缓存里面 集中管理 就可以了。。

Netty

准备说明:引入java-websocket,netty-all-5.0等的jar包。其中内部心跳机制使用userEventTriggered事件方式实现,客户端的心跳实现客户端的断点重连工作,服务端的心跳实现服务端清除多余链接的功能

。以下为一些实现的代码:1.

[java] view plain copy

package base;

/**

*

* 请求类型的消息

*/

public class AskMsg extends BaseMsg {

public AskMsg() {

super();

setType(MsgType.ASK);

}

private AskParams params;

public AskParams getParams() {

return params;

}

public void setParams(AskParams params) {

this.params = params;

}

}

2.

[java] view plain copy

package base;

import java.io.Serializable;

/**

*

*/

public class AskParams implements Serializable {

private static final long serialVersionUID = 1L;

private String auth;

private String content;

public String getContent() {

return content;

}

public void setContent(String content) {

this.content = content;

}

public String getAuth() {

return auth;

}

public void setAuth(String auth) {

this.auth = auth;

}

}

3.

[java] view plain copy

package base;

import java.io.Serializable;

/**

*

* 必须实现序列,serialVersionUID 一定要有

*/

public abstract class BaseMsg implements Serializable {

private static final long serialVersionUID = 1L;

private MsgType type;

//必须唯一,否者会出现channel调用混乱

private String clientId;

//初始化客户端id

public BaseMsg() {

this.clientId = Constants.getClientId();

}

public String getClientId() {

return clientId;

}

public void setClientId(String clientId) {

this.clientId = clientId;

}

public MsgType getType() {

return type;

}

public void setType(MsgType type) {

this.type = type;

}

}

4.

[java] view plain copy

package base;

/**

*

*/

public class Constants {

private static String clientId;

public static String getClientId() {

return clientId;

}

public static void setClientId(String clientId) {

Constants.clientId = clientId;

}

}

5.

[java] view plain copy

package base;

/**

*

* 登录验证类型的消息

*/

public class LoginMsg extends BaseMsg {

private String userName;

private String password;

public LoginMsg() {

super();

setType(MsgType.LOGIN);

}

public String getUserName() {

return userName;

}

public void setUserName(String userName) {

this.userName = userName;

}

public String getPassword() {

return password;

}

public void setPassword(String password) {

this.password = password;

}

}

6.

[java] view plain copy

package base;

/**

*

*/

public enum MsgType {

PING,ASK,REPLY,LOGIN

}

7.

[java] view plain copy

package base;

/**

*

* 心跳检测的消息类型

*/

public class PingMsg extends BaseMsg {

public PingMsg() {

super();

setType(MsgType.PING);

}

}

8.

[java] view plain copy

package base;

import java.io.Serializable;

/**

*

*/

public class ReplyBody implements Serializable {

private static final long serialVersionUID = 1L;

}

9.

[java] view plain copy

package base;

/**

*

*/

public class ReplyClientBody extends ReplyBody {

private String clientInfo;

public ReplyClientBody(String clientInfo) {

this.clientInfo = clientInfo;

}

public String getClientInfo() {

return clientInfo;

}

public void setClientInfo(String clientInfo) {

this.clientInfo = clientInfo;

}

}

10.

[java] view plain copy

package base;

/**

*

*/

public class ReplyMsg extends BaseMsg {

public ReplyMsg() {

super();

setType(MsgType.REPLY);

}

private ReplyBody body;

public ReplyBody getBody() {

return body;

}

public void setBody(ReplyBody body) {

this.body = body;

}

}

11.

[java] view plain copy

package base;

/**

*

*/

public class ReplyServerBody extends ReplyBody {

private String serverInfo;

public ReplyServerBody(String serverInfo) {

this.serverInfo = serverInfo;

}

public String getServerInfo() {

return serverInfo;

}

public void setServerInfo(String serverInfo) {

this.serverInfo = serverInfo;

}

}

12. netty客户端启动类

[java] view plain copy

package client;

import io.netty.bootstrap.Bootstrap;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioSocketChannel;

import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;

import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;

import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import io.netty.handler.codec.http.websocketx.WebSocketFrame;

import io.netty.handler.codec.serialization.ClassResolvers;

import io.netty.handler.codec.serialization.ObjectDecoder;

import io.netty.handler.codec.serialization.ObjectEncoder;

import io.netty.handler.timeout.IdleStateHandler;

import io.netty.util.concurrent.DefaultEventExecutorGroup;

import io.netty.util.concurrent.EventExecutorGroup;

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.util.concurrent.TimeUnit;

import base.AskMsg;

import base.AskParams;

import base.Constants;

import base.LoginMsg;

/**

*

*/

public class NettyClientBootstrap {

private int port;

private String host;

private SocketChannel socketChannel;

private static final EventExecutorGroup group = new DefaultEventExecutorGroup(20);

public NettyClientBootstrap(int port, String host) throws InterruptedException {

this.port = port;

this.host = host;

start();

}

private void start() throws InterruptedException {

EventLoopGroup eventLoopGroup=new NioEventLoopGroup();

Bootstrap bootstrap=new Bootstrap();

bootstrap.channel(NioSocketChannel.class);

bootstrap.option(ChannelOption.SO_KEEPALIVE,true);

bootstrap.group(eventLoopGroup);

bootstrap.remoteAddress(host,port);

bootstrap.handler(new ChannelInitializer<SocketChannel>() {

@Override

protected void initChannel(SocketChannel socketChannel) throws Exception {

socketChannel.pipeline().addLast(new IdleStateHandler(20,10,0));

socketChannel.pipeline().addLast(new ObjectEncoder());

socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));

socketChannel.pipeline().addLast(new NettyClientHandler());

}

});

ChannelFuture future =bootstrap.connect(host,port).sync();

if (future.isSuccess()) {

socketChannel = (SocketChannel)future.channel();

System.out.println("connect server 成功---------");

}

}

public static void main(String[]args) throws InterruptedException, IOException {

Constants.setClientId("002");

NettyClientBootstrap bootstrap=new NettyClientBootstrap(9999,"localhost");

LoginMsg loginMsg=new LoginMsg();

loginMsg.setPassword("yao");

loginMsg.setUserName("robin");

bootstrap.socketChannel.writeAndFlush(loginMsg);

// while (true){

// TimeUnit.SECONDS.sleep(3);

// AskMsg askMsg=new AskMsg();

// AskParams askParams=new AskParams();

// askParams.setAuth("authToken");

// askMsg.setParams(askParams);

// bootstrap.socketChannel.writeAndFlush(askMsg);

// }

BufferedReader console = new BufferedReader(new InputStreamReader(System.in));

while (true) {

String msg = console.readLine();

if (msg == null) {

break;

} else if ("bye".equals(msg.toLowerCase())) {

break;

} else if ("ping".equals(msg.toLowerCase())) {

} else {

AskMsg askMsg=new AskMsg();

AskParams askParams=new AskParams();

askParams.setAuth("authToken");

askParams.setContent(msg);

askMsg.setParams(askParams);

bootstrap.socketChannel.writeAndFlush(askMsg);

}

}

}

}

13. netty客户端操作实现类

[java] view plain copy

package client;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import io.netty.handler.timeout.IdleStateEvent;

import io.netty.util.ReferenceCountUtil;

import java.util.Date;

import server.NettyChannelMap;

import base.AskMsg;

import base.BaseMsg;

import base.LoginMsg;

import base.MsgType;

import base.PingMsg;

import base.ReplyClientBody;

import base.ReplyMsg;

import base.ReplyServerBody;

/**

*

*/

public class NettyClientHandler extends SimpleChannelInboundHandler<BaseMsg> {

private int UNCONNECT_NUM = 0;

@Override

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

if (evt instanceof IdleStateEvent) {

if(UNCONNECT_NUM >= 4) {

System.err.println("connect status is disconnect.");

ctx.close();

//此处当重启次数达到4次之后,关闭此链接后,并重新请求进行一次登录请求

return;

}

IdleStateEvent e = (IdleStateEvent) evt;

switch (e.state()) {

case WRITER_IDLE:

System.out.println("send ping to server---date=" + new Date());

PingMsg pingMsg=new PingMsg();

ctx.writeAndFlush(pingMsg);

UNCONNECT_NUM++;

System.err.println("writer_idle over. and UNCONNECT_NUM=" + UNCONNECT_NUM);

break;

case READER_IDLE:

System.err.println("reader_idle over.");

UNCONNECT_NUM++;

//读取服务端消息超时时,直接断开该链接,并重新登录请求,建立通道

case ALL_IDLE:

System.err.println("all_idle over.");

UNCONNECT_NUM++;

//读取服务端消息超时时,直接断开该链接,并重新登录请求,建立通道

default:

break;

}

}

}

@Override

protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {

MsgType msgType=baseMsg.getType();

switch (msgType){

case LOGIN:{

//向服务器发起登录

LoginMsg loginMsg=new LoginMsg();

loginMsg.setPassword("alan");

loginMsg.setUserName("lin");

channelHandlerContext.writeAndFlush(loginMsg);

}break;

case PING:{

System.out.println("receive server ping ---date=" + new Date());

ReplyMsg replyPing=new ReplyMsg();

ReplyClientBody body = new ReplyClientBody("send client msg.");

replyPing.setBody(body);

channelHandlerContext.writeAndFlush(replyPing);

}break;

case ASK:{

AskMsg askMsg=(AskMsg)baseMsg;

ReplyClientBody replyClientBody=new ReplyClientBody("receive server askmsg:" + askMsg.getParams().getContent());

ReplyMsg replyMsg=new ReplyMsg();

replyMsg.setBody(replyClientBody);

channelHandlerContext.writeAndFlush(replyMsg);

}break;

case REPLY:{

ReplyMsg replyMsg=(ReplyMsg)baseMsg;

ReplyServerBody replyServerBody=(ReplyServerBody)replyMsg.getBody();

UNCONNECT_NUM = 0;

System.out.println("UNCONNECT_NUM="+ UNCONNECT_NUM + ",receive server replymsg: "+replyServerBody.getServerInfo());

}

default:break;

}

ReferenceCountUtil.release(msgType);

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

System.err.println("in client exceptionCaught.");

super.exceptionCaught(ctx, cause);

//出现异常时,可以发送或者记录相关日志信息,之后,直接断开该链接,并重新登录请求,建立通道

}

}

14.

[java] view plain copy

package server;

import io.netty.channel.Channel;

import io.netty.channel.socket.SocketChannel;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

/**

*

*/

public class NettyChannelMap {

private static Map<String,SocketChannel> map=new ConcurrentHashMap<String, SocketChannel>();

public static void add(String clientId,SocketChannel socketChannel){

map.put(clientId,socketChannel);

}

public static Channel get(String clientId){

return map.get(clientId);

}

public static void remove(SocketChannel socketChannel){

for (Map.Entry entry:map.entrySet()){

if (entry.getValue()==socketChannel){

map.remove(entry.getKey());

}

}

}

}

15. netty服务端启动类

[java] view plain copy

package server;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.codec.serialization.ClassResolvers;

import io.netty.handler.codec.serialization.ObjectDecoder;

import io.netty.handler.codec.serialization.ObjectEncoder;

import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

import base.AskMsg;

/**

*

*/

public class NettyServerBootstrap {

private int port;

private SocketChannel socketChannel;

public NettyServerBootstrap(int port) throws InterruptedException {

this.port = port;

bind();

}

private void bind() throws InterruptedException {

EventLoopGroup boss=new NioEventLoopGroup();

EventLoopGroup worker=new NioEventLoopGroup();

ServerBootstrap bootstrap=new ServerBootstrap();

bootstrap.group(boss,worker);

bootstrap.channel(NioServerSocketChannel.class);

bootstrap.option(ChannelOption.SO_BACKLOG, 128);

bootstrap.option(ChannelOption.TCP_NODELAY, true);

bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

@Override

protected void initChannel(SocketChannel socketChannel) throws Exception {

ChannelPipeline p = socketChannel.pipeline();

p.addLast(new IdleStateHandler(10,5,0));

p.addLast(new ObjectEncoder());

p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));

p.addLast(new NettyServerHandler());

}

});

ChannelFuture f= bootstrap.bind(port).sync();

if(f.isSuccess()){

System.out.println("server start---------------");

}

}

public static void main(String []args) throws InterruptedException {

NettyServerBootstrap bootstrap=new NettyServerBootstrap(9999);

while (true){

// SocketChannel channel=(SocketChannel)NettyChannelMap.get("001");

// if(channel!=null){

// AskMsg askMsg=new AskMsg();

// channel.writeAndFlush(askMsg);

// }

TimeUnit.SECONDS.sleep(10);

}

}

}

16. netty服务端操作实现类

[java] view plain copy

package server;

import java.util.Date;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import io.netty.channel.socket.SocketChannel;

import io.netty.handler.timeout.IdleStateEvent;

import io.netty.util.ReferenceCountUtil;

import base.AskMsg;

import base.BaseMsg;

import base.LoginMsg;

import base.MsgType;

import base.PingMsg;

import base.ReplyBody;

import base.ReplyClientBody;

import base.ReplyMsg;

import base.ReplyServerBody;

/**

*

*/

public class NettyServerHandler extends SimpleChannelInboundHandler<BaseMsg> {

private int UNCONNECT_NUM_S = 0;

@Override

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

if (evt instanceof IdleStateEvent) {

if(UNCONNECT_NUM_S >= 4) {

System.err.println("client connect status is disconnect.");

ctx.close();

//此处当重启次数达到4次之后,关闭此链接后,清除服务端相关的记录信息

return;

}

IdleStateEvent e = (IdleStateEvent) evt;

switch (e.state()) {

case WRITER_IDLE:

System.out.println("send ping to client---date=" + new Date());

PingMsg pingMsg=new PingMsg();

ctx.writeAndFlush(pingMsg);

UNCONNECT_NUM_S++;

System.err.println("writer_idle over. and UNCONNECT_NUM_S=" + UNCONNECT_NUM_S);

break;

case READER_IDLE:

System.err.println("reader_idle over.");

UNCONNECT_NUM_S++;

//读取服务端消息超时时,直接断开该链接,并重新登录请求,建立通道

case ALL_IDLE:

System.err.println("all_idle over.");

UNCONNECT_NUM_S++;

//读取服务端消息超时时,直接断开该链接,并重新登录请求,建立通道

default:

break;

}

}

}

@Override

public void channelInactive(ChannelHandlerContext ctx) throws Exception {

System.err.println("in channelInactive.");

NettyChannelMap.remove((SocketChannel)ctx.channel());

}

@Override

protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {

if(MsgType.LOGIN.equals(baseMsg.getType())){

LoginMsg loginMsg=(LoginMsg)baseMsg;

if("lin".equals(loginMsg.getUserName())&&"alan".equals(loginMsg.getPassword())){

//登录成功,把channel存到服务端的map中

NettyChannelMap.add(loginMsg.getClientId(),(SocketChannel)channelHandlerContext.channel());

System.out.println("client"+loginMsg.getClientId()+" 登录成功");

}

}else{

if(NettyChannelMap.get(baseMsg.getClientId())==null){

//说明未登录,或者连接断了,服务器向客户端发起登录请求,让客户端重新登录

LoginMsg loginMsg=new LoginMsg();

channelHandlerContext.channel().writeAndFlush(loginMsg);

}

}

switch (baseMsg.getType()){

case PING:{

PingMsg pingMsg=(PingMsg)baseMsg;

ReplyMsg replyPing=new ReplyMsg();

ReplyServerBody body = new ReplyServerBody("send server msg.");

replyPing.setBody(body);

NettyChannelMap.get(pingMsg.getClientId()).writeAndFlush(replyPing);

System.err.println("ping over.");

}break;

case ASK:{

//收到客户端的请求

AskMsg askMsg=(AskMsg)baseMsg;

if("authToken".equals(askMsg.getParams().getAuth())){

ReplyServerBody replyBody=new ReplyServerBody("receive client askmsg:" + askMsg.getParams().getContent());

ReplyMsg replyMsg=new ReplyMsg();

replyMsg.setBody(replyBody);

NettyChannelMap.get(askMsg.getClientId()).writeAndFlush(replyMsg);

}

}break;

case REPLY:{

//收到客户端回复

ReplyMsg replyMsg=(ReplyMsg)baseMsg;

ReplyClientBody clientBody=(ReplyClientBody)replyMsg.getBody();

UNCONNECT_NUM_S = 0;

System.out.println("UNCONNECT_NUM_S=" + UNCONNECT_NUM_S +",receive client replymsg: "+clientBody.getClientInfo());

}break;

default:break;

}

ReferenceCountUtil.release(baseMsg);

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

System.err.println("in here has an error.");

NettyChannelMap.remove((SocketChannel)ctx.channel());

super.exceptionCaught(ctx, cause);

System.err.println("channel is exception over. (SocketChannel)ctx.channel()=" + (SocketChannel)ctx.channel());

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: