rabbitmq websocket fanout 没有ack导致消息堆积问题
2017-09-01 00:00
519 查看
后台服务往fanoutModelRetValReturnQueue ,fanoutModelRetValReturnQueue发送消息,前端websocket订阅,实时拿到计算结果,中间遇到了一个大坑,配置的队列没有ack导致rabitmq队列中消息ready状态的数量一直累积最后消息无法实时到前台
解决办法:添加listener同时设置acknowledge="auto",这样ready状态的消息自动被消费确认 如下:
<rabbit:listener-container connection-factory="websocketRabbitConnectionFactory" message-converter="mqMessageConverter" acknowledge="auto">
<rabbit:listener ref="webSocketImgGenerateReturnListener" queues="${websocket.fanout.imgGenerateReturn.queue}"/>
<rabbit:listener ref="webSocketModelValReturnListener" queues="${websocket.fanout.modelRetValReturn.queue}"/>
</rabbit:listener-container>
服务端配置:
<rabbit:fanout-exchange name="${websocket.fanout.modelRetValReturn.exchange}" durable="true" declared-by="websocketRabbitAdmin">
rabbit:bindings
<rabbit:binding queue="fanoutModelRetValReturnQueue"></rabbit:binding>
</rabbit:bindings>
</rabbit:fanout-exchange>
前端:
export function socketClient(callback,debug = false) {
var client = generatorWs();
client.debug = debug ? str => { console.log(str); } : () => {};
client.connect(window.export_minas.socketUser, window.export_minas.socketPwd, () => {
return callback(client);
}, function() {
//error, 重连
reconnect();
}, window.export_minas.socketUser);
}
Backspace 14:32:22
socketClient(client => {
//订阅模型计算通道
function generatorWs() {
//${window.export_minas.socketHost}
var ws = new window.WebSocket(
return window.Stomp.over(ws);
}
解决办法:添加listener同时设置acknowledge="auto",这样ready状态的消息自动被消费确认 如下:
<rabbit:listener-container connection-factory="websocketRabbitConnectionFactory" message-converter="mqMessageConverter" acknowledge="auto">
<rabbit:listener ref="webSocketImgGenerateReturnListener" queues="${websocket.fanout.imgGenerateReturn.queue}"/>
<rabbit:listener ref="webSocketModelValReturnListener" queues="${websocket.fanout.modelRetValReturn.queue}"/>
</rabbit:listener-container>
服务端配置:
<rabbit:fanout-exchange name="${websocket.fanout.modelRetValReturn.exchange}" durable="true" declared-by="websocketRabbitAdmin">
rabbit:bindings
<rabbit:binding queue="fanoutModelRetValReturnQueue"></rabbit:binding>
</rabbit:bindings>
</rabbit:fanout-exchange>
<rabbit:fanout-exchange name="${websocket.fanout.imgGenerateReturn.exchange}" durable="true" declared-by="websocketRabbitAdmin"> <rabbit:bindings> <rabbit:binding queue="fanoutImgGenerateReturnQueue"></rabbit:binding> </rabbit:bindings> </rabbit:fanout-exchange>
前端:
export function socketClient(callback,debug = false) {
var client = generatorWs();
client.debug = debug ? str => { console.log(str); } : () => {};
client.connect(window.export_minas.socketUser, window.export_minas.socketPwd, () => {
return callback(client);
}, function() {
//error, 重连
reconnect();
}, window.export_minas.socketUser);
//当链接断开立即重连 client.ws.addEventListener('close', reconnect); clientPool.push(client); function reconnect() { sokcetPool.forEach(id => { client.unsubscribe(id); }); sokcetPool = []; sokcetPool.length = 0; //重连 socketClient(callback); }
}
Backspace 14:32:22
socketClient(client => {
//订阅模型计算通道
// src="https://test-img.3dker.cn/preview/dd080d075072494481512db9c47d2137/0_0@110w.jpg" //window.export_minas.FANOUT_MODEL const queue1 = client.subscribe('/topic/FANOUT_MODEL_RETVAL_RETURN', ({ body }) => { console.log('data compu:', body); var { dfsId, box, volume, area, errorText, error } = this.parseModelData(body); //模型计算完毕 var name = dfsId2Name(dfsId); //没有对应关系,则不是本机的消息 if (!(name && name.length > 0)) return; //查看是否出错 if (error) { this.updateUFiles(name, { "error": true, "errorText": errorText, "name": name, "dfsId": dfsId, "loading": false, "crafts": null }); return; } //更新uFiles this.updateUFiles(name, { "name": name, "dfsId": dfsId, "loading": false, "loaded": 100, "crafts": null, "box": box, "volume": volume, "area": area }); }); sokcetPool.push(queue1.id); //图片生成队列 //window.export_minas.FANOUT_IMG var queue2 = client.subscribe('/topic/TOPIC_IMGGENERATERETURN', ({ body }) => { console.log('img:', body); //用dfsid自己拼下url,然后塞进files var { dfsId, imgStatus, firstImg } = this.parseData(body); var name = dfsId2Name(dfsId); //没有对应关系,则不是本机的消息 if (!(name && name.length > 0)) return; if (firstImg === 'y') { this.updateUFiles(name, { "image": true }); } if (~~imgStatus === 1) { this.updateUFiles(name, { "preview": true }); } }); sokcetPool.push(queue2.id); },this.$route.query.debug);
function generatorWs() {
//${window.export_minas.socketHost}
var ws = new window.WebSocket(
wss://test.XXXXX.cn/ws/);
return window.Stomp.over(ws);
}
相关文章推荐
- rabbitmq 重复ACK导致消息丢失
- 定时任务起的java进程没有释放导致oracle的问题not availavle & out of memory
- 基于WEB服务器导致消息中心各组件之间无法正常工作的问题分析与解决
- rabbitmq 重复ACK导致消息丢失
- RabbitMQ消息处理机制fanout,direct,topic,header
- 因为错误消息指示这是由于上一个问题导致的错误,没有写入 apport 报告。
- \t\t解决Win7下QQ消息导致系统音量变小的问题
- windows2003 server socket连接数量所导致问题及其修改方式
- workerMan学习篇:websocket+workerman聊天功能(三):点对点发送消息模拟
- IIS WEB服务扩展里没有ASP.NET v2.0导致无法正常浏览.ASPX文件
- jQuery自定义动画函数animate() easing: "easeInOutCirc"导致的animate()动画抖动问题解决方法
- 解决Tomcat catalina.out 不断成长导致档案过大的问题
- web.xml版本错误导致EL表达式无效问题
- WorkerMan学习篇:websocket+workerman聊天功能(三):点对点发送消息模拟
- RabbitMQ三种Exchange模式(fanout,direct,topic)
- Eclipse或myEclipse里,没有Project Facets选项的问题(将maven工程转成web工程)
- 关于web.config增加禁止匿名访问而导致pdf失效的问题
- 基于quartz,redis,socket.io的web实时消息推送
- webview 播放H5视频问题 黑屏 只有声音没有画面
- RabbitMQ消息队列中的几种典型问题再探