Thursday, December 04, 2008

Publish Subscribe in Javascript

Recently, I came across a couple of articles on Ajaxian that reminded me of an implementation I had written a few months back. This can be easily extended to implement the HTML5 "postMessage" functionality in browsers not supporting HTML5 (I am working on this currently). Additionally, with a few additions, this library can also be used to implement asynchronous comet-like messages from the server. Here's some sample usage code for the library, followed by the library itself:

Usage:


/*
* Subscribe to channels. A subscription can be an exact channel name or a wildcard, e.g.
* "/foo/bar" : Subscribes to the channel /foo/bar only
* "/foo/bar/*" : Subscribes to all the direct children of /foo/bar, e.g. /foo/bar/baz and /foo/bar/buz, but not /foo/bar/baz/boo
* "/foo/bar/**" : Subscribes to all the descendants of /foo/bar, e.g. /foo/bar/baz, /foo/bar/baz/boo, /foo/bar/bar/baz/boo
* Below, we subscribe to "/**" and so receive all the messages published to all the channels.
* The callback is invoked with the message and the name of the channel it was published to.
*/
jaf.core.MessageBus.subscribe("/**", function(objMsg, strChannel) {
alert(strChannel + ": " + objMsg);
});

/*
* To publish a message, call the MessageBus's publish method with the channel
* name and the message. The message can be any object; it is handed to the
* subscribers as is.
*/
jaf.core.MessageBus.publish("/foo/bar", {
name: "MessageName",
messageHeader: "This is a header string but can be any object",
body: "This is body but can be any arbitrary object"
});



Library:

/**
* Copyright: Aniket Naik
* From the "jaf"
*
*/
// create a jaf.core namespace
// jaf.namespace("jaf.core");

var jaf = {};
jaf.core = {};

/**
 * @class
 * This class contains utility functions required throughout the framework.
 * It is a singleton: the function below is invoked immediately and
 * jaf.core.Util holds the returned object of helpers.
 */
jaf.core.Util = function() {
    /** @scope jaf.core.Util */
    return {
        /**
         * Trims the white spaces from the beginning of the specified String
         * @param {String} s The String to trim
         * @return The "left trimmed" string
         */
        ltrim: function(s) {
            return s.replace(/(^\s+)/g, "");
        },

        /**
         * Trims the white spaces from the end of the specified String
         * @param {String} s The String to trim
         * @return The "right trimmed" string
         */
        rtrim: function(s) {
            return s.replace(/\s+$/g, "");
        },

        /**
         * Trims the white spaces from the beginning and the end of the specified String
         * @param {String} s The String to trim
         * @return The "trimmed" string
         */
        trim: function(s) {
            return s.replace(/(^\s+)|\s+$/g, "");
        },

        /**
         * Gets whether the specified String s starts with the given String
         * strSearch
         * @param {String} s The string to search
         * @param {String} strSearch The search String
         * @return true if s starts with strSearch
         * @see endsWith(String, String)
         */
        startsWith: function(s, strSearch) {
            return s.indexOf(strSearch) === 0;
        },

        /**
         * Gets whether the property is defined directly on the object (as
         * opposed to being inherited through its prototype chain).
         * @param {Object} object The object to inspect
         * @param {String} property The name of the property
         * @return true if object has its own property with the given name
         */
        hasOwnProperty: function(object, property) {
            // Borrow the method from Object.prototype so that this also works for
            // objects that have no hasOwnProperty of their own (null prototype) or
            // that shadow it with an ordinary data property.
            return Object.prototype.hasOwnProperty.call(object, property);
        },

        /**
         * Gets whether the specified String s ends with the given String
         * strSearch
         * @param {String} s The string to search
         * @param {String} strSearch The search String
         * @return true if s ends with strSearch
         * @see startsWith(String, String)
         */
        endsWith: function(s, strSearch) {
            if(s.indexOf(strSearch) === -1) {
                return false;
            }
            // the last occurrence must finish exactly at the end of s
            return (s.lastIndexOf(strSearch) + strSearch.length) === s.length;
        },

        /**
         * Gets the string representation of the specified object. This method is
         * used for debugging. Every enumerable property (inherited ones included)
         * is rendered as "name=value, " between square brackets.
         * @param {Object} obj Object to convert to string
         * @return {String} The string representation of the object, "[null]" for
         * null and "[undefined]" for undefined
         */
        toObjectSource: function(obj) {
            // undefined has to be tested first and strictly: a loose comparison
            // with null is also true for undefined
            if(typeof(obj) === "undefined") {
                return "[undefined]";
            }
            if(obj === null) {
                return "[null]";
            }

            var str = "[";
            for(var each in obj) {
                str += each + "=" + obj[each] + ", ";
            }
            return str + "]";
        },

        /**
         * Gets all the keys (own properties) of the given object as an array
         * @param {Object} obj The object to retrieve keys
         * @return {Array} Own enumerable property names of the object
         */
        objectKeys: function(obj) {
            var arr = [];
            // "var" keeps the loop variable local instead of leaking a global
            for(var key in obj) {
                if(this.hasOwnProperty(obj, key)) {
                    arr.push(key);
                }
            }
            return arr;
        }

    };
}();


/**
 * A Channel/Topic/Queue to which messages are published. Each channel is represented
 * by a path name of the form /some/channel/name which is hierarchical.
 * The subscriber list is private to the returned object and can only be changed
 * through addSubscriber / removeSubscriber.
 * @constructor
 * @param {String} strName The name of this channel. Should be of the form /some/name
 */
jaf.core.Channel = function(strName) {
    var name = strName;
    var subscribers = [];

    /** @scope jaf.core.Channel */
    return {
        /**
         * Adds a subscriber to this channel. All the messages published to this
         * channel will be delivered to it. Anything that is not a function is
         * silently ignored. Adding the same function twice registers it twice.
         * @param {Function} funSubscriber The subscriber function. It is called with
         * the message object and the name of this channel
         */
        addSubscriber: function(funSubscriber) {
            if(typeof(funSubscriber) === "function") {
                subscribers.push(funSubscriber);
            }
        },

        /**
         * Removes a previously added subscriber from this channel. Only the first
         * registration of the function is removed.
         * @param {Function} funSubscriber The subscriber to remove
         * @return {Array} A one element array holding the removed subscriber, or
         * null if the function was not subscribed
         */
        removeSubscriber: function(funSubscriber) {
            for(var i = 0, len = subscribers.length; i < len; i++) {
                if(subscribers[i] === funSubscriber) {
                    return subscribers.splice(i, 1);
                }
            }
            return null;
        },

        /**
         * Gets all subscribers as array
         * @return {Array} a copy of the subscribers added to this channel; changing
         * it does not affect the channel
         */
        getSubscribers: function() {
            return subscribers.slice(0);
        },

        /**
         * Publishes a message to this channel. All the subscribers added to this
         * channel at the time of the call will receive this message
         * @param {Object} objMessage The message to publish
         */
        publish: function(objMessage) {
            // Dispatch over a snapshot: a subscriber may add or remove subscribers
            // (itself included) while handling the message, and walking the live
            // array with a cached length would then run past its end.
            var snapshot = subscribers.slice(0);
            for(var i = 0, len = snapshot.length; i < len; i++) {
                snapshot[i](objMessage, name);
            }
        },

        /**
         * Gets the name of this channel
         * @return {String} the name given when the channel was created
         */
        getName: function() {
            return name;
        },

        toString: function() {
            return "Channel[" + name + "]";
        }
    };
};


/**
 * @class
 * Defines a simple publish/subscribe messaging system where messages can be sent
 * or subscribed for. Channels are created lazily, the first time they are
 * published or subscribed to. jaf.core.MessageBus is a singleton: the function
 * below is invoked immediately (with no parent bus) and the returned object is
 * the bus.
 * @constructor
 */
jaf.core.MessageBus = function(parentMessageBus) {
    // channel name -> jaf.core.Channel
    var channels = {};
    // wildcard pattern -> array of subscriber functions registered for it
    var patternSubscribers = {};
    var Util = jaf.core.Util;

    /**
     * IDEA: The parent child relationship will work as follows:
     * This message bus will subscribe to the parent message bus and listen to
     * all messages on all channels. As soon as messages come to those channels,
     * they will be published to channels with same path with this message bus
     * On the unload of this document or view, the listener to the parent message
     * bus is removed
     * (Not implemented yet: the parent is stored but never used.)
     */
    var parent = parentMessageBus || null;

    /**
     * Creates a new Channel for the specified name. A single trailing slash is
     * removed from the name given to the channel.
     * @param {String} theName The name for this channel. This should be of the
     * form "/some/name"
     * @return {Channel} the newly created channel object
     */
    function createChannel(theName) {
        // trim the trailing slash if any
        if(theName.length > 1 && Util.endsWith(theName, "/")) {
            theName = theName.substring(0, theName.length - 1);
        }
        return new jaf.core.Channel(theName);
    }

    /**
     * Gets the segments of the specified channel name as array.
     * e.g. If the channel name is specified as "/some/name", an array containing
     * two segments is returned like this: ["some", "name"]
     * @param {String} theName The channel name
     * @return {Array} The array containing the segments
     */
    function getSegments(theName) {
        var segs = theName.split("/");
        // a leading "/" yields an empty first segment; drop it in place
        if(segs[0] === "") {
            segs.shift();
        }
        return segs;
    }

    /**
     * Checks whether the specified channel name is a valid name, i.e. a non
     * blank string starting with "/"
     * @param {String} theName The name of the channel
     * @throws {Error} if the name is not valid
     */
    function checkChannelName(theName) {
        if(typeof(theName) !== "string" || Util.trim(theName).length === 0 || !Util.startsWith(theName, "/")) {
            throw new Error("Invalid channel name: " + theName);
        }
    }

    /**
     * Determines if the specified name is a wildcard pattern
     * @param {String} theName The channel name or pattern
     * @return true if the name ends with /* or /**
     */
    function isPattern(theName) {
        return Util.endsWith(theName, "/*") || Util.endsWith(theName, "/**");
    }

    /**
     * Determines if strName matches with channel name specified by
     * strChannel. This method also supports wildcard pattern matching
     * for two wildcards:
     * "/*" For children and "/**" For descendents
     * @param {String} strName A channel name or a wildcard pattern
     * @param {String} strChannel The concrete channel name to test
     * @return true if strName matches with strChannel
     */
    function matches(strName, strChannel) {
        if(strName == strChannel) {
            return true;
        }

        // only a pattern can match a different name, and a pattern never
        // matches another pattern
        if(!isPattern(strName) || isPattern(strChannel)) {
            return false;
        }

        var segs = getSegments(strName);
        var channelSegs = getSegments(strChannel);
        var channelDepth = channelSegs.length;

        // the channel must have at least one segment in place of the wildcard
        if(segs.length > channelDepth) {
            return false;
        }

        // drop the wildcard; what remains is the prefix the channel must share
        segs.pop();
        var depth = segs.length;

        if(channelSegs.slice(0, depth).join("/") !== segs.join("/")) {
            return false;
        }

        if(Util.endsWith(strName, "/**")) {
            // any depth below the prefix
            return true;
        }
        // "/*": exactly one segment below the prefix
        return (channelDepth - depth) === 1;
    }

    /**
     * Adds any of the subscribers that are pattern subscribers to a Channel.
     * This is called internally when a message is published to a channel and
     * that channel does not exist yet. When this happens, a new Channel is created
     * and any previously registered pattern subscribers are added to the channel
     * @param {jaf.core.Channel} objChannel The channel to which wildcard or pattern
     * subscribers are added if they match.
     */
    function addPatternSubscribers(objChannel) {
        var patternChannels = Util.objectKeys(patternSubscribers);
        for(var i = 0, len = patternChannels.length; i < len; i++) {
            var patternCh = patternChannels[i];
            if(matches(patternCh, objChannel.getName())) {
                var subscribers = patternSubscribers[patternCh];
                if(subscribers) {
                    for(var j = 0, jlen = subscribers.length; j < jlen; j++) {
                        objChannel.addSubscriber(subscribers[j]);
                    }
                }
            }
        }
    }

    return {
        /**
         * Publishes the message objMessage to channel strChannel
         * All the subscribers registered with this channel will get the message.
         * If the channel does not exist yet it is created and the matching
         * pattern subscribers are attached to it first.
         * Publishing to a wildcard channel is not supported; it only shows an alert.
         * @param strChannel {String} The name of the channel to publish to
         * @param objMessage {Object} The message to publish
         */
        publish: function(strChannel, objMessage) {
            if(isPattern(strChannel)) {
                alert("Publishing to wildcard channels coming soon.. ;)");
                // IDEA: Support publish to wildcard channels?
                return;
            }

            var channel = channels[strChannel];
            if(! channel) {
                channel = createChannel(strChannel);
                channels[strChannel] = channel;
                addPatternSubscribers(channel);
            }
            channel.publish(objMessage);
        },

        /**
         * Subscribes to specified channel. If the channel specified by strChannel
         * does not exist, its created. Any message published to this channel will
         * be delivered to funCallback who is the subscriber to this channel
         * @param strChannel {String} The name of the channel, starting with "/".
         * This can be a wildcard pattern like so: /some/channel/* or /some/channel/**
         * @param funCallback {Function} The callback function that will be called
         * with the message and the channel name as the parameters
         * @throws {Error} if the channel name is invalid or funCallback is not a function
         */
        subscribe: function(strChannel, funCallback) {
            // check if the channel name is valid
            checkChannelName(strChannel);

            // check for valid callback function
            if(typeof(funCallback) !== "function") {
                throw new Error("Invalid subsription callback. Must be a function");
            }

            // check if this subscription is a pattern i.e. /some/name/** | /some/name/*
            if(isPattern(strChannel)) {
                // remember it for the channels that get created later on
                if(patternSubscribers[strChannel]) {
                    patternSubscribers[strChannel].push(funCallback);
                }else {
                    patternSubscribers[strChannel] = [funCallback];
                }

                // check if this subscriber matches for any of the channel already created
                var channelNames = Util.objectKeys(channels);
                for(var i = 0, len = channelNames.length; i < len; i++) {
                    var cName = channelNames[i];
                    if(matches(strChannel, cName)) {
                        channels[cName].addSubscriber(funCallback);
                    }
                }
            }else { //no pattern so existing or a new channel
                var channel = channels[strChannel];
                if(! channel) {
                    channel = createChannel(strChannel);
                    channels[strChannel] = channel;
                }
                channel.addSubscriber(funCallback);
            }
        },

        /**
         * Unsubscribes from the specified channel.
         * @param strChannel {String} The name of the channel (can be a wildcard
         * in which case funCallback is removed from all the matching channels and
         * is no longer attached to matching channels created later)
         * @param funCallback {Function} The callback function that was used when
         * subscribing
         */
        unsubscribe: function(strChannel, funCallback) {
            if(isPattern(strChannel)) {
                // forget one registration of the callback for this pattern
                var arrSubs = patternSubscribers[strChannel];
                if(arrSubs) {
                    for(var i = 0, len = arrSubs.length; i < len; i++) {
                        if(arrSubs[i] === funCallback) {
                            arrSubs.splice(i, 1);
                            break;
                        }
                    }
                }
            }

            // remove the callback from every existing channel the name or pattern
            // applies to (the pattern goes first in matches(), the channel second)
            var channelNames = Util.objectKeys(channels);
            for(var j = 0, jlen = channelNames.length; j < jlen; j++) {
                var cName = channelNames[j];
                if(matches(strChannel, cName)) {
                    channels[cName].removeSubscriber(funCallback);
                }
            }
        },

        toString: function() {
            return "MessageBus";
        }
    };
}(null);