这篇文章主要介绍“Nodejs中的可读流的作用和实现方法”,在日常操作中,相信很多人在Nodejs中的可读流的作用和实现方法问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Nodejs中的可读流的作用和实现方法”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
流(stream)是 Node.js 中处理流式数据的抽象接口。 stream 模块用于构建实现了流接口的对象。【推荐学习:《nodejs 教程》】
读写大文件的过程中,不会一次性的读写到内存中。可以控制每次读写的个数
1、可读流-Readable
例:fs.createReadStream;
源码位置:lib/_stream_readable.js
2、可写流-Writable
例:fs.createWriteStream;
源码位置:lib/_stream_writable.js
3、双工流-Duplex:满足读写的功能
例:net.Socket();
源码位置:lib/_stream_duplex.js
4、转化流-Transform:用途:压缩,转码
例:
const { Transform } = require('stream');
Transform.call(this, '要转换的数据');//具体的使用详情 见node官网
-源码位置:lib/_stream_tranform.js
读取文件代码过程
const path = require("path");
const aPath = path.join(__dirname, "a.txt");//需要读取的文件
const fs = require("fs");
let rs = fs.createReadStream(aPath, {
flags: "r",
encoding: null,//默认编码格式是buffer,深挖buffer又要学习字符编码,留个坑 到时候写一个编码规范的学习整理
autoClose: true,//相当于需要调用close方法,如果为false 文件读取end的时候 就不会执行 close
start: 0,
highWaterMark: 3,//每次读取的个数 默认是64*1024个字节
});
rs.on("open", function (fd) {
// fd number类型
console.log("fd", fd);
});
// 他会监听用户,绑定了data事件,就会触发对应的回调,不停的触发
rs.on("data", function (chunk) {
//这里会打印的是ascII 值 ,所以可以toString查看详情自己看得懂的样子
console.log({ chunk }, "chunk.toString", chunk.toString());
//如果想每一段事件 读一点 可以用rs.pause() 做暂停,然后计时器 里rs.resume()再次触发data事件
rs.pause();//暂停读取
});
rs.on("close", function () {
//当文件读取完毕后 会 触发 end事件
console.log("close");
});
setInterval(() => {
rs.resume(); //再次触发data,直到读完数据为止
}, 1000);
题外话:想说下 文件流和普通可读流的区别
1、open 和close是文件流独有,支持open和close便是文件流
2、可读流都具备 (on('data'),on('end'),on('error'),resume,pause;所以只要支持这些方法就是可读流
写入文件代码过程
const fs = require("fs");
const path = require("path");
const bPath = path.join(__dirname, "b.txt");
let ws = fs.createWriteStream(bPath, {
//参数和可读流的类似
flags: "w",
encoding: "utf-8",
autoClose: true,
start: 0,
highWaterMark: 3,
});
ws.on("open", function (fd) {
console.log("open", fd);
});
ws.on("close", function () {
console.log("close");
});
//write的参数string 或者buffer,ws.write 还有一个boolea的返回值表示是真实写入文件还是放入缓存中
ws.write("1");
let flag = ws.write("1");
console.log({ flag });//true
flag = ws.write("1");
console.log({ flag });//true
flag = ws.write("1");
console.log({ flag });//false
写一个本地服务 做例子
1、server(服务器代码)实现
const net = require("net"); //net 模块是 node自己封装的tcp层
//socket 就是双工流 能读能写 http源码就是用net模块写的 基于tcp
const server = net.createServer(function (socket) {
socket.on("data", function (data) {//监听客户端发来的消息
console.log(data.toString)
socket.write("server:hello");//写入server:hello
});
socket.on("end", function () {
console.log("客户端关闭");
});
});
server.on("err", function (err) {
console.log(err);
});
server.listen(8080);//服务端监听8080端口
2、client(客户端) 实现
const net = require("net"); //net 模块是 node自己封装的tcp层
const socket = new net.Socket(); //
socket.connect(8080, "localhost"); // 表示链接服务器本地8080端口
socket.on("connect", function (data) {
//和服务器建立链接后
socket.write("connect server");
});
socket.on("data", function (data) {
//监听数据,读取服务器传来的数据
console.log(data.toString());
socket.destroy()
});
socket.write('ok')
socket.on("error", function (err) {
console.log(err);
});
3.题外话 如果想看tcp的三次握手和四次挥手 可以 通过我上述代码 用wireshark(一个抓包工具)看实际过程
转化流是双工流的一种, 允许实现输入,并在对数据执行某些操作后返回输出,两者有依赖关系
代码过程(这个例子我的参考来处)
const stream = require('stream')
let c = 0;
const readable = stream.Readable({
highWaterMark: 2,
read: function () {
let data = c < 26 ? Number(c++ + 97) : null;
console.log('push', data);
this.push( String.fromCharCode(data));
}
})
const transform = stream.Transform({
highWaterMark: 2,
transform: function (buf, enc, next) {
console.log('transform', buf.toString());
next(null, buf);
}
})
readable.pipe(transform);
打印结果
跟着断点先了解 可读流的调用过程
就前面可读流文件的读取过程的代码为例子 打断点
rs.on('open')
rs.on('open')为断点入口进入
1、通过Stream.prototype.on.call 继承Stream类
源文件位置:no dlib/_stream_readable.js(我是通过断点点到这里 直接找,我也没找到)
再点进去 发现 Stream 是EventEmitter的子类 那么 可读流也可以支持发布订阅
2、监听的事件类型是否是data和readable任意一个 不是 继续 下一个事件的监听
rs.on('data')
data的部分做两件事
1、判断flowing(默认值是null)不为false 就自动resume方法执行继续 文件读取(这里我的案例是rs.pause();手动将flowing 值为false了所以不会继续调用)
2、那如果我没有调用rs.pause() 会继续调用resume 看看resume里做了什么
2.1 最终调用了 stream.read()继续读取文件;直到文件读取结束依次去emit end 和close事件
小结:所以data默认是会不断的读取文件直到文件读取完毕 ,如果想要文件读取变可控可以和我一样用rs.pause()
自己实现
实现思路
const fs = require("fs");
const EventEmitter = require("events");
class ReadStream extends EventEmitter {
}
module.exports = ReadStream;
constructor(path, options = {}) {
super();
//参考fs 写实例需要用到的参数
this.path = path;
this.flags = options.flags || "r";
this.encoding - options.encoding || null;//默认编码格式是buffer
this.autoClose = options.autoClose || true;//相当于需要调用close方法,如果为false 文件读取end的时候 就不会执行 close
this.start = options.start || 0;//数据读取的开始位置
this.end = options.end;
this.highWaterMark = options.highWaterMark || 64 * 1024;//默认一次读取64个字节的数据
this.offset = this.start;//fs.read的偏移量
this.fd = undefined; //初始化fd 用于 open成功后的fd做赋值 供 read里使用
this.flowing = false;//实现pause和resume备用,设置flag,当监听到data事件的时候 改 flowing为true,
this.open(); //初始化的时候就要调用open
this.on("readStreamListener", function (type) {
// console.log(type)//这里打印就能看到 实例上所有 通过on 绑定的事件名称
if (type === "data") {
//监听到data事件的时候 改 flowing为true
this.flowing = true;
this.read();
}
});
}
open() {
// 调用fs.open 读取目标文件
fs.open(this.path, this.flags, (err, fd) => {
this.fd = fd; //赋值一个fd 供后面的 read()方式使用,文件读取成功,fd是返回一个数字
this.emit("open", fd);
});
read() {
// console.log("一开始read里的", this.fd); //但是这样依旧拿不到 open后的fd,用 发布订阅 通过on来获取 绑定的事件type
//这里要做一个容错处理 ,因为open是异步读取文件,read里无法马上拿到open结果
if (typeof this.fd !== "number") {
//订阅open,给绑定一个回调事件read 直到this.fd有值
return this.once("open", () => this.read());
}
}
//fd打开后 调用fs.read
//实例上的start值是未知number,存在实际剩余的可读的文件大小<highWaterMar的情况 ,用howMuchToRead 替换highWaterMark 去做fs.read的每次读取buffer的大小
let howMuchToRead = this.end
? Math.min(this.end - this.offset + 1, this.highWaterMark)
: this.highWaterMark;
//定义一个用户 传进来的highWaterMark 大小的buffer对象
const buffer = Buffer.alloc(this.highWaterMark);
//读取文件中的内容fd给buffer 从0位置开始,每次读取howMuchToRead个。插入数据,同时更新偏移量
fs.read(
this.fd,
buffer,
0,
howMuchToRead,
this.offset,
(err, bytesRead) => {
if (bytesRead) {
// 每读完一次,偏移量=已经读到的数量
this.offset += bytesRead;
this.emit("data", buffer.slice(0, bytesRead));
//写到这里实例上的data 已经可以打印出数据了 但是 继续读取 调用this.read() 直到bytesRead不存在 说明数据读取完毕了 走else
//回调 this.read();时候判断 this.flowing 是否为true
//pause调用后this.flowing将为false
if (this.flowing) {
this.read();
}
} else {
// 执行到这 bytesRead不存在说明 文件数据读取完毕了已经 触发end
this.emit("end");//emit 实例上绑定的end事件
//destroy 还没写到 稍等 马上后面就实现...
this.destroy();
}
}
);
文件读取不去data事件,会触发对应的回调,不停的触发 所以想要变可控可以手动调用 resume()& pause()
pause的实现,调用的时候设置 this.flowing=false,打断 read()
pause() {
this.flowing = false;
}
pause 打断 read()多次读取,可以使用resume 打开 this.flowing=true 并调用read
resume() {
if (!this.flowing) {
this.flowing = true;
this.read();
}
}
文件open不成功时候抛错时调用
文件读取完毕后&&this.autoClose===true ,read()里文件读取end的时候 就执行close
destroy(err) {
if (err) {
this.emit("error");
}
// 把close放destroy里 并 在read里调用
if (this.autoClose) {
fs.close(this.fd, () => {
this.emit("close");
});
}
}
实现代码
/**
*实现简单的可读流
*/
const fs = require("fs");
const EventEmitter = require("events");
class ReadStream extends EventEmitter {
constructor(path, options = {}) {
super();
//参考fs 写实例需要用到的参数
this.path = path;
this.flags = options.flags || "r";
this.encoding - options.encoding || null;
this.autoClose = options.autoClose || true;
this.start = options.start || 0;
this.end = options.end;
this.highWaterMark = options.highWaterMark || 64 * 1024;
this.fd = undefined;
this.offset = this.start;
this.flowing = false;
this.open();
this.on("newListener", function (type) {
if (type === "data") {
this.flowing = true;
this.read();
}
});
}
destroy(err) {
if (err) {
this.emit("error");
}
if (this.autoClose) {
fs.close(this.fd, () => {
this.emit("close");
});
}
}
open() {
fs.open(this.path, this.flags, (err, fd) => {
if (err) {
return this.destroy(err);
}
this.fd = fd;
this.emit("open", fd);
});
}
resume() {
if (!this.flowing) {
this.flowing = true;
this.read();
}
}
pause() {
this.flowing = false;
}
read() {
if (typeof this.fd !== "number") {
return this.once("open", () => this.read());
}
let howMuchToRead = this.end
? Math.min(this.end - this.offset + 1, this.highWaterMark)
: this.highWaterMark;
const buffer = Buffer.alloc(this.highWaterMark);
fs.read(
this.fd,
buffer,
0,
howMuchToRead,
this.offset,
(err, bytesRead) => {
if (bytesRead) {
this.offset += bytesRead;
this.emit("data", buffer.slice(0, bytesRead));
if (this.flowing) {
this.read();
}
} else {
this.emit("end");
this.destroy();
}
}
);
}
}
module.exports = ReadStream;
调用代码
const ReadStream = require("./initReadStream");
let rs = new ReadStream(aPath, {
flags: "r",
encoding: null, //默认编码格式是buffer
autoClose: true, //相当于需要调用close方法,如果为false 文件读取end的时候 就不会执行 close
start: 0,
highWaterMark: 3, //每次读取的个数 默认是64*1024个字节
});
到此,关于“Nodejs中的可读流的作用和实现方法”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。