您的位置:首页 > 其它

webSocket中并发的策略

2014-09-03 10:01 99 查看
问题描述:当客户端发送请求建立连接以后,服务器会采用一种数据结构来维护这个连接,具体体现就是每次连接,服务器就会产生一个Session,这个Session可以唯一标识这个通道,发数据的时候会把指定的数据放到特定的通道里面去。那么并发情况下,如何实现呢?

从开始一个最简单的webSocket实例跑通了以后就慢慢考虑并发的问题,策略换了好几种,不过都是大家讨论后的结果,由于水平有限不一定是最优的方案。

服务器端通过继承父类的handleTextMessage方法来处理产生的session和从客户端传过来的参数。关于采用什么样的数据结构来存储session和message讨论出来了很多方案。

protected void handleTextMessage(WebSocketSession session,
TextMessage message) throws Exception {
super.handleTextMessage(session, message);
}
发送消息的时候需要把参数转化成TextMessage类型,然后调用通道的sendMessage方法,如下:

public void sendMessage(String  request) throws IOException {
TextMessage returnMessage = new TextMessage(request);
session.sendMessage(returnMessage);
}
}
那么并发情况下,由于是多个客户端连接,需要建立多个通道,通道可以在handleTextMessage的方法里面采用一种数据结构收集起来,然后在sendMessage方法中指定通道然后发送对应的消息。在前一篇文章中提到我们项目中已经做好了一套通信机制:终端发过来的数据通过NIOServer转发到activeMQ中,也就是说所有的消息可以通过activeMQ的出口得到,那么这里就是一种观察者模式的使用了,也就是多个webSocket通道就是多个观察者,而这个监听的出口就是被观察者,当消息队列出口出来一条消息的时候,就类似于被观察者的发生变化了然后通知多个webSocket通道,webSocket通道知道出来消息了马上就检查这条消息是不是自己要的,如果是的话就发送到当前通道中。在handleTextMessage方法中,会从客户端传回来一个参数message,这个message就是表示通道订阅的消息,也就是每次从队列里面出来一条消息,我就检查这条消息是否是我订阅的,即将消息解析后然后和message参数比对。不知道我有没有说清楚,应该是不难理解的。

那么采用一种什么样的数据结构来存放session和message呢,之前我的朋友们做了很多尝试,采用map+list,或者用双list,都能够解决问题,但是后来大家达成共识,采用HashMap<Session,message>来存储比较好操作,我可以在在通道建立的时候将session和message放到map中。这个map的全局定义:

private static Map<WebSocketSession,String> map=new HashMap<WebSocketSession, String>();
 
protected void handleTextMessage(WebSocketSession session,
TextMessage message) throws Exception {
WebSocketSession tempSession = session;
String tempMessage = message.getPayload();
map.put(tempSession,tempMessage);
super.handleTextMessage(session, message);
}
Session里面是服务器生成的一个随机数,message是从前台传过来的参数,也就是订阅消息的格式,采用上述这个结果存放基本上解决了问题,我在发送消息的时候只需要检测从activeMQ出来的消息中是否是我定于的message就行,当然这个地方需要遍历hashmap来判断map.get(key).equal("参数");这样发送的时候没有问题了,当然这种模式下发送消息我可以讲消息发生到指定通道,但是只能通过遍历的方式,因为我们没有为所以的通道建立数据库表格,所以我们没办法指定某一个通道。如果说有这样一个需求:我需要服务器主动断开某一个通道连接,我只知道这个通道是由谁谁建立的,并不知道这个通道的具体信息。这样就没办法实现了,但是由客户端主动断开连接的话,这里有个函数可以主动拿到这个通道,如下:

public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
map.remove(session);
System.out.println("连接关闭!");
super.afterConnectionClosed(session, status);
}
客户端主动断开连接的话,会触发后台的afterConnectionClosed方法,这个方法也是继承TextWebSocketHandle,这在前一篇文章中讲到了,这里面就可以直接获取客户端与服务器建立的那个通道的Session ,然后直接remove掉就行了。这里没有看源码应该是没办法解释的。

既然是并发情况,这里面肯定有多线程的问题,首先在执行map的put操作的时候,因为session是唯一的,所以应该是放在hashmap结构数组中的不同位置,理论上来说应该不存在覆盖的问题,但是我不知道session和message会不会交叉混乱,也就是说多线程情况下,A通道的session和B通道的message组成了一对键值对,B通道的session和A通道的message组成了一对键值对,对于这种情况应该是不存在的。而且我个人觉得还有一种多线程的情况,就是遍历hashmap的时候,如果这个时候有新的通道加进来,这个时候hashmap里面的元素会增加,我们知道在iterator遍历hashmap的时候如果在遍历的过程中执行put操作会抛出concurrentModificationException,而且hashmap在高并发情况下不能保证同步,如果换成hashtable又太影响性能,所以我决定在这里用ConcurrentHashMap来存储,关于这个Concurrenthashmap是怎么发挥作用的,这里有一篇写的很好的文章,我自己也在研究:神奇的ConcurrentHashMap

至此,我们采用的比较好的数据结构来解决我们项目中通过webSocket通道来获取消息的问题,但是不得不说这里面有很多问题,考虑到业务的扩展性和高并发的情况。

首先业务上,我们的原始目的是要讲webSocket作为一个基础服务,所有的应用都可以通过这个通道来完成通信,我们项目中的应用估计就有哦三种:1、如果前台请求的数据时原始数据(也就是说从消息队列处理以后就可以直接传到前台);2、前台需要处理以后的数据,也就是说从消息队列出来以后需要经过其他处理再返回给前台的;3、这个有点遥远了,据说以后要做一个webqq...

对于这些问题,我们也做了很多讨论,最开始采用的方法就是简单粗暴的标记位,前台发送请求过来的时候加上标记位,如001表示上述第一种应用,002表示上述第二种应用...这样来增加扩展性,也就是说前天传过来的message包括标志位和你订阅的消息格式。后台解析的时候先拿到标记位判断是哪种应用,然后在处理订阅的消息。这种方法是所有人都能接受的,但是这个其实扩展性很差。

对于高并发问题,项目中的方案是这样的,拿到消息队列中出来消息以后,我需要做持久化,也就是存入MongoDB,然后处理部分数据并放在缓存redis中,而对于前面第二种应用,并发高了以后,我们知道存MongoDB比较浪费时间,我们再把处理好的数据放在前台展示,一旦前台处理不过来就会导致性能能力,甚至出现数据丢失。

基于这两种问题,一种神奇的思路诞生了:再加入一个消息队列,也就是说从原消息队列出来以后,这些消息存MongoDB或者redis都没有问题,只不过我分出来一个线程把所有应用需要的消息丢到队列里面去,然后这个队列的出口和具体的应用对接起来。不得不说这是一个很好的思路。不同的应用只需要在拿到原始队列里面出来的数据然后根据自己设定的消息格式组装起来,丢到新队列里面去,然后在新队列的出口匹配订阅的模式就能获取应用所需要的数据。

这里我们应用消息队列第一个可以保证有序,第二个可以保证数据不会丢失。但是主要这里是将应用可能需要的数据丢到队列里面去,也就是说如果前台只存在上面所说的第一种应用,我也得把其他应用所需要的数据放到队列里面去,如果采用一种反订阅的模式,也就是我发送数据到消息队列之前判断下这些数据是不是有应用需要,有的话我才发送,这样不得不说能够减小带宽。但是这个检测的过程会使逻辑变得复杂,因为还是得去遍历,因为是考虑到高并发情况才采用这样的方案,所以这一步我们省略了。

消息队列保证并发情况下消息的有序和不丢失很容易理解,但是如何保证服务的可扩展性呢,这个问题至今我还不能给出让人100%心服口服的理由。目前我给出的理由是这样的:假如对同一种类型的数据有10个应用,如果没有消息队列,那么我就是采用最原始的标记位来处理消息,那么我们的做法应该是这样:

从消息队列里面出来了一条消息,我遍历我的hashmap,或者他的标记位,看看这个消息是不是这个应用要的,是的话我就发到这个通道里面去,然后我经过一些列运算,得到了第二种应用可能需要的数据,这个时候再次遍历message中订阅消息的格式,匹配的话发送到通道,新加入一种应用的话我只能再这个逻辑代码之后加上一个message的匹配,而且每一次匹配估计就是一次遍历hashmap。当然如果你说对于每一条消息,我一次性把所有的应用匹配完,这样只存在一次遍历,但是这样的话你处理下一条消息的时候要等待上一条消息处理完,如果某些应用占用的时间比较多,比如存在MongoDB,存在磁盘IO,那么这样性能就很受影响,当然也不可能等待所有的应用处理完了以后再通过一次遍历中匹配应用,这个方案和上面的这个方案是一样的,应用多了以后性能会受影响。而采用消息队列以后,相当于把每个应用分开了,比如说我需要的原始数据,我就直接组装下丢到队里里面去就行了,对于不用的应用我只需要组装成不同的消息格式然后丢到队里面去,这样从这个队列里面出来的消息都是直接组装好的,这样服务和应用就分离了,如果再来一种应用我只需要制定一种应用格式然后完成消息体的组装再把它放到队列里面去就行了。不知道我这样理解能不能说服某些同学。


到这里,我们的思路就差不多了,但是对于前面那个遍历hashmap一直有些耿耿于怀,如果几百万个通道,难道也要等待一个一个去遍历吗,我曾想把订阅了相同消息的通道放在一起,但是一直没有想到一个好数据结构...只到有一天,得到了一条思路,那就是倒排索引,倒排索引源于实际应用中需要根据属性的值来查找记录。这种索引表中的每一项都包括一个属性值和具有该属性值的各记录的地址。由于不是由记录来确定属性值,而是由属性值来确定记录的位置,因而称为倒排索引(inverted
index)。带有倒排索引的文件我们称为倒排索引文件,简称倒排文件(inverted file)。

所以项目中决定把原来的hashmap结构换成:HashMap<Message,ArrayList<Session>>,如果多个通道定于了同一个message的话,我只需要得到这个message然后把消息发到这个message关联的所有通道当中。只不过采用这样的数据结构remove通道的时候比较麻烦一点,还记得remove的通道么:

public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
map.remove(session);
System.out.println("连接关闭!");
super.afterConnectionClosed(session, status);
}
这样的话可能存在两次遍历了...更好的方案还在研究中!

总结:从最开始跑起来一个简单的webSocket例子到一步一步优化,到今天这样的方案,中间换了很多种思路,过程是非常痛苦的,当然团队的作用也体现得很明显,只是痛苦着,水平就慢慢提升了,我很享受和团队一起讨论问题,探求思路,我想出去工作了应该就不会有这样的机会了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: