您的位置:首页 > 其它

解决消息总线里监听线程过多的问题

2016-09-21 10:19 316 查看
在设计基于队列消息总线时,最好避免订阅者直接监听队列,而是再实现一套发布订阅模式,订阅者订阅的不是消息队列的信息,而是系统实现的总线。每次订阅时,判断此订阅消息(队列)是否存在,不存在则往【监听容器】里放,并且初始化一个默认的监听者监听此队列,这个监听者如收到消息,则对订阅者发送消息,这样可以避免直接监听队列造成的线程过多

思路示例:

public class Bus<TMessage> where TMessage : class
{
Dictionary<Type , List<object>> _handlers = new Dictionary<Type , List<object>>();

public void Publish( TMessage msg )
{
string queueName = msg.GetType().FullName;
XXXMQ mq = new XXXMQ( queueName );
mq.Text = Newtonsoft.Json.JsonConvert.SerializeObject( mq );
mq.Send();
}

public void Subscribe( TMessage msg , IHandler handler)
{
Type msgType = msg.GetType();
if( _handlers.ContainsKey( msgType ) )
{
List<object> handlers = _handlers[ msgType ];
handlers.Add( handler );
}
else
{
List<object> handlers = new List<object>();
handlers.Add( handler );
_handlers.Add( msgType , handlers );
ListenMQ( msg );
}
}

private void ListenMQ( TMessage msg )
{
Task.Factory.StartNew( () =>
{
while( true )
{
string queueName = msg.GetType().FullName;
XXXMQ mq = new XXXMQ( queueName );
string text = mq.Receive(); //监听队列

TMessage message = Newtonsoft.Json.JsonConvert.DeserializeObject<TMessage>( text );
List<object> handlers = _handlers[ message.GetType() ];
foreach( var handler in handlers )
{
handler( message );
}
}
} );
}

}


不过这样也有一个问题,因为每一个订阅者并不是直接监听队列,如果发生问题没有保障,容错性降低了,所以需要在总线里实现一套容错机制(失败重试之类)

最后还是推荐订阅者直接监听队列
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: