TCP/IP 长连接 心跳 重连 重发 线程
2013-11-08 15:39
281 查看
最近项目在整理后台JAVA采集程序的架构
C++做任务实例与调度的服务端
JAVA做接收客户端,执行完采集逻辑后上报数据给C++服务端
双方都需要实际服务接收与发送代码
我谈谈我的JAVA实现
JAVA服务端,采用多线程架构,允许多个客户端对服务端进行连接
消息使用同步队列进行FIFO处理
需要使用此队列来解决多线程访问队列的问题
服务接收线的代码片段:
TCPServer
客户端
tcpclient
// 监控线程
class MonitorThread implements Runnable {
重发
C++做任务实例与调度的服务端
JAVA做接收客户端,执行完采集逻辑后上报数据给C++服务端
双方都需要实际服务接收与发送代码
我谈谈我的JAVA实现
JAVA服务端,采用多线程架构,允许多个客户端对服务端进行连接
消息使用同步队列进行FIFO处理
public static ConcurrentLinkedQueue<String> sendMessageQueue = new ConcurrentLinkedQueue<String>();
需要使用此队列来解决多线程访问队列的问题
服务接收线的代码片段:
ServerSocket server = null; // 启动采集代理前置机TcpServer try { if (AgentStartInfo.agengDepInfo != null && AgentStartInfo.centerDepInfo != null) { int tcpServerPort = Integer.parseInt(String.valueOf(AgentStartInfo.agengDepInfo.get("PORT"))); server = new ServerSocket(tcpServerPort);// 启动服务端 log.debug("Java Front Tcp Server Start Up!...Listening Port at-->[" + tcpServerPort+"]"); System.out.println("Java Front Tcp Server Start Up!...Listening Port at-->[" + tcpServerPort+"]"); new TcpClient().start();// 启动客户端 Socket socket = null; while (true) { socket = server.accept(); new TCPServer(socket).start();// 启动处理线程 } } else { System.out.println("Agent DepInfo Is Null Or CenterDepInfo Is Null,Agent Is Start Error..."); log.error("Agent DepInfo Is Null Or CenterDepInfo Is Null,Agent Is Start Error..."); } } catch (Exception e) { System.out.println("Agent Start Error-->" + e.getMessage()); e.printStackTrace(); } finally { if (null != server) { try { server.close(); } catch (IOException e) { e.printStackTrace(); System.out.println("server close is error!-->" + e.getMessage()); } } }
TCPServer
try { clientAddress = socket.getRemoteSocketAddress();// 获取远程客户端IP与端口 log.debug("TCPServer Handling client at Address-->[" + clientAddress+"]"); System.out.println("TCPServer Handling client at Address-->[" + clientAddress+"]"); // 长连接实现机制 while (true) { if (socket.isConnected()) { header = null; footer = null; // readFully 意外会引起EOFExcption dis = new DataInputStream(socket.getInputStream()); // 服务器通过输入管道接收数据流 buffer = new byte[CACHE_BUFFER_SIZE]; // 缓冲区的大小 dis.readFully(buffer, 0, CACHE_BUFFER_SIZE);//阻塞方式一直读到固定的长度才可以 receiveJson = new String(buffer).trim(); log.debug("Indeed Size-->[" + receiveJson.getBytes().length + "],Original Json-->" + receiveJson); // 如果接收的消息为空则有可能客户端已经断掉,此处跳出循环 if (receiveJson.equals("")) { break; } if (receiveJson.length() >= 3) { // HBT hbt = receiveJson.substring(0, 3); // 忽略过滤心跳测试数据 2013/9/6 if (hbt.equals(MessageKey.msgHeartInfo)) { log.debug("HBT TIMES-->" + receiveJson); continue; } } receiveIndex = 0; // 00END while ((receiveIndex = receiveJson.indexOf(MessageKey.msgFooter, receiveIndex)) != -1) { receiveJson = receiveJson.substring(0, receiveIndex + MessageKey.msgFooter.length()); break; } log.debug("Receive Json-->" + receiveJson);
客户端
tcpclient
// 启动运行 public void run() { String jsonMsg = null; try { // 一定要连通才开始处理 while (true) { try { Thread.sleep(ReTrySleepTime); socket = new Socket(serverIp, serverPort);// 启动程序时连接 if (null != socket && socket.isConnected()) { break; } } catch (Exception e) { e.printStackTrace(); System.out.println("TcpClient Connect Error,5 Seconds Later Reconnect!-->" + e.getMessage()); } } // 提示已连接成功 System.out.println("TcpClient is Connected Success!-->[" + socket + "]"); // 启动心跳线程HBT new Thread(new MonitorThread(serverIp, serverPort)).start(); // 重发线程启动 new Thread(new ReSendThread()).start(); // 开始处理发送消息 while (true) { jsonMsg = DataInfoBean.sendMessageQueue.poll();// 从同步列表中获取要发送的数据 if (null != jsonMsg) { SendThread st = new SendThread(jsonMsg); executor.execute(st); System.out .println("TcpClient SendMsg...CompletedTaskCount-->" + executor.getCompletedTaskCount() + ",TaskCount-->" + executor.getTaskCount() + ",Queue Size-->" + executor.getQueue().size()); } Thread.sleep(1);// 延时 } } catch (Exception e) { e.printStackTrace(); System.out.println("TcpClient connect Error-->" + e.getMessage()); log.error("TcpClient connect Error-->" + e.getMessage()); } }
// 发送线程 class SendThread implements Runnable { static Logger log = LoggerFactory.getLogger(SendThread.class); private String json = ""; private final int CACHE_BUFFER_SIZE = 2048;// 缓存大小 public SendThread(String msg) { json = msg; } @Override public void run() { long begin = System.currentTimeMillis(); DataOutputStream dos = null; try { dos = new DataOutputStream(TcpClient.socket.getOutputStream()); byte[] sourceJsonByte = json.getBytes(); byte[] jsonByte = new byte[CACHE_BUFFER_SIZE]; for (int x = 0; x < sourceJsonByte.length; x++) { jsonByte[x] = sourceJsonByte[x]; } Thread.sleep(1); dos.write(jsonByte); dos.flush(); } catch (Exception e) { log.error("SendThread-->" + e.getMessage() + " Send Json-->" + json); e.printStackTrace(); } long end = System.currentTimeMillis(); long useTime = end - begin; // 发送时间超过阀值,加入重发队列 if (TcpClient.threhold > 0 && useTime > TcpClient.threhold) { DataInfoBean.reSendMessageQueue.add(json); System.out.println(Thread.currentThread().getName() + "-->ThreHold-->[" + TcpClient.threhold + "]SendThread Use Time(MS)-->" + useTime); } } }
// 监控线程
class MonitorThread implements Runnable {
static Logger log = LoggerFactory.getLogger(MonitorThread.class); private String serverIp; private int serverPort; private int retryCnt = 0;// 尝试次数 private final String HEART_BEAT_TEST = "HBT";// 发送心跳信息 // private int retryMaxTimes = 10;// 尝试最大次数 private int ReTrySleepTime = 5000;// 5s休眠时间 private final int CACHE_BUFFER_SIZE = 2048;// 缓存大小 private final int SendFreqSleepTime = 30000;// 30s发一个HBT public MonitorThread(String ip, int port) { serverIp = ip; serverPort = port; } // 重连 public void reconnect() { Boolean RetrySucFlag = false; while (true) { try { retryCnt = retryCnt + 1; System.out.println("[" + System.currentTimeMillis() + "]MonitorThread Reconnect Times-->" + retryCnt); log.error("[" + System.currentTimeMillis() + "]MonitorThread Reconnect Times-->" + retryCnt); Thread.sleep(ReTrySleepTime); TcpClient.socket = new Socket(serverIp, serverPort); if (null != TcpClient.socket && TcpClient.socket.isConnected()) { RetrySucFlag = true; break; } } catch (Exception e) { e.printStackTrace(); // if (retryCnt >= retryMaxTimes) { // System.out.println("MonitorThread is Already Retry " + // retryMaxTimes // + " Times!MonitorThread is Quit!"); // break; // } } } if (RetrySucFlag) { System.out.println("[" + System.currentTimeMillis() + "]MonitorThread ReConnect Success!-->" + TcpClient.socket); log.debug("[" + System.currentTimeMillis() + "]MonitorThread ReConnect Success!-->" + TcpClient.socket); run(); } } @Override public void run() { retryCnt = 0; DataOutputStream dos = null; try { while (true) { if (null != TcpClient.socket && TcpClient.socket.isConnected()) { dos = new DataOutputStream(TcpClient.socket.getOutputStream()); byte[] sourceJsonByte = HEART_BEAT_TEST.getBytes(); byte[] jsonByte = new byte[CACHE_BUFFER_SIZE]; for (int x = 0; x < sourceJsonByte.length; x++) { jsonByte[x] = sourceJsonByte[x]; } dos.write(jsonByte); dos.flush(); log.debug("MonitorThread-->" + HEART_BEAT_TEST); Thread.sleep(SendFreqSleepTime);// 30秒后发送心跳 } else { break; } } } catch (Exception e) { log.error("MonitorThread-->" + e.getMessage()); e.printStackTrace(); } finally { try { if (TcpClient.socket != null) { TcpClient.socket.close(); } reconnect();// 重连 } catch (IOException ex) { ex.printStackTrace(); } } } }
重发
// 重发线程 class ReSendThread implements Runnable { static Logger log = LoggerFactory.getLogger(ReSendThread.class); String jsonMsg; @Override public void run() { try { while (true) { jsonMsg = DataInfoBean.reSendMessageQueue.poll();// 从同步列表中获取要发送的数据 if (null != jsonMsg) { System.out.println(Thread.currentThread().getName() + "-->ReSendThread Send Message-->"+jsonMsg); log.debug("ReSendThread Send Message-->" + jsonMsg); new Thread(new SendThread(jsonMsg)).start(); } Thread.sleep(100);// 延时 } } catch (InterruptedException e) { e.printStackTrace(); log.error("ReSendThread Error-->"+e.getMessage()); } } }
相关文章推荐
- Netty4 Tcp长连接、断开重连、心跳监测、Msgpack编码解码
- TCP/IP连接为什么要三次握手,而不是两次
- TCP/IP,http,socket,长连接,短连接
- .tcpip.sys文件破坏导致本地连接无法修复也无法上网
- 到主机 的 TCP/IP 连接失败。 java.net.ConnectException: Connection refused: connect
- linux网络编程之TCP/IP基础(四):TCP连接的建立和断开、滑动窗口
- com.microsoft.sqlserver.jdbc.SQLServerException: 到主机 的 TCP/IP 连接失败。 java.net.ConnectException: Connection timed out: connect数据库
- TCP/IP源码学习(52)——TCP的连接过程的实现(1)
- linux网络编程之TCP/IP基础(四):TCP连接的建立和断开、滑动窗口
- 连接Sql2005报的错误:Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: 到主机 的 TCP/IP 连接失败
- 本地连接属性中没有internet协议(TCP/IP)
- TCP/IP,http,socket,长连接,短连接
- TCP/IP,http,socket,长连接,短连接
- Netstat用于显示与IP、TCP、UDP和ICMP协议相关的统计数据,一般用于检验本机各端口的网络连接情况
- SQL SERVER 2008 EXPRESS版本远程连接(tcp/ip)
- SQL Server 2005 提示“到主机 的 TCP/IP 连接失败”错误
- tcp连接检测及重连
- TCP/IP,http,socket,长连接,短连接
- 伪造IP包,禁止TCP连接
- linux网络编程之TCP/IP基础(四):TCP连接的建立和断开、滑动窗口