在Unix中流是一个标准的概念,有标准的输入、输出和标准的错误
例如:
打印出所有的js文件交给grep 来过滤出包含http文件的内容,称之为Unix的管道
cat *.js | grep http
从上节得知Buffer是保存字节的数据,而流是用来暂存和移动数据的,它俩通常是结合起来来使用,我们来拷贝文件,像读取logo,是全部的读取入到内存中,然后再写入到文件中,对于体积比较大的的文件就不够用了假设我们的服务器需要不断的去读取文件,然后返回给客户端,同时又有好多人都在请求这个文件,这样每个请求都去读入一次内存,然后内存很快就爆掉了,最好的方式是边读边写
这就需要借助流来完成,那NodeJs中哪些模块涉及到了流;比如http、文件系统、压缩模块、tcp socket并且流是以buffer的形式存在,这样更高效。
改造logo图片读取操作
var fs = require('fs') var source = fs.readFileSync('logo.png') fs.writeFileSync('stream_copy_logo.png',source)
但是这样的操作会不会太简单了,而没有办法精细的控制数据在流里面的传输,
以上这些都不用担心,Stream是基于事件机制进行工作的,
流在各个方面的变化都可以被我们监听到
var fs = require('fs') //声明一个可读流 var readStream = fs.createReadStream('logo_stream.js') //Stream在传输的时候会触发data事件 readStream .on('data',function(chunk){ console.log('data emits') console.log(Buffer.isBuffer(chunk)) console.log(chunk.toString('utf8')) }) //还有readable事件,可读的 .on('readable',function(){ console.log('data readable') }) .on('end',function(){ console.log('data ends') }) .on('close',function(){ console.log('data close') }) .on('error',function(e){ console.log('data read error:'+e) })
运行结果如下:
借助于Stream的事件机制,我们就能实现更多个性化的定制,
从而对流里面的流程进行更精细化的控制,改造上述代码:
var fs = require('fs') //声明一个可读流 var readStream = fs.createReadStream('logo_stream.js') var n = 0 //Stream在传输的时候会触发data事件 readStream .on('data',function(chunk){ n++ console.log('data emits') console.log(Buffer.isBuffer(chunk)) //console.log(chunk.toString('utf8')) //流暂停 readStream.pause() //设置定时器,模拟异步处理 console.log('data pause') setTimeout(function(){ console.log('data pause end') //再重新启动 readStream.resume() },3000) }) //还有readable事件,可读的 .on('readable',function(){ console.log('data readable') }) .on('end',function(){ console.log(n) console.log('data ends') }) .on('close',function(){ console.log('data close') }) .on('error',function(e){ console.log('data read error:'+e) })
运行效果如下:
换一个大一点的文件,3M左右的;打印结果如下:
大概每次是64kb
用事件的方式来重构复制图片的操作
var fs = require('fs') //放入一个大文件 var readStream = fs.createReadStream('1.pdf') var writeStream = fs.createWriteStream('1_stream.pdf'); //必然触发一个事件 readStream.on('data',function(chunk){ //写入目标 if(writeStream.write(chunk)=== false){ //判断是否已经写入到目标,来解决爆仓 console.log('still cached') readStream.pause() } }) readStream.on('end',function(){ writeStream.end() }) //耗尽方法 writeStream.on('drain',function(){ console.log('data drains') readStream.resume() }) /* 这是个标准的文件的拷贝操作,但是会有问题; 如果读的快,写的慢;因为读写的速度并不是恒定的,这个时候数据流内部的 缓存可能会被爆仓,那应该怎么办 */
运行结果如下:
边读边写效果.
Stream的种类
Readable:可读流,用来提供数据;外部来源的数据会被存储到buffer里缓存起来,两种模式:流动模式,暂停模式
Writable:可写流,消费数据;
Duplex:双通流,可读可写
Transform:转换流,双通
各自事件,属性都大同小异.场景如下:请求一张图片的数据,在浏览器中显示出来
var http = require('http') var fs = require('fs') http .createServer(function(req,res){ /*fs.readFile('logo.png',function(err,data){ if(err){ res.end('file not exist') }else{ res.wirteHeader(200,{'Context-Type':'text/html'}) res.end(data) } })*/ //利用pipe就能够更简约的实现这套逻辑 fs.createReadStream('logo.png').pipe(res) }) .listen(8090)
运行效果如下:
不止是本地图片的读取,也可以是网络环境下的
使用NodeJs中的request模块
//使用之前先安装,npm install request var request = require('request') request('url').pipe(res) //这样就可以实现边下载边显示
运行结果同上。
在这里pipe方法会自动帮我们监听data和end事件,还可以自动控制后端压力,通过对内存空间的调度就能自动控制流量、避免掉目标被快速读取,只有末端真正需要数据的时候,数据才会从源头被取出来然后顺着管道一路走下去
再次重构读取pdf文件
//只需要2行代码 var fs = require('fs') fs.createReadStream('1.pdf').pipe(fs.createWriteStream('1_pipe.pdf'))
pipe做通道连接时的例子:
var Readable = require('stream').Readable var Writable = require('stream').Writable //拿到两个实例 var readStream = new Readable() var writStream = new Writable() //push一些数据 readStream.push('I ') readStream.push('Love ') readStream.push('NodeJs ') //读取完毕 readStream.push(null) //重写方法 writStream._write = function(chunk,encode,cb){ console.log(chunk.toString()) cb() } //最后,使用 pipe连接起来 readStream.pipe(writStream)
运行结果如下:
来实现一个定制的可读流,可写流、转换流
var stream = require('stream') var util = require('util') //定制的可写流 function ReadStream(){ //首先改变它的上下文,让它可以调用Stream里面可读类的方法 stream.Readable.call(this) } //来让我们声明的可读流继承流里面可读的原型 util.inherits(ReadStream,stream.Readable) //然后就可以为可读流添加原型链上的read方法 ReadStream.prototype._read = function(){ //只干一件事,push数据 this.push('I ') this.push('Love ') this.push('NodeJs ') this.push(null) } //声明可写流 function WriteStream(){ stream.Writable.call(this) //声明cache this._cached = new Buffer('') } util.inherits(WriteStream,stream.Writable) WriteStream.prototype._write = function(chunk,encode,cb){ console.log(chunk.toString()) cb() } //声明转换流 function TransformStream(){ stream.Transform.call(this) } util.inherits(TransformStream,stream.Transform) TransformStream.prototype._transform = function(chunk,encode,cb){ this.push(chunk) cb() } //flush TransformStream.prototype._flush = function(cb){ this.push('Oh Year!') cb() } //生成实例 var rs = new ReadStream() var ws = new WriteStream() var ts = new TransformStream() //读到的数据pipe给转换流 rs.pipe(ts).pipe(ws)
运行结果如下:
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。