Added initial pieces for topology-related notifications.
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index 37ada35..cfc6182 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -19,6 +19,7 @@
import net.floodlightcontroller.core.module.IFloodlightService;
import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
+import net.onrc.onos.ofcontroller.topology.TopologyElement;
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
@@ -59,6 +60,12 @@
private MapFlowListener mapFlowListener = null;
private String mapFlowListenerId = null;
+ // State related to the Network Topology map.
+ // The map, the listener and the listener ID are null until
+ // registerFlowService() sets them up.
+ // Name under which the Topology map is looked up in the Hazelcast instance
+ protected static final String mapTopologyName = "mapTopology";
+ // Datagrid map: TopologyElement ID (String) -> serialized TopologyElement
+ private IMap<String, byte[]> mapTopology = null;
+ // Listener that decodes the map events and forwards them to the Flow Service
+ private MapTopologyListener mapTopologyListener = null;
+ // ID returned by addEntryListener(); used to remove the listener again
+ private String mapTopologyListenerId = null;
+
/**
* Class for receiving notifications for Flow state.
*
@@ -135,6 +142,84 @@
}
/**
+ * Class for receiving notifications for Network Topology state.
+ *
+ * The datagrid map is:
+ * - Key: TopologyElement ID (String)
+ * - Value: Serialized TopologyElement (byte[])
+ */
+    class MapTopologyListener implements EntryListener<String, byte[]> {
+        /**
+         * Receive a notification that an entry is added.
+         *
+         * The entry value is decoded into a TopologyElement and handed to
+         * the Flow Service.
+         *
+         * @param event the notification event for the entry.
+         */
+        @Override
+        public void entryAdded(EntryEvent<String, byte[]> event) {
+            TopologyElement topologyElement = decodeTopologyElement(event);
+            flowService.notificationRecvTopologyElementAdded(topologyElement);
+        }
+
+        /**
+         * Receive a notification that an entry is removed.
+         *
+         * The entry value is decoded into a TopologyElement and handed to
+         * the Flow Service.
+         *
+         * @param event the notification event for the entry.
+         */
+        @Override
+        public void entryRemoved(EntryEvent<String, byte[]> event) {
+            TopologyElement topologyElement = decodeTopologyElement(event);
+            flowService.notificationRecvTopologyElementRemoved(topologyElement);
+        }
+
+        /**
+         * Receive a notification that an entry is updated.
+         *
+         * The entry value is decoded into a TopologyElement and handed to
+         * the Flow Service.
+         *
+         * @param event the notification event for the entry.
+         */
+        @Override
+        public void entryUpdated(EntryEvent<String, byte[]> event) {
+            TopologyElement topologyElement = decodeTopologyElement(event);
+            flowService.notificationRecvTopologyElementUpdated(topologyElement);
+        }
+
+        /**
+         * Receive a notification that an entry is evicted.
+         *
+         * @param event the notification event for the entry.
+         */
+        @Override
+        public void entryEvicted(EntryEvent<String, byte[]> event) {
+            // NOTE: We don't use eviction for this map
+        }
+
+        /**
+         * Decode the serialized TopologyElement carried by an event.
+         *
+         * The event key (the TopologyElement ID) is not needed for the
+         * decoding, so only the value is read.
+         *
+         * @param event the notification event for the entry.
+         * @return the TopologyElement decoded from the event value.
+         */
+        private TopologyElement decodeTopologyElement(EntryEvent<String, byte[]> event) {
+            byte[] valueBytes = event.getValue();
+
+            Kryo kryo = kryoFactory.newKryo();
+            try {
+                Input input = new Input(valueBytes);
+                return kryo.readObject(input, TopologyElement.class);
+            } finally {
+                // Hand the Kryo instance back to the factory even if the
+                // decoding fails, so a bad entry cannot leak the instance.
+                kryoFactory.deleteKryo(kryo);
+            }
+        }
+    }
+
+ /**
* Initialize the Hazelcast Datagrid operation.
*
* @param conf the configuration filename.
@@ -256,9 +341,16 @@
@Override
public void registerFlowService(IFlowService flowService) {
this.flowService = flowService;
+
+ // Initialize the Flow-related map state: create the listener, look up
+ // the map by name and remember the listener ID for deregistration
mapFlowListener = new MapFlowListener();
mapFlow = hazelcastInstance.getMap(mapFlowName);
mapFlowListenerId = mapFlow.addEntryListener(mapFlowListener, true);
+
+ // Initialize the Topology-related map state in the same way.
+ // NOTE: flowService is assigned above before the listener is added,
+ // because MapTopologyListener forwards every decoded event to it.
+ mapTopologyListener = new MapTopologyListener();
+ mapTopology = hazelcastInstance.getMap(mapTopologyName);
+ mapTopologyListenerId = mapTopology.addEntryListener(mapTopologyListener, true);
}
/**
@@ -270,9 +362,16 @@
*/
@Override
public void deregisterFlowService(IFlowService flowService) {
+ // Clear the Flow-related map state: detach the listener first, then
+ // drop the references to the map and the listener
mapFlow.removeEntryListener(mapFlowListenerId);
mapFlow = null;
mapFlowListener = null;
+
+ // Clear the Topology-related map state in the same way.
+ // NOTE: the listener ID fields are not reset here; they are
+ // overwritten by the next registerFlowService() call.
+ mapTopology.removeEntryListener(mapTopologyListenerId);
+ mapTopology = null;
+ mapTopologyListener = null;
+
this.flowService = null;
}
@@ -370,4 +469,101 @@
mapFlow.removeAsync(key);
}
}
+
+    /**
+     * Get all Topology Elements that are currently in the datagrid.
+     *
+     * Every value stored in the Topology map is decoded into a
+     * TopologyElement. A single Kryo instance is used for the whole pass
+     * and is always handed back to the Kryo factory afterwards.
+     *
+     * @return all Topology Elements that are currently in the datagrid.
+     */
+    @Override
+    public Collection<TopologyElement> getAllTopologyElements() {
+        Collection<TopologyElement> allTopologyElements =
+            new LinkedList<TopologyElement>();
+
+        //
+        // Get all current entries
+        //
+        Collection<byte[]> values = mapTopology.values();
+        Kryo kryo = kryoFactory.newKryo();
+        try {
+            for (byte[] valueBytes : values) {
+                //
+                // Decode the value
+                //
+                Input input = new Input(valueBytes);
+                TopologyElement topologyElement =
+                    kryo.readObject(input, TopologyElement.class);
+                allTopologyElements.add(topologyElement);
+            }
+        } finally {
+            // Release the Kryo instance even if one of the entries
+            // cannot be decoded.
+            kryoFactory.deleteKryo(kryo);
+        }
+
+        return allTopologyElements;
+    }
+
+    /**
+     * Send a notification that a Topology Element is added.
+     *
+     * The element is serialized and written to the Topology map under its
+     * element ID; the write itself is asynchronous.
+     *
+     * @param topologyElement the Topology Element that is added.
+     */
+    @Override
+    public void notificationSendTopologyElementAdded(TopologyElement topologyElement) {
+        //
+        // Encode the value
+        //
+        byte[] buffer = new byte[MAX_BUFFER_SIZE];
+        byte[] valueBytes;
+        Kryo kryo = kryoFactory.newKryo();
+        try {
+            // NOTE(review): -1 is presumably Kryo's "no maximum buffer
+            // size" marker, so the Output may grow beyond the initial
+            // buffer -- confirm against the Kryo Output documentation.
+            Output output = new Output(buffer, -1);
+            kryo.writeObject(output, topologyElement);
+            valueBytes = output.toBytes();
+        } finally {
+            // Release the Kryo instance even if the encoding fails
+            kryoFactory.deleteKryo(kryo);
+        }
+
+        //
+        // Put the entry:
+        //  - Key : TopologyElement ID (String)
+        //  - Value : Serialized TopologyElement (byte[])
+        //
+        mapTopology.putAsync(topologyElement.elementId(), valueBytes);
+    }
+
+    /**
+     * Send a notification that a Topology Element is removed.
+     *
+     * The notification is sent by asynchronously removing the element's
+     * entry from the Topology map.
+     *
+     * @param topologyElement the Topology Element that is removed.
+     */
+    @Override
+    public void notificationSendTopologyElementRemoved(TopologyElement topologyElement) {
+        // The entry is identified by its key alone:
+        //  - Key : TopologyElement ID (String)
+        //  - Value : Serialized TopologyElement (byte[])
+        String elementKey = topologyElement.elementId();
+        mapTopology.removeAsync(elementKey);
+    }
+
+    /**
+     * Send a notification that a Topology Element is updated.
+     *
+     * An update is sent the same way as an add.
+     *
+     * @param topologyElement the Topology Element that is updated.
+     */
+    @Override
+    public void notificationSendTopologyElementUpdated(TopologyElement topologyElement) {
+        // NOTE: Writing an entry whose key already exists replaces the
+        // old value, so the "added" path covers updates as well.
+        this.notificationSendTopologyElementAdded(topologyElement);
+    }
+
+    /**
+     * Send a notification that all Topology Elements are removed.
+     *
+     * Each key currently in the Topology map is removed asynchronously.
+     */
+    @Override
+    public void notificationSendAllTopologyElementsRemoved() {
+        // NOTE: mapTopology.clear() is deliberately not used here. The
+        // entries are removed one-by-one so the per-entry notifications
+        // will be delivered.
+        for (String key : mapTopology.keySet()) {
+            mapTopology.removeAsync(key);
+        }
+    }
}
diff --git a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
index 498f58b..bf8e41f 100644
--- a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
+++ b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
@@ -5,6 +5,7 @@
import net.floodlightcontroller.core.module.IFloodlightService;
import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
+import net.onrc.onos.ofcontroller.topology.TopologyElement;
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
@@ -62,4 +63,37 @@
* Send a notification that all Flows are removed.
*/
void notificationSendAllFlowsRemoved();
+
+ /**
+ * Get all Topology Elements that are currently in the datagrid.
+ *
+ * FlowManager calls this right after registering with the Datagrid
+ * Service, to obtain the initial Topology state.
+ *
+ * @return all Topology Elements that are currently in the datagrid.
+ */
+ Collection<TopologyElement> getAllTopologyElements();
+
+ /**
+ * Send a notification that a Topology Element is added.
+ *
+ * HazelcastDatagrid publishes the serialized element under the key
+ * returned by TopologyElement.elementId().
+ *
+ * @param topologyElement the Topology Element that is added.
+ */
+ void notificationSendTopologyElementAdded(TopologyElement topologyElement);
+
+ /**
+ * Send a notification that a Topology Element is removed.
+ *
+ * HazelcastDatagrid uses only TopologyElement.elementId() to identify
+ * the entry to remove.
+ *
+ * @param topologyElement the Topology Element that is removed.
+ */
+ void notificationSendTopologyElementRemoved(TopologyElement topologyElement);
+
+ /**
+ * Send a notification that a Topology Element is updated.
+ *
+ * HazelcastDatagrid handles this exactly like an add: writing an entry
+ * with an existing key updates it.
+ *
+ * @param topologyElement the Topology Element that is updated.
+ */
+ void notificationSendTopologyElementUpdated(TopologyElement topologyElement);
+
+ /**
+ * Send a notification that all Topology Elements are removed.
+ *
+ * HazelcastDatagrid removes the entries one-by-one so that the
+ * per-entry notifications are delivered.
+ */
+ void notificationSendAllTopologyElementsRemoved();
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index 012f333..44607a9 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -31,6 +31,7 @@
import net.onrc.onos.ofcontroller.flowmanager.web.FlowWebRoutable;
import net.onrc.onos.ofcontroller.topology.ITopologyNetService;
import net.onrc.onos.ofcontroller.topology.Topology;
+import net.onrc.onos.ofcontroller.topology.TopologyElement;
import net.onrc.onos.ofcontroller.util.*;
import org.openflow.protocol.OFType;
@@ -76,6 +77,10 @@
protected BlockingQueue<EventEntry<FlowPath>> flowPathEvents =
new LinkedBlockingQueue<EventEntry<FlowPath>>();
+ // The queue with Topology Element updates.
+ // An ENTRY_ADD event is queued for every Topology Element obtained from
+ // the Datagrid Service when the initial Topology state is read.
+ protected BlockingQueue<EventEntry<TopologyElement>> topologyEvents =
+ new LinkedBlockingQueue<EventEntry<TopologyElement>>();
+
/**
* Periodic task for reading the Flow Entries and pushing changes
* into the switches.
@@ -494,8 +499,19 @@
// Initialize the Flow Entry ID generator
nextFlowEntryIdPrefix = randomGenerator.nextInt();
- // Register with the Datagrid Service and obtain the initial state
+ // Register with the Datagrid Service
datagridService.registerFlowService(this);
+
+ // Obtain the initial Topology state
+ Collection<TopologyElement> topologyElements =
+ datagridService.getAllTopologyElements();
+ for (TopologyElement topologyElement : topologyElements) {
+ EventEntry<TopologyElement> eventEntry =
+ new EventEntry<TopologyElement>(EventEntry.Type.ENTRY_ADD, topologyElement);
+ topologyEvents.add(eventEntry);
+ }
+
+ // Obtain the initial Flow state
Collection<FlowPath> flowPaths = datagridService.getAllFlows();
for (FlowPath flowPath : flowPaths) {
EventEntry<FlowPath> eventEntry =
@@ -864,4 +880,34 @@
public void notificationRecvFlowUpdated(FlowPath flowPath) {
// TODO
}
+
+ /**
+ * Receive a notification that a Topology Element is added.
+ *
+ * Not implemented yet: the notification is currently ignored.
+ *
+ * @param topologyElement the Topology Element that is added.
+ */
+ @Override
+ public void notificationRecvTopologyElementAdded(TopologyElement topologyElement) {
+ // TODO: process the added Topology Element
+ }
+
+ /**
+ * Receive a notification that a Topology Element is removed.
+ *
+ * Not implemented yet: the notification is currently ignored.
+ *
+ * @param topologyElement the Topology Element that is removed.
+ */
+ @Override
+ public void notificationRecvTopologyElementRemoved(TopologyElement topologyElement) {
+ // TODO: process the removed Topology Element
+ }
+
+ /**
+ * Receive a notification that a Topology Element is updated.
+ *
+ * Not implemented yet: the notification is currently ignored.
+ *
+ * @param topologyElement the Topology Element that is updated.
+ */
+ @Override
+ public void notificationRecvTopologyElementUpdated(TopologyElement topologyElement) {
+ // TODO: process the updated Topology Element
+ }
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
index 8c908cf..a15f56c 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
@@ -4,6 +4,7 @@
import net.floodlightcontroller.core.module.IFloodlightService;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
+import net.onrc.onos.ofcontroller.topology.TopologyElement;
import net.onrc.onos.ofcontroller.util.CallerId;
import net.onrc.onos.ofcontroller.util.DataPathEndpoints;
import net.onrc.onos.ofcontroller.util.FlowId;
@@ -136,4 +137,25 @@
* @param flowPath the flow that is updated.
*/
void notificationRecvFlowUpdated(FlowPath flowPath);
+
+ /**
+ * Receive a notification that a Topology Element is added.
+ *
+ * HazelcastDatagrid calls this from its Topology map listener when an
+ * entry is added, passing the decoded entry value.
+ *
+ * @param topologyElement the Topology Element that is added.
+ */
+ void notificationRecvTopologyElementAdded(TopologyElement topologyElement);
+
+ /**
+ * Receive a notification that a Topology Element is removed.
+ *
+ * HazelcastDatagrid calls this from its Topology map listener when an
+ * entry is removed, passing the decoded entry value.
+ *
+ * @param topologyElement the Topology Element that is removed.
+ */
+ void notificationRecvTopologyElementRemoved(TopologyElement topologyElement);
+
+ /**
+ * Receive a notification that a Topology Element is updated.
+ *
+ * HazelcastDatagrid calls this from its Topology map listener when an
+ * entry is updated, passing the decoded entry value.
+ *
+ * @param topologyElement the Topology Element that is updated.
+ */
+ void notificationRecvTopologyElementUpdated(TopologyElement topologyElement);
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/topology/TopologyElement.java b/src/main/java/net/onrc/onos/ofcontroller/topology/TopologyElement.java
new file mode 100644
index 0000000..6809125
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/topology/TopologyElement.java
@@ -0,0 +1,90 @@
+package net.onrc.onos.ofcontroller.topology;
+
+/**
+ * Class for storing information about a Topology Element: Switch, Port or
+ * Link.
+ *
+ * A Switch uses only the "from" DPID, a Switch Port uses the "from" DPID
+ * and port, and a Link uses all four DPID/port fields. Unused fields keep
+ * their default value of 0.
+ */
+public class TopologyElement {
+    /**
+     * The Element Type.
+     */
+    enum Type {
+        ELEMENT_SWITCH,         // Network Switch
+        ELEMENT_PORT,           // Switch Port
+        ELEMENT_LINK            // Unidirectional Link between Switch Ports
+    }
+
+    private Type elementType;           // The element type
+    private long fromSwitchDpid = 0;    // The Switch DPID
+    private int fromSwitchPort = 0;     // The Switch Port
+    private long toSwitchDpid = 0;      // The Neighbor Switch DPID
+    private int toSwitchPort = 0;       // The Neighbor Switch Port
+
+    /**
+     * No-argument constructor, intended for the deserialization layer only.
+     *
+     * Kryo's default instantiation creates the object through a
+     * zero-argument constructor before filling in the fields. The element
+     * type is left unset here, so elementId() must not be called until the
+     * fields have been populated.
+     */
+    private TopologyElement() {
+    }
+
+    /**
+     * Constructor to create a Topology Element for a Switch.
+     *
+     * @param switchDpid the Switch DPID.
+     */
+    public TopologyElement(long switchDpid) {
+        this.elementType = Type.ELEMENT_SWITCH;
+        this.fromSwitchDpid = switchDpid;
+    }
+
+    /**
+     * Constructor to create a Topology Element for a Switch Port.
+     *
+     * @param switchDpid the Switch DPID.
+     * @param switchPort the Switch Port.
+     */
+    public TopologyElement(long switchDpid, int switchPort) {
+        this.elementType = Type.ELEMENT_PORT;
+        this.fromSwitchDpid = switchDpid;
+        this.fromSwitchPort = switchPort;
+    }
+
+    /**
+     * Constructor to create a Topology Element for an unidirectional Link
+     * between Switch Ports.
+     *
+     * @param fromSwitchDpid the Switch DPID the Link begins from.
+     * @param fromSwitchPort the Switch Port the Link begins from.
+     * @param toSwitchDpid the Switch DPID the Link ends to.
+     * @param toSwitchPort the Switch Port the Link ends to.
+     */
+    public TopologyElement(long fromSwitchDpid, int fromSwitchPort,
+                           long toSwitchDpid, int toSwitchPort) {
+        this.elementType = Type.ELEMENT_LINK;
+        this.fromSwitchDpid = fromSwitchDpid;
+        this.fromSwitchPort = fromSwitchPort;
+        this.toSwitchDpid = toSwitchDpid;
+        this.toSwitchPort = toSwitchPort;
+    }
+
+    /**
+     * Get the Topology Element ID.
+     *
+     * The DPIDs and port numbers are printed as plain decimal numbers.
+     * The Topology Element ID has the following format:
+     *  - Switch: "Switch=<Dpid>"
+     *    Example: "Switch=1"
+     *  - Switch Port: "Port=<Dpid>/<PortId>"
+     *    Example: "Port=1/1"
+     *  - Link: "Link=<FromDpid>/<FromPortId>/<ToDpid>/<ToPortId>"
+     *    Example: "Link=1/1/2/1"
+     *
+     * @return the Topology Element ID.
+     */
+    public String elementId() {
+        switch (elementType) {
+        case ELEMENT_SWITCH:
+            return "Switch=" + fromSwitchDpid;
+        case ELEMENT_PORT:
+            return "Port=" + fromSwitchDpid + "/" + fromSwitchPort;
+        case ELEMENT_LINK:
+            // All four components are "/"-separated; without the separator
+            // between the from-port and the to-DPID different links could
+            // produce the same ID.
+            return "Link=" + fromSwitchDpid + "/" + fromSwitchPort + "/" +
+                toSwitchDpid + "/" + toSwitchPort;
+        }
+
+        // Unreachable as long as every Type value is handled above
+        assert(false);
+        return null;
+    }
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/serializers/KryoFactory.java b/src/main/java/net/onrc/onos/ofcontroller/util/serializers/KryoFactory.java
index 405a3ee..e0da7a8 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/serializers/KryoFactory.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/serializers/KryoFactory.java
@@ -1,11 +1,14 @@
package net.onrc.onos.ofcontroller.util.serializers;
import java.util.ArrayList;
+
import com.esotericsoftware.kryo2.Kryo;
-import net.onrc.onos.ofcontroller.util.*;
import net.floodlightcontroller.util.MACAddress;
+import net.onrc.onos.ofcontroller.util.*;
+import net.onrc.onos.ofcontroller.topology.TopologyElement;
+
/**
* Class factory for allocating Kryo instances for
* serialization/deserialization of classes.
@@ -81,6 +84,8 @@
// kryo.setReferences(false);
//
kryo.register(ArrayList.class);
+
+ // FlowPath and related classes
kryo.register(CallerId.class);
kryo.register(DataPath.class);
kryo.register(DataPathEndpoints.class);
@@ -117,6 +122,9 @@
kryo.register(Switch.class);
kryo.register(SwitchPort.class);
+ // Topology-related classes
+ kryo.register(TopologyElement.class);
+
return kryo;
}
}