Merge branch 'master' into ONOS-ONRC2014-Measurements
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index 33091b9..effbe81 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -73,6 +73,12 @@
private MapFlowEntryListener mapFlowEntryListener = null;
private String mapFlowEntryListenerId = null;
+ // State related to the Flow ID map
+ protected static final String mapFlowIdName = "mapFlowId";
+ private IMap<Long, byte[]> mapFlowId = null;
+ private MapFlowIdListener mapFlowIdListener = null;
+ private String mapFlowIdListenerId = null;
+
// State related to the Network Topology map
protected static final String mapTopologyName = "mapTopology";
private IMap<String, byte[]> mapTopology = null;
@@ -238,6 +244,78 @@
}
/**
+ * Class for receiving notifications for FlowId state.
+ *
+ * The datagrid map is:
+ * - Key : FlowId (Long)
+ * - Value : Serialized FlowId (byte[])
+ */
+ class MapFlowIdListener implements EntryListener<Long, byte[]> {
+     /**
+      * Decode a serialized FlowId carried by a map event.
+      *
+      * The Kryo instance obtained from the factory is always handed back
+      * through deleteKryo(), even when decoding throws.
+      *
+      * @param valueBytes the serialized FlowId.
+      * @return the decoded FlowId.
+      */
+     private FlowId decodeFlowId(byte[] valueBytes) {
+         Kryo kryo = kryoFactory.newKryo();
+         try {
+             Input input = new Input(valueBytes);
+             return kryo.readObject(input, FlowId.class);
+         } finally {
+             kryoFactory.deleteKryo(kryo);
+         }
+     }
+
+     /**
+      * Receive a notification that an entry is added.
+      *
+      * @param event the notification event for the entry.
+      */
+     public void entryAdded(EntryEvent<Long, byte[]> event) {
+         // Decode the value and deliver the notification
+         FlowId flowId = decodeFlowId(event.getValue());
+         flowEventHandlerService.notificationRecvFlowIdAdded(flowId);
+     }
+
+     /**
+      * Receive a notification that an entry is removed.
+      *
+      * @param event the notification event for the entry.
+      */
+     public void entryRemoved(EntryEvent<Long, byte[]> event) {
+         // Decode the value and deliver the notification
+         FlowId flowId = decodeFlowId(event.getValue());
+         flowEventHandlerService.notificationRecvFlowIdRemoved(flowId);
+     }
+
+     /**
+      * Receive a notification that an entry is updated.
+      *
+      * @param event the notification event for the entry.
+      */
+     public void entryUpdated(EntryEvent<Long, byte[]> event) {
+         // Decode the value and deliver the notification
+         FlowId flowId = decodeFlowId(event.getValue());
+         flowEventHandlerService.notificationRecvFlowIdUpdated(flowId);
+     }
+
+     /**
+      * Receive a notification that an entry is evicted.
+      *
+      * @param event the notification event for the entry.
+      */
+     public void entryEvicted(EntryEvent<Long, byte[]> event) {
+         // NOTE: We don't use eviction for this map
+     }
+ }
+
+ /**
* Class for receiving notifications for Network Topology state.
*
* The datagrid map is:
@@ -521,6 +599,11 @@
mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
+ // Initialize the FlowId-related map state
+ mapFlowIdListener = new MapFlowIdListener();
+ mapFlowId = hazelcastInstance.getMap(mapFlowIdName);
+ mapFlowIdListenerId = mapFlowId.addEntryListener(mapFlowIdListener, true);
+
// Initialize the Topology-related map state
mapTopologyListener = new MapTopologyListener();
mapTopology = hazelcastInstance.getMap(mapTopologyName);
@@ -548,6 +631,11 @@
mapFlowEntry = null;
mapFlowEntryListener = null;
+ // Clear the FlowId-related map state
+ mapFlowId.removeEntryListener(mapFlowIdListenerId);
+ mapFlowId = null;
+ mapFlowIdListener = null;
+
// Clear the Topology-related map state
mapTopology.removeEntryListener(mapTopologyListenerId);
mapTopology = null;
@@ -805,6 +893,101 @@
}
/**
+ * Get all Flow IDs that are currently in the datagrid.
+ *
+ * @return all Flow IDs that are currently in the datagrid.
+ */
+ @Override
+ public Collection<FlowId> getAllFlowIds() {
+     Collection<FlowId> allFlowIds = new LinkedList<FlowId>();
+
+     //
+     // Decode every value currently stored in the Flow ID map.
+     // One Kryo instance is reused for the whole pass and is always
+     // handed back to the factory, even if a value fails to decode.
+     //
+     Kryo kryo = kryoFactory.newKryo();
+     try {
+         for (byte[] valueBytes : mapFlowId.values()) {
+             Input input = new Input(valueBytes);
+             allFlowIds.add(kryo.readObject(input, FlowId.class));
+         }
+     } finally {
+         kryoFactory.deleteKryo(kryo);
+     }
+
+     return allFlowIds;
+ }
+
+ /**
+ * Send a notification that a FlowId is added.
+ *
+ * @param flowId the FlowId that is added.
+ */
+ @Override
+ public void notificationSendFlowIdAdded(FlowId flowId) {
+     //
+     // Encode the value.
+     // The Kryo instance is always handed back to the factory, even if
+     // the serialization throws.
+     //
+     byte[] valueBytes;
+     Kryo kryo = kryoFactory.newKryo();
+     try {
+         // NOTE(review): -1 is passed as Output's maximum buffer size;
+         // presumably "no upper limit" - confirm against the Kryo docs.
+         Output output = new Output(new byte[MAX_BUFFER_SIZE], -1);
+         kryo.writeObject(output, flowId);
+         valueBytes = output.toBytes();
+     } finally {
+         kryoFactory.deleteKryo(kryo);
+     }
+
+     //
+     // Put the entry:
+     //  - Key : FlowId (Long)
+     //  - Value : Serialized FlowId (byte[])
+     //
+     mapFlowId.putAsync(flowId.value(), valueBytes);
+ }
+
+ /**
+ * Send a notification that a FlowId is removed.
+ *
+ * @param flowId the FlowId that is removed.
+ */
+ @Override
+ public void notificationSendFlowIdRemoved(FlowId flowId) {
+     // The map is keyed by the FlowId value (Long) and stores the
+     // serialized FlowId (byte[]); request asynchronous removal of the
+     // entry stored under this Flow's key.
+     final Long flowIdKey = flowId.value();
+     mapFlowId.removeAsync(flowIdKey);
+ }
+
+ /**
+ * Send a notification that a FlowId is updated.
+ *
+ * @param flowId the FlowId that is updated.
+ */
+ @Override
+ public void notificationSendFlowIdUpdated(FlowId flowId) {
+     // An update is sent exactly like an add: putting a value under a
+     // key that already exists replaces the stored entry.
+     this.notificationSendFlowIdAdded(flowId);
+ }
+
+ /**
+ * Send a notification that all Flow IDs are removed.
+ */
+ @Override
+ public void notificationSendAllFlowIdsRemoved() {
+     //
+     // Deliberately not mapFlowId.clear(): the entries are removed
+     // one-by-one so the per-entry notifications will be delivered.
+     //
+     for (Long flowIdKey : mapFlowId.keySet()) {
+         mapFlowId.removeAsync(flowIdKey);
+     }
+ }
+
+ /**
* Get all Topology Elements that are currently in the datagrid.
*
* @return all Topology Elements that are currently in the datagrid.
diff --git a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
index 0f03d77..d4e7b00 100644
--- a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
+++ b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
@@ -134,6 +134,39 @@
void notificationSendAllFlowEntriesRemoved();
/**
+ * Get all Flow IDs that are currently in the datagrid.
+ *
+ * @return all Flow IDs that are currently in the datagrid.
+ */
+ Collection<FlowId> getAllFlowIds();
+
+ /**
+ * Send a notification that a FlowId is added.
+ *
+ * @param flowId the FlowId that is added.
+ */
+ void notificationSendFlowIdAdded(FlowId flowId);
+
+ /**
+ * Send a notification that a FlowId is removed.
+ *
+ * @param flowId the FlowId that is removed.
+ */
+ void notificationSendFlowIdRemoved(FlowId flowId);
+
+ /**
+ * Send a notification that a FlowId is updated.
+ *
+ * @param flowId the FlowId that is updated.
+ */
+ void notificationSendFlowIdUpdated(FlowId flowId);
+
+ /**
+ * Send a notification that all Flow IDs are removed.
+ */
+ void notificationSendAllFlowIdsRemoved();
+
+ /**
* Get all Topology Elements that are currently in the datagrid.
*
* @return all Topology Elements that are currently in the datagrid.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
index e075bad..d4a563b 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
@@ -5,7 +5,9 @@
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import net.floodlightcontroller.core.IOFSwitch;
import net.floodlightcontroller.util.MACAddress;
import net.onrc.onos.graph.GraphDBOperation;
@@ -464,6 +466,45 @@
}
/**
+ * Get the source switch DPID of a previously added flow.
+ *
+ * @param dbHandler the Graph Database handler to use.
+ * @param flowId the Flow ID of the flow to get.
+ * @return the source switch DPID if found, otherwise null.
+ */
+ static Dpid getFlowSourceDpid(GraphDBOperation dbHandler, FlowId flowId) {
+     IFlowPath flowObj;
+     try {
+         flowObj = dbHandler.searchFlowPath(flowId);
+     } catch (Exception e) {
+         // TODO: handle exceptions
+         dbHandler.rollback();
+         // Pass the exception as the last argument so its cause and
+         // stack trace are not lost.
+         log.error(":getFlowSourceDpid FlowId:{} failed", flowId, e);
+         return null;
+     }
+
+     //
+     // Extract the Flow Source DPID.
+     // Both "flow not found" and "flow without a source switch" yield
+     // null. The latter is a work-around, because of some bogus
+     // database objects.
+     //
+     String srcSwitchStr = (flowObj != null) ? flowObj.getSrcSwitch() : null;
+     Dpid dpid = (srcSwitchStr != null) ? new Dpid(srcSwitchStr) : null;
+
+     // The lookup succeeded, so the transaction is committed on every
+     // non-exception path.
+     dbHandler.commit();
+
+     return dpid;
+ }
+
+ /**
* Get all installed flows by all installers.
*
* @param dbHandler the Graph Database handler to use.
@@ -501,6 +542,60 @@
}
/**
+ * Get all installed flows whose Source Switch is controlled by this
+ * instance.
+ *
+ * @param dbHandler the Graph Database handler to use.
+ * @param mySwitches the collection of the switches controlled by this
+ * instance.
+ * @return the Flow Paths if found, otherwise an empty list.
+ */
+ static ArrayList<FlowPath> getAllMyFlows(GraphDBOperation dbHandler,
+                                          Map<Long, IOFSwitch> mySwitches) {
+     // The result is never null: an empty list is returned when nothing
+     // is found or when the database read fails.
+     ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
+
+     Iterable<IFlowPath> flowPathsObj;
+     try {
+         flowPathsObj = dbHandler.getAllFlowPaths();
+     } catch (Exception e) {
+         // TODO: handle exceptions
+         dbHandler.rollback();
+         // Pass the exception so its cause and stack trace are not lost
+         log.error(":getAllMyFlows failed", e);
+         return flowPaths;
+     }
+
+     if (flowPathsObj != null) {
+         for (IFlowPath flowObj : flowPathsObj) {
+             //
+             // Extract the Source Switch DPID and ignore the flow if
+             // the switch is not controlled by this instance.
+             //
+             String srcSwitchStr = flowObj.getSrcSwitch();
+             if (srcSwitchStr == null) {
+                 // TODO: A work-around, because of some bogus database objects
+                 continue;
+             }
+             Dpid dpid = new Dpid(srcSwitchStr);
+             if (mySwitches.get(dpid.value()) == null) {
+                 continue;
+             }
+
+             //
+             // Extract the Flow state
+             //
+             FlowPath flowPath = extractFlowPath(flowObj);
+             if (flowPath != null) {
+                 flowPaths.add(flowPath);
+             }
+         }
+     }
+
+     dbHandler.commit();
+
+     return flowPaths;
+ }
+
+ /**
* Extract Flow Path State from a Titan Database Object @ref IFlowPath.
*
* @param flowObj the object to extract the Flow Path State from.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index 8b1f7c0..0f421c3 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -14,10 +14,12 @@
import net.floodlightcontroller.core.IOFSwitch;
import net.onrc.onos.datagrid.IDatagridService;
+import net.onrc.onos.graph.GraphDBOperation;
import net.onrc.onos.ofcontroller.topology.Topology;
import net.onrc.onos.ofcontroller.topology.TopologyElement;
import net.onrc.onos.ofcontroller.topology.TopologyManager;
import net.onrc.onos.ofcontroller.util.DataPath;
+import net.onrc.onos.ofcontroller.util.Dpid;
import net.onrc.onos.ofcontroller.util.EventEntry;
import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowEntryAction;
@@ -29,6 +31,7 @@
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
import net.onrc.onos.ofcontroller.util.FlowPathUserState;
+import net.onrc.onos.ofcontroller.util.Port;
import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
import com.esotericsoftware.kryo2.Kryo;
@@ -44,9 +47,14 @@
* - Recompute impacted FlowPath using cached Topology.
*/
class FlowEventHandler extends Thread implements IFlowEventHandlerService {
+
+ private boolean enableOnrc2014MeasurementsFlows = true;
+ private boolean enableOnrc2014MeasurementsTopology = true;
+
/** The logger. */
private final static Logger log = LoggerFactory.getLogger(FlowEventHandler.class);
-
+
+ private GraphDBOperation dbHandler;
private FlowManager flowManager; // The Flow Manager to use
private IDatagridService datagridService; // The Datagrid Service to use
private Topology topology; // The network topology
@@ -63,6 +71,8 @@
new LinkedList<EventEntry<FlowPath>>();
private List<EventEntry<FlowEntry>> flowEntryEvents =
new LinkedList<EventEntry<FlowEntry>>();
+ private List<EventEntry<FlowId>> flowIdEvents =
+ new LinkedList<EventEntry<FlowId>>();
// All internally computed Flow Paths
private Map<Long, FlowPath> allFlowPaths = new HashMap<Long, FlowPath>();
@@ -108,6 +118,8 @@
* Startup processing.
*/
private void startup() {
+ this.dbHandler = new GraphDBOperation("");
+
//
// Obtain the initial Topology state
//
@@ -137,6 +149,16 @@
flowEntryEvents.add(eventEntry);
}
+ //
+ // Obtain the initial FlowId state
+ //
+ Collection<FlowId> flowIds = datagridService.getAllFlowIds();
+ for (FlowId flowId : flowIds) {
+ EventEntry<FlowId> eventEntry =
+ new EventEntry<FlowId>(EventEntry.Type.ENTRY_ADD, flowId);
+ flowIdEvents.add(eventEntry);
+ }
+
// Process the initial events (if any)
synchronized (allFlowPaths) {
processEvents();
@@ -166,12 +188,14 @@
// - EventEntry<TopologyElement>
// - EventEntry<FlowPath>
// - EventEntry<FlowEntry>
+ // - EventEntry<FlowId>
//
for (EventEntry<?> event : collection) {
// Topology event
if (event.eventData() instanceof TopologyElement) {
EventEntry<TopologyElement> topologyEventEntry =
(EventEntry<TopologyElement>)event;
+
topologyEvents.add(topologyEventEntry);
continue;
}
@@ -191,6 +215,14 @@
flowEntryEvents.add(flowEntryEventEntry);
continue;
}
+
+ // FlowId event
+ if (event.eventData() instanceof FlowId) {
+ EventEntry<FlowId> flowIdEventEntry =
+ (EventEntry<FlowId>)event;
+ flowIdEvents.add(flowIdEventEntry);
+ continue;
+ }
}
collection.clear();
@@ -203,13 +235,63 @@
log.debug("Exception processing Network Events: ", exception);
}
}
-
+
/**
* Process the events (if any)
*/
private void processEvents() {
Collection<FlowEntry> modifiedFlowEntries;
+ if (enableOnrc2014MeasurementsFlows) {
+
+ if (topologyEvents.isEmpty() && flowIdEvents.isEmpty()) {
+ return; // Nothing to do
+ }
+
+ Map<Long, IOFSwitch> mySwitches = flowManager.getMySwitches();
+
+ // Fetch and prepare my flows
+ prepareMyFlows(mySwitches);
+
+ // Fetch the topology
+ processTopologyEvents();
+
+ // Recompute all affected Flow Paths and keep only the modified
+ for (FlowPath flowPath : shouldRecomputeFlowPaths.values()) {
+ if (recomputeFlowPath(flowPath))
+ modifiedFlowPaths.put(flowPath.flowId().value(), flowPath);
+ }
+
+ // Assign the Flow Entry ID as needed
+ for (FlowPath flowPath : modifiedFlowPaths.values()) {
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ if (! flowEntry.isValidFlowEntryId()) {
+ long id = flowManager.getNextFlowEntryId();
+ flowEntry.setFlowEntryId(new FlowEntryId(id));
+ }
+ }
+ }
+
+ // Extract my modified Flow Entries
+ modifiedFlowEntries = processFlowIdEvents(mySwitches);
+
+ //
+ // Push the modified state to the Flow Manager
+ //
+ flowManager.pushModifiedFlowState(modifiedFlowPaths.values(),
+ modifiedFlowEntries);
+
+ // Cleanup
+ topologyEvents.clear();
+ flowIdEvents.clear();
+ //
+ allFlowPaths.clear();
+ shouldRecomputeFlowPaths.clear();
+ modifiedFlowPaths.clear();
+
+ return;
+ }
+
if (topologyEvents.isEmpty() && flowPathEvents.isEmpty() &&
flowEntryEvents.isEmpty()) {
return; // Nothing to do
@@ -346,6 +428,122 @@
}
/**
+ * Prepare my flows.
+ *
+ * @param mySwitches the collection of my switches.
+ */
+ private void prepareMyFlows(Map<Long, IOFSwitch> mySwitches) {
+     // When there are topology events, all flows whose source switch
+     // belongs to this instance are (re)loaded into allFlowPaths.
+     final boolean haveTopologyEvents = ! topologyEvents.isEmpty();
+
+     if (haveTopologyEvents) {
+         // Fetch my flows from the database
+         ArrayList<FlowPath> myFlows = FlowDatabaseOperation.getAllMyFlows(dbHandler, mySwitches);
+         for (FlowPath flowPath : myFlows) {
+             allFlowPaths.put(flowPath.flowId().value(), flowPath);
+
+             //
+             // TODO: Bug workaround / fix :
+             // method FlowDatabaseOperation.extractFlowEntry() doesn't
+             // fetch the inPort and outPort, hence we assign them here.
+             //
+             // Assign the inPort and outPort for the Flow Entries
+             for (FlowEntry flowEntry : flowPath.flowEntries()) {
+                 // Set the inPort, if missing, from the match condition.
+                 // NOTE(review): the extra inPort() != null guard assumes
+                 // a match without an input port returns null - confirm.
+                 if ((flowEntry.inPort() == null) &&
+                     (flowEntry.flowEntryMatch() != null) &&
+                     (flowEntry.flowEntryMatch().inPort() != null)) {
+                     Port inPort = new Port(flowEntry.flowEntryMatch().inPort().value());
+                     flowEntry.setInPort(inPort);
+                 }
+
+                 // Set the outPort, if missing, from the first output action
+                 if (flowEntry.outPort() == null) {
+                     for (FlowEntryAction fa : flowEntry.flowEntryActions().actions()) {
+                         if (fa.actionOutput() != null) {
+                             Port outPort = new Port(fa.actionOutput().port().value());
+                             flowEntry.setOutPort(outPort);
+                             break;
+                         }
+                     }
+                 }
+             }
+         }
+     }
+
+     //
+     // Automatically add all Flow ID events (for the Flows this instance
+     // is responsible for) to the collection of Flows to recompute.
+     //
+     for (EventEntry<FlowId> eventEntry : flowIdEvents) {
+         FlowId flowId = eventEntry.eventData();
+         FlowPath flowPath = allFlowPaths.get(flowId.value());
+         if (flowPath == null) {
+             if (haveTopologyEvents) {
+                 // Optimization: all my flows were just loaded above,
+                 // so a miss means this is not my flow.
+                 continue;
+             }
+             // Read the flow from the database only if its source
+             // switch is controlled by this instance.
+             Dpid dpid = FlowDatabaseOperation.getFlowSourceDpid(dbHandler,
+                                                                 flowId);
+             if ((dpid != null) && (mySwitches.get(dpid.value()) != null)) {
+                 flowPath = FlowDatabaseOperation.getFlow(dbHandler,
+                                                          flowId);
+             }
+         }
+         if (flowPath != null) {
+             shouldRecomputeFlowPaths.put(flowPath.flowId().value(),
+                                          flowPath);
+         }
+     }
+ }
+
+ /**
+ * Process the Flow ID events.
+ *
+ * @param mySwitches the collection of my switches.
+ * @return a collection of modified Flow Entries this instance needs
+ * to push to its own switches.
+ */
+ private Collection<FlowEntry> processFlowIdEvents(Map<Long, IOFSwitch> mySwitches) {
+     List<FlowEntry> result = new LinkedList<FlowEntry>();
+
+     // Walk all pending Flow ID events and gather the affected entries
+     for (EventEntry<FlowId> idEvent : flowIdEvents) {
+         FlowId flowId = idEvent.eventData();
+
+         log.debug("Flow ID Event: {} {}", idEvent.eventType(), flowId);
+
+         //
+         // Prefer the Flow that was read from the database in a
+         // previous step; fall back to reading it from the database.
+         //
+         FlowPath flowPath = allFlowPaths.get(flowId.value());
+         if (flowPath == null) {
+             flowPath = FlowDatabaseOperation.getFlow(dbHandler, flowId);
+             if (flowPath == null) {
+                 log.debug("Flow ID {} : Flow not found!", flowId);
+                 continue;
+             }
+         }
+
+         // Keep only the entries that are not updated yet and that
+         // belong to one of my switches.
+         for (FlowEntry flowEntry : flowPath.flowEntries()) {
+             boolean notUpdated = (flowEntry.flowEntrySwitchState() ==
+                                   FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
+             if (notUpdated &&
+                 (mySwitches.get(flowEntry.dpid().value()) != null)) {
+                 result.add(flowEntry);
+             }
+         }
+     }
+
+     return result;
+ }
+
+ /**
* Process the Flow Path events.
*/
private void processFlowPathEvents() {
@@ -431,15 +629,25 @@
* Process the Topology events.
*/
private void processTopologyEvents() {
+ if (enableOnrc2014MeasurementsTopology) {
+ if (topologyEvents.isEmpty())
+ return;
+ log.debug("[BEFORE] {}", topology.toString());
+ topology.readFromDatabase(dbHandler);
+ log.debug("[AFTER] {}", topology.toString());
+ shouldRecomputeFlowPaths.putAll(allFlowPaths);
+ return;
+ }
+
//
// Process all Topology events and update the appropriate state
//
boolean isTopologyModified = false;
for (EventEntry<TopologyElement> eventEntry : topologyEvents) {
TopologyElement topologyElement = eventEntry.eventData();
-
+
log.debug("Topology Event: {} {}", eventEntry.eventType(),
- topologyElement);
+ topologyElement.toString());
switch (eventEntry.eventType()) {
case ENTRY_ADD:
@@ -972,6 +1180,43 @@
}
/**
+ * Receive a notification that a FlowId is added.
+ *
+ * @param flowId the FlowId that is added.
+ */
+ @Override
+ public void notificationRecvFlowIdAdded(FlowId flowId) {
+     // Wrap the FlowId in an ADD event and queue it on networkEvents
+     networkEvents.add(
+         new EventEntry<FlowId>(EventEntry.Type.ENTRY_ADD, flowId));
+ }
+
+ /**
+ * Receive a notification that a FlowId is removed.
+ *
+ * @param flowId the FlowId that is removed.
+ */
+ @Override
+ public void notificationRecvFlowIdRemoved(FlowId flowId) {
+     // Wrap the FlowId in a REMOVE event and queue it on networkEvents
+     networkEvents.add(
+         new EventEntry<FlowId>(EventEntry.Type.ENTRY_REMOVE, flowId));
+ }
+
+ /**
+ * Receive a notification that a FlowId is updated.
+ *
+ * @param flowId the FlowId that is updated.
+ */
+ @Override
+ public void notificationRecvFlowIdUpdated(FlowId flowId) {
+     // NOTE: An UPDATE is queued as an ADD event, because both are
+     // processed in the same way.
+     networkEvents.add(
+         new EventEntry<FlowId>(EventEntry.Type.ENTRY_ADD, flowId));
+ }
+
+ /**
* Receive a notification that a Topology Element is added.
*
* @param topologyElement the Topology Element that is added.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index dd98f4e..baa544b 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -48,6 +48,9 @@
* Flow Manager class for handling the network flows.
*/
public class FlowManager implements IFloodlightModule, IFlowService, INetMapStorage {
+
+ private boolean enableOnrc2014MeasurementsFlows = true;
+
protected GraphDBOperation dbHandlerApi;
protected GraphDBOperation dbHandlerInner;
@@ -265,7 +268,12 @@
}
if (FlowDatabaseOperation.addFlow(dbHandlerApi, flowPath)) {
- datagridService.notificationSendFlowAdded(flowPath);
+ if (enableOnrc2014MeasurementsFlows) {
+ datagridService.notificationSendFlowIdAdded(flowPath.flowId());
+ } else {
+ datagridService.notificationSendFlowAdded(flowPath);
+ }
+
return flowPath.flowId();
}
return null;
@@ -279,7 +287,11 @@
@Override
public boolean deleteAllFlows() {
if (FlowDatabaseOperation.deleteAllFlows(dbHandlerApi)) {
- datagridService.notificationSendAllFlowsRemoved();
+ if (enableOnrc2014MeasurementsFlows) {
+ datagridService.notificationSendAllFlowIdsRemoved();
+ } else {
+ datagridService.notificationSendAllFlowsRemoved();
+ }
return true;
}
return false;
@@ -294,7 +306,11 @@
@Override
public boolean deleteFlow(FlowId flowId) {
if (FlowDatabaseOperation.deleteFlow(dbHandlerApi, flowId)) {
- datagridService.notificationSendFlowRemoved(flowId);
+ if (enableOnrc2014MeasurementsFlows) {
+ datagridService.notificationSendFlowIdRemoved(flowId);
+ } else {
+ datagridService.notificationSendFlowRemoved(flowId);
+ }
return true;
}
return false;
@@ -312,6 +328,16 @@
}
/**
+ * Get the source switch DPID of a previously added flow.
+ *
+ * @param flowId the Flow ID of the flow to get.
+ * @return the source switch DPID if found, otherwise null.
+ */
+ public Dpid getFlowSourceDpid(FlowId flowId) {
+     // Delegate to the database layer using the API database handler
+     Dpid srcDpid = FlowDatabaseOperation.getFlowSourceDpid(dbHandlerApi,
+                                                            flowId);
+     return srcDpid;
+ }
+
+ /**
* Get all installed flows by all installers.
*
* @return the Flow Paths if found, otherwise null.
@@ -322,6 +348,18 @@
}
/**
+ * Get all installed flows whose Source Switch is controlled by this
+ * instance.
+ *
+ * @param mySwitches the collection of the switches controlled by this
+ * instance.
+ * @return the Flow Paths if found, otherwise an empty list.
+ */
+ public ArrayList<FlowPath> getAllMyFlows(Map<Long, IOFSwitch> mySwitches) {
+     // Delegate to the database layer using the API database handler
+     ArrayList<FlowPath> myFlows =
+         FlowDatabaseOperation.getAllMyFlows(dbHandlerApi, mySwitches);
+     return myFlows;
+ }
+
+ /**
* Get summary of all installed flows by all installers in a given range.
*
* @param flowId the Flow ID of the first flow in the flow range to get.
@@ -413,6 +451,9 @@
public void flowEntriesPushedToSwitch(
Collection<Pair<IOFSwitch, FlowEntry>> entries) {
+ if (enableOnrc2014MeasurementsFlows)
+ return;
+
//
// Process all entries
//
@@ -482,8 +523,12 @@
// - Flow Paths to the database
//
pushModifiedFlowEntriesToSwitches(modifiedFlowEntries);
- pushModifiedFlowPathsToDatabase(modifiedFlowPaths);
- cleanupDeletedFlowEntriesFromDatagrid(modifiedFlowEntries);
+ if (enableOnrc2014MeasurementsFlows) {
+ writeModifiedFlowPathsToDatabase(modifiedFlowPaths);
+ } else {
+ pushModifiedFlowPathsToDatabase(modifiedFlowPaths);
+ cleanupDeletedFlowEntriesFromDatagrid(modifiedFlowEntries);
+ }
}
/**
@@ -714,6 +759,11 @@
}
} while (retry);
+ if (enableOnrc2014MeasurementsFlows) {
+ // Send the notification
+ datagridService.notificationSendFlowIdRemoved(flowPath.flowId());
+ }
+
continue;
}
@@ -730,10 +780,12 @@
allValid = false;
break;
}
- if (flowEntry.flowEntrySwitchState() !=
- FlowEntrySwitchState.FE_SWITCH_UPDATED) {
- allValid = false;
- break;
+ if (! enableOnrc2014MeasurementsFlows) {
+ if (flowEntry.flowEntrySwitchState() !=
+ FlowEntrySwitchState.FE_SWITCH_UPDATED) {
+ allValid = false;
+ break;
+ }
}
}
if (! allValid)
@@ -759,6 +811,11 @@
log.error("Exception writing Flow Path to Network MAP: ", e);
}
} while (retry);
+
+ if (enableOnrc2014MeasurementsFlows) {
+ // Send the notification
+ datagridService.notificationSendFlowIdAdded(flowPath.flowId());
+ }
}
}
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
index 78562e1..62edf70 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
@@ -2,6 +2,7 @@
import net.onrc.onos.ofcontroller.topology.TopologyElement;
import net.onrc.onos.ofcontroller.util.FlowEntry;
+import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
/**
@@ -51,6 +52,27 @@
void notificationRecvFlowEntryUpdated(FlowEntry flowEntry);
/**
+ * Receive a notification that a FlowId is added.
+ *
+ * @param flowId the FlowId that is added.
+ */
+ void notificationRecvFlowIdAdded(FlowId flowId);
+
+ /**
+ * Receive a notification that a FlowId is removed.
+ *
+ * @param flowId the FlowId that is removed.
+ */
+ void notificationRecvFlowIdRemoved(FlowId flowId);
+
+ /**
+ * Receive a notification that a FlowId is updated.
+ *
+ * @param flowId the FlowId that is updated.
+ */
+ void notificationRecvFlowIdUpdated(FlowId flowId);
+
+ /**
* Receive a notification that a Topology Element is added.
*
* @param topologyElement the Topology Element that is added.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PerformanceMonitor.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PerformanceMonitor.java
new file mode 100644
index 0000000..13319e7
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PerformanceMonitor.java
@@ -0,0 +1,162 @@
+package net.onrc.onos.ofcontroller.flowmanager;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for collecting performance measurements
+ */
+ public class PerformanceMonitor {
+     /** Number of nanoseconds in one millisecond (used for reporting). */
+     private static final double NANOS_PER_MILLI = 1e6;
+
+     // All measurements, keyed by their tag
+     private final static Map<String, Measurement> map = new ConcurrentHashMap<String, Measurement>();
+     private final static Logger log = LoggerFactory.getLogger(PerformanceMonitor.class);
+     // Accumulated time (in nanoseconds) spent inside start() and stop().
+     // NOTE: This is a plain long that is updated without synchronization.
+     private static long overhead;
+
+     /**
+      * Start a performance measurement, identified by a tag
+      *
+      * Note: Only a single measurement can use the same tag at a time.
+      * Starting a tag that already exists overwrites the previous
+      * measurement and logs an error.
+      *
+      * @param tag for performance measurement
+      */
+     public static void start(String tag) {
+         long start = System.nanoTime();
+         Measurement m = new Measurement();
+         if (map.put(tag, m) != null) {
+             // if there was a previous entry, we have just overwritten it
+             log.error("Tag {} already exists", tag);
+         }
+         m.start();
+         overhead += System.nanoTime() - start;
+     }
+
+     /**
+      * Stop a performance measurement.
+      *
+      * You must have already started a measurement with tag; otherwise
+      * an error is logged and nothing else happens.
+      *
+      * @param tag for performance measurement
+      */
+     public static void stop(String tag) {
+         long time = System.nanoTime();
+         Measurement m = map.get(tag);
+         if (m == null) {
+             log.error("Tag {} does not exist", tag);
+         } else {
+             // Use the instance fetched above: a second map lookup could
+             // return null if clear() runs in between.
+             m.stop(time);
+         }
+         overhead += System.nanoTime() - time;
+     }
+
+     /**
+      * Find a measurement, identified by tag, and return the result
+      *
+      * @param tag for performance measurement
+      * @return the time in nanoseconds, or -1 if the tag is unknown or
+      * the measurement has not been stopped.
+      */
+     public static long result(String tag) {
+         Measurement m = map.get(tag);
+         return (m != null) ? m.elapsed() : -1;
+     }
+
+     /**
+      * Clear all performance measurements.
+      */
+     public static void clear() {
+         map.clear();
+         overhead = 0;
+     }
+
+     /**
+      * Write all performance measurements to the log
+      */
+     public static void report() {
+         double overheadMilli = overhead / NANOS_PER_MILLI;
+         log.error("Performance Results: {} with measurement overhead: {} ms", map, overheadMilli);
+     }
+
+     /**
+      * A single performance measurement
+      */
+     static class Measurement {
+         long start;
+         long stop;
+
+         /**
+          * Start the measurement
+          */
+         public void start() {
+             start = System.nanoTime();
+         }
+
+         /**
+          * Stop the measurement
+          */
+         public void stop() {
+             stop = System.nanoTime();
+         }
+
+         /**
+          * Stop the measurement at a specific time
+          * @param time to stop
+          */
+         public void stop(long time) {
+             stop = time;
+         }
+
+         /**
+          * Compute the elapsed time of the measurement in nanoseconds
+          *
+          * @return the measurement time in nanoseconds, or -1 if the measurement is still running.
+          */
+         public long elapsed() {
+             // A stop time of 0 means stop() has not been called yet
+             if (stop == 0) {
+                 return -1;
+             }
+             return stop - start;
+         }
+
+         /**
+          * Returns the number of milliseconds for the measurement as a String.
+          *
+          * A measurement that is still running is rendered from the -1
+          * value returned by elapsed().
+          */
+         @Override
+         public String toString() {
+             double milli = elapsed() / NANOS_PER_MILLI;
+             return Double.toString(milli) + "ms";
+         }
+     }
+
+     public static void main(String args[]) {
+         // test the measurement overhead
+         String tag;
+         for (int i = 0; i < 100; i++) {
+             tag = "foo foo foo";
+             start(tag); stop(tag);
+             tag = "bar";
+             start(tag); stop(tag);
+             tag = "baz";
+             start(tag); stop(tag);
+             report();
+             clear();
+         }
+         for (int i = 0; i < 100; i++) {
+             tag = "a";
+             start(tag); stop(tag);
+             tag = "b";
+             start(tag); stop(tag);
+             tag = "c";
+             start(tag); stop(tag);
+             report();
+             clear();
+         }
+     }
+ }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java b/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
index fc75591..92da9ba 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
@@ -387,6 +387,8 @@
// Fetch the relevant info from the Switch and Port vertices
// from the Titan Graph.
//
+ nodesMap = new TreeMap<Long,Node>();
+
Iterable<ISwitchObject> activeSwitches = dbHandler.getActiveSwitches();
for (ISwitchObject switchObj : activeSwitches) {
Vertex nodeVertex = switchObj.asVertex();
@@ -451,4 +453,74 @@
}
dbHandler.commit();
}
+
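+ // TODO: Merging this check into the loops in readFromDatabase() could reduce execution time.
+ /**
+ * Check whether the two given topologies are identical.
+ * @param topo1 the first topology (nodes keyed by DPID).
+ * @param topo2 the second topology (nodes keyed by DPID).
+ * @return true if identical
+ */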
+ private boolean compareTopology(Map<Long,Node> topo1, Map<Long,Node> topo2) {
+     // Different node counts can never match
+     if (topo1.size() != topo2.size()) {
+         return false;
+     }
+
+     for (Map.Entry<Long,Node> entry : topo1.entrySet()) {
+         Long nodeKey = entry.getKey();
+         if (! topo2.containsKey(nodeKey)) {
+             return false;
+         }
+
+         Node left = entry.getValue();
+         Node right = topo2.get(nodeKey);
+
+         // Ports: same count, and every port number of the first node
+         // must be present on the second one.
+         if (left.ports().size() != right.ports().size()) {
+             return false;
+         }
+         for (Integer portNumber : left.ports().keySet()) {
+             if (! right.ports().containsKey(portNumber)) {
+                 return false;
+             }
+         }
+
+         // Links: same count, and each link must exist under the same
+         // key with the same pair of port numbers.
+         if (left.links.size() != right.links.size()) {
+             return false;
+         }
+         for (Map.Entry<Integer, Node.Link> linkEntry : left.links.entrySet()) {
+             Integer linkKey = linkEntry.getKey();
+             if (! right.links.containsKey(linkKey)) {
+                 return false;
+             }
+             Node.Link leftLink = linkEntry.getValue();
+             Node.Link rightLink = right.links.get(linkKey);
+
+             // Supposition: Link's "me" and "neighbor" is properly set,
+             // so only the port numbers are compared.
+             boolean samePorts =
+                 (leftLink.myPort == rightLink.myPort) &&
+                 (leftLink.neighborPort == rightLink.neighborPort);
+             if (! samePorts) {
+                 return false;
+             }
+         }
+     }
+     return true;
+ }
+
+ // Only for debug use
+ @Override
+ public String toString() {
+     long nodeCount = nodesMap.size();
+     long linkCount = 0;
+     for (Node node : nodesMap.values()) {
+         for (Node.Link link : node.links.values()) {
+             // Count a link only from the endpoint with the larger node ID
+             if (node.nodeId > link.neighbor.nodeId) {
+                 linkCount++;
+             }
+         }
+     }
+     return "Topology has " + nodeCount + " Nodes and " + linkCount + " Links.";
+ }
}