这篇文章将为大家详细讲解有关nodejs可读流的源码分析是怎样的,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
可读流是对数据消费的抽象,nodejs中可读流有两种工作模式:流式和暂停式,流式就是有数据的时候就会触发回调,并且把数据传给回调,暂停式就是需要用户自己手动执行读取的操作。我们通过源码去了解一下可读流实现的一些逻辑。因为实现的代码比较多,逻辑也比较绕,本文只分析一些主要的逻辑,有兴趣的可以参考文档或者自行深入看源码了解细节。我们先看一下ReadableState,这个对象是表示可读流的一些状态和属性的。
function ReadableState(options, stream) { options = options || {}; // 是否是双向流 var isDuplex = stream instanceof Stream.Duplex; // 数据模式 this.objectMode = !!options.objectMode; // 双向流的时候,设置读端的模式 if (isDuplex) this.objectMode = this.objectMode || !!options.readableObjectMode; // 读到highWaterMark个字节则停止,对象模式的话则是16个对象 this.highWaterMark = getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex); // 存储数据的缓冲区 this.buffer = new BufferList(); // 可读数据的长度 this.length = 0; // 管道的目的源和个数 this.pipes = null; this.pipesCount = 0; // 工作模式 this.flowing = null; // 流是否已经结束 this.ended = false; // 是否触发过end事件了 this.endEmitted = false; // 是否正在读取数据 this.reading = false; // 是否同步执行事件 this.sync = true; // 是否需要触发readable事件 this.needReadable = false; // 是否触发了readable事件 this.emittedReadable = false; // 是否监听了readable事件 this.readableListening = false; // 是否正在执行resume的过程 this.resumeScheduled = false; // has it been destroyed // 流是否已销毁 this.destroyed = false; // 数据编码格式 this.defaultEncoding = options.defaultEncoding || 'utf8'; // 在管道化中,有多少个写者已经达到阈值,需要等待触发drain事件,awaitDrain记录达到阈值的写者个数 this.awaitDrain = 0; // 执行maybeReadMore函数的时候,设置为true this.readingMore = false; this.decoder = null; this.encoding = null; // 编码解码器 if (options.encoding) { if (!StringDecoder) StringDecoder = require('string_decoder').StringDecoder; this.decoder = new StringDecoder(options.encoding); this.encoding = options.encoding; }}
ReadableState里包含了一大堆字段,我们可以先不管他,等待用到的时候,再回头看。接着我们开始看可读流的实现。
function Readable(options) { if (!(this instanceof Readable)) return new Readable(options); this._readableState = new ReadableState(options, this); // 可读 this.readable = true; // 用户实现的两个函数 if (options) { if (typeof options.read === 'function') this._read = options.read; if (typeof options.destroy === 'function') this._destroy = options.destroy; } // 初始化父类 Stream.call(this);}
上面的逻辑不多,需要关注的是read和destroy这两个函数,如果我们是直接使用Readable使用可读流,那再options里是必须传read函数的,destroy是可选的。如果我们是以继承的方式使用Readable,那必须实现_read函数。nodejs只是抽象了流的逻辑,具体的操作(比如可读流就是读取数据)是由用户自己实现的,因为读取操作是业务相关的。下面我们分析一下可读流的操作。
对用户来说,可读流是用户获取数据的地方,但是对可读流来说,他提供数据给用户的前提是他自己得有数据,所以可读流首先需要生产数据。生产数据的逻辑由_read函数实现。_read函数的逻辑大概是
const data = getSomeData();readableStream.push(data);
通过push函数,往可读流里写入数据,然后就可以为用户提供数据,我们看看push的实现,只列出主要逻辑。
Readable.prototype.push = function(chunk, encoding) { // 省略了编码处理的代码 return readableAddChunk(this, chunk, encoding, false, skipChunkCheck);};function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { var state = stream._readableState; // push null代表流结束 if (chunk === null) { state.reading = false; onEofChunk(stream, state); } else { addChunk(stream, state, chunk, false); } // 返回是否还可以读取更多数据 return needMoreData(state);}function addChunk(stream, state, chunk, addToFront) { // 是流模式并且缓存没有数据,则直接触发data事件 if (state.flowing && state.length === 0 && !state.sync) { stream.emit('data', chunk); } else { // 否则先把数据缓存起来 state.length += state.objectMode ? 1 : chunk.length; if (addToFront) state.buffer.unshift(chunk); else state.buffer.push(chunk); // 监听了readable事件,则触发readable事件 if (state.needReadable) emitReadable(stream); } // 继续读取数据,如果可以的话 maybeReadMore(stream, state);}
总的来说,可读流首先要从某个地方获取数据,根据当前的工作模式,直接交付给用户,或者先缓存起来。并可以的情况下,继续获取数据。
用户可以通过read函数或者监听data事件来从可读流中获取数据
Readable.prototype.read = function(n) { n = parseInt(n, 10); var state = this._readableState; // 计算可读的大小 n = howMuchToRead(n, state); var ret; // 需要读取的大于0,则取读取数据到ret返回 if (n > 0) ret = fromList(n, state); else ret = null; // 减去刚读取的长度 state.length -= n; // 如果缓存里没有数据或者读完后小于阈值了,则可读流可以继续从底层资源里获取数据 if (state.length === 0 || state.length - n < state.highWaterMark) { this._read(state.highWaterMark); } // 触发data事件 if (ret !== null) this.emit('data', ret); return ret;};
读取数据的操作就是计算缓存里有多少数据可以读,和用户需要的数据大小,取小的,然后返回给用户,并触发data事件。如果数据还没有达到阈值,则触发可读流从底层资源中获取数据。
function destroy(err, cb) { // 设置已销毁标记 if (this._readableState) { this._readableState.destroyed = true; } // 执行_destroy钩子函数,用户可以重写这个函数 this._destroy(err || null, (err) => { // 出错,但是没有设置回调,则执行触发error事件 if (!cb && err) { process.nextTick(() => { this.emit('error', err); }, this, err); } else if (cb) { // 有回调则执行 cb(err); } }); return this;}
我们看一下Readable提供的默认_destroy函数。
Readable.prototype._destroy = function(err, cb) { this.push(null); cb(err);};
刚才分析push函数时已经看到this.push(null)表示流结束了。销毁流意味着关闭流对应的底层资源,不再提供数据服务。
总结:本文就分析到这里,流的实现代码不算很难,但是非常绕,有兴趣的可以详细看源码,最后分享很久之前画的一个图(链接https://www.processon.com/view/link/5cc7e9e5e4b09eb4ac2e0688)。
关于nodejs可读流的源码分析是怎样的就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/u/4217331/blog/4408870