
Node.js: Event Emitters and Listeners

When building client-side applications with JavaScript, handling events is quite common. Consider, for example,


$('#toBeClicked').on('click', function() { alert('clicked') })

Here we handle a click event on the element with id ‘toBeClicked’. When the element is clicked, a ‘click’ event is emitted, which is handled by the statement above.

Just like in the DOM, many objects in Node emit events. Objects that do so inherit from the EventEmitter constructor.
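To see what that inheritance looks like, here's a minimal sketch (the Uploader constructor and the ‘progress’ event are made up for illustration):

var EventEmitter = require('events').EventEmitter,
    util = require('util');

function Uploader() {
  EventEmitter.call(this);              // initialise the emitter internals
}
util.inherits(Uploader, EventEmitter);  // Uploader instances can now emit and listen

var uploader = new Uploader();
uploader.on('progress', function(percent) { console.log(percent + '% done') });
uploader.emit('progress', 42);          // => 42% done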

Now let's build a custom event emitter of our own and see what's going on,


var EventEmitter = require('events').EventEmitter       // The EventEmitter constructor in the events.js module

Now, we create our custom emitter, which is an instance of EventEmitter,

var emitter = new EventEmitter();

We listen for the error event,

emitter.on('error', function(msg1, msg2) { console.log('ERR: ' + msg1 + ' ' + msg2 ) })

Here,

function(msg1, msg2) { console.log('ERR: ' + msg1 + ' ' + msg2) }

is the callback to be performed once the event is emitted.

We could also attach the listener as,

emitter.addListener('error', function(msg1, msg2) { console.log('ERR: ' + msg1 + ' ' + msg2) })

(addListener is simply an alias for on.)

Now we emit the error event,

emitter.emit('error', 'Bug detected', '@ Step 4')

Once the ‘error’ event is emitted, the listener performs the callback and we see the error logged in the console (as written in our callback function):
ERR: Bug detected @ Step 4

We can attach listeners for any number of custom events and handle them when they are emitted.
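For instance, a hypothetical ‘uploadComplete’ event works exactly the same way:

emitter.on('uploadComplete', function(fileName) { console.log('Finished uploading ' + fileName) });
emitter.emit('uploadComplete', 'upload_file.csv');   // => Finished uploading upload_file.csv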

Now that we know how events are emitted and handled via listeners, let's try out a small server that listens for requests and processes them.


var http = require('http');

var server = http.createServer(function(request, response) {
  request.on('data', function (chunk) { console.log(chunk.toString()); });

  request.on('end', function() {
    response.write('Request Completed!');
    response.end();
  });

});

console.log("Starting up the server");
server.listen(8000);
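To try it out, save the script (say, as server.js; the file name is just for illustration) and run:

node server.js
curl -d 'hello' http://localhost:8000

The server logs each chunk of the request body to its console, and the client gets back ‘Request Completed!’.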

Here, the http.createServer method returns an object which inherits from the EventEmitter constructor.

Check out the Node.js API doc for the createServer method in the http module:

http.createServer([requestListener])

It says that requestListener is a function which is automatically added to the ‘request’ event.

To check what's going on behind the scenes, let's dive into the Node.js code base.
As can be seen from the code, the createServer method is within the http.js module. Inspecting the http.js module,

exports.createServer = function(requestListener) {
  return new Server(requestListener);
};

Check the _http_server.js module to find the Server constructor, which has the following lines of code,

if (requestListener) {
  this.addListener('request', requestListener); // event listener for the server
}

As per the above snippet, ‘this’ (the current server instance) listens for the ‘request’ event and attaches the requestListener function to be performed when the event is emitted.

Here,

function(request, response) {
  request.on('data', function (chunk) { console.log(chunk.toString()); });

  request.on('end', function() {
    response.write('Request Completed!');
    response.end();
  });
}

is our requestListener.

Now, further inspecting the _http_server.js module, we can also see how the request event is emitted,

self.emit('request', req, res); // event emitter for the server

‘req’ and ‘res’ are the request and response objects passed as arguments to the requestListener function when the ‘request’ event is emitted. Here self is ‘this’ (our current server instance).

We could well make the server listen for the request event on our own. For this,

var server = http.createServer()
server.on('request', function(request, response) {
  request.on('data', function (chunk) { console.log(chunk.toString()); });

  request.on('end', function() {
    response.write('Request Completed!');
    response.end();
  })
});

Here, when we create the server instance, we do not pass the requestListener (notice that requestListener was optional in http.createServer([requestListener])). Instead, we attach a listener of our own on the server, which listens for the ‘request’ event and performs the callback function when it is emitted, i.e.,

server.on('request', function(request, response) { ... });

Node.js: Streams and pipes.

Node.js is used for building a lot of network applications, and there's a lot of data being passed around, which could well be huge in size. In Node, all this data is processed the moment it's received, piece by piece. This is done with the help of streams. Here we discuss the usage of streams by writing a small Node script that handles a file upload.

Here’s the actual piece of code that handles a file upload and responds back to the client with the progress of the upload.

  var http = require('http'),
      fs = require('fs');

  var server = http.createServer();
  console.log("Starting up the server");
  server.listen(8000);

  server.on('request', function(request, response) {
    var file = fs.createWriteStream('copy.csv');
    var fileSize = request.headers['content-length'];
    var uploadedSize = 0;
    var uploadProgress = 0;

    request.on('data', function (chunk) {
      uploadedSize += chunk.length;
      uploadProgress = (uploadedSize/fileSize) * 100;
      response.write(Math.round(uploadProgress) + "%" + " uploaded\n" );
      var bufferStore = file.write(chunk);
      if(bufferStore == false)
        request.pause();
    });

    file.on('drain', function() {
      request.resume();
    })

    request.on('end', function() {
      response.write('Upload done!');
      response.end();
    })

  });

The basics: we create a Node server that listens on port 8000. On receiving a request, we create a write stream (with the destination file path). Each chunk of data received is written to the destination path, and the upload progress is calculated and sent back in the response.

Let's break the above snippet into pieces and analyse what's happening.

A write stream is created, with ‘copy.csv’ as the destination path to which the received data will be written.

  var file = fs.createWriteStream('copy.csv');

The following piece forms the core of the upload process.

  request.on('data', function (chunk) {
    var bufferStore = file.write(chunk);
    if(bufferStore == false)
      request.pause();
    uploadedSize += chunk.length;
    uploadProgress = (uploadedSize/fileSize) * 100;
    response.write(Math.round(uploadProgress) + "%" + " uploaded\n" );
  });

  file.on('drain', function() {
    request.resume();
  })

Looking at the code: on receiving each chunk of data (via the read stream), it's written to the write stream as

file.write(chunk);

Right now, we need to pause a bit and check whether there might be a cause for worry in this whole read-write streaming process. The answer is yes, and it is quite obvious: there is a real possibility that the rate at which data is written to the write stream is lower than the rate at which it's read from the read stream. This is a genuine concern and cannot be ignored. How we handle it forms our next two lines of code.

file.write(chunk) writes the data, buffering it in memory when the destination can't keep up. It returns true if the chunk was flushed straight through, and false if it was queued because the internal buffer is full. So we need to handle this by pausing the read stream whenever the buffer fills up.

  var bufferStore = file.write(chunk);
  if(bufferStore == false)
    request.pause();

Also, we need to restart streaming data from the read stream once the buffer has drained. The following lines of code do just that.

  file.on('drain', function() {
    request.resume();
  })

Pipes in Node: above, we handled the logic of keeping the read and write rates in sync ourselves. Node.js provides us with pipes, which have this logic already encapsulated.

The following line,

request.pipe(file) // The notion is quite similar to UNIX pipes. Pipes the input into an output stream.

would be equivalent to

  request.on('data', function(chunk) {
    var bufferStore = file.write(chunk);
    if(bufferStore == false)
      request.pause();
  })

  file.on('drain', function() {
    request.resume();
  })

Pipe by itself keeps the read and write rates in sync by pausing and resuming when necessary.
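Using pipe, the whole upload server could be written more compactly. Here's a minimal sketch, assuming the same setup as above (we can still listen for the ‘data’ event to report progress while piping):

  var http = require('http'),
      fs = require('fs');

  var server = http.createServer(function(request, response) {
    var file = fs.createWriteStream('copy.csv');
    var fileSize = request.headers['content-length'];
    var uploadedSize = 0;

    request.pipe(file);   // pipe handles the pause/resume bookkeeping for us

    request.on('data', function(chunk) {
      uploadedSize += chunk.length;
      response.write(Math.round((uploadedSize / fileSize) * 100) + '% uploaded\n');
    });

    request.on('end', function() {
      response.end('Upload done!');
    });
  });

  server.listen(8000);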

Now that we have addressed this concern, all that is left is to calculate the upload percentage on receiving each chunk of data and respond with it.

  uploadedSize += chunk.length;
  uploadProgress = (uploadedSize/fileSize) * 100;
  response.write(Math.round(uploadProgress) + "%" + " uploaded\n" );

Do note that the actual size of the uploaded file is taken from the request headers.

var fileSize = request.headers['content-length'];

Now, when the request ends (i.e. the ‘end’ event is emitted by the request), the final chunk of the response is sent back to the client, indicating that our file upload has completed successfully.

To test this, run the node server and try making a request, something like this:

curl -v --upload-file "upload_file.csv" "http://localhost:8000"

and the upload progress could be tracked.
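For a file uploaded in, say, four chunks, the response would look something like this (the figures are illustrative):

25% uploaded
50% uploaded
75% uploaded
100% uploaded
Upload done!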