Added initial pieces for topology-related notifications.
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index 37ada35..cfc6182 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -19,6 +19,7 @@
import net.floodlightcontroller.core.module.IFloodlightService;
import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
+import net.onrc.onos.ofcontroller.topology.TopologyElement;
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
@@ -59,6 +60,12 @@
private MapFlowListener mapFlowListener = null;
private String mapFlowListenerId = null;
+ // State related to the Network Topology map (key: TopologyElement ID, value: serialized TopologyElement)
+ protected static final String mapTopologyName = "mapTopology";  // name passed to hazelcastInstance.getMap()
+ private IMap<String, byte[]> mapTopology = null;  // set in registerFlowService(), cleared in deregisterFlowService()
+ private MapTopologyListener mapTopologyListener = null;  // listener attached to mapTopology
+ private String mapTopologyListenerId = null;  // ID returned by mapTopology.addEntryListener()
+
/**
* Class for receiving notifications for Flow state.
*
@@ -135,6 +142,84 @@
}
/**
+     * Class for receiving notifications for Network Topology state.
+     *
+     * The datagrid map is:
+     *  - Key: TopologyElement ID (String)
+     *  - Value: Serialized TopologyElement (byte[])
+     *
+     * For added, removed and updated entries the event value is decoded
+     * with Kryo into a TopologyElement and handed to the corresponding
+     * flowService.notificationRecvTopologyElement*() callback.
+     */
+    class MapTopologyListener implements EntryListener<String, byte[]> {
+        /**
+         * Receive a notification that an entry is added.
+         *
+         * @param event the notification event for the entry.
+         */
+        @Override
+        public void entryAdded(EntryEvent<String, byte[]> event) {
+            TopologyElement topologyElement = decodeTopologyElement(event);
+            flowService.notificationRecvTopologyElementAdded(topologyElement);
+        }
+
+        /**
+         * Receive a notification that an entry is removed.
+         *
+         * NOTE(review): this relies on event.getValue() carrying the
+         * serialized element for a removal event - confirm against the
+         * Hazelcast version in use.
+         *
+         * @param event the notification event for the entry.
+         */
+        @Override
+        public void entryRemoved(EntryEvent<String, byte[]> event) {
+            TopologyElement topologyElement = decodeTopologyElement(event);
+            flowService.notificationRecvTopologyElementRemoved(topologyElement);
+        }
+
+        /**
+         * Receive a notification that an entry is updated.
+         *
+         * @param event the notification event for the entry.
+         */
+        @Override
+        public void entryUpdated(EntryEvent<String, byte[]> event) {
+            TopologyElement topologyElement = decodeTopologyElement(event);
+            flowService.notificationRecvTopologyElementUpdated(topologyElement);
+        }
+
+        /**
+         * Receive a notification that an entry is evicted.
+         *
+         * @param event the notification event for the entry.
+         */
+        @Override
+        public void entryEvicted(EntryEvent<String, byte[]> event) {
+            // NOTE: We don't use eviction for this map
+        }
+
+        /**
+         * Decode the serialized TopologyElement carried as the event value.
+         *
+         * The entry key is not needed: only the value is deserialized.
+         *
+         * @param event the notification event for the entry.
+         * @return the TopologyElement decoded from the event value.
+         */
+        private TopologyElement decodeTopologyElement(EntryEvent<String, byte[]> event) {
+            byte[] valueBytes = event.getValue();
+
+            Kryo kryo = kryoFactory.newKryo();
+            try {
+                Input input = new Input(valueBytes);
+                return kryo.readObject(input, TopologyElement.class);
+            } finally {
+                // Hand the Kryo instance back to the factory even if
+                // decoding throws, so it is never left checked out.
+                kryoFactory.deleteKryo(kryo);
+            }
+        }
+    }
+
+ /**
* Initialize the Hazelcast Datagrid operation.
*
* @param conf the configuration filename.
@@ -256,9 +341,16 @@
@Override
public void registerFlowService(IFlowService flowService) {
this.flowService = flowService;
+
+ // Initialize the Flow-related map state: get the Flow map and listen for its entry events
mapFlowListener = new MapFlowListener();
mapFlow = hazelcastInstance.getMap(mapFlowName);
mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
+
+ // Initialize the Topology-related map state: get the Topology map and listen for its entry events
+ mapTopologyListener = new MapTopologyListener();
+ mapTopology = hazelcastInstance.getMap(mapTopologyName);
+ mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);  // true: events include the entry value, which the listener decodes
}
/**
@@ -270,9 +362,16 @@
*/
@Override
public void deregisterFlowService(IFlowService flowService) {
+ // Clear the Flow-related map state: stop listening and drop the map/listener references
mapFlow.removeEntryListener(mapFlowListenerId);
mapFlow = null;
mapFlowListener = null;
+
+ // Clear the Topology-related map state: stop listening and drop the map/listener references (the listener ID field is left as-is)
+ mapTopology.removeEntryListener(mapTopologyListenerId);
+ mapTopology = null;
+ mapTopologyListener = null;
+
this.flowService = null;
}
@@ -370,4 +469,101 @@
mapFlow.removeAsync(key);
}
}
+
+    /**
+     * Get all Topology Elements that are currently in the datagrid.
+     *
+     * Every value stored in the topology map is deserialized with a single
+     * Kryo instance obtained from kryoFactory; that instance is handed back
+     * to the factory whether or not decoding succeeds.
+     *
+     * @return all Topology Elements that are currently in the datagrid.
+     */
+    @Override
+    public Collection<TopologyElement> getAllTopologyElements() {
+        Collection<TopologyElement> allTopologyElements =
+            new LinkedList<TopologyElement>();
+
+        //
+        // Get all current entries
+        //
+        Collection<byte[]> values = mapTopology.values();
+        Kryo kryo = kryoFactory.newKryo();
+        try {
+            for (byte[] valueBytes : values) {
+                //
+                // Decode the value
+                //
+                Input input = new Input(valueBytes);
+                TopologyElement topologyElement =
+                    kryo.readObject(input, TopologyElement.class);
+                allTopologyElements.add(topologyElement);
+            }
+        } finally {
+            // Return the Kryo instance even if one of the values fails
+            // to decode, so it is never left checked out of the factory.
+            kryoFactory.deleteKryo(kryo);
+        }
+
+        return allTopologyElements;
+    }
+
+    /**
+     * Send a notification that a Topology Element is added.
+     *
+     * The element is serialized with Kryo and stored asynchronously in the
+     * topology map under its element ID.
+     *
+     * @param topologyElement the Topology Element that is added.
+     */
+    @Override
+    public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
+        //
+        // Encode the value
+        //
+        byte[] buffer = new byte[MAX_BUFFER_SIZE];
+        Kryo kryo = kryoFactory.newKryo();
+        byte[] valueBytes;
+        try {
+            // A maximum buffer size of -1 places no upper limit on the
+            // output, so it may grow beyond the initial MAX_BUFFER_SIZE.
+            Output output = new Output(buffer, -1);
+            kryo.writeObject(output, topologyElement);
+            valueBytes = output.toBytes();
+        } finally {
+            // Return the Kryo instance even if serialization throws, so
+            // it is never left checked out of the factory.
+            kryoFactory.deleteKryo(kryo);
+        }
+
+        //
+        // Put the entry:
+        //  - Key : TopologyElement ID (String)
+        //  - Value : Serialized TopologyElement (byte[])
+        //
+        mapTopology.putAsync(topologyElement.elementId(), valueBytes);
+    }
+
+ /**
+ * Send a notification that a Topology Element is removed.
+ *
+ * @param topologyElement the Topology Element that is removed.
+ */
+ @Override
+ public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
+ //
+ // Remove the entry:
+ // - Key : TopologyElement ID (String)
+ // - Value : Serialized TopologyElement (byte[])
+ //
+ mapTopology.removeAsync(topologyElement.elementId());
+ }
+
+    /**
+     * Send a notification that a Topology Element is updated.
+     *
+     * An update is sent exactly like an addition: the element is stored
+     * again under its existing key.
+     *
+     * @param topologyElement the Topology Element that is updated.
+     */
+    @Override
+    public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
+        // NOTE: Putting an entry whose key already exists replaces it,
+        // so the "added" path doubles as the "updated" path.
+        this.notificationSendTopologyElementAdded(topologyElement);
+    }
+
+    /**
+     * Send a notification that all Topology Elements are removed.
+     *
+     * Each key currently in the topology map is removed asynchronously.
+     */
+    @Override
+    public void notificationSendAllTopologyElementsRemoved() {
+        //
+        // NOTE: mapTopology.clear() is deliberately not used here; the
+        // entries are removed one-by-one so that the per-entry
+        // notifications will be delivered.
+        //
+        for (String elementId : mapTopology.keySet()) {
+            mapTopology.removeAsync(elementId);
+        }
+    }
}