您的位置:首页 > 理论基础 > 计算机网络

Java使用redis+sse实现带频道的网络聊天室

2017-06-10 11:26 579 查看
  因为某些原因,需要将flask搭建的项目用spring boot重构一遍,其中有一个聊天室的功能,在flask下我采用了flask-sse这个第三方库来实现,该模块采用基于redis的消息订阅系统实现,当然类spring boot下自然没有这个方便的库了,但是spring boot对redis的消息机制的支持还是不错的,所以在看了下相关文档后我觉得可以自己实现一个,接下来就是思路和具体步骤了

  参考文档: http://spring.io/guides/gs/messaging-redis/

废话不多说,上代码

配置类

@Configuration
public class RedisConf {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}

@Bean
StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}

@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}

@Bean
Receiver receiver() {
return new Receiver();
}

@Bean("channelMap")
Map<String, List<SseEmitter>> channelMap(){
return new ConcurrentHashMap<String, List<SseEmitter>>();
}
}


该类注入了一些必要的实例,顺便说一句,默认情况下,spring的注入是单例模式。

接下来是receiver类,该类接收到redis发送的消息,并且发布给相应的频道中的所有人,该类与上面的配置中注入

public class Receiver {
@Autowired
private Map<String, List<SseEmitter>> channelMap;

public void receiveMessage(String message){
System.out.println("sending message...");
Map map = (Map) JSON.parse(message);
String channel = (String) map.get("channel");
Map data = (Map) map.get("data");
String event = (String) map.get("event");
if (!channelMap.containsKey(channel)){
return;
}
List<SseEmitter> sseEmitters = channelMap.get(channel);
Iterator<SseEmitter> it = sseEmitters.listIterator();
while (it.hasNext()){
SseEmitter x = it.next();
try {
x.send(SseEmitter.event().data(data).name(event));
} catch (IOException e) {
it.remove();
}
}
}
}


其实try/catch部分是为了处理有人突然离线的异常。否则有人突然离线,但是List集合中依然保存着它的sse连接,导致send方法异常,故而出现异常则直接删除导致异常的sse实例

接下来是一个业务类

@Service
public class ChannelService {
@Autowired
private Map<String, List<SseEmitter>> channelMap;
@Autowired
private RedisMessageListenerContainer container;
@Autowired
private MessageListenerAdapter listenerAdapter;

public void createChannel(String channelName, SseEmitter sseEmitter) throws PPKTException {
if (channelMap.containsKey(channelName)){
throw new PPKTException(new PPKTError(400, "频道已存在"));
}
//        添加reids订阅频道
container.addMessageListener(listenerAdapter, new PatternTopic(channelName));
List<SseEmitter> sseEmitters = new ArrayList<SseEmitter>();
sseEmitters.add(sseEmitter);
channelMap.put(channelName, sseEmitters);
}

public void addInChannel(String channelName, SseEmitter sseEmitter) throws PPKTException {
if (!channelMap.containsKey(channelName)){
throw new PPKTException(new PPKTError(400, "频道不存在"));
}
List<SseEmitter> sseEmitters = channelMap.get(channelName);
sseEmitters.add(sseEmitter);
channelMap.put(channelName, sseEmitters);
}

public void delChannel(String channelName){
channelMap.remove(channelName);
}

public boolean contains(String channelName){
return channelMap.containsKey(channelName);
}
}
该业务类则是封装了一些添加频道,添加人员进入频道的操作,其实该类可写可不写。。。

接下来是两个控制类,一个是加入频道的控制类,一个是聊天的控制类

加入频道控制类

@Controller
public class ChannelController {
@Autowired
private HttpServletRequest httpServletRequest;
@Autowired
private ChannelService channelService;
@Autowired
private HttpServletResponse httpServletResponse;

@RequestMapping(value = "/stream", method = RequestMethod.GET)
public SseEmitter handler(String channel)
throws PPKTException, AVException {
SseEmitter sseEmitter = new SseEmitter(0L);
if (channelService.contains(channel)){
channelService.addInChannel(channel, sseEmitter);
}else {
channelService.createChannel(channel, sseEmitter);
}
return sseEmitter;
}
}
该类实现构造了一个SseEmitter实例,并将其加入指定频道,同时将SseEmitter实例返回给客户端

聊天控制类

@RestController
@RequestMapping("/api/message")
public class ChatController {
@Autowired
private StringRedisTemplate template;
@Autowired
private HttpServletRequest httpServletRequest;

@RequestMapping(value = "/", method = RequestMethod.POST)
public Map sendMessage(@RequestBody Map map,
@CookieValue String channel,
@CookieValue String sessionToken) throws AVException, PPKTException {
String message = (String) map.get("message");

AVQuery<AVUser> query = AVUser.getQuery();
query.whereEqualTo("sessionToken", sessionToken);
List<AVUser> users = query.find();
if (users.size() == 0){
throw new PPKTException(new PPKTError(403, "用户未登录"));
}

String sendMessage =
createJsonEvent(
message,
channel,
users.get(0).getUsername(),
"chat");
template.convertAndSend(channel, sendMessage);
Map m1 = new HashMap();
m1.put("message", "OK");
return m1;
}

private String createJsonEvent(String message,
String channel,
String username,
String event) {
Map data = new HashMap();
data.put("message", message);
data.put("sender", username);
Map m = new HashMap();
m.put("channel", channel);
m.put("data", data);
m.put("event", event);
return JSON.toJSONString(m);
}
}


该类则是从客户端获取用户要发送的信息,然后使用StringRedisTemplate进行推送,经过推送的消息自然会被recevier类接受到,接下来就不用说了,事件部分我是用json来表示的。用的是FastJson库。

其实主要就是用了redis自带的消息订阅机制,用rabbitmq也可,详情请参考: http://spring.io/guides/gs/messaging-rabbitmq/
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: