用Hbase存储Log4j日志数据:HbaseAppender
2013-06-22 17:07
309 查看
业务需求:
需求很简单,就是把多个系统的日志数据统一存储到Hbase数据库中,方便统一查看和监控。解决思路:
写针对Hbase存储的Log4j Appender,有一个简单的日志储存策略,把Log4j的存储和Hbase的存储分开进行,当到达一定量的时候批量写入Hbase。Log4j的日志暂时存到一个队列,启动一个计划任务定时检查是否到达指定的量级,到达后批量写入Hbase将队列清空。
这带来一个问题:程序关闭时,最后一批尚未达到量级的日志数据可能会丢失,所以如果日志非常重要的话请同时开启文件存储!
具体代码
代码部分略掉所有import,请自行导入。

log4j.properties
log4j.rootLogger=INFO,HbaseAppender #HbaseAppender log4j.appender.HbaseAppender=cn.bg.log.HbaseAppender log4j.appender.HbaseAppender.layout=org.apache.log4j.PatternLayout log4j.appender.HbaseAppender.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n
HbaseAppender
package cn.bg.log; public class HbaseAppender extends AppenderSkeleton implements Runnable { private int batchSize = 10; private int period = 1000; private String hbLogName = "test"; private String hbLogFamily = "bg"; private int hbPools = 2; private Queue<LoggingEvent> loggingEvents; private ScheduledExecutorService executor; private ScheduledFuture<?> task; private Configuration conf; private HTablePool hTablePool; private HTableInterface htable; /** * log4j初始设置,启动日志处理计划任务 */ @Override public void activateOptions() { try { super.activateOptions(); //创建一个计划任务,并自定义线程名 executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HbaseAppender")); //日志队列 loggingEvents = new ConcurrentLinkedQueue<LoggingEvent>(); //启动计划任务,如果run函数有异常任务将中断! task = executor.scheduleWithFixedDelay(this, period, period, TimeUnit.MILLISECONDS); System.out.println("ActivateOptions ok!"); } catch (Exception e) { System.err.println("Error during activateOptions: " + e); } } /** * 初始HBASE * * @return */ private boolean initHbase() { try { if (conf == null) { //根据classpath下hbase-site.xml创建hbase连接,基于zookeeper conf = HBaseConfiguration.create(); //htable链接池 hTablePool = new HTablePool(conf, hbPools); htable = hTablePool.getTable(hbLogName); System.out.println("Init Hbase OK!"); } return true; } catch (Exception e) { task.cancel(false); executor.shutdown(); System.err.println("Init Hbase fail !"); return false; } } @Override public void run() { if (conf == null || htable == null) { initHbase(); } try { //日志数据超出批量处理大小 if (batchSize <= loggingEvents.size()) { LoggingEvent event; List<Put> logs = new ArrayList<Put>(); //循环处理日志队列 while ((event = loggingEvents.poll()) != null) { try { //创建日志并指定ROW KEY Put log = new Put((event.getThreadName() + event.getLevel().toString() + System.currentTimeMillis()).getBytes()); //写日志内容 log.add(hbLogFamily.getBytes(), "log".getBytes(), layout.format(event).getBytes()); logs.add(log); } catch (Exception e) { System.err.println("Error logging put " + e); 
} } //批量写入HBASE if (logs.size() > 0) htable.put(logs); } } catch (Exception e) { System.err.println("Error run " + e); } } /** * 日志事件 * * @param loggingEvent */ @Override protected void append(LoggingEvent loggingEvent) { try { populateEvent(loggingEvent); //添加到日志队列 loggingEvents.add(loggingEvent); } catch (Exception e) { System.err.println("Error populating event and adding to queue" + e); } } /** * 事件测试 * * @param event */ protected void populateEvent(LoggingEvent event) { event.getThreadName(); event.getRenderedMessage(); event.getNDC(); event.getMDCCopy(); event.getThrowableStrRep(); event.getLocationInformation(); } @Override public void close() { try { task.cancel(false); executor.shutdown(); hTablePool.close(); htable.close(); } catch (IOException e) { System.err.println("Error close " + e); } } @Override public boolean requiresLayout() { return true; } //设置每一批日志处理数量 public void setBatchSize(int batchSize) { this.batchSize = batchSize; } /** * 设置计划任务执行间隔 * * @param period */ public void setPeriod(int period) { this.period = period; } /** * 设置日志存储HBASE表名 * * @param hbLogName */ public void setHbLogName(String hbLogName) { this.hbLogName = hbLogName; } /** * 日志表的列族名字 * @param hbLogFamily */ public void setHbLogFamily(String hbLogFamily) { this.hbLogFamily = hbLogFamily; } }
NamedThreadFactory
package cn.bg.log; public class NamedThreadFactory implements ThreadFactory { private final String prefix; private final ThreadFactory threadFactory; private final AtomicInteger atomicInteger = new AtomicInteger(); public NamedThreadFactory(final String prefix){ this(prefix, Executors.defaultThreadFactory()); } public NamedThreadFactory(final String prefix, final ThreadFactory threadFactory){ this.prefix = prefix; this.threadFactory = threadFactory; } @Override public Thread newThread(Runnable r) { Thread t = this.threadFactory.newThread(r); t.setName(this.prefix + this.atomicInteger.incrementAndGet()); return t; } }
完!
相关文章推荐
- 万亿级日志与行为数据存储查询技术剖析——Hbase系预聚合方案、Dremel系parquet列存储、预聚合系、Lucene系
- Flume+HBase采集和存储日志数据
- Flume+hbase 日志数据采集与存储
- Flume+HBase采集和存储日志数据
- 大数据技术应用(一) 应用Flume+HBase采集和存储日志数据
- Facebook开源LogDevice:一种用于日志的分布式数据存储系统
- HBase学习之四: mapreduce处理数据后存储到hbase及错误java.lang.NoClassDefFoundError的解决办法
- 重新认识HBase,Cassandra列存储——本质是还是行存储,只是可以动态改变列(每行对应的数据字段)数量而已,当心不是parquet
- Checklists学习日志之使用NSUserDefaults存储数据
- 爬虫代码实现四:采用Hbase存储爬虫数据(1)
- MapReduce 中如何处理HBase中的数据?如何读取HBase数据给Map?如何将结果存储到HBase中?
- HBase数据存储格式
- Hbase 存储结构 数据模型
- 大数据时代的结构化存储—HBase在阿里的应用实践
- Log4j日志存储到数据库——优化
- LogDevice:Facebook开发的分布式日志数据存储系统
- log4j之不同类产生日志存储不同日志文件中
- Hadoop Hbase适合存储哪类数据?
- HBASE松散数据存储设计初识
- Hadoop Hbase适合存储哪类数据?(转)