Node.js: Streams and pipes.

Node.js is used for building a lot of network applications, and that means a lot of data being passed around, sometimes in very large amounts. In node, all this data is processed the moment it's received, piece by piece. This is done with the help of streams. Here we discuss the usage of streams by writing a small node script that handles a file upload.
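To see the idea in isolation, here's a minimal sketch that reads a file chunk by chunk instead of loading it whole (the filename is just a placeholder):

  var fs = require('fs');

  // the file is read piece by piece instead of in one go
  var stream = fs.createReadStream('big_file.csv');

  stream.on('data', function(chunk) {
    console.log('received ' + chunk.length + ' bytes');
  });

  stream.on('end', function() {
    console.log('done reading');
  });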

Here's the actual piece of code that handles a file upload and responds to the client with the progress of the upload.

  var http = require('http'),
      fs = require('fs');

  var server = http.createServer();
  console.log("Starting up the server");
  server.listen(8000);

  server.on('request', function(request, response) {
    var file = fs.createWriteStream('copy.csv');
    var fileSize = request.headers['content-length'];
    var uploadedSize = 0;

    request.on('data', function(chunk) {
      uploadedSize += chunk.length;
      var uploadProgress = (uploadedSize / fileSize) * 100;
      response.write(Math.round(uploadProgress) + "% uploaded\n");

      // write() returns false when its internal buffer is full
      var bufferStore = file.write(chunk);
      if (bufferStore === false)
        request.pause();
    });

    // the buffered data has been flushed; safe to read again
    file.on('drain', function() {
      request.resume();
    });

    request.on('end', function() {
      file.end();
      response.write('Upload done!');
      response.end();
    });
  });

The basics: we create a node server that listens on port 8000. On receiving a request, we create a write stream (pointing at the destination file path). Each chunk of data received is written to that destination, and the upload progress is calculated and sent back in the response.

Let's break the above snippet into pieces and analyse what's happening.

A write stream is created, with 'copy.csv' as the destination path to which the received data will be written.

  var file = fs.createWriteStream('copy.csv');

The following piece forms the core of the upload process.

  request.on('data', function(chunk) {
    var bufferStore = file.write(chunk);
    if (bufferStore === false)
      request.pause();

    uploadedSize += chunk.length;
    var uploadProgress = (uploadedSize / fileSize) * 100;
    response.write(Math.round(uploadProgress) + "% uploaded\n");
  });

  file.on('drain', function() {
    request.resume();
  });

Looking at the code: on receiving each chunk of data (via the read stream), it's written to the write stream with

  file.write(chunk);

At this point we should pause and ask whether there's anything to worry about in this read-write streaming process. There is, and it's fairly obvious: there's a real possibility that the rate at which data is written to the write stream is lower than the rate at which it's read from the read stream. This is a genuine concern and cannot be ignored. Handling it is the job of the next two lines of code.

file.write(chunk) buffers the chunk and flushes it to disk in the background. It returns true if the data was handled immediately, and false if the internal buffer is full and the chunk had to be queued in memory. So we handle this by pausing the read stream whenever the buffer fills up.

  var bufferStore = file.write(chunk);
  if (bufferStore === false)
    request.pause();

Also, we need to restart streaming from the read stream once the buffer has drained. The following lines of code do just that.

  file.on('drain', function() {
    request.resume();
  });

Pipes in node: so far, we have handled the logic of keeping the read and write rates in sync ourselves. Node.js provides pipes, which have this logic already encapsulated.

The following line,

  request.pipe(file); // quite similar to UNIX pipes: pipes the input into an output stream

would be equivalent to

  request.on('data', function(chunk) {
    var bufferStore = file.write(chunk);
    if (bufferStore === false)
      request.pause();
  });

  file.on('drain', function() {
    request.resume();
  });

pipe() by itself keeps the read and write rates in sync, pausing and resuming the source when necessary.
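As an illustration, here's how the upload server could look when rewritten around pipe() — a sketch, with the progress reporting kept on a separate 'data' listener:

  var http = require('http'),
      fs = require('fs');

  http.createServer(function(request, response) {
    var file = fs.createWriteStream('copy.csv');
    var fileSize = request.headers['content-length'];
    var uploadedSize = 0;

    // pipe() handles the pause/resume dance for us,
    // and ends the write stream when the request ends
    request.pipe(file);

    // we can still observe the chunks to report progress
    request.on('data', function(chunk) {
      uploadedSize += chunk.length;
      response.write(Math.round((uploadedSize / fileSize) * 100) + "% uploaded\n");
    });

    request.on('end', function() {
      response.end('Upload done!');
    });
  }).listen(8000);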

Now that our cause for concern is handled, all that is left is to calculate the upload percentage on receiving each chunk of data and respond with the calculated percentage.

  uploadedSize += chunk.length;
  var uploadProgress = (uploadedSize / fileSize) * 100;
  response.write(Math.round(uploadProgress) + "% uploaded\n");

Do note that the total size of the uploaded file is read from the Content-Length request header.

  var fileSize = request.headers['content-length'];

Now, when the request ends (i.e. the 'end' event is emitted by the request), we close the write stream and send the final chunk of the response back to the client, indicating that the file upload finished successfully.
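In code, that's the final handler from the snippet above:

  request.on('end', function() {
    file.end();
    response.write('Upload done!');
    response.end();
  });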

To test this, run the node server and make a request, something like:

  curl -v --upload-file "upload_file.csv" "http://localhost:8000"

and the upload progress can be tracked as the response streams back.
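One more thing worth knowing: pipe() returns the destination stream, so pipes can be chained much like UNIX pipes. For example, this sketch gzips a file using the core zlib module (the filenames are placeholders):

  var fs = require('fs'),
      zlib = require('zlib');

  // each pipe() returns its destination, so the calls chain
  fs.createReadStream('upload_file.csv')
    .pipe(zlib.createGzip())
    .pipe(fs.createWriteStream('upload_file.csv.gz'));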
