您的位置:首页 > 编程语言

并发编程源码分析一之Log接口

2015-11-30 21:40 246 查看
并发编程实践部分源码使用github里面部分开源框架源码,重新整理,如有错误或版权问题,请指出指正。欢迎star、fork,读书笔记系列会同步更新githttps://github.com/xuminwlt/j360-jdk模块j360-jdk-application

前言

平常开发过程中使用并发框架的场景在SSH的框架里面直接接触的机会并不是很多,基本上大量使用的就是部分原子类,然而在底层实现中,为了实现高并发的可能,jdk并发编程框架几乎所谓不在,每个并发编程类以及其参数都有各自所擅长的场景,并发编程框架不是一卡通,但是却很像搭积木,通过组合几乎很多的并发场景都不在话下,这里就是用批量日志管理接口的场景中,如何使用并发编程类实现高并发下的日志管理。

内容

业务日志在每个系统中都是不可或缺的功能点,业务日志的输出也有多种的输出形式,会话界面、文本、数据库等等,通常业务日志在系统中的埋点会作为一个日志单元,一个业务系统每天产生的日志数量=每个处理流程*埋点*pv(或者同单位request),在高并发系统中,单个集群节点处理日志也通常采用并发框架批量进行处理,这里分别设置两个条件来对收集的日志进行处理:1:每nSeconds批量将收集的日志队列输出2:日志队列占用的内存>设置的阈值(保护n秒内的队列过大)

类图

包结构

类说明

分别代表工厂类、日志方法接口类、代理类以及实现类的工厂类、方法实现类先看接口类的方法:
public interface BizLogger {
public void log(BizLogPo bizLogPo);

public void log(List<BizLogPo> bizLogPos);
}
分别处理单条和多条日志再看会话打印实现类:
private Logger LOGGER = LoggerFactory.getLogger(ConsoleBizLogger.class.getSimpleName());

@Override
public void log(BizLogPo jobLogPo) {
LOGGER.info(JSONUtils.toJSONString(jobLogPo));
}

@Override
public void log(List<BizLogPo> jobLogPos) {
for (BizLogPo jobLogPo : jobLogPos) {
log(jobLogPo);
}
}
这里使用slf4j接口类作为会话打印的输出接口,实现类同样可以使用mysql、file等输出形式通过工厂类配置获得实现类,通常会使用多个实现类提供接口,而多个实现类难以同时对实现过程进行控制,这里引入了代理类来调用具体的实现类,而并发编程框架通常在代理类中进行集中控制管理:
public class BizLoggerDelegate implements BizLogger {

private static final Logger LOGGER = LoggerFactory.getLogger(BizLoggerDelegate.class);

// 3S 检查输盘一次日志
private int flushPeriod;

private BizLogger jobLogger;
private boolean lazyLog = false;
private ScheduledExecutorService executor;
private ScheduledFuture scheduledFuture;
private BlockingQueue<BizLogPo> memoryQueue;
// 日志批量刷盘数量
private int batchFlushSize = 100;
private int overflowSize = 10000;
// 内存中最大的日志量阀值
private int maxMemoryLogSize;
private AtomicBoolean flushing = new AtomicBoolean(false);

public BizLoggerDelegate(Config config) {
BizLoggerFactory jobLoggerFactory = new BizLoggerFactory() {
@Override
public BizLogger getJobLogger() {
return new ConsoleBizLogger();
}
};
jobLogger = jobLoggerFactory.getJobLogger();
lazyLog = config.getParameter(Constants.LAZY_JOB_LOGGER, false);
if (lazyLog) {

// 无界Queue
memoryQueue = new LinkedBlockingQueue<BizLogPo>();
maxMemoryLogSize = config.getParameter(Constants.LAZY_JOB_LOGGER_MEM_SIZE, 1000);
flushPeriod = config.getParameter(Constants.LAZY_JOB_LOGGER_CHECK_PERIOD, 3);

executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LazyJobLogger"));
scheduledFuture = executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
if (flushing.compareAndSet(false, true)) {
checkAndFlush();
}
} catch (Throwable t) {
LOGGER.error("CheckAndFlush log error", t);
}
}
}, flushPeriod, flushPeriod, TimeUnit.SECONDS);

}
}

/**
* 检查内存中是否有日志,如果有就批量刷盘
*/
private void checkAndFlush() {
try {
int nowSize = memoryQueue.size();
if (nowSize == 0) {
return;
}
List<BizLogPo> batch = new ArrayList<BizLogPo>();
for (int i = 0; i < nowSize; i++) {
BizLogPo jobLogPo = memoryQueue.poll();
batch.add(jobLogPo);

if (batch.size() >= batchFlushSize) {
flush(batch);
}
}
if (batch.size() > 0) {
flush(batch);
}

} finally {
flushing.compareAndSet(true, false);
}
}

private void checkOverflowSize() {
if (memoryQueue.size() > overflowSize) {
throw new BizLogException("Memory Log size is " + memoryQueue.size() + " , please check the JobLogger is available");
}
}

private void flush(List<BizLogPo> batch) {
boolean flushSuccess = false;
try {
jobLogger.log(batch);
flushSuccess = true;
} finally {
if (!flushSuccess) {
memoryQueue.addAll(batch);
}
batch.clear();
}
}

/**
* 检查内存中的日志量是否超过阀值,如果超过需要批量刷盘日志
*/
private void checkCapacity() {
if (memoryQueue.size() > maxMemoryLogSize) {
// 超过阀值,需要批量刷盘
if (flushing.compareAndSet(false, true)) {
// 这里可以采用new Thread, 因为这里只会同时new一个
new Thread(new Runnable() {
@Override
public void run() {
try {
checkAndFlush();
} catch (Throwable t) {
LOGGER.error("Capacity full flush error", t);
}
}
}).start();
}
}
}

@Override
public void log(BizLogPo jobLogPo) {
if (jobLogPo == null) {
return;
}
if (lazyLog) {
checkOverflowSize();
memoryQueue.offer(jobLogPo);
checkCapacity();
} else {
jobLogger.log(jobLogPo);
}
}

@Override
public void log(List<BizLogPo> jobLogPos) {
if (CollectionUtils.isEmpty(jobLogPos)) {
return;
}
if (lazyLog) {
checkOverflowSize();
for (BizLogPo jobLogPo : jobLogPos) {
memoryQueue.offer(jobLogPo);
}
// checkCapacity
checkCapacity();
} else {
jobLogger.log(jobLogPos);
}
}

}
这里写一个测试类测试下结果:
@Test
public void loggerTest() throws InterruptedException {
Config config = new Config();
config.setParameter("biz.logger","console");

List<BizLogPo> list = new ArrayList<BizLogPo>();
for(int i =0;i<=10;i++){
BizLogPo jobLogPo = new BizLogPo();
jobLogPo.setMsg("hello" + i);
list.add(jobLogPo);
}
TimeUnit.SECONDS.sleep(5);
BizLoggerDelegate jobLoggerDelegate = new BizLoggerDelegate(config);
jobLoggerDelegate.log(list);
}
通常开发使用Spring环境时,新增Spring适配工厂类,通过Spring配置下:
public class BizLoggerFactoryBean implements FactoryBean<BizLogger>,
InitializingBean, DisposableBean {

public BizLogger getBizLogger() {
return bizLogger;
}

public void setBizLogger(BizLogger bizLogger) {
this.bizLogger = bizLogger;
}

private BizLogger bizLogger;

@Override
public void destroy() throws Exception {

}

@Override
public BizLogger getObject() throws Exception {
return bizLogger;
}

@Override
public Class<?> getObjectType() {
return bizLogger.getClass();
}

@Override
public boolean isSingleton() {
return true;
}

@Override
public void afterPropertiesSet() throws Exception {

}
}
新增Spring的配置类:
LoggerSpringConfig ApplicationContextAware {

ApplicationContext (ApplicationContext applicationContext) BeansException {
.= applicationContext}

(=)
BizLogger () Exception {
BizLoggerFactoryBean bizLoggerFactoryBean = BizLoggerFactoryBean()bizLoggerFactoryBean.setBizLogger(ConsoleBizLogger())bizLoggerFactoryBean.getObject()}

}
新增Spring的测试类:
@Testpublic void loggerSpringTest(){ApplicationContext context = new AnnotationConfigApplicationContext(LoggerSpringConfig.class);BizLogger bizLogger = (BizLogger) context.getBean("bizLogger");List<BizLogPo> list = new ArrayList<BizLogPo>();for(int i =0;i<=10;i++){BizLogPo jobLogPo = new BizLogPo();jobLogPo.setMsg("hello" + i);list.add(jobLogPo);}bizLogger.log(list);}
实际使用中,把LoggerDelelate配置到Bean里面即可。到这里一个简单的使用并发框架实现的日志处理接口完成了,当然实现类中可以使用批量SQL的形式进行处理,增强sql的吞吐量。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  并发编程