并发编程源码分析一之Log接口
2015-11-30 21:40
246 查看
并发编程实践部分源码使用github里面部分开源框架源码,重新整理,如有错误或版权问题,请指出指正。欢迎star、fork,读书笔记系列会同步更新githttps://github.com/xuminwlt/j360-jdk模块j360-jdk-application![](http://static.oschina.net/uploads/space/2015/1130/143247_6UYx_1026123.png)
![](http://static.oschina.net/uploads/space/2015/1130/143331_NqQn_1026123.png)
前言
平常开发过程中使用并发框架的场景在SSH的框架里面直接接触的机会并不是很多,基本上大量使用的就是部分原子类,然而在底层实现中,为了实现高并发的可能,jdk并发编程框架几乎所谓不在,每个并发编程类以及其参数都有各自所擅长的场景,并发编程框架不是一卡通,但是却很像搭积木,通过组合几乎很多的并发场景都不在话下,这里就是用批量日志管理接口的场景中,如何使用并发编程类实现高并发下的日志管理。内容
业务日志在每个系统中都是不可或缺的功能点,业务日志的输出也有多种的输出形式,会话界面、文本、数据库等等,通常业务日志在系统中的埋点会作为一个日志单元,一个业务系统每天产生的日志数量=每个处理流程*埋点*pv(或者同单位request),在高并发系统中,单个集群节点处理日志也通常采用并发框架批量进行处理,这里分别设置两个条件来对收集的日志进行处理:1:每nSeconds批量将收集的日志队列输出2:日志队列占用的内存>设置的阈值(保护n秒内的队列过大)类图
![](http://static.oschina.net/uploads/space/2015/1130/143247_6UYx_1026123.png)
包结构
![](http://static.oschina.net/uploads/space/2015/1130/143331_NqQn_1026123.png)
类说明
分别代表工厂类、日志方法接口类、代理类以及实现类的工厂类、方法实现类先看接口类的方法:public interface BizLogger { public void log(BizLogPo bizLogPo); public void log(List<BizLogPo> bizLogPos); }分别处理单条和多条日志再看会话打印实现类:
private Logger LOGGER = LoggerFactory.getLogger(ConsoleBizLogger.class.getSimpleName()); @Override public void log(BizLogPo jobLogPo) { LOGGER.info(JSONUtils.toJSONString(jobLogPo)); } @Override public void log(List<BizLogPo> jobLogPos) { for (BizLogPo jobLogPo : jobLogPos) { log(jobLogPo); } }这里使用slf4j接口类作为会话打印的输出接口,实现类同样可以使用mysql、file等输出形式通过工厂类配置获得实现类,通常会使用多个实现类提供接口,而多个实现类难以同时对实现过程进行控制,这里引入了代理类来调用具体的实现类,而并发编程框架通常在代理类中进行集中控制管理:
public class BizLoggerDelegate implements BizLogger { private static final Logger LOGGER = LoggerFactory.getLogger(BizLoggerDelegate.class); // 3S 检查输盘一次日志 private int flushPeriod; private BizLogger jobLogger; private boolean lazyLog = false; private ScheduledExecutorService executor; private ScheduledFuture scheduledFuture; private BlockingQueue<BizLogPo> memoryQueue; // 日志批量刷盘数量 private int batchFlushSize = 100; private int overflowSize = 10000; // 内存中最大的日志量阀值 private int maxMemoryLogSize; private AtomicBoolean flushing = new AtomicBoolean(false); public BizLoggerDelegate(Config config) { BizLoggerFactory jobLoggerFactory = new BizLoggerFactory() { @Override public BizLogger getJobLogger() { return new ConsoleBizLogger(); } }; jobLogger = jobLoggerFactory.getJobLogger(); lazyLog = config.getParameter(Constants.LAZY_JOB_LOGGER, false); if (lazyLog) { // 无界Queue memoryQueue = new LinkedBlockingQueue<BizLogPo>(); maxMemoryLogSize = config.getParameter(Constants.LAZY_JOB_LOGGER_MEM_SIZE, 1000); flushPeriod = config.getParameter(Constants.LAZY_JOB_LOGGER_CHECK_PERIOD, 3); executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LazyJobLogger")); scheduledFuture = executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { if (flushing.compareAndSet(false, true)) { checkAndFlush(); } } catch (Throwable t) { LOGGER.error("CheckAndFlush log error", t); } } }, flushPeriod, flushPeriod, TimeUnit.SECONDS); } } /** * 检查内存中是否有日志,如果有就批量刷盘 */ private void checkAndFlush() { try { int nowSize = memoryQueue.size(); if (nowSize == 0) { return; } List<BizLogPo> batch = new ArrayList<BizLogPo>(); for (int i = 0; i < nowSize; i++) { BizLogPo jobLogPo = memoryQueue.poll(); batch.add(jobLogPo); if (batch.size() >= batchFlushSize) { flush(batch); } } if (batch.size() > 0) { flush(batch); } } finally { flushing.compareAndSet(true, false); } } private void checkOverflowSize() { if (memoryQueue.size() > overflowSize) { throw new BizLogException("Memory Log size is " + memoryQueue.size() + " , please check the JobLogger is available"); } } private void flush(List<BizLogPo> batch) { boolean flushSuccess = false; try { jobLogger.log(batch); flushSuccess = true; } finally { if (!flushSuccess) { memoryQueue.addAll(batch); } batch.clear(); } } /** * 检查内存中的日志量是否超过阀值,如果超过需要批量刷盘日志 */ private void checkCapacity() { if (memoryQueue.size() > maxMemoryLogSize) { // 超过阀值,需要批量刷盘 if (flushing.compareAndSet(false, true)) { // 这里可以采用new Thread, 因为这里只会同时new一个 new Thread(new Runnable() { @Override public void run() { try { checkAndFlush(); } catch (Throwable t) { LOGGER.error("Capacity full flush error", t); } } }).start(); } } } @Override public void log(BizLogPo jobLogPo) { if (jobLogPo == null) { return; } if (lazyLog) { checkOverflowSize(); memoryQueue.offer(jobLogPo); checkCapacity(); } else { jobLogger.log(jobLogPo); } } @Override public void log(List<BizLogPo> jobLogPos) { if (CollectionUtils.isEmpty(jobLogPos)) { return; } if (lazyLog) { checkOverflowSize(); for (BizLogPo jobLogPo : jobLogPos) { memoryQueue.offer(jobLogPo); } // checkCapacity checkCapacity(); } else { jobLogger.log(jobLogPos); } } }这里写一个测试类测试下结果:
@Test public void loggerTest() throws InterruptedException { Config config = new Config(); config.setParameter("biz.logger","console"); List<BizLogPo> list = new ArrayList<BizLogPo>(); for(int i =0;i<=10;i++){ BizLogPo jobLogPo = new BizLogPo(); jobLogPo.setMsg("hello" + i); list.add(jobLogPo); } TimeUnit.SECONDS.sleep(5); BizLoggerDelegate jobLoggerDelegate = new BizLoggerDelegate(config); jobLoggerDelegate.log(list); }通常开发使用Spring环境时,新增Spring适配工厂类,通过Spring配置下:
public class BizLoggerFactoryBean implements FactoryBean<BizLogger>, InitializingBean, DisposableBean { public BizLogger getBizLogger() { return bizLogger; } public void setBizLogger(BizLogger bizLogger) { this.bizLogger = bizLogger; } private BizLogger bizLogger; @Override public void destroy() throws Exception { } @Override public BizLogger getObject() throws Exception { return bizLogger; } @Override public Class<?> getObjectType() { return bizLogger.getClass(); } @Override public boolean isSingleton() { return true; } @Override public void afterPropertiesSet() throws Exception { } }新增Spring的配置类:
LoggerSpringConfig ApplicationContextAware { ApplicationContext (ApplicationContext applicationContext) BeansException { .= applicationContext} (=) BizLogger () Exception { BizLoggerFactoryBean bizLoggerFactoryBean = BizLoggerFactoryBean()bizLoggerFactoryBean.setBizLogger(ConsoleBizLogger())bizLoggerFactoryBean.getObject()} }新增Spring的测试类:
@Testpublic void loggerSpringTest(){ApplicationContext context = new AnnotationConfigApplicationContext(LoggerSpringConfig.class);BizLogger bizLogger = (BizLogger) context.getBean("bizLogger");List<BizLogPo> list = new ArrayList<BizLogPo>();for(int i =0;i<=10;i++){BizLogPo jobLogPo = new BizLogPo();jobLogPo.setMsg("hello" + i);list.add(jobLogPo);}bizLogger.log(list);}实际使用中,把LoggerDelelate配置到Bean里面即可。到这里一个简单的使用并发框架实现的日志处理接口完成了,当然实现类中可以使用批量SQL的形式进行处理,增强sql的吞吐量。
相关文章推荐
- Go 语言 Channel 实现原理精要
- Erlang并发编程介绍
- Java并发编程示例(二):获取和设置线程信息
- Java并发编程示例(十):线程组
- Java并发编程示例(八):处理线程的非受检异常
- Java并发编程示例(六):等待线程执行终止
- Java并发编程示例(五):线程休眠与恢复
- Java并发编程示例(九):本地线程变量的使用
- Java并发编程之栅栏(CyclicBarrier)实例介绍
- 实例讲解Java并发编程之ThreadLocal类
- Java并发编程示例(七):守护线程的创建和运行
- 实例讲解Java并发编程之闭锁
- Java并发编程中构建自定义同步工具
- Java并发编程之显示锁ReentrantLock和ReadWriteLock读写锁
- Java并发编程示例(三):线程中断
- Java并发编程示例(四):可控的线程中断
- Java并发编程示例(一):线程的创建和执行
- Java并发编程之原子变量与非阻塞同步机制
- 实例讲解Java并发编程之变量
- Python中的并发编程实例