_read() is not implemented on Readable stream

18,068

Solution 1

why does calling push inside the read method do nothing? The only thing that works for me is just calling readable.push() elsewhere.

I think it's because you are not consuming it, you need to pipe it to an writable stream (e.g. stdout) or just consume it through a data event:

const { Readable } = require("stream");

let count = 0;
const readableStream = new Readable({
    read(size) {
        this.push('foo');
        if (count === 5) this.push(null);
        count++;
    }
});

// piping
readableStream.pipe(process.stdout)

// through the data event
readableStream.on('data', (chunk) => {
  console.log(chunk.toString());
});

Both of them should print 5 times foo (they are slightly different though). Which one you should use depends on what you are trying to accomplish.

Furthermore, these articles says you don't need to implement the read method:

You might not need it, this should work:

const { Readable } = require("stream");

const readableStream = new Readable();

for (let i = 0; i <= 5; i++) {
    readableStream.push('foo');
}
readableStream.push(null);

readableStream.pipe(process.stdout)

In this case you can't capture it through the data event. Also, this way is not very useful and not efficient I'd say, we are just pushing all the data in the stream at once (if it's large everything is going to be in memory), and then consuming it.

Solution 2

From documentation:

readable._read:

"When readable._read() is called, if data is available from the resource, the implementation should begin pushing that data into the read queue using the this.push(dataChunk) method. link"

readable.push:

"The readable.push() method is intended be called only by Readable implementers, and only from within the readable._read() method. link"

Solution 3

readableStream is like a pool:

  • .push(data), It's like pumping water to a pool.
  • .pipe(destination), It's like connecting the pool to a pipe and pump water to other place
  • The _read(size) run as a pumper and control how much water flow and when the data is end.

The fs.createReadStream() will create read stream with the _read() function has been auto implemented to push file data and end when end of file.

The _read(size) is auto fire when the pool is attached to a pipe. Thus, if you force calling this function without connect a way to destination, it will pump to ?where? and it affect the machine status inside _read() (may be the cursor move to wrong place,...)

The read() function must be create inside new Stream.Readable(). It's actually a function inside an object. It's not readableStream.read(), and implement readableStream.read=function(size){...} will not work.

The easy way to understand implement:

var Reader=new Object();
Reader.read=function(size){
    if (this.i==null){this.i=1;}else{this.i++;}
    this.push("abc");
    if (this.i>7){ this.push(null); }
}

const Stream = require('stream');
const renderStream = new Stream.Readable(Reader);

renderStream.pipe(process.stdout)

You can use it to reder what ever stream data to POST to other server. POST stream data with Axios :

require('axios')({
    method: 'POST',
    url: 'http://127.0.0.1:3000',
    headers: {'Content-Length': 1000000000000},
    data: renderStream
});
Share:
18,068
Alexander Mills
Author by

Alexander Mills

Dev, Devops, soccer coach. https://www.github.com/oresoftware

Updated on June 06, 2022

Comments

  • Alexander Mills
    Alexander Mills almost 2 years

    This question is how to really implement the read method of a readable stream.

    I have this implementation of a Readable stream:

    import {Readable} from "stream";
    this.readableStream = new Readable();
    

    I am getting this error

    events.js:136 throw er; // Unhandled 'error' event ^

    Error [ERR_STREAM_READ_NOT_IMPLEMENTED]: _read() is not implemented at Readable._read (_stream_readable.js:554:22) at Readable.read (_stream_readable.js:445:10) at resume_ (_stream_readable.js:825:12) at _combinedTickCallback (internal/process/next_tick.js:138:11) at process._tickCallback (internal/process/next_tick.js:180:9) at Function.Module.runMain (module.js:684:11) at startup (bootstrap_node.js:191:16) at bootstrap_node.js:613:3

    The reason the error occurs is obvious, we need to do this:

      this.readableStream = new Readable({
          read(size) {
            return true;
          }
        });
    

    I don't really understand how to implement the read method though.

    The only thing that works is just calling

    this.readableStream.push('some string or buffer');
    

    if I try to do something like this:

       this.readableStream = new Readable({
              read(size) {
                this.push('foo');   // call push here!
                return true;
              }
         });
    

    then nothing happens - nothing comes out of the readable!

    Furthermore, these articles says you don't need to implement the read method:

    https://github.com/substack/stream-handbook#creating-a-readable-stream

    https://medium.freecodecamp.org/node-js-streams-everything-you-need-to-know-c9141306be93

    My question is - why does calling push inside the read method do nothing? The only thing that works for me is just calling readable.push() elsewhere.

  • Alexander Mills
    Alexander Mills about 6 years
    I have always been consuming it, with readable.on('data', function(){});
  • Antonio Val
    Antonio Val about 6 years
    I expanded the explanation, if you want to do it like that you need to implement the read method indeed!
  • Alexander Mills
    Alexander Mills about 6 years
    try your second example - it should throw an error saying that you need to implement _read()...
  • Antonio Val
    Antonio Val about 6 years
    It's working for me, are you closing it pushing null?