RabbitMQ解决大量unacked问题
2016-01-30 15:16
204 查看
RabbitMQ解决大量unacked问题
为了快速响应用户请求,我们需要消息异步处理机制,比较简单的做法是用redis的List结构,我们项目使用更专业的RabbitMQ。关于redis和RabbitMQ队列处理的性能比较可以查看这篇文章http://blog.csdn.net/educast/article/details/34521603
这里不扯RabbitMQ的一些定义了,我们遇到的问题是,入队并发和速度速度很快,但是消费端的处理速度慢得惊人。为了数据安全我们做了持久化而且消费端需要ack或者unack响应。从其提供的web控制台可以看到量大的时候产生大量的unacked消息,也就是说MQ把数据放到channel里,很长时间过去了channel没有给任何响应。我们创建了600个channel问题也依旧。查看了硬件没有瓶颈,网上有文章说是channel连接断了mq无法及时识别,还会往这些失效的channel里传递数据,误认子弟啊,拉出去枪毙了。Netstat一下,发现一共MQ服务器一共有4个外部连接连入了5672端口,两个productor两个consumer跟预想的一样。Jstack一下,发现pool-xxx线程远远没有预想的多,才50个,我们创建了600个channel啊。恍然大悟,尼玛的channel中文意思虽然是通道,但是MQ使用了线程池技术,它们是共享线程的,而不是一个通道一个线程,这个有点类似http请求和http服务器的关系。
把结构图给画出来先:
![](http://img.blog.csdn.net/20160130151348200?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast)
Java代码
线程方面已经没有问题了,不存在什么失效连接无法及时识别这一说法,如果还慢,则jstack去跟踪这些线程池的线程,看看都在干啥,关于jstack抽空再写了,很强大的jdk工具。
为了快速响应用户请求,我们需要消息异步处理机制,比较简单的做法是用redis的List结构,我们项目使用更专业的RabbitMQ。关于redis和RabbitMQ队列处理的性能比较可以查看这篇文章http://blog.csdn.net/educast/article/details/34521603
这里不扯RabbitMQ的一些定义了,我们遇到的问题是,入队并发和速度速度很快,但是消费端的处理速度慢得惊人。为了数据安全我们做了持久化而且消费端需要ack或者unack响应。从其提供的web控制台可以看到量大的时候产生大量的unacked消息,也就是说MQ把数据放到channel里,很长时间过去了channel没有给任何响应。我们创建了600个channel问题也依旧。查看了硬件没有瓶颈,网上有文章说是channel连接断了mq无法及时识别,还会往这些失效的channel里传递数据,误认子弟啊,拉出去枪毙了。Netstat一下,发现一共MQ服务器一共有4个外部连接连入了5672端口,两个productor两个consumer跟预想的一样。Jstack一下,发现pool-xxx线程远远没有预想的多,才50个,我们创建了600个channel啊。恍然大悟,尼玛的channel中文意思虽然是通道,但是MQ使用了线程池技术,它们是共享线程的,而不是一个通道一个线程,这个有点类似http请求和http服务器的关系。
把结构图给画出来先:
Java代码
Properties prop = new Properties(); InputStream inStream = this.getClass().getResourceAsStream("/config.properties"); prop.load(inStream); ConnectionFactory factory = new ConnectionFactory(); factory.setHost(prop.getProperty("RabbitMQHost")); factory.setUsername(prop.getProperty("RabbitMQUserName")); factory.setPassword(prop.getProperty("RabbitMQPassword")); // 关键所在,指定线程池 ExecutorService service = Executors.newFixedThreadPool(500); factory.setSharedExecutor(service); factory.setAutomaticRecoveryEnabled(true); factory.setConnectionTimeout(15000);// 15秒 factory.setRequestedHeartbeat(60); Connection connection = factory.newConnection();
线程方面已经没有问题了,不存在什么失效连接无法及时识别这一说法,如果还慢,则jstack去跟踪这些线程池的线程,看看都在干啥,关于jstack抽空再写了,很强大的jdk工具。
相关文章推荐
- fullCalendar 用法及新版注意事项
- 洛谷-求同构数的个数-NOIP2013提高组复赛
- 就算你不是电商,你应该为你的电商朋友好好看看这篇文章
- SPL学习笔记(2)---类的实现
- Spark Intro
- SpringMVC与Struts2比较
- C++枚举类型详解
- JavaScript实现右侧悬浮框
- Leetcode题解(十九)
- 车道检测问题探究(二)几何模型拟合
- JPQL查询
- UIGestureRecognizer手势
- 【第一行代码】Android 活动(activity)总结
- 定义一个大整数类,并重载乘法*运算符
- iOS 取消第一响应者
- 无聊的军官
- 如何使用angularjs实现按钮事件
- N皇后问题
- 内核态文件操作
- centos 7 安装sublime text