您的位置:首页 > 其它

Mina服务端客户端心跳机制

2017-12-01 12:20 633 查看
心跳:

1、自定义数据包,在业务逻辑里接收,客户端判断读写空闲,大于一个半的心跳之后,在空闲处理里将session关闭,注册的监听里断线重连

2、tcp的keepalive,服务端、客户端相互发确认心跳,设置好对应的心跳场景(服务端写空闲大于n秒后,每隔一个时间周期发送一个心跳1111,客户端读空闲大于n秒后,每隔一个时间周期发送一个心跳2222) 客户端设置超时时间,大于超时时间后,没有收到服务端心跳,认为连接不可用,主动关闭连接

代码实例:

package com.ydzq.hq.hk.mina.connect;

import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.charset.Charset;

import org.apache.mina.core.service.IoAcceptor;

import org.apache.mina.core.session.IdleStatus;

import org.apache.mina.core.session.IoSession;

import org.apache.mina.filter.codec.ProtocolCodecFilter;

import org.apache.mina.filter.codec.textline.TextLineCodecFactory;

import org.apache.mina.filter.keepalive.KeepAliveFilter;

import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;

import org.apache.mina.filter.keepalive.KeepAliveRequestTimeoutHandler;

import org.apache.mina.filter.logging.LoggingFilter;

import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

public class Server {

    private static final int PORT = 9000;

    /** 30秒后超时 */

    private static final int IDELTIMEOUT = 30;

    /** 15秒发送一次心跳包 */

    private static final int HEARTBEATRATE = 15;

    private static Logger log = LoggerFactory.getLogger(Server.class);

    public static void main(String[] args) throws IOException {

        IoAcceptor acceptor = new NioSocketAcceptor();

        acceptor.getSessionConfig().setReadBufferSize(1024);

        acceptor.getSessionConfig().setIdleTime(IdleStatus.WRITER_IDLE,IDELTIMEOUT);    

        acceptor.getFilterChain().addLast("logger", new LoggingFilter());

        acceptor.getFilterChain().addLast("codec1", new ProtocolCodecFilter(new TextLineCodecFactory()));

        KeepAliveMessageFactory heartBeatFactory = new KeepAliveMessageFactoryImpl();

        //下面注释掉的是自定义Handler方式

        KeepAliveRequestTimeoutHandler heartBeatHandler = new

                                KeepAliveRequestTimeoutHandlerImpl();

        KeepAliveFilter heartBeat = new KeepAliveFilter(heartBeatFactory,

                IdleStatus.WRITER_IDLE, heartBeatHandler);    

//        KeepAliveFilter heartBeat = new KeepAliveFilter(heartBeatFactory,

//                IdleStatus.READER_IDLE);

        //设置是否forward到下一个filter

        heartBeat.setForwardEvent(true);

        //设置心跳频率

        heartBeat.setRequestInterval(HEARTBEATRATE);

        acceptor.getFilterChain().addLast("heartbeat", heartBeat);

        acceptor.setHandler(new MyIoHandler());

        acceptor.bind(new InetSocketAddress(PORT));

        log.info("Server started on port: " + PORT);

    }

}

package com.ydzq.hq.hk.mina.connect;

import org.apache.mina.core.session.IoSession;

import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

/**

 * Provides keep-alive messages to KeepAliveFilter.

 *

 */

public  class KeepAliveMessageFactoryImpl implements

            KeepAliveMessageFactory {

    private  Logger log = LoggerFactory.getLogger(this.getClass());

     /** 心跳包内容 */

    private static final String SERVERHEARTBEATREQUEST = "1111";

    private static final String SERVERHEARTBEATRESPONSE = "1112";

        /*

         *返回给客户端的心跳包数据 return 返回结果才是客户端收到的心跳包数据

         */

        @Override

        public Object getRequest(IoSession session) {

            return SERVERHEARTBEATREQUEST;

        }

        /*

         *"接受到的客户端数据包"

         */

        @Override

        public Object getResponse(IoSession session, Object request) {

            //log.info(request.toString());

            return request;

        }

        /*

         * 判断是否是客户端发送来的的心跳包此判断影响 KeepAliveRequestTimeoutHandler实现类

         * 判断是否心跳包发送超时

         */

        @Override

        public boolean isRequest(IoSession session, Object message) {

            if(message.equals(SERVERHEARTBEATRESPONSE)){

                log.info("正常:   "+"接收到客户端心数据包引发心跳事件                 心跳数据包是》》" + message);

            return true;

        }

            //log.info("不正常:  "+"客户端发过来的不是"+SERVERHEARTBEATRESPONSE+"  那服务器端收到的是:"+message.toString());

            return false;

        }

        /*

               判断发送信息是否是心跳数据包此判断影响 KeepAliveRequestTimeoutHandler实现类

         * 判断是否心跳包发送超时

         */

        @Override

        public boolean isResponse(IoSession session, Object message) {

            if(message.equals(SERVERHEARTBEATREQUEST)){

                //log.info("服务器发送数据包中引发心跳事件: " + message);

                return true;

            }

            return false;

        }

    }

package com.ydzq.hq.hk.mina.connect;

import org.apache.mina.core.session.IoSession;

import org.apache.mina.filter.keepalive.KeepAliveFilter;

import org.apache.mina.filter.keepalive.KeepAliveRequestTimeoutHandler;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

/**

 * 服务器端 心跳异常处理器

 * Tells KeepAliveFilter what to do

 * when a keep-alive response message was not received within a certain timeout.

 *

 */

public  class KeepAliveRequestTimeoutHandlerImpl implements

            KeepAliveRequestTimeoutHandler {

    private  Logger log = LoggerFactory.getLogger(this.getClass());

        @Override

        public void keepAliveRequestTimedOut(KeepAliveFilter filter,

                IoSession session) throws Exception {

            log.info("《*服务器端*》心跳包发送超时处理(及长时间没有发送(接受)心跳包)");

        }

    }

package com.ydzq.hq.hk.mina.connect;

import java.util.concurrent.Callable;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import javax.annotation.Resource;

import org.apache.mina.core.service.IoHandlerAdapter;

import org.apache.mina.core.session.IoSession;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

public class MyIoHandler extends IoHandlerAdapter {

    private static Logger log = LoggerFactory.getLogger(MyIoHandler.class);

    private int
4000
count = 0;

    private static int i = 0;

    ExecutorService executor = Executors.newCachedThreadPool();

    @Override

    public void sessionOpened(IoSession session) throws Exception {

        count++;

        log.info("第 " + count + " 个 client 登陆!address: : " + session.getRemoteAddress());

        executor.submit(new Callable<String>() {

            public String loop() {

                try {

                    while (true) {

                        session.write(i);

                        log.info(i + "");

                        i++;

                        Thread.sleep(10000);

                    }

                } catch (Exception e) {

                } finally {

                }

                return null;

            }

            @Override

            public String call() throws Exception {

                return loop();

            }

        });

    }

    @Override

    public void sessionClosed(IoSession session) throws Exception {

    }

    @Override

    public void messageReceived(IoSession session, Object message) throws Exception {

    }

}

########################################客户端##############################################

package com.ydzq.hq.hk.mina.connect;

import java.net.InetSocketAddress;

import java.text.SimpleDateFormat;

import java.util.Date;

import org.apache.mina.core.RuntimeIoException;

import org.apache.mina.core.filterchain.IoFilterAdapter;

import org.apache.mina.core.future.ConnectFuture;

import org.apache.mina.core.session.IdleStatus;

import org.apache.mina.core.session.IoSession;

import org.apache.mina.filter.codec.ProtocolCodecFilter;

import org.apache.mina.filter.codec.textline.TextLineCodecFactory;

import org.apache.mina.filter.keepalive.KeepAliveFilter;

import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;

import org.apache.mina.filter.keepalive.KeepAliveRequestTimeoutHandler;

import org.apache.mina.filter.logging.LoggingFilter;

import org.apache.mina.filter.stream.StreamWriteFilter;

import org.apache.mina.transport.socket.SocketConnector;

import org.apache.mina.transport.socket.nio.NioSocketConnector;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.stereotype.Component;

import com.ydzq.hq.hk.job.QuoteConstrants;

import com.ydzq.hq.hk.mina.handler.MoonHkHqBufferHandler;

import com.ydzq.hq.hk.mina.listener.IoListener;

/**

 * mina客户端链接

 *

 * @author f.w

 */

@Component

public class MinaConnect {

    private static Logger log = LoggerFactory.getLogger(MinaConnect.class);

    @Autowired

    private MoonHkHqBufferHandler handler;

    private int reconnect_time = 3000;

    private IoSession session;

    @Value("${hk_server_ip}")

    private String ip;

    @Value("${hk_server_port}")

    private int port;

    private void connect(NioSocketConnector connector) {

        for (;;) {

            try {

                ConnectFuture future = connector.connect();

                future.awaitUninterruptibly();

                session = future.getSession();

                if (session.isConnected()) {

                    log.info("socketconnect[" + connector.getDefaultRemoteAddress().getHostName() + ":"

                            + connector.getDefaultRemoteAddress().getPort() + "] millisecond");

                    break;

                }

            } catch (Exception ex) {

                log.info("socketconnect[" + connector.getDefaultRemoteAddress().getHostName() + ":"

                        + connector.getDefaultRemoteAddress().getPort() + "] fail. retry after " + reconnect_time

                        + " millisecond, error:" + ex.getMessage());

            } finally {

                try {

                    Thread.sleep(reconnect_time);

                } catch (InterruptedException e) {

                    // TODO Auto-generated catch block

                    e.printStackTrace();

                }

            }

        }

    }

    public void connect() {

        log.info("start socketconnect  " + ip + ":" + port);

        NioSocketConnector connector = new NioSocketConnector();

        connector.setConnectTimeoutMillis(IDELTIMEOUT*1000);

        connector.getFilterChain().addLast("logger", new LoggingFilter());

        connector.getFilterChain().addLast("codec1", new ProtocolCodecFilter(new TextLineCodecFactory()));

        KeepAliveMessageFactory heartBeatFactory = new KeepAliveMessageFactoryImpl();

        // 下面注释掉的是自定义Handler方式

        KeepAliveRequestTimeoutHandler heartBeatHandler = new KeepAliveRequestTimeoutHandlerImpl();

        KeepAliveFilter heartBeat = new KeepAliveFilter(heartBeatFactory, IdleStatus.READER_IDLE, heartBeatHandler);

        // 设置是否forward到下一个filter

        heartBeat.setForwardEvent(true);

        // 设置心跳频率

        heartBeat.setRequestInterval(HEARTBEATRATE);

        connector.getFilterChain().addLast("heartbeat", heartBeat);

        connector.getFilterChain().addLast("codec", new StreamWriteFilter());

        connector.setHandler(handler);//

        connector.setDefaultRemoteAddress(new InetSocketAddress(ip, port));

        connector.getSessionConfig().setKeepAlive(true);

//        connector.getSessionConfig().setIdleTime(IdleStatus.READER_IDLE, 45);

        connector.addListener(new IoListener() {

            @Override

            public void sessionDestroyed(IoSession arg0) throws Exception {

                connect(connector);

            }

        });

        connect(connector);

    }

    /** 30秒后超时 */

    private static final int IDELTIMEOUT = 30;

    /** 15秒发送一次心跳包 */

    private static final int HEARTBEATRATE = 15;

}

package com.ydzq.hq.hk.mina.connect;

import org.apache.mina.core.session.IoSession;

import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

/**

 * 客户端

 * Provides keep-alive messages to KeepAliveFilter.

 *

 */

public  class KeepAliveMessageFactoryImpl implements

            KeepAliveMessageFactory {

     /** 心跳包内容  刚好和服务器相反*/

    private static final String CLIENTHEARTBEATREQUEST = "1112";//客户端发

    private static final String CLIENTHEARTBEATRESPONSE = "1111";//客户端收的服务端信息

    private  Logger log = LoggerFactory.getLogger(this.getClass());

        /*

         * 返回服务端消息

         */

        @Override

        public Object getRequest(IoSession session) {

            return CLIENTHEARTBEATREQUEST;

        }

        

        

        /*

         *"接受到的服务器数据包

         */

        @Override

        public Object getResponse(IoSession session, Object request) {

            //log.info(request.toString());

            return request;

        }

        //客户端接收到服务器发送的数据

        @Override

        public boolean isRequest(IoSession session, Object message) {

            if(message.equals(CLIENTHEARTBEATRESPONSE)){

                log.info("接收到服务器心数据包引发心跳事件                 心跳数据包是》》" + message);

            return true;

        }

            return false;

        }

        //判断客户端发送的是否是客户端请求消息

        @Override

        public boolean isResponse(IoSession session, Object message) {

            if(message.equals(CLIENTHEARTBEATREQUEST)){

                //ålog.info("客户端发送数据包中引发心跳事件: " + message);

                return true;

            }

            return false;

        }

    }

package com.ydzq.hq.hk.mina.connect;

import org.apache.mina.core.session.IoSession;

import org.apache.mina.filter.keepalive.KeepAliveFilter;

import org.apache.mina.filter.keepalive.KeepAliveRequestTimeoutHandler;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

/**

 * 客户端 心跳异常处理器

 * Tells KeepAliveFilter what to do

 * when a keep-alive response message was not received within a certain timeout.

 *

 */

public  class KeepAliveRequestTimeoutHandlerImpl implements

            KeepAliveRequestTimeoutHandler {

    private  Logger log = LoggerFactory.getLogger(this.getClass());

        @Override

        public void keepAliveRequestTimedOut(KeepAliveFilter filter,

                IoSession session) throws Exception {

            log.info("《*客户端*》心跳包发送超时处理(及长时间没有发送(接受)心跳包)");

        }

    }

package com.ydzq.hq.hk.mina.handler;

import java.nio.ByteBuffer;

import java.nio.ByteOrder;

import java.util.ArrayList;

import java.util.Calendar;

import java.util.Date;

import java.util.List;

import java.util.TimeZone;

import javax.annotation.Resource;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.common.serialization.ByteArraySerializer;

import org.apache.kafka.common.serialization.LongSerializer;

import org.apache.mina.core.buffer.IoBuffer;

import org.apache.mina.core.service.IoHandlerAdapter;

import org.apache.mina.core.session.IdleStatus;

import org.apache.mina.core.session.IoSession;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.stereotype.Component;

import com.ydzq.hq.config.KafkaProducerConfig;

import com.ydzq.hq.exception.BreakLoopException;

import com.ydzq.hq.vo.HqSnapVo;

@Component

public class MoonHkHqBufferHandler extends IoHandlerAdapter {

    Logger datalog = LoggerFactory.getLogger(MoonHkHqBufferHandler.class);

    //

    // @Autowired

    // private HeartBeatHkMonitorThread heartBeatHkMonitorThread;

    public void sessionCreated(IoSession session) throws Exception {

        // Empty handler

    }

    @Override

    public void sessionClosed(IoSession session) throws Exception {

        datalog.info("socketconnect disconnect ");

        if (saveBuffer != null)

            datalog.info("position:" + saveBuffer.position() + " limit:" + saveBuffer.limit() + " capacity:"

                    + saveBuffer.capacity() + " remain:" + saveBuffer.remaining());

        saveBuffer.clear();

    }

    @Override

    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {

//        datalog.info("socketconnectIdle - " + status.toString());

//        if (session != null) {

//            session.close(true);

//        }

    }

    @Override

    public void messageSent(IoSession session, Object message) throws Exception {

        // Empty handler

    }

    @Override

    // 客服端连接

    public void sessionOpened(IoSession session) throws Exception {

        datalog.info("I am coming!");

    }

    @Override

    public void messageReceived(IoSession session, Object message) {

        datalog.warn("客户端收到消息:" + message);

    }

    @Override

    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {

        // super.exceptionCaught(session, cause);

        session.close(false);

    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: