BlockingQueue的原理浅析及其在项目中的应用
2017-01-16 16:02
417 查看
(一)BlockingQueue的原理
1. 什么是BlockingQueue?
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
2. BlockingQueue的核心方法:
2.1放入数据:
offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,
则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
offer(E o, long timeout, TimeUnit unit),可以设定等待的时间,如果在指定的时间内,还不能往队列中
加入BlockingQueue,则返回失败。
put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断
直到BlockingQueue里面有空间再继续.
2.2 获取数据:
poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,
取不到时返回null;
poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,
队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到
BlockingQueue有新的数据被加入;
drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),
通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
3. BlockingQueue的源码
put方法的源码
BlockingQueue使用的是ReentrantLock:
从上面的源码可以看出,当队列满的时候,会调用await()方法:
BlockingQueue使用LockSupport.park(this)来停止线程, LockSupport是JDK中比较底层的类,用来创建锁和其他同步工具类的基本线程阻塞原语。java锁和同步器框架的核心 AQS: AbstractQueuedSynchronizer,就是通过调用 LockSupport .park()和 LockSupport .unpark()实现线程的阻塞和唤醒 的。 LockSupport 很类似于二元信号量(只有1个许可证可供使用),如果这个许可还没有被占用,当前线程获取许可并继 续 执行;如果许可已经被占用,当前线 程阻塞,等待获取许可。
关于该方法的具体实现,可以参考:
(http://blog.csdn.net/hengyunabc/article/details/28126139)
4.BlockingQueue在项目中应用
项目中,需要将错误日志入库,用户在访问时会首先经过特殊字符拦截器,如果出现访问错误,会产生错误日志,并会调用Log方法,将该错误日志加入阻塞队列:
在做项目时,日志要入库,在spring容器初始化时启动线程
spring的配置如下:
LogSyncToDBThread类的定义如下:
saveLogCacheToDB()方法定义如下:
其中log方法:借助Spring的拦截器调用log方法——如果在请求过程中,发生请求异常,则调用该方法,新建logManger实体类,并将其写入BlockingQueue阻塞队列!
其中LogManager实体类定义如下:
1. 什么是BlockingQueue?
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
2. BlockingQueue的核心方法:
2.1放入数据:
offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,
则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
offer(E o, long timeout, TimeUnit unit),可以设定等待的时间,如果在指定的时间内,还不能往队列中
加入BlockingQueue,则返回失败。
put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断
直到BlockingQueue里面有空间再继续.
2.2 获取数据:
poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,
取不到时返回null;
poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,
队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到
BlockingQueue有新的数据被加入;
drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),
通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
3. BlockingQueue的源码
put方法的源码
/** * Inserts the specified element at the tail of this queue, waiting * for space to become available if the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == items.length) notFull.await(); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } insert(e); } finally { lock.unlock(); } }
BlockingQueue使用的是ReentrantLock:
/* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ private final ReentrantLock lock; /** Condition 10090 for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;
从上面的源码可以看出,当队列满的时候,会调用await()方法:
/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException * <li> Save lock state returned by {@link #getState} * <li> Invoke {@link #release} with * saved state as argument, throwing * IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw exception * </ol> */ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); long savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
BlockingQueue使用LockSupport.park(this)来停止线程, LockSupport是JDK中比较底层的类,用来创建锁和其他同步工具类的基本线程阻塞原语。java锁和同步器框架的核心 AQS: AbstractQueuedSynchronizer,就是通过调用 LockSupport .park()和 LockSupport .unpark()实现线程的阻塞和唤醒 的。 LockSupport 很类似于二元信号量(只有1个许可证可供使用),如果这个许可还没有被占用,当前线程获取许可并继 续 执行;如果许可已经被占用,当前线 程阻塞,等待获取许可。
关于该方法的具体实现,可以参考:
(http://blog.csdn.net/hengyunabc/article/details/28126139)
4.BlockingQueue在项目中应用
项目中,需要将错误日志入库,用户在访问时会首先经过特殊字符拦截器,如果出现访问错误,会产生错误日志,并会调用Log方法,将该错误日志加入阻塞队列:
/** * 保存日志信息到内存文件 * * @param logEntity * 日志文件实体 */ @Override public void log(LogManager logEntity) { synchronized (blockingQueue) { if (logEntity != null) { //如果报错信息队列大于队列最大值,则证明保存队列数据线程出现问题,则不继续插入队列 if(blockingQueue.size() < maxSize){ logEntity.setRequestmsg(StringTools.subLogMsg(logEntity.getRequestmsg(),4000)); logEntity.setResponsemsg(StringTools.subLogMsg(logEntity.getResponsemsg(),4000)); logEntity.setErrormsg(StringTools.subLogMsg(logEntity.getErrormsg(),4000)); blockingQueue.put(logEntity); } } } } @Override public void log(String requestmsg, String responsemsg, String errormsg) { if(StringUtils.isEmpty(errormsg)){ return; } LogManager logEntity = new LogManager(); logEntity.setRequestmsg(StringTools.subLogMsg(requestmsg,4000)); logEntity.setResponsemsg(StringTools.subLogMsg(responsemsg,4000)); logEntity.setErrormsg(StringTools.subLogMsg(errormsg,4000)); logEntity.setCreattime(new Date()); try { logEntity.setIp(CpicmIPUtil.getServerIp()); } catch (Exception e) { } synchronized (blockingQueue) { //如果报错信息队列大于队列最大值,则证明保存队列数据线程出现问题,则不继续插入队列 if(blockingQueue.size() < maxSize){ blockingQueue.put(logEntity); } } }
在做项目时,日志要入库,在spring容器初始化时启动线程
spring的配置如下:
<!-- 启动时加载错误日志入库线程 --> <bean id="logSyncToDBThread" lazy-init="false" class="com.smu.edward.threads.LogSyncToDBThread" init-method="startSync"> </bean>
LogSyncToDBThread类的定义如下:
public class LogSyncToDBThread extends Thread { Logger logger = LoggerFactory.getLogger(LogSyncToDBThread.class); @Autowired private LogManagerService logManagerService; private boolean isExited = true; /** * Run方法,线程10秒钟运行一次保存日志文件到数据库 */ public void run() { isExited = false; while (!isExited) { if (LogManagerServiceImpl.getBlockingQueue() == null || LogManagerServiceImpl.getBlockingQueueSize() < 0) { try { Thread.sleep(10 * 1000); } catch (Exception e) { continue; } continue; } int count = 0; try { count = logManagerService.saveLogCacheToDB(); // if(count != 0){ // logger.info("向数据库同步了" + count + "条审计日志数据!"); // } } catch (Exception e) { logger.error(e.getMessage()); continue; } try { Thread.sleep(10 * 1000); } catch (Exception e) { continue; } } isExited = true; } /** * 线程同步启动方法 */ public void startSync() { start(); logger.info("错误日志入库线程启动成功"); } }
saveLogCacheToDB()方法定义如下:
@Service(value="logManagerService")
public class LogManagerServiceImpl implements LogManagerService {
//private static List<LogEntity> logCache = new ArrayList<LogEntity>();
private static final Logger logger = LoggerFactory.getLogger(LogManagerServiceImpl.class);
private static BlockingQueue blockingQueue = new BlockingQueue();
//最大队列大小限制为10000个,超过10000则不写入队列
private static final int maxSize = 10000;
@Autowired
private LogManagerRepo logManagerRepo;
/** * 保存日志信息到内存文件 * * @param logEntity * 日志文件实体 */ @Override public void log(LogManager logEntity) { synchronized (blockingQueue) { if (logEntity != null) { //如果报错信息队列大于队列最大值,则证明保存队列数据线程出现问题,则不继续插入队列 if(blockingQueue.size() < maxSize){ logEntity.setRequestmsg(StringTools.subLogMsg(logEntity.getRequestmsg(),4000)); logEntity.setResponsemsg(StringTools.subLogMsg(logEntity.getResponsemsg(),4000)); logEntity.setErrormsg(StringTools.subLogMsg(logEntity.getErrormsg(),4000)); blockingQueue.put(logEntity); } } } } @Override public void log(String requestmsg, String responsemsg, String errormsg) { if(StringUtils.isEmpty(errormsg)){ return; } LogManager logEntity = new LogManager(); logEntity.setRequestmsg(StringTools.subLogMsg(requestmsg,4000)); logEntity.setResponsemsg(StringTools.subLogMsg(responsemsg,4000)); logEntity.setErrormsg(StringTools.subLogMsg(errormsg,4000)); logEntity.setCreattime(new Date()); try { logEntity.setIp(CpicmIPUtil.getServerIp()); } catch (Exception e) { } synchronized (blockingQueue) { //如果报错信息队列大于队列最大值,则证明保存队列数据线程出现问题,则不继续插入队列 if(blockingQueue.size() < maxSize){ blockingQueue.put(logEntity); } } }
// 同步内存中的日志信息到数据库
public int saveLogCacheToDB() {
int result = 0;
try {
synchronized (blockingQueue) {
List<LogManager> logEntities = new ArrayList<LogManager>();
if (blockingQueue != null && !blockingQueue.isEmpty()) {
if (blockingQueue.size() < 200) {
// 保存到数据库
try {
while(!blockingQueue.isEmpty()){
logEntities.add((LogManager)blockingQueue.take());
}
logManagerRepo.batchInsertLog(logEntities);
// 插入条数
result = logEntities.size();
} catch (Exception e) {
logger.error(e.getMessage());
return 0;
}
} else{
// 最多取出200条保存到数据库
for (int i = 0; i < 200; i++) {
logEntities.add((LogManager)blockingQueue.take());
}
try {
// 保存到数据库
logManagerRepo.batchInsertLog(logEntities);
// 插入条数
result = logEntities.size();
} catch (Exception e) {
logger.error(e.getMessage());
// 保存数据库时发生异常,需要还原
for(LogManager log : logEntities){
blockingQueue.put(log);
}
return 0;
}
}
}
}
} catch (Exception e) {
logger.error(e.getMessage());
return 0;
}
return result;
}
public static BlockingQueue getBlockingQueue() {
synchronized (blockingQueue) {
return blockingQueue;
}
}
public static int getBlockingQueueSize() {
synchronized (blockingQueue) {
return blockingQueue.size();
}
}
public static void setBlockingQueue(BlockingQueue blockingQueue) {
LogManagerServiceImpl.blockingQueue = blockingQueue;
}
public Page<Map<String, Object>> searchLogManagerList(Pageable page,Map paramMap){
return this.logManagerRepo.searchLogManagerList(page, paramMap);
}
}
其中log方法:借助Spring的拦截器调用log方法——如果在请求过程中,发生请求异常,则调用该方法,新建logManger实体类,并将其写入BlockingQueue阻塞队列!
其中LogManager实体类定义如下:
@Entity @Table(name = "LOG_MANAGER") public class LogManager implements java.io.Serializable { /** * 列名常量 */ private static final String COL_ID = "id"; private static final String COL_REQUESTMSG = "requestmsg"; private static final String COL_RESPONSEMSG = "responsemsg"; private static final String COL_ERRORMSG = "errormsg"; private static final String COL_CREATETIME = "creattime"; private static final String COL_IP = "ip"; /** * 列属性 */ @Id @SequenceGenerator(name="SEQ_LOG_MANAGER", sequenceName="SEQ_LOG_MANAGER", allocationSize = 1) @GeneratedValue(generator="SEQ_LOG_MANAGER",strategy=GenerationType.SEQUENCE) @Column(name=COL_ID,unique = true, nullable = false, precision = 18, scale = 0) private BigDecimal id; @Column(name=COL_REQUESTMSG) private String requestmsg; @Column(name=COL_RESPONSEMSG) private String responsemsg; @Column(name=COL_ERRORMSG) private String errormsg; @Column(name=COL_CREATETIME) private Date creattime; @Column(name=COL_IP) private String ip; public BigDecimal getId() { return id; } public void setId(BigDecimal id) { this.id = id; } public String getRequestmsg() { return requestmsg; } public void setRequestmsg(String requestmsg) { this.requestmsg = requestmsg; } public String getResponsemsg() { return responsemsg; } public void setResponsemsg(String responsemsg) { this.responsemsg = responsemsg; } public String getErrormsg() { return errormsg; } public void setErrormsg(String errormsg) { this.errormsg = errormsg; } public Date getCreattime() { return creattime; } public void setCreattime(Date creattime) { this.creattime = creattime; } public String getIp() { return ip; } public void setIp(String ip) { this.ip = ip; } }
相关文章推荐
- BlockingQueue的原理浅析及其在项目中的应用
- BlockingQueue的原理浅析及其在项目中的应用
- BlockingQueue的原理浅析及其在项目中的应用
- 浅析Kerberos原理,及其应用和管理
- 基于互联网的项目管理——Microsoft Project Central 及其应用
- JPEG 原理详细实例分析及其在嵌入式 Linux 中的应用
- ASP.NET缓存概念及其应用浅析
- MPEG-2TS码流编辑的原理及其应用
- JPEG 原理详细实例分析及其在嵌入式 Linux 中的应用
- JPEG 原理详细实例分析及其在嵌入式 Linux 中的应用
- Windows NT Session 概念的原理与应用浅析 [1] 遍历并获取信息
- WINDOWS键盘事件的挂钩监控原理及其应用技术
- (转贴)浅析GSM上层协议及其在测试中的应用
- JPEG 原理详细实例分析及其在嵌入式 Linux 中的应用
- JMS(Jboss Messaging)的一点使用心得(十三)拔网线后的重连----JMS Connection原理浅析及应用
- 基于互联网的项目管理——Microsoft Project Central 及其应用
- 理解http session的原理及其深度应用
- MPEG-2TS码流编辑的原理及其应用
- struts的应用及其工作原理
- 物流资源计划(LRP)的原理及其应用