Added new notification framework:
* The listener has to implement the IEventChannelListener interface.
* A listener subscribes to a notification channel by using
IDatagridService.addListener(), and unsubscribes by using
IDatagridService.removeListener()
The channel is created and started automatically when the first
listener is added.
* A channel can be created by using IDatagridService.createChannel()
e.g., by the publisher of events. Note that createChannel() automatically
starts the event channel operation.
* A publisher uses IEventChannel.addEntry() and removeEntry() to generate
add/delete events.
* The listener receives the add/remove events by implementing
the IEventChannelListener.entryAdded() and entryRemoved() methods.
Example of usage:
Listener/Subscriber:
private FooFlowPath fooFlowPath = new FooFlowPath();
datagridService.addListener("mapFooFlowPath", fooFlowPath,
Long.class, FlowPath.class);
...
class FooFlowPath implements IEventChannelListener<Long, FlowPath> {
/**
* Receive a notification that an entry is added.
*
* @param value the value for the entry.
*/
@Override
public void entryAdded(FlowPath value) {
// Process the event
}
/**
* Receive a notification that an entry is removed.
*
* @param value the value for the entry.
*/
@Override
public void entryRemoved(FlowPath value) {
// Process the event
}
...
}
Sender/Publisher:
private IEventChannel<Long, FlowPath> fooFlowPathChannel = null;
fooFlowPathChannel = datagridService.createChannel("mapFooFlowPath",
Long.class, FlowPath.class);
...
// Transmit an event
fooFlowPathChannel.addEntry(flowPath.flowId().value(), flowPath);
Change-Id: Ie3246a4e200d5b6293c1f175df3652cdf571be69
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index e80ef56..b240299 100755
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -69,6 +69,9 @@
private KryoFactory kryoFactory = new KryoFactory();
private IFlowEventHandlerService flowEventHandlerService = null;
+ private Map<String, IEventChannel<?, ?>> eventChannels =
+ new HashMap<String, IEventChannel<?, ?>>();
+
// State related to the Flow map
protected static final String mapFlowName = "mapFlow";
private IMap<Long, byte[]> mapFlow = null;
@@ -727,6 +730,113 @@
}
/**
+ * Create an event channel.
+ *
+ * If the channel already exists, just return it.
+ * NOTE: The channel is started automatically.
+ *
+ * @param channelName the event channel name.
+ * @param typeK the type of the Key in the Key-Value store.
+ * @param typeV the type of the Value in the Key-Value store.
+ * @return the event channel for the channel name.
+ */
+ @Override
+ public <K, V> IEventChannel<K, V> createChannel(String channelName,
+ Class<K> typeK, Class<V> typeV) {
+ IEventChannel<K, V> eventChannel =
+ createChannelImpl(channelName, typeK, typeV);
+ eventChannel.startup();
+
+ return eventChannel;
+ }
+
+ /**
+ * Create an event channel implementation.
+ *
+ * If the channel already exists, just return it.
+ * NOTE: The caller must call IEventChannel.startup() to startup the
+ * channel operation.
+ *
+ * @param channelName the event channel name.
+ * @param typeK the type of the Key in the Key-Value store.
+ * @param typeV the type of the Value in the Key-Value store.
+ * @return the event channel for the channel name.
+ */
+ private <K, V> IEventChannel<K, V> createChannelImpl(String channelName,
+ Class<K> typeK, Class<V> typeV) {
+ IEventChannel<K, V> castedEventChannel;
+ IEventChannel<?, ?> genericEventChannel =
+ eventChannels.get(channelName);
+
+ // Add the channel if the first listener
+ if (genericEventChannel == null) {
+ castedEventChannel =
+ new HazelcastEventChannel<K, V>(hazelcastInstance,
+ channelName, typeK, typeV);
+ eventChannels.put(channelName, castedEventChannel);
+ } else {
+ //
+ // TODO: Find if we can use Java internal support to check for
+ // type mismatch.
+ //
+ if (! genericEventChannel.verifyKeyValueTypes(typeK, typeV)) {
+ throw new ClassCastException("Key-value type mismatch for event channel " + channelName);
+ }
+ castedEventChannel = (IEventChannel<K, V>)genericEventChannel;
+ }
+
+ return castedEventChannel;
+ }
+
+ /**
+ * Add event channel listener.
+ *
+ * NOTE: The channel is started automatically right after the listener
+ * is added.
+ *
+ * @param channelName the event channel name.
+ * @param listener the listener to add.
+ * @param typeK the type of the Key in the Key-Value store.
+ * @param typeV the type of the Value in the Key-Value store.
+ * @return the event channel for the channel name.
+ */
+ @Override
+ public <K, V> IEventChannel<K, V> addListener(String channelName,
+ IEventChannelListener<K, V> listener,
+ Class<K> typeK, Class<V> typeV) {
+ IEventChannel<K, V> eventChannel =
+ createChannelImpl(channelName, typeK, typeV);
+ eventChannel.addListener(listener);
+ eventChannel.startup();
+
+ return eventChannel;
+ }
+
+ /**
+ * Remove event channel listener.
+ *
+ * @param channelName the event channel name.
+ * @param listener the listener to remove.
+ */
+ @Override
+ public <K, V> void removeListener(String channelName,
+ IEventChannelListener<K, V> listener) {
+ IEventChannel<K, V> castedEventChannel;
+ IEventChannel<?, ?> genericEventChannel =
+ eventChannels.get(channelName);
+
+ if (genericEventChannel != null) {
+ //
+ // TODO: Find if we can use Java internal support to check for
+ // type mismatch.
+ // NOTE: Using "ClassCastException" exception below doesn't work.
+ //
+ castedEventChannel = (IEventChannel<K, V>)genericEventChannel;
+ castedEventChannel.removeListener(listener);
+ }
+ }
+
+ /**
* Register Flow Event Handler Service for receiving Flow-related
* notifications.
*
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastEventChannel.java b/src/main/java/net/onrc/onos/datagrid/HazelcastEventChannel.java
new file mode 100644
index 0000000..b453139
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastEventChannel.java
@@ -0,0 +1,362 @@
+package net.onrc.onos.datagrid;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+
+import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
+
+/**
+ * A datagrid event channel that uses Hazelcast as a datagrid.
+ */
+public class HazelcastEventChannel<K, V> implements IEventChannel<K, V> {
+ private HazelcastInstance hazelcastInstance; // The Hazelcast instance
+ private String channelName; // The event channel name
+ private Class<?> typeK; // The class type of the key
+ private Class<?> typeV; // The class type of the value
+ private IMap<K, byte[]> channelMap = null; // The Hazelcast channel map
+ private List<IEventChannelListener<K, V>> listeners = // Channel listeners
+ new ArrayList<IEventChannelListener<K, V>>();
+
+ // The map entry listener
+ private MapEntryListener mapEntryListener = new MapEntryListener<K>();
+ private String mapListenerId = null; // The map listener ID
+
+ // TODO: We should use a single global KryoFactory instance
+ private KryoFactory kryoFactory = new KryoFactory();
+
+ // Maximum serialized event size
+ private final static int MAX_BUFFER_SIZE = 64*1024;
+
+ /**
+ * Constructor for a given event channel name.
+ *
+ * @param hazelcastInstance the Hazelcast instance to use.
+ * @param channelName the event channel name.
+ * @param typeK the type of the Key in the Key-Value store.
+ * @param typeV the type of the Value in the Key-Value store.
+ */
+ public HazelcastEventChannel(HazelcastInstance hazelcastInstance,
+ String channelName, Class<K> typeK,
+ Class<V> typeV) {
+ this.hazelcastInstance = hazelcastInstance;
+ this.channelName = channelName;
+ this.typeK = typeK;
+ this.typeV = typeV;
+ }
+
+ /**
+ * Verify the key and value types of a channel.
+ *
+ * @param typeK the type of the key to verify.
+ * @param typeV the type of the value to verify.
+ * @return true if the key and value types of the channel match,
+ * otherwise false.
+ */
+ @Override
+ public boolean verifyKeyValueTypes(Class typeK, Class typeV) {
+ return (this.typeK == typeK) && (this.typeV == typeV);
+ }
+
+ /**
+ * Cleanup and destroy the channel.
+ */
+ @Override
+ protected void finalize() {
+ shutdown();
+ }
+
+ /**
+ * Startup the channel operation.
+ */
+ @Override
+ public void startup() {
+ if (channelMap == null) {
+ channelMap = hazelcastInstance.getMap(channelName);
+ mapListenerId = channelMap.addEntryListener(mapEntryListener,
+ true);
+ }
+ }
+
+ /**
+ * Shutdown the channel operation.
+ */
+ @Override
+ public void shutdown() {
+ if (channelMap != null) {
+ channelMap.removeEntryListener(mapListenerId);
+ channelMap = null;
+ mapListenerId = null;
+ }
+ }
+
+ /**
+ * Add event channel listener.
+ *
+ * @param listener the listener to add.
+ */
+ @Override
+ public void addListener(IEventChannelListener<K, V> listener) {
+ if (listeners.contains(listener))
+ return; // Nothing to do: already a listener
+ listeners.add(listener);
+ }
+
+ /**
+ * Remove event channel listener.
+ *
+ * @param listener the listener to remove.
+ */
+ @Override
+ public void removeListener(IEventChannelListener<K, V> listener) {
+ listeners.remove(listener);
+ }
+
+ /**
+ * Add an entry to the channel.
+ *
+ * @param key the key of the entry to add.
+ * @param value the value of the entry to add.
+ */
+ @Override
+ public void addEntry(K key, V value) {
+ //
+ // Encode the value
+ //
+ byte[] buffer = new byte[MAX_BUFFER_SIZE];
+ Kryo kryo = kryoFactory.newKryo();
+ Output output = new Output(buffer, -1);
+ kryo.writeObject(output, value);
+ byte[] valueBytes = output.toBytes();
+ kryoFactory.deleteKryo(kryo);
+
+ //
+ // Put the entry in the map:
+ // - Key : Type <K>
+ // - Value : Serialized Value (byte[])
+ //
+ channelMap.putAsync(key, valueBytes);
+ }
+
+ /**
+ * Remove an entry from the channel.
+ *
+ * @param key the key of the entry to remove.
+ */
+ @Override
+ public void removeEntry(K key) {
+ //
+ // Remove the entry:
+ // - Key : Type <K>
+ // - Value : Serialized Value (byte[])
+ //
+ channelMap.removeAsync(key);
+ }
+
+ /**
+ * Update an entry in the channel.
+ *
+ * @param key the key of the entry to update.
+ * @param value the value of the entry to update.
+ */
+ @Override
+ public void updateEntry(K key, V value) {
+ // NOTE: Adding an entry with an existing key automatically updates it
+ addEntry(key, value);
+ }
+
+ /**
+ * Get an entry from the channel.
+ *
+ * @param key the key of the entry to get.
+ * @return the entry if found, otherwise null.
+ */
+ @Override
+ @Deprecated
+ public V getEntry(K key) {
+ byte[] valueBytes = channelMap.get(key);
+ if (valueBytes == null)
+ return null;
+
+ Kryo kryo = kryoFactory.newKryo();
+ //
+ // Decode the value
+ //
+ Input input = new Input(valueBytes);
+ V value = (V)kryo.readObject(input, typeV);
+ kryoFactory.deleteKryo(kryo);
+
+ return value;
+ }
+
+ /**
+ * Get all entries in the channel.
+ *
+ * @return all entries that are currently in the channel.
+ */
+ @Override
+ @Deprecated
+ public Collection<V> getAllEntries() {
+ Collection<V> allEntries = new LinkedList<V>();
+
+ if (channelMap == null)
+ return allEntries; // Nothing found
+
+ //
+ // Get all entries
+ //
+ Collection<byte[]> values = channelMap.values();
+ Kryo kryo = kryoFactory.newKryo();
+ for (byte[] valueBytes : values) {
+ //
+ // Decode the value
+ //
+ Input input = new Input(valueBytes);
+ V value = (V)kryo.readObject(input, typeV);
+ allEntries.add(value);
+ }
+ kryoFactory.deleteKryo(kryo);
+
+ return allEntries;
+ }
+
+ /**
+ * Remove all entries in the channel.
+ */
+ @Override
+ @Deprecated
+ public void removeAllEntries() {
+ //
+ // Remove all entries
+ //
+ // NOTE: We remove the entries one-by-one so the per-entry
+ // notifications will be delivered.
+ //
+ // channelMap.clear();
+ Set<K> keySet = channelMap.keySet();
+ for (K key : keySet) {
+ channelMap.removeAsync(key);
+ }
+ }
+
+ /**
+ * Class for receiving event notifications for the channel.
+ *
+ * The datagrid map is:
+ * - Key: Type K
+ * - Value: Serialized V (byte[])
+ */
+ private class MapEntryListener<K> implements EntryListener<K, byte[]> {
+ /**
+ * Receive a notification that an entry is added.
+ *
+ * @param event the notification event for the entry.
+ */
+ @Override
+ public void entryAdded(EntryEvent<K, byte[]> event) {
+ //
+ // Decode the value
+ //
+ byte[] valueBytes = event.getValue();
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ V value = (V)kryo.readObject(input, typeV);
+
+ //
+ // Deliver the notification
+ //
+ int index = 0;
+ for (IEventChannelListener listener : listeners) {
+ V copyValue = value;
+ if (index++ > 0) {
+ // Each listener should get a deep copy of the value
+ copyValue = kryo.copy(value);
+ }
+ listener.entryAdded(copyValue);
+ }
+ kryoFactory.deleteKryo(kryo);
+ }
+
+ /**
+ * Receive a notification that an entry is removed.
+ *
+ * @param event the notification event for the entry.
+ */
+ @Override
+ public void entryRemoved(EntryEvent<K, byte[]> event) {
+ //
+ // Decode the value
+ //
+ byte[] valueBytes = event.getValue();
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ V value = (V)kryo.readObject(input, typeV);
+
+ //
+ // Deliver the notification
+ //
+ int index = 0;
+ for (IEventChannelListener listener : listeners) {
+ V copyValue = value;
+ if (index++ > 0) {
+ // Each listener should get a deep copy of the value
+ copyValue = kryo.copy(value);
+ }
+ listener.entryRemoved(copyValue);
+ }
+ kryoFactory.deleteKryo(kryo);
+ }
+
+ /**
+ * Receive a notification that an entry is updated.
+ *
+ * @param event the notification event for the entry.
+ */
+ @Override
+ public void entryUpdated(EntryEvent<K, byte[]> event) {
+ //
+ // Decode the value
+ //
+ byte[] valueBytes = event.getValue();
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ V value = (V)kryo.readObject(input, typeV);
+
+ //
+ // Deliver the notification
+ //
+ int index = 0;
+ for (IEventChannelListener listener : listeners) {
+ V copyValue = value;
+ if (index++ > 0) {
+ // Each listener should get a deep copy of the value
+ copyValue = kryo.copy(value);
+ }
+ listener.entryUpdated(copyValue);
+ }
+ kryoFactory.deleteKryo(kryo);
+ }
+
+ /**
+ * Receive a notification that an entry is evicted.
+ *
+ * @param event the notification event for the entry.
+ */
+ @Override
+ public void entryEvicted(EntryEvent<K, byte[]> event) {
+ // NOTE: We don't use eviction for this map
+ }
+ }
+}
diff --git a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
index d3cf98e..118cbfa 100755
--- a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
+++ b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
@@ -21,6 +21,45 @@
* Interface for providing Datagrid Service to other modules.
*/
public interface IDatagridService extends IFloodlightService {
+ /**
+ * Create an event channel.
+ *
+ * If the channel already exists, just return it.
+ * NOTE: The channel is started automatically.
+ *
+ * @param channelName the event channel name.
+ * @param typeK the type of the Key in the Key-Value store.
+ * @param typeV the type of the Value in the Key-Value store.
+ * @return the event channel for the channel name.
+ */
+ <K, V> IEventChannel<K, V> createChannel(String channelName,
+ Class<K> typeK, Class<V> typeV);
+
+ /**
+ * Add event channel listener.
+ *
+ * NOTE: The channel is started automatically right after the listener
+ * is added.
+ *
+ * @param channelName the event channel name.
+ * @param listener the listener to add.
+ * @param typeK the type of the Key in the Key-Value store.
+ * @param typeV the type of the Value in the Key-Value store.
+ * @return the event channel for the channel name.
+ */
+ <K, V> IEventChannel<K, V> addListener(String channelName,
+ IEventChannelListener<K, V> listener,
+ Class<K> typeK, Class<V> typeV);
+
+ /**
+ * Remove event channel listener.
+ *
+ * @param channelName the event channel name.
+ * @param listener the listener to remove.
+ */
+ <K, V> void removeListener(String channelName,
+ IEventChannelListener<K, V> listener);
+
/*
* register all the intents as one batch
*/
diff --git a/src/main/java/net/onrc/onos/datagrid/IEventChannel.java b/src/main/java/net/onrc/onos/datagrid/IEventChannel.java
new file mode 100644
index 0000000..7479491
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datagrid/IEventChannel.java
@@ -0,0 +1,88 @@
+package net.onrc.onos.datagrid;
+
+import java.util.Collection;
+
+/**
+ * Event Channel Interface.
+ */
+public interface IEventChannel<K, V> {
+ /**
+ * Startup the channel operation.
+ */
+ void startup();
+
+ /**
+ * Shutdown the channel operation.
+ */
+ void shutdown();
+
+ /**
+ * Verify the key and value types of a channel.
+ *
+ * @param typeK the type of the key to verify.
+ * @param typeV the type of the value to verify.
+ * @return true if the key and value types of the channel match,
+ * otherwise false.
+ */
+ boolean verifyKeyValueTypes(Class typeK, Class typeV);
+
+ /**
+ * Add event channel listener.
+ *
+ * @param listener the listener to add.
+ */
+ void addListener(IEventChannelListener<K, V> listener);
+
+ /**
+ * Remove event channel listener.
+ *
+ * @param listener the listener to remove.
+ */
+ void removeListener(IEventChannelListener<K, V> listener);
+
+ /**
+ * Add an entry to the channel.
+ *
+ * @param key the key of the entry to add.
+ * @param value the value of the entry to add.
+ */
+ void addEntry(K key, V value);
+
+ /**
+ * Remove an entry from the channel.
+ *
+ * @param key the key of the entry to remove.
+ */
+ void removeEntry(K key);
+
+ /**
+ * Update an entry in the channel.
+ *
+ * @param key the key of the entry to update.
+ * @param value the value of the entry to update.
+ */
+ void updateEntry(K key, V value);
+
+ /**
+ * Get an entry from the channel.
+ *
+ * @param key the key of the entry to get.
+ * @return the entry if found, otherwise null.
+ */
+ @Deprecated
+ V getEntry(K key);
+
+ /**
+ * Get all entries in the channel.
+ *
+ * @return all entries that are currently in the channel.
+ */
+ @Deprecated
+ Collection<V> getAllEntries();
+
+ /**
+ * Remove all entries in the channel.
+ */
+ @Deprecated
+ void removeAllEntries();
+}
diff --git a/src/main/java/net/onrc/onos/datagrid/IEventChannelListener.java b/src/main/java/net/onrc/onos/datagrid/IEventChannelListener.java
new file mode 100644
index 0000000..3f785ee
--- /dev/null
+++ b/src/main/java/net/onrc/onos/datagrid/IEventChannelListener.java
@@ -0,0 +1,27 @@
+package net.onrc.onos.datagrid;
+
+/**
+ * Event Channel Listener Interface.
+ */
+public interface IEventChannelListener<K, V> {
+ /**
+ * Receive a notification that an entry is added.
+ *
+ * @param value the value for the entry.
+ */
+ void entryAdded(V value);
+
+ /**
+ * Receive a notification that an entry is removed.
+ *
+ * @param value the value for the entry.
+ */
+ void entryRemoved(V value);
+
+ /**
+ * Receive a notification that an entry is updated.
+ *
+ * @param value the value for the entry.
+ */
+ void entryUpdated(V value);
+}