10W级数据更新操作__生产消费者模式
2017-07-07 10:30
239 查看
背景需求
最近有这么一个需求:由于本地系统信息与另一个系统数据可能不一致,两个系统有各自的独立数据库和业务,在通信过程中网络等原因,导致两者之间的关键信息有差异,因此本地数据库中可能有10W条记录需要更新,本地数据库的信息需要逐条与远程http请求数据,对比或更新。技术分析
如果将本地数据库全部取出放入一个集合中,然后遍历并发送http请求核对数据,显然不太现实。存储10W条记录需要多大的缓存?而且这样做对系统资源占用也很高。采用生产消费者模式批量处理数据。生产消费者模式维护一个队列,一个线程添加数据,另一个线程取数据,可通过控制线程的数量来控制处理的速度。
思路:用分页取批量数据,放入一个阻塞式队列
LinkedBlockingQueue中,开启另一个线程从队列中取数据,循环以上过程,直到全部数据处理完毕(数据处理到最后一页),队列为空。
代码实现
Talk is cheap, show me codepublic class CheckRecordWithChannel { private static final Logger logger = LoggerFactory.getLogger(CheckRecordWithChannel.class); //500条数据 private volatile BlockingQueue<WechatTransInfo> orderQueue = new LinkedBlockingQueue<>(500); public static final int PAGE_NUM = 200; //生产完成标志 private boolean produceFlag = false; //消费完成标志 private boolean cosumerFlag = false; //当前页面数 private int currentPageNum = 0; private CheckOrderBusinessHandle businessHandle 4000 ; /** * 构造方法 * @param businessHandle 需要实现的特殊业务方法 */ public CheckRecordWithChannel (CheckOrderBusinessHandle businessHandle) { this.businessHandle = businessHandle; } /** * 主方法 * @return * @throws InterruptedException */ public boolean checkOrderBusiness() throws InterruptedException { //创建线程池,生产者、消费都的数量可以多用几个 ExecutorService checkOrderService = Executors.newCachedThreadPool(); LocalOrderProducer producer = this.new LocalOrderProducer(); OrderConsumer consumer = this.new OrderConsumer(); checkOrderService.submit(producer); checkOrderService.submit(consumer); while (true) { Thread.sleep(2000L); if (produceFlag && cosumerFlag && orderQueue.isEmpty()) { List<Runnable> shutdownList = checkOrderService.shutdownNow(); logger.warn("------{} 页处理完成,{} 线程停止运行-----", currentPageNum,shutdownList); return true; } } } /** * 分页查询数据业务 */ public Page<WechatTransInfo> queryOrderWithPage(int pageNum) { //方便以后拓展 return businessHandle.queryOrderWithPage(this.currentPageNum++, pageNum); } /** * 生产方法, 阻塞式,put方法 * @param orderList * @return 是否还有数据, false表示没有数据 * @throws InterruptedException 线程被中断 */ public boolean produceOrder() throws InterruptedException { //数据标识 boolean hasData = true; Page<Info> queryOrderPage = queryOrderWithPage(PAGE_NUM); if (null == queryOrderPage) { return false; } //最后一次的查询结果 if (!queryOrderPage.hasNextPage()) { hasData = false; } //循环插入200条数据,队列满就阻塞等待 for (Info order : queryOrderPage.getContent()) { orderQueue.put(order); } return hasData; } /** * 消费者方法 */ public boolean cosumerOrder(Info orderInfo) { //方便以后拓展 return businessHandle.checkOrderWithOrg(orderInfo); } /** * 生产者 * <p> 查询本地数据库订单,并put到orderIdQueue中,查询结束则flag=true * @author */ class LocalOrderProducer implements Runnable { @Override public void run() { try { // 如果数据库未查询完,继续生产 while (!produceFlag) { Thread.sleep(2000L); //放慢生产者速度 //如果数据库中没有数据 if(!produceOrder()) { Thread.sleep(5000L); produceFlag = true; } } logger.debug("----producer was done---"); } catch (InterruptedException e) { logger.error("--- producer thread was interrupted--{}-", e); } } } /** * 消费者 * <p> 非阻塞式消费 * @author */ class OrderConsumer implements Runnable { @Override public void run() { try { // 第一次阻塞取数据 Thread.sleep(5000L); Info orderInfo = null; //如果生产者还在生产或者队列不为空,则进入继续消费处理 while (!produceFlag || null != (orderInfo = orderQueue.poll())) { //如果队列为空,则等待2s生产者生产 logger.debug("---is get order data, ? {} ---", null != orderInfo); if (null == orderInfo) { Thread.sleep(2000L); } cosumerOrder(orderInfo); //orderInfo = orderQueue.poll(); } if (produceFlag && orderQueue.isEmpty()) { cosumerFlag = true; } } catch (InterruptedException e) { logger.error("---thread was interrupted--{}-", e); } } } }
public interface CheckOrderBusinessHandle { /** * 查询本地数据库业务 * <p> 顺序迭代查询数据库, * pageNum是每次查询数量 * @return 返回查询的结果 */ Page<Info> queryOrderWithPage(int currentPageNum, int pageNum); /** * 处理业务逻辑,更新本地数据库 * @param order 信息 */ boolean checkOrderWithOrg(Info orderInfo); }
CheckRecordWithChannel 构造方法初始化handle用来处理生产和消费的具体业务;
两个Runnable 实现类分别是生产和消费线程,其中生产采用阻塞式,如果队列满且数据库未查询完毕则阻塞,直到数据库全部查询完毕并put到队列中时,该线程退出put;消费采用非阻塞式poll,当生产完毕且队列为空时,则退出poll;
Page是Spring框架中的一个分页bean;
LinkedBlockingQueue用volatile修饰防止线程操作过程中数据不一致;
CheckOrderBusinessHandle是一个接口,具体需要根据业务去实现
运行调用
实现具体业务接口
public void checkOrderWith() { try { new CheckRecordWithChannel (new CheckOrderBusinessHandle() { @Override public Page<Info> queryOrderWithPage(int currentPageNum, int pageNum) { //TODO } @Override public boolean checkOrderWithOrg(Info orderInfo) { //TODO } }).checkOrderBusiness(); } catch (InterruptedException e) { logger.error("----check thread was stoped, {}----", e); } }
相关文章推荐
- 在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
- 关于JDBC中的ResultSet的更新数据操作
- Sqlserver 在查询分析器里如何访问远程的的数据库,进行数据查询更新等操作。
- 稳扎稳打Silverlight(57) - 4.0通信之WCF RIA Services: 概述, 通过 DomainDataSource 实现数据的添加、查询、更新和删除操作
- 稳扎稳打Silverlight(57) - 4.0通信之WCF RIA Services: 概述, 通过 DomainDataSource 实现数据的添加、查询、更新和删除操作
- 稳扎稳打Silverlight(57) - 4.0通信之WCF RIA Services: 概述, 通过 DomainDataSource 实现数据的添加、查询、更新和删除操作
- ASP.Net 更新储存在xml文件中的数据(update操作)
- 稳扎稳打Silverlight(57) - 4.0通信之WCF RIA Services: 概述, 通过 DomainDataSource 实现数据的添加、查询、更新和删除操作
- 稳扎稳打Silverlight(57) - 4.0通信之WCF RIA Services: 概述, 通过 DomainDataSource 实现数据的添加、查询、更新和删除操作
- 备份数据,安全进行生产库的操作
- 稳扎稳打Silverlight(57) - 4.0通信之WCF RIA Services: 概述, 通过 DomainDataSource 实现数据的添加、查询、更新和删除操作
- 游标操作,使用游标,更新数据
- 在Oracle的连接视图上进行数据更新操作
- SDI文档模式下调用对话框实现在对话框上绘图操作,并且进行文档和对话框之间的数据传输
- 稳扎稳打Silverlight(57) - 4.0通信之WCF RIA Services: 概述, 通过 DomainDataSource 实现数据的添加、查询、更新和删除操作
- 设计模式实战:在WinForm中用Command模式实现可以撤销的数据操作
- Asp.Net 2.0 操作mysql 数据基类(读取,删除,插入,更新)
- 基于生产-消费者模式的任务异步线程池设计与实现
- 稳扎稳打Silverlight(57) - 4.0通信之WCF RIA Services: 概述, 通过 DomainDataSource 实现数据的添加、查询、更新和删除操作
- 稳扎稳打Silverlight(57) - 4.0通信之WCF RIA Services: 概述, 通过 DomainDataSource 实现数据的添加、查询、更新和删除操作