这篇文章主要讲解了“nodejs流基类怎么实现”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“nodejs流基类怎么实现”吧!
流是对数据生产,消费的一种抽象,今天先分析一下流基类的实现
const EE = require('events');const util = require('util');// 流的基类function Stream() { EE.call(this);}// 继承事件订阅分发的能力util.inherits(Stream, EE);
流的基类只提供了一个函数就是pipe。用于实现管道化。这个方法代码比较多,分开说。
function ondata(chunk) { // 源流有数据到达,并且目的流可写 if (dest.writable) { // 目的流过载并且源流实现了pause方法,那就暂停可读流的读取操作,等待目的流触发drain事件 if (false === dest.write(chunk) && source.pause) { source.pause(); } } } // 监听data事件,可读流有数据的时候,会触发data事件 source.on('data', ondata); function ondrain() { // 目的流可写了,并且可读流可读,切换成自动读取模式 if (source.readable && source.resume) { source.resume(); } } // 监听drain事件,目的流可以消费数据了就会触发该事件 dest.on('drain', ondrain);
这是管道化时流控实现的地方,主要是利用了write返回值和drain事件。
// 目的流不是标准输出或标准错误,并且end不等于false if (!dest._isStdio && (!options || options.end !== false)) { // 源流没有数据可读了,执行end回调,告诉目的流,没有数据可读了 source.on('end', onend); // 源流关闭了,执行close回调 source.on('close', onclose); } // 两个函数只会执行一次,也只会执行一个 var didOnEnd = false; function onend() { if (didOnEnd) return; didOnEnd = true; // 执行目的流的end函数,说明写数据完毕 dest.end(); } function onclose() { if (didOnEnd) return; didOnEnd = true; // 销毁目的流 if (typeof dest.destroy === 'function') dest.destroy(); }
这里是处理源流结束和关闭后,通知目的流的逻辑。
// remove all the event listeners that were added. function cleanup() { source.removeListener('data', ondata); dest.removeListener('drain', ondrain); source.removeListener('end', onend); source.removeListener('close', onclose); source.removeListener('error', onerror); dest.removeListener('error', onerror); source.removeListener('end', cleanup); source.removeListener('close', cleanup); dest.removeListener('close', cleanup); } function onerror(er) { // 出错了,清除注册的事件,包括正在执行的onerror函数 cleanup(); // 如果用户没有监听流的error事件,则抛出错误,所以我们业务代码需要监听error事件 if (EE.listenerCount(this, 'error') === 0) { throw er; // Unhandled stream error in pipe. } } // 监听流的error事件 source.on('error', onerror); dest.on('error', onerror); // 源流关闭或者没有数据可读时,清除注册的事件 source.on('end', cleanup); source.on('close', cleanup); // 目的流关闭了也清除他注册的事件 dest.on('close', cleanup);
这里主要是处理了error事件和流关闭/结束/出错时清除订阅的事件。这就是流基类的所有逻辑。
感谢各位的阅读,以上就是“nodejs流基类怎么实现”的内容了,经过本文的学习后,相信大家对nodejs流基类怎么实现这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/u/4217331/blog/4408869