Java使用redis+sse实现带频道的网络聊天室
2017-06-10 11:26
579 查看
因为某些原因,需要将flask搭建的项目用spring boot重构一遍,其中有一个聊天室的功能,在flask下我采用了flask-sse这个第三方库来实现,该模块采用基于redis的消息订阅系统实现,当然类spring boot下自然没有这个方便的库了,但是spring boot对redis的消息机制的支持还是不错的,所以在看了下相关文档后我觉得可以自己实现一个,接下来就是思路和具体步骤了
参考文档: http://spring.io/guides/gs/messaging-redis/
废话不多说,上代码
配置类
该类注入了一些必要的实例,顺便说一句,默认情况下,spring的注入是单例模式。
接下来是receiver类,该类接收到redis发送的消息,并且发布给相应的频道中的所有人,该类与上面的配置中注入
其实try/catch部分是为了处理有人突然离线的异常。否则有人突然离线,但是List集合中依然保存着它的sse连接,导致send方法异常,故而出现异常则直接删除导致异常的sse实例
接下来是一个业务类
接下来是两个控制类,一个是加入频道的控制类,一个是聊天的控制类
加入频道控制类
聊天控制类
该类则是从客户端获取用户要发送的信息,然后使用StringRedisTemplate进行推送,经过推送的消息自然会被recevier类接受到,接下来就不用说了,事件部分我是用json来表示的。用的是FastJson库。
其实主要就是用了redis自带的消息订阅机制,用rabbitmq也可,详情请参考: http://spring.io/guides/gs/messaging-rabbitmq/
参考文档: http://spring.io/guides/gs/messaging-redis/
废话不多说,上代码
配置类
@Configuration public class RedisConf { @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); return container; } @Bean StringRedisTemplate template(RedisConnectionFactory connectionFactory) { return new StringRedisTemplate(connectionFactory); } @Bean MessageListenerAdapter listenerAdapter(Receiver receiver) { return new MessageListenerAdapter(receiver, "receiveMessage"); } @Bean Receiver receiver() { return new Receiver(); } @Bean("channelMap") Map<String, List<SseEmitter>> channelMap(){ return new ConcurrentHashMap<String, List<SseEmitter>>(); } }
该类注入了一些必要的实例,顺便说一句,默认情况下,spring的注入是单例模式。
接下来是receiver类,该类接收到redis发送的消息,并且发布给相应的频道中的所有人,该类与上面的配置中注入
public class Receiver { @Autowired private Map<String, List<SseEmitter>> channelMap; public void receiveMessage(String message){ System.out.println("sending message..."); Map map = (Map) JSON.parse(message); String channel = (String) map.get("channel"); Map data = (Map) map.get("data"); String event = (String) map.get("event"); if (!channelMap.containsKey(channel)){ return; } List<SseEmitter> sseEmitters = channelMap.get(channel); Iterator<SseEmitter> it = sseEmitters.listIterator(); while (it.hasNext()){ SseEmitter x = it.next(); try { x.send(SseEmitter.event().data(data).name(event)); } catch (IOException e) { it.remove(); } } } }
其实try/catch部分是为了处理有人突然离线的异常。否则有人突然离线,但是List集合中依然保存着它的sse连接,导致send方法异常,故而出现异常则直接删除导致异常的sse实例
接下来是一个业务类
@Service public class ChannelService { @Autowired private Map<String, List<SseEmitter>> channelMap; @Autowired private RedisMessageListenerContainer container; @Autowired private MessageListenerAdapter listenerAdapter; public void createChannel(String channelName, SseEmitter sseEmitter) throws PPKTException { if (channelMap.containsKey(channelName)){ throw new PPKTException(new PPKTError(400, "频道已存在")); } // 添加reids订阅频道 container.addMessageListener(listenerAdapter, new PatternTopic(channelName)); List<SseEmitter> sseEmitters = new ArrayList<SseEmitter>(); sseEmitters.add(sseEmitter); channelMap.put(channelName, sseEmitters); } public void addInChannel(String channelName, SseEmitter sseEmitter) throws PPKTException { if (!channelMap.containsKey(channelName)){ throw new PPKTException(new PPKTError(400, "频道不存在")); } List<SseEmitter> sseEmitters = channelMap.get(channelName); sseEmitters.add(sseEmitter); channelMap.put(channelName, sseEmitters); } public void delChannel(String channelName){ channelMap.remove(channelName); } public boolean contains(String channelName){ return channelMap.containsKey(channelName); } }该业务类则是封装了一些添加频道,添加人员进入频道的操作,其实该类可写可不写。。。
接下来是两个控制类,一个是加入频道的控制类,一个是聊天的控制类
加入频道控制类
@Controller public class ChannelController { @Autowired private HttpServletRequest httpServletRequest; @Autowired private ChannelService channelService; @Autowired private HttpServletResponse httpServletResponse; @RequestMapping(value = "/stream", method = RequestMethod.GET) public SseEmitter handler(String channel) throws PPKTException, AVException { SseEmitter sseEmitter = new SseEmitter(0L); if (channelService.contains(channel)){ channelService.addInChannel(channel, sseEmitter); }else { channelService.createChannel(channel, sseEmitter); } return sseEmitter; } }该类实现构造了一个SseEmitter实例,并将其加入指定频道,同时将SseEmitter实例返回给客户端
聊天控制类
@RestController @RequestMapping("/api/message") public class ChatController { @Autowired private StringRedisTemplate template; @Autowired private HttpServletRequest httpServletRequest; @RequestMapping(value = "/", method = RequestMethod.POST) public Map sendMessage(@RequestBody Map map, @CookieValue String channel, @CookieValue String sessionToken) throws AVException, PPKTException { String message = (String) map.get("message"); AVQuery<AVUser> query = AVUser.getQuery(); query.whereEqualTo("sessionToken", sessionToken); List<AVUser> users = query.find(); if (users.size() == 0){ throw new PPKTException(new PPKTError(403, "用户未登录")); } String sendMessage = createJsonEvent( message, channel, users.get(0).getUsername(), "chat"); template.convertAndSend(channel, sendMessage); Map m1 = new HashMap(); m1.put("message", "OK"); return m1; } private String createJsonEvent(String message, String channel, String username, String event) { Map data = new HashMap(); data.put("message", message); data.put("sender", username); Map m = new HashMap(); m.put("channel", channel); m.put("data", data); m.put("event", event); return JSON.toJSONString(m); } }
该类则是从客户端获取用户要发送的信息,然后使用StringRedisTemplate进行推送,经过推送的消息自然会被recevier类接受到,接下来就不用说了,事件部分我是用json来表示的。用的是FastJson库。
其实主要就是用了redis自带的消息订阅机制,用rabbitmq也可,详情请参考: http://spring.io/guides/gs/messaging-rabbitmq/
相关文章推荐
- Java使用socket网络编程实现多人聊天室
- 使用socket实现基于select模型的网络聊天室
- .Net网络通讯编程[利用Socket实现字串、文件、序列化对象传输]--使用封装的网络服务3[聊天室][使用IE浏览本页]
- Java使用Http实现网络图片的下载
- 使用node.js + socket.io + redis实现基本的聊天室场景
- 使用python和redis实现实时聊天室
- 使用Android网络编程实现简易聊天室
- 使用socket.io+redis来实现基本的聊天室应用场景
- 使用Android网络编程实现简易聊天室
- 使用java基于pushlet和bootstrap实现的简单聊天室
- .Net网络通讯编程[利用Socket实现字串、文件、序列化对象传输]--使用封装的网络服务3[聊天室]
- linux socket实现网络聊天室(二):使用线程独立收发消息
- java网络编程,通过TCP,Socket实现多对一的局域网聊天室
- 使用Java实现网络传输数据的压缩
- .Net网络通讯编程[利用Socket实现字串、文件、序列化对象传输]--使用封装的网络服务4[聊天室]
- java网络编程,通过TCP,Socket实现多对一的局域网聊天室 .
- Java网络编程——使用NIO实现非阻塞Socket通信
- Java网络编程 - 基于UDP协议 实现简单的聊天室程序
- redis中使用java脚本实现分布式锁
- Java网络编程——使用NIO实现非阻塞Socket通信