Thursday, December 04, 2008

Publish Subscribe in Javascript

Recently, I came across a couple of articles on Ajaxian that reminded me of an implementation I had written a few of months back. This can be easily extended to implement the HTML5 "postMessage" functionality in browsers not supporting HTML5 (I am working on this currently). Additionally, with a little additions, this library can be also used to implement asynchronous comet-like messages from the sever. Here's a sample usage code for the library usage and the library itself:

Usage:


/*
 * Subscribe to all the channels. Subscriptions can be wildcards e.g.
 * "/foo/bar"   : Subscribes to the channel /foo/bar only
 * "/foo/bar/*"  : Subscribes to the channel all the children of /foo/bar/ e.g. /foo/bar/baz, /foo/bar/buz but not /foo/bar/baz/boo
 * "/foo/bar/**" : Subscribes to all the descentends of /foo/bar/ e.g. /foo/bar/baz, /foo/bar/baz/boo, /foo/bar/bar/baz/boo
 * Below, we shall receive all the messages to all the channels
 */

jaf
.core.MessageBus.subscribe("/**", function(objMsg, strChannel) {
  alert
(strChannel + ": " + objMsg);
});

/*
 * To publish a message, just call the MessageBus's publish method like so:
 */

jaf
.core.MessageBus.publish("/foo/bar", {
   name
: "MessageName",
   messageHeader
: "This is a header string but can be any object",
   body
: "This is body but can be any arbitrary object"
});



Library:

/**
 * Copyright:
Aniket Naik
 * From the "jaf"
 *
 */

// create a jaf.core namespace
// jaf.namespace("jaf.core");

var jaf = {};
jaf
.core = {};

/**
 * @class
 * This class contains utility functions required throughout the framework
 */

jaf
.core.Util = function()   {
   
/** @scope jaf.core.Util */
   
return   {
     
/**
       * Trims the white spaces from the begining of the specified String
       * @param {String} s The String to trim
       * @return The "left trimmed" string
       */

      ltrim
: function(s) {
         
return s.replace(/(^\s+)/g, "");  
     
},
     
     
/**
       * Trims the white spaces from the end of the specified String
       * @param {String} s The String to trim
       * @return The "right trimmed" string
       */

      rtrim
: function(s) {
         
return s.replace(/\s+$/g, "");  
     
},
     
     
/**
       * Trims the white spaces from the begining and the end of the specified String
       * @param {String} s The String to trim
       * @return The "trimmed" string
       */

      trim
: function(s) {
         
return s.replace(/(^\s+)|\s+$/g, "");  
     
},
     
     
/**
       * Gets whether the specified String
s starts with the given String
       *
strSearch
       * @param {String} s The string to search
       * @param {String} strSearch The search String
       * @return true if
s starts with strSearch
       * @see endsWith(String, String)
       */

      startsWith
: function(s, strSearch)  {
         
return s.indexOf(strSearch) == 0;
     
},
     
     
hasOwnProperty: function(object, property)   {
         
if(object.hasOwnProperty)  {
           
return object.hasOwnProperty(property);
         
}
         
var prop = object[property];
         
return typeof(pro) !== "undefined" && prop !== object.constructor.prototype[property];
     
},
     
     
/**
       * Gets whether the specified String
s ends with the given String
       *
strSearch
       * @param {String} s The string to search
       * @param {String} strSearch The search String
       * @return true if
s ends with strSearch
       * @see startsWith(String, String)
       */

      endsWith
: function(s, strSearch)   {
         
if(s.indexOf(strSearch) == -1)   {
           
return false;
         
}
         
return (s.lastIndexOf(strSearch) + strSearch.length) == s.length;
     
},
     
     
/**
       * Gets the string representation of the specified object. This method is
       * used for debugging
       * @param {Object} Object to convert to string
       * @return {String} The string representation of the object
       */

      toObjectSource
: function(obj)   {
         
if(obj == null)   {
           
return "[null]";
         
}
         
if(obj == undefined) {
           
return "[undefined]";
         
}
         
         
var str = "[";
         
var member = null;
         
for(var each in obj) {
            member
= obj[each];
            str
+= each + "=" + member + ", "
         
}
         
return str + "]";
     
},
     
     
/**
       * Gets all the keys (properties) of the given object as an array
       * @param {Object} obj The object to retrieve keys
       * @param {Array} Properties of the object as an array
       */

      objectKeys
: function(obj)  {
         
var arr = [];
         
for(key in obj)   {
           
if(this.hasOwnProperty(obj, key))   {
               arr
.push(key);
           
}
         
}
         
return arr;
     
}
     
   
};
}();


/**
 * A Channel/Topic/Queue to which messages are published. Each channel is represented
 * by a path name of the form /some/channel/name which is hierarchical
 * @constructor
 * @param {String} strName The name of this channel. Should be of the form /some/name
 */

jaf
.core.Channel = function(strName)  {
   
var name = strName;
   
var subscribers = [];
   
   
/** @scope jaf.core.Channel */
   
return {
     
/**
       * Adds a subscriber to this channel. All the messages published to this
       * channel will be delivered to it.
       * @param {Function} funSubscriber The subscriber function. This function takes one
       *        parameter, the message object
       */

      addSubscriber
: function(funSubscriber) {
         
if(typeof(funSubscriber) === "function")  {
            subscribers
.push(funSubscriber);
         
}
     
},
     
     
/**
       * Removes a previously added subscriber from this channel
       * @param {Function} funSubscriber The subscriber to remove
       */

      removeSubscriber
: function(funSubscriber) {
         
for(var i = 0, len = subscribers.length; i < len; i++)   {
           
if(subscribers[i] === funSubscriber) {
               
return subscribers.splice(i, 1);
           
}
         
}
         
return null;
     
},
     
     
/**
       * Gets all subscribers as array
       * @return {Array} an array of subscribers added to this channel
       */

      getSubscribers
: function()  {
         
var len = subscribers.length;
         
var copy  = new Array(len);
         
for(var i = 0; i < len; i++)  {
            copy
[i] = subscribers[i];
         
}
         
return copy;
     
},
     
     
/**
       * Publishes a messages to this channel. All the subscribers added to this
       * channel will receive this message
       * @param {Object} objMessage The message to publish
       */

      publish
: function(objMessage) {
         
for(var i = 0, len = subscribers.length; i < len; i++)   {
            subscribers
[i](objMessage, strName);
         
}
     
},
     
      getName
: function()  {
         
return name;
     
},
     
     
toString: function() {
         
return "Channel[" + name + "]";
     
}
   
};
};


/**
 * @class
 * Defines a simple publish/subscribe messaging system where messages can be sent
 * or subscribed for.
 * @constructor
 */

jaf
.core.MessageBus = function(parentMessageBus)   {
   
var channels = {};
   
var patternSubscribers = {};
   
var Util = jaf.core.Util;
   
   
/**
    * IDEA: The parent child relationship will work as follows:
    * This message bus will subscribe to the parent message bus and listen to
    * all messages on all channels. As soon as messages come to those channels,
    * they will be published to channels with same path with this message bus
    * On the unload of this document or view, the listener to the parent message
    * bus is removed
    */

   
var parent = parentMessageBus || null;
   
   
/**
    * Creates a new Channel for the specified name. This method is always called
    * after calling checkChannelName(name)
    * @param {String} theName The name for this channel. This should be of the
    * form "/some/name"
    * @return {Channel} the newly created channel object
    */

   
function createChannel(theName)   {
     
// trim the trailing slash if any
     
if(theName.length > 1 && Util.endsWith(theName, "/"))  {
         theName
= theName.substring(0, theName.length - 1);
     
}
     
return new jaf.core.Channel(theName);
   
}
   
   
/**
    * Gets the segments of the specified channel name as array.
    * e.g. If the channel name is specified as "/some/name",  an array containing
    * two segments is returned like this: ["some", "name"]
    * @param {String} theName The channel name
    * @return {Array} The array containing the segments
    */

   
function getSegments(theName) {
     
var segs = theName.split("/");
     
if(segs[0] === "")  {
         segs
.slice(0, 1);
     
}
     
return segs;
     
// return theName.split("/");
   
}
   
   
/**
    * Checks whether the specified channel name is a valid name
    * @param {String} theName The name of the channel
    */

   
function checkChannelName(theName)   {
     
if(!theName || Util.trim(theName).length == 0 || !Util.startsWith(theName, "/"))   {
         
throw new Error("Invalid channel name: " + theName);
     
}
   
}
   
   
/**
    * Determines if the specified name is a wildcard pattern
    * @return true if the name ends with /* or /**
    */

   
function isPattern(theName)   {
     
return Util.endsWith(theName, "/*") || Util.endsWith(theName, "/**");
   
}
   
   
/**
    * Determines if
strName matches with channel name specified by
    *
strChannel. This method also supports wildcard pattern matching
    * for two wildcards:

    * "/*" For children and "/**" For descendents
    * @return true if strName matches with strChannel
    */

   
function matches(strName, strChannel) {
     
if(strName == strChannel) {
         
return true;
     
}
     
     
if(!isPattern(strName) || isPattern(strChannel))  {
         
return false;
     
}
     
     
// okay theName is a pattern
     
var segs = getSegments(strName);
     
var channelSegs = getSegments(strChannel);
     
var depth = segs.length;
     
var channelDepth = channelSegs.length;
     
     
if(depth > channelDepth)   {
         
return false;
     
}
     
     
// if(isPattern(strName)) {
      segs
.splice(segs.length - 1, 1);
      depth
= segs.length;

     
var channelPart = (channelSegs.slice(0, depth).join("/"));
     
var namePart = (segs.join("/"));
     
if(channelPart !== namePart)  {
         
return false;
     
}

     
if(Util.endsWith(strName, "/**"))   {
         
return true;
     
}else {
         
return (channelDepth - depth) == 1;
     
}
     
// }
   
}
   
   
/**
    * Adds any of the subscribers that are pattern subscribers to a Channel.
    * This is called internally when a message is published to a channel and
    * that channel does not exist yet. When this happens, a new Channel is created
    * and any previously registered pattern subscribers are added to the channel
    * @param {jaf.core.Channel} objChannel The channel to which wildcard or pattern
    * subscribers are added if they match.
    */

   
function addPatternSubscribers(objChannel)   {
     
var patternChannels = Util.objectKeys(patternSubscribers);
     
for(var i = 0, len = patternChannels.length; i < len; i++)  {
         
var patternCh = patternChannels[i];
         
if(matches(patternCh, objChannel.getName()))   {
           
var subscribers = patternSubscribers[patternCh];
           
if(subscribers)   {
               
for(var j = 0, jlen = subscribers.length; j < jlen; j++)  {
                  objChannel
.addSubscriber(subscribers[j]);
               
}
           
}
         
}
     
}
   
}
   
   
return {
     
/**
       * Publishes the message
objMessage to channel strChannel
       * All the subscribers registered with this channel will get the message
       * @param strChannel {String} The name of the channel to publish to
       * @param objMessage {Object} The message to publish
       */

      publish
: function(strChannel, objMessage)  {
         
if(isPattern(strChannel))  {
            alert
("Publishing to wildcard channels coming soon.. ;)");
           
// IDEA: Support publish to wildcard channels?
           
return;
         
}else {
           
var channel = channels[strChannel];
           
if(! channel)  {
               channel
= createChannel(strChannel);
               channels
[strChannel] = channel;
               addPatternSubscribers
(channel);
           
}
            channel
.publish(objMessage);
         
}
     
},
     
     
/**
       * Subscribes to specified channel. If the channel specified by
strChannel
       * does not exist, its created. Any message published to this channel will
       * be delivered to
funCallback who is the subscriber to this channel
       * @param strChannel {String} The name of the channel. This can be a wildcard
       * pattern like so: /some/channel/* or some/channel
       * @param funCallback {Function} The callback function that will be called
       * with message as the parameter
       */

      subscribe
: function(strChannel, funCallback)   {
         
// check if the channel name is valid
         checkChannelName
(strChannel);
         
         
// check for valid callback function
         
if(!typeof(funCallback) === "function")   {
           
throw new Error("Invalid subsription callback. Must be a function");
         
}
         
         
var subscriber = funCallback;
         
         
// check if this subscription is a pattern i.e. /some/name/** | some/name/*
         
if(isPattern(strChannel))  {
           
if(patternSubscribers[strChannel])  {
               patternSubscribers
[strChannel].push(subscriber);
           
}else {
               patternSubscribers
[strChannel] = [subscriber];
           
}
           
           
// check if this subscriber matches for any of the channel already created
           
var channelNames = Util.objectKeys(channels);
           
for(var i = 0, len = channelNames.length; i < len; i++)  {
               
var cName = channelNames[i];
               
if(matches(strChannel, cName)) {
                  channels
[cName].addSubscriber(subscriber);
               
}
           
}
         
}else {  //no pattern so existing or a new channel
           
var channel = channels[strChannel];
           
if(! channel)  {
               channel
= createChannel(strChannel);
               channels
[strChannel] = channel;
           
}
            channel
.addSubscriber(subscriber);
         
}        
     
},
     
     
/**
       * Unsubscribes to specified channel.
       * @param strChannel {String} The name of the channel (can be a wildcard
       * in which case funCallback from all the matching channels is removed
       * @param funCallback {Function} The callback function that was used when
       * subscribing
       */

      unsubscribe
: function(strChannel, funCallback) {
         
var subscriber = null;
         
if(isPattern(strChannel))  {
           
var arrSubs = patternSubscribers[strChannel];
           
var subsIndex = -1;
           
if(arrSubs) {
               
for(var i = 0, len = arrSubs.length; i < len; i++) {
                 
if(arrSubs[i] == funCallback) {
                     subsIndex
= i;
                     
break;
                 
}
               
}
               
               
if(subsIndex != -1)  {
                  arrSubs
.splice(subsIndex, 1);
               
}
           
}
         
}
         
         
// check if this subscriber matches for any of the channel already created
         
var channelNames = Util.objectKeys(channels);
         
for(var j = 0, jlen = channelNames.length; j < jlen; j++)  {
           
var cName = channelNames[j];
           
if(matches(cName, strChannel)) {
               subscriber
= channels[cName].removeSubscriber(subscriber);
           
}
         
}
     
},
     
     
toString: function() {
         
return "MessageBus"
     
}
   
};
}(null);