What's the proper way to handle back-pressure in a node.js Transform stream?

Solution 1

This question from 2013 is all I was able to find on how to deal with "back pressure" when creating node Transform streams.

From the node 7.10.0 Transform stream and Readable stream documentation, what I gathered was that once push returned false, nothing else should be pushed until _read was called.

The Transform documentation doesn't mention _read except to mention that the base Transform class implements it (and _write). I found the information about push returning false and _read being called in the Readable stream documentation.

The only other authoritative comment I found on Transform back pressure mentions it only as a known issue, in the comment at the top of the node source file _stream_transform.js.

Here's the section about back pressure from that comment:

// This way, back-pressure is actually determined by the reading side,
// since _read has to be called to start processing a new chunk.  However,
// a pathological inflate type of transform can cause excessive buffering
// here.  For example, imagine a stream where every byte of input is
// interpreted as an integer from 0-255, and then results in that many
// bytes of output.  Writing the 4 bytes {ff,ff,ff,ff} would result in
// 1kb of data being output.  In this case, you could write a very small
// amount of input, and end up with a very large amount of output.  In
// such a pathological inflating mechanism, there'd be no way to tell
// the system to stop doing the transform.  A single 4MB write could
// cause the system to run out of memory.
//
// However, even in such a pathological case, only a single written chunk
// would be consumed, and then the rest would wait (un-transformed) until
// the results of the previous transformed chunk were consumed.
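
For concreteness, here is a sketch of the pathological transform that comment describes (my own illustration, not code from the node source):

const { Transform } = require('stream');

// Every input byte 0-255 produces that many output bytes, so a tiny
// write can buffer a huge amount of output on the readable side.
const pathologicalInflate = new Transform({
    transform(chunk, encoding, callback)
    {
        for (const byte of chunk)
            this.push(Buffer.alloc(byte, 0x2a)); // byte N -> N bytes of '*'

        callback(); // the return value of push() is ignored -- the problem
    }
});

// Writing 4 bytes buffers ~1KB of output before anything is consumed.
pathologicalInflate.write(Buffer.from([0xff, 0xff, 0xff, 0xff]));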

Solution example

Here's the solution I pieced together to handle the back pressure in a Transform stream which I'm pretty sure works. (I haven't written any real tests, which would require writing a Writable stream to control the back pressure.)

This is a rudimentary line transform (it would need more work to be a robust one), but it does demonstrate handling the "back pressure".

const stream = require('stream');

class LineTransform extends stream.Transform
{
    constructor(options)
    {
        super(options);

        this._lastLine = "";
        this._continueTransform = null;
        this._transforming = false;
        this._debugTransformCallCount = 0;
    }

    _transform(chunk, encoding, callback)
    {
        if (encoding === "buffer")
            return callback(new Error("Buffer chunks not supported"));

        if (this._continueTransform !== null)
            return callback(new Error("_transform called before previous transform has completed."));

        // DEBUG: Uncomment for debugging help to see what's going on
        //console.error(`${++this._debugTransformCallCount} _transform called:`);

        // Guard (so we don't call _continueTransform from _read while it is being
        // invoked from _transform)
        this._transforming = true;

        // Do our transforming (in this case splitting the big chunk into lines)
        let lines = (this._lastLine + chunk).split(/\r\n|\n/);
        this._lastLine = lines.pop();

        // In order to respond to "back pressure" create a function
        // that will push all of the lines stopping when push returns false,
        // and then resume where it left off when called again, only calling
        // the "callback" once all lines from this transform have been pushed.
        // Resuming (until done) will be done by _read().
        let nextLine = 0;
        this._continueTransform = () =>
            {
                while (nextLine < lines.length)
                {
                    // If push returns false we've got more to push, but it
                    // has to wait: return and let _read() call
                    // _continueTransform() again to resume from nextLine.
                    if (!this.push(lines[nextLine++] + "\n"))
                        return;
                }

                // DEBUG: Uncomment for debugging help to see what's going on
                //console.error(`_continueTransform ${this._debugTransformCallCount} finished\n`);

                // All lines are pushed, remove this function from the LineTransform instance
                this._continueTransform = null;
                return callback();
            };

        // Start pushing the lines
        this._continueTransform();

        // Turn off guard allowing _read to continue the transform pushes if needed.
        this._transforming = false;
    }

    _flush(callback)
    {
        if (this._lastLine.length > 0)
        {
            this.push(this._lastLine);
            this._lastLine = "";
        }

        return callback();
    }

    _read(size)
    {
        // DEBUG: Uncomment for debugging help to see what's going on
        //if (this._transforming)
        //    console.error(`_read called during _transform ${this._debugTransformCallCount}`);

        // If a transform has not pushed every line yet, continue that transform
        // otherwise just let the base class implementation do its thing.
        if (!this._transforming && this._continueTransform !== null)
            this._continueTransform();
        else
            super._read(size);
    }
}

I tested the above by running it with the DEBUG lines uncommented on a ~10000 line ~200KB file. Redirect stdout or stderr to a file (or both) to separate the debugging statements from the expected output. (node test.js > out.log 2> err.log)

const fs = require('fs');
let inStrm = fs.createReadStream("testdata/largefile.txt", { encoding: "utf8" });
let lineStrm = new LineTransform({ encoding: "utf8", decodeStrings: false });
inStrm.pipe(lineStrm).pipe(process.stdout);

Helpful debugging hint

While writing this initially, I didn't realize that _read could be called before _transform returned, so I hadn't implemented the this._transforming guard, and I was getting the following error:

Error: no writecb in Transform class
    at afterTransform (_stream_transform.js:71:33)
    at TransformState.afterTransform (_stream_transform.js:54:12)
    at LineTransform._continueTransform (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:44:13)
    at LineTransform._transform (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:46:21)
    at LineTransform.Transform._read (_stream_transform.js:167:10)
    at LineTransform._read (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:56:15)
    at LineTransform.Transform._write (_stream_transform.js:155:12)
    at doWrite (_stream_writable.js:331:12)
    at writeOrBuffer (_stream_writable.js:317:5)
    at LineTransform.Writable.write (_stream_writable.js:243:11)

Looking at the node implementation, I realized that this error means the callback given to _transform was called more than once. There wasn't much information to be found about this error either, so I thought I'd include what I figured out here.
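
For reference, here's a minimal sketch (hypothetical, not from my code) that reproduces the error by calling the _transform callback twice:

const { Transform } = require('stream');

// Calling the callback a second time finds no pending write callback,
// which surfaces as "no writecb in Transform class" on node versions of
// this era (newer versions emit an ERR_MULTIPLE_CALLBACK error instead).
const doubleCallback = new Transform({
    transform(chunk, encoding, callback)
    {
        callback(null, chunk);
        callback(); // second call: the write callback was already consumed
    }
});

doubleCallback.on('error', err => console.error(err.message));
doubleCallback.write('a');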

Solution 2

push will return false if the stream you are writing to (in this case, a file output stream) has too much data buffered. Since you're writing to disk, this makes sense: you are processing data faster than you can write it out.

When out's buffer is full, your transform stream will fail to push and start buffering data itself. If that buffer fills, then inp's buffer will start to fill as well. This is how things should work. The piped streams will only process data as fast as the slowest link in the chain can handle it (once your buffers are full).
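
As a sketch of that behavior (the names and numbers here are invented), a fast producer piped to a slow consumer simply pauses whenever the consumer's buffer is full:

const { Readable, Writable } = require('stream');

let n = 0;
const fast = new Readable({
    read()
    {
        this.push(n < 100000 ? `line ${n++}\n` : null);
    }
});

// A deliberately slow consumer with a small buffer; pipe() pauses the
// readable whenever this buffer is full, so memory use stays bounded.
const slow = new Writable({
    highWaterMark: 1024,
    write(chunk, encoding, callback)
    {
        setTimeout(callback, 1); // simulate a slow disk
    }
});

fast.pipe(slow);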

Solution 3

I think Transform is suitable for this, but I would perform the inflate as a separate step in the pipeline.

Here's a quick and largely untested example:

var zlib        = require('zlib');
var stream      = require('stream');
var transformer = new stream.Transform();

// Properties used to keep internal state of transformer.
transformer._buffers    = [];
transformer._inputSize  = 0;
transformer._targetSize = 1024 * 38;

// Dump one 'output packet'
transformer._dump       = function(done) {
  // concatenate buffers and convert to binary string
  var buffer = Buffer.concat(this._buffers).toString('binary');

  // Take the first 1024 packets' worth of bytes (this._targetSize).
  var packetBuffer = buffer.substring(0, this._targetSize);

  // Keep the rest and reset the counter. ('binary' preserves the bytes;
  // the deprecated new Buffer(string) would re-encode them as UTF-8.)
  this._buffers   = [ Buffer.from(buffer.substring(this._targetSize), 'binary') ];
  this._inputSize = this._buffers[0].length;

  // output header
  this.push('HELLO WORLD');

  // output compressed packet buffer
  zlib.deflate(packetBuffer, function(err, compressed) {
    if (err) {
      return done ? done(err) : this.emit('error', err);
    }

    this.push(compressed);
    if (done) {
      done();
    }
  }.bind(this));
};

// Main transformer logic: buffer chunks and dump them once the
// target size has been met.
transformer._transform  = function(chunk, encoding, done) {
  this._buffers.push(chunk);
  this._inputSize += chunk.length;

  if (this._inputSize >= this._targetSize) {
    this._dump(done);
  } else {
    done();
  }
};

// Flush any remaining buffers, passing the callback through so node
// knows when the flush has completed.
transformer._flush = function(done) {
  this._dump(done);
};

// Example:
var fs = require('fs');
fs.createReadStream('depth_1000000')
  .pipe(zlib.createInflate())
  .pipe(transformer)
  .pipe(fs.createWriteStream('depth_1000000.out'));

Solution 4

Ran into a similar problem lately, needing to handle backpressure in an inflating transform stream. The secret to handling push() returning false is to register and handle the 'drain' event on the destination stream:

_transform(data, enc, callback) {
  const continueTransforming = () => {
    // ... do some work / parse the data, keep state of where we're at etc.
    // ('event' and 'allDone' stand in for that parsing state)
    if (!this.push(event)) {
      // will get called again when the reader can consume more data
      this._readableState.pipes.once('drain', continueTransforming);
      return;
    }
    if (allDone) {
      callback();
    }
  };
  continueTransforming();
}

NOTE: this is a bit hacky, as we're reaching into the internals, and pipes can even be an array of Readables, but it does work in the common case of ...pipe(transform).pipe(...

It would be great if someone from the Node community could suggest a "correct" method for handling .push() returning false.
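
If you do reach into the internals like this, a slightly more defensive variant of the same hack (my sketch, untested; _readableState is private and its shape varies across node versions) would account for every shape pipes can take:

// pipes may be null (no destinations), a single stream, or an array of
// streams, depending on the node version and the number of pipe() targets.
function onceDrained(transform, resume) {
  const pipes = transform._readableState.pipes;
  const targets = Array.isArray(pipes) ? pipes : pipes ? [pipes] : [];

  let called = false;
  const onDrain = () => {
    if (called) return;
    called = true;
    for (const target of targets) target.removeListener('drain', onDrain);
    resume();
  };

  for (const target of targets) target.once('drain', onDrain);
}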

Solution 5

I ended up following Ledion's example and created a utility Transform class which assists with backpressure. The utility adds an async method named addData, which the implementing Transform can await.

'use strict';

const { Transform } = require('stream');

/**
 * The BackPressureTransform class adds a utility method addData which
 * allows for pushing data to the Readable, while honoring back-pressure.
 */
class BackPressureTransform extends Transform {
  constructor(...args) {
    super(...args);
  }

  /**
   * Asynchronously add a chunk of data to the output, honoring back-pressure.
   *
   * @param {String} data
   * The chunk of data to add to the output.
   *
   * @returns {Promise<void>}
   * A Promise resolving after the data has been added.
   */
  async addData(data) {
    // if .push() returns false, it means that the readable buffer is full
    // when this occurs, we must wait for the internal readable to emit
    // the 'drain' event, signalling the readable is ready for more data
    if (!this.push(data)) {
      await new Promise((resolve, reject) => {
        const errorHandler = error => {
          this.emit('error', error);
          reject(error);
        };

        this._readableState.pipes.on('error', errorHandler);
        this._readableState.pipes.once('drain', () => {
          this._readableState.pipes.removeListener('error', errorHandler);
          resolve();
        });
      });
    }
  }
}

module.exports = {
  BackPressureTransform
};

Using this utility class, my Transforms look like this now:

'use strict';

const { BackPressureTransform } = require('./back-pressure-transform');

/**
 * The Formatter class accepts the transformed row to be added to the output file.
 * The class provides generic support for formatting the result file.
 */
class Formatter extends BackPressureTransform {
  constructor() {
    super({
      encoding: 'utf8',
      readableObjectMode: false,
      writableObjectMode: true
    });

    this.anyObjectsWritten = false;
  }

  /**
   * Called when the data pipeline is complete.
   *
   * @param {Function} callback
   * The function which is called when final processing is complete.
   *
   * @returns {Promise<void>}
   * A Promise resolving after the flush completes.
   */
  async _flush(callback) {
    // if any object is added, close the surrounding array
    if (this.anyObjectsWritten) {
      await this.addData('\n]');
    }

    callback(null);
  }

  /**
   * Given the transformed row from the ETL, format it to the desired layout.
   *
   * @param {Object} sourceRow
   * The transformed row from the ETL.
   *
   * @param {String} encoding
   * Ignored in object mode.
   *
   * @param {Function} callback
   * The callback function which is called when the formatting is complete.
   *
   * @returns {Promise<void>}
   * A Promise resolving after the row is transformed.
   */
  async _transform(sourceRow, encoding, callback) {
    // before the first object is added, surround the data as an array
    // between each object, add a comma separator
    await this.addData(this.anyObjectsWritten ? ',\n' : '[\n');

    // update state
    this.anyObjectsWritten = true;

    // add the object to the output
    const parsed = JSON.stringify(sourceRow, null, 2).split('\n');
    for (const [index, row] of parsed.entries()) {
      // prepend the row with 2 additional spaces since we're inside a larger array
      await this.addData(`  ${row}`);

      // add line breaks except for the last row
      if (index < parsed.length - 1) {
        await this.addData('\n');
      }
    }

    callback(null);
  }
}

module.exports = {
  Formatter
};
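
For completeness, wiring the Formatter into a pipeline might look like this (the module path and input rows are hypothetical):

'use strict';

const { Readable } = require('stream');
const { Formatter } = require('./formatter'); // hypothetical module path

// Readable.from() yields an object-mode stream of rows (node 12.3+).
Readable.from([{ id: 1, name: 'first' }, { id: 2, name: 'second' }])
  .pipe(new Formatter())
  .pipe(process.stdout);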

Comments

  • raju
    raju over 1 year

    Intro

    These are my first adventures in writing server-side node.js. It's been fun so far, but I'm having some difficulty understanding the proper way to implement something as it relates to node.js streams.

    Problem

    For test and learning purposes I'm working with large files whose content is zlib compressed. The compressed content is binary data, each packet being 38 bytes in length. I'm trying to create a resulting file that looks almost identical to the original file, except that there is an uncompressed 31-byte header for every 1024 38-byte packets.

    original file content (decompressed)

    +----------+----------+----------+----------+
    | packet 1 | packet 2 |  ......  | packet N |
    | 38 bytes | 38 bytes |  ......  | 38 bytes |
    +----------+----------+----------+----------+
    

    resulting file content

    +----------+--------------------------------+----------+--------------------------------+
    | header 1 |    1024 38 byte packets        | header 2 |    1024 38 byte packets        |
    | 31 bytes |       zlib compressed          | 31 bytes |       zlib compressed          |
    +----------+--------------------------------+----------+--------------------------------+
    

    As you can see, it's somewhat of a translation problem. Meaning, I'm taking some source stream as input and then slightly transforming it into some output stream. Therefore, it felt natural to implement a Transform stream.

    The class simply attempts to accomplish the following:

    1. Takes stream as input
    2. zlib inflates the chunks of data to count the packets, gathers 1024 of them, zlib deflates them, and prepends a header.
    3. Passes the new resulting chunk on through the pipeline via this.push(chunk).

    A use case would be something like:

    var fs = require('fs');
    var me = require('./me'); // Where my Transform stream code sits
    var inp = fs.createReadStream('depth_1000000');
    var out = fs.createWriteStream('depth_1000000.out');
    inp.pipe(me.createMyTranslate()).pipe(out);
    

    Question(s)

    Assuming Transform is a good choice for this use case, I seem to be running into a possible back-pressure issue. My call to this.push(chunk) within _transform keeps returning false. Why would this be, and how should such things be handled?

  • raju
    raju over 10 years
    Thank you very much for the detailed answer as it provides great insight into exactly what I'm trying to do. What I'm curious about is in your _dump function, how would you handle when this.push(compressed) returned false? This happens once in a while and I think it has to be a side effect of back-pressure on the stream. In these cases, how should one recover without losing data? Does that make sense?
  • robertklep
    robertklep over 10 years
    @ScottSaad I'm not entirely sure to be honest. I also couldn't really find much about handling back pressure issues in transform streams :(
  • raju
    raju over 10 years
    Wanted to let you know that I updated the question to be a bit more pinpointed as it refers to the back-pressure thing but wanted to thank you for the time you put into the above code. It's great! :)
  • raju
    raju over 10 years
    I suppose I'm confused as to what to do with the information when it returns false. To me, if a function has a return code (in this case true or false), it tells the caller some important piece of information. From what you've described above, it sounds like it's completely safe to ignore the return code. Is that what you're suggesting? If so, why (poor documentation, etc.)?
  • mako-taco
    mako-taco over 10 years
    When you're using pipe, you can ignore the 'false'. I believe that pipe was designed to handle this internally. If you wanted to write your own custom backpressure mechanism, you could do so using the value returned from push
  • Liam Mitchell
    Liam Mitchell over 5 years
    It's 2018; is this kind of work-around still required today, or will pipe handle the backpressure from my transform internally? If it's still needed, one could probably make a more generic solution as a module: a "BackPressure" transformer to place after the transform stream running into a backpressure issue, with options to process the chunks at a fixed buffer size and queue limits. Maybe someone already made such a module? More information on backpressuring in streams: nodejs.org/en/docs/guides/backpressuring-in-streams
  • Liam Mitchell
    Liam Mitchell over 5 years
    TLDR: When using pipes, node handles backpressure between streams, but a custom transformer needs to handle it internally. So yes, we need to handle it ourselves.
  • Admin
    Admin about 4 years
    can you explain what's going on here conceptually? seems just like code dump
  • robertklep
    robertklep about 4 years
    @MrCholo the code reads at least 1024*38 bytes from an inflated stream, and writes it back in chunks of exactly 1024*38 bytes with an additional header in front of it (in the example, the string 'HELLO WORLD'). Each of these chunks (1024*38 + header.length bytes) is zlib-compressed.
  • Admin
    Admin about 4 years
    ok sorry I don't understand, I still don't understand how it works - why exactly 1024*38, can you back up a bit
  • robertklep
    robertklep about 4 years
    @MrCholo the 'exactly 1024*38' requirement is from the original question. In _transform you see that the code collects all the chunks of data (into this._buffers) until its size exceeds this._targetSize (which is set to 1024*38). In _dump the first 1024*38 bytes are extracted (using buffer.substring()), the header is passed on (using this.push()) and the extracted bytes are deflated using zlib (the result of which is also passed on). Be aware that the size of this._buffers may be larger than 1024*38 bytes, so there's some additional code to keep the excess for the 'next round'.
  • Martin Kreidenweis
    Martin Kreidenweis over 2 years
    I think the better way is to continue processing as soon as _read is called, as described in stackoverflow.com/a/43811543/12749249, instead of looking at the internal _readableState.