Node.js Promise.all 限制并发数量
2016-11-17 18:06
453 查看
Promise.all 本身不负责执行,执行过程在传递给Promise.all之前已经开始,Promise.all只等待全部执行完成,执行resolve,或碰到有执行失败,立即执行reject部分。Promise.all非常好用,唯一的问题是,不能限制并发数量,所有任务同时开始执行,因为Promise.all本身不负责执行具体任务,所以也无法实现并发控制。
实现一个简单的可以控制并发数量的Promise.allLimit函数,可以通过参数来控制并发数量。代码:
同样返回一个Promise对象,可以直接替换Promise.all,不同的是,需要传递一个函数(wrap参数),用来包裹生成每一个具体执行的Promise对象,limit用来限定并发数量,在指定并发任务内,一个任务完成后,再吸入一个新任务继续执行。
callback用来解析每一次任务完成后所需要的后续动作,比如存储下载的内容、或将参数做变换,必须也返回一个Promise对象。
测试代码:
最后贡献一个下载妹子图的简单代码,默认控制并发数量10个。
执行:
实现一个简单的可以控制并发数量的Promise.allLimit函数,可以通过参数来控制并发数量。代码:
/* promise-limit.js */ /* jshint esversion: 6 */ /*jslint node: true */ Promise.allLimit = function(arr, wrap, limit, callback) { return new Promise((resolve, reject) => { var total = arr.length; var result = new Array(total); var rejected = false; var dones = 0; function run(n) { setTimeout(() => { wrap(n, arr.shift()).then(res => { return typeof callback === 'function' ? callback(n, res) : Promise.resolve(res); }).then(res => { dones++; result = res; if (!rejected) { if (arr.length) { run(total - arr.length); } else if (dones === total) { resolve(result); } } }).catch(err => { rejected = true; reject(err); }); }, 0); } arr.slice(0, limit).forEach((v, n) => { run(n); }); }); };
同样返回一个Promise对象,可以直接替换Promise.all,不同的是,需要传递一个函数(wrap参数),用来包裹生成每一个具体执行的Promise对象,limit用来限定并发数量,在指定并发任务内,一个任务完成后,再吸入一个新任务继续执行。
callback用来解析每一次任务完成后所需要的后续动作,比如存储下载的内容、或将参数做变换,必须也返回一个Promise对象。
测试代码:
Promise.allLimit([2000, 1500, 2500, 3000, 1500], function(n, time) { return new Promise((resolve, reject) => { console.log("Start Job: ", n, time); // setTimeout(2500 === time ? reject : resolve, time, "Time: " + time); // 测试reject setTimeout(resolve, time, "Time: " + time); }); }, 2, (n, res) => { // log Job n done console.log("Done Job: ", res); return Promise.resolve(n); // log Job n done }).then(result => { console.log("All Done: ", result); }).catch(err => { console.log("Error: ", err); });
最后贡献一个下载妹子图的简单代码,默认控制并发数量10个。
#!/usr/bin/env node /* jshint esversion: 6 */ /*jslint node: true */ require('./promise-limit.js'); const FS = require('fs'); const PATH = require('path'); const UTIL = require('util'); const ARGV = require('yargs').argv; const REQUEST = require('request'); const CHEERIO = require('cheerio'); if (!ARGV.url || !ARGV.dir || !/\/$/.test(ARGV.dir) || !ARGV.img || !ARGV.total || !ARGV.from || !ARGV.to) { console.log("usage: --url http://example.com --dir ./imgs/ --img '.main-image img' --total '共(\d+)页' --from '.php' --to '_%d.php' "); process.exit(); } const parallel = ARGV.parallel || 10; //并发数量 const headers = { 'Referer': ARGV.url, 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'Accept-Encoding': 'gzip, deflate, sdch', 'Accept-Language': 'en-US,en;q=0.8,zh-CN;q=0.6,zh;q=0.4', 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36' }; function fetch(url, encoding = 'utf8') { return new Promise((resolve, reject) => { REQUEST({ url: url, headers: headers, gzip: true, encoding: encoding }, (error, response, body) => { if (error) { reject(error); } else { resolve(body); } }); }); } function write(file, content) { return new Promise(function(resolve, reject) { FS.writeFile(file, content, function(err) { if (err) { reject(err); } else { resolve(); } }); }); } fetch(ARGV.url) .then(rsp => { const total = parseInt((rsp.match(new RegExp(ARGV.total)) || [0, 0])[1]); if (!total) { throw new Error('Match total error'); } console.log("Total: %d, Parallel: %d", total, parallel); return Promise.allLimit(Array.from({ length: total }, (v, k) => k + 1), (k, v) => { return fetch(v === 1 ? ARGV.url : ARGV.url.replace(ARGV.from, UTIL.format(ARGV.to, v))); }, parallel, (k, res) => { let src = CHEERIO.load(res)(ARGV.img).attr('src'); let file = (k + 1) + PATH.extname(src); return fetch(src, null).then(img => { return write(ARGV.dir + file, img); }).then(() => { console.log("OK: [" + file + "]\t" + src); return file; }).catch(err => { console.log("ER: [" + file + "]\t" + src + " : " + err.toString()); }); }); }) .then(rsp => { console.log("All Jobs Done: "); console.log(rsp); }) .catch(err => { console.log("Fetch failed: %s", err.toString()); });
执行:
./request.js --total '(\d+)</span></a><a[^<>]+><span>下一页' --from '/71636' --to '/71636/%d' --img '.main-image img' --dir ./71636/ --url 'http://www.mzitu.com/71636' --parallel 10
相关文章推荐
- 高并发下的Node.js与负载均衡
- js控制图片定时切换不限制数量
- node.js中的events.emitter.removeAllListeners方法使用说明
- node.js promise 流程控制
- [Node.js] Promise,Q及Async
- 使用js限制字符的输入的数量(还可输入多少字符)
- 在Node.js中使用promise摆脱回调金字塔
- Node.js真的有高并发优势吗?看看Node.js和Tomcat的并发测试结果
- Promise解决node.js回调问题
- A chatroom for all! Part 1 - Introduction to Node.js(转发)
- js控制图片定时切换不限制数量
- 一段经典的node.js 数据库高并发实现
- 关于node.js的web框架的应用及并发性能测试 推荐
- 【360开源】thinkjs:基于Promise的Node.js MVC框架 (转)
- 百万级并发 Node.js也能行
- 高并发下的Node.js与负载均衡
- jQ/js限制textarea字符输入数量代码
- 了不起的Node.js: 将JavaScript进行到底(Web开发首选,实时,跨多服务器,高并发)
- Node.js回调黑洞全解:Async、Promise 和 Generator
- 10.2 NSOperation/NSOperationQueue:提供了一些在GCD中不容易实现的特性,如:限制最大并发数量,操作之间的依赖关系.