基于RabbitMQ实现的实时日志监控
2017-04-21 13:34
549 查看
写在前面:这个功能本来是在学习消息中间件时突发奇想的一个应用场景。本意是想实现一个页面级别的完整日志监控,最终能在页面快速跟踪后台日志,实现页面级别的tail -f的效果,就不用再去服务器上跟踪log日志了。
实现效果:页面可以实时跟踪平台上所有用户的操作日志。
实现的方式:
使用RabbitMQ的订阅发布模式,日志信息在拦截器中随时广播发布。页面打开监控就开始监听日志,停止监控就不再监听。
与页面的交互采用comet4j的形式, 这样可以避免页面一直进行ajax轮询。
前期准备:
前期准备需要搭建一个RabbitMQ服务器,然后maven工程需要引入相关工具包。再加上Comet4j的maven依赖,以及js。这些就不敷述了,度娘一大把。
关键代码:
1、拦截器中广播发布消息。关键代码:
2、消息发布端工具代码: 比较简单,就一直往rabbitMQ 发消息就行了。
3、接收端的处理:
由于接受端的代码需要一直挂起。采用一个线程的方式去接受数据。然后通过comet4j组件推送到页面上进行处理。
controller中的方法 只需要保证同时只会有一个消息接受线程就行。
页面上的处理方式:
实现后的总结:功能最终是实现了,再回头讨论下开始提到的实现初衷,对现有的功能进行一下总结。
一、关于日志监控:目前的阶段只能看到简单的访问情况,因为所有日志都是从拦截器中发出来的。要实现日志监控,最靠谱的当然是要跟踪log4j日志了。这马上就引入了下一个新的问题:如何将log4j与RabbitMQ进行整合?
关于这个问题,目前想到的有两个解决方案:
1>扩展log4j,写一个appender整合RabbitMQ.
2>改用kafka消息中间件。kafka 0.8.**版本中有kafka.producer.KafkaLog4jAppender 已经实现了log4j与kafka的整合。
二、网上传言,当日志的量上去了之后,kafka会比RabbitMQ更合适。因为RabbitMQ相比Kafka更可靠的优势在处理日志上就会体现不出来。
三、由于所有消息都是通过第三方的消息中间件来提供的服务。于是, 由此可以扩展,对网络内的多个应用都可以进行统一的日志监控。这样,分布式的监控平台以及调度系统的雏形就出来了。
实现效果:页面可以实时跟踪平台上所有用户的操作日志。
实现的方式:
使用RabbitMQ的订阅发布模式,日志信息在拦截器中随时广播发布。页面打开监控就开始监听日志,停止监控就不再监听。
与页面的交互采用comet4j的形式, 这样可以避免页面一直进行ajax轮询。
前期准备:
前期准备需要搭建一个RabbitMQ服务器,然后maven工程需要引入相关工具包。再加上Comet4j的maven依赖,以及js。这些就不敷述了,度娘一大把。
关键代码:
1、拦截器中广播发布消息。关键代码:
public class AuthFilter extends HandlerInterceptorAdapter { public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { .... RabbitMQProducer.sendFanout(new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date())+":用户 访问功能:<font color='red'>"+request.getRequestURI()+"</font>"); .... } }
2、消息发布端工具代码: 比较简单,就一直往rabbitMQ 发消息就行了。
@Repository public class RabbitMQProducer { private static Logger logger = Logger.getLogger(RabbitMQProducer.class); private static Channel channel; @PostConstruct private static void init() throws Exception{ logger.info("=======================初始化RabbmqProducer================================"); if(null == channel){ ConnectionFactory factory = new ConnectionFactory(); factory.setHost(Cons.rqServerPath); factory.setUsername(Cons.rqServerUsername); factory.setPassword(Cons.rqServerPassword); Connection connection = factory.newConnection(); channel = connection.createChannel(); channel.exchangeDeclare(Cons.rqExchangeName, "fanout"); } logger.info("=======================初始化RabbmqProducer 完成================================"); CometContext cc = CometContext.getInstance(); cc.registChannel(Cons.COMET4j_CHANNEL_DALOG); logger.info("=======================初始化CometContext 完成================================"); } public static void sendFanout(String message){ try { if(null == channel){ init(); } channel.basicPublish(Cons.rqExchangeName, "", null, (message).getBytes("UTF-8")); logger.info("发送RabbitMQ 消息 => "+message); } catch (Exception e) { logger.warn("RabbitMQ 消息发送失败 ",e); } } }
3、接收端的处理:
由于接受端的代码需要一直挂起。采用一个线程的方式去接受数据。然后通过comet4j组件推送到页面上进行处理。
import java.io.IOException; import org.apache.log4j.Logger; import org.comet4j.core.CometEngine; import com.common.Cons; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class RabbitMQReceiverTask extends Thread { private Logger logger = Logger.getLogger(this.getClass()); // comet4j private CometEngine engine; // RabbitMQ private Channel channel; public RabbitMQReceiverTask() { } public RabbitMQReceiverTask(CometEngine engine) { this.engine = engine; } @Override public void run() { try { if (null == channel) { logger.info("初始化RabbitMQReceiver"); ConnectionFactory factory = new ConnectionFactory(); factory.setHost(Cons.rqServerPath); factory.setUsername(Cons.rqServerUsername); factory.setPassword(Cons.rqServerPassword); Connection connection = factory.newConnection(); channel = connection.createChannel(); channel.exchangeDeclare(Cons.rqExchangeName, "fanout"); } channel.exchangeDeclare(Cons.rqExchangeName, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, Cons.rqExchangeName, ""); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); logger.info("RabbitMQReceiver 接收到消息 =》" + message); engine.sendToAll(Cons.COMET4j_CHANNEL_DALOG, message); logger.info("Comet4j 向前台推送日志消息 =》" + message); } }; channel.basicConsume(queueName, true, consumer); } catch (Exception e) { logger.warn("RabbitMQReceiverTask 组件初始化失败", e); } } }
controller中的方法 只需要保证同时只会有一个消息接受线程就行。
private Logger logger = Logger.getLogger(getClass()); private RabbitMQReceiverTask rqThread; @RequestMapping(value="comet4jCommand") public Object comet4jCommand(String cmd){ logger.info("Comet4jController.comet4jConn: cmd =>"+cmd+";"); CometEngine engine = CometContext.getInstance().getEngine(); rqThread = CometThreadContext.getThread(); if("start".equals(cmd)){//启动RQConsumer logger.info("确认启动日志推送线程"); if(null == rqThread){ rqThread = new RabbitMQReceiverTask(engine); //线程停起来好麻烦, 干脆线程启动起来后就不停止了, 一直往前台推送,只是页面上设置一个标志是否接收推送的消息好了。 //这个线程就设置为守护线程,直到应用停止才会停止。 rqThread.setDaemon(true); rqThread.start(); } } // else{//停止RQConsumer // logger.info("确认终止日志推送线程"); // if(null != rqThread){ // rqThread.stop(); // rqThread=null; // } // } CometThreadContext.setThread(rqThread); JSONObject res = new JSONObject(); res.put("res", "command received"); return res; }
页面上的处理方式:
function init(){ // 监听后台某个频道 JS.Engine.on({ start : function(cId, channelList, engine) { console.info('连接已建立,连接ID为:' + cId); }, stop : function(cause, cId, url, engine) { console.info('连接已断开,连接ID为:' + cId + ',断开原因:' + cause + ',断开的连接地址:' + url); }, DALog : function(message) { $("<p>"+message+"</p>").appendTo("#logContent"); $("#logContent").scrollTop($('#logContent')[0].scrollHeight); } }); } var pageload = function() { $scope.status="off"; init(); $('.scroll_content').slimscroll({ height : '600px' }) $scope.comet4jStart = function(){ $scope.status="on"; //发起命令,后台去启动日志推送线程,前台再连接comet4j接收推送的日志 $http.get("comet4jConn/comet4jCommand.do?cmd=start").success(function(data) { console.info("comet4jStart =>"+data); // 建立连接,conn 即web.xml中 CometServlet的<url-pattern> JS.Engine.start("conn"); }); } $scope.comet4jStop = function(){ $scope.status="off"; //后台推送线程启动了就不管他了。 只前台停止接收comet4j推送的日志。 JS.Engine.stop("主动断开连接"); } }
实现后的总结:功能最终是实现了,再回头讨论下开始提到的实现初衷,对现有的功能进行一下总结。
一、关于日志监控:目前的阶段只能看到简单的访问情况,因为所有日志都是从拦截器中发出来的。要实现日志监控,最靠谱的当然是要跟踪log4j日志了。这马上就引入了下一个新的问题:如何将log4j与RabbitMQ进行整合?
关于这个问题,目前想到的有两个解决方案:
1>扩展log4j,写一个appender整合RabbitMQ.
2>改用kafka消息中间件。kafka 0.8.**版本中有kafka.producer.KafkaLog4jAppender 已经实现了log4j与kafka的整合。
二、网上传言,当日志的量上去了之后,kafka会比RabbitMQ更合适。因为RabbitMQ相比Kafka更可靠的优势在处理日志上就会体现不出来。
三、由于所有消息都是通过第三方的消息中间件来提供的服务。于是, 由此可以扩展,对网络内的多个应用都可以进行统一的日志监控。这样,分布式的监控平台以及调度系统的雏形就出来了。
相关文章推荐
- 基于Saltstatck实现页面实时显示tomcat启动日志(17)
- 基于https实现webSocket通信实时在web页面输出日志(两个日志输出)
- [Asp.net]SignalR实现实时日志监控
- 基于iLog3的实时日志实现
- Go/Python/Erlang编程语言对比分析及示例 基于RabbitMQ.Client组件实现RabbitMQ可复用的 ConnectionPool(连接池) 封装一个基于NLog+NLog.Mongo的日志记录工具类LogUtil 分享基于MemoryCache(内存缓存)的缓存工具类,C# B/S 、C/S项目均可以使用!
- 基于邮件系统的远程实时监控系统的实现 Python版
- 基于 websocket 实现远程实时日志 在浏览器中查看设备的运行日志
- 基于python的websocket开发,tomcat日志web页面实时打印监控案例
- [Asp.net]SignalR实现实时日志监控
- 跨数据库分布式实时事务 - 基于RabbitMQ实时消息队列服务实现
- 基于Saltstatck实现页面实时显示tomcat启动日志(17)
- [Asp.net]SignalR实现实时日志监控
- SignalR实现实时日志监控
- C++基于OpenCV实现实时监控和运动检测记录
- 基于Silverlight + WCF设计实现汽车实时数据监控
- RS-232-C端口实时监控软件的设计实现
- 网站实时监控系统的设计与实现
- 在 Linux 上实现基于 Socket 的多进程实时通信
- 转: RS-232-C端口实时监控软件的设计实现
- linux下基于jrtplib库的实时传送实现