Added new notification framework:
 * The listener has to implement the IEventChannelListener interface.
 * A listener subscribes to a notification channel by using
   IDatagridService.addListener(), and unsubscribes by using
   IDatagridService.removeListener()
   The channel is created and started automatically when the first
   listener is added.
 * A channel can be created by using IDatagridService.createChannel()
   e.g., by the publisher of events. Note that createChannel() automatically
   starts the event channel operation.

 * A publisher uses IEventChannel.addEntry() and removeEntry() to generate
   add/delete events.

 * The listener receives the add/remove events by implementing
   the IEventChannelListener.entryAdded() and entryRemoved() methods.

Example of usage:

Listener/Subscriber:

    private FooFlowPath fooFlowPath = new FooFlowPath();

    datagridService.addListener("mapFooFlowPath", fooFlowPath,
                                   Long.class, FlowPath.class);
...

    class FooFlowPath implements IEventChannelListener<Long, FlowPath> {
        /**
         * Receive a notification that an entry is added.
         *
         * @param value the value for the entry.
         */
        @Override
        public void entryAdded(FlowPath value) {
            // Process the event
        }

        /**
         * Receive a notification that an entry is removed.
         *
         * @param value the value for the entry.
         */
        @Override
        public void entryRemoved(FlowPath value) {
            // Process the event
        }
        ...
    }

Sender/Publisher:

    private IEventChannel<Long, FlowPath> fooFlowPathChannel = null;
    fooFlowPathChannel = datagridService.createChannel("mapFooFlowPath",
            Long.class, FlowPath.class);
    ...

    // Transmit an event
    fooFlowPathChannel.addEntry(flowPath.flowId().value(), flowPath);

Change-Id: Ie3246a4e200d5b6293c1f175df3652cdf571be69
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index e80ef56..b240299 100755
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -69,6 +69,9 @@
     private KryoFactory kryoFactory = new KryoFactory();
     private IFlowEventHandlerService flowEventHandlerService = null;
 
+    private Map<String, IEventChannel<?, ?>> eventChannels =
+	new HashMap<String, IEventChannel<?, ?>>();
+
     // State related to the Flow map
     protected static final String mapFlowName = "mapFlow";
     private IMap<Long, byte[]> mapFlow = null;
@@ -727,6 +730,113 @@
     }
 
     /**
+     * Create an event channel.
+     *
+     * If the channel already exists, just return it.
+     * NOTE: The channel is started automatically.
+     *
+     * @param channelName the event channel name.
+     * @param typeK the type of the Key in the Key-Value store.
+     * @param typeV the type of the Value in the Key-Value store.
+     * @return the event channel for the channel name.
+     */
+    @Override
+    public <K, V> IEventChannel<K, V> createChannel(String channelName,
+					     Class<K> typeK, Class<V> typeV) {
+	IEventChannel<K, V> eventChannel =
+	    createChannelImpl(channelName, typeK, typeV);
+	eventChannel.startup();
+
+	return eventChannel;
+    }
+
+    /**
+     * Create an event channel implementation.
+     *
+     * If the channel already exists, just return it.
+     * NOTE: The caller must call IEventChannel.startup() to startup the
+     * channel operation.
+     *
+     * @param channelName the event channel name.
+     * @param typeK the type of the Key in the Key-Value store.
+     * @param typeV the type of the Value in the Key-Value store.
+     * @return the event channel for the channel name.
+     */
+    private <K, V> IEventChannel<K, V> createChannelImpl(String channelName,
+					     Class<K> typeK, Class<V> typeV) {
+	IEventChannel<K, V> castedEventChannel;
+	IEventChannel<?, ?> genericEventChannel =
+	    eventChannels.get(channelName);
+
+	// Add the channel if the first listener
+	if (genericEventChannel == null) {
+	    castedEventChannel =
+		new HazelcastEventChannel<K, V>(hazelcastInstance,
+						channelName, typeK, typeV);
+	    eventChannels.put(channelName, castedEventChannel);
+	} else {
+	    //
+	    // TODO: Find if we can use Java internal support to check for
+	    // type mismatch.
+	    //
+	    if (! genericEventChannel.verifyKeyValueTypes(typeK, typeV)) {
+		throw new ClassCastException("Key-value type mismatch for event channel " + channelName);
+	    }
+	    castedEventChannel = (IEventChannel<K, V>)genericEventChannel;
+	}
+
+	return castedEventChannel;
+    }
+
+    /**
+     * Add event channel listener.
+     *
+     * NOTE: The channel is started automatically right after the listener
+     * is added.
+     *
+     * @param channelName the event channel name.
+     * @param listener the listener to add.
+     * @param typeK the type of the Key in the Key-Value store.
+     * @param typeV the type of the Value in the Key-Value store.
+     * @return the event channel for the channel name.
+     */
+    @Override
+    public <K, V> IEventChannel<K, V> addListener(String channelName,
+				   IEventChannelListener<K, V> listener,
+				   Class<K> typeK, Class<V> typeV) {
+	IEventChannel<K, V> eventChannel =
+	    createChannelImpl(channelName, typeK, typeV);
+	eventChannel.addListener(listener);
+	eventChannel.startup();
+
+	return eventChannel;
+    }
+
+    /**
+     * Remove event channel listener.
+     *
+     * @param channelName the event channel name.
+     * @param listener the listener to remove.
+     */
+    @Override
+    public <K, V> void removeListener(String channelName,
+				      IEventChannelListener<K, V> listener) {
+	IEventChannel<K, V> castedEventChannel;
+	IEventChannel<?, ?> genericEventChannel =
+	    eventChannels.get(channelName);
+
+	if (genericEventChannel != null) {
+	    //
+	    // TODO: Find if we can use Java internal support to check for
+	    // type mismatch.
+	    // NOTE: Using "ClassCastException" exception below doesn't work.
+	    //
+	    castedEventChannel = (IEventChannel<K, V>)genericEventChannel;
+	    castedEventChannel.removeListener(listener);
+	}
+    }
+
+    /**
      * Register Flow Event Handler Service for receiving Flow-related
      * notifications.
      *
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastEventChannel.java b/src/main/java/net/onrc/onos/datagrid/HazelcastEventChannel.java
new file mode 100644
index 0000000..b453139
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastEventChannel.java
@@ -0,0 +1,362 @@
+package net.onrc.onos.datagrid;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+
+import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
+
+/**
+ * A datagrid event channel that uses Hazelcast as a datagrid.
+ */
+public class HazelcastEventChannel<K, V> implements IEventChannel<K, V> {
+    private HazelcastInstance hazelcastInstance; // The Hazelcast instance
+    private String channelName;			// The event channel name
+    private Class<?> typeK;			// The class type of the key
+    private Class<?> typeV;			// The class type of the value
+    private IMap<K, byte[]> channelMap = null;	// The Hazelcast channel map
+    private List<IEventChannelListener<K, V>> listeners = // Channel listeners
+	new ArrayList<IEventChannelListener<K, V>>();
+
+    // The map entry listener
+    private MapEntryListener mapEntryListener = new MapEntryListener<K>();
+    private String mapListenerId = null;	// The map listener ID
+
+    // TODO: We should use a single global KryoFactory instance
+    private KryoFactory kryoFactory = new KryoFactory();
+
+    // Maximum serialized event size
+    private final static int MAX_BUFFER_SIZE = 64*1024;
+
+    /**
+     * Constructor for a given event channel name.
+     *
+     * @param hazelcastInstance the Hazelcast instance to use.
+     * @param channelName the event channel name.
+     * @param typeK the type of the Key in the Key-Value store.
+     * @param typeV the type of the Value in the Key-Value store.
+     */
+    public HazelcastEventChannel(HazelcastInstance hazelcastInstance,
+				 String channelName, Class<K> typeK,
+				 Class<V> typeV) {
+	this.hazelcastInstance = hazelcastInstance;
+	this.channelName = channelName;
+	this.typeK = typeK;
+	this.typeV = typeV;
+    }
+
+    /**
+     * Verify the key and value types of a channel.
+     *
+     * @param typeK the type of the key to verify.
+     * @param typeV the type of the value to verify.
+     * @return true if the key and value types of the channel match,
+     * otherwise false.
+     */
+    @Override
+    public boolean verifyKeyValueTypes(Class typeK, Class typeV) {
+	return (this.typeK == typeK) && (this.typeV == typeV);
+    }
+
+    /**
+     * Cleanup and destroy the channel.
+     */
+    @Override
+    protected void finalize() {
+	shutdown();
+    }
+
+    /**
+     * Startup the channel operation.
+     */
+    @Override
+    public void startup() {
+	if (channelMap == null) {
+	    channelMap = hazelcastInstance.getMap(channelName);
+	    mapListenerId = channelMap.addEntryListener(mapEntryListener,
+							true);
+	}
+    }
+
+    /**
+     * Shutdown the channel operation.
+     */
+    @Override
+    public void shutdown() {
+	if (channelMap != null) {
+	    channelMap.removeEntryListener(mapListenerId);
+	    channelMap = null;
+	    mapListenerId = null;
+	}
+    }
+
+    /**
+     * Add event channel listener.
+     *
+     * @param listener the listener to add.
+     */
+    @Override
+    public void addListener(IEventChannelListener<K, V> listener) {
+	if (listeners.contains(listener))
+	    return;		// Nothing to do: already a listener
+	listeners.add(listener);
+    }
+
+    /**
+     * Remove event channel listener.
+     *
+     * @param listener the listener to remove.
+     */
+    @Override
+    public void removeListener(IEventChannelListener<K, V> listener) {
+	listeners.remove(listener);
+    }
+
+    /**
+     * Add an entry to the channel.
+     *
+     * @param key the key of the entry to add.
+     * @param value the value of the entry to add.
+     */
+    @Override
+    public void addEntry(K key, V value) {
+	//
+	// Encode the value
+	//
+	byte[] buffer = new byte[MAX_BUFFER_SIZE];
+	Kryo kryo = kryoFactory.newKryo();
+	Output output = new Output(buffer, -1);
+	kryo.writeObject(output, value);
+	byte[] valueBytes = output.toBytes();
+	kryoFactory.deleteKryo(kryo);
+
+	//
+	// Put the entry in the map:
+	//  - Key : Type <K>
+	//  - Value : Serialized Value (byte[])
+	//
+	channelMap.putAsync(key, valueBytes);
+    }
+
+    /**
+     * Remove an entry from the channel.
+     *
+     * @param key the key of the entry to remove.
+     */
+    @Override
+    public void removeEntry(K key) {
+	//
+	// Remove the entry:
+	//  - Key : Type <K>
+	//  - Value : Serialized Value (byte[])
+	//
+	channelMap.removeAsync(key);
+    }
+
+    /**
+     * Update an entry in the channel.
+     *
+     * @param key the key of the entry to update.
+     * @param value the value of the entry to update.
+     */
+    @Override
+    public void updateEntry(K key, V value) {
+	// NOTE: Adding an entry with an existing key automatically updates it
+	addEntry(key, value);
+    }
+
+    /**
+     * Get an entry from the channel.
+     *
+     * @param key the key of the entry to get.
+     * @return the entry if found, otherwise null.
+     */
+    @Override
+    @Deprecated
+    public V getEntry(K key) {
+	byte[] valueBytes = channelMap.get(key);
+	if (valueBytes == null)
+	    return null;
+
+	Kryo kryo = kryoFactory.newKryo();
+	//
+	// Decode the value
+	//
+	Input input = new Input(valueBytes);
+	V value = (V)kryo.readObject(input, typeV);
+	kryoFactory.deleteKryo(kryo);
+
+	return value;
+    }
+
+    /**
+     * Get all entries in the channel.
+     *
+     * @return all entries that are currently in the channel.
+     */
+    @Override
+    @Deprecated
+    public Collection<V> getAllEntries() {
+	Collection<V> allEntries = new LinkedList<V>();
+
+	if (channelMap == null)
+	    return allEntries;		// Nothing found
+
+	//
+	// Get all entries
+	//
+	Collection<byte[]> values = channelMap.values();
+	Kryo kryo = kryoFactory.newKryo();
+	for (byte[] valueBytes : values) {
+	    //
+	    // Decode the value
+	    //
+	    Input input = new Input(valueBytes);
+	    V value = (V)kryo.readObject(input, typeV);
+	    allEntries.add(value);
+	}
+	kryoFactory.deleteKryo(kryo);
+
+	return allEntries;
+    }
+
+    /**
+     * Remove all entries in the channel.
+     */
+    @Override
+    @Deprecated
+    public void removeAllEntries() {
+	//
+	// Remove all entries
+	//
+	// NOTE: We remove the entries one-by-one so the per-entry
+	// notifications will be delivered.
+	//
+	// channelMap.clear();
+	Set<K> keySet = channelMap.keySet();
+	for (K key : keySet) {
+	    channelMap.removeAsync(key);
+	}
+    }
+
+    /**
+     * Class for receiving event notifications for the channel.
+     *
+     * The datagrid map is:
+     *  - Key: Type K
+     *  - Value: Serialized V (byte[])
+     */
+    private class MapEntryListener<K> implements EntryListener<K, byte[]> {
+	/**
+	 * Receive a notification that an entry is added.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	@Override
+	public void entryAdded(EntryEvent<K, byte[]> event) {
+	    //
+	    // Decode the value
+	    //
+	    byte[] valueBytes = event.getValue();
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    V value = (V)kryo.readObject(input, typeV);
+
+	    //
+	    // Deliver the notification
+	    //
+	    int index = 0;
+	    for (IEventChannelListener listener : listeners) {
+		V copyValue = value;
+		if (index++ > 0) {
+		    // Each listener should get a deep copy of the value
+		    copyValue = kryo.copy(value);
+		}
+		listener.entryAdded(copyValue);
+	    }
+	    kryoFactory.deleteKryo(kryo);
+	}
+
+	/**
+	 * Receive a notification that an entry is removed.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	@Override
+	public void entryRemoved(EntryEvent<K, byte[]> event) {
+	    //
+	    // Decode the value
+	    //
+	    byte[] valueBytes = event.getValue();
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    V value = (V)kryo.readObject(input, typeV);
+
+	    //
+	    // Deliver the notification
+	    //
+	    int index = 0;
+	    for (IEventChannelListener listener : listeners) {
+		V copyValue = value;
+		if (index++ > 0) {
+		    // Each listener should get a deep copy of the value
+		    copyValue = kryo.copy(value);
+		}
+		listener.entryRemoved(copyValue);
+	    }
+	    kryoFactory.deleteKryo(kryo);
+	}
+
+	/**
+	 * Receive a notification that an entry is updated.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	@Override
+	public void entryUpdated(EntryEvent<K, byte[]> event) {
+	    //
+	    // Decode the value
+	    //
+	    byte[] valueBytes = event.getValue();
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    V value = (V)kryo.readObject(input, typeV);
+
+	    //
+	    // Deliver the notification
+	    //
+	    int index = 0;
+	    for (IEventChannelListener listener : listeners) {
+		V copyValue = value;
+		if (index++ > 0) {
+		    // Each listener should get a deep copy of the value
+		    copyValue = kryo.copy(value);
+		}
+		listener.entryUpdated(copyValue);
+	    }
+	    kryoFactory.deleteKryo(kryo);
+	}
+
+	/**
+	 * Receive a notification that an entry is evicted.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	@Override
+	public void entryEvicted(EntryEvent<K, byte[]> event) {
+	    // NOTE: We don't use eviction for this map
+	}
+    }
+}
diff --git a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
index d3cf98e..118cbfa 100755
--- a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
+++ b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
@@ -21,6 +21,45 @@
  * Interface for providing Datagrid Service to other modules.
  */
 public interface IDatagridService extends IFloodlightService {
+    /**
+     * Create an event channel.
+     *
+     * If the channel already exists, just return it.
+     * NOTE: The channel is started automatically.
+     *
+     * @param channelName the event channel name.
+     * @param typeK the type of the Key in the Key-Value store.
+     * @param typeV the type of the Value in the Key-Value store.
+     * @return the event channel for the channel name.
+     */
+    <K, V> IEventChannel<K, V> createChannel(String channelName,
+					     Class<K> typeK, Class<V> typeV);
+
+    /**
+     * Add event channel listener.
+     *
+     * NOTE: The channel is started automatically right after the listener
+     * is added.
+     *
+     * @param channelName the event channel name.
+     * @param listener the listener to add.
+     * @param typeK the type of the Key in the Key-Value store.
+     * @param typeV the type of the Value in the Key-Value store.
+     * @return the event channel for the channel name.
+     */
+    <K, V> IEventChannel<K, V> addListener(String channelName,
+			   IEventChannelListener<K, V> listener,
+			   Class<K> typeK, Class<V> typeV);
+
+    /**
+     * Remove event channel listener.
+     *
+     * @param channelName the event channel name.
+     * @param listener the listener to remove.
+     */
+    <K, V> void removeListener(String channelName,
+			      IEventChannelListener<K, V> listener);
+
     /*
      * register all the intents as one batch
      */
diff --git a/src/main/java/net/onrc/onos/datagrid/IEventChannel.java b/src/main/java/net/onrc/onos/datagrid/IEventChannel.java
new file mode 100644
index 0000000..7479491
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datagrid/IEventChannel.java
@@ -0,0 +1,88 @@
+package net.onrc.onos.datagrid;
+
+import java.util.Collection;
+
+/**
+ * Event Channel Interface.
+ */
+public interface IEventChannel<K, V> {
+    /**
+     * Startup the channel operation.
+     */
+    void startup();
+
+    /**
+     * Shutdown the channel operation.
+     */
+    void shutdown();
+
+    /**
+     * Verify the key and value types of a channel.
+     *
+     * @param typeK the type of the key to verify.
+     * @param typeV the type of the value to verify.
+     * @return true if the key and value types of the channel match,
+     * otherwise false.
+     */
+    boolean verifyKeyValueTypes(Class typeK, Class typeV);
+
+    /**
+     * Add event channel listener.
+     *
+     * @param listener the listener to add.
+     */
+    void addListener(IEventChannelListener<K, V> listener);
+
+    /**
+     * Remove event channel listener.
+     *
+     * @param listener the listener to remove.
+     */
+    void removeListener(IEventChannelListener<K, V> listener);
+
+    /**
+     * Add an entry to the channel.
+     *
+     * @param key the key of the entry to add.
+     * @param value the value of the entry to add.
+     */
+    void addEntry(K key, V value);
+
+    /**
+     * Remove an entry from the channel.
+     *
+     * @param key the key of the entry to remove.
+     */
+    void removeEntry(K key);
+
+    /**
+     * Update an entry in the channel.
+     *
+     * @param key the key of the entry to update.
+     * @param value the value of the entry to update.
+     */
+    void updateEntry(K key, V value);
+
+    /**
+     * Get an entry from the channel.
+     *
+     * @param key the key of the entry to get.
+     * @return the entry if found, otherwise null.
+     */
+    @Deprecated
+    V getEntry(K key);
+
+    /**
+     * Get all entries in the channel.
+     *
+     * @return all entries that are currently in the channel.
+     */
+    @Deprecated
+    Collection<V> getAllEntries();
+
+    /**
+     * Remove all entries in the channel.
+     */
+    @Deprecated
+    void removeAllEntries();
+}
diff --git a/src/main/java/net/onrc/onos/datagrid/IEventChannelListener.java b/src/main/java/net/onrc/onos/datagrid/IEventChannelListener.java
new file mode 100644
index 0000000..3f785ee
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datagrid/IEventChannelListener.java
@@ -0,0 +1,27 @@
+package net.onrc.onos.datagrid;
+
+/**
+ * Event Channel Listener Interface.
+ */
+public interface IEventChannelListener<K, V> {
+    /**
+     * Receive a notification that an entry is added.
+     *
+     * @param value the value for the entry.
+     */
+    void entryAdded(V value);
+
+    /**
+     * Receive a notification that an entry is removed.
+     *
+     * @param value the value for the entry.
+     */
+    void entryRemoved(V value);
+
+    /**
+     * Receive a notification that an entry is updated.
+     *
+     * @param value the value for the entry.
+     */
+    void entryUpdated(V value);
+}