Added new notification framework:
 * The listener has to implement the IEventChannelListener interface.
 * A listener subscribes to a notification channel by using
   IDatagridService.addListener(), and unsubscribes by using
   IDatagridService.removeListener()
   The channel is created and started automatically when the first
   listener is added.
 * A channel can be created by using IDatagridService.createChannel()
   e.g., by the publisher of events. Note that createChannel() automatically
   starts the event channel operation.

 * A publisher uses IEventChannel.addEntry() and removeEntry() to generate
   add/delete events.

 * The listener receives the add/remove events by implementing
   the IEventChannelListener.entryAdded() and entryRemoved() methods.

Example of usage:

Listener/Subscriber:

    private FooFlowPath fooFlowPath = new FooFlowPath();

    datagridService.addListener("mapFooFlowPath", fooFlowPath,
                                   Long.class, FlowPath.class);
...

    class FooFlowPath implements IEventChannelListener<Long, FlowPath> {
        /**
         * Receive a notification that an entry is added.
         *
         * @param value the value for the entry.
         */
        @Override
        public void entryAdded(FlowPath value) {
            // Process the event
        }

        /**
         * Receive a notification that an entry is removed.
         *
         * @param value the value for the entry.
         */
        @Override
        public void entryRemoved(FlowPath value) {
            // Process the event
        }
        ...
    }

Sender/Publisher:

    private IEventChannel<Long, FlowPath> fooFlowPathChannel = null;
    fooFlowPathChannel = datagridService.createChannel("mapFooFlowPath",
            Long.class, FlowPath.class);
    ...

    // Transmit an event
    fooFlowPathChannel.addEntry(flowPath.flowId().value(), flowPath);

Change-Id: Ie3246a4e200d5b6293c1f175df3652cdf571be69
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastEventChannel.java b/src/main/java/net/onrc/onos/datagrid/HazelcastEventChannel.java
new file mode 100644
index 0000000..b453139
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastEventChannel.java
@@ -0,0 +1,362 @@
+package net.onrc.onos.datagrid;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+
+import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
+
+/**
+ * A datagrid event channel that uses Hazelcast as a datagrid.
+ */
+public class HazelcastEventChannel<K, V> implements IEventChannel<K, V> {
+    private HazelcastInstance hazelcastInstance; // The Hazelcast instance
+    private String channelName;			// The event channel name
+    private Class<?> typeK;			// The class type of the key
+    private Class<?> typeV;			// The class type of the value
+    private IMap<K, byte[]> channelMap = null;	// The Hazelcast channel map
+    private List<IEventChannelListener<K, V>> listeners = // Channel listeners
+	new ArrayList<IEventChannelListener<K, V>>();
+
+    // The map entry listener
+    private MapEntryListener mapEntryListener = new MapEntryListener<K>();
+    private String mapListenerId = null;	// The map listener ID
+
+    // TODO: We should use a single global KryoFactory instance
+    private KryoFactory kryoFactory = new KryoFactory();
+
+    // Maximum serialized event size
+    private final static int MAX_BUFFER_SIZE = 64*1024;
+
+    /**
+     * Constructor for a given event channel name.
+     *
+     * @param hazelcastInstance the Hazelcast instance to use.
+     * @param channelName the event channel name.
+     * @param typeK the type of the Key in the Key-Value store.
+     * @param typeV the type of the Value in the Key-Value store.
+     */
+    public HazelcastEventChannel(HazelcastInstance hazelcastInstance,
+				 String channelName, Class<K> typeK,
+				 Class<V> typeV) {
+	this.hazelcastInstance = hazelcastInstance;
+	this.channelName = channelName;
+	this.typeK = typeK;
+	this.typeV = typeV;
+    }
+
+    /**
+     * Verify the key and value types of a channel.
+     *
+     * @param typeK the type of the key to verify.
+     * @param typeV the type of the value to verify.
+     * @return true if the key and value types of the channel match,
+     * otherwise false.
+     */
+    @Override
+    public boolean verifyKeyValueTypes(Class typeK, Class typeV) {
+	return (this.typeK == typeK) && (this.typeV == typeV);
+    }
+
+    /**
+     * Cleanup and destroy the channel.
+     */
+    @Override
+    protected void finalize() {
+	shutdown();
+    }
+
+    /**
+     * Startup the channel operation.
+     */
+    @Override
+    public void startup() {
+	if (channelMap == null) {
+	    channelMap = hazelcastInstance.getMap(channelName);
+	    mapListenerId = channelMap.addEntryListener(mapEntryListener,
+							true);
+	}
+    }
+
+    /**
+     * Shutdown the channel operation.
+     */
+    @Override
+    public void shutdown() {
+	if (channelMap != null) {
+	    channelMap.removeEntryListener(mapListenerId);
+	    channelMap = null;
+	    mapListenerId = null;
+	}
+    }
+
+    /**
+     * Add event channel listener.
+     *
+     * @param listener the listener to add.
+     */
+    @Override
+    public void addListener(IEventChannelListener<K, V> listener) {
+	if (listeners.contains(listener))
+	    return;		// Nothing to do: already a listener
+	listeners.add(listener);
+    }
+
+    /**
+     * Remove event channel listener.
+     *
+     * @param listener the listener to remove.
+     */
+    @Override
+    public void removeListener(IEventChannelListener<K, V> listener) {
+	listeners.remove(listener);
+    }
+
+    /**
+     * Add an entry to the channel.
+     *
+     * @param key the key of the entry to add.
+     * @param value the value of the entry to add.
+     */
+    @Override
+    public void addEntry(K key, V value) {
+	//
+	// Encode the value
+	//
+	byte[] buffer = new byte[MAX_BUFFER_SIZE];
+	Kryo kryo = kryoFactory.newKryo();
+	Output output = new Output(buffer, -1);
+	kryo.writeObject(output, value);
+	byte[] valueBytes = output.toBytes();
+	kryoFactory.deleteKryo(kryo);
+
+	//
+	// Put the entry in the map:
+	//  - Key : Type <K>
+	//  - Value : Serialized Value (byte[])
+	//
+	channelMap.putAsync(key, valueBytes);
+    }
+
+    /**
+     * Remove an entry from the channel.
+     *
+     * @param key the key of the entry to remove.
+     */
+    @Override
+    public void removeEntry(K key) {
+	//
+	// Remove the entry:
+	//  - Key : Type <K>
+	//  - Value : Serialized Value (byte[])
+	//
+	channelMap.removeAsync(key);
+    }
+
+    /**
+     * Update an entry in the channel.
+     *
+     * @param key the key of the entry to update.
+     * @param value the value of the entry to update.
+     */
+    @Override
+    public void updateEntry(K key, V value) {
+	// NOTE: Adding an entry with an existing key automatically updates it
+	addEntry(key, value);
+    }
+
+    /**
+     * Get an entry from the channel.
+     *
+     * @param key the key of the entry to get.
+     * @return the entry if found, otherwise null.
+     */
+    @Override
+    @Deprecated
+    public V getEntry(K key) {
+	byte[] valueBytes = channelMap.get(key);
+	if (valueBytes == null)
+	    return null;
+
+	Kryo kryo = kryoFactory.newKryo();
+	//
+	// Decode the value
+	//
+	Input input = new Input(valueBytes);
+	V value = (V)kryo.readObject(input, typeV);
+	kryoFactory.deleteKryo(kryo);
+
+	return value;
+    }
+
+    /**
+     * Get all entries in the channel.
+     *
+     * @return all entries that are currently in the channel.
+     */
+    @Override
+    @Deprecated
+    public Collection<V> getAllEntries() {
+	Collection<V> allEntries = new LinkedList<V>();
+
+	if (channelMap == null)
+	    return allEntries;		// Nothing found
+
+	//
+	// Get all entries
+	//
+	Collection<byte[]> values = channelMap.values();
+	Kryo kryo = kryoFactory.newKryo();
+	for (byte[] valueBytes : values) {
+	    //
+	    // Decode the value
+	    //
+	    Input input = new Input(valueBytes);
+	    V value = (V)kryo.readObject(input, typeV);
+	    allEntries.add(value);
+	}
+	kryoFactory.deleteKryo(kryo);
+
+	return allEntries;
+    }
+
+    /**
+     * Remove all entries in the channel.
+     */
+    @Override
+    @Deprecated
+    public void removeAllEntries() {
+	//
+	// Remove all entries
+	//
+	// NOTE: We remove the entries one-by-one so the per-entry
+	// notifications will be delivered.
+	//
+	// channelMap.clear();
+	Set<K> keySet = channelMap.keySet();
+	for (K key : keySet) {
+	    channelMap.removeAsync(key);
+	}
+    }
+
+    /**
+     * Class for receiving event notifications for the channel.
+     *
+     * The datagrid map is:
+     *  - Key: Type K
+     *  - Value: Serialized V (byte[])
+     */
+    private class MapEntryListener<K> implements EntryListener<K, byte[]> {
+	/**
+	 * Receive a notification that an entry is added.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	@Override
+	public void entryAdded(EntryEvent<K, byte[]> event) {
+	    //
+	    // Decode the value
+	    //
+	    byte[] valueBytes = event.getValue();
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    V value = (V)kryo.readObject(input, typeV);
+
+	    //
+	    // Deliver the notification
+	    //
+	    int index = 0;
+	    for (IEventChannelListener listener : listeners) {
+		V copyValue = value;
+		if (index++ > 0) {
+		    // Each listener should get a deep copy of the value
+		    copyValue = kryo.copy(value);
+		}
+		listener.entryAdded(copyValue);
+	    }
+	    kryoFactory.deleteKryo(kryo);
+	}
+
+	/**
+	 * Receive a notification that an entry is removed.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	@Override
+	public void entryRemoved(EntryEvent<K, byte[]> event) {
+	    //
+	    // Decode the value
+	    //
+	    byte[] valueBytes = event.getValue();
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    V value = (V)kryo.readObject(input, typeV);
+
+	    //
+	    // Deliver the notification
+	    //
+	    int index = 0;
+	    for (IEventChannelListener listener : listeners) {
+		V copyValue = value;
+		if (index++ > 0) {
+		    // Each listener should get a deep copy of the value
+		    copyValue = kryo.copy(value);
+		}
+		listener.entryRemoved(copyValue);
+	    }
+	    kryoFactory.deleteKryo(kryo);
+	}
+
+	/**
+	 * Receive a notification that an entry is updated.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	@Override
+	public void entryUpdated(EntryEvent<K, byte[]> event) {
+	    //
+	    // Decode the value
+	    //
+	    byte[] valueBytes = event.getValue();
+	    Kryo kryo = kryoFactory.newKryo();
+	    Input input = new Input(valueBytes);
+	    V value = (V)kryo.readObject(input, typeV);
+
+	    //
+	    // Deliver the notification
+	    //
+	    int index = 0;
+	    for (IEventChannelListener listener : listeners) {
+		V copyValue = value;
+		if (index++ > 0) {
+		    // Each listener should get a deep copy of the value
+		    copyValue = kryo.copy(value);
+		}
+		listener.entryUpdated(copyValue);
+	    }
+	    kryoFactory.deleteKryo(kryo);
+	}
+
+	/**
+	 * Receive a notification that an entry is evicted.
+	 *
+	 * @param event the notification event for the entry.
+	 */
+	@Override
+	public void entryEvicted(EntryEvent<K, byte[]> event) {
+	    // NOTE: We don't use eviction for this map
+	}
+    }
+}