Added a new notification framework:
* The listener has to implement the IEventChannelListener interface.
* A listener subscribes to a notification channel by using
IDatagridService.addListener(), and unsubscribes by using
IDatagridService.removeListener().
The channel is created and started automatically when the first
listener is added.
* A channel can be created by using IDatagridService.createChannel()
e.g., by the publisher of events. Note that createChannel() automatically
starts the event channel operation.
* A publisher uses IEventChannel.addEntry() and removeEntry() to generate
add/delete events.
* The listener receives the add/remove events by implementing
the IEventChannelListener.entryAdded() and entryRemoved() methods.
Example of usage:
Listener/Subscriber:
private FooFlowPath fooFlowPath = new FooFlowPath();
datagridService.addListener("mapFooFlowPath", fooFlowPath,
Long.class, FlowPath.class);
...
class FooFlowPath implements IEventChannelListener<Long, FlowPath> {
/**
* Receive a notification that an entry is added.
*
* @param value the value for the entry.
*/
@Override
public void entryAdded(FlowPath value) {
// Process the event
}
/**
* Receive a notification that an entry is removed.
*
* @param value the value for the entry.
*/
@Override
public void entryRemoved(FlowPath value) {
// Process the event
}
...
}
Sender/Publisher:
private IEventChannel<Long, FlowPath> fooFlowPathChannel = null;
fooFlowPathChannel = datagridService.createChannel("mapFooFlowPath",
Long.class, FlowPath.class);
...
// Transmit an event
fooFlowPathChannel.addEntry(flowPath.flowId().value(), flowPath);
Change-Id: Ie3246a4e200d5b6293c1f175df3652cdf571be69
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index e80ef56..b240299 100755
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -69,6 +69,9 @@
private KryoFactory kryoFactory = new KryoFactory();
private IFlowEventHandlerService flowEventHandlerService = null;
+ private Map<String, IEventChannel<?, ?>> eventChannels =
+ new HashMap<String, IEventChannel<?, ?>>();
+
// State related to the Flow map
protected static final String mapFlowName = "mapFlow";
private IMap<Long, byte[]> mapFlow = null;
@@ -727,6 +730,113 @@
}
/**
+ * Create (or look up) the event channel with the given name and start it.
+ *
+ * If the channel already exists, the existing instance is returned.
+ * NOTE: startup() is called on the channel on every invocation.
+ *
+ * @param channelName the event channel name.
+ * @param typeK the type of the Key in the Key-Value store.
+ * @param typeV the type of the Value in the Key-Value store.
+ * @return the event channel for the channel name, after startup().
+ */
+ @Override
+ public <K, V> IEventChannel<K, V> createChannel(String channelName,
+ Class<K> typeK, Class<V> typeV) {
+ IEventChannel<K, V> eventChannel =
+ createChannelImpl(channelName, typeK, typeV);
+ eventChannel.startup();
+
+ return eventChannel;
+ }
+
+ /**
+ * Return the event channel for a name, creating it if it doesn't exist.
+ *
+ * NOTE: The channel is not started here. The caller must call
+ * IEventChannel.startup() to start the channel operation.
+ *
+ * @param channelName the event channel name.
+ * @param typeK the type of the Key in the Key-Value store.
+ * @param typeV the type of the Value in the Key-Value store.
+ * @return the event channel for the channel name.
+ * @throws ClassCastException if an existing channel fails the type check.
+ */
+ private <K, V> IEventChannel<K, V> createChannelImpl(String channelName,
+ Class<K> typeK, Class<V> typeV) {
+ IEventChannel<K, V> castedEventChannel;
+ IEventChannel<?, ?> genericEventChannel =
+ eventChannels.get(channelName);
+
+ // First use of this channel name: create the channel and register it
+ if (genericEventChannel == null) {
+ castedEventChannel =
+ new HazelcastEventChannel<K, V>(hazelcastInstance,
+ channelName, typeK, typeV);
+ eventChannels.put(channelName, castedEventChannel);
+ } else {
+ //
+ // The cast below is unchecked, so ask the channel itself to verify
+ // the types. TODO: Find if Java internal support can check this.
+ //
+ if (! genericEventChannel.verifyKeyValueTypes(typeK, typeV)) {
+ throw new ClassCastException("Key-value type mismatch for event channel " + channelName);
+ }
+ castedEventChannel = (IEventChannel<K, V>)genericEventChannel;
+ }
+
+ return castedEventChannel;
+ }
+
+ /**
+ * Add a listener to an event channel, creating the channel if needed.
+ *
+ * NOTE: startup() is called on the channel right after the listener
+ * is added, on every invocation.
+ *
+ * @param channelName the event channel name.
+ * @param listener the listener to add.
+ * @param typeK the type of the Key in the Key-Value store.
+ * @param typeV the type of the Value in the Key-Value store.
+ * @return the event channel for the channel name.
+ */
+ @Override
+ public <K, V> IEventChannel<K, V> addListener(String channelName,
+ IEventChannelListener<K, V> listener,
+ Class<K> typeK, Class<V> typeV) {
+ IEventChannel<K, V> eventChannel =
+ createChannelImpl(channelName, typeK, typeV);
+ eventChannel.addListener(listener);
+ eventChannel.startup();
+
+ return eventChannel;
+ }
+
+ /**
+ * Remove a listener from an event channel; no-op if no such channel.
+ *
+ * @param channelName the event channel name.
+ * @param listener the listener to remove.
+ */
+ @Override
+ public <K, V> void removeListener(String channelName,
+ IEventChannelListener<K, V> listener) {
+ IEventChannel<K, V> castedEventChannel;
+ IEventChannel<?, ?> genericEventChannel =
+ eventChannels.get(channelName);
+
+ if (genericEventChannel != null) {
+ //
+ // TODO: Find if we can use Java internal support to check for
+ // type mismatch. No key/value type check is done before this cast.
+ // NOTE: Using "ClassCastException" exception below doesn't work.
+ //
+ castedEventChannel = (IEventChannel<K, V>)genericEventChannel;
+ castedEventChannel.removeListener(listener);
+ }
+ }
+
+ /**
* Register Flow Event Handler Service for receiving Flow-related
* notifications.
*