socket 长连接(心跳,延时检查)
2017-11-01 11:29
316 查看
package cn.test.socket; import java.io.Serializable; import java.text.SimpleDateFormat; import java.util.Date; /** * 心跳包对象 * @author zhouy * */ public class KeepAlive implements Serializable{ private static final long serialVersionUID = 2949229070705266367L; private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Override public String toString() { return sdf.format(new Date()); } }
package cn.test.socket; import java.io.IOException; import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.OutputStream; import java.net.Socket; import java.net.UnknownHostException; import java.util.concurrent.ConcurrentHashMap; /** * c/s架构的客户端 * @author zhouy * */ public class Client { private String serverIp; private int port; private volatile boolean running = false;//连接状态 private Socket socket; public Client(String serverIp,int port){ this.serverIp = serverIp; this.port = port; } private long lastSendTime; //最后一次发送数据的时间 /** * 处理服务端发回的对象,可实现该接口。 */ public static interface ObjectAction{ void doAction(Object obj,Client client); } public static final class DefaultObjectAction implements ObjectAction{ public void doAction(Object obj,Client client) { System.out.println("处理:\t"+obj.toString()); } } //用于保存接收消息对象类型及该类型消息处理的对象 private ConcurrentHashMap<Class, ObjectAction> actionMapping = new ConcurrentHashMap<Class,ObjectAction>(); public void start() throws UnknownHostException, IOException { if(running)return; socket = new Socket(serverIp,port); System.out.println("本地端口:"+socket.getLocalPort()); lastSendTime=System.currentTimeMillis(); running=true; new Thread(new KeepAliveWatchDog()).start(); //保持长连接的线程,每隔2秒项服务器发一个一个保持连接的心跳消息 new Thread(new ReceiveWatchDog()).start(); //接受消息的线程,处理消息 } public void stop(){ if(running)running=false; } public void sendObject(Object obj) throws IOException { ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream()); oos.writeObject(obj.toString()); System.out.println("发送:\t"+obj); oos.flush(); } /** * 发送数据线程类 * @author zhouy * */ private class KeepAliveWatchDog implements Runnable{ private final int delay =10; private final int keepAliveDelay = 2000;//间隔两秒发送一次 @Override public void run() { while(running){ if(System.currentTimeMillis()-lastSendTime>keepAliveDelay){ try { Client.this.sendObject(new KeepAlive()); lastSendTime = System.currentTimeMillis(); } catch (IOException e) { e.printStackTrace(); Client.this.stop(); } }else{ try { //当间隔时间没到,延迟10mills Thread.currentThread().sleep(delay); } catch (InterruptedException e) { e.printStackTrace(); Client.this.stop(); } } } } } /** * 接受消息处理线程 */ private class ReceiveWatchDog implements Runnable{ @Override public void run() { while(running){ try { InputStream is = socket.getInputStream(); if(is.available()>0){ ObjectInputStream ois = new ObjectInputStream(is); Object obj = ois.readObject(); System.out.println("接受:\t"+obj.toString()); ObjectAction oa = actionMapping.get(obj.getClass()); oa = oa==null?new DefaultObjectAction():oa; oa.doAction(obj, Client.this); }else{ Thread.sleep(10); } } catch (Exception e) { e.printStackTrace(); Client.this.stop(); } } } } public static void main(String[] args) { Client client = new Client("127.0.0.1", 8888); try { client.start(); } catch (UnknownHostException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
package cn.test.socket; import java.io.IOException; import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ConcurrentHashMap; import javax.print.attribute.standard.Severity; /** * 服务端 * @author zhouy * */ public class Server { /** * 要处理客户端发来的对象,并返回一个对象,可实现该接口。 */ public interface ObjectAction{ Object doAction(Object rev, Server server); } public static final class DefaultObjectAction implements ObjectAction{ public Object doAction(Object rev,Server server) { System.out.println("处理并返回:"+rev); return rev; } } private int port; private int maxConnect; private volatile boolean running = false;//运行状态 private long receiveTimeDelay = 3000; private ConcurrentHashMap<Class, ObjectAction> actionMapping = new ConcurrentHashMap<Class,ObjectAction>(); public void addActionMap(Class<Object> cls,ObjectAction action){ actionMapping.put(cls, action); } private Thread connWatchDog; public Server(int port,int maxConnect){ this.port = port; this.maxConnect = maxConnect; } public void start(){ if(running)return; running=true; connWatchDog = new Thread(new ConnWatchDog()); connWatchDog.start(); } public void stop(){ if(running)running=false; } /** * * @author zhouy * */ private class ConnWatchDog implements Runnable{ @Override public void run() { try { ServerSocket server = new ServerSocket(); server.bind(new InetSocketAddress(port), maxConnect); while(running){ Socket socket = server.accept(); Thread t = new Thread(new SocketAction(socket)); t.start(); } server.close(); } catch (IOException e) { e.printStackTrace(); Server.this.stop(); } } } /** * 监听客户端请求,超时 * @author zhouy * */ class SocketAction implements Runnable{ private Socket socket; private boolean run = true; private long lastReceiveTime = System.currentTimeMillis(); public SocketAction(Socket socket){ this.socket = socket; } @Override public void run() { while(running&&run){ try { if(System.currentTimeMillis()-lastReceiveTime>receiveTimeDelay){ //超时处理 overthis(); }else{ InputStream is = socket.getInputStream(); if(is.available()>0){ ObjectInputStream ois = new ObjectInputStream(is); ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream()); Object obj = ois.readObject(); //接受到服务端发送的数据,将时间重置 lastReceiveTime = System.currentTimeMillis(); System.out.println("服务端接受到的数据"+obj.toString()); ObjectAction oa = actionMapping.get(obj.getClass()); oa = oa ==null?new DefaultObjectAction():oa; oa.doAction(obj, Server.this); if(oa!=null){ //写回客户端 oos.writeObject(new KeepAlive()); oos.flush(); } }else{ Thread.currentThread().sleep(10); } } } catch (Exception e) { e.printStackTrace(); overthis(); } } } private void overthis(){ if(run) run=false; if(socket!=null){ try { System.out.println("关闭:"+socket.getRemoteSocketAddress()); socket.close(); } catch (IOException e) { e.printStackTrace(); } } } 4000 } public static void main(String[] args) { Server server = new Server(8888, 3); server.start(); } }
大概意思:
客户端间隔两秒发送数据,服务端进行延时检查,超过时间就断开连接。
1. keepalive心跳对象,必须要序列化
2. 客户端,心跳机制,间隔两秒向服务端用sendObject发送数据
3. 服务端,延迟机制,延迟3秒以上的断开连接,支持多个客户端的联接,一个服务端,多客户端的机制。
相关文章推荐
- Socket 长连接与短连接,心跳针
- Java,在Windows平台上使用Socket.sendUrgentData() 来检查连接有效性是不可靠的
- Socket编程-长连接与短连接,心跳(keep-alive)
- 如何在socket编程的Tcp连接中实现心跳协议
- [Golang] 从零开始写Socket Server(3): 对长、短连接的处理策略(模拟心跳)
- 非阻塞socket调用connect, epoll和select检查连接情况示例
- 消息推送之Socket----长连接和心跳
- 基于心跳的socket长连接
- Android socket与服务器通信及心跳连接的实现
- 【转】Socket 长连接与短连接,心跳
- python socket 编程之三:长连接、短连接以及心跳(转药师Aric的文章)
- 非阻塞socket调用connect, epoll和select检查连接情况示例
- 非阻塞socket调用connect, epoll和select检查连接情况示例
- Socket 长连接与短连接,心跳
- Socket 长连接与短连接,心跳
- Socket 长连接,短连接以及心跳(keep-alive)概念
- Socket通讯长连接和短连接、心跳
- linux下socket连接下的心跳机制
- Socket 长连接 短连接 心跳 JAVA SOCKET编程
- GCDAsyncSocket实现TCP连接怎么设置发送数据延时