您的位置:首页 > 理论基础 > 计算机网络

TCP/IP 长连接 心跳 重连 重发 线程

2013-11-08 15:39 281 查看
最近项目在整理后台JAVA采集程序的架构

C++做任务实例与调度的服务端

JAVA做接收客户端,执行完采集逻辑后上报数据给C++服务端

双方都需要实现服务端接收与客户端发送的代码

我谈谈我的JAVA实现

JAVA服务端,采用多线程架构,允许多个客户端对服务端进行连接

消息使用同步队列进行FIFO处理

// Queue of messages waiting to be sent. ConcurrentLinkedQueue is a thread-safe FIFO queue,
// so producers and the sender can share it without external locking.
// NOTE(review): the TcpClient send loop polls DataInfoBean.sendMessageQueue; presumably this
// declaration lives in DataInfoBean - confirm.
public static ConcurrentLinkedQueue<String> sendMessageQueue = new ConcurrentLinkedQueue<String>();

需要使用这种线程安全的队列来解决多线程并发访问队列的问题

服务接收线程的代码片段:

ServerSocket server = null;
// 启动采集代理前置机TcpServer
try {
if (AgentStartInfo.agengDepInfo != null && AgentStartInfo.centerDepInfo != null) {
int tcpServerPort = Integer.parseInt(String.valueOf(AgentStartInfo.agengDepInfo.get("PORT")));
server = new ServerSocket(tcpServerPort);// 启动服务端
log.debug("Java Front Tcp Server Start Up!...Listening Port at-->[" + tcpServerPort+"]");
System.out.println("Java Front Tcp Server Start Up!...Listening Port at-->[" + tcpServerPort+"]");
new TcpClient().start();// 启动客户端
Socket socket = null;
while (true) {
socket = server.accept();
new TCPServer(socket).start();// 启动处理线程
}
} else {
System.out.println("Agent DepInfo Is Null Or CenterDepInfo Is Null,Agent Is Start Error...");
log.error("Agent DepInfo Is Null Or CenterDepInfo Is Null,Agent Is Start Error...");
}
} catch (Exception e) {
System.out.println("Agent Start Error-->" + e.getMessage());
e.printStackTrace();
} finally {
if (null != server) {
try {
server.close();
} catch (IOException e) {
e.printStackTrace();
System.out.println("server close is error!-->" + e.getMessage());
}
}
}

TCPServer

// Receive loop of the TCPServer handler thread.
// NOTE(review): this snippet is truncated - the try, the while and the isConnected() branch
// opened here are not closed within the visible text.
try {
clientAddress = socket.getRemoteSocketAddress();// remote client IP and port
log.debug("TCPServer Handling client at Address-->[" + clientAddress+"]");
System.out.println("TCPServer Handling client at Address-->[" + clientAddress+"]");

// Long-lived connection: keep reading fixed-size frames from the same socket.
while (true) {
if (socket.isConnected()) {
header = null;
footer = null;
// readFully throws EOFException if the stream ends before the buffer is filled
dis = new DataInputStream(socket.getInputStream()); // wrap the socket's input stream
buffer = new byte[CACHE_BUFFER_SIZE]; // one frame worth of bytes
dis.readFully(buffer, 0, CACHE_BUFFER_SIZE);// blocks until exactly CACHE_BUFFER_SIZE bytes have been read
// Decoded with the platform default charset; trim() strips leading/trailing characters
// <= U+0020, which includes the zero bytes the senders use as padding.
receiveJson = new String(buffer).trim();

log.debug("Indeed Size-->[" + receiveJson.getBytes().length + "],Original Json-->" + receiveJson);

// An empty message may mean the client has gone away, so leave the loop.
if (receiveJson.equals("")) {
break;
}
if (receiveJson.length() >= 3) {
// HBT
hbt = receiveJson.substring(0, 3);
// Ignore heartbeat test frames (2013/9/6)
if (hbt.equals(MessageKey.msgHeartInfo)) {
log.debug("HBT TIMES-->" + receiveJson);
continue;
}
}
receiveIndex = 0;
// 00END
// The body breaks on its first pass, so this acts as an "if": when the footer is found,
// receiveJson is cut off right after its first occurrence.
while ((receiveIndex = receiveJson.indexOf(MessageKey.msgFooter, receiveIndex)) != -1) {
receiveJson = receiveJson.substring(0, receiveIndex + MessageKey.msgFooter.length());
break;
}

log.debug("Receive Json-->" + receiveJson);


客户端

TcpClient

/**
 * Thread entry point of the client.
 *
 * <p>Steps: (1) retry until a connection to the server is established, sleeping
 * {@code ReTrySleepTime} ms before every attempt; (2) start the heartbeat/monitor thread and
 * the resend thread; (3) loop forever, polling the outbound queue and handing each message
 * to the executor as a {@link SendThread}.
 *
 * <p>If the thread is interrupted while sleeping, its interrupt flag is restored and the
 * method returns; the connect loop no longer swallows the interruption and retries forever.
 */
public void run() {
    try {
        // Do not start processing until the connection is up.
        while (true) {
            try {
                Thread.sleep(ReTrySleepTime);
                socket = new Socket(serverIp, serverPort); // connect at start-up
                if (socket.isConnected()) {
                    break;
                }
            } catch (InterruptedException e) {
                // Let the outer handler stop the thread instead of retrying forever.
                throw e;
            } catch (Exception e) {
                System.out.println("TcpClient Connect Error,5 Seconds Later Reconnect!-->" + e.getMessage());
                log.error("TcpClient Connect Error,5 Seconds Later Reconnect!-->" + e.getMessage(), e);
            }
        }

        // Report the successful connection.
        System.out.println("TcpClient is Connected Success!-->[" + socket + "]");
        // Start the heartbeat (HBT) thread.
        new Thread(new MonitorThread(serverIp, serverPort)).start();
        // Start the resend thread.
        new Thread(new ReSendThread()).start();
        // Dispatch queued messages.
        while (true) {
            String jsonMsg = DataInfoBean.sendMessageQueue.poll(); // next message to send, or null
            if (null != jsonMsg) {
                executor.execute(new SendThread(jsonMsg));
                System.out
                        .println("TcpClient SendMsg...CompletedTaskCount-->" + executor.getCompletedTaskCount()
                                + ",TaskCount-->" + executor.getTaskCount() + ",Queue Size-->"
                                + executor.getQueue().size());
            }
            Thread.sleep(1); // short pause between polls
        }
    } catch (InterruptedException e) {
        // Preserve the interruption for whoever owns this thread and stop.
        Thread.currentThread().interrupt();
        System.out.println("TcpClient Interrupted,Stop Running-->" + e.getMessage());
        log.error("TcpClient Interrupted,Stop Running-->" + e.getMessage(), e);
    } catch (Exception e) {
        System.out.println("TcpClient connect Error-->" + e.getMessage());
        log.error("TcpClient connect Error-->" + e.getMessage(), e);
    }
}


/**
 * Task that sends one message over the shared {@code TcpClient.socket}.
 *
 * <p>The message bytes are copied into a fixed frame of {@code CACHE_BUFFER_SIZE} bytes
 * (the remainder stays zero) and written in a single call. A message is put on
 * {@code DataInfoBean.reSendMessageQueue} when
 * <ul>
 *   <li>the send took longer than {@code TcpClient.threhold} ms (only if the threshold is
 *       positive), or</li>
 *   <li>the send failed with an I/O error or was interrupted - previously such messages
 *       were only logged and then lost.</li>
 * </ul>
 * A message that does not fit into one frame can never be sent, so it is rejected with an
 * explicit error and is not re-queued.
 */
class SendThread implements Runnable {
    static Logger log = LoggerFactory.getLogger(SendThread.class);

    private final String json;
    private final int CACHE_BUFFER_SIZE = 2048; // frame size in bytes

    /**
     * @param msg the message to send
     */
    public SendThread(String msg) {
        json = msg;
    }

    @Override
    public void run() {
        long begin = System.currentTimeMillis();
        boolean sendFailed = false;
        try {
            // NOTE(review): platform default charset is kept because the receiving side
            // shown in this article decodes with the default charset as well.
            byte[] sourceJsonByte = json.getBytes();
            if (sourceJsonByte.length > CACHE_BUFFER_SIZE) {
                // Used to surface as an ArrayIndexOutOfBoundsException from the copy loop.
                log.error("SendThread-->Message Size[" + sourceJsonByte.length + "] Exceeds Frame Size["
                        + CACHE_BUFFER_SIZE + "],Message Dropped! Send Json-->" + json);
                return;
            }
            byte[] jsonByte = new byte[CACHE_BUFFER_SIZE];
            System.arraycopy(sourceJsonByte, 0, jsonByte, 0, sourceJsonByte.length);

            Thread.sleep(1);
            DataOutputStream dos = new DataOutputStream(TcpClient.socket.getOutputStream());
            dos.write(jsonByte);
            dos.flush();
        } catch (InterruptedException e) {
            // Keep the interruption visible to the owning pool/thread; the message was not sent.
            Thread.currentThread().interrupt();
            sendFailed = true;
            log.error("SendThread-->" + e.getMessage() + " Send Json-->" + json, e);
        } catch (java.io.IOException e) {
            // Connection-level failure: the message did not (fully) go out, retry it later.
            sendFailed = true;
            log.error("SendThread-->" + e.getMessage() + " Send Json-->" + json, e);
        } catch (Exception e) {
            // Anything else (e.g. a null message or socket) is a programming error; a retry would not help.
            log.error("SendThread-->" + e.getMessage() + " Send Json-->" + json, e);
        }

        long useTime = System.currentTimeMillis() - begin;
        // A send slower than the threshold is also put on the resend queue.
        boolean overThreshold = TcpClient.threhold > 0 && useTime > TcpClient.threhold;
        if (overThreshold) {
            System.out.println(Thread.currentThread().getName() + "-->ThreHold-->[" + TcpClient.threhold
                    + "]SendThread Use Time(MS)-->" + useTime);
        }
        // Queue at most once even when the send both failed and was slow.
        if (sendFailed || overThreshold) {
            DataInfoBean.reSendMessageQueue.add(json);
        }
    }
}


/**
 * Heartbeat and reconnect monitor for the shared {@code TcpClient.socket}.
 *
 * <p>While the socket is usable, a zero-padded {@code "HBT"} frame of
 * {@code CACHE_BUFFER_SIZE} bytes is written every {@code SendFreqSleepTime} ms. When
 * writing fails (or the socket is missing), the socket is closed and a new one is opened,
 * retrying every {@code ReTrySleepTime} ms without limit; then heartbeats resume.
 *
 * <p>{@link #run()} is an iterative loop. The earlier version had {@code run()} call
 * {@code reconnect()} from its {@code finally} block and {@code reconnect()} call
 * {@code run()} again, so every reconnect nested the call stack deeper.
 *
 * <p>An interruption of the thread ends the monitor with the interrupt flag restored.
 */
class MonitorThread implements Runnable {

    static Logger log = LoggerFactory.getLogger(MonitorThread.class);

    private String serverIp;
    private int serverPort;
    private int retryCnt = 0; // reconnect attempts since the last successful connection
    private final String HEART_BEAT_TEST = "HBT"; // heartbeat payload
    private int ReTrySleepTime = 5000; // 5s pause before each reconnect attempt
    private final int CACHE_BUFFER_SIZE = 2048; // frame size in bytes
    private final int SendFreqSleepTime = 30000; // one heartbeat every 30s

    /**
     * @param ip   server address used when reconnecting
     * @param port server port used when reconnecting
     */
    public MonitorThread(String ip, int port) {
        serverIp = ip;
        serverPort = port;
    }

    /**
     * Re-establishes the connection and then continues with the heartbeat loop.
     * Returns only when the thread is interrupted.
     */
    public void reconnect() {
        if (connectWithRetry()) {
            run();
        }
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            retryCnt = 0;
            try {
                sendHeartBeats();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("MonitorThread-->" + e.getMessage(), e);
                return;
            } catch (Exception e) {
                log.error("MonitorThread-->" + e.getMessage(), e);
            }
            // The connection is considered broken: drop it and build a new one.
            closeSocketQuietly();
            if (!connectWithRetry()) {
                return;
            }
        }
    }

    /** Sends heartbeat frames until the socket is missing/unconnected or a write fails. */
    private void sendHeartBeats() throws IOException, InterruptedException {
        byte[] sourceJsonByte = HEART_BEAT_TEST.getBytes();
        byte[] jsonByte = new byte[CACHE_BUFFER_SIZE];
        System.arraycopy(sourceJsonByte, 0, jsonByte, 0, sourceJsonByte.length);

        while (true) {
            Socket current = TcpClient.socket;
            if (null == current || !current.isConnected()) {
                return;
            }
            DataOutputStream dos = new DataOutputStream(current.getOutputStream());
            dos.write(jsonByte);
            dos.flush();
            log.debug("MonitorThread-->" + HEART_BEAT_TEST);
            Thread.sleep(SendFreqSleepTime); // next heartbeat in 30 seconds
        }
    }

    /** Closes the shared socket; a failure to close must not prevent the reconnect. */
    private void closeSocketQuietly() {
        try {
            if (TcpClient.socket != null) {
                TcpClient.socket.close();
            }
        } catch (IOException ex) {
            log.error("MonitorThread Close Socket Error-->" + ex.getMessage(), ex);
        }
    }

    /**
     * Tries to open a new connection until it succeeds.
     *
     * @return {@code true} once connected, {@code false} if the thread was interrupted
     */
    private boolean connectWithRetry() {
        while (true) {
            retryCnt = retryCnt + 1;
            System.out.println("[" + System.currentTimeMillis() + "]MonitorThread Reconnect Times-->" + retryCnt);
            log.error("[" + System.currentTimeMillis() + "]MonitorThread Reconnect Times-->" + retryCnt);
            try {
                Thread.sleep(ReTrySleepTime);
                TcpClient.socket = new Socket(serverIp, serverPort);
                if (TcpClient.socket.isConnected()) {
                    System.out.println("[" + System.currentTimeMillis() + "]MonitorThread ReConnect Success!-->"
                            + TcpClient.socket);
                    log.debug("[" + System.currentTimeMillis() + "]MonitorThread ReConnect Success!-->"
                            + TcpClient.socket);
                    return true;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            } catch (Exception e) {
                // Server still unreachable: log and try again after the next pause.
                log.error("MonitorThread Reconnect Error-->" + e.getMessage(), e);
            }
        }
    }
}


重发

/**
 * Resend worker: every 100 ms it takes at most one message from
 * {@code DataInfoBean.reSendMessageQueue} and sends it again on a new thread running a
 * {@link SendThread}.
 *
 * <p>The loop ends when the thread is interrupted; the interrupt flag is restored so the
 * owner of the thread can observe the interruption.
 */
class ReSendThread implements Runnable {
    static Logger log = LoggerFactory.getLogger(ReSendThread.class);
    String jsonMsg; // message currently being resent (null when the queue was empty)

    @Override
    public void run() {
        try {
            while (true) {
                jsonMsg = DataInfoBean.reSendMessageQueue.poll(); // next message to resend, or null
                if (null != jsonMsg) {
                    System.out.println(Thread.currentThread().getName() + "-->ReSendThread Send Message-->" + jsonMsg);
                    log.debug("ReSendThread Send Message-->" + jsonMsg);
                    new Thread(new SendThread(jsonMsg)).start();
                }
                Thread.sleep(100); // pause between polls
            }
        } catch (InterruptedException e) {
            // Do not swallow the interruption: restore the flag and record the cause.
            Thread.currentThread().interrupt();
            log.error("ReSendThread Error-->" + e.getMessage(), e);
        }
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: