NodeJS Stream 四:Writable
什么是可寫流
可寫流是對(duì)數(shù)據(jù)流向設(shè)備的抽象,用來(lái)消費(fèi)上游流過(guò)來(lái)的數(shù)據(jù),通過(guò)可寫流程序可以把數(shù)據(jù)寫入設(shè)備,常見的是本地磁盤文件或者 TCP、HTTP 等網(wǎng)絡(luò)響應(yīng)。
看一個(gè)之前用過(guò)的例子
process.stdin.pipe(process.stdout);
*process.stdout* 是一個(gè)可寫流,程序把可讀流 process.stdin 傳過(guò)來(lái)的數(shù)據(jù)寫入的標(biāo)準(zhǔn)輸出設(shè)備。在了解了可讀流的基礎(chǔ)上理解可寫流非常簡(jiǎn)單,流就是有方向的數(shù)據(jù),其中可讀流是數(shù)據(jù)源,可寫流是目的地,中間的管道環(huán)節(jié)是雙向流。
可寫流使用
調(diào)用可寫流實(shí)例的 **write() **方法就可以把數(shù)據(jù)寫入可寫流
const fs = require('fs');
const rs = fs.createReadStream('./w.js');
const ws = fs.createWriteStream('./copy.js');
rs.setEncoding('utf-8');
rs.on('data', chunk => {
ws.write(chunk);
});
前面提到過(guò)監(jiān)聽了可讀流的 data 事件就會(huì)使可讀流進(jìn)入流動(dòng)模式,我們?cè)诨卣{(diào)事件里調(diào)用了可寫流的 write() 方法,這樣數(shù)據(jù)就被寫入了可寫流抽象的設(shè)備中,也就是當(dāng)前目錄下的 copy.js 文件。
write() 方法有三個(gè)參數(shù)
- chunk {String| Buffer},表示要寫入的數(shù)據(jù)
- encoding 當(dāng)寫入的數(shù)據(jù)是字符串的時(shí)候可以設(shè)置編碼
- callback 數(shù)據(jù)被寫入之后的回調(diào)函數(shù)
自定義可寫流
和自定義可讀流類似,簡(jiǎn)單的自定義可寫流只需要兩步
- 繼承 stream 模塊的 Writable 類
- 實(shí)現(xiàn) _write() 方法
我們來(lái)實(shí)現(xiàn)一個(gè)簡(jiǎn)單的可寫流,把傳入可寫流的數(shù)據(jù)轉(zhuǎn)成大寫之后輸出到標(biāo)準(zhǔn)輸出設(shè)備(比較好的例子可能是寫入本地磁盤文件,但涉及過(guò)多的 fs 操作,比較麻煩,偷個(gè)懶。寫入標(biāo)準(zhǔn)輸出設(shè)備也是一種寫入行為)
const Writable = require('stream').Writable
class OutputStream extends Writable {
_write(chunk, enc, done) {
// 轉(zhuǎn)大寫之后寫入標(biāo)準(zhǔn)輸出設(shè)備
process.stdout.write(chunk.toString().toUpperCase());
// 此處不嚴(yán)謹(jǐn),應(yīng)該是監(jiān)聽寫完之后才調(diào)用 done
process.nextTick(done);
}
}
module.exports = OutputStream;
和最終可寫流暴露出來(lái)的 write() 方法一樣, _write() 方法有三個(gè)參數(shù),作用類似
- chunk 寫入的數(shù)據(jù),大部分時(shí)候是 buffer,除非 decodeStrings 被設(shè)置為 false
- encoding 如果數(shù)據(jù)是字符串,可以設(shè)置編碼,buffer 或者 object 模式會(huì)忽略
- callback 數(shù)據(jù)寫入后的回調(diào)函數(shù),可以通知流傳入下一個(gè)數(shù)據(jù);當(dāng)出現(xiàn)錯(cuò)誤的時(shí)候也可以設(shè)置一個(gè) error 參數(shù)
當(dāng)然其實(shí)還有一個(gè) _writev() 方法可以實(shí)現(xiàn),這個(gè)方法僅被滯留的寫入隊(duì)列調(diào)用,可以不實(shí)現(xiàn)。
實(shí)例化可寫流
有了可寫流的類之后我們可以實(shí)例化使用了,實(shí)例化可寫流的時(shí)候有幾個(gè) option 可選,了解一下可以幫助我們理解后面要用的知識(shí)
- objectMode
默認(rèn)是 false, 設(shè)置成 true 后 writable.write() 方法除了寫入 string 和 buffer 外,還可以寫入任意 JavaScript 對(duì)象。很有用的一個(gè)選項(xiàng),后面介紹 transform 流的時(shí)候詳細(xì)介紹 - highWaterMark
每次最多寫入的數(shù)據(jù)量, Buffer 的時(shí)候默認(rèn)值 16kb, objectMode 時(shí)默認(rèn)值 16 - decodeStrings
是否把傳入的數(shù)據(jù)轉(zhuǎn)成 Buffer,默認(rèn)是 true
這樣我們就更清楚的知道 _write() 方法傳入的參數(shù)的含義了,而且對(duì)后面介紹 back pressure 機(jī)制的理解很有幫助。
事件
和可讀流一樣,可寫流也有幾個(gè)常用的事件,有了可讀流的基礎(chǔ),理解起來(lái)比較簡(jiǎn)單
- pipe 當(dāng)可讀流調(diào)用 pipe() 方法向可寫流傳輸數(shù)據(jù)的時(shí)候會(huì)觸發(fā)可寫流的 pipe 事件
- unpipe 當(dāng)可讀流調(diào)用 unpipe() 方法移除數(shù)據(jù)傳遞的時(shí)候會(huì)觸發(fā)可寫流的 unpipe 事件
這兩個(gè)事件用于通知可寫流數(shù)據(jù)將要到來(lái)和將要被切斷,在通常情況下使用的很少。
writeable.write() 方法是有一個(gè) bool 的返回值的,前面提到了 highWaterMark,當(dāng)要求寫入的數(shù)據(jù)大于可寫流的 highWaterMark 的時(shí)候,數(shù)據(jù)不會(huì)被一次寫入,有一部分?jǐn)?shù)據(jù)被滯留,這時(shí)候 writeable.write() 就會(huì)返回 false,如果可以處理完就會(huì)返回 true
drain 當(dāng)之前存在滯留數(shù)據(jù),也就是 writeable.write() 返回過(guò) false,經(jīng)過(guò)一段時(shí)間的消化,處理完了積壓數(shù)據(jù),可以繼續(xù)寫入新數(shù)據(jù)的時(shí)候觸發(fā)(drain 的本意即為排水、枯竭,挺形象的)
除了 write() 方法可寫流還有一個(gè)常用的方法 end(),參數(shù)和 write() 方法相同,但也可以不傳入?yún)?shù),表示沒有其它數(shù)據(jù)需要寫入,可寫流可以關(guān)閉了。
finish 當(dāng)調(diào)用 writable.end() 方法,并且所有數(shù)據(jù)都被寫入底層后會(huì)觸發(fā) finish 事件
同樣出現(xiàn)錯(cuò)誤后會(huì)觸發(fā) error 事件
back pressure
了解了這些事件,結(jié)合上之前提到的可讀流的一些知識(shí),我們就能探討一些有意思的話題了。在最開始我們提到過(guò)用流相對(duì)于直接操作文件的好處之一是不會(huì)把內(nèi)存壓爆,那么流是怎么做到的呢?
最開始我們可能會(huì)想到因?yàn)榱鞑皇且淮涡园阉袛?shù)據(jù)載入內(nèi)存處理,而是一邊讀一邊寫。但我們知道一般讀取的速度會(huì)遠(yuǎn)遠(yuǎn)快于寫入的速度,那么 pipe() 方法是怎么做到供需平衡的呢?
回憶一些基礎(chǔ)知識(shí),我們自己來(lái)實(shí)現(xiàn)一下 pipe() 方法的核心原理
- 可讀流有流動(dòng)和暫停兩種模式,可以通過(guò) **pause() 和 resume() **方法切換
- 可寫流的 **write() **方法會(huì)返回是否能處理當(dāng)前的數(shù)據(jù),每次可以處理多少是 hignWatermark 決定的
- 當(dāng)可寫流處理完了積壓數(shù)據(jù)會(huì)觸發(fā) drain 事件
我們可以利用這三點(diǎn)來(lái)做到數(shù)據(jù)讀取和寫入的同步,還是使用之前的例子,但為了使消費(fèi)速度降下來(lái),我們各一秒再通知完成
class OutputStream extends Writable {
_write(chunk, enc, done) {
// 轉(zhuǎn)大寫之后寫入標(biāo)準(zhǔn)輸出設(shè)備
process.stdout.write(chunk.toString().toUpperCase());
// 故意延緩?fù)ㄖ^續(xù)傳遞數(shù)據(jù)的時(shí)間,造成寫入速度慢的現(xiàn)象
setTimeout(done, 1000);
}
}
我們使用一下自定義的兩個(gè)類
const RandomNumberStream = require('./RandomNumberStream');
const OutputStream = require('./OutputStream');
const rns = new RandomNumberStream(100);
const os = new OutputStream({
highWaterMark: 8 // 把水位降低,默認(rèn)16k還是挺大的
});
rns.on('data', chunk => {
// 當(dāng)待處理隊(duì)列大于 highWaterMark 時(shí)返回 false
if (os.write(chunk) === false) {
console.log('pause');
rns.pause(); // 暫停數(shù)據(jù)讀取
}
});
// 當(dāng)待處理隊(duì)列小于 highWaterMark 時(shí)觸發(fā) drain 事件
os.on('drain', () => {
console.log('drain')
rns.resume(); // 恢復(fù)數(shù)據(jù)讀取
});
結(jié)合前面的三點(diǎn)和注釋很容易看懂上面代碼,這就是 pipe() 方法起作用的核心原理。數(shù)據(jù)的來(lái)源的去向我們有了大概了解,后面可以開始介紹數(shù)據(jù)的加工
- duplex
- transform
?

浙公網(wǎng)安備 33010602011771號(hào)