我们知道,可读流是数据的生产者,而可写流是数据的消费者。





其实这和磁盘文件的读取和写入速度有关:一般来说,从磁盘中读取文件的速度要快于向磁盘中写入文件的速度。





即生产者的生产速度快于消费者的消费速度。





const fs = require('fs');
const path = require('path');

// Manual backpressure demo: copy test.txt to test2.txt chunk by chunk.
// The high-water marks are set deliberately tiny so that ws.write() reports
// a full buffer almost immediately and the pause/resume path is exercised.
const rs = fs.createReadStream(path.join(__dirname, 'test.txt'), {
  highWaterMark: 4, // Readable stream buffer high-water mark (default: 64KB)
});
const ws = fs.createWriteStream(path.join(__dirname, 'test2.txt'), {
  highWaterMark: 1, // Writable stream buffer high-water mark (default: 16KB)
});

rs.on('data', (chunk) => {
  // write() returns false once the writable's buffer has reached its
  // high-water mark; stop producing until the consumer catches up.
  const flag = ws.write(chunk);
  if (!flag) {
    rs.pause();
  }
});

// 'drain' fires when the writable's buffer has been flushed, so it is safe
// to let the producer flow again.
ws.on('drain', () => {
  rs.resume();
});

// Finish the writable once the source is exhausted so buffered data is
// flushed and the destination file is closed.
rs.on('end', () => {
  ws.end();
});






/**
 * Pipes this readable stream into `dest`, forwarding every 'data' chunk via
 * `dest.write()` and pausing the source whenever a write returns false until
 * the destination emits 'drain'.
 *
 * @param {object} dest - Destination stream; must provide `write()`, `end()`
 *   and the event-emitter methods used below.
 * @param {object} [pipeOpts] - Options. Unless `pipeOpts.end === false`,
 *   `dest.end()` is called when the source ends (never for `process.stdout`
 *   or `process.stderr`).
 * @returns {object} `dest`, so that calls can be chained.
 */
Readable.prototype.pipe = function(dest, pipeOpts) {
  const src = this;
  const state = this._readableState;

  // Going from one destination to several: switch the drain bookkeeping from
  // a single writer reference to a set of writers.
  if (state.pipes.length === 1) {
    if (!state.multiAwaitDrain) {
      state.multiAwaitDrain = true;
      state.awaitDrainWriters = new SafeSet(
        state.awaitDrainWriters ? [state.awaitDrainWriters] : []
      );
    }
  }

  state.pipes.push(dest);
  debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts);

  const doEnd = (!pipeOpts || pipeOpts.end !== false) &&
                dest !== process.stdout &&
                dest !== process.stderr;

  const endFn = doEnd ? onend : unpipe;
  if (state.endEmitted) {
    // The source has already ended; run the end handler asynchronously.
    process.nextTick(endFn);
  } else {
    src.once('end', endFn);
  }

  dest.on('unpipe', onunpipe);
  function onunpipe(readable, unpipeInfo) {
    debug('onunpipe');
    if (readable === src) {
      if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
        unpipeInfo.hasUnpiped = true;
        cleanup();
      }
    }
  }

  function onend() {
    debug('onend');
    dest.end();
  }

  // Lazily created in pause(); stays undefined until a write returns false.
  let ondrain;

  let cleanedUp = false;
  function cleanup() {
    debug('cleanup');
    // Cleanup event handlers once the pipe is broken.
    dest.removeListener('close', onclose);
    dest.removeListener('finish', onfinish);
    if (ondrain) {
      dest.removeListener('drain', ondrain);
    }
    dest.removeListener('error', onerror);
    dest.removeListener('unpipe', onunpipe);
    src.removeListener('end', onend);
    src.removeListener('end', unpipe);
    src.removeListener('data', ondata);

    cleanedUp = true;

    // If the reader is waiting for a drain event from this
    // specific writer, then it would cause it to never start
    // flowing again.
    // So, if this is awaiting a drain, then we just call it now.
    // If we don't know, then assume that we are waiting for one.
    if (ondrain && state.awaitDrainWriters &&
        (!dest._writableState || dest._writableState.needDrain)) {
      ondrain();
    }
  }

  function pause() {
    // If the user unpiped during `dest.write()`, it is possible
    // to get stuck in a permanently paused state if that write
    // also returned false.
    // => Check whether `dest` is still a piping destination.
    if (!cleanedUp) {
      if (state.pipes.length === 1 && state.pipes[0] === dest) {
        debug('false write response, pause', 0);
        state.awaitDrainWriters = dest;
        state.multiAwaitDrain = false;
      } else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
        debug('false write response, pause', state.awaitDrainWriters.size);
        state.awaitDrainWriters.add(dest);
      }
      src.pause();
    }
    if (!ondrain) {
      // When the dest drains, it reduces the awaitDrain counter
      // on the source.  This would be more elegant with a .once()
      // handler in flow(), but adding and removing repeatedly is
      // too slow.
      ondrain = pipeOnDrain(src, dest);
      dest.on('drain', ondrain);
    }
  }

  src.on('data', ondata);
  function ondata(chunk) {
    debug('ondata');
    const ret = dest.write(chunk);
    debug('dest.write', ret);
    if (ret === false) {
      pause();
    }
  }

  // If the dest has an error, then stop piping into it.
  // However, don't suppress the throwing behavior for this.
  function onerror(er) {
    debug('onerror', er);
    unpipe();
    dest.removeListener('error', onerror);
    if (EE.listenerCount(dest, 'error') === 0) {
      const s = dest._writableState || dest._readableState;
      if (s && !s.errorEmitted) {
        // User incorrectly emitted 'error' directly on the stream.
        errorOrDestroy(dest, er);
      } else {
        dest.emit('error', er);
      }
    }
  }

  // Make sure our error handler is attached before userland ones.
  prependListener(dest, 'error', onerror);

  // Both close and finish should trigger unpipe, but only once.
  function onclose() {
    dest.removeListener('finish', onfinish);
    unpipe();
  }
  dest.once('close', onclose);
  function onfinish() {
    debug('onfinish');
    dest.removeListener('close', onclose);
    unpipe();
  }
  dest.once('finish', onfinish);

  function unpipe() {
    debug('unpipe');
    src.unpipe(dest);
  }

  // Tell the dest that it's being piped to.
  dest.emit('pipe', src);

  // Start the flow if it hasn't been started already.
  if (dest.writableNeedDrain === true) {
    if (state.flowing) {
      pause();
    }
  } else if (!state.flowing) {
    debug('pipe resume');
    src.resume();
  }

  return dest;
};







