您的位置:首页 > 其它

rabbitmq websocket fanout 没有ack导致消息堆积问题

2017-09-01 00:00 519 查看
后台服务往 fanoutModelRetValReturnQueue、fanoutImgGenerateReturnQueue 两个队列发送消息,前端通过 websocket 订阅,实时拿到计算结果。中间遇到了一个大坑:配置的队列没有 ack,导致 RabbitMQ 队列中 ready 状态的消息数量一直累积,最后消息无法实时推送到前台。

解决办法:添加 listener,同时设置 acknowledge="auto",这样 ready 状态的消息会被自动消费并确认,配置如下:

<!--
  Listener container on the websocket connection factory.
  acknowledge="auto" is the setting this post adds so that delivered messages
  are acknowledged automatically instead of piling up in the Ready state.
  Each listener consumes the queue named by its ${...} property placeholder.
-->
<rabbit:listener-container connection-factory="websocketRabbitConnectionFactory" message-converter="mqMessageConverter" acknowledge="auto">
<rabbit:listener ref="webSocketImgGenerateReturnListener" queues="${websocket.fanout.imgGenerateReturn.queue}"/>
<rabbit:listener ref="webSocketModelValReturnListener" queues="${websocket.fanout.modelRetValReturn.queue}"/>
</rabbit:listener-container>
服务端配置:

<!--
  Durable fanout exchange (name taken from the
  websocket.fanout.modelRetValReturn.exchange property), declared by
  websocketRabbitAdmin and bound to fanoutModelRetValReturnQueue.
-->
<rabbit:fanout-exchange name="${websocket.fanout.modelRetValReturn.exchange}" durable="true" declared-by="websocketRabbitAdmin">
    <rabbit:bindings>
        <rabbit:binding queue="fanoutModelRetValReturnQueue"></rabbit:binding>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<!--
  Durable fanout exchange (name taken from the
  websocket.fanout.imgGenerateReturn.exchange property), declared by
  websocketRabbitAdmin and bound to fanoutImgGenerateReturnQueue.
-->
<rabbit:fanout-exchange name="${websocket.fanout.imgGenerateReturn.exchange}" durable="true" declared-by="websocketRabbitAdmin">
<rabbit:bindings>
<rabbit:binding queue="fanoutImgGenerateReturnQueue"></rabbit:binding>
</rabbit:bindings>
</rabbit:fanout-exchange>

前端:

/**
 * Opens a STOMP-over-WebSocket client and reconnects whenever the link fails.
 *
 * A client is created with `generatorWs()`, connected with the credentials
 * found on `window.export_minas`, and appended to `clientPool`. When the
 * connection reports an error or the underlying socket emits `close`, every
 * subscription id recorded in `sokcetPool` is unsubscribed, the pool is
 * emptied, and `socketClient` is invoked again with the same arguments.
 *
 * @param {Function} callback - Called with the client once it is connected.
 * @param {boolean} [debug=false] - When truthy, the client's debug output is
 *   written with `console.log`; otherwise it is discarded.
 */
export function socketClient(callback, debug = false) {
  const client = generatorWs();

  // The error callback and the socket's `close` event can both fire for the
  // same failure. Without this guard each failure would start two replacement
  // connections, and the number of live sockets would keep growing.
  let hasReconnected = false;

  function reconnect() {
    if (hasReconnected) {
      return;
    }
    hasReconnected = true;

    // Drop the subscriptions registered through the failed client.
    sokcetPool.forEach((id) => {
      client.unsubscribe(id);
    });
    sokcetPool = [];

    // Reconnect, keeping the caller's debug preference.
    socketClient(callback, debug);
  }

  client.debug = debug
    ? (str) => {
        console.log(str);
      }
    : () => {};

  client.connect(
    window.export_minas.socketUser,
    window.export_minas.socketPwd,
    () => callback(client),
    () => {
      // Connection error: reconnect.
      reconnect();
    },
    window.export_minas.socketUser,
  );

  // Reconnect as soon as the socket is closed.
  client.ws.addEventListener('close', reconnect);
  clientPool.push(client);
}
Backspace 14:32:22
// Connect and register the two result subscriptions. The route's `debug`
// query value is forwarded as socketClient's debug flag.
socketClient((stompClient) => {
  // Handler for the model-computation channel.
  const onModelResult = (frame) => {
    const { body } = frame;
    console.log('data compu:', body);

    const {
      dfsId,
      box,
      volume,
      area,
      errorText,
      error,
    } = this.parseModelData(body);

    // Computation finished: look up the file name for this dfsId.
    const name = dfsId2Name(dfsId);
    // No mapping means the message does not belong to this client.
    if (!name || !(name.length > 0)) {
      return;
    }

    if (error) {
      // The computation failed: record the error on the file entry.
      this.updateUFiles(name, {
        error: true,
        errorText,
        name,
        dfsId,
        loading: false,
        crafts: null,
      });
    } else {
      // Success: update the uFiles entry with the computed values.
      this.updateUFiles(name, {
        name,
        dfsId,
        loading: false,
        loaded: 100,
        crafts: null,
        box,
        volume,
        area,
      });
    }
  };

  // Handler for the image-generation channel.
  const onImageResult = (frame) => {
    const { body } = frame;
    console.log('img:', body);

    const { dfsId, imgStatus, firstImg } = this.parseData(body);

    const name = dfsId2Name(dfsId);
    // No mapping means the message does not belong to this client.
    if (!name || !(name.length > 0)) {
      return;
    }

    if (firstImg === 'y') {
      this.updateUFiles(name, { image: true });
    }
    if (~~imgStatus === 1) {
      this.updateUFiles(name, { preview: true });
    }
  };

  // Subscribe to the model-computation channel and remember its id.
  const modelSubscription = stompClient.subscribe(
    '/topic/FANOUT_MODEL_RETVAL_RETURN',
    onModelResult,
  );
  sokcetPool.push(modelSubscription.id);

  // Subscribe to the image-generation channel and remember its id.
  const imageSubscription = stompClient.subscribe(
    '/topic/TOPIC_IMGGENERATERETURN',
    onImageResult,
  );
  sokcetPool.push(imageSubscription.id);
}, this.$route.query.debug);

/**
 * Creates a STOMP client on top of a new browser WebSocket.
 *
 * @param {string} [url='wss://test.XXXXX.cn/ws/'] - WebSocket endpoint to
 *   open. The default is the endpoint hard-coded in the original snippet.
 * @returns {*} Whatever `window.Stomp.over` returns for the new socket.
 */
function generatorWs(url = 'wss://test.XXXXX.cn/ws/') {
  // The URL has to be a string; the original snippet had lost its quotes,
  // which made the call a syntax error.
  const ws = new window.WebSocket(url);
  return window.Stomp.over(ws);
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: