Node.js stream API: events or callbacks?

Last year, I wrote a blog post about events and node streams. In that post, I proposed an alternative API for streams: callback-oriented rather than event-oriented.

For readable streams, the proposal was to have a simple read(cb) call, where cb is a callback with a function(err, data) signature. A null data value signals the end of stream.

I did not discuss writable streams in that early post, but shortly afterwards I implemented wrappers for both readable and writable streams in streamline.js’ streams module, and I used a very similar design for the writable stream API: a simple write(data, cb) function (similarly, a null data value ends the stream).

Note: the parameters are swapped in the streamline API (write(cb, data)) because this makes it easier to deal with optional parameters. In this post I will stick to the standard node.js convention of passing the callback as the last parameter.

I have been using this callback-based API for more than a year and I have found it very pleasant to work with: it is simple and robust (no risk of losing data events); it handles flow control naturally and it blends really well with streamline.js. For example, I could easily re-implement the pump/pipe functionality with a simple loop:

function pump(inStream, outStream, _) {
  var data;
  do {
    data = inStream.read(_);
    outStream.write(data, _);
  } while (data != null);
}
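
For readers who do not use streamline.js, here is roughly what the same loop looks like with raw callbacks (a minimal sketch against the proposed read(cb)/write(data, cb) API, with only basic error handling; pumpCb is just an illustrative name):

function pumpCb(inStream, outStream, cb) {
  inStream.read(function(err, data) {
    if (err) return cb(err);
    // a null data value ends the output stream, per the proposed API
    outStream.write(data, function(err) {
      if (err) return cb(err);
      if (data == null) return cb(); // we just wrote the end-of-stream marker
      pumpCb(inStream, outStream, cb);
    });
  });
}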

State of affairs

I find the current node.js stream API quite hairy in comparison. On the read side we have three events (data, end, error) and two calls (pause and resume). On the write side we have two events (drain, error) and two calls (write and end).
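
To make the comparison concrete, here is roughly what a manual pump looks like with the event-oriented API (a sketch of the usual pattern; the parameter names are only illustrative):

function eventPump(source, dest, onError) {
  source.on('data', function(chunk) {
    // pause the source when the destination's internal buffer is full
    if (!dest.write(chunk)) source.pause();
  });
  dest.on('drain', function() {
    source.resume();
  });
  source.on('end', function() {
    dest.end();
  });
  source.on('error', onError);
  dest.on('error', onError);
}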

The event-oriented API is also more fragile because you run the risk of losing events if you do not attach your event handlers early enough (unless you pause the stream immediately after creating it).

And from the exchanges that I see on the node mailing list, I have the impression that this API is not completely sorted out yet. There is talk of upcoming changes in 0.9.

I have tried to inject the idea of a callback-based API into the debate but I’ve been unsuccessful so far. Discussions quickly turned sour. I was challenged on the claim that flow control would not work with such an API, but I got no response when I asked for a scenario demonstrating where the problem would be.

Equivalence

So I’m writing this post to try to shed some light on the issue. I will try to prove that the two APIs are equivalent, the corollary being that we should then be free to choose whichever API style we prefer.

To prove the equivalence, I am going to create wrappers:

  • A first set of wrappers that transform streams with event-oriented APIs into streams with callback-oriented APIs.
  • A second set of wrappers that transform streams with callback-oriented APIs into streams with event-oriented APIs.

There will be three wrappers in each set: a Read wrapper for readable streams, a Write wrapper for writable streams, and a wrapper that handles both read and write.

After introducing these wrappers, I will demonstrate with a small example that we get an equivalent stream when we wrap a stream twice, first in callback style and then in event style.

In this presentation I will deliberately ignore peripheral issues like encoding, close events, etc. So I won’t deal with all the subtleties of the actual node.js APIs.

The callback read wrapper

The callback read wrapper implements the asynchronous read(cb) API on top of a standard node.js readable stream.

exports.CallbackReadWrapper = function(stream) {
  var _chunks = [];
  var _error;
  var _done = false;

  stream.on('error', function(err) {
    _onData(err);
  });
  stream.on('data', function(data) {
    _onData(null, data);
  });
  stream.on('end', function() {
    _onData(null, null);
  });

  function memoize(err, chunk) {
    if (err) _error = err;
    else if (chunk) {
      _chunks.push(chunk);
      stream.pause();
    } else _done = true;
  };

  var _onData = memoize;

  this.read = function(cb) {
    if (_chunks.length > 0) {
      var chunk = _chunks.splice(0, 1)[0];
      if (_chunks.length === 0) {
        stream.resume();
      }
      return cb(null, chunk);
    } else if (_done) {
      return cb(null, null);
    } else if (_error) {
      return cb(_error);
    } else _onData = function(err, chunk) {
      if (!err && !chunk) _done = true;
      _onData = memoize;
      cb(err, chunk);
    };
  }
}
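
Here is a quick usage sketch, assuming the module above is saved as wrappers.js (the file name and the logging are only for illustration):

var fs = require('fs');
var wrappers = require('./wrappers');

// drain a file chunk by chunk through the callback wrapper
var reader = new wrappers.CallbackReadWrapper(fs.createReadStream(__dirname + '/wrappers.js'));
(function next() {
  reader.read(function(err, chunk) {
    if (err) throw err;
    if (chunk == null) return console.log('done');
    console.log('got %d bytes', chunk.length);
    next();
  });
})();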

This implementation does not make the assumption that data events will never be delivered after a pause() call, as this assumption was not valid in earlier versions of node. This is why it uses an array of chunks to memoize. The code could be simplified if we made this assumption.
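
For illustration, here is roughly what the wrapper could look like under that assumption (an untested sketch, not part of the original module; SimpleCallbackReadWrapper is a hypothetical name):

// Hypothetical simplification: if pause() takes effect immediately, at most
// one (err, data) pair needs to be memoized at any time.
exports.SimpleCallbackReadWrapper = function(stream) {
  var _pending; // memoized [err, data] pair, if any
  var _cb; // pending read callback, if any

  function onData(err, data) {
    if (_cb) {
      var cb = _cb;
      _cb = null;
      cb(err, data);
    } else {
      _pending = [err, data];
      stream.pause();
    }
  }
  stream.on('error', function(err) { onData(err, null); });
  stream.on('data', function(data) { onData(null, data); });
  stream.on('end', function() { onData(null, null); });

  this.read = function(cb) {
    if (_pending) {
      var p = _pending;
      _pending = null;
      stream.resume();
      cb(p[0], p[1]);
    } else {
      _cb = cb;
    }
  };
};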

The callback write wrapper

The callback write wrapper implements the asynchronous write(data, cb) API on top of a standard node.js writable stream.

exports.CallbackWriteWrapper = function(stream) {
  var _error;
  var _onDrain;

  stream.on('error', function(err) {
    if (_onDrain) _onDrain(err);
    else _error = err;
  });
  stream.on('drain', function() {
    _onDrain && _onDrain();
  });

  this.write = function(data, cb) {
    if (_error) return cb(_error);
    if (data != null) {
      if (!stream.write(data)) {
        _onDrain = function(err) {
          _onDrain = null;
          cb(err);
        };
      } else {
        process.nextTick(cb);
      }
    } else {
      stream.end();
      // the stream is ended, but we still invoke the callback so that callers
      // (like the pump loop above) are notified and can finish
      process.nextTick(cb);
    }
  }
}

The process.nextTick call guarantees that we won’t blow the stack if stream.write always returns true.
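
To see why, consider a long sequence of writes like the sketch below (writeMany is just an illustrative helper). Without the deferral, each callback would run synchronously inside write, and the chain write -> cb -> write -> ... could eventually exhaust the stack:

// write n chunks in sequence through the callback wrapper (illustrative sketch)
function writeMany(wrapped, n, cb) {
  if (n === 0) return wrapped.write(null, cb); // end the stream
  wrapped.write('chunk ' + n + '\n', function(err) {
    if (err) return cb(err);
    writeMany(wrapped, n - 1, cb); // safe: the callback is deferred with process.nextTick
  });
}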

The event read wrapper

The event read wrapper is the dual of the callback read wrapper. It implements the node.js readable stream API on top of an asynchronous read(cb) function.

var EventEmitter = require('events').EventEmitter;

exports.EventReadWrapper = function(stream) {
  var self = this;
  var q = [],
    paused;

  function doRead(err, data) {
    if (err) self.emit('error', err);
    else if (data != null) {
      if (paused) {
        q.push(data);
      } else {
        self.emit('data', data);
        stream.read(doRead);
      }
    } else {
      if (paused) {
        q.push(null);
      } else {
        self.emit('end');
      }
    }
  }
  self.pause = function() {
    paused = true;
  }
  self.resume = function() {
    var data;
    while ((data = q.shift()) !== undefined) {
      if (data != null) self.emit('data', data);
      else self.emit('end');
    }
    paused = false;
    stream.read(doRead);
  }

  stream.read(doRead);
}

exports.EventReadWrapper.prototype = new EventEmitter();

The event write wrapper

The event write wrapper is the dual of the callback write wrapper. It implements the node.js writable stream API on top of an asynchronous write(data, cb) function.

exports.EventWriteWrapper = function(stream) {
  var self = this;
  var chunks = [];

  function written(err) {
    if (err) self.emit('error', err);
    else {
      chunks.splice(0, 1);
      if (chunks.length === 0) self.emit('drain');
      else stream.write(chunks[0], written);
    }
  }
  this.write = function(data) {
    chunks.push(data);
    if (chunks.length === 1) stream.write(data, written);
    // writes always complete asynchronously, so ask the caller to wait for 'drain'
    return false;
  }
  this.end = function(data) {
    if (data != null) self.write(data);
    self.write(null);
  }
}

exports.EventWriteWrapper.prototype = new EventEmitter();

The combined wrappers

The combined wrappers implement both APIs (read and write). Their implementation is straightforward:

exports.CallbackWrapper = function(stream) {
  exports.CallbackReadWrapper.call(this, stream);
  exports.CallbackWriteWrapper.call(this, stream);
}

exports.EventWrapper = function(stream) {
  exports.EventReadWrapper.call(this, stream);
  exports.EventWriteWrapper.call(this, stream);
}

exports.EventWrapper.prototype = new EventEmitter();

Equivalence demo

The equivalence demo is based on the following program:

"use strict";
var http = require('http');
var zlib = require('zlib');
var util = require('util');
var fs = require('fs');

http.createServer(function(request, response) {
  response.writeHead(200, {
    'Content-Type': 'text/plain; charset=utf8',
    'Content-Encoding': 'deflate',    
  });
  var source = fs.createReadStream(__dirname + '/wrappers.js');
  var deflate = zlib.createDeflate();
  util.pump(source, deflate);
  util.pump(deflate, response);
}).listen(1337);
console.log('Server running at http://127.0.0.1:1337/');

This is a simple program that serves a static file in compressed form. It uses two util.pump calls. The first one pumps the source stream into the deflate stream and the second one pumps the deflate stream into the response stream.

Then we modify this program to wrap the three streams twice before passing them to util.pump:

"use strict";
var wrappers = require('./wrappers');
var http = require('http');
var zlib = require('zlib');
var util = require('util');
var fs = require('fs');

http.createServer(function(request, response) {
  response.writeHead(200, {
    'Content-Type': 'text/plain; charset=utf8',
    'Content-Encoding': 'deflate',    
  });
  var source = fs.createReadStream(__dirname + '/wrappers.js');
  var deflate = zlib.createDeflate();
  source = new wrappers.EventReadWrapper(new wrappers.CallbackReadWrapper(source));
  response = new wrappers.EventWriteWrapper(new wrappers.CallbackWriteWrapper(response));
  deflate = new wrappers.EventWrapper(new wrappers.CallbackWrapper(deflate));
  util.pump(source, deflate);
  util.pump(deflate, response);
}).listen(1337);
console.log('Server running at http://127.0.0.1:1337/');

This program works like the previous one (maybe just a little bit slower), which shows that the doubly wrapped streams behave like the original unwrapped streams:

EventWrapper(CallbackWrapper(stream)) <=> stream

Note that this program won’t exercise the full pause/resume/drain API with a small input file like wrappers.js. You have to try it with a large file to exercise all events.

The next demo is a streamline.js variant that transforms the three streams into callback-oriented streams and uses the pump loop that I gave in the introduction:

"use strict";
var wrappers = require('./wrappers');
var http = require('http');
var zlib = require('zlib');
var util = require('util');
var fs = require('fs');

http.createServer(function(request, response) {
  response.writeHead(200, {
    'Content-Type': 'text/plain; charset=utf8',
    'Content-Encoding': 'deflate',    
  });
  var source = fs.createReadStream(__dirname + '/wrappers.js');
  var deflate = zlib.createDeflate();
  source = new wrappers.CallbackReadWrapper(source);
  response = new wrappers.CallbackWriteWrapper(response);
  deflate = new wrappers.CallbackWrapper(deflate);
  pump(source, deflate);
  pump(deflate, response);
}).listen(1337);
console.log('Server running at http://127.0.0.1:1337/');


function pump(inStream, outStream, _) {
  var data;
  do {
    data = inStream.read(_);
    outStream.write(data, _);
  } while (data != null);
}

This program too behaves like the original one.

Conclusions

This experiment demonstrates that event-based and callback-based streams are equivalent. My preference goes to the callback version, as you may have guessed. I’m submitting this because I think it deserves some consideration in the discussions about the evolution of the stream API.

Notes:

  • The APIs are not completely equivalent though. One difference is that the event-driven API supports multiple observers. But in most pumping/piping scenarios the stream has a single observer. And callback APIs can also be tweaked to support multiple observers (streamline’s futures support that).
  • It is also important to verify that the flow control patterns are similar and that, for example, the callback version does not do excessive buffering. This is the case as the queues don’t hold more than two elements in pumping/piping scenarios.

Source code is available as a gist.


6 Responses to Node.js stream API: events or callbacks?

  1. ket nonting says:

    I do not fully understand this, but do you mean this code sends data in chunks instead of continuous data? Is there any live demo? Thanks

    • Hi Ket,

      Both APIs work with chunks. The event API is a push-style API as it pushes the chunks as data events. The callback API is more pull-style as the stream consumer reads the chunks with a read call.

      It is a bit difficult to imagine a continuous stream API. Whether you choose events or callbacks you have to break the data in chunks anyway. Maybe a chunk size of 1 could be considered continuous but it is inefficient.

      My intent with this article was to show that the two API styles are equivalent. This is a rather academic exercise and the specifics of the demo program that I used are rather anecdotal (but I wanted a demo that would be non-trivial, hence the two pumps). You can run it (just get the gist) but don’t expect it to do anything spectacular (it just serves a file with compression). The interesting part is that the three demo programs in the gist give the same result and that they handle back-pressure with similar memory requirements.

  2. Mihai Ene says:

    I often add multiple observers to event emitters, one of those observers observing “once”. How would you include the “once” as a callback, without a naive (if()) implementation?

  3. Jeroen says:

    It appears to me these recommendations have partly made it into the stream API. Writable streams take a callback as third argument. Not sure if those callbacks already existed back then?

    Also note that readable streams have a synchronous read() method where data (if available) is immediately returned. I very much like the idea of asynchronous read as outlined in this article.

    • The streams2 API did not exist at the time. It has probably been influenced by the discussions I had with the node.js team. But I still find the outcome too complex. Why introduce a dual API with a read() call and a readable event when a simple callback-based API does the job? I’m amazed when I see the number of questions that get raised around the streams2 API (in GitHub issues and in the mailing list). Most of these questions are a direct consequence of the complexity of the API and they go away with the simpler API.

      Also the simple API lends itself naturally to monadic designs. See https://github.com/Sage/ez-streams. What matters here is not just the familiar array-like API but also the ease of exposing all sorts of components as readable or writable streams. See the device source files in https://github.com/Sage/ez-streams/tree/master/lib/devices.

      We are sticking to the simpler API and introducing the ez-streams wrappers in our project. It’s been working really well so far. As we can map both ways with node streams, there is no real risk.

      • Jeroen says:

        Thanks, I understand now that the synchronous read() call is supposed to be used in conjunction with the “readable” event. My main concern about the “readable” event is its incompatibility with the event-based flowing mode (triggered by calling .on(“data”)). E.g., consider the following simplification of a very realistic scenario:

        var PassThrough = require("stream").PassThrough;
        var readable = new PassThrough();
        var writable = new PassThrough();
        readable.pipe(writable);
        readable.pause();

        It throws a ‘Cannot switch to old mode now.’ error.
        It appears that .pipe() uses the newer “readable” event and thereby renders it useless in combination with .pause() and .resume().

        Considering your wrappers in this post being based on .pause() and .resume() as well, is ez-streams not experiencing similar problems in combination with .pipe()?
