Merge branch 'ONOS-ONRC2014-Measurements' of github.com:OPENNETWORKINGLAB/ONOS into RAMCloud-merge
Conflicts:
src/main/java/net/onrc/onos/graph/GraphDBOperation.java
src/main/java/net/onrc/onos/graph/IDBOperation.java
src/main/java/net/onrc/onos/graph/LocalTopologyEventListener.java
src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java
src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java
src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoLinkServiceImpl.java
src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoSwitchServiceImpl.java
src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index 6483121..c195f82 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -21,10 +21,12 @@
import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
import net.onrc.onos.ofcontroller.topology.TopologyElement;
+import net.onrc.onos.ofcontroller.util.Dpid;
import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowEntryId;
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
+import net.onrc.onos.ofcontroller.util.Pair;
import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
import org.slf4j.Logger;
@@ -73,12 +75,24 @@
private MapFlowEntryListener mapFlowEntryListener = null;
private String mapFlowEntryListenerId = null;
+ // State related to the Flow ID map
+ protected static final String mapFlowIdName = "mapFlowId";
+ private IMap<Long, byte[]> mapFlowId = null;
+ private MapFlowIdListener mapFlowIdListener = null;
+ private String mapFlowIdListenerId = null;
+
+ // State related to the Flow Entry ID map
+ protected static final String mapFlowEntryIdName = "mapFlowEntryId";
+ private IMap<Long, byte[]> mapFlowEntryId = null;
+ private MapFlowEntryIdListener mapFlowEntryIdListener = null;
+ private String mapFlowEntryIdListenerId = null;
+
// State related to the Network Topology map
protected static final String mapTopologyName = "mapTopology";
private IMap<String, byte[]> mapTopology = null;
private MapTopologyListener mapTopologyListener = null;
private String mapTopologyListenerId = null;
-
+
// State related to the ARP map
protected static final String arpMapName = "arpMap";
private IMap<ArpMessage, byte[]> arpMap = null;
@@ -98,8 +112,9 @@
*
* @param event the notification event for the entry.
*/
+ @Override
public void entryAdded(EntryEvent<Long, byte[]> event) {
- byte[] valueBytes = (byte[])event.getValue();
+ byte[] valueBytes = event.getValue();
//
// Decode the value and deliver the notification
@@ -116,8 +131,9 @@
*
* @param event the notification event for the entry.
*/
+ @Override
public void entryRemoved(EntryEvent<Long, byte[]> event) {
- byte[] valueBytes = (byte[])event.getValue();
+ byte[] valueBytes = event.getValue();
//
// Decode the value and deliver the notification
@@ -134,8 +150,9 @@
*
* @param event the notification event for the entry.
*/
+ @Override
public void entryUpdated(EntryEvent<Long, byte[]> event) {
- byte[] valueBytes = (byte[])event.getValue();
+ byte[] valueBytes = event.getValue();
//
// Decode the value and deliver the notification
@@ -152,6 +169,7 @@
*
* @param event the notification event for the entry.
*/
+ @Override
public void entryEvicted(EntryEvent<Long, byte[]> event) {
// NOTE: We don't use eviction for this map
}
@@ -170,8 +188,9 @@
*
* @param event the notification event for the entry.
*/
+ @Override
public void entryAdded(EntryEvent<Long, byte[]> event) {
- byte[] valueBytes = (byte[])event.getValue();
+ byte[] valueBytes = event.getValue();
//
// Decode the value and deliver the notification
@@ -188,8 +207,9 @@
*
* @param event the notification event for the entry.
*/
+ @Override
public void entryRemoved(EntryEvent<Long, byte[]> event) {
- byte[] valueBytes = (byte[])event.getValue();
+ byte[] valueBytes = event.getValue();
//
// Decode the value and deliver the notification
@@ -206,8 +226,9 @@
*
* @param event the notification event for the entry.
*/
+ @Override
public void entryUpdated(EntryEvent<Long, byte[]> event) {
- byte[] valueBytes = (byte[])event.getValue();
+ byte[] valueBytes = event.getValue();
//
// Decode the value and deliver the notification
@@ -224,6 +245,169 @@
*
* @param event the notification event for the entry.
*/
+ @Override
+ public void entryEvicted(EntryEvent<Long, byte[]> event) {
+ // NOTE: We don't use eviction for this map
+ }
+ }
+
+ /**
+ * Class for receiving notifications for FlowId state.
+ *
+ * The datagrid map is:
+ * - Key : FlowId (Long)
+ * - Value : Serialized Switch Dpid (byte[])
+ */
+ class MapFlowIdListener implements EntryListener<Long, byte[]> {
+ /**
+ * Receive a notification that an entry is added.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryAdded(EntryEvent<Long, byte[]> event) {
+ Long keyLong = event.getKey();
+ FlowId flowId = new FlowId(keyLong);
+
+ byte[] valueBytes = event.getValue();
+
+ //
+ // Decode the value and deliver the notification
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ Dpid dpid = kryo.readObject(input, Dpid.class);
+ kryoFactory.deleteKryo(kryo);
+ flowEventHandlerService.notificationRecvFlowIdAdded(flowId, dpid);
+ }
+
+ /**
+ * Receive a notification that an entry is removed.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryRemoved(EntryEvent<Long, byte[]> event) {
+ Long keyLong = event.getKey();
+ FlowId flowId = new FlowId(keyLong);
+
+ byte[] valueBytes = event.getValue();
+
+ //
+ // Decode the value and deliver the notification
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ Dpid dpid = kryo.readObject(input, Dpid.class);
+ kryoFactory.deleteKryo(kryo);
+ flowEventHandlerService.notificationRecvFlowIdRemoved(flowId, dpid);
+ }
+
+ /**
+ * Receive a notification that an entry is updated.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryUpdated(EntryEvent<Long, byte[]> event) {
+ Long keyLong = event.getKey();
+ FlowId flowId = new FlowId(keyLong);
+
+ byte[] valueBytes = event.getValue();
+
+ //
+ // Decode the value and deliver the notification
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ Dpid dpid = kryo.readObject(input, Dpid.class);
+ kryoFactory.deleteKryo(kryo);
+ flowEventHandlerService.notificationRecvFlowIdUpdated(flowId, dpid);
+ }
+
+ /**
+ * Receive a notification that an entry is evicted.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryEvicted(EntryEvent<Long, byte[]> event) {
+ // NOTE: We don't use eviction for this map
+ }
+ }
+
+ /**
+ * Class for receiving notifications for FlowEntryId state.
+ *
+ * The datagrid map is:
+ * - Key : FlowEntryId (Long)
+ * - Value : Serialized Switch Dpid (byte[])
+ */
+ class MapFlowEntryIdListener implements EntryListener<Long, byte[]> {
+ /**
+ * Receive a notification that an entry is added.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryAdded(EntryEvent<Long, byte[]> event) {
+ Long keyLong = event.getKey();
+ FlowEntryId flowEntryId = new FlowEntryId(keyLong);
+
+ byte[] valueBytes = event.getValue();
+
+ //
+ // Decode the value and deliver the notification
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ Dpid dpid = kryo.readObject(input, Dpid.class);
+ kryoFactory.deleteKryo(kryo);
+ flowEventHandlerService.notificationRecvFlowEntryIdAdded(flowEntryId, dpid);
+ }
+
+ /**
+ * Receive a notification that an entry is removed.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryRemoved(EntryEvent<Long, byte[]> event) {
+ Long keyLong = event.getKey();
+ FlowEntryId flowEntryId = new FlowEntryId(keyLong);
+
+ byte[] valueBytes = event.getValue();
+
+ //
+ // Decode the value and deliver the notification
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ Dpid dpid = kryo.readObject(input, Dpid.class);
+ kryoFactory.deleteKryo(kryo);
+ flowEventHandlerService.notificationRecvFlowEntryIdRemoved(flowEntryId, dpid);
+ }
+
+ /**
+ * Receive a notification that an entry is updated.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryUpdated(EntryEvent<Long, byte[]> event) {
+ Long keyLong = event.getKey();
+ FlowEntryId flowEntryId = new FlowEntryId(keyLong);
+
+ byte[] valueBytes = event.getValue();
+
+ //
+ // Decode the value and deliver the notification
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ Dpid dpid = kryo.readObject(input, Dpid.class);
+ kryoFactory.deleteKryo(kryo);
+ flowEventHandlerService.notificationRecvFlowEntryIdUpdated(flowEntryId, dpid);
+ }
+
+ /**
+ * Receive a notification that an entry is evicted.
+ *
+ * @param event the notification event for the entry.
+ */
public void entryEvicted(EntryEvent<Long, byte[]> event) {
// NOTE: We don't use eviction for this map
}
@@ -242,8 +426,9 @@
*
* @param event the notification event for the entry.
*/
+ @Override
public void entryAdded(EntryEvent<String, byte[]> event) {
- byte[] valueBytes = (byte[])event.getValue();
+ byte[] valueBytes = event.getValue();
//
// Decode the value and deliver the notification
@@ -261,8 +446,9 @@
*
* @param event the notification event for the entry.
*/
+ @Override
public void entryRemoved(EntryEvent<String, byte[]> event) {
- byte[] valueBytes = (byte[])event.getValue();
+ byte[] valueBytes = event.getValue();
//
// Decode the value and deliver the notification
@@ -280,8 +466,9 @@
*
* @param event the notification event for the entry.
*/
+ @Override
public void entryUpdated(EntryEvent<String, byte[]> event) {
- byte[] valueBytes = (byte[])event.getValue();
+ byte[] valueBytes = event.getValue();
//
// Decode the value and deliver the notification
@@ -299,11 +486,12 @@
*
* @param event the notification event for the entry.
*/
+ @Override
public void entryEvicted(EntryEvent<String, byte[]> event) {
// NOTE: We don't use eviction for this map
}
}
-
+
/**
* Class for receiving notifications for ARP requests.
*
@@ -317,11 +505,12 @@
*
* @param event the notification event for the entry.
*/
+ @Override
public void entryAdded(EntryEvent<ArpMessage, byte[]> event) {
for (IArpEventHandler arpEventHandler : arpEventHandlers) {
arpEventHandler.arpRequestNotification(event.getKey());
}
-
+
//
// Decode the value and deliver the notification
//
@@ -334,30 +523,33 @@
flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
*/
}
-
+
/**
* Receive a notification that an entry is removed.
*
* @param event the notification event for the entry.
*/
+ @Override
public void entryRemoved(EntryEvent<ArpMessage, byte[]> event) {
// Not used
}
-
+
/**
* Receive a notification that an entry is updated.
*
* @param event the notification event for the entry.
*/
+ @Override
public void entryUpdated(EntryEvent<ArpMessage, byte[]> event) {
// Not used
}
-
+
/**
* Receive a notification that an entry is evicted.
*
* @param event the notification event for the entry.
*/
+ @Override
public void entryEvicted(EntryEvent<ArpMessage, byte[]> event) {
// Not used
}
@@ -374,7 +566,7 @@
System.setProperty("hazelcast.socket.send.buffer.size", "32");
*/
// System.setProperty("hazelcast.heartbeat.interval.seconds", "100");
-
+
// Init from configuration file
try {
hazelcastConfig = new FileSystemXmlConfig(configFilename);
@@ -395,7 +587,8 @@
/**
* Shutdown the Hazelcast Datagrid operation.
*/
- public void finalize() {
+ @Override
+ protected void finalize() {
close();
}
@@ -413,7 +606,7 @@
*/
@Override
public Collection<Class<? extends IFloodlightService>> getModuleServices() {
- Collection<Class<? extends IFloodlightService>> l =
+ Collection<Class<? extends IFloodlightService>> l =
new ArrayList<Class<? extends IFloodlightService>>();
l.add(IDatagridService.class);
return l;
@@ -425,10 +618,10 @@
* @return the collection of implemented services.
*/
@Override
- public Map<Class<? extends IFloodlightService>, IFloodlightService>
+ public Map<Class<? extends IFloodlightService>, IFloodlightService>
getServiceImpls() {
Map<Class<? extends IFloodlightService>,
- IFloodlightService> m =
+ IFloodlightService> m =
new HashMap<Class<? extends IFloodlightService>,
IFloodlightService>();
m.put(IDatagridService.class, this);
@@ -441,7 +634,7 @@
* @return the collection of modules this module depends on.
*/
@Override
- public Collection<Class<? extends IFloodlightService>>
+ public Collection<Class<? extends IFloodlightService>>
getModuleDependencies() {
Collection<Class<? extends IFloodlightService>> l =
new ArrayList<Class<? extends IFloodlightService>>();
@@ -477,7 +670,7 @@
hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
restApi.addRestletRoutable(new DatagridWebRoutable());
-
+
arpMap = hazelcastInstance.getMap(arpMapName);
arpMap.addEntryListener(new ArpMapListener(), true);
}
@@ -504,6 +697,16 @@
mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
+ // Initialize the FlowId-related map state
+ mapFlowIdListener = new MapFlowIdListener();
+ mapFlowId = hazelcastInstance.getMap(mapFlowIdName);
+ mapFlowIdListenerId = mapFlowId.addEntryListener(mapFlowIdListener, true);
+
+ // Initialize the FlowEntryId-related map state
+ mapFlowEntryIdListener = new MapFlowEntryIdListener();
+ mapFlowEntryId = hazelcastInstance.getMap(mapFlowEntryIdName);
+ mapFlowEntryIdListenerId = mapFlowEntryId.addEntryListener(mapFlowEntryIdListener, true);
+
// Initialize the Topology-related map state
mapTopologyListener = new MapTopologyListener();
mapTopology = hazelcastInstance.getMap(mapTopologyName);
@@ -531,6 +734,16 @@
mapFlowEntry = null;
mapFlowEntryListener = null;
+ // Clear the FlowId-related map state
+ mapFlowId.removeEntryListener(mapFlowIdListenerId);
+ mapFlowId = null;
+ mapFlowIdListener = null;
+
+ // Clear the FlowEntryId-related map state
+ mapFlowEntryId.removeEntryListener(mapFlowEntryIdListenerId);
+ mapFlowEntryId = null;
+ mapFlowEntryIdListener = null;
+
// Clear the Topology-related map state
mapTopology.removeEntryListener(mapTopologyListenerId);
mapTopology = null;
@@ -538,19 +751,19 @@
this.flowEventHandlerService = null;
}
-
+
@Override
public void registerArpEventHandler(IArpEventHandler arpEventHandler) {
if (arpEventHandler != null) {
arpEventHandlers.add(arpEventHandler);
}
}
-
+
@Override
public void deregisterArpEventHandler(IArpEventHandler arpEventHandler) {
arpEventHandlers.remove(arpEventHandler);
}
-
+
/**
* Get all Flows that are currently in the datagrid.
*
@@ -788,6 +1001,216 @@
}
/**
+ * Get all Flow IDs that are currently in the datagrid.
+ *
+ * @return all Flow IDs that are currently in the datagrid.
+ */
+ @Override
+ public Collection<Pair<FlowId, Dpid>> getAllFlowIds() {
+ Collection<Pair<FlowId, Dpid>> allFlowIds =
+ new LinkedList<Pair<FlowId, Dpid>>();
+
+ //
+ // Get all current entries
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ for (Map.Entry<Long, byte[]> entry : mapFlowId.entrySet()) {
+ Long key = entry.getKey();
+ byte[] valueBytes = entry.getValue();
+
+ FlowId flowId = new FlowId(key);
+
+ //
+ // Decode the value
+ //
+ Input input = new Input(valueBytes);
+ Dpid dpid = kryo.readObject(input, Dpid.class);
+
+ Pair<FlowId, Dpid> pair = new Pair(flowId, dpid);
+ allFlowIds.add(pair);
+ }
+ kryoFactory.deleteKryo(kryo);
+
+ return allFlowIds;
+ }
+
+ /**
+ * Get all Flow Entry IDs that are currently in the datagrid.
+ *
+ * @return all Flow Entry IDs that are currently in the datagrid.
+ */
+ @Override
+ public Collection<Pair<FlowEntryId, Dpid>> getAllFlowEntryIds() {
+ Collection<Pair<FlowEntryId, Dpid>> allFlowEntryIds =
+ new LinkedList<Pair<FlowEntryId, Dpid>>();
+
+ //
+ // Get all current entries
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ for (Map.Entry<Long, byte[]> entry : mapFlowEntryId.entrySet()) {
+ Long key = entry.getKey();
+ byte[] valueBytes = entry.getValue();
+
+ FlowEntryId flowEntryId = new FlowEntryId(key);
+
+ //
+ // Decode the value
+ //
+ Input input = new Input(valueBytes);
+ Dpid dpid = kryo.readObject(input, Dpid.class);
+
+ Pair<FlowEntryId, Dpid> pair = new Pair(flowEntryId, dpid);
+ allFlowEntryIds.add(pair);
+ }
+ kryoFactory.deleteKryo(kryo);
+
+ return allFlowEntryIds;
+ }
+
+ /**
+ * Send a notification that a FlowId is added.
+ *
+ * @param flowId the FlowId that is added.
+ * @param dpid the Source Switch Dpid.
+ */
+ @Override
+ public void notificationSendFlowIdAdded(FlowId flowId, Dpid dpid) {
+ //
+ // Encode the value
+ //
+ byte[] buffer = new byte[MAX_BUFFER_SIZE];
+ Kryo kryo = kryoFactory.newKryo();
+ Output output = new Output(buffer, -1);
+ kryo.writeObject(output, dpid);
+ byte[] valueBytes = output.toBytes();
+ kryoFactory.deleteKryo(kryo);
+
+ //
+ // Put the entry:
+ // - Key : FlowId (Long)
+ // - Value : Serialized Switch Dpid (byte[])
+ //
+ mapFlowId.putAsync(flowId.value(), valueBytes);
+ }
+
+ /**
+ * Send a notification that a FlowId is removed.
+ *
+ * @param flowId the FlowId that is removed.
+ */
+ @Override
+ public void notificationSendFlowIdRemoved(FlowId flowId) {
+ //
+ // Remove the entry:
+ // - Key : FlowId (Long)
+ // - Value : Serialized Switch Dpid (byte[])
+ //
+ mapFlowId.removeAsync(flowId.value());
+ }
+
+ /**
+ * Send a notification that a FlowId is updated.
+ *
+ * @param flowId the FlowId that is updated.
+ * @param dpid the Source Switch Dpid.
+ */
+ @Override
+ public void notificationSendFlowIdUpdated(FlowId flowId, Dpid dpid) {
+ // NOTE: Adding an entry with an existing key automatically updates it
+ notificationSendFlowIdAdded(flowId, dpid);
+ }
+
+ /**
+ * Send a notification that all Flow IDs are removed.
+ */
+ @Override
+ public void notificationSendAllFlowIdsRemoved() {
+ //
+ // Remove all entries
+ // NOTE: We remove the entries one-by-one so the per-entry
+ // notifications will be delivered.
+ //
+ // mapFlowId.clear();
+ Set<Long> keySet = mapFlowId.keySet();
+ for (Long key : keySet) {
+ mapFlowId.removeAsync(key);
+ }
+ }
+
+ /**
+ * Send a notification that a FlowEntryId is added.
+ *
+ * @param flowEntryId the FlowEntryId that is added.
+ * @param dpid the Switch Dpid.
+ */
+ @Override
+ public void notificationSendFlowEntryIdAdded(FlowEntryId flowEntryId,
+ Dpid dpid) {
+ //
+ // Encode the value
+ //
+ byte[] buffer = new byte[MAX_BUFFER_SIZE];
+ Kryo kryo = kryoFactory.newKryo();
+ Output output = new Output(buffer, -1);
+ kryo.writeObject(output, dpid);
+ byte[] valueBytes = output.toBytes();
+ kryoFactory.deleteKryo(kryo);
+
+ //
+ // Put the entry:
+ // - Key : FlowEntryId (Long)
+ // - Value : Serialized Switch Dpid (byte[])
+ //
+ mapFlowEntryId.putAsync(flowEntryId.value(), valueBytes);
+ }
+
+ /**
+ * Send a notification that a FlowEntryId is removed.
+ *
+ * @param flowEntryId the FlowEntryId that is removed.
+ */
+ @Override
+ public void notificationSendFlowEntryIdRemoved(FlowEntryId flowEntryId) {
+ //
+ // Remove the entry:
+ // - Key : FlowEntryId (Long)
+ // - Value : Serialized Switch Dpid (byte[])
+ //
+ mapFlowEntryId.removeAsync(flowEntryId.value());
+ }
+
+ /**
+ * Send a notification that a FlowEntryId is updated.
+ *
+ * @param flowEntryId the FlowEntryId that is updated.
+ * @param dpid the Switch Dpid.
+ */
+ @Override
+ public void notificationSendFlowEntryIdUpdated(FlowEntryId flowEntryId,
+ Dpid dpid) {
+ // NOTE: Adding an entry with an existing key automatically updates it
+ notificationSendFlowEntryIdAdded(flowEntryId, dpid);
+ }
+
+ /**
+ * Send a notification that all Flow Entry IDs are removed.
+ */
+ @Override
+ public void notificationSendAllFlowEntryIdsRemoved() {
+ //
+ // Remove all entries
+ // NOTE: We remove the entries one-by-one so the per-entry
+ // notifications will be delivered.
+ //
+ // mapFlowEntryId.clear();
+ Set<Long> keySet = mapFlowEntryId.keySet();
+ for (Long key : keySet) {
+ mapFlowEntryId.removeAsync(key);
+ }
+ }
+
+ /**
* Get all Topology Elements that are currently in the datagrid.
*
* @return all Topology Elements that are currently in the datagrid.
@@ -883,7 +1306,7 @@
mapTopology.removeAsync(key);
}
}
-
+
@Override
public void sendArpRequest(ArpMessage arpMessage) {
//log.debug("ARP bytes: {}", HexString.toHexString(arpRequest));
diff --git a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
index 0f03d77..a855798 100644
--- a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
+++ b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
@@ -7,10 +7,12 @@
import net.onrc.onos.ofcontroller.proxyarp.ArpMessage;
import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
import net.onrc.onos.ofcontroller.topology.TopologyElement;
+import net.onrc.onos.ofcontroller.util.Dpid;
import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowEntryId;
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
+import net.onrc.onos.ofcontroller.util.Pair;
/**
* Interface for providing Datagrid Service to other modules.
@@ -134,6 +136,77 @@
void notificationSendAllFlowEntriesRemoved();
/**
+ * Get all Flow IDs that are currently in the datagrid.
+ *
+ * @return all Flow IDs that are currently in the datagrid.
+ */
+ Collection<Pair<FlowId, Dpid>> getAllFlowIds();
+
+ /**
+ * Send a notification that a FlowId is added.
+ *
+ * @param flowId the FlowId that is added.
+ * @param dpid the Source Switch Dpid.
+ */
+ void notificationSendFlowIdAdded(FlowId flowId, Dpid dpid);
+
+ /**
+ * Send a notification that a FlowId is removed.
+ *
+ * @param flowId the FlowId that is removed.
+ */
+ void notificationSendFlowIdRemoved(FlowId flowId);
+
+ /**
+ * Send a notification that a FlowId is updated.
+ *
+ * @param flowId the FlowId that is updated.
+ * @param dpid the Source Switch Dpid.
+ */
+ void notificationSendFlowIdUpdated(FlowId flowId, Dpid dpid);
+
+ /**
+ * Send a notification that all Flow IDs are removed.
+ */
+ void notificationSendAllFlowIdsRemoved();
+
+ /**
+ * Get all Flow Entry IDs that are currently in the datagrid.
+ *
+ * @return all Flow Entry IDs that are currently in the datagrid.
+ */
+ Collection<Pair<FlowEntryId, Dpid>> getAllFlowEntryIds();
+
+ /**
+ * Send a notification that a FlowEntryId is added.
+ *
+ * @param flowEntryId the FlowEntryId that is added.
+ * @param dpid the Switch Dpid.
+ */
+ void notificationSendFlowEntryIdAdded(FlowEntryId flowEntryId, Dpid dpid);
+
+ /**
+ * Send a notification that a FlowEntryId is removed.
+ *
+ * @param flowEntryId the FlowEntryId that is removed.
+ */
+ void notificationSendFlowEntryIdRemoved(FlowEntryId flowEntryId);
+
+ /**
+ * Send a notification that a FlowEntryId is updated.
+ *
+ * @param flowEntryId the FlowEntryId that is updated.
+ * @param dpid the Switch Dpid.
+ */
+ void notificationSendFlowEntryIdUpdated(FlowEntryId flowEntryId,
+ Dpid dpid);
+
+ /**
+ * Send a notification that all Flow Entry IDs are removed.
+ */
+ void notificationSendAllFlowEntryIdsRemoved();
+
+ /**
* Get all Topology Elements that are currently in the datagrid.
*
* @return all Topology Elements that are currently in the datagrid.
diff --git a/src/main/java/net/onrc/onos/graph/LocalTopologyEventListener.java b/src/main/java/net/onrc/onos/graph/LocalTopologyEventListener.java
index 80a6338..27de2c0 100644
--- a/src/main/java/net/onrc/onos/graph/LocalTopologyEventListener.java
+++ b/src/main/java/net/onrc/onos/graph/LocalTopologyEventListener.java
@@ -1,5 +1,7 @@
package net.onrc.onos.graph;
+import java.util.Map;
+
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IPortObject;
import org.slf4j.Logger;
@@ -35,6 +37,8 @@
}
+ @Override
+ //public void edgeRemoved(Edge e, Map<String, Object> arg1) {
public void edgeRemoved(Edge e) {
// TODO Auto-generated method stub
// Fire NetMapEvents (LinkRemoved, FlowEntryRemoved, HostRemoved, PortRemoved)
@@ -72,6 +76,8 @@
}
+ @Override
+ //public void vertexRemoved(Vertex vertex, Map<String, Object> arg1) {
public void vertexRemoved(Vertex vertex) {
// TODO Auto-generated method stub
// Generate NetMapEvents
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java
index f469911..ddc7527 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java
@@ -60,7 +60,7 @@
* It will close the DB connection. This is for Java garbage collection.
*/
@Override
- public void finalize() {
+ protected void finalize() {
close();
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java
index b34a2fc..8882a55 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java
@@ -27,6 +27,7 @@
protected DBOperation dbop;
private static PerfMon pm = PerfMon.getInstance();
+
/**
* Initialize the object. Open LinkStorage using given configuration file.
* @param conf Path (absolute path for now) to configuration file.
@@ -432,7 +433,7 @@
* Finalize the object.
*/
@Override
- public void finalize() {
+ protected void finalize() {
close();
}
@@ -493,6 +494,8 @@
log.error("LinkStorageImpl:addLinkImpl failed link exists {} {} src {} dst {}",
new Object[]{dbop, lt, vportSrc, vportDst});
}
+ } else {
+ log.error("Ports not found : {}", lt);
}
return success;
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImpl.java
index 7c474bd..0dcf4297 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImpl.java
@@ -48,7 +48,7 @@
* It will close the DB connection.
*/
@Override
- public void finalize() {
+ protected void finalize() {
close();
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoLinkServiceImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoLinkServiceImpl.java
index cf73c9c..24392df 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoLinkServiceImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoLinkServiceImpl.java
@@ -23,18 +23,18 @@
protected DBOperation dbop;
protected final static Logger log = LoggerFactory.getLogger(TopoLinkServiceImpl.class);
- public void finalize() {
+ @Override
+ protected void finalize() {
close();
}
-
+
@Override
public void close() {
dbop.close();
}
-
+
@Override
public List<Link> getActiveLinks() {
- // TODO Auto-generated method stub
dbop = GraphDBManager.getDBOperation("ramcloud", "/tmp/ramcloudconf");
//dbop = GraphDBManager.getDBOperation("", "");
//dbop.commit(); //Commit to ensure we see latest data
@@ -47,12 +47,12 @@
pipe.start(sw.asVertex());
pipe.enablePath(true);
pipe.out("on").out("link").in("on").path().step(extractor);
-
+
while (pipe.hasNext() ) {
Link l = pipe.next();
links.add(l);
}
-
+
}
dbop.commit();
return links;
@@ -68,7 +68,7 @@
pipe.start(sw.asVertex());
pipe.enablePath(true);
pipe.out("on").out("link").in("on").path().step(extractor);
-
+
while (pipe.hasNext() ) {
Link l = pipe.next();
links.add(l);
@@ -84,7 +84,7 @@
long d_dpid = 0;
short s_port = 0;
short d_port = 0;
-
+
List<?> V = pipe.next();
Vertex src_sw = (Vertex)V.get(0);
Vertex dest_sw = (Vertex)V.get(3);
@@ -94,9 +94,9 @@
d_dpid = HexString.toLong((String) dest_sw.getProperty("dpid"));
s_port = (Short) src_port.getProperty("number");
d_port = (Short) dest_port.getProperty("number");
-
+
Link l = new Link(s_dpid,s_port,d_dpid,d_port);
-
+
return l;
}
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoSwitchServiceImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoSwitchServiceImpl.java
index 3f7090b..f417d96 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoSwitchServiceImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoSwitchServiceImpl.java
@@ -10,7 +10,7 @@
import org.slf4j.LoggerFactory;
public class TopoSwitchServiceImpl implements ITopoSwitchService {
-
+
private DBOperation op;
protected final static Logger log = LoggerFactory.getLogger(TopoSwitchServiceImpl.class);
@@ -22,16 +22,17 @@
public TopoSwitchServiceImpl() {
this("","");
}
-
- public void finalize() {
+
+ @Override
+ protected void finalize() {
close();
}
-
+
@Override
public void close() {
op.close();
}
-
+
@Override
public Iterable<ISwitchObject> getActiveSwitches() {
// TODO Auto-generated method stub
@@ -67,5 +68,5 @@
public IPortObject getPortOnSwitch(String dpid, short port_num) {
// TODO Auto-generated method stub
return null;
- }
+ }
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
index af7bca8..1f0c163 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
@@ -3,9 +3,11 @@
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
+import net.floodlightcontroller.core.IOFSwitch;
import net.floodlightcontroller.util.MACAddress;
import net.onrc.onos.graph.DBOperation;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IBaseObject;
@@ -139,7 +141,7 @@
//
flowProp.setFlowId(flowPath.flowId().toString());
if ( measureONOSFlowTimeProp ) {
- numPropsSet += 2;
+ numPropsSet += 1;
}
//
@@ -613,11 +615,7 @@
}
}
- // TODO: Hacks with hard-coded state names!
- if (found)
- flowProp.setUserState("FE_USER_MODIFY");
- else
- flowProp.setUserState("FE_USER_ADD");
+ flowProp.setUserState(flowEntry.flowEntryUserState().toString());
flowProp.setSwitchState(flowEntry.flowEntrySwitchState().toString());
if (measureONOSFlowEntryTimeProp) {
numProperties += 2;
@@ -804,6 +802,77 @@
}
/**
+ * Get a previously added flow entry.
+ *
+ * @param dbHandler the Graph Database handler to use.
+ * @param flowEntryId the Flow Entry ID of the flow entry to get.
+ * @return the Flow Entry if found, otherwise null.
+ */
+ static FlowEntry getFlowEntry(DBOperation dbHandler,
+ FlowEntryId flowEntryId) {
+ IFlowEntry flowEntryObj = null;
+ try {
+ flowEntryObj = dbHandler.searchFlowEntry(flowEntryId);
+ } catch (Exception e) {
+ // TODO: handle exceptions
+ dbHandler.rollback();
+ log.error(":getFlowEntry FlowEntryId:{} failed", flowEntryId);
+ return null;
+ }
+ if (flowEntryObj == null) {
+ dbHandler.commit();
+ return null; // Flow not found
+ }
+
+ //
+ // Extract the Flow Entry state
+ //
+ FlowEntry flowEntry = extractFlowEntry(flowEntryObj);
+ dbHandler.commit();
+
+ return flowEntry;
+ }
+
+ /**
+ * Get the source switch DPID of a previously added flow.
+ *
+ * @param dbHandler the Graph Database handler to use.
+ * @param flowId the Flow ID of the flow to get.
+ * @return the source switch DPID if found, otherwise null.
+ */
+ static Dpid getFlowSourceDpid(DBOperation dbHandler, FlowId flowId) {
+ IFlowPath flowObj = null;
+ try {
+ flowObj = dbHandler.searchFlowPath(flowId);
+ } catch (Exception e) {
+ // TODO: handle exceptions
+ dbHandler.rollback();
+ log.error(":getFlowSourceDpid FlowId:{} failed", flowId);
+ return null;
+ }
+ if (flowObj == null) {
+ dbHandler.commit();
+ return null; // Flow not found
+ }
+
+ //
+ // Extract the Flow Source DPID
+ //
+ String srcSwitchStr = flowObj.getSrcSwitch();
+ if (srcSwitchStr == null) {
+ // TODO: A work-around, because of some bogus database objects
+ dbHandler.commit();
+ return null;
+ }
+
+ Dpid dpid = new Dpid(srcSwitchStr);
+
+ dbHandler.commit();
+
+ return dpid;
+ }
+
+ /**
* Get all installed flows by all installers.
*
* @param dbHandler the Graph Database handler to use.
@@ -841,12 +910,88 @@
}
/**
+ * Get all installed flows whose Source Switch is controlled by this
+ * instance.
+ *
+ * @param dbHandler the Graph Database handler to use.
+ * @param mySwitches the collection of the switches controlled by this
+ * instance.
+ * @return the Flow Paths found; an empty list if none (never null).
+ */
+ static ArrayList<FlowPath> getAllMyFlows(DBOperation dbHandler,
+ Map<Long, IOFSwitch> mySwitches) {
+ Iterable<IFlowPath> flowPathsObj = null;
+ ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
+
+ try {
+ flowPathsObj = dbHandler.getAllFlowPaths();
+ } catch (Exception e) {
+ // TODO: handle exceptions; for now roll back and log the cause
+ dbHandler.rollback();
+ log.error(":getAllMyFlows failed", e);
+ return flowPaths;
+ }
+ if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
+ dbHandler.commit();
+ return flowPaths; // No Flows found
+ }
+
+ for (IFlowPath flowObj : flowPathsObj) {
+ //
+ // Extract the Source Switch DPID and ignore if the switch
+ // is not controlled by this instance.
+ //
+ String srcSwitchStr = flowObj.getSrcSwitch();
+ if (srcSwitchStr == null) {
+ // TODO: A work-around, because of some bogus database objects
+ continue;
+ }
+ Dpid dpid = new Dpid(srcSwitchStr);
+ if (mySwitches.get(dpid.value()) == null)
+ continue;
+
+ //
+ // Extract the Flow state
+ //
+ FlowPath flowPath = extractFlowPath(flowObj);
+ if (flowPath != null)
+ flowPaths.add(flowPath);
+ }
+
+ dbHandler.commit();
+
+ return flowPaths;
+ }
+
+ /**
+ * Get a subset of installed flows.
+ *
+ * @param dbHandler the Graph Database handler to use.
+ * @param flowIds the collection of Flow IDs to get.
+ * @return the Flow Paths found; an empty list if none (never null).
+ */
+ static ArrayList<FlowPath> getFlows(DBOperation dbHandler,
+ Collection<FlowId> flowIds) {
+ ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
+
+ // TODO: This implementation should use threads
+ for (FlowId flowId : flowIds) {
+ FlowPath flowPath = getFlow(dbHandler, flowId);
+ if (flowPath != null)
+ flowPaths.add(flowPath);
+ }
+ // dbHandler.commit();
+
+ return flowPaths;
+ }
+
+ /**
* Extract Flow Path State from a Titan Database Object @ref IFlowPath.
*
* @param flowObj the object to extract the Flow Path State from.
* @return the extracted Flow Path State.
*/
- private static FlowPath extractFlowPath(IFlowPath flowObj) {
+ static FlowPath extractFlowPath(IFlowPath flowObj) {
//
// Extract the Flow state
//
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index 001fb3c..cf9b67d 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -15,11 +15,15 @@
import java.util.concurrent.LinkedBlockingQueue;
import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.IOFSwitchListener;
import net.onrc.onos.datagrid.IDatagridService;
+import net.onrc.onos.graph.DBOperation;
+import net.onrc.onos.graph.GraphDBManager;
import net.onrc.onos.ofcontroller.topology.Topology;
import net.onrc.onos.ofcontroller.topology.TopologyElement;
import net.onrc.onos.ofcontroller.topology.TopologyManager;
import net.onrc.onos.ofcontroller.util.DataPath;
+import net.onrc.onos.ofcontroller.util.Dpid;
import net.onrc.onos.ofcontroller.util.EventEntry;
import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowEntryAction;
@@ -31,6 +35,8 @@
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
import net.onrc.onos.ofcontroller.util.FlowPathUserState;
+import net.onrc.onos.ofcontroller.util.Pair;
+import net.onrc.onos.ofcontroller.util.Port;
import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
import com.esotericsoftware.kryo2.Kryo;
@@ -46,17 +52,16 @@
* - Detect FlowPaths impacted by Topology change.
* - Recompute impacted FlowPath using cached Topology.
*/
-class FlowEventHandler extends Thread implements IFlowEventHandlerService {
+class FlowEventHandler extends Thread implements IFlowEventHandlerService,
+ IOFSwitchListener {
+
+ private boolean enableOnrc2014MeasurementsFlows = true;
+ private boolean enableOnrc2014MeasurementsTopology = true;
+
/** The logger. */
private final static Logger log = LoggerFactory.getLogger(FlowEventHandler.class);
- // Flag to refresh Topology object periodically
- private final static boolean refreshTopology = false;
- // Refresh delay(ms)
- private final static long refreshTopologyDelay = 5000;
- // Refresh interval(ms)
- private final static long refreshTopologyInterval = 2000;
- private Timer refreshTopologyTimer;
+ private DBOperation dbHandler;
private FlowManager flowManager; // The Flow Manager to use
private IDatagridService datagridService; // The Datagrid Service to use
@@ -74,6 +79,12 @@
new LinkedList<EventEntry<FlowPath>>();
private List<EventEntry<FlowEntry>> flowEntryEvents =
new LinkedList<EventEntry<FlowEntry>>();
+ private List<EventEntry<Pair<FlowId, Dpid>>> flowIdEvents =
+ new LinkedList<EventEntry<Pair<FlowId, Dpid>>>();
+ private List<EventEntry<Pair<FlowEntryId, Dpid>>> flowEntryIdEvents =
+ new LinkedList<EventEntry<Pair<FlowEntryId, Dpid>>>();
+ private List<EventEntry<Dpid>> switchDpidEvents =
+ new LinkedList<EventEntry<Dpid>>();
// All internally computed Flow Paths
private Map<Long, FlowPath> allFlowPaths = new HashMap<Long, FlowPath>();
@@ -119,6 +130,8 @@
* Startup processing.
*/
private void startup() {
+ this.dbHandler = GraphDBManager.getDBOperation("ramcloud", "/tmp/ramcloudconf");
+
//
// Obtain the initial Topology state
//
@@ -148,32 +161,33 @@
flowEntryEvents.add(eventEntry);
}
+ //
+ // Obtain the initial FlowId state
+ //
+ Collection<Pair<FlowId, Dpid>> flowIds =
+ datagridService.getAllFlowIds();
+ for (Pair<FlowId, Dpid> pair : flowIds) {
+ EventEntry<Pair<FlowId, Dpid>> eventEntry =
+ new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_ADD, pair);
+ flowIdEvents.add(eventEntry);
+ }
+
+ //
+ // Obtain the initial FlowEntryId state
+ //
+ Collection<Pair<FlowEntryId, Dpid>> flowEntryIds =
+ datagridService.getAllFlowEntryIds();
+ for (Pair<FlowEntryId, Dpid> pair : flowEntryIds) {
+ EventEntry<Pair<FlowEntryId, Dpid>> eventEntry =
+ new EventEntry<Pair<FlowEntryId, Dpid>>(EventEntry.Type.ENTRY_ADD, pair);
+ flowEntryIdEvents.add(eventEntry);
+ }
+
// Process the initial events (if any)
synchronized (allFlowPaths) {
processEvents();
}
- if (refreshTopology) {
- refreshTopologyTimer = new Timer();
- refreshTopologyTimer.schedule(new TimerTask() {
- @Override
- public void run() {
- PerfMon pm = PerfMon.getInstance();
- log.debug("[BEFORE] {}", topology);
- long begin, end;
- synchronized(topology) {
- begin = System.nanoTime();
- pm.read_whole_topology_start();
- topology.readFromDatabase(flowManager.dbHandlerInner);
- pm.read_whole_topology_end();
- end = System.nanoTime();
- }
- // FIXME level raised for measurement. Was debug
- log.error("[AFTER] {}", topology);
- log.error("refresh takes : {}[us]", (end - begin) / 1000.0);
- }
- }, refreshTopologyDelay, refreshTopologyInterval);
- }
}
/**
@@ -199,12 +213,15 @@
// - EventEntry<TopologyElement>
// - EventEntry<FlowPath>
// - EventEntry<FlowEntry>
+ // - EventEntry<Pair<FlowId, Dpid>>
+ // - EventEntry<Pair<FlowEntryId, Dpid>>
//
for (EventEntry<?> event : collection) {
// Topology event
if (event.eventData() instanceof TopologyElement) {
EventEntry<TopologyElement> topologyEventEntry =
(EventEntry<TopologyElement>)event;
+
topologyEvents.add(topologyEventEntry);
continue;
}
@@ -224,6 +241,34 @@
flowEntryEvents.add(flowEntryEventEntry);
continue;
}
+
+ // FlowId event
+ if (event.eventData() instanceof Pair) {
+ EventEntry<Pair<FlowId, Dpid>> flowIdEventEntry =
+ (EventEntry<Pair<FlowId, Dpid>>)event;
+ flowIdEvents.add(flowIdEventEntry);
+ continue;
+ }
+
+ // Switch Dpid event
+ if (event.eventData() instanceof Dpid) {
+ EventEntry<Dpid> switchDpidEventEntry =
+ (EventEntry<Dpid>)event;
+ switchDpidEvents.add(switchDpidEventEntry);
+ continue;
+ }
+
+ // FlowEntryId event
+ // TODO: Fix the code below if we need again to handle
+ // the FlowEntryId events
+ /*
+ if (event.eventData() instanceof Pair) {
+ EventEntry<Pair<FlowEntryId, Dpid>> flowEntryIdEventEntry =
+ (EventEntry<Pair<FlowEntryId, Dpid>>)event;
+ flowEntryIdEvents.add(flowEntryIdEventEntry);
+ continue;
+ }
+ */
}
collection.clear();
@@ -236,13 +281,89 @@
log.debug("Exception processing Network Events: ", exception);
}
}
-
+
/**
* Process the events (if any)
*/
private void processEvents() {
Collection<FlowEntry> modifiedFlowEntries;
+ if (enableOnrc2014MeasurementsFlows) {
+
+ if (topologyEvents.isEmpty() && flowIdEvents.isEmpty() &&
+ switchDpidEvents.isEmpty()) {
+ return; // Nothing to do
+ }
+
+ Map<Long, IOFSwitch> mySwitches = flowManager.getMySwitches();
+
+ // Process the Switch Dpid events
+ processSwitchDpidEvents();
+
+ // Process the Flow ID events
+ processFlowIdEvents(mySwitches);
+
+ // Fetch the topology
+ processTopologyEvents();
+
+ // Recompute all affected Flow Paths and keep only the modified
+ for (FlowPath flowPath : shouldRecomputeFlowPaths.values()) {
+ if (recomputeFlowPath(flowPath))
+ modifiedFlowPaths.put(flowPath.flowId().value(), flowPath);
+ }
+
+ // Assign the Flow Entry ID as needed
+ for (FlowPath flowPath : modifiedFlowPaths.values()) {
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ if (! flowEntry.isValidFlowEntryId()) {
+ long id = flowManager.getNextFlowEntryId();
+ flowEntry.setFlowEntryId(new FlowEntryId(id));
+ }
+ }
+ }
+
+ //
+ // Push the modified state to the database
+ //
+ for (FlowPath flowPath : modifiedFlowPaths.values()) {
+ //
+ // Delete the Flow Path from the Network Map
+ //
+ if (flowPath.flowPathUserState() ==
+ FlowPathUserState.FP_USER_DELETE) {
+ log.debug("Deleting Flow Path From Database: {}", flowPath);
+ // TODO: For now the deleting of a Flow Path is blocking
+ ParallelFlowDatabaseOperation.deleteFlow(dbHandler,
+ flowPath.flowId());
+ // Send the notifications for the deleted Flow Entries
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ datagridService.notificationSendFlowEntryRemoved(flowEntry.flowEntryId());
+ }
+
+ continue;
+ }
+
+ log.debug("Pushing Flow Path To Database: {}", flowPath);
+ //
+ // Write the Flow Path to the Network Map
+ //
+ ParallelFlowDatabaseOperation.addFlow(dbHandler, flowPath,
+ datagridService);
+ }
+
+ // Cleanup
+ topologyEvents.clear();
+ flowIdEvents.clear();
+ switchDpidEvents.clear();
+ //
+ // NOTE: Keep a cache with my Flow Paths
+ // allFlowPaths.clear();
+ shouldRecomputeFlowPaths.clear();
+ modifiedFlowPaths.clear();
+
+ return;
+ }
+
if (topologyEvents.isEmpty() && flowPathEvents.isEmpty() &&
flowEntryEvents.isEmpty()) {
return; // Nothing to do
@@ -379,6 +500,246 @@
}
/**
+ * Fix a flow fetched from the database.
+ *
+ * @param flowPath the Flow to fix.
+ */
+ private void fixFlowFromDatabase(FlowPath flowPath) {
+ //
+ // TODO: Bug workaround / fix :
+ // method FlowDatabaseOperation.extractFlowEntry() doesn't
+ // fetch the inPort and outPort, hence we assign them here.
+ //
+ // Assign the inPort and outPort for the Flow Entries
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ // Set the inPort
+ do {
+ if (flowEntry.inPort() != null)
+ break;
+ if (flowEntry.flowEntryMatch() == null)
+ break;
+ Port inPort = new Port(flowEntry.flowEntryMatch().inPort().value());
+ flowEntry.setInPort(inPort);
+ } while (false);
+
+ // Set the outPort
+ do {
+ if (flowEntry.outPort() != null)
+ break;
+ for (FlowEntryAction fa : flowEntry.flowEntryActions().actions()) {
+ if (fa.actionOutput() != null) {
+ Port outPort = new Port(fa.actionOutput().port().value());
+ flowEntry.setOutPort(outPort);
+ break;
+ }
+ }
+ } while (false);
+ }
+ }
+
+ /**
+ * Process the Switch Dpid events.
+ */
+ private void processSwitchDpidEvents() {
+ Map<Long, Dpid> addedSwitches = new HashMap<Long, Dpid>();
+ Map<Long, Dpid> removedSwitches = new HashMap<Long, Dpid>();
+
+ //
+ // Process all Switch Dpid events and update the appropriate state
+ //
+ for (EventEntry<Dpid> eventEntry : switchDpidEvents) {
+ Dpid dpid = eventEntry.eventData();
+
+ log.debug("SwitchDpid Event: {} {}", eventEntry.eventType(), dpid);
+
+ // Compute the final set of added and removed switches
+ switch (eventEntry.eventType()) {
+ case ENTRY_ADD:
+ addedSwitches.put(dpid.value(), dpid);
+ removedSwitches.remove(dpid.value());
+ break;
+ case ENTRY_REMOVE:
+ addedSwitches.remove(dpid.value());
+ removedSwitches.put(dpid.value(), dpid);
+ break;
+ }
+ }
+
+ //
+ // Remove the Flows from the local cache if the removed
+ // switch is the Source Switch.
+ //
+ // TODO: This search can be expensive for a large number of flows
+ // and should be optimized.
+ //
+ List<FlowId> deleteFlowIds = new LinkedList<FlowId>();
+ for (Dpid switchDpid : removedSwitches.values()) {
+ for (FlowPath flowPath : allFlowPaths.values()) {
+ Dpid srcDpid = flowPath.dataPath().srcPort().dpid();
+ if (srcDpid.value() == switchDpid.value())
+ deleteFlowIds.add(flowPath.flowId());
+ }
+ }
+ //
+ // Remove the Flows from the local cache
+ //
+ for (FlowId flowId : deleteFlowIds)
+ allFlowPaths.remove(flowId.value());
+
+ // Get the Flows for the added switches
+ Collection<FlowPath> flowPaths =
+ ParallelFlowDatabaseOperation.getFlowsForSwitches(dbHandler,
+ addedSwitches.values());
+ for (FlowPath flowPath : flowPaths) {
+ allFlowPaths.put(flowPath.flowId().value(), flowPath);
+ }
+ }
+
+ /**
+ * Process the Flow ID events.
+ *
+ * @param mySwitches the collection of my switches.
+ */
+ private void processFlowIdEvents(Map<Long, IOFSwitch> mySwitches) {
+ List<FlowId> shouldFetchMyFlowIds = new LinkedList<FlowId>();
+
+ //
+ // Process all Flow Id events and update the appropriate state
+ //
+ for (EventEntry<Pair<FlowId, Dpid>> eventEntry : flowIdEvents) {
+ Pair<FlowId, Dpid> pair = eventEntry.eventData();
+ FlowId flowId = pair.first;
+ Dpid dpid = pair.second;
+
+ log.debug("Flow ID Event: {} {} {}", eventEntry.eventType(),
+ flowId, dpid);
+
+ //
+ // Ignore Flows if the Source Switch is not controlled by this
+ // instance.
+ //
+ if (mySwitches.get(dpid.value()) == null)
+ continue;
+
+ switch (eventEntry.eventType()) {
+ case ENTRY_ADD: {
+ //
+ // Add a new Flow Path
+ //
+ if (allFlowPaths.get(flowId.value()) != null) {
+ //
+ // TODO: What to do if the Flow Path already exists?
+ // For now, we just re-add it.
+ //
+ }
+ shouldFetchMyFlowIds.add(flowId);
+
+ break;
+ }
+
+ case ENTRY_REMOVE: {
+ //
+ // Remove an existing Flow Path.
+ //
+ // Find the Flow Path, and mark the Flow and its Flow Entries
+ // for deletion.
+ //
+ FlowPath existingFlowPath =
+ allFlowPaths.get(flowId.value());
+ if (existingFlowPath == null)
+ continue; // Nothing to do
+
+ existingFlowPath.setFlowPathUserState(FlowPathUserState.FP_USER_DELETE);
+ for (FlowEntry flowEntry : existingFlowPath.flowEntries()) {
+ flowEntry.setFlowEntryUserState(FlowEntryUserState.FE_USER_DELETE);
+ flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
+ }
+
+ // Remove the Flow Path from the internal state
+ Long key = existingFlowPath.flowId().value();
+ allFlowPaths.remove(key);
+ shouldRecomputeFlowPaths.remove(key);
+ modifiedFlowPaths.put(key, existingFlowPath);
+
+ break;
+ }
+ }
+ }
+
+ // Get my Flows
+ Collection<FlowPath> myFlows =
+ ParallelFlowDatabaseOperation.getFlows(dbHandler,
+ shouldFetchMyFlowIds);
+
+ for (FlowPath flowPath : myFlows) {
+ fixFlowFromDatabase(flowPath);
+
+ switch (flowPath.flowPathType()) {
+ case FP_TYPE_SHORTEST_PATH:
+ //
+ // Reset the Data Path, in case it was set already, because
+ // we are going to recompute it anyway.
+ //
+ flowPath.flowEntries().clear();
+ shouldRecomputeFlowPaths.put(flowPath.flowId().value(),
+ flowPath);
+ break;
+ case FP_TYPE_EXPLICIT_PATH:
+ //
+ // Mark all Flow Entries for installation in the switches.
+ //
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
+ }
+ modifiedFlowPaths.put(flowPath.flowId().value(), flowPath);
+ break;
+ case FP_TYPE_UNKNOWN:
+ log.error("FlowPath event with unknown type");
+ break;
+ }
+ allFlowPaths.put(flowPath.flowId().value(), flowPath);
+ }
+ }
+
+ /**
+ * Process the Flow Entry ID events.
+ *
+ * @param mySwitches the collection of my switches.
+ * @return a collection of modified Flow Entries this instance needs
+ * to push to its own switches.
+ */
+ private Collection<FlowEntry> processFlowEntryIdEvents(Map<Long, IOFSwitch> mySwitches) {
+ List<FlowEntry> modifiedFlowEntries = new LinkedList<FlowEntry>();
+
+ //
+ // Process all Flow Entry ID events and update the appropriate state
+ //
+ for (EventEntry<Pair<FlowEntryId, Dpid>> eventEntry : flowEntryIdEvents) {
+ Pair<FlowEntryId, Dpid> pair = eventEntry.eventData();
+ FlowEntryId flowEntryId = pair.first;
+ Dpid dpid = pair.second;
+
+ log.debug("Flow Entry ID Event: {} {} {}", eventEntry.eventType(),
+ flowEntryId, dpid);
+
+ if (mySwitches.get(dpid.value()) == null)
+ continue;
+
+ // Fetch the Flow Entry
+ FlowEntry flowEntry = FlowDatabaseOperation.getFlowEntry(dbHandler,
+ flowEntryId);
+ if (flowEntry == null) {
+ log.debug("Flow Entry ID {} : Flow Entry not found!",
+ flowEntryId);
+ continue;
+ }
+ modifiedFlowEntries.add(flowEntry);
+ }
+
+ return modifiedFlowEntries;
+ }
+
+ /**
* Process the Flow Path events.
*/
private void processFlowPathEvents() {
@@ -464,15 +825,34 @@
* Process the Topology events.
*/
private void processTopologyEvents() {
+ boolean isTopologyModified = false;
+
+ if (enableOnrc2014MeasurementsTopology) {
+ if (topologyEvents.isEmpty())
+ return;
+
+ // TODO: Code for debugging purpose only
+ for (EventEntry<TopologyElement> eventEntry : topologyEvents) {
+ TopologyElement topologyElement = eventEntry.eventData();
+ log.debug("Topology Event: {} {}", eventEntry.eventType(),
+ topologyElement.toString());
+ }
+
+ log.debug("[BEFORE] {}", topology.toString());
+ topology.readFromDatabase(dbHandler);
+ log.debug("[AFTER] {}", topology.toString());
+ shouldRecomputeFlowPaths.putAll(allFlowPaths);
+ return;
+ }
+
//
// Process all Topology events and update the appropriate state
//
- boolean isTopologyModified = false;
for (EventEntry<TopologyElement> eventEntry : topologyEvents) {
TopologyElement topologyElement = eventEntry.eventData();
-
+
log.debug("Topology Event: {} {}", eventEntry.eventType(),
- topologyElement);
+ topologyElement.toString());
switch (eventEntry.eventType()) {
case ENTRY_ADD:
@@ -752,6 +1132,19 @@
private boolean recomputeFlowPath(FlowPath flowPath) {
boolean hasChanged = false;
+ if (enableOnrc2014MeasurementsFlows) {
+ // Cleanup the deleted Flow Entries from the earlier iteration
+ flowPath.dataPath().removeDeletedFlowEntries();
+
+ //
+ // TODO: Fake it that the Flow Entries have been already pushed
+ // into the switches, so we don't push them again.
+ //
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_UPDATED);
+ }
+ }
+
//
// Test whether the Flow Path needs to be recomputed
//
@@ -772,7 +1165,6 @@
newDataPath = TopologyManager.computeNetworkPath(topology,
flowPath);
}
-
if (newDataPath == null) {
// We need the DataPath to compare the paths
newDataPath = new DataPath();
@@ -982,6 +1374,13 @@
*/
@Override
public void notificationRecvFlowEntryAdded(FlowEntry flowEntry) {
+ if (enableOnrc2014MeasurementsFlows) {
+ Collection entries = new ArrayList();
+ entries.add(flowEntry);
+ flowManager.pushModifiedFlowEntriesToSwitches(entries);
+ return;
+ }
+
EventEntry<FlowEntry> eventEntry =
new EventEntry<FlowEntry>(EventEntry.Type.ENTRY_ADD, flowEntry);
networkEvents.add(eventEntry);
@@ -994,6 +1393,19 @@
*/
@Override
public void notificationRecvFlowEntryRemoved(FlowEntry flowEntry) {
+ if (enableOnrc2014MeasurementsFlows) {
+ //
+ // NOTE: Must update the state to DELETE, because
+ // the notification contains the original state.
+ //
+ flowEntry.setFlowEntryUserState(FlowEntryUserState.FE_USER_DELETE);
+
+ Collection entries = new ArrayList();
+ entries.add(flowEntry);
+ flowManager.pushModifiedFlowEntriesToSwitches(entries);
+ return;
+ }
+
EventEntry<FlowEntry> eventEntry =
new EventEntry<FlowEntry>(EventEntry.Type.ENTRY_REMOVE, flowEntry);
networkEvents.add(eventEntry);
@@ -1006,6 +1418,13 @@
*/
@Override
public void notificationRecvFlowEntryUpdated(FlowEntry flowEntry) {
+ if (enableOnrc2014MeasurementsFlows) {
+ Collection entries = new ArrayList();
+ entries.add(flowEntry);
+ flowManager.pushModifiedFlowEntriesToSwitches(entries);
+ return;
+ }
+
// NOTE: The ADD and UPDATE events are processed in same way
EventEntry<FlowEntry> eventEntry =
new EventEntry<FlowEntry>(EventEntry.Type.ENTRY_ADD, flowEntry);
@@ -1013,6 +1432,101 @@
}
/**
+ * Receive a notification that a FlowId is added.
+ *
+ * @param flowId the FlowId that is added.
+ * @param dpid the Source Switch Dpid for the corresponding Flow.
+ */
+ @Override
+ public void notificationRecvFlowIdAdded(FlowId flowId, Dpid dpid) {
+ Pair<FlowId, Dpid> flowIdPair = new Pair<FlowId, Dpid>(flowId, dpid);
+
+ EventEntry<Pair<FlowId, Dpid>> eventEntry =
+ new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_ADD, flowIdPair);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a FlowId is removed.
+ *
+ * @param flowId the FlowId that is removed.
+ * @param dpid the Source Switch Dpid for the corresponding Flow.
+ */
+ @Override
+ public void notificationRecvFlowIdRemoved(FlowId flowId, Dpid dpid) {
+ Pair<FlowId, Dpid> flowIdPair = new Pair<FlowId, Dpid>(flowId, dpid);
+
+ EventEntry<Pair<FlowId, Dpid>> eventEntry =
+ new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_REMOVE, flowIdPair);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a FlowId is updated.
+ *
+ * @param flowId the FlowId that is updated.
+ * @param dpid the Source Switch Dpid for the corresponding Flow.
+ */
+ @Override
+ public void notificationRecvFlowIdUpdated(FlowId flowId, Dpid dpid) {
+ Pair<FlowId, Dpid> flowIdPair = new Pair<FlowId, Dpid>(flowId, dpid);
+
+ // NOTE: The ADD and UPDATE events are processed in same way
+ EventEntry<Pair<FlowId, Dpid>> eventEntry =
+ new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_ADD, flowIdPair);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a FlowEntryId is added.
+ *
+ * @param flowEntryId the FlowEntryId that is added.
+ * @param dpid the Switch Dpid for the corresponding Flow Entry.
+ */
+ @Override
+ public void notificationRecvFlowEntryIdAdded(FlowEntryId flowEntryId,
+ Dpid dpid) {
+ Pair<FlowEntryId, Dpid> flowEntryIdPair = new Pair<FlowEntryId, Dpid>(flowEntryId, dpid);
+
+ EventEntry<Pair<FlowEntryId, Dpid>> eventEntry =
+ new EventEntry<Pair<FlowEntryId, Dpid>>(EventEntry.Type.ENTRY_ADD, flowEntryIdPair);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a FlowEntryId is removed.
+ *
+ * @param flowEntryId the FlowEntryId that is removed.
+ * @param dpid the Switch Dpid for the corresponding Flow Entry.
+ */
+ @Override
+ public void notificationRecvFlowEntryIdRemoved(FlowEntryId flowEntryId,
+ Dpid dpid) {
+ Pair<FlowEntryId, Dpid> flowEntryIdPair = new Pair<FlowEntryId, Dpid>(flowEntryId, dpid);
+
+ EventEntry<Pair<FlowEntryId, Dpid>> eventEntry =
+ new EventEntry<Pair<FlowEntryId, Dpid>>(EventEntry.Type.ENTRY_REMOVE, flowEntryIdPair);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a FlowEntryId is updated.
+ *
+ * @param flowEntryId the FlowEntryId that is updated.
+ * @param dpid the Switch Dpid for the corresponding Flow Entry.
+ */
+ @Override
+ public void notificationRecvFlowEntryIdUpdated(FlowEntryId flowEntryId,
+ Dpid dpid) {
+ Pair<FlowEntryId, Dpid> flowEntryIdPair = new Pair<FlowEntryId, Dpid>(flowEntryId, dpid);
+
+ // NOTE: The ADD and UPDATE events are processed in same way
+ EventEntry<Pair<FlowEntryId, Dpid>> eventEntry =
+ new EventEntry<Pair<FlowEntryId, Dpid>>(EventEntry.Type.ENTRY_ADD, flowEntryIdPair);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
* Receive a notification that a Topology Element is added.
*
* @param topologyElement the Topology Element that is added.
@@ -1050,6 +1564,40 @@
}
/**
+ * Receive a notification that a switch is added to this instance.
+ *
+ * @param sw the switch that is added.
+ */
+ @Override
+ public void addedSwitch(IOFSwitch sw) {
+ Dpid dpid = new Dpid(sw.getId());
+ EventEntry<Dpid> eventEntry =
+ new EventEntry<Dpid>(EventEntry.Type.ENTRY_ADD, dpid);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a switch is removed from this instance.
+ *
+ * @param sw the switch that is removed.
+ */
+ @Override
+ public void removedSwitch(IOFSwitch sw) {
+ Dpid dpid = new Dpid(sw.getId());
+ EventEntry<Dpid> eventEntry =
+ new EventEntry<Dpid>(EventEntry.Type.ENTRY_REMOVE, dpid);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that the ports on a switch have changed.
+ */
+ @Override
+ public void switchPortChanged(Long switchId) {
+ // Nothing to do
+ }
+
+ /**
* Get a sorted copy of all Flow Paths.
*
* @return a sorted copy of all Flow Paths.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index 611b5c9..fc5ae34 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -8,6 +8,7 @@
import java.util.Map;
import java.util.Random;
import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -50,8 +51,9 @@
* Flow Manager class for handling the network flows.
*/
public class FlowManager implements IFloodlightModule, IFlowService, INetMapStorage {
- // flag to use FlowPusher instead of FlowSwitchOperation/MessageDamper
- private final static boolean enableFlowPusher = false;
+
+ private boolean enableOnrc2014MeasurementsFlows = true;
+
protected DBOperation dbHandlerApi;
protected DBOperation dbHandlerInner;
@@ -98,7 +100,7 @@
* Shutdown the Flow Manager operation.
*/
@Override
- public void finalize() {
+ protected void finalize() {
close();
}
@@ -107,6 +109,7 @@
*/
@Override
public void close() {
+ floodlightProvider.removeOFSwitchListener(flowEventHandler);
datagridService.deregisterFlowEventHandlerService(flowEventHandler);
dbHandlerApi.close();
dbHandlerInner.close();
@@ -233,6 +236,7 @@
// - startup
//
flowEventHandler = new FlowEventHandler(this, datagridService);
+ floodlightProvider.addOFSwitchListener(flowEventHandler);
datagridService.registerFlowEventHandlerService(flowEventHandler);
flowEventHandler.start();
}
@@ -273,7 +277,13 @@
}
if (FlowDatabaseOperation.addFlow(dbHandlerApi, flowPath)) {
- datagridService.notificationSendFlowAdded(flowPath);
+ if (enableOnrc2014MeasurementsFlows) {
+ datagridService.notificationSendFlowIdAdded(flowPath.flowId(),
+ flowPath.dataPath().srcPort().dpid());
+ } else {
+ datagridService.notificationSendFlowAdded(flowPath);
+ }
+
return flowPath.flowId();
}
return null;
@@ -287,7 +297,11 @@
@Override
public boolean deleteAllFlows() {
if (FlowDatabaseOperation.deleteAllFlows(dbHandlerApi)) {
- datagridService.notificationSendAllFlowsRemoved();
+ if (enableOnrc2014MeasurementsFlows) {
+ datagridService.notificationSendAllFlowIdsRemoved();
+ } else {
+ datagridService.notificationSendAllFlowsRemoved();
+ }
return true;
}
return false;
@@ -302,7 +316,11 @@
@Override
public boolean deleteFlow(FlowId flowId) {
if (FlowDatabaseOperation.deleteFlow(dbHandlerApi, flowId)) {
- datagridService.notificationSendFlowRemoved(flowId);
+ if (enableOnrc2014MeasurementsFlows) {
+ datagridService.notificationSendFlowIdRemoved(flowId);
+ } else {
+ datagridService.notificationSendFlowRemoved(flowId);
+ }
return true;
}
return false;
@@ -320,6 +338,26 @@
}
/**
+ * Get a previously added flow entry.
+ *
+ * @param flowEntryId the Flow Entry ID of the flow entry to get.
+ * @return the Flow Entry if found, otherwise null.
+ */
+ public FlowEntry getFlowEntry(FlowEntryId flowEntryId) {
+ return FlowDatabaseOperation.getFlowEntry(dbHandlerApi, flowEntryId);
+ }
+
+ /**
+ * Get the source switch DPID of a previously added flow.
+ *
+ * @param flowId the Flow ID of the flow to get.
+ * @return the source switch DPID if found, otherwise null.
+ */
+ public Dpid getFlowSourceDpid(FlowId flowId) {
+ return FlowDatabaseOperation.getFlowSourceDpid(dbHandlerApi, flowId);
+ }
+
+ /**
* Get all installed flows by all installers.
*
* @return the Flow Paths if found, otherwise null.
@@ -330,6 +368,18 @@
}
/**
+ * Get all installed flows whose Source Switch is controlled by this
+ * instance.
+ *
+ * @param mySwitches the collection of the switches controlled by this
+ * instance.
+ * @return the Flow Paths found; an empty list if none (never null).
+ */
+ public ArrayList<FlowPath> getAllMyFlows(Map<Long, IOFSwitch> mySwitches) {
+ return FlowDatabaseOperation.getAllMyFlows(dbHandlerApi, mySwitches);
+ }
+
+ /**
* Get summary of all installed flows by all installers in a given range.
*
* @param flowId the Flow ID of the first flow in the flow range to get.
@@ -341,7 +391,17 @@
int maxFlows) {
ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
SortedMap<Long, FlowPath> sortedFlowPaths =
- flowEventHandler.getAllFlowPathsCopy();
+ new TreeMap<Long, FlowPath>();
+
+ if (enableOnrc2014MeasurementsFlows) {
+ Collection<FlowPath> databaseFlowPaths =
+ ParallelFlowDatabaseOperation.getAllFlows(dbHandlerApi);
+ for (FlowPath flowPath : databaseFlowPaths) {
+ sortedFlowPaths.put(flowPath.flowId().value(), flowPath);
+ }
+ } else {
+ sortedFlowPaths = flowEventHandler.getAllFlowPathsCopy();
+ }
//
// Truncate each Flow Path and Flow Entry
@@ -421,6 +481,9 @@
public void flowEntriesPushedToSwitch(
Collection<Pair<IOFSwitch, FlowEntry>> entries) {
+ if (enableOnrc2014MeasurementsFlows)
+ return;
+
//
// Process all entries
//
@@ -490,8 +553,12 @@
// - Flow Paths to the database
//
pushModifiedFlowEntriesToSwitches(modifiedFlowEntries);
- pushModifiedFlowPathsToDatabase(modifiedFlowPaths);
- cleanupDeletedFlowEntriesFromDatagrid(modifiedFlowEntries);
+ if (enableOnrc2014MeasurementsFlows) {
+ writeModifiedFlowPathsToDatabase(modifiedFlowPaths);
+ } else {
+ pushModifiedFlowPathsToDatabase(modifiedFlowPaths);
+ cleanupDeletedFlowEntriesFromDatagrid(modifiedFlowEntries);
+ }
}
/**
@@ -502,7 +569,7 @@
*
* @param modifiedFlowEntries the collection of modified Flow Entries.
*/
- private void pushModifiedFlowEntriesToSwitches(
+ void pushModifiedFlowEntriesToSwitches(
Collection<FlowEntry> modifiedFlowEntries) {
if (modifiedFlowEntries.isEmpty())
return;
@@ -679,7 +746,7 @@
*
* @param modifiedFlowPaths the collection of Flow Paths to write.
*/
- private void writeModifiedFlowPathsToDatabase(
+ void writeModifiedFlowPathsToDatabase(
Collection<FlowPath> modifiedFlowPaths) {
if (modifiedFlowPaths.isEmpty())
return;
@@ -719,7 +786,7 @@
retry = true;
} catch (Exception e) {
log.error("Exception deleting Flow Path from Network MAP: {}", e);
- }
+ }
} while (retry);
continue;
@@ -738,10 +805,12 @@
allValid = false;
break;
}
- if (flowEntry.flowEntrySwitchState() !=
- FlowEntrySwitchState.FE_SWITCH_UPDATED) {
- allValid = false;
- break;
+ if (! enableOnrc2014MeasurementsFlows) {
+ if (flowEntry.flowEntrySwitchState() !=
+ FlowEntrySwitchState.FE_SWITCH_UPDATED) {
+ allValid = false;
+ break;
+ }
}
}
if (! allValid)
@@ -777,6 +846,36 @@
//log.error("Performance %% Flow path total time {} : {}", endTime - startTime, flowPath.toString());
}
} while (retry);
+
+ if (enableOnrc2014MeasurementsFlows) {
+ // Send the notifications
+
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ if (flowEntry.flowEntrySwitchState() !=
+ FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED) {
+ continue;
+ }
+ // datagridService.notificationSendFlowEntryIdAdded(flowEntry.flowEntryId(), flowEntry.dpid());
+
+ //
+ // Write the Flow Entry to the Datagrid
+ //
+ switch (flowEntry.flowEntryUserState()) {
+ case FE_USER_ADD:
+ datagridService.notificationSendFlowEntryAdded(flowEntry);
+ break;
+ case FE_USER_MODIFY:
+ datagridService.notificationSendFlowEntryUpdated(flowEntry);
+ break;
+ case FE_USER_DELETE:
+ datagridService.notificationSendFlowEntryRemoved(flowEntry.flowEntryId());
+ break;
+ case FE_USER_UNKNOWN:
+ assert(false);
+ break;
+ }
+ }
+ }
}
}
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
index 78562e1..a44a898 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
@@ -1,7 +1,10 @@
package net.onrc.onos.ofcontroller.flowmanager;
import net.onrc.onos.ofcontroller.topology.TopologyElement;
+import net.onrc.onos.ofcontroller.util.Dpid;
import net.onrc.onos.ofcontroller.util.FlowEntry;
+import net.onrc.onos.ofcontroller.util.FlowEntryId;
+import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
/**
@@ -51,6 +54,56 @@
void notificationRecvFlowEntryUpdated(FlowEntry flowEntry);
/**
+ * Receive a notification that a FlowId is added.
+ *
+ * @param flowId the FlowId that is added.
+ * @param dpid the Source Switch Dpid for the corresponding Flow.
+ */
+ void notificationRecvFlowIdAdded(FlowId flowId, Dpid dpid);
+
+ /**
+ * Receive a notification that a FlowId is removed.
+ *
+ * @param flowId the FlowId that is removed.
+ * @param dpid the Source Switch Dpid for the corresponding Flow.
+ */
+ void notificationRecvFlowIdRemoved(FlowId flowId, Dpid dpid);
+
+ /**
+ * Receive a notification that a FlowId is updated.
+ *
+ * @param flowId the FlowId that is updated.
+ * @param dpid the Source Switch Dpid for the corresponding Flow.
+ */
+ void notificationRecvFlowIdUpdated(FlowId flowId, Dpid dpid);
+
+ /**
+ * Receive a notification that a FlowEntryId is added.
+ *
+ * @param flowEntryId the FlowEntryId that is added.
+ * @param dpid the Switch Dpid for the corresponding Flow Entry.
+ */
+ void notificationRecvFlowEntryIdAdded(FlowEntryId flowEntryId, Dpid dpid);
+
+ /**
+ * Receive a notification that a FlowEntryId is removed.
+ *
+ * @param flowEntryId the FlowEntryId that is removed.
+ * @param dpid the Switch Dpid for the corresponding Flow Entry.
+ */
+ void notificationRecvFlowEntryIdRemoved(FlowEntryId flowEntryId,
+ Dpid dpid);
+
+ /**
+ * Receive a notification that a FlowEntryId is updated.
+ *
+ * @param flowEntryId the FlowEntryId that is updated.
+ * @param dpid the Switch Dpid for the corresponding Flow Entry.
+ */
+ void notificationRecvFlowEntryIdUpdated(FlowEntryId flowEntryId,
+ Dpid dpid);
+
+ /**
* Receive a notification that a Topology Element is added.
*
* @param topologyElement the Topology Element that is added.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/ParallelFlowDatabaseOperation.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/ParallelFlowDatabaseOperation.java
new file mode 100644
index 0000000..212c59f
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/ParallelFlowDatabaseOperation.java
@@ -0,0 +1,395 @@
+package net.onrc.onos.ofcontroller.flowmanager;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.onrc.onos.datagrid.IDatagridService;
+import net.onrc.onos.graph.DBOperation;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
+import net.onrc.onos.ofcontroller.util.Dpid;
+import net.onrc.onos.ofcontroller.util.FlowEntry;
+import net.onrc.onos.ofcontroller.util.FlowEntrySwitchState;
+import net.onrc.onos.ofcontroller.util.FlowId;
+import net.onrc.onos.ofcontroller.util.FlowPath;
+
+/**
+ * Class for performing parallel Flow-related operations on the Database.
+ *
+ * This class is mostly a wrapper of FlowDatabaseOperation with a thread pool
+ * for parallelization.
+ *
+ * @author Brian O'Connor <brian@onlab.us>
+ */
+public class ParallelFlowDatabaseOperation extends FlowDatabaseOperation {
+    private final static Logger log = LoggerFactory.getLogger(ParallelFlowDatabaseOperation.class);
+
+    // Size of the shared worker pool; configurable through the
+    // "parallelFlowDatabase.numThreads" system property (default: 32).
+    private final static int numThreads = Integer.parseInt(System.getProperty("parallelFlowDatabase.numThreads", "32"));
+    private final static ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+
+    /**
+     * Get all installed flows by first querying the database for all FlowPaths
+     * and then populating them from the database in parallel.
+     *
+     * @param dbHandler the Graph Database handler to use.
+     * @return the Flow Paths if found, otherwise an empty list.
+     */
+    static ArrayList<FlowPath> getAllFlows(DBOperation dbHandler) {
+        Iterable<IFlowPath> flowPathsObj = fetchAllFlowPathObjects(dbHandler);
+        if (flowPathsObj == null) {
+            return new ArrayList<FlowPath>();    // Query failed or no Flows found
+        }
+
+        CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
+        int numTasks = 0;
+        for (IFlowPath flowObj : flowPathsObj) {
+            tasks.submit(new ExtractFlowTask(flowObj));
+            numTasks++;
+        }
+        ArrayList<FlowPath> flowPaths = collectFlowPaths(tasks, numTasks,
+                "Error reading FlowPath from IFlowPath object");
+        dbHandler.commit();
+        return flowPaths;
+    }
+
+    /**
+     * Query the database for all flow paths that have their source switch
+     * in the provided collection
+     *
+     * Note: this function is implemented naively and inefficiently
+     *
+     * @param dbHandler the Graph Database handler to use.
+     * @param switches a collection of switches whose flow paths you want
+     * @return the Flow Paths if found, otherwise an empty list.
+     */
+    static ArrayList<FlowPath> getFlowsForSwitches(DBOperation dbHandler, Collection<Dpid> switches) {
+        Iterable<IFlowPath> flowPathsObj = fetchAllFlowPathObjects(dbHandler);
+        if (flowPathsObj == null) {
+            return new ArrayList<FlowPath>();    // Query failed or no Flows found
+        }
+
+        // convert the collection of switch dpids into a set of strings
+        Set<String> switchSet = new HashSet<>();
+        for (Dpid dpid : switches) {
+            switchSet.add(dpid.toString());
+        }
+
+        CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
+        int numTasks = 0;
+        for (IFlowPath flowObj : flowPathsObj) {
+            if (switchSet.contains(flowObj.getSrcSwitch())) {
+                tasks.submit(new ExtractFlowTask(flowObj));
+                numTasks++;
+            }
+        }
+        ArrayList<FlowPath> flowPaths = collectFlowPaths(tasks, numTasks,
+                "Error reading FlowPath from IFlowPath object");
+        dbHandler.commit();
+        return flowPaths;
+    }
+
+    /**
+     * Fetch the IFlowPath objects of all flows from the database.
+     *
+     * On a query failure the transaction is rolled back and the error is
+     * logged; when no flows are found the transaction is committed. In both
+     * cases null is returned, so callers only need a single check.
+     *
+     * @param dbHandler the Graph Database handler to use.
+     * @return the non-empty IFlowPath objects, or null if the query failed
+     * or there are no flows.
+     */
+    private static Iterable<IFlowPath> fetchAllFlowPathObjects(DBOperation dbHandler) {
+        Iterable<IFlowPath> flowPathsObj = null;
+        try {
+            flowPathsObj = dbHandler.getAllFlowPaths();
+        } catch (Exception e) {
+            // TODO: handle exceptions
+            dbHandler.rollback();
+            log.error(":getAllFlowPaths failed", e);
+            return null;
+        }
+        if ((flowPathsObj == null) || !flowPathsObj.iterator().hasNext()) {
+            dbHandler.commit();
+            return null;
+        }
+        return flowPathsObj;
+    }
+
+    /**
+     * Wait for the submitted tasks to complete and gather their results.
+     *
+     * A failed task is logged together with its cause and skipped. If the
+     * calling thread is interrupted while waiting, its interrupt status is
+     * restored and the results gathered so far are returned.
+     *
+     * @param tasks the completion service the tasks were submitted to.
+     * @param numTasks the number of submitted tasks to wait for.
+     * @param errorMsg the message to log when a result cannot be obtained.
+     * @return the non-null Flow Paths produced by the tasks.
+     */
+    private static ArrayList<FlowPath> collectFlowPaths(CompletionService<FlowPath> tasks,
+                                                         int numTasks, String errorMsg) {
+        ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
+        for (int i = 0; i < numTasks; i++) {
+            try {
+                FlowPath flowPath = tasks.take().get();
+                if (flowPath != null) {
+                    flowPaths.add(flowPath);
+                }
+            } catch (ExecutionException e) {
+                log.error(errorMsg, e);
+            } catch (InterruptedException e) {
+                log.error(errorMsg, e);
+                // Preserve the interrupt for the caller and stop waiting
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+        return flowPaths;
+    }
+
+    /**
+     * The basic parallelization unit for extracting FlowEntries from the database.
+     *
+     * This is simply a wrapper for FlowDatabaseOperation.extractFlowPath()
+     */
+    private final static class ExtractFlowTask implements Callable<FlowPath> {
+        private final IFlowPath flowObj;
+
+        ExtractFlowTask(IFlowPath flowObj) {
+            this.flowObj = flowObj;
+        }
+
+        @Override
+        public FlowPath call() throws Exception {
+            return extractFlowPath(flowObj);
+        }
+    }
+
+    /**
+     * Get a subset of installed flows in parallel.
+     *
+     * @param dbHandler the Graph Database handler to use.
+     * @param flowIds the collection of Flow IDs to get.
+     * @return the Flow Paths if found, otherwise an empty list.
+     */
+    static ArrayList<FlowPath> getFlows(DBOperation dbHandler,
+                                        Collection<FlowId> flowIds) {
+        CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
+        int numTasks = 0;
+        for (FlowId flowId : flowIds) {
+            tasks.submit(new GetFlowTask(dbHandler, flowId));
+            numTasks++;
+        }
+        // TODO: should we commit?
+        //dbHandler.commit();
+        return collectFlowPaths(tasks, numTasks,
+                "Error reading FlowPath from database");
+    }
+
+    /**
+     * The basic parallelization unit for getting FlowEntries.
+     *
+     * This is simply a wrapper for FlowDatabaseOperation.getFlow()
+     */
+    private final static class GetFlowTask implements Callable<FlowPath> {
+        private final DBOperation dbHandler;
+        private final FlowId flowId;
+
+        GetFlowTask(DBOperation dbHandler, FlowId flowId) {
+            this.dbHandler = dbHandler;
+            this.flowId = flowId;
+        }
+
+        @Override
+        public FlowPath call() throws Exception {
+            return getFlow(dbHandler, flowId);
+        }
+    }
+
+    /**
+     * Add a flow by creating a database task, then waiting for the result.
+     * Mostly, a wrapper for FlowDatabaseOperation.addFlow() which offers little
+     * performance benefit.
+     *
+     * @param dbHandler the Graph Database handler to use.
+     * @param flowPath the Flow Path to install.
+     * @return true on success, otherwise false.
+     */
+    static boolean addFlow(DBOperation dbHandler, FlowPath flowPath) {
+        Future<Boolean> result = executor.submit(new AddFlowTask(dbHandler, flowPath, null));
+        // NOTE: This function is blocking
+        return waitForResult(result, "Error waiting for flow path to be added to database");
+    }
+
+    /**
+     * Add a flow asynchronously by creating a database task.
+     *
+     * @param dbHandler the Graph Database handler to use.
+     * @param flowPath the Flow Path to install.
+     * @param datagridService the notification service for when the task is completed
+     * @return true always
+     */
+    static boolean addFlow(DBOperation dbHandler, FlowPath flowPath, IDatagridService datagridService) {
+        executor.submit(new AddFlowTask(dbHandler, flowPath, datagridService));
+        // TODO: If we need the results, submit returns a Future that contains
+        // the result.
+        return true;
+    }
+
+    /**
+     * Block until a database task completes and return its outcome.
+     *
+     * @param result the pending result of the database task.
+     * @param errorMsg the message to log when the task did not complete normally.
+     * @return the task's result, or false if the task threw an exception or
+     * the calling thread was interrupted while waiting (in which case the
+     * interrupt status is restored).
+     */
+    private static boolean waitForResult(Future<Boolean> result, String errorMsg) {
+        try {
+            return result.get();
+        } catch (ExecutionException e) {
+            log.error(errorMsg, e);
+        } catch (InterruptedException e) {
+            log.error(errorMsg, e);
+            Thread.currentThread().interrupt();
+        }
+        return false;
+    }
+
+    /**
+     * The basic parallelization unit for adding FlowPaths.
+     *
+     * This is simply a wrapper for FlowDatabaseOperation.addFlow(),
+     * which also sends a notification if a datagrid service is provided
+     */
+    private final static class AddFlowTask implements Callable<Boolean> {
+        private final DBOperation dbHandler;
+        private final FlowPath flowPath;
+        private final IDatagridService datagridService;
+
+        AddFlowTask(DBOperation dbHandler,
+                    FlowPath flowPath,
+                    IDatagridService datagridService) {
+            this.dbHandler = dbHandler;
+            this.flowPath = flowPath;
+            this.datagridService = datagridService;
+        }
+
+        @Override
+        public Boolean call() throws Exception {
+            boolean success = FlowDatabaseOperation.addFlow(dbHandler, flowPath);
+            if (!success) {
+                log.error("Error adding flow path {} to database", flowPath);
+                return false;
+            }
+            if (datagridService == null) {
+                return true;
+            }
+
+            // Send notifications for each Flow Entry
+            for (FlowEntry flowEntry : flowPath.flowEntries()) {
+                if (flowEntry.flowEntrySwitchState() !=
+                    FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED) {
+                    continue;
+                }
+                //
+                // Write the Flow Entry to the Datagrid
+                //
+                switch (flowEntry.flowEntryUserState()) {
+                case FE_USER_ADD:
+                    datagridService.notificationSendFlowEntryAdded(flowEntry);
+                    break;
+                case FE_USER_MODIFY:
+                    datagridService.notificationSendFlowEntryUpdated(flowEntry);
+                    break;
+                case FE_USER_DELETE:
+                    datagridService.notificationSendFlowEntryRemoved(flowEntry.flowEntryId());
+                    break;
+                case FE_USER_UNKNOWN:
+                    assert(false);
+                    break;
+                }
+            }
+            return true;
+        }
+    }
+
+    /**
+     * Delete a previously added flow by creating a database task, then waiting
+     * for the result.
+     *
+     * Mostly, a wrapper for FlowDatabaseOperation.deleteFlow() which offers
+     * little performance benefit.
+     *
+     * @param dbHandler the Graph Database handler to use.
+     * @param flowId the Flow ID of the flow to delete.
+     * @return true on success, otherwise false.
+     */
+    static boolean deleteFlow(DBOperation dbHandler, FlowId flowId) {
+        Future<Boolean> result = executor.submit(new DeleteFlowTask(dbHandler, flowId, null));
+        // NOTE: This function is blocking
+        return waitForResult(result, "Error waiting for flow path to be removed from database");
+    }
+
+    /**
+     * Delete a previously added flow asynchronously by creating a database task.
+     *
+     * @param dbHandler the Graph Database handler to use.
+     * @param flowId the Flow ID of the flow to delete.
+     * @param datagridService the notification service for when the task is completed
+     * @return true always
+     */
+    static boolean deleteFlow(DBOperation dbHandler, FlowId flowId, IDatagridService datagridService) {
+        executor.submit(new DeleteFlowTask(dbHandler, flowId, datagridService));
+        // TODO: If we need the results, submit returns a Future that contains
+        // the result.
+        return true;
+    }
+
+    /**
+     * The basic parallelization unit for deleting FlowPaths.
+     *
+     * This is simply a wrapper for FlowDatabaseOperation.deleteFlow(),
+     * which also sends a notification if a datagrid service is provided
+     */
+    private final static class DeleteFlowTask implements Callable<Boolean> {
+        private final DBOperation dbHandler;
+        private final FlowId flowId;
+        private final IDatagridService datagridService;
+
+        DeleteFlowTask(DBOperation dbHandler, FlowId flowId, IDatagridService datagridService) {
+            this.dbHandler = dbHandler;
+            this.flowId = flowId;
+            this.datagridService = datagridService;
+        }
+
+        @Override
+        public Boolean call() throws Exception {
+            boolean success = FlowDatabaseOperation.deleteFlow(dbHandler, flowId);
+            if (!success) {
+                log.error("Error removing flow path {} from database", flowId);
+                return false;
+            }
+            if (datagridService != null) {
+                datagridService.notificationSendFlowIdRemoved(flowId);
+            }
+            return true;
+        }
+    }
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PerformanceMonitor.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PerformanceMonitor.java
new file mode 100644
index 0000000..13319e7
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PerformanceMonitor.java
@@ -0,0 +1,172 @@
+package net.onrc.onos.ofcontroller.flowmanager;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for collecting performance measurements
+ */
+public class PerformanceMonitor {
+    private final static Map<String, Measurement> map = new ConcurrentHashMap<String, Measurement>();
+    private final static Logger log = LoggerFactory.getLogger(PerformanceMonitor.class);
+    // Accumulated time (in nanoseconds) spent inside start() and stop();
+    // atomic because measurements may be taken from several threads.
+    private final static AtomicLong overhead = new AtomicLong();
+
+    /**
+     * Start a performance measurement, identified by a tag
+     *
+     * Note: Only a single measurement can use the same tag at a time.
+     *
+     * @param tag for performance measurement
+     */
+    public static void start(String tag) {
+        long start = System.nanoTime();
+        Measurement m = new Measurement();
+        if (map.put(tag, m) != null) {
+            // if there was a previous entry, we have just overwritten it
+            log.error("Tag {} already exists", tag);
+        }
+        m.start();
+        overhead.addAndGet(System.nanoTime() - start);
+    }
+
+    /**
+     * Stop a performance measurement.
+     *
+     * You must have already started a measurement with tag.
+     *
+     * @param tag for performance measurement
+     */
+    public static void stop(String tag) {
+        long time = System.nanoTime();
+        Measurement m = map.get(tag);
+        if (m == null) {
+            log.error("Tag {} does not exist", tag);
+        }
+        else {
+            // Reuse the looked-up entry: a second map.get() could return
+            // null if clear() runs concurrently.
+            m.stop(time);
+        }
+        overhead.addAndGet(System.nanoTime() - time);
+    }
+
+    /**
+     * Find a measurement, identified by tag, and return the result
+     *
+     * @param tag for performance measurement
+     * @return the time in nanoseconds, or -1 if there is no measurement
+     * with this tag or the measurement is still running
+     */
+    public static long result(String tag) {
+        Measurement m = map.get(tag);
+        if (m != null) {
+            return m.elapsed();
+        }
+        else {
+            return -1;
+        }
+    }
+
+    /**
+     * Clear all performance measurements.
+     */
+    public static void clear() {
+        map.clear();
+        overhead.set(0);
+    }
+
+    /**
+     * Write all performance measurements to the log
+     */
+    public static void report() {
+        double overheadMilli = overhead.get() / Math.pow(10, 6);
+        log.error("Performance Results: {} with measurement overhead: {} ms", map, overheadMilli);
+    }
+
+    /**
+     * A single performance measurement
+     */
+    static class Measurement {
+        long start;
+        long stop;
+
+        /**
+         * Start the measurement
+         */
+        public void start() {
+            start = System.nanoTime();
+        }
+
+        /**
+         * Stop the measurement
+         */
+        public void stop() {
+            stop = System.nanoTime();
+        }
+
+        /**
+         * Stop the measurement at a specific time
+         * @param time to stop
+         */
+        public void stop(long time) {
+            stop = time;
+        }
+
+        /**
+         * Compute the elapsed time of the measurement in nanoseconds
+         *
+         * @return the measurement time in nanoseconds, or -1 if the measurement is still running.
+         */
+        public long elapsed() {
+            if (stop == 0) {
+                return -1;
+            }
+            else {
+                return stop - start;
+            }
+        }
+
+        /**
+         * Returns the number of milliseconds for the measurement as a String.
+         */
+        @Override
+        public String toString() {
+            double milli = elapsed() / Math.pow(10, 6);
+            return Double.toString(milli) + "ms";
+        }
+    }
+
+    /**
+     * Exercise start/stop/report/clear to observe the measurement overhead.
+     */
+    public static void main(String args[]) {
+        // test the measurement overhead
+        String tag;
+        for (int i = 0; i < 100; i++) {
+            tag = "foo foo foo";
+            start(tag); stop(tag);
+            tag = "bar";
+            start(tag); stop(tag);
+            tag = "baz";
+            start(tag); stop(tag);
+            report();
+            clear();
+        }
+        for (int i = 0; i < 100; i++) {
+            tag = "a";
+            start(tag); stop(tag);
+            tag = "b";
+            start(tag); stop(tag);
+            tag = "c";
+            start(tag); stop(tag);
+            report();
+            clear();
+        }
+    }
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
index 98fe262..949cc7b 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
@@ -242,12 +242,15 @@
*
*/
class FlowEntryWrapper {
- FlowEntryId flowEntryId;
- OFFlowStatisticsReply statisticsReply;
+ FlowEntryId flowEntryId;
+ IFlowEntry iFlowEntry;
+ OFFlowStatisticsReply statisticsReply;
+
public FlowEntryWrapper(IFlowEntry entry) {
flowEntryId = new FlowEntryId(entry.getFlowEntryId());
- }
+ iFlowEntry = entry;
+ }
public FlowEntryWrapper(OFFlowStatisticsReply entry) {
flowEntryId = new FlowEntryId(entry.getCookie());
@@ -268,18 +271,14 @@
double startDB = System.nanoTime();
// Get the Flow Entry state from the Network Graph
- IFlowEntry iFlowEntry = null;
- try {
- iFlowEntry = dbHandler.searchFlowEntry(flowEntryId);
- } catch (Exception e) {
- log.error("Error finding flow entry {} in Network Graph",
- flowEntryId);
- return;
- }
if (iFlowEntry == null) {
- log.error("Cannot add flow entry {} to sw {} : flow entry not found",
- flowEntryId, sw.getId());
- return;
+ try {
+ iFlowEntry = dbHandler.searchFlowEntry(flowEntryId);
+ } catch (Exception e) {
+ log.error("Error finding flow entry {} in Network Graph",
+ flowEntryId);
+ return;
+ }
}
dbTime = System.nanoTime() - startDB;
diff --git a/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java b/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java
index 1dbfdcb..3860e05 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java
@@ -112,7 +112,7 @@
*/
@LogMessageCategory("Network Topology")
public class LinkDiscoveryManager
-implements IOFMessageListener, IOFSwitchListener,
+implements IOFMessageListener, IOFSwitchListener,
ILinkDiscoveryService, IFloodlightModule {
protected IFloodlightProviderService controller;
protected final static Logger log = LoggerFactory.getLogger(LinkDiscoveryManager.class);
@@ -125,7 +125,7 @@
// LLDP and BDDP fields
- private static final byte[] LLDP_STANDARD_DST_MAC_STRING =
+ private static final byte[] LLDP_STANDARD_DST_MAC_STRING =
HexString.fromHexString("01:80:c2:00:00:0e");
private static final long LINK_LOCAL_MASK = 0xfffffffffff0L;
private static final long LINK_LOCAL_VALUE = 0x0180c2000000L;
@@ -135,27 +135,27 @@
private static final String LLDP_BSN_DST_MAC_STRING = "ff:ff:ff:ff:ff:ff";
- // Direction TLVs are used to indicate if the LLDPs were sent
+ // Direction TLVs are used to indicate if the LLDPs were sent
// periodically or in response to a recieved LLDP
private static final byte TLV_DIRECTION_TYPE = 0x73;
private static final short TLV_DIRECTION_LENGTH = 1; // 1 byte
private static final byte TLV_DIRECTION_VALUE_FORWARD[] = {0x01};
private static final byte TLV_DIRECTION_VALUE_REVERSE[] = {0x02};
- private static final LLDPTLV forwardTLV
+ private static final LLDPTLV forwardTLV
= new LLDPTLV().
- setType((byte)TLV_DIRECTION_TYPE).
- setLength((short)TLV_DIRECTION_LENGTH).
+ setType(TLV_DIRECTION_TYPE).
+ setLength(TLV_DIRECTION_LENGTH).
setValue(TLV_DIRECTION_VALUE_FORWARD);
- private static final LLDPTLV reverseTLV
+ private static final LLDPTLV reverseTLV
= new LLDPTLV().
- setType((byte)TLV_DIRECTION_TYPE).
- setLength((short)TLV_DIRECTION_LENGTH).
+ setType(TLV_DIRECTION_TYPE).
+ setLength(TLV_DIRECTION_LENGTH).
setValue(TLV_DIRECTION_VALUE_REVERSE);
// Link discovery task details.
protected SingletonTask discoveryTask;
- protected final int DISCOVERY_TASK_INTERVAL = 1;
+ protected final int DISCOVERY_TASK_INTERVAL = 1;
protected final int LINK_TIMEOUT = 35; // original 35 secs, aggressive 5 secs
protected final int LLDP_TO_ALL_INTERVAL = 15 ; //original 15 seconds, aggressive 2 secs.
protected long lldpClock = 0;
@@ -206,7 +206,7 @@
/* topology aware components are called in the order they were added to the
* the array */
protected ArrayList<ILinkDiscoveryListener> linkDiscoveryAware;
-
+
protected class LinkUpdate extends LDUpdate {
public LinkUpdate(LDUpdate old) {
@@ -263,7 +263,7 @@
*/
protected Map<NodePortTuple, Long> broadcastDomainPortTimeMap;
- /**
+ /**
* Get the LLDP sending period in seconds.
* @return LLDP sending period in seconds.
*/
@@ -283,6 +283,7 @@
return portLinks;
}
+ @Override
public Set<NodePortTuple> getSuppressLLDPsInfo() {
return suppressLinkDiscovery;
}
@@ -291,6 +292,7 @@
* Add a switch port to the suppressed LLDP list.
* Remove any known links on the switch port.
*/
+ @Override
public void AddToSuppressLLDPs(long sw, short port)
{
NodePortTuple npt = new NodePortTuple(sw, port);
@@ -302,7 +304,8 @@
* Remove a switch port from the suppressed LLDP list.
* Discover links on that switchport.
*/
- public void RemoveFromSuppressLLDPs(long sw, short port)
+ @Override
+ public void RemoveFromSuppressLLDPs(long sw, short port)
{
NodePortTuple npt = new NodePortTuple(sw, port);
this.suppressLinkDiscovery.remove(npt);
@@ -317,6 +320,7 @@
return false;
}
+ @Override
public ILinkDiscovery.LinkType getLinkType(Link lt, LinkInfo info) {
if (info.getUnicastValidTime() != null) {
return ILinkDiscovery.LinkType.DIRECT_LINK;
@@ -326,7 +330,7 @@
return ILinkDiscovery.LinkType.INVALID_LINK;
}
-
+
private boolean isLinkDiscoverySuppressed(long sw, short portNumber) {
return this.suppressLinkDiscovery.contains(new NodePortTuple(sw, portNumber));
}
@@ -437,6 +441,7 @@
}
}
+ @Override
public Set<Short> getQuarantinedPorts(long sw) {
Set<Short> qPorts = new HashSet<Short>();
@@ -468,12 +473,12 @@
else operation = UpdateOperation.PORT_DOWN;
LinkUpdate update = new LinkUpdate(new LDUpdate(sw, port, operation));
-
-
+
+
controller.publishUpdate(update);
}
- /**
+ /**
* Send LLDP on known ports
*/
protected void discoverOnKnownLinkPorts() {
@@ -500,7 +505,7 @@
*/
protected IOFSwitch addRemoteSwitch(long sw, short port) {
IOnosRemoteSwitch remotesw = null;
-
+
// add a switch if we have not seen it before
remotesw = remoteSwitches.get(sw);
@@ -510,26 +515,26 @@
remoteSwitches.put(remotesw.getId(), remotesw);
log.debug("addRemoteSwitch(): added fake remote sw {}", remotesw);
}
-
+
// add the port if we have not seen it before
if (remotesw.getPort(port) == null) {
OFPhysicalPort remoteport = new OFPhysicalPort();
remoteport.setPortNumber(port);
remoteport.setName("fake_" + port);
- remoteport.setConfig(0);
+ remoteport.setConfig(0);
remoteport.setState(0);
remotesw.setPort(remoteport);
log.debug("addRemoteSwitch(): added fake remote port {} to sw {}", remoteport, remotesw.getId());
}
-
+
return remotesw;
}
-
+
/**
* Send link discovery message out of a given switch port.
* The discovery message may be a standard LLDP or a modified
- * LLDP, where the dst mac address is set to :ff.
- *
+ * LLDP, where the dst mac address is set to :ff.
+ *
* TODO: The modified LLDP will updated in the future and may
* use a different eth-type.
* @param sw
@@ -565,7 +570,7 @@
if (isLinkDiscoverySuppressed(sw, port)) {
/* Dont send LLDPs out of this port as suppressLLDPs set
- *
+ *
*/
return;
}
@@ -881,9 +886,9 @@
addOrUpdateLink(lt, newLinkInfo);
- // Check if reverse link exists.
- // If it doesn't exist and if the forward link was seen
- // first seen within a small interval, send probe on the
+ // Check if reverse link exists.
+ // If it doesn't exist and if the forward link was seen
+ // first seen within a small interval, send probe on the
// reverse link.
newLinkInfo = links.get(lt);
@@ -927,8 +932,8 @@
protected Command handlePacketIn(long sw, OFPacketIn pi,
FloodlightContext cntx) {
- Ethernet eth =
- IFloodlightProviderService.bcStore.get(cntx,
+ Ethernet eth =
+ IFloodlightProviderService.bcStore.get(cntx,
IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
if(eth.getEtherType() == Ethernet.TYPE_BSN) {
@@ -999,8 +1004,8 @@
newInfo.setFirstSeenTime(oldInfo.getFirstSeenTime());
if (log.isTraceEnabled()) {
- log.trace("addOrUpdateLink: {} {}",
- lt,
+ log.trace("addOrUpdateLink: {} {}",
+ lt,
(newInfo.getMulticastValidTime()!=null) ? "multicast" : "unicast");
}
@@ -1033,7 +1038,7 @@
// Add to portNOFLinks if the unicast valid time is null
if (newInfo.getUnicastValidTime() == null)
addLinkToBroadcastDomain(lt);
-
+
// ONOS: Distinguish added event separately from updated event
updateOperation = UpdateOperation.LINK_ADDED;
linkChanged = true;
@@ -1119,6 +1124,7 @@
return linkChanged;
}
+ @Override
public Map<Long, Set<Link>> getSwitchLinks() {
return this.switchLinks;
}
@@ -1198,7 +1204,7 @@
// ONOS: If we do not control this switch, then we should not process its port status messages
if (!registryService.hasControl(iofSwitch.getId())) return Command.CONTINUE;
-
+
if (log.isTraceEnabled()) {
log.trace("handlePortStatus: Switch {} port #{} reason {}; " +
"config is {} state is {}",
@@ -1225,7 +1231,7 @@
LinkUpdate update = new LinkUpdate(new LDUpdate(sw, port, UpdateOperation.PORT_DOWN));
controller.publishUpdate(update);
linkDeleted = true;
- }
+ }
else if (ps.getReason() ==
(byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
// If ps is a port modification and the port state has changed
@@ -1237,7 +1243,7 @@
assert(linkInfo != null);
Integer updatedSrcPortState = null;
Integer updatedDstPortState = null;
- if (lt.getSrc() == npt.getNodeId() &&
+ if (lt.getSrc() == npt.getNodeId() &&
lt.getSrcPort() == npt.getPortId() &&
(linkInfo.getSrcPortState() !=
ps.getDesc().getState())) {
@@ -1264,7 +1270,7 @@
getLinkType(lt, linkInfo),
operation));
controller.publishUpdate(update);
-
+
linkInfoChanged = true;
}
}
@@ -1378,9 +1384,9 @@
lock.writeLock().unlock();
}
}
-
+
/**
- * We don't react the port changed notifications here. we listen for
+ * We don't react the port changed notifications here. we listen for
* OFPortStatus messages directly. Might consider using this notifier
* instead
*/
@@ -1389,7 +1395,7 @@
// no-op
}
- /**
+ /**
* Delete links incident on a given switch port.
* @param npt
* @param reason
@@ -1409,7 +1415,7 @@
}
}
- /**
+ /**
* Iterates through the list of links and deletes if the
* last discovery message reception time exceeds timeout values.
*/
@@ -1430,7 +1436,7 @@
// Timeout the unicast and multicast LLDP valid times
// independently.
- if ((info.getUnicastValidTime() != null) &&
+ if ((info.getUnicastValidTime() != null) &&
(info.getUnicastValidTime() + (this.LINK_TIMEOUT * 1000) < curTime)){
info.setUnicastValidTime(null);
@@ -1440,7 +1446,7 @@
// the link would be deleted, which would trigger updateClusters().
linkChanged = true;
}
- if ((info.getMulticastValidTime()!= null) &&
+ if ((info.getMulticastValidTime()!= null) &&
(info.getMulticastValidTime()+ (this.LINK_TIMEOUT * 1000) < curTime)) {
info.setMulticastValidTime(null);
// if uTime is not null, then link will remain as openflow
@@ -1451,7 +1457,7 @@
}
// Add to the erase list only if the unicast
// time is null.
- if (info.getUnicastValidTime() == null &&
+ if (info.getUnicastValidTime() == null &&
info.getMulticastValidTime() == null){
eraseList.add(entry.getKey());
} else if (linkChanged) {
@@ -1510,11 +1516,11 @@
srcNpt = new NodePortTuple(lt.getSrc(), lt.getSrcPort());
dstNpt = new NodePortTuple(lt.getDst(), lt.getDstPort());
- if (!portBroadcastDomainLinks.containsKey(lt.getSrc()))
+ if (!portBroadcastDomainLinks.containsKey(srcNpt))
portBroadcastDomainLinks.put(srcNpt, new HashSet<Link>());
portBroadcastDomainLinks.get(srcNpt).add(lt);
- if (!portBroadcastDomainLinks.containsKey(lt.getDst()))
+ if (!portBroadcastDomainLinks.containsKey(dstNpt))
portBroadcastDomainLinks.put(dstNpt, new HashSet<Link>());
portBroadcastDomainLinks.get(dstNpt).add(lt);
}
@@ -1575,7 +1581,7 @@
@Override
public Collection<Class<? extends IFloodlightService>> getModuleServices() {
- Collection<Class<? extends IFloodlightService>> l =
+ Collection<Class<? extends IFloodlightService>> l =
new ArrayList<Class<? extends IFloodlightService>>();
l.add(ILinkDiscoveryService.class);
//l.add(ITopologyService.class);
@@ -1586,7 +1592,7 @@
public Map<Class<? extends IFloodlightService>, IFloodlightService>
getServiceImpls() {
Map<Class<? extends IFloodlightService>,
- IFloodlightService> m =
+ IFloodlightService> m =
new HashMap<Class<? extends IFloodlightService>,
IFloodlightService>();
// We are the class that implements the service
@@ -1596,7 +1602,7 @@
@Override
public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
- Collection<Class<? extends IFloodlightService>> l =
+ Collection<Class<? extends IFloodlightService>> l =
new ArrayList<Class<? extends IFloodlightService>>();
l.add(IFloodlightProviderService.class);
l.add(IThreadPoolService.class);
@@ -1679,7 +1685,7 @@
log.error("Exception in LLDP send timer.", e);
} finally {
if (!shuttingDown) {
- // Always reschedule link discovery if we're not
+ // Always reschedule link discovery if we're not
// shutting down (no chance of SLAVE role now)
log.trace("Rescheduling discovery task");
discoveryTask.reschedule(DISCOVERY_TASK_INTERVAL,
@@ -1729,7 +1735,7 @@
if ((sw.getChannel() != null) &&
(SocketAddress.class.isInstance(
sw.getChannel().getRemoteAddress()))) {
- evTopoSwitch.ipv4Addr =
+ evTopoSwitch.ipv4Addr =
IPv4.toIPv4Address(((InetSocketAddress)(sw.getChannel().
getRemoteAddress())).getAddress().getAddress());
evTopoSwitch.l4Port =
@@ -1787,10 +1793,12 @@
evTopoCluster = evHistTopologyCluster.put(evTopoCluster, action);
}
+ @Override
public boolean isAutoPortFastFeature() {
return autoPortFastFeature;
}
+ @Override
public void setAutoPortFastFeature(boolean autoPortFastFeature) {
this.autoPortFastFeature = autoPortFastFeature;
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java b/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
index 1674cf0..9fca86a 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
@@ -1,6 +1,5 @@
package net.onrc.onos.ofcontroller.topology;
-import java.util.ArrayList;
import java.util.List;
import java.util.LinkedList;
import java.util.Map;
@@ -12,7 +11,6 @@
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.ISwitchObject;
import net.onrc.onos.ofcontroller.core.ISwitchStorage.SwitchState;
-import org.apache.commons.lang.StringUtils;
import org.openflow.util.HexString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -195,7 +193,10 @@
*/
public class Topology {
private final static Logger log = LoggerFactory.getLogger(Topology.class);
-
+
+ // flag to use optimized readFromDatabase() method.
+ private static final boolean enableOptimizedRead = false;
+
private Map<Long, Node> nodesMap; // The dpid->Node mapping
/**
@@ -392,289 +393,173 @@
* @param dbHandler the Graph Database handler to use.
*/
public void readFromDatabase(DBOperation dbHandler) {
- //
- // Fetch the relevant info from the Switch and Port vertices
- // from the Titan Graph.
- //
- nodesMap.clear();
+ if (enableOptimizedRead) {
+ readFromDatabaseBodyOptimized(dbHandler);
+ } else {
+ readFromDatabaseBody(dbHandler);
+ }
- // Load all switches into Map
- Iterable<ISwitchObject> switches = dbHandler.getAllSwitches();
- for (ISwitchObject switchObj : switches) {
- // Ignore inactive ports
- if (!switchObj.getState().equals(SwitchState.ACTIVE.toString())) {
- continue;
- }
- Vertex nodeVertex = switchObj.asVertex();
- //
- // The Switch info
- //
- String nodeDpid = nodeVertex.getProperty("dpid").toString();
- long nodeId = HexString.toLong(nodeDpid);
- addNode(nodeId);
- }
-
- //
- // Get All Ports
- //
- Iterable<IPortObject> ports = dbHandler.getAllPorts(); //TODO: Add to DB operations
- for (IPortObject myPortObj : ports) {
- Vertex myPortVertex = myPortObj.asVertex();
-
- // Ignore inactive ports
- if (! myPortVertex.getProperty("state").toString().equals("ACTIVE")) {
- continue;
- }
-
- short myPort = 0;
- String idStr = myPortObj.getPortId();
- String[] splitter = idStr.split(IDBOperation.PORT_ID_DELIM);
- if (splitter.length != 2) {
- log.error("Invalid port_id : {}", idStr);
- continue;
- }
- String myDpid = splitter[0];
- myPort = Short.parseShort(splitter[1]);
- long myId = HexString.toLong(myDpid);
- Node me = nodesMap.get(myId);
-
- if (me == null) {
- // cannot proceed ports and switches are out of sync
- //TODO: Restart the whole read
- continue;
- }
-
- if (me.getPort(myPort) == null) {
- me.addPort(myPort);
- } else if (me.getLink(myPort) != null) {
- // Link already added..probably by neighbor
- continue;
- }
-
- //
- // The neighbor Port info
- //
- for (Vertex neighborPortVertex : myPortVertex.getVertices(Direction.OUT, "link")) {
-// log.debug("state : {}", neighborPortVertex.getProperty("state"));
-// log.debug("port id : {}", neighborPortVertex.getProperty("port_id"));
- // Ignore inactive ports
- if (! neighborPortVertex.getProperty("state").toString().equals("ACTIVE")) {
- continue;
- }
- int neighborPort = 0;
- idStr = neighborPortVertex.getProperty("port_id").toString();
- splitter = idStr.split(IDBOperation.PORT_ID_DELIM);
- if (splitter.length != 2) {
- log.error("Invalid port_id : {}", idStr);
- continue;
- }
- String neighborDpid = splitter[0];
- neighborPort = Short.parseShort(splitter[1]);
- long neighborId = HexString.toLong(neighborDpid);
- Node neighbor = nodesMap.get(neighborId);
-// log.debug("dpid {},{} port {}", neighborDpid, neighborId, neighborPort);
- if (neighbor == null) {
- continue;
- }
- me.addLink(myPort, neighbor, neighborPort);
- }
- }
- dbHandler.commit();
}
-
- // Only for debug use
- List<Long> logGetSw = new ArrayList<Long>(100);
- List<Long> logGetPt = new ArrayList<Long>(100);
- List<Long> logAddSw = new ArrayList<Long>(100);
- List<Long> logAddPt = new ArrayList<Long>(100);
- List<Long> logAddLk = new ArrayList<Long>(100);
- List<Long> logCommit = new ArrayList<Long>(100);
- List<Integer> logGetVertices = new ArrayList<Integer>(100);
- List<Integer> logGetProperty = new ArrayList<Integer>(100);
- public void readFromDatabaseBreakdown(DBOperation dbHandler) {
- int getVerticesCount = 0;
- int getPropertyCount = 0;
- int getVCount_sw = 0;
- int getVCount_pt = 0;
- int getVCount_lk = 0;
- int getPCount_sw = 0;
- int getPCount_pt = 0;
- int getPCount_lk = 0;
-
- //
- // Fetch the relevant info from the Switch and Port vertices
- // from the Titan Graph.
- //
+ private void readFromDatabaseBody(DBOperation dbHandler) {
+ //
+ // Fetch the relevant info from the Switch and Port vertices
+ // from the Titan Graph.
+ //
nodesMap.clear();
- long t1 = System.nanoTime();
+ Iterable<ISwitchObject> activeSwitches = dbHandler.getActiveSwitches();
+ for (ISwitchObject switchObj : activeSwitches) {
+ Vertex nodeVertex = switchObj.asVertex();
+ //
+ // The Switch info
+ //
+ String nodeDpid = nodeVertex.getProperty("dpid").toString();
+ long nodeId = HexString.toLong(nodeDpid);
+ Node me = nodesMap.get(nodeId);
+ if (me == null)
+ me = addNode(nodeId);
- // Load all switches into Map
- Iterable<ISwitchObject> switches = dbHandler.getAllSwitches();
+ //
+ // The local Port info
+ //
+ for (Vertex myPortVertex : nodeVertex.getVertices(Direction.OUT, "on")) {
+ // Ignore inactive ports
+ if (! myPortVertex.getProperty("state").toString().equals("ACTIVE"))
+ continue;
- long t2 = System.nanoTime();
+ int myPort = 0;
+ Object obj = myPortVertex.getProperty("number");
+ if (obj instanceof Short) {
+ myPort = (Short)obj;
+ } else if (obj instanceof Integer) {
+ myPort = (Integer)obj;
+ }
+ me.addPort(myPort);
- long t_addSw = 0;
- for (ISwitchObject switchObj : switches) {
- long t3 = System.nanoTime();
- long t4;
+ for (Vertex neighborPortVertex : myPortVertex.getVertices(Direction.OUT, "link")) {
+ // Ignore inactive ports
+ if (! neighborPortVertex.getProperty("state").toString().equals("ACTIVE")) {
+ continue;
+ }
- // Ignore inactive ports
- ++getPropertyCount;
- ++getPCount_sw;
- if (!switchObj.getState().equals(SwitchState.ACTIVE.toString())) {
- t4 = System.nanoTime();
- t_addSw += t4 - t3;
- continue;
- }
- Vertex nodeVertex = switchObj.asVertex();
- //
- // The Switch info
- //
- ++getPropertyCount;
- ++getPCount_sw;
- String nodeDpid = nodeVertex.getProperty("dpid").toString();
- long nodeId = HexString.toLong(nodeDpid);
- addNode(nodeId);
- t4 = System.nanoTime();
- t_addSw += t4 - t3;
- }
+ int neighborPort = 0;
+ obj = neighborPortVertex.getProperty("number");
+ if (obj instanceof Short) {
+ neighborPort = (Short)obj;
+ } else if (obj instanceof Integer) {
+ neighborPort = (Integer)obj;
+ }
+ //
+ // The neighbor Switch info
+ //
+ for (Vertex neighborVertex : neighborPortVertex.getVertices(Direction.IN, "on")) {
+ // Ignore inactive switches
+ String state = neighborVertex.getProperty("state").toString();
+ if (! state.equals(SwitchState.ACTIVE.toString()))
+ continue;
- long t5 = System.nanoTime();
- //
- // Get All Ports
- //
- Iterable<IPortObject> ports = dbHandler.getAllPorts(); //TODO: Add to DB operations
-
- long t6 = System.nanoTime();
- long t_addPort = 0;
- long t_addLink = 0;
-
- for (IPortObject myPortObj : ports) {
- long t7 = System.nanoTime();
- long t8;
- Vertex myPortVertex = myPortObj.asVertex();
-
- // Ignore inactive ports
- ++getPropertyCount;
- ++getPCount_pt;
- if (! myPortVertex.getProperty("state").toString().equals("ACTIVE")) {
- t8 = System.nanoTime();
- t_addPort += t8 - t7;
- continue;
- }
-
- short myPort = 0;
- ++getPropertyCount;
- ++getPCount_pt;
- String idStr = myPortObj.getPortId();
- String[] splitter = idStr.split(IDBOperation.PORT_ID_DELIM);
- if (splitter.length != 2) {
- log.error("Invalid port_id : {}", idStr);
- t8 = System.nanoTime();
- t_addPort += t8 - t7;
- continue;
- }
- String myDpid = splitter[0];
- myPort = Short.parseShort(splitter[1]);
- long myId = HexString.toLong(myDpid);
- Node me = nodesMap.get(myId);
-
- if (me == null) {
- // cannot proceed ports and switches are out of sync
- //TODO: Restart the whole read
- t8 = System.nanoTime();
- t_addPort += t8 - t7;
- continue;
- }
-
- if (me.getPort(myPort) == null) {
- me.addPort(myPort);
- } else if (me.getLink(myPort) != null) {
- // Link already added..probably by neighbor
- t8 = System.nanoTime();
- t_addPort += t8 - t7;
- continue;
- }
- t8 = System.nanoTime();
- t_addPort += t8 - t7;
-
- //
- // The neighbor Port info
- //
- ++getVerticesCount;
- ++getVCount_pt;
- for (Vertex neighborPortVertex : myPortVertex.getVertices(Direction.OUT, "link")) {
-// log.debug("state : {}", neighborPortVertex.getProperty("state"));
-// log.debug("port id : {}", neighborPortVertex.getProperty("port_id"));
-
- long t9 = System.nanoTime();
- long t10;
-
- // Ignore inactive ports
- ++getPropertyCount;
- ++getPCount_lk;
- if (! neighborPortVertex.getProperty("state").toString().equals("ACTIVE")) {
- t10 = System.nanoTime();
- t_addLink += t10 - t9;
- continue;
- }
- int neighborPort = 0;
- ++getPropertyCount;
- ++getPCount_lk;
- idStr = neighborPortVertex.getProperty("port_id").toString();
- splitter = idStr.split(IDBOperation.PORT_ID_DELIM);
- if (splitter.length != 2) {
- log.error("Invalid port_id : {}", idStr);
- t10 = System.nanoTime();
- t_addLink += t10 - t9;
- continue;
- }
- String neighborDpid = splitter[0];
- neighborPort = Short.parseShort(splitter[1]);
- long neighborId = HexString.toLong(neighborDpid);
- Node neighbor = nodesMap.get(neighborId);
-// log.debug("dpid {},{} port {}", neighborDpid, neighborId, neighborPort);
- if (neighbor == null) {
- t10 = System.nanoTime();
- t_addLink += t10 - t9;
- continue;
- }
- me.addLink(myPort, neighbor, neighborPort);
-
- t10 = System.nanoTime();
- t_addLink += t10 - t9;
- }
- }
- long t11 = System.nanoTime();
- dbHandler.commit();
- long t12 = System.nanoTime();
-
- logGetSw.add((t2-t1)/1000);
- logGetPt.add((t6-t5)/1000);
- logAddSw.add(t_addSw/1000);
- logAddPt.add(t_addPort/1000);
- logAddLk.add(t_addLink/1000);
- logCommit.add((t12-t11)/1000);
- logGetVertices.add(getVerticesCount);
- logGetProperty.add(getPropertyCount);
- log.debug("getVertices[N({}),P({}),L({})] getProperty[N({}),P({}),L({})]",
- new Object[]{getVCount_sw,getVCount_pt,getVCount_lk,
- getPCount_sw,getPCount_pt,getPCount_lk});
+ String neighborDpid = neighborVertex.getProperty("dpid").toString();
+ long neighborId = HexString.toLong(neighborDpid);
+ Node neighbor = nodesMap.get(neighborId);
+ if (neighbor == null)
+ neighbor = addNode(neighborId);
+ neighbor.addPort(neighborPort);
+ me.addLink(myPort, neighbor, neighborPort);
+ }
+ }
+ }
+ }
+ dbHandler.commit();
}
- public void printMeasuredLog() {
- log.debug("getsw: {}", StringUtils.join(logGetSw, ","));
- log.debug("getpt: {}", StringUtils.join(logGetPt, ","));
- log.debug("addsw: {}", StringUtils.join(logAddSw, ","));
- log.debug("addpt: {}", StringUtils.join(logAddPt, ","));
- log.debug("addlk: {}", StringUtils.join(logAddLk, ","));
- log.debug("commit: {}", StringUtils.join(logCommit, ","));
- log.debug("getvertices: {}", StringUtils.join(logGetVertices, ","));
- log.debug("getproperty: {}", StringUtils.join(logGetProperty, ","));
+ private void readFromDatabaseBodyOptimized(DBOperation dbHandler) {
+ nodesMap.clear();
+
+ // Load all switches into Map
+ Iterable<ISwitchObject> switches = dbHandler.getAllSwitches();
+ for (ISwitchObject switchObj : switches) {
+ // Ignore inactive switches
+ if (!switchObj.getState().equals(SwitchState.ACTIVE.toString())) {
+ continue;
+ }
+ Vertex nodeVertex = switchObj.asVertex();
+ //
+ // The Switch info
+ //
+ String nodeDpid = nodeVertex.getProperty("dpid").toString();
+ long nodeId = HexString.toLong(nodeDpid);
+ addNode(nodeId);
+ }
+
+ //
+ // Get All Ports
+ //
+ Iterable<IPortObject> ports = dbHandler.getAllPorts(); //TODO: Add to DB operations
+ for (IPortObject myPortObj : ports) {
+ Vertex myPortVertex = myPortObj.asVertex();
+
+ // Ignore inactive ports
+ if (! myPortVertex.getProperty("state").toString().equals("ACTIVE")) {
+ continue;
+ }
+
+ short myPort = 0;
+ String idStr = myPortObj.getPortId();
+ String[] splitter = idStr.split(IDBOperation.PORT_ID_DELIM);
+ if (splitter.length != 2) {
+ log.error("Invalid port_id : {}", idStr);
+ continue;
+ }
+ String myDpid = splitter[0];
+ myPort = Short.parseShort(splitter[1]);
+ long myId = HexString.toLong(myDpid);
+ Node me = nodesMap.get(myId);
+
+ if (me == null) {
+ // Cannot proceed: ports and switches are out of sync
+ //TODO: Restart the whole read
+ continue;
+ }
+
+ if (me.getPort((int)myPort) == null) {
+ me.addPort((int)myPort);
+ } else if (me.getLink((int)myPort) != null) {
+ // Link already added, probably by the neighbor
+ continue;
+ }
+
+ //
+ // The neighbor Port info
+ //
+ for (Vertex neighborPortVertex : myPortVertex.getVertices(Direction.OUT, "link")) {
+ // Ignore inactive ports
+ if (! neighborPortVertex.getProperty("state").toString().equals("ACTIVE")) {
+ continue;
+ }
+ int neighborPort = 0;
+ idStr = neighborPortVertex.getProperty("port_id").toString();
+ splitter = idStr.split(IDBOperation.PORT_ID_DELIM);
+ if (splitter.length != 2) {
+ log.error("Invalid port_id : {}", idStr);
+ continue;
+ }
+ String neighborDpid = splitter[0];
+ neighborPort = Short.parseShort(splitter[1]);
+ long neighborId = HexString.toLong(neighborDpid);
+ Node neighbor = nodesMap.get(neighborId);
+ if (neighbor == null) {
+ continue;
+ }
+ if (neighbor.getPort(neighborPort) == null) {
+ neighbor.addPort(neighborPort);
+ }
+ me.addLink(myPort, neighbor, neighborPort);
+ }
+ }
+ dbHandler.commit();
}
-
+
// Only for debug use
@Override
public String toString() {
diff --git a/src/main/java/net/onrc/onos/ofcontroller/topology/TopologyManager.java b/src/main/java/net/onrc/onos/ofcontroller/topology/TopologyManager.java
index cbc3224..a074d19 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/topology/TopologyManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/topology/TopologyManager.java
@@ -90,7 +90,8 @@
/**
* Shutdown the Topology Manager operation.
*/
- public void finalize() {
+ @Override
+ protected void finalize() {
close();
}
@@ -108,7 +109,7 @@
*/
@Override
public Collection<Class<? extends IFloodlightService>> getModuleServices() {
- Collection<Class<? extends IFloodlightService>> l =
+ Collection<Class<? extends IFloodlightService>> l =
new ArrayList<Class<? extends IFloodlightService>>();
l.add(ITopologyNetService.class);
return l;
@@ -120,10 +121,10 @@
* @return the collection of implemented services.
*/
@Override
- public Map<Class<? extends IFloodlightService>, IFloodlightService>
+ public Map<Class<? extends IFloodlightService>, IFloodlightService>
getServiceImpls() {
Map<Class<? extends IFloodlightService>,
- IFloodlightService> m =
+ IFloodlightService> m =
new HashMap<Class<? extends IFloodlightService>,
IFloodlightService>();
m.put(ITopologyNetService.class, this);
@@ -136,7 +137,7 @@
* @return the collection of modules this module depends on.
*/
@Override
- public Collection<Class<? extends IFloodlightService>>
+ public Collection<Class<? extends IFloodlightService>>
getModuleDependencies() {
Collection<Class<? extends IFloodlightService>> l =
new ArrayList<Class<? extends IFloodlightService>>();
@@ -200,6 +201,7 @@
*
* @return the allocated topology handler.
*/
+ @Override
public Topology newDatabaseTopology() {
Topology topology = new Topology();
topology.readFromDatabase(dbHandler);
@@ -216,6 +218,7 @@
*
* @param topology the topology to release.
*/
+ @Override
public void dropTopology(Topology topology) {
topology = null;
}
@@ -312,6 +315,7 @@
* @return the data path with the computed shortest path if
* found, otherwise null.
*/
+ @Override
public DataPath getTopologyShortestPath(Topology topology,
SwitchPort src, SwitchPort dest) {
return ShortestPath.getTopologyShortestPath(topology, src, dest);