深入理解 Node.js Stream 内部机制
2017-09-01 00:00
1156 查看
摘要:nodejs
相信很多人对Node.js的Stream已经不陌生了,不论是请求流、响应流、文件流还是socket流,这些流的底层都是使用stream模块封装的,甚至我们平时用的最多的console.log打印日志也使用了它,不信你打开Node.jsruntime的源码,看看lib/console.js:
Stream模块做了很多事情,了解了Stream,那么Node.js中其他很多模块理解起来就顺畅多了。
lib/module.js
lib/_stream_readable.js
lib/_stream_writable.js
lib/_stream_tranform.js
lib/_stream_duplex.js
把Readable和Writable看明白,Tranform和Duplex就不难理解了。
FlowingMode,流动模式,在Stream上绑定ondata方法就会自动触发这个模式,比如:
这个模式的流程图如下:
资源的数据流并不是直接流向消费者,而是先push到缓存池,缓存池有一个水位标记highWatermark,超过这个标记阈值,push的时候会返回false,什么场景下会出现这种情况呢?
消费者主动执行了.pause()
消费速度比数据push到缓存池的生产速度慢
有个专有名词来形成这种情况,叫做「背压」,WritableStream也存在类似的情况。
流动模式,这个名词还是很形象的,缓存池就像一个水桶,消费者通过管口接水,同时,资源池就像一个水泵,不断地往水桶中泵水,而highWaterMark是水桶的浮标,达到阈值就停止蓄水。下面是一个简单的Demo:
另外一种模式是Non-FlowingMode,没流动,也就是暂停模式,这是Stream的预设模式,Stream实例的_readableState.flow有三个状态,分别是:
_readableState.flow=null,暂时没有消费者过来
_readableState.flow=false,主动触发了.pause()
_readableState.flow=true,流动模式
当我们监听了onreadable事件后,会进入这种模式,比如:
constmyReadable=newMyReadable(dataSource);
myReadable.setEncoding('utf8');
myReadable.on('readable',()=>{});
监听readable的回调函数第一个参数不会传递内容,需要我们通过myReadable.read()主动读取,为啥呢,可以看看下面这张图:
资源池会不断地往缓存池输送数据,直到highWaterMark阈值,消费者监听了readable事件并不会消费数据,需要主动调用.read([size])函数才会从缓存池取出,并且可以带上size参数,用多少就取多少:
这里需要注意一点,只要数据达到缓存池都会触发一次readable事件,有可能出现「消费者正在消费数据的时候,又触发了一次readable事件,那么下次回调中read到的数据可能为空」的情况。我们可以通过_readableState.buffer来查看缓存池到底缓存了多少资源:
上面的代码我们只消费一次缓存池的数据,那么在消费后,缓存池又收到了一次资源池的push操作,此时还会触发一次readable事件,我们可以看看这次存了多大的buffer。
需要注意的是,buffer大小也是有上限的,默认设置为16kb,也就是16384个字节长度,它最大可设置为8Mb,没记错的话,这个值好像是Node的newspacememory的大小。
上面介绍了ReadableStream大概的机制,还有很多细节部分没有提到,比如FlowingMode在不同Node版本中的Stream实现不太一样,实际上,它有三个版本,上面提到的是第2和第3个版本的实现;再比如MixinsMode模式,一般我们只推荐(允许)使用ondata和onreadable的一种来处理ReadableStream,但是如果要求在Non-FlowingMode的情况下使用ondata如何实现呢?那么就可以考虑MixinsMode了。
当生产者写入速度过快,把队列池装满了之后,就会出现「背压」,这个时候是需要告诉生产者暂停生产的,当队列释放之后,WritableStream会给生产者发送一个drain消息,让它恢复生产。下面是一个写入一百万条数据的Demo:
我们构造一个WritableStream,在写入到资源池的时候,我们稍作处理,让它效率低一点:
最后执行的结果是:
drain7268
drain4536
drain1804
end
说明程序遇到了三次「背压」,如果我们没有在上面绑定writer.once('drain'),那么最后的结果就是Stream将第一次获取的数据消耗完变结束了程序。
readable.pipe(writable);
这句代码的语意性很强,readable通过pipe(管道)传输给writable,pipe的实现大致如下(伪代码):
上面做了五件事情:
emit(pipe),通知写入
.write(),新数据过来,写入
.pause(),消费者消费速度慢,暂停写入
.resume(),消费者完成消费,继续写入
returnwritable,支持链式调用
当然,上面只是最简单的逻辑,还有很多异常和临界判断没有加入,具体可以去看看Node.js的代码(/lib/streamreadable.js)。
DuplexStream实现特别简单,不到一百行代码,它继承了ReadableStream,并拥有WritableStream的方法(源码地址):
constutil=require('util');
constReadable=require('_stream_readable');
constWritable=require('_stream_writable');
util.inherits(Duplex,Readable);
varkeys=Object.keys(Writable.prototype);
for(varv=0;v<keys.length;v++){
varmethod=keys[v];
if(!Duplex.prototype[method])
Duplex.prototype[method]=Writable.prototype[method];
}
我们可以通过options参数来配置它为只可读、只可写或者半工模式,一个简单的Demo:
输出的结果为:
write
read1
read0
可以看出,两个管道是相互之间不干扰的。
Transform的处理就是通过_transform函数将Duplex的Readable连接到Writable,由于Readable的生产效率与Writable的消费效率是一样的,所以这里Transform内部不存在「背压」问题,背压问题的源头是外部的生产者和消费者速度差造成的。
关于TransfromStream,我写了一个简单的Demo:
了解了这些Stream的内部机制,对我们后续深入理解上层代码有很大的促进作用,特别希望初学Node.js的同学花点时间进来看看
相信很多人对Node.js的Stream已经不陌生了,不论是请求流、响应流、文件流还是socket流,这些流的底层都是使用
functionwrite(ignoreErrors,stream,string,errorhandler){ //... stream.once('error',noop); stream.write(string,errorhandler); //... } Console.prototype.log=functionlog(...args){ write(this._ignoreErrors, this._stdout, `${util.format.apply(null,args)}\n`, this._stdoutErrorHandler); };
Stream模块做了很多事情,了解了Stream,那么Node.js中其他很多模块理解起来就顺畅多了。
stream模块
如果你了解生产者和消费者问题的解法,那理解stream就基本没有压力了,它不仅仅是资料的起点和落点,还包含了一系列状态控制,可以说一个stream就是一个状态管理单元。了解内部机制的最佳方式除了看Node.js官方文档,还可以去看看Node.js的源码:把
ReadableStream
ReadableStream存在两种模式,一种是叫做constreadable=getReadableStreamSomehow(); readable.on('data',(chunk)=>{ console.log(`Received${chunk.length}bytesofdata.`); });
这个模式的流程图如下:
资源的数据流并不是直接流向消费者,而是先push到缓存池,缓存池有一个水位标记
消费者主动执行了
消费速度比数据push到缓存池的生产速度慢
有个专有名词来形成这种情况,叫做「背压」,WritableStream也存在类似的情况。
流动模式,这个名词还是很形象的,缓存池就像一个水桶,消费者通过管口接水,同时,资源池就像一个水泵,不断地往水桶中泵水,而highWaterMark是水桶的浮标,达到阈值就停止蓄水。下面是一个简单的Demo:
constReadable=require('stream').Readable; //Stream实现 classMyReadableextendsReadable{ constructor(dataSource,options){ super(options); this.dataSource=dataSource; } //继承了Readable的类必须实现这个函数 //触发系统底层对流的读取 _read(){ constdata=this.dataSource.makeData(); this.push(data); } } //模拟资源池 constdataSource={ data:newArray(10).fill('-'), //每次读取时pop一个数据 makeData(){ if(!dataSource.data.length)returnnull; returndataSource.data.pop(); } }; constmyReadable=newMyReadable(dataSource); myReadable.setEncoding('utf8'); myReadable.on('data',(chunk)=>{ console.log(chunk); });
另外一种模式是
当我们监听了onreadable事件后,会进入这种模式,比如:
监听
资源池会不断地往缓存池输送数据,直到highWaterMark阈值,消费者监听了readable事件并不会消费数据,需要主动调用
constmyReadable=newMyReadable(dataSource); myReadable.setEncoding('utf8'); myReadable.on('readable',()=>{ letchunk; while(null!==(chunk=myReadable.read())){ console.log(`Received${chunk.length}bytesofdata.`); } });
这里需要注意一点,只要数据达到缓存池都会触发一次readable事件,有可能出现「消费者正在消费数据的时候,又触发了一次readable事件,那么下次回调中read到的数据可能为空」的情况。我们可以通过
letonce=false; myReadable.on('readable',(chunk)=>{ console.log(myReadable._readableState.buffer.length); if(once)return; once=true; console.log(myReadable.read()); });
上面的代码我们只消费一次缓存池的数据,那么在消费后,缓存池又收到了一次资源池的push操作,此时还会触发一次readable事件,我们可以看看这次存了多大的buffer。
需要注意的是,buffer大小也是有上限的,默认设置为16kb,也就是16384个字节长度,它最大可设置为8Mb,没记错的话,这个值好像是Node的newspacememory的大小。
上面介绍了ReadableStream大概的机制,还有很多细节部分没有提到,比如
WritableStream
原理与ReadableStream是比较相似的,数据流过来的时候,会直接写入到资源池,当写入速度比较缓慢或者写入暂停时,数据流会进入队列池缓存起来,如下图所示:当生产者写入速度过快,把队列池装满了之后,就会出现「背压」,这个时候是需要告诉生产者暂停生产的,当队列释放之后,WritableStream会给生产者发送一个
functionwriteOneMillionTimes(writer,data,encoding,callback){ leti=10000; write(); functionwrite(){ letok=true; while(i-->0&&ok){ //写入结束时回调 ok=writer.write(data,encoding,i===0?callback:null); } if(i>0){ //这里提前停下了,'drain'事件触发后才可以继续写入 console.log('drain',i); writer.once('drain',write); } } }
我们构造一个WritableStream,在写入到资源池的时候,我们稍作处理,让它效率低一点:
constWritable=require('stream').Writable; constwriter=newWritable({ write(chunk,encoding,callback){ //比process.nextTick()稍慢 setTimeout(()=>{ callback&&callback(); }); } }); writeOneMillionTimes(writer,'simple','utf8',()=>{ console.log('end'); });
最后执行的结果是:
说明程序遇到了三次「背压」,如果我们没有在上面绑定
pipe
了解了Readable和Writable,pipe这个常用的函数应该就很好理解了,这句代码的语意性很强,readable通过pipe(管道)传输给writable,pipe的实现大致如下(伪代码):
Readable.prototype.pipe=function(writable,options){
this.on('data',(chunk)=>{
letok=writable.write(chunk);
//背压,暂停
!ok&&this.pause();
});
writable.on('drain',()=>{
//恢复
this.resume();
});
//告诉writable有流要导入
writable.emit('pipe',this);
//支持链式调用
returnwritable;
};
上面做了五件事情:
当然,上面只是最简单的逻辑,还有很多异常和临界判断没有加入,具体可以去看看Node.js的代码(/lib/streamreadable.js)。
DuplexStream
Duplex,双工的意思,它的输入和输出可以没有任何关系,DuplexStream实现特别简单,不到一百行代码,它继承了ReadableStream,并拥有WritableStream的方法(源码地址):
constReadable=require('_stream_readable');
constWritable=require('_stream_writable');
util.inherits(Duplex,Readable);
varkeys=Object.keys(Writable.prototype);
for(varv=0;v<keys.length;v++){
varmethod=keys[v];
if(!Duplex.prototype[method])
Duplex.prototype[method]=Writable.prototype[method];
}
我们可以通过options参数来配置它为只可读、只可写或者半工模式,一个简单的Demo:
constTransform=require('stream').Transform;
constMAP={
'Barret':'靖',
'Lee':'李'
};
classTranslateextendsTransform{
constructor(dataSource,options){
super(options);
}
_transform(buf,enc,next){
constkey=buf.toString();
constdata=MAP[key];
this.push(data);
next();
}
}
vartransform=newTranslate();
transform.on('data',data=>console.log(data.toString()));
transform.write('Lee');
transform.write('Barret');
transform.end();
输出的结果为:
可以看出,两个管道是相互之间不干扰的。
TransformStream
TransformStream集成了DuplexStream,它同样具备Readable和Writable的能力,只不过它的输入和输出是存在相互关联的,中间做了一次转换处理。常见的处理有Gzip压缩、解压等。Transform的处理就是通过
关于TransfromStream,我写了一个简单的Demo:
小结
本文主要参考和查阅Node.js官网的文档和源码,细节问题都是从源码中找到的答案,如有理解不准确之处,还请斧正。关于Stream,这篇文章只是讲述了基础的原理,还有很多细节之处没有讲到,要真正理解它,还是需要多读读文档,写写代码。了解了这些Stream的内部机制,对我们后续深入理解上层代码有很大的促进作用,特别希望初学Node.js的同学花点时间进来看看
相关文章推荐
- 深入理解 Stream (Node.js)
- 深入浅出Node.js(三):深入Node.js的模块机制
- 深入理解ASP.NET的内部运行机制
- 浅谈Node.js:理解stream
- 深入浅出Node.js(三):深入Node.js的模块机制
- 深入理解 js 引擎的执行机制
- Node.js机制及原理理解初步
- 深入理解ASP.NET的内部运行机制(转)
- 理解 Node.js 事件驱动机制的原理
- 深入理解Node.js中通用基础设计模式
- node.js学习之路(二)之“深入理解面向对象的JavaScript”
- 深入理解node.js http模块
- 深入理解ASP.NET的内部运行机制
- 深入浅出Node.js(三):深入Node.js的模块机制
- 韩顺平 javascript教学视频_学习笔记23_js事件驱动机制深入理解_js常用事件_js版计算器
- node.js学习(三)简单的node程序&&模块简单使用&&commonJS规范&&深入理解模块原理
- JS继承机制的深入理解--动态原型存在的问题与解决
- Node.js机制及原理理解初步
- 深入理解ASP.NET的内部运行机制
- 深入理解JS引擎的执行机制