您的位置:首页 > 其它

10W级数据更新操作__生产消费者模式

2017-07-07 10:30 239 查看

背景需求

最近有这么一个需求:由于本地系统信息与另一个系统数据可能不一致,两个系统有各自的独立数据库和业务,在通信过程中网络等原因,导致两者之间的关键信息有差异,因此本地数据库中可能有10W条记录需要更新,本地数据库的信息需要逐条与远程http请求数据,对比或更新。

技术分析

如果将本地数据库全部取出放入一个集合中,然后遍历并发送http请求核对数据,显然不太现实。存储10W条记录需要多大的缓存?而且这样做对系统资源占用也很高。

采用生产消费者模式批量处理数据。生产消费者模式维护一个队列,一个线程添加数据,另一个线程取数据,可通过控制线程的数量来控制处理的速度。

思路:用分页取批量数据,放入一个阻塞式队列
LinkedBlockingQueue
中,开启另一个线程从队列中取数据,循环以上过程,直到全部数据处理完毕(数据处理到最后一页),队列为空。

代码实现

Talk is cheap, show me code

public class CheckRecordWithChannel {

private static final Logger logger = LoggerFactory.getLogger(CheckRecordWithChannel.class);

//500条数据
private volatile BlockingQueue<WechatTransInfo> orderQueue = new LinkedBlockingQueue<>(500);

public static final int PAGE_NUM = 200;

//生产完成标志
private boolean produceFlag = false;
//消费完成标志
private boolean cosumerFlag = false;
//当前页面数
private int currentPageNum = 0;

private CheckOrderBusinessHandle businessHandle
4000
;

/**
* 构造方法
* @param businessHandle 需要实现的特殊业务方法
*/
public CheckRecordWithChannel (CheckOrderBusinessHandle businessHandle) {
this.businessHandle = businessHandle;
}

/**
* 主方法
* @return
* @throws InterruptedException
*/
public boolean checkOrderBusiness() throws InterruptedException {
//创建线程池,生产者、消费都的数量可以多用几个
ExecutorService checkOrderService = Executors.newCachedThreadPool();
LocalOrderProducer producer = this.new LocalOrderProducer();
OrderConsumer consumer = this.new OrderConsumer();
checkOrderService.submit(producer);
checkOrderService.submit(consumer);
while (true) {
Thread.sleep(2000L);
if (produceFlag && cosumerFlag && orderQueue.isEmpty()) {
List<Runnable> shutdownList = checkOrderService.shutdownNow();
logger.warn("------{} 页处理完成,{} 线程停止运行-----", currentPageNum,shutdownList);
return true;
}
}
}

/**
* 分页查询数据业务
*/
public Page<WechatTransInfo> queryOrderWithPage(int pageNum) {
//方便以后拓展
return businessHandle.queryOrderWithPage(this.currentPageNum++, pageNum);
}

/**
* 生产方法, 阻塞式,put方法
* @param orderList
* @return 是否还有数据, false表示没有数据
* @throws InterruptedException 线程被中断
*/
public boolean produceOrder() throws InterruptedException
{
//数据标识
boolean hasData = true;
Page<Info> queryOrderPage = queryOrderWithPage(PAGE_NUM);
if (null == queryOrderPage) {
return false;
}
//最后一次的查询结果
if (!queryOrderPage.hasNextPage()) {
hasData = false;
}
//循环插入200条数据,队列满就阻塞等待
for (Info order : queryOrderPage.getContent()) {
orderQueue.put(order);
}
return hasData;
}

/**
* 消费者方法
*/
public boolean cosumerOrder(Info orderInfo) {
//方便以后拓展
return businessHandle.checkOrderWithOrg(orderInfo);
}

/**
* 生产者
* <p> 查询本地数据库订单,并put到orderIdQueue中,查询结束则flag=true
* @author
*/
class LocalOrderProducer implements Runnable {
@Override
public void run() {
try {
// 如果数据库未查询完,继续生产
while (!produceFlag) {
Thread.sleep(2000L);  //放慢生产者速度
//如果数据库中没有数据
if(!produceOrder()) {
Thread.sleep(5000L);
produceFlag = true;
}
}
logger.debug("----producer was done---");
} catch (InterruptedException e) {
logger.error("--- producer thread was interrupted--{}-", e);
}
}

}

/**
* 消费者
* <p> 非阻塞式消费
* @author
*/
class OrderConsumer implements Runnable {
@Override
public void run() {
try {
// 第一次阻塞取数据
Thread.sleep(5000L);
Info orderInfo = null;
//如果生产者还在生产或者队列不为空,则进入继续消费处理
while (!produceFlag || null != (orderInfo = orderQueue.poll())) {
//如果队列为空,则等待2s生产者生产
logger.debug("---is get order data, ? {} ---", null != orderInfo);
if (null == orderInfo) {
Thread.sleep(2000L);
}
cosumerOrder(orderInfo);
//orderInfo = orderQueue.poll();
}

if (produceFlag && orderQueue.isEmpty()) {
cosumerFlag = true;
}
} catch (InterruptedException e) {
logger.error("---thread was interrupted--{}-", e);
}

}

}

}


public interface CheckOrderBusinessHandle {

/**
* 查询本地数据库业务
* <p> 顺序迭代查询数据库,
* pageNum是每次查询数量
* @return 返回查询的结果
*/
Page<Info> queryOrderWithPage(int currentPageNum, int pageNum);

/**
* 处理业务逻辑,更新本地数据库
* @param order  信息
*/
boolean checkOrderWithOrg(Info orderInfo);
}


CheckRecordWithChannel 构造方法初始化handle用来处理生产和消费的具体业务;

两个Runnable 实现类分别是生产和消费线程,其中生产采用阻塞式,如果队列满且数据库未查询完毕则阻塞,直到数据库全部查询完毕并put到队列中时,该线程退出put;消费采用非阻塞式poll,当生产完毕且队列为空时,则退出poll;

Page是Spring框架中的一个分页bean;

LinkedBlockingQueue用volatile修饰防止线程操作过程中数据不一致;

CheckOrderBusinessHandle是一个接口,具体需要根据业务去实现

运行调用

实现具体业务接口

public void checkOrderWith() {

try {
new CheckRecordWithChannel (new CheckOrderBusinessHandle() {

@Override
public Page<Info> queryOrderWithPage(int currentPageNum,
int pageNum) {
//TODO
}

@Override
public boolean checkOrderWithOrg(Info orderInfo) {
//TODO
}
}).checkOrderBusiness();
} catch (InterruptedException e) {
logger.error("----check thread was stoped, {}----", e);
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐