Easy node.js streams

JavaScript is a great playground for experimentation. After ES6 generators and Galaxy I went back to one of my pet topics: streams. The simple streams API that we have been using in our product works really well but I was getting a bit frustrated with it: too low level! Working with this simple read/write API felt a bit like working with arrays without the ES5 functional goodies (forEach, filter, map, reduce, etc.). You get the job done with loops but you lack the elegance of functional chaining. So, I decided to fix it and a new project was born: ez-streams.

I had been keeping an eye on streams2, the streaming API that got introduced in node 0.10.0, but I was not convinced: too complex, people seem to be struggling with it, exception handling has problems, etc. So I went with a different design. Compatibility with streams2 was crucial but it was just too hard to get where I wanted to go by building directly on top of it.

The ez-streams project is now starting to take shape and I’ve just published a first version to NPM. The README gives an overview of the API and I don’t want to repeat it here. You should probably glance through it before reading this post to get a feel for the API. Here I want to focus on API design issues and explain why I took this route.

This project is a natural continuation of my earlier work on streamline.js. So I will be using the streamline syntax for the examples in this post. But the ez-streams API is just a regular callback based API and you don’t have to write your code with streamline to use it. You can call it from regular JavaScript code. I have included pure callback versions of some of the examples, to show how the API plays with vanilla JavaScript.

Minimal essential API

The first idea in this project was to keep the essential API as small and simple as possible. The essential ez-streams API consists of two function signatures:

  • an asynchronous read(_) function which characterizes reader streams.
  • an asynchronous write(_, val) function which characterizes writer streams.

The complete reader API is much more sophisticated but all the other calls are implemented directly or indirectly around the read(_) call. This makes it very easy to implement readers: all you have to do is pass a read function to a helper that will decorate it with the rest of the API.

For example, here is how you can expose a mongodb cursor as an EZ stream:

var reader = function(cursor) {
    return ez.devices.generic.reader(function(_) {
        // nextObject returns null when the cursor is exhausted;
        // return undefined to signal end of stream to ez-streams
        var obj = cursor.nextObject(_);
        return obj == null ? undefined : obj;
    });
};

Also, there was no reason to limit this API to string and buffer types: read could very well return integers, Booleans, objects, even null or undefined. I decided to reserve undefined as the end-of-stream marker because I wanted streams to be able to transport all the types that are serializable in JSON. Symmetrically, I used undefined as the end-of-stream marker for write, so there was no need for a separate end method: writer.write(_) does the job.
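
To make this concrete, here is what the essential API looks like in action: a hand-written copy loop (streamline syntax, just a sketch) that moves data from a reader to a writer, using undefined as the end-of-stream marker on both sides:

function copy(_, reader, writer) {
    var val;
    // read until the reader returns undefined (end of stream)
    while ((val = reader.read(_)) !== undefined) {
        writer.write(_, val);
    }
    // write undefined to signal end of stream to the writer
    writer.write(_);
}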

As a consequence, the API is not tainted by datatype-specific issues. For example, there is nothing in the reader and writer API about string encoding. This issue is handled in the devices that you put at both ends of your data processing chains. Better keep things orthogonal!

I may sound like an extremist in API minimalism here but I think that this is a very important point. A simple API is easier to wrap around existing APIs, it lends itself naturally to algebraic (monadic) designs, etc. This is probably the main reason why I did not go with node’s stream APIs (version 1 or 2).

Function application rather than pipes

The EZ streams design is directly influenced by ES5’s functional array API. It actually started as an attempt to completely mimic the ES5 design and the rest of the API followed naturally. It is also more remotely influenced by jQuery.

There is a pipe function in the EZ streams API but it plays a less prominent role than in node’s standard stream API. The pipe calls do not appear between processing steps. Instead, pipe only appears at the end of the chains, to transfer the data to a writer. The typical structure of an EZ streams chain is:

reader.op1(fn1).op2(fn2) ... .opN(fnN).pipe(_, writer)

All operations produce a reader, except the last one which is a reducer. pipe is a reducer but it is not the only one: forEach, some, every and of course reduce are all reducers and you can end your chains with any of them.

Most operations take a callback function as parameter (fn1, fn2, etc. above). The callback depends on the operation. It can be a filter, a mapper, a complex transform, etc. These callbacks allow you to inject your own logic into the chain.
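
For example, here is a small chain (streamline syntax) ending with the reduce reducer. The employee records and their fields are just assumptions for the illustration, and reduce is assumed here to follow the ES5 signature, with the continuation callback as the extra first parameter of its callback, like the other operations:

var totalSalary = reader.filter(function(_, emp) {
    return emp.active;          // keep active employees only
}).map(function(_, emp) {
    return emp.salary;          // project each record to its salary
}).reduce(_, function(_, sum, salary) {
    return sum + salary;        // accumulate
}, 0);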

The classical node pattern is different. It is directly inspired by UNIX’s command piping:

source | op1 | op2 | ... | opN

which becomes:

source . pipe(stream1) . pipe(stream2) ... .pipe(streamN)

The node design forces you to package your logic as streams, usually duplex streams which receive data from one pipe, transform it and send their results to another pipe.

I find the EZ stream design more natural and easier to use: it does not force you to package your code as streams and handle low-level stream events. Instead, you just have to provide a set of callbacks that are specific to the operations that you apply. Moreover, the most basic operations, like filter and map, are aligned with the ES5 array API. So you are using familiar patterns.

Mixing functional and imperative styles

The general structure of an ez-streams processing chain is very functional. The basic operations (filter, map, reduce) are directly modelled after the ES5 functional array API. They are applied to a reader, and they produce another reader on which other operations can be chained.

But there is one important operation that somehow violates this rule: transform. The transform call itself is functional and is chained exactly like the other operations. But its callback receives 3 parameters: a continuation callback, a reader and a writer. You write the body of your transformation as a function that reads its input from the reader parameter and writes its output to the writer parameter.

Let us look at the CSV parser that I used as example in the README:

var csvParser = function(_, reader, writer) {
	// get a lines parser from our transforms library
	var linesParser = ez.transforms.lines.parser();
	// transform the raw text reader into a lines reader
	reader = reader.transform(linesParser);
	// read the first line and split it to get the keys
	var keys = reader.read(_).split(',');
	// read the other lines
	reader.forEach(_, function(_, line) {
		// ignore empty line (we get one at the end if file is terminated by newline)
		if (line.length === 0) return;
		// split the line to get the values
		var values = line.split(',');
		// convert it to an object with the keys that we got before
		var obj = {};
		keys.forEach(function(key, i) {
			obj[key] = values[i];
		});
		// send the object downwards.
		writer.write(_, obj);
	});
};
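
Once written, such a transform chains like any other operation. For example (a sketch; someWriter is just a placeholder for whatever writer ends your chain):

ez.devices.file.text.reader('users.csv')
    .transform(csvParser)
    .pipe(_, someWriter);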

The body of the transform is clearly imperative style, full of calls like read, write or forEach that smell of side effects.

This could be seen as a weakness of the design. Why introduce imperative style in this wonderful functional world?

The reason is simple: because it is usually easier to write transforms in imperative style!

If instead you try to write your transforms directly as chainable functions, you have to write functions that transform a reader into another reader. These functions usually take the form of state automata. You have to write state machines!

Some folks find it natural and fun to write state machines. I don’t! I find it more difficult and more error prone than writing mundane loops with read and write calls. State machines are great but I’d rather let a program generate them than write them myself (I love regular expressions).
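
To make the contrast concrete, here is a sketch of a lines parser written directly as a reader-to-reader function. Note how it has to carry state (the pending lines and the trailing fragment) across read calls; this is the embryo of the state machine that the transform style lets you avoid:

function linesReader(reader) {
    var pending = [];       // complete lines not yet delivered
    var remainder = '';     // trailing fragment of the last chunk
    var done = false;
    return ez.devices.generic.reader(function(_) {
        // refill the pending queue until we have a full line or reach the end
        while (pending.length === 0 && !done) {
            var chunk = reader.read(_);
            if (chunk === undefined) {
                done = true;
                if (remainder.length > 0) pending.push(remainder);
            } else {
                var parts = (remainder + chunk).split('\n');
                remainder = parts.pop();    // last part may be incomplete
                pending = pending.concat(parts);
            }
        }
        return pending.length > 0 ? pending.shift() : undefined;
    });
}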

So the role of the transform function is simply to put the developers back into their imperative shoes (*).

(*) Fortunately I noticed my horrible mistake and rephrased this in gender-neutral form before publishing. I don’t want to be the next victim!

When it comes to programming styles, my religion is that you should just be pragmatic and use the style that best fits your problem, instead of trying to fit everything into one style which has been arbitrarily designated as superior. Functional, imperative and object-oriented styles all have a role to play in modern programming, and a good developer is someone who uses the right style at the right moment, not someone who tries to force everything into a single style. I’d have a lot more to say but I’ll keep it for another post.

Exception handling

Exception handling works rather naturally with EZ streams: all processing chains are terminated by a reducer, and this reducer, unlike the previous chain elements, takes a continuation callback as first parameter. Exceptions that occur inside the chain are funneled through this continuation callback.

So, if you write your code with streamline.js, you can trap the exceptions with a try/catch around the whole chain. For example:

try {
    ez.devices.file.text.reader('users.csv').transform(ez.transforms.csv.parser())
        .filter(function(_, item) {
            return item.gender === 'F';
        })
        .transform(ez.transforms.json.formatter({ space: '\t' }))
        .pipe(_, ez.devices.file.text.writer('females.json'));
} catch (ex) {
    logger.write(_, ex);
}

If you use EZ streams with raw callbacks, you just need to test the first parameter of your continuation callback. The previous example becomes:

ez.devices.file.text.reader('users.csv').transform(ez.transforms.csv.parser())
    .filter(function(cb, item) {
        cb(null, item.gender === 'F');
    })
    .transform(ez.transforms.json.formatter({ space: '\t' }))
    .pipe(function(err) {
        if (err) logger.write(function(e) {}, err);
    }, ez.devices.file.text.writer('females.json'));

Of course, you can also trap exceptions in all the processing callbacks that you install in the chain (the callbacks that you pass to filter, map, transform, etc.). If you trap an exception in such callbacks and return a normal result instead, processing will continue as usual and your reducer callback will not receive the exception.
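
For example, here is a sketch of a mapper that traps its own errors and substitutes a fallback value so that the chain keeps flowing (riskyConvert is a hypothetical asynchronous conversion):

reader.map(function(_, item) {
    try {
        return riskyConvert(_, item);
    } catch (ex) {
        // recover with a fallback value; the reducer will not see the exception
        return { error: ex.message, item: item };
    }
}).pipe(_, writer);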

So you do not have to use domains or other advanced error handling techniques; the EZ streams API is just a regular API with continuation callbacks and exceptions are always propagated through these callbacks. If they are not, this is a bug.

Backpressure and buffering

Developers who implement node streams keep talking about backpressure. From what I understand, they have to write special code to inform their inputs when outputs are not processing data fast enough, so that the inputs get paused. Then, once the outputs get drained a bit, the inputs can be resumed.

Frankly, I do not understand any of this. We have been writing a lot of code with the low-level read/write API (the essential API) and we have never run into situations where we would need to worry about backpressure and write special code to pause inputs.

This is because EZ streams handle read and write operations in a decoupled way at the low level. When wrapping a node readable stream, our read function buffers a bit of input with low and high water marks. We pause the stream when the high mark is reached and we resume it when the buffer goes below the low mark. On the output side, our write wrapper handles the little drain event dance so that we don’t overflow the output buffers. There is no explicit coordination between inputs and outputs, it works almost magically, thanks to the event loop:

  • If the input is too fast when we pipe data, the input stream gets paused when its buffer hits the high mark. Then the output gets a chance to drain its output buffers and process the data which has been buffered upstream. When the input buffers fall below the low mark, the input stream will be resumed and it will likely fill its input buffers again before the output gets drained. So input will be paused again, etc., etc.
  • If, on the other hand, the output is faster, the input stream will have empty buffers at all times and the pipe will be waiting for input most of the time.
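
For the curious, here is a much simplified sketch of the kind of wrapping described above (not the actual ez-streams code), in raw callback form. It counts chunks rather than bytes, omits error handling, and assumes that the generic reader device accepts a plain read(callback) function:

function wrapReadable(stream, lo, hi) {
    var chunks = [];       // buffered chunks
    var done = false;      // end of stream reached
    var waiting = null;    // pending read callback, if any

    function serve() {
        if (!waiting) return;
        var cb = waiting;
        if (chunks.length > 0) {
            waiting = null;
            var chunk = chunks.shift();
            if (chunks.length <= lo) stream.resume();   // below low mark: let data flow again
            cb(null, chunk);
        } else if (done) {
            waiting = null;
            cb(null, undefined);                        // undefined marks end of stream
        }
    }

    stream.on('data', function(chunk) {
        chunks.push(chunk);
        if (chunks.length >= hi) stream.pause();        // above high mark: pause the source
        serve();
    });
    stream.on('end', function() {
        done = true;
        serve();
    });

    return ez.devices.generic.reader(function(cb) {
        waiting = cb;
        serve();
    });
}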

So backpressure is a non-issue with EZ streams. You don’t need to worry about it!

What you should worry about instead is buffering, because it will impact the throughput of your stream chains. If you do not buffer at all, your pipeline will likely be inefficient because data will remain buffered at the source of the chain whenever some other operation is waiting for I/O further down the line. To keep the data flowing you need to inject buffers into the chain. These buffers will keep the upstream chain busy (until they fill, of course) while the downstream operations are waiting for I/O. Then, when the downstream operation is ready to accept new input, it gets it from the buffer instead of having to pull it from the beginning of the chain.

The EZ streams API includes a buffer operation that you can inject in your processing chains to add buffering. The typical pattern is:

reader.transform(T1).buffer(N).transform(T2).pipe(_, writer);

This will buffer N items between transforms T1 and T2.

Note that buffering can become really tricky when you start to have diamond-shaped topologies (a fork followed by a join). If you are unlucky and one of the branches is systematically dequeued faster than the other, you will need unlimited buffering in the fork node to keep things flowing. I hit this in one of my unit tests but fortunately this was with a very academic example and it seems unlikely that you would hit this problem with real data processing chains. But who knows?

Streams and pipes revisited

I have been rather critical of node’s streaming/piping philosophy in the past. I just did not buy the idea that code would be packaged as streams and that you would assemble a whole application by just piping streams into each other. I think that my reluctance came primarily from the complexity of the API. Implementing a node.js stream is a real endeavor, and I could just not imagine our team using such a complex API as a standard code packaging pattern.

I’ve been playing with ez-streams for a few weeks now, and I’m starting to really like the idea of exposing a lot of data through the reader API, and of packaging a lot of operations as reusable and configurable filters, mappers, transforms, etc. So I feel that I’m getting more into the streams and pipes vision. But I only buy it if the API is simple and algebraic.

One word of caution to close this post: the ez-streams implementation is still immature. I have written basic unit tests, and they pass but I’m very far from having tested all possible edge conditions. So don’t expect everything to be perfectly oiled. On the other hand, I’m rather pleased with the current shape of the API and I don’t expect to make fundamental changes, except maybe in advanced operations like fork and join.


6 Responses to Easy node.js streams

  1. David says:

    I’m fascinated by your journey toward better streams and have read your related posts, though I haven’t used any of your tools yet.

    I’ve used node streams for reading and storing data coming from hardware sensors. I’ve also used them for transforming and serving data from databases. The fundamental difference here is that when certain stream sources, like sensors or events, cannot be paused, a push pattern makes sense. When using a stream with a controllable data source, a pull stream makes more sense, as you can control the flow from the source. What are your thoughts on handling these differences?

    Also, what do you think about the highland.js library, which is being developed by the guy who wrote async? He’s come up with some nice patterns for dealing with forking and joining streams, parallel and synchronous execution, and dealing with streams which emit streams.

    • What can you do if a stream cannot be paused? I see two options: buffer or discard.

      You can also use fast external storage to limit the amount of buffering in memory. But if your processing/output speed cannot cope with a high steady input rate, it seems like you’ll have to start discarding input at some point.

      And this holds regardless of how you process the stream (push or pull). If you pull, buffering is natural. In push mode it may feel like you are not buffering but this is just an illusion: you’ll be keeping data in your processing pipeline (usually trapped inside closures – typically waiting for a drain event to signal that the way out is open)!

      With ez-streams, incoming data will be buffered if the stream cannot be paused. It would make sense to have an option to set a limit on buffering and start discarding above the limit, or the ability to divert the flow to fast external storage (disk) above a limit. I haven’t implemented this because we are dealing with pausable streams in our application.

      I have looked at highland.js. It looks similar to ez-streams. One important difference is that highland uses synchronous callbacks in the various operations (map, filter, …) while ez-streams uses asynchronous callbacks. For us, it is very important to have asynchronous callbacks everywhere because some of the tests require external queries (database or files).
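
      For example, with ez-streams a filter callback can itself do asynchronous work. A sketch in streamline syntax (users is a hypothetical database collection):

      reader.filter(function(_, item) {
          // asynchronous test: only keep items whose user exists in the database
          return users.findOne({ id: item.userId }, _) != null;
      }).pipe(_, writer);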

      highland has nice patterns for advanced forking/joining. My focus was more on the basic operations and on trying to make them as simple as possible. To me the strength of ez-streams is in the ease of creating streams (the devices library) and transforms (the transforms library).

  2. Pingback: My humble experiences with Node.js | Tabagus - The Web and Data Portal

  3. Michael Smith says:

    How do you specify how many bytes you want to read with read()?

    When you talk about being able to pull different types of data, such as numbers, I was envisioning something similar to:

    stream.read(4,function(buf) {
        var n = buf.readInt32LE(0)
    })
    
  4. The generic API deals with any datatype, which is why there is no length parameter (length is natural with strings and buffers, but not with objects).

    But there is a helper API to deal specifically with binary streams:

    var rawReader = ez.devices.files.reader('foo.bar');
    var reader = ez.helpers.binary.reader(rawReader, options);
    var len = reader.readInt32(_);
    var buf = reader.read(_, len);
    

    The helper also contains methods to peek (read without advancing) and unread. This is handy for parsers that need lookahead.

    The complete API is on https://github.com/Sage/ez-streams/blob/master/src/helpers/binary.md

  5. Michael,

    FYI, I recently created a variant of ez-streams that eliminates the callback parameter. I published it on https://github.com/Sage/f-streams (it is based on https://github.com/Sage/f-promise).

    With this library, the code becomes:

    import { run } from 'f-promise';
    import { helpers, devices } from 'f-streams';
    
    function foo() {
      const rawReader = devices.files.reader('foo.bar');
      const reader = helpers.binary.reader(rawReader, options);
      let len = reader.readInt32();
      let buf = reader.read(len);
      // more...
    }
    
    run(foo).catch(err => err && console.error(err));
    
