Merge branch 'RAMCloud-master' into RAMCloud-new_dynamics
Conflicts:
src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
src/main/java/net/onrc/onos/graph/LocalTopologyEventListener.java
src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoLinkServiceImpl.java
src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
diff --git a/perf-scripts/flow-manager-perf.py b/perf-scripts/flow-manager-perf.py
new file mode 100644
index 0000000..9eb022e
--- /dev/null
+++ b/perf-scripts/flow-manager-perf.py
@@ -0,0 +1,216 @@
+#!/usr/bin/python
+'''
+ Script that tests Flow Manager performance
+ Author: Brian O'Connor <bocon@onlab.us>
+
+'''
+
+import csv
+import os
+import sys
+import re
+from time import sleep, strftime
+from subprocess import Popen, call, check_output, PIPE
+from datetime import datetime
+
+try:
+ import pexpect
+except:
+ # install pexpect if it cannot be found and re-import
+ print '* Installing Pexpect'
+ call( 'apt-get install -y python-pexpect', stdout=PIPE, shell=True )
+ import pexpect
+
+ONOS_HOME = '..'
+ONOS_LOG = '%s/onos-logs/onos.%s.log' % ( ONOS_HOME, check_output( 'hostname').strip() )
+ONOS_URL = 'http://127.0.0.1:8080/wm/onos/flows/get/%d/json'
+ONOS_LOG = '/tmp/onos-0.logs/onos.onos-vm.log'
+print "ONOS Log File:", ONOS_LOG
+
+PORT = 's1-eth2'
+N = 1
+
+# ----------------- Running the test and output -------------------------
+
+class Result(object):
+ def __init__(self, tsharkTime, flowmods, onosTime, overhead, details):
+ self.tsharkTime = tsharkTime
+ self.flowmods = flowmods
+ self.onosTime = onosTime
+ self.overhead = overhead
+ # sorted by start time
+ self.tags = sorted(details, key=lambda x: float(x[2]))
+ self.details = sorted(details, key=lambda x: float(x[2]), reverse=True)
+
+ def __repr__(self):
+ return '%f %f %f %d %s' % (self.tsharkTime, self.onosTime, self.overhead, self.flowmods, self.tags)
+
+
+def clearResults():
+ cmd = 'curl %s' % ONOS_URL % -200
+ call( cmd, shell=True )
+ pass
+
+def reportResults():
+ cmd = 'curl %s' % ONOS_URL % -100
+ call( cmd, shell=True )
+
+def test():
+ # Start tailing the onos log
+ tail = pexpect.spawn( "tail -0f %s" % ONOS_LOG )
+ tshark = pexpect.spawn( 'tshark -i lo -R "of.type == 12 || of.type == 14"' )
+ tshark.expect('Capturing on lo')
+ sleep(1) # wait for tshark to start
+
+ clearResults() # REST call to ONOS
+ # Take link down
+ call( 'ifconfig %s down' % PORT, shell=True )
+
+ # collect openflow packets using tshark
+ count = 0
+ timeout = 6000
+ start = -1
+ end = -1
+ while True:
+ i = tshark.expect( ['(\d+\.\d+)', pexpect.TIMEOUT], timeout=timeout )
+ if i == 1:
+ break
+ time = float(tshark.match.group(1))
+ if start == -1:
+ start = time
+ if time > end:
+ end = time
+ i = tshark.expect( ['Port Status', 'Flow Mod'] )
+ if i == 1:
+ count += 1
+ timeout = 3 #sec
+ elapsed = (end - start) * 1000
+
+ sleep(2)
+ # read the performance results from ONOS
+ reportResults() # REST call
+
+ # Wait for performance results in the log
+ tail.expect('Performance Results: \(avg/start/stop/count\)', timeout=10)
+ i = tail.expect('TotalTime:([\d\.]+)/Overhead:([\d\.]+)')
+ totalTime = float(tail.match.group(1))
+ overhead = float(tail.match.group(2))
+ tags = re.findall( r'([\w\.]+)=([\d\.]+)/([\d\.]+)/([\d\.]+)/(\d+)', tail.before )
+ result = Result(elapsed, count, totalTime, overhead, tags)
+ # Output results
+ print result
+ plot(result)
+
+ # Bring port back up
+ call( 'ifconfig %s up' % PORT, shell=True )
+
+ tail.terminate()
+ tshark.terminate()
+ return []
+
+def outputResults(filename, results):
+ with open(filename, 'a') as csvfile:
+ writer = csv.writer(csvfile)
+ writer.writerow(results)
+
+def runPerf(n):
+ filename = 'results-flowmanager-%s.csv' % strftime( '%Y%m%d-%H%M%S' )
+ print 'Starting experiments:'
+ start = datetime.now()
+ for i in range(n):
+ results = test()
+ #outputResults(filename, results)
+ sys.stdout.write('$')
+ sys.stdout.flush()
+ sleep(5) # sleep for 5 seconds between tests
+ sys.stdout.write('.')
+ sys.stdout.flush()
+ totalTime = datetime.now() - start
+ print '\nExperiments complete in %s (h:m:s.s)' % totalTime
+
+def plot(result):
+ import matplotlib.pyplot as plt
+ import pylab
+ import numpy as np
+ from matplotlib.ticker import MaxNLocator
+
+ tags = [ x[0] for x in result.details ]
+ numTags = len(tags)
+ scores = [ float(x[1]) for x in result.details ]
+ offset = [ float(x[2]) for x in result.details ]
+ rankings = [ float(x[3]) - float(x[2]) for x in result.details ]
+ counts = [ x[4] for x in result.details ]
+
+ fig, ax1 = plt.subplots(figsize=(15, 9))
+ plt.subplots_adjust(left=0.3, right=0.8)
+ pos = np.arange(numTags)+0.5 # Center bars on the Y-axis ticks
+ rects = ax1.barh(pos, rankings, left=offset, align='center', height=0.5, color='m')
+
+ ax1.axis([0, result.onosTime, 0, numTags])
+ pylab.yticks(pos, tags)
+ ax1.set_title('TITLE HERE')
+ #plt.text(result.onosTime/2, -0.5,
+ # 'Iteration: ' + str(1), horizontalalignment='center', size='small')
+
+ # Set the right-hand Y-axis ticks and labels and set X-axis tick marks at the
+ # deciles
+ ax2 = ax1.twinx()
+ print MaxNLocator(7)
+ ax2.xaxis.set_major_locator(MaxNLocator(7)) # max number of xaxis ticks
+ #ax2.plot([100, 100], [0, 5], 'white', alpha=0.1)
+ ax2.xaxis.grid(True, linestyle='--', which='major', color='grey', alpha=0.25)
+ #Plot a solid vertical gridline to highlight the median position
+ #plt.plot([50, 50], [0, 5], 'grey', alpha=0.25)
+
+ # Build up the score labels for the right Y-axis by first appending a carriage
+ # return to each string and then tacking on the appropriate meta information
+ # (i.e., 'laps' vs 'seconds'). We want the labels centered on the ticks, so if
+ # there is no meta info (like for pushups) then don't add the carriage return to
+ # the string
+
+ '''
+ scoreLabels = [withnew(i, scr) for i, scr in enumerate(scores)]
+ scoreLabels = [i+j for i, j in zip(scoreLabels, testMeta)]
+ '''
+ scoreLabels = ['%.3f ms\n%s'%(i,j) for i,j in zip(scores,counts)]
+ # set the tick locations
+ ax2.set_yticks(pos)
+ # set the tick labels
+ ax2.set_yticklabels(scoreLabels)
+ # make sure that the limits are set equally on both yaxis so the ticks line up
+ ax2.set_ylim(ax1.get_ylim())
+
+ ax1.set_xlabel('Time (ms)')
+ ax2.set_ylabel('Average iteration / Count')
+
+ # Lastly, write in the ranking inside each bar to aid in interpretation
+ for rect in rects:
+ # Rectangle widths are already integer-valued but are floating
+ # type, so it helps to remove the trailing decimal point and 0 by
+ # converting width to int type
+ width = int(rect.get_width())
+ offset = int(rect.get_x())
+ percent = width / result.onosTime
+ onePercent = 0.01 * result.onosTime
+
+ rankStr = str(width) + 'ms'
+ if (percent < 0.09): # The bars aren't wide enough to print the ranking inside
+ xloc = offset + width + onePercent # Shift the text to the right side of the right edge
+ clr = 'black' # Black against white background
+ align = 'left'
+ else:
+ xloc = offset + 0.98*width # Shift the text to the left side of the right edge
+ clr = 'white' # White on magenta
+ align = 'right'
+
+ # Center the text vertically in the bar
+ yloc = rect.get_y()+rect.get_height()/2.0
+ ax1.text(xloc, yloc, rankStr, horizontalalignment=align,
+ verticalalignment='center', color=clr)
+
+ plt.show()
+ plt.savefig('test.png')
+if __name__ == '__main__':
+ n = N
+ runPerf(n)
+
diff --git a/pom.xml b/pom.xml
index ef5aee1..b62b934 100644
--- a/pom.xml
+++ b/pom.xml
@@ -296,7 +296,7 @@
<dependency>
<groupId>com.tinkerpop.blueprints</groupId>
<artifactId>blueprints-core</artifactId>
- <version>2.4.0</version>
+ <version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.tinkerpop.rexster</groupId>
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index 5be4191..13a0157 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -18,15 +18,18 @@
import net.floodlightcontroller.restserver.IRestApiService;
import net.onrc.onos.datagrid.web.DatagridWebRoutable;
import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
+import net.onrc.onos.ofcontroller.flowmanager.PerformanceMonitor.Measurement;
import net.onrc.onos.ofcontroller.proxyarp.ArpReplyNotification;
import net.onrc.onos.ofcontroller.proxyarp.IArpReplyEventHandler;
import net.onrc.onos.ofcontroller.proxyarp.IPacketOutEventHandler;
import net.onrc.onos.ofcontroller.proxyarp.PacketOutNotification;
import net.onrc.onos.ofcontroller.topology.TopologyElement;
+import net.onrc.onos.ofcontroller.util.Dpid;
import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowEntryId;
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
+import net.onrc.onos.ofcontroller.util.Pair;
import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
import org.slf4j.Logger;
@@ -44,6 +47,8 @@
import com.hazelcast.core.IMap;
import com.hazelcast.instance.GroupProperties;
+import net.onrc.onos.ofcontroller.flowmanager.PerformanceMonitor;
+
/**
* A datagrid service that uses Hazelcast as a datagrid.
* The relevant data is stored in the Hazelcast datagrid and shared as
@@ -75,6 +80,18 @@
private MapFlowEntryListener mapFlowEntryListener = null;
private String mapFlowEntryListenerId = null;
+ // State related to the Flow ID map
+ protected static final String mapFlowIdName = "mapFlowId";
+ private IMap<Long, byte[]> mapFlowId = null;
+ private MapFlowIdListener mapFlowIdListener = null;
+ private String mapFlowIdListenerId = null;
+
+ // State related to the Flow Entry ID map
+ protected static final String mapFlowEntryIdName = "mapFlowEntryId";
+ private IMap<Long, byte[]> mapFlowEntryId = null;
+ private MapFlowEntryIdListener mapFlowEntryIdListener = null;
+ private String mapFlowEntryIdListenerId = null;
+
// State related to the Network Topology map
protected static final String mapTopologyName = "mapTopology";
private IMap<String, byte[]> mapTopology = null;
@@ -246,6 +263,168 @@
}
/**
+ * Class for receiving notifications for FlowId state.
+ *
+ * The datagrid map is:
+ * - Key : FlowId (Long)
+ * - Value : Serialized Switch Dpid (byte[])
+ */
+ class MapFlowIdListener implements EntryListener<Long, byte[]> {
+ /**
+ * Receive a notification that an entry is added.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryAdded(EntryEvent<Long, byte[]> event) {
+ Long keyLong = event.getKey();
+ FlowId flowId = new FlowId(keyLong);
+
+ byte[] valueBytes = event.getValue();
+
+ //
+ // Decode the value and deliver the notification
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ Dpid dpid = kryo.readObject(input, Dpid.class);
+ kryoFactory.deleteKryo(kryo);
+ flowEventHandlerService.notificationRecvFlowIdAdded(flowId, dpid);
+ }
+
+ /**
+ * Receive a notification that an entry is removed.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryRemoved(EntryEvent<Long, byte[]> event) {
+ Long keyLong = event.getKey();
+ FlowId flowId = new FlowId(keyLong);
+
+ byte[] valueBytes = event.getValue();
+
+ //
+ // Decode the value and deliver the notification
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ Dpid dpid = kryo.readObject(input, Dpid.class);
+ kryoFactory.deleteKryo(kryo);
+ flowEventHandlerService.notificationRecvFlowIdRemoved(flowId, dpid);
+ }
+
+ /**
+ * Receive a notification that an entry is updated.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryUpdated(EntryEvent<Long, byte[]> event) {
+ Long keyLong = event.getKey();
+ FlowId flowId = new FlowId(keyLong);
+
+ byte[] valueBytes = event.getValue();
+
+ //
+ // Decode the value and deliver the notification
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ Dpid dpid = kryo.readObject(input, Dpid.class);
+ kryoFactory.deleteKryo(kryo);
+ flowEventHandlerService.notificationRecvFlowIdUpdated(flowId, dpid);
+ }
+
+ /**
+ * Receive a notification that an entry is evicted.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryEvicted(EntryEvent<Long, byte[]> event) {
+ // NOTE: We don't use eviction for this map
+ }
+ }
+
+ /**
+ * Class for receiving notifications for FlowEntryId state.
+ *
+ * The datagrid map is:
+ * - Key : FlowEntryId (Long)
+ * - Value : Serialized Switch Dpid (byte[])
+ */
+ class MapFlowEntryIdListener implements EntryListener<Long, byte[]> {
+ /**
+ * Receive a notification that an entry is added.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryAdded(EntryEvent<Long, byte[]> event) {
+ Long keyLong = event.getKey();
+ FlowEntryId flowEntryId = new FlowEntryId(keyLong);
+
+ byte[] valueBytes = event.getValue();
+
+ //
+ // Decode the value and deliver the notification
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ Dpid dpid = kryo.readObject(input, Dpid.class);
+ kryoFactory.deleteKryo(kryo);
+ flowEventHandlerService.notificationRecvFlowEntryIdAdded(flowEntryId, dpid);
+ }
+
+ /**
+ * Receive a notification that an entry is removed.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryRemoved(EntryEvent<Long, byte[]> event) {
+ Long keyLong = event.getKey();
+ FlowEntryId flowEntryId = new FlowEntryId(keyLong);
+
+ byte[] valueBytes = event.getValue();
+
+ //
+ // Decode the value and deliver the notification
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ Dpid dpid = kryo.readObject(input, Dpid.class);
+ kryoFactory.deleteKryo(kryo);
+ flowEventHandlerService.notificationRecvFlowEntryIdRemoved(flowEntryId, dpid);
+ }
+
+ /**
+ * Receive a notification that an entry is updated.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryUpdated(EntryEvent<Long, byte[]> event) {
+ Long keyLong = event.getKey();
+ FlowEntryId flowEntryId = new FlowEntryId(keyLong);
+
+ byte[] valueBytes = event.getValue();
+
+ //
+ // Decode the value and deliver the notification
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ Dpid dpid = kryo.readObject(input, Dpid.class);
+ kryoFactory.deleteKryo(kryo);
+ flowEventHandlerService.notificationRecvFlowEntryIdUpdated(flowEntryId, dpid);
+ }
+
+ /**
+ * Receive a notification that an entry is evicted.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryEvicted(EntryEvent<Long, byte[]> event) {
+ // NOTE: We don't use eviction for this map
+ }
+ }
+
+ /**
* Class for receiving notifications for Network Topology state.
*
* The datagrid map is:
@@ -280,6 +459,9 @@
*/
@Override
public void entryRemoved(EntryEvent<String, byte[]> event) {
+// String tag = "TopologyEntryRemoved.NotificationReceived." + event.getKey();
+ String tag = "TopologyEntryRemoved.NotificationReceived";
+ PerformanceMonitor.Measurement m = PerformanceMonitor.start(tag);
byte[] valueBytes = event.getValue();
//
@@ -291,6 +473,9 @@
kryo.readObject(input, TopologyElement.class);
kryoFactory.deleteKryo(kryo);
flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
+// PerformanceMonitor.stop(tag);
+ m.stop();
+// PerformanceMonitor.report(tag);
}
/**
@@ -549,6 +734,16 @@
mapFlowEntry = hazelcastInstance.getMap(mapFlowEntryName);
mapFlowEntryListenerId = mapFlowEntry.addEntryListener(mapFlowEntryListener, true);
+ // Initialize the FlowId-related map state
+ mapFlowIdListener = new MapFlowIdListener();
+ mapFlowId = hazelcastInstance.getMap(mapFlowIdName);
+ mapFlowIdListenerId = mapFlowId.addEntryListener(mapFlowIdListener, true);
+
+ // Initialize the FlowEntryId-related map state
+ mapFlowEntryIdListener = new MapFlowEntryIdListener();
+ mapFlowEntryId = hazelcastInstance.getMap(mapFlowEntryIdName);
+ mapFlowEntryIdListenerId = mapFlowEntryId.addEntryListener(mapFlowEntryIdListener, true);
+
// Initialize the Topology-related map state
mapTopologyListener = new MapTopologyListener();
mapTopology = hazelcastInstance.getMap(mapTopologyName);
@@ -576,6 +771,16 @@
mapFlowEntry = null;
mapFlowEntryListener = null;
+ // Clear the FlowId-related map state
+ mapFlowId.removeEntryListener(mapFlowIdListenerId);
+ mapFlowId = null;
+ mapFlowIdListener = null;
+
+ // Clear the FlowEntryId-related map state
+ mapFlowEntryId.removeEntryListener(mapFlowEntryIdListenerId);
+ mapFlowEntryId = null;
+ mapFlowEntryIdListener = null;
+
// Clear the Topology-related map state
mapTopology.removeEntryListener(mapTopologyListenerId);
mapTopology = null;
@@ -845,6 +1050,216 @@
}
/**
+ * Get all Flow IDs that are currently in the datagrid.
+ *
+ * @return all Flow IDs that are currently in the datagrid.
+ */
+ @Override
+ public Collection<Pair<FlowId, Dpid>> getAllFlowIds() {
+ Collection<Pair<FlowId, Dpid>> allFlowIds =
+ new LinkedList<Pair<FlowId, Dpid>>();
+
+ //
+ // Get all current entries
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ for (Map.Entry<Long, byte[]> entry : mapFlowId.entrySet()) {
+ Long key = entry.getKey();
+ byte[] valueBytes = entry.getValue();
+
+ FlowId flowId = new FlowId(key);
+
+ //
+ // Decode the value
+ //
+ Input input = new Input(valueBytes);
+ Dpid dpid = kryo.readObject(input, Dpid.class);
+
+ Pair<FlowId, Dpid> pair = new Pair(flowId, dpid);
+ allFlowIds.add(pair);
+ }
+ kryoFactory.deleteKryo(kryo);
+
+ return allFlowIds;
+ }
+
+ /**
+ * Get all Flow Entry IDs that are currently in the datagrid.
+ *
+ * @return all Flow Entry IDs that ae currently in the datagrid.
+ */
+ @Override
+ public Collection<Pair<FlowEntryId, Dpid>> getAllFlowEntryIds() {
+ Collection<Pair<FlowEntryId, Dpid>> allFlowEntryIds =
+ new LinkedList<Pair<FlowEntryId, Dpid>>();
+
+ //
+ // Get all current entries
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ for (Map.Entry<Long, byte[]> entry : mapFlowEntryId.entrySet()) {
+ Long key = entry.getKey();
+ byte[] valueBytes = entry.getValue();
+
+ FlowEntryId flowEntryId = new FlowEntryId(key);
+
+ //
+ // Decode the value
+ //
+ Input input = new Input(valueBytes);
+ Dpid dpid = kryo.readObject(input, Dpid.class);
+
+ Pair<FlowEntryId, Dpid> pair = new Pair(flowEntryId, dpid);
+ allFlowEntryIds.add(pair);
+ }
+ kryoFactory.deleteKryo(kryo);
+
+ return allFlowEntryIds;
+ }
+
+ /**
+ * Send a notification that a FlowId is added.
+ *
+ * @param flowId the FlowId that is added.
+ * @param dpid the Source Switch Dpid.
+ */
+ @Override
+ public void notificationSendFlowIdAdded(FlowId flowId, Dpid dpid) {
+ //
+ // Encode the value
+ //
+ byte[] buffer = new byte[MAX_BUFFER_SIZE];
+ Kryo kryo = kryoFactory.newKryo();
+ Output output = new Output(buffer, -1);
+ kryo.writeObject(output, dpid);
+ byte[] valueBytes = output.toBytes();
+ kryoFactory.deleteKryo(kryo);
+
+ //
+ // Put the entry:
+ // - Key : FlowId (Long)
+ // - Value : Serialized Switch Dpid (byte[])
+ //
+ mapFlowId.putAsync(flowId.value(), valueBytes);
+ }
+
+ /**
+ * Send a notification that a FlowId is removed.
+ *
+ * @param flowId the FlowId that is removed.
+ */
+ @Override
+ public void notificationSendFlowIdRemoved(FlowId flowId) {
+ //
+ // Remove the entry:
+ // - Key : FlowId (Long)
+ // - Value : Serialized Switch Dpid (byte[])
+ //
+ mapFlowId.removeAsync(flowId.value());
+ }
+
+ /**
+ * Send a notification that a FlowId is updated.
+ *
+ * @param flowId the FlowId that is updated.
+ * @param dpid the Source Switch Dpid.
+ */
+ @Override
+ public void notificationSendFlowIdUpdated(FlowId flowId, Dpid dpid) {
+ // NOTE: Adding an entry with an existing key automatically updates it
+ notificationSendFlowIdAdded(flowId, dpid);
+ }
+
+ /**
+ * Send a notification that all Flow IDs are removed.
+ */
+ @Override
+ public void notificationSendAllFlowIdsRemoved() {
+ //
+ // Remove all entries
+ // NOTE: We remove the entries one-by-one so the per-entry
+ // notifications will be delivered.
+ //
+ // mapFlowId.clear();
+ Set<Long> keySet = mapFlowId.keySet();
+ for (Long key : keySet) {
+ mapFlowId.removeAsync(key);
+ }
+ }
+
+ /**
+ * Send a notification that a FlowEntryId is added.
+ *
+ * @param flowEntryId the FlowEntryId that is added.
+ * @param dpid the Switch Dpid.
+ */
+ @Override
+ public void notificationSendFlowEntryIdAdded(FlowEntryId flowEntryId,
+ Dpid dpid) {
+ //
+ // Encode the value
+ //
+ byte[] buffer = new byte[MAX_BUFFER_SIZE];
+ Kryo kryo = kryoFactory.newKryo();
+ Output output = new Output(buffer, -1);
+ kryo.writeObject(output, dpid);
+ byte[] valueBytes = output.toBytes();
+ kryoFactory.deleteKryo(kryo);
+
+ //
+ // Put the entry:
+ // - Key : FlowEntryId (Long)
+ // - Value : Serialized Switch Dpid (byte[])
+ //
+ mapFlowEntryId.putAsync(flowEntryId.value(), valueBytes);
+ }
+
+ /**
+ * Send a notification that a FlowEntryId is removed.
+ *
+ * @param flowEntryId the FlowEntryId that is removed.
+ */
+ @Override
+ public void notificationSendFlowEntryIdRemoved(FlowEntryId flowEntryId) {
+ //
+ // Remove the entry:
+ // - Key : FlowEntryId (Long)
+ // - Value : Serialized Switch Dpid (byte[])
+ //
+ mapFlowEntryId.removeAsync(flowEntryId.value());
+ }
+
+ /**
+ * Send a notification that a FlowEntryId is updated.
+ *
+ * @param flowEntryId the FlowEntryId that is updated.
+ * @param dpid the Switch Dpid.
+ */
+ @Override
+ public void notificationSendFlowEntryIdUpdated(FlowEntryId flowEntryId,
+ Dpid dpid) {
+ // NOTE: Adding an entry with an existing key automatically updates it
+ notificationSendFlowEntryIdAdded(flowEntryId, dpid);
+ }
+
+ /**
+ * Send a notification that all Flow Entry IDs are removed.
+ */
+ @Override
+ public void notificationSendAllFlowEntryIdsRemoved() {
+ //
+ // Remove all entries
+ // NOTE: We remove the entries one-by-one so the per-entry
+ // notifications will be delivered.
+ //
+ // mapFlowEntryId.clear();
+ Set<Long> keySet = mapFlowEntryId.keySet();
+ for (Long key : keySet) {
+ mapFlowEntryId.removeAsync(key);
+ }
+ }
+
+ /**
* Get all Topology Elements that are currently in the datagrid.
*
* @return all Topology Elements that are currently in the datagrid.
diff --git a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
index 1478129..80d1781 100644
--- a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
+++ b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
@@ -9,10 +9,12 @@
import net.onrc.onos.ofcontroller.proxyarp.IPacketOutEventHandler;
import net.onrc.onos.ofcontroller.proxyarp.PacketOutNotification;
import net.onrc.onos.ofcontroller.topology.TopologyElement;
+import net.onrc.onos.ofcontroller.util.Dpid;
import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowEntryId;
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
+import net.onrc.onos.ofcontroller.util.Pair;
/**
* Interface for providing Datagrid Service to other modules.
@@ -150,6 +152,77 @@
void notificationSendAllFlowEntriesRemoved();
/**
+ * Get all Flow IDs that are currently in the datagrid.
+ *
+ * @return all Flow IDs that ae currently in the datagrid.
+ */
+ Collection<Pair<FlowId, Dpid>> getAllFlowIds();
+
+ /**
+ * Send a notification that a FlowId is added.
+ *
+ * @param flowId the FlowId that is added.
+ * @param dpid the Source Switch Dpid.
+ */
+ void notificationSendFlowIdAdded(FlowId flowId, Dpid dpid);
+
+ /**
+ * Send a notification that a FlowId is removed.
+ *
+ * @param flowId the FlowId that is removed.
+ */
+ void notificationSendFlowIdRemoved(FlowId flowId);
+
+ /**
+ * Send a notification that a FlowId is updated.
+ *
+ * @param flowId the FlowId that is updated.
+ * @param dpid the Source Switch Dpid.
+ */
+ void notificationSendFlowIdUpdated(FlowId flowId, Dpid dpid);
+
+ /**
+ * Send a notification that all Flow IDs are removed.
+ */
+ void notificationSendAllFlowIdsRemoved();
+
+ /**
+ * Get all Flow Entry IDs that are currently in the datagrid.
+ *
+ * @return all Flow Entry IDs that ae currently in the datagrid.
+ */
+ Collection<Pair<FlowEntryId, Dpid>> getAllFlowEntryIds();
+
+ /**
+ * Send a notification that a FlowEntryId is added.
+ *
+ * @param flowEntryId the FlowEntryId that is added.
+ * @param dpid the Switch Dpid.
+ */
+ void notificationSendFlowEntryIdAdded(FlowEntryId flowEntryId, Dpid dpid);
+
+ /**
+ * Send a notification that a FlowEntryId is removed.
+ *
+ * @param flowEntryId the FlowEntryId that is removed.
+ */
+ void notificationSendFlowEntryIdRemoved(FlowEntryId flowEntryId);
+
+ /**
+ * Send a notification that a FlowEntryId is updated.
+ *
+ * @param flowEntryId the FlowEntryId that is updated.
+ * @param dpid the Switch Dpid.
+ */
+ void notificationSendFlowEntryIdUpdated(FlowEntryId flowEntryId,
+ Dpid dpid);
+
+ /**
+ * Send a notification that all Flow Entry IDs are removed.
+ */
+ void notificationSendAllFlowEntryIdsRemoved();
+
+ /**
* Get all Topology Elements that are currently in the datagrid.
*
* @return all Topology Elements that are currently in the datagrid.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java
index fedc90a..b43c58a 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java
@@ -492,6 +492,8 @@
log.error("LinkStorageImpl:addLinkImpl failed link exists {} {} src {} dst {}",
new Object[]{dbop, lt, vportSrc, vportDst});
}
+ } else {
+ log.error("Ports not found : {}", lt);
}
return success;
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoLinkServiceImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoLinkServiceImpl.java
index 3a6db70..99a2eb1 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoLinkServiceImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoLinkServiceImpl.java
@@ -35,8 +35,7 @@
@Override
public List<Link> getActiveLinks() {
- // TODO Auto-generated method stub
- dbop = GraphDBManager.getDBOperation("ramcloud", "/tmp/ramcloud.conf");
+ dbop = GraphDBManager.getDBOperation("ramcloud", "/tmp/ramcloudconf");
//dbop = GraphDBManager.getDBOperation("", "");
//dbop.commit(); //Commit to ensure we see latest data
Iterable<ISwitchObject> switches = dbop.getActiveSwitches();
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoSwitchServiceImpl.java b/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoSwitchServiceImpl.java
index d1bd491..df5ab70 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoSwitchServiceImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/internal/TopoSwitchServiceImpl.java
@@ -10,7 +10,7 @@
import org.slf4j.LoggerFactory;
public class TopoSwitchServiceImpl implements ITopoSwitchService {
-
+
private DBOperation op;
protected final static Logger log = LoggerFactory.getLogger(TopoSwitchServiceImpl.class);
diff --git a/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/NetworkGraphPublisher.java b/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/NetworkGraphPublisher.java
index 4d564e9..98b80e6 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/NetworkGraphPublisher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/NetworkGraphPublisher.java
@@ -45,6 +45,8 @@
import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
import net.onrc.onos.registry.controller.RegistryException;
+import net.onrc.onos.ofcontroller.flowmanager.PerformanceMonitor;
+
import org.openflow.protocol.OFPhysicalPort;
import org.openflow.util.HexString;
import org.slf4j.Logger;
@@ -355,12 +357,16 @@
@Override
public void switchPortRemoved(Long switchId, OFPhysicalPort port) {
// Remove all links that might be connected already
+ PerformanceMonitor.start("SwitchPortRemoved.DbAccess");
+
List<Link> links = linkStore.getLinks(switchId, port.getPortNumber());
// Remove all reverse links as well
List<Link> reverseLinks = linkStore.getReverseLinks(switchId, port.getPortNumber());
links.addAll(reverseLinks);
if (swStore.deletePort(HexString.toHexString(switchId), port.getPortNumber())) {
+ PerformanceMonitor.stop("SwitchPortRemoved.DbAccess");
+ PerformanceMonitor.start("SwitchPortRemoved.NotificationSend");
// TODO publish DELETE_PORT event here
TopologyElement topologyElement =
new TopologyElement(switchId, port.getPortNumber());
@@ -375,6 +381,9 @@
link.getDstPort());
datagridService.notificationSendTopologyElementRemoved(topologyElementLink);
}
+ PerformanceMonitor.stop("SwitchPortRemoved.NotificationSend");
+ PerformanceMonitor.report("SwitchPortRemoved.DbAccess");
+ PerformanceMonitor.report("TopologyEntryRemoved.NotificationReceived");
}
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
index e3dcad0..d8e50fc 100755
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
@@ -4,9 +4,11 @@
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
+import net.floodlightcontroller.core.IOFSwitch;
import net.floodlightcontroller.util.MACAddress;
import net.onrc.onos.graph.DBOperation;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IBaseObject;
@@ -565,11 +567,7 @@
}
}
- // TODO: Hacks with hard-coded state names!
- if (found)
- flowProp.setUserState("FE_USER_MODIFY");
- else
- flowProp.setUserState("FE_USER_ADD");
+ flowProp.setUserState(flowEntry.flowEntryUserState().toString());
flowProp.setSwitchState(flowEntry.flowEntrySwitchState().toString());
if (measureONOSFlowEntryTimeProp) {
numProperties += 2;
@@ -763,6 +761,77 @@
}
/**
+ * Get a previously added flow entry.
+ *
+ * @param dbHandler the Graph Database handler to use.
+ * @param flowEntryId the Flow Entry ID of the flow entry to get.
+ * @return the Flow Entry if found, otherwise null.
+ */
+ static FlowEntry getFlowEntry(DBOperation dbHandler,
+ FlowEntryId flowEntryId) {
+ IFlowEntry flowEntryObj = null;
+ try {
+ flowEntryObj = dbHandler.searchFlowEntry(flowEntryId);
+ } catch (Exception e) {
+ // TODO: handle exceptions
+ dbHandler.rollback();
+ log.error(":getFlowEntry FlowEntryId:{} failed", flowEntryId);
+ return null;
+ }
+ if (flowEntryObj == null) {
+ dbHandler.commit();
+ return null; // Flow not found
+ }
+
+ //
+ // Extract the Flow Entry state
+ //
+ FlowEntry flowEntry = extractFlowEntry(flowEntryObj);
+ dbHandler.commit();
+
+ return flowEntry;
+ }
+
+ /**
+ * Get the source switch DPID of a previously added flow.
+ *
+ * @param dbHandler the Graph Database handler to use.
+ * @param flowId the Flow ID of the flow to get.
+ * @return the source switch DPID if found, otherwise null.
+ */
+ static Dpid getFlowSourceDpid(DBOperation dbHandler, FlowId flowId) {
+ IFlowPath flowObj = null;
+ try {
+ flowObj = dbHandler.searchFlowPath(flowId);
+ } catch (Exception e) {
+ // TODO: handle exceptions
+ dbHandler.rollback();
+ log.error(":getFlowSourceDpid FlowId:{} failed", flowId);
+ return null;
+ }
+ if (flowObj == null) {
+ dbHandler.commit();
+ return null; // Flow not found
+ }
+
+ //
+ // Extract the Flow Source DPID
+ //
+ String srcSwitchStr = flowObj.getSrcSwitch();
+ if (srcSwitchStr == null) {
+ // TODO: A work-around, becauuse of some bogus database objects
+ dbHandler.commit();
+ return null;
+ }
+
+ Dpid dpid = new Dpid(srcSwitchStr);
+
+ dbHandler.commit();
+
+ return dpid;
+ }
+
+ /**
* Get all installed flows by all installers.
*
* @param dbHandler the Graph Database handler to use.
@@ -800,12 +869,88 @@
}
/**
+ * Get all installed flows whose Source Switch is controlled by this
+ * instance.
+ *
+ * @param dbHandler the Graph Database handler to use.
+ * @param mySwitches the collection of the switches controlled by this
+ * instance.
+ * @return the Flow Paths if found, otherwise null.
+ */
+ static ArrayList<FlowPath> getAllMyFlows(DBOperation dbHandler,
+ Map<Long, IOFSwitch> mySwitches) {
+ Iterable<IFlowPath> flowPathsObj = null;
+ ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
+
+ try {
+ flowPathsObj = dbHandler.getAllFlowPaths();
+ } catch (Exception e) {
+ // TODO: handle exceptions
+ dbHandler.rollback();
+ log.error(":getAllMyFlowPaths failed");
+ return flowPaths;
+ }
+ if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
+ dbHandler.commit();
+ return flowPaths; // No Flows found
+ }
+
+ for (IFlowPath flowObj : flowPathsObj) {
+ //
+ // Extract the Source Switch DPID and ignore if the switch
+ // is not controlled by this instance.
+ //
+ String srcSwitchStr = flowObj.getSrcSwitch();
+ if (srcSwitchStr == null) {
+ // TODO: A work-around, becauuse of some bogus database objects
+ continue;
+ }
+ Dpid dpid = new Dpid(srcSwitchStr);
+ if (mySwitches.get(dpid.value()) == null)
+ continue;
+
+ //
+ // Extract the Flow state
+ //
+ FlowPath flowPath = extractFlowPath(flowObj);
+ if (flowPath != null)
+ flowPaths.add(flowPath);
+ }
+
+ dbHandler.commit();
+
+ return flowPaths;
+ }
+
+ /**
+ * Get a subset of installed flows.
+ *
+ * @param dbHandler the Graph Database handler to use.
+ * @param flowIds the collection of Flow IDs to get.
+ * @return the Flow Paths if found, otherwise null.
+ */
+ static ArrayList<FlowPath> getFlows(DBOperation dbHandler,
+ Collection<FlowId> flowIds) {
+ ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
+
+ // TODO: This implementation should use threads
+ for (FlowId flowId : flowIds) {
+ FlowPath flowPath = getFlow(dbHandler, flowId);
+ if (flowPath != null)
+ flowPaths.add(flowPath);
+ }
+ // dbHandler.commit();
+
+ return flowPaths;
+ }
+
+ /**
* Extract Flow Path State from a Titan Database Object @ref IFlowPath.
*
* @param flowObj the object to extract the Flow Path State from.
* @return the extracted Flow Path State.
*/
- private static FlowPath extractFlowPath(IFlowPath flowObj) {
+ static FlowPath extractFlowPath(IFlowPath flowObj) {
//
// Extract the Flow state
//
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index 67a2ad1..1705569 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -15,11 +15,15 @@
import java.util.concurrent.LinkedBlockingQueue;
import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.IOFSwitchListener;
import net.onrc.onos.datagrid.IDatagridService;
+import net.onrc.onos.graph.DBOperation;
+import net.onrc.onos.graph.GraphDBManager;
import net.onrc.onos.ofcontroller.topology.Topology;
import net.onrc.onos.ofcontroller.topology.TopologyElement;
import net.onrc.onos.ofcontroller.topology.TopologyManager;
import net.onrc.onos.ofcontroller.util.DataPath;
+import net.onrc.onos.ofcontroller.util.Dpid;
import net.onrc.onos.ofcontroller.util.EventEntry;
import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowEntryAction;
@@ -31,6 +35,8 @@
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
import net.onrc.onos.ofcontroller.util.FlowPathUserState;
+import net.onrc.onos.ofcontroller.util.Pair;
+import net.onrc.onos.ofcontroller.util.Port;
import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
import com.esotericsoftware.kryo.Kryo;
@@ -46,17 +52,16 @@
* - Detect FlowPaths impacted by Topology change.
* - Recompute impacted FlowPath using cached Topology.
*/
-class FlowEventHandler extends Thread implements IFlowEventHandlerService {
+class FlowEventHandler extends Thread implements IFlowEventHandlerService,
+ IOFSwitchListener {
+
+ private boolean enableOnrc2014MeasurementsFlows = true;
+ private boolean enableOnrc2014MeasurementsTopology = true;
+
/** The logger. */
private final static Logger log = LoggerFactory.getLogger(FlowEventHandler.class);
- // Flag to refresh Topology object periodically
- private final static boolean refreshTopology = false;
- // Refresh delay(ms)
- private final static long refreshTopologyDelay = 5000;
- // Refresh interval(ms)
- private final static long refreshTopologyInterval = 2000;
- private Timer refreshTopologyTimer;
+ private DBOperation dbHandler;
private FlowManager flowManager; // The Flow Manager to use
private IDatagridService datagridService; // The Datagrid Service to use
@@ -74,6 +79,12 @@
new LinkedList<EventEntry<FlowPath>>();
private List<EventEntry<FlowEntry>> flowEntryEvents =
new LinkedList<EventEntry<FlowEntry>>();
+ private List<EventEntry<Pair<FlowId, Dpid>>> flowIdEvents =
+ new LinkedList<EventEntry<Pair<FlowId, Dpid>>>();
+ private List<EventEntry<Pair<FlowEntryId, Dpid>>> flowEntryIdEvents =
+ new LinkedList<EventEntry<Pair<FlowEntryId, Dpid>>>();
+ private List<EventEntry<Dpid>> switchDpidEvents =
+ new LinkedList<EventEntry<Dpid>>();
// All internally computed Flow Paths
private Map<Long, FlowPath> allFlowPaths = new HashMap<Long, FlowPath>();
@@ -119,6 +130,8 @@
* Startup processing.
*/
private void startup() {
+ this.dbHandler = GraphDBManager.getDBOperation("ramcloud", "/tmp/ramcloudconf");
+
//
// Obtain the initial Topology state
//
@@ -148,32 +161,33 @@
flowEntryEvents.add(eventEntry);
}
+ //
+ // Obtain the initial FlowId state
+ //
+ Collection<Pair<FlowId, Dpid>> flowIds =
+ datagridService.getAllFlowIds();
+ for (Pair<FlowId, Dpid> pair : flowIds) {
+ EventEntry<Pair<FlowId, Dpid>> eventEntry =
+ new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_ADD, pair);
+ flowIdEvents.add(eventEntry);
+ }
+
+ //
+ // Obtain the initial FlowEntryId state
+ //
+ Collection<Pair<FlowEntryId, Dpid>> flowEntryIds =
+ datagridService.getAllFlowEntryIds();
+ for (Pair<FlowEntryId, Dpid> pair : flowEntryIds) {
+ EventEntry<Pair<FlowEntryId, Dpid>> eventEntry =
+ new EventEntry<Pair<FlowEntryId, Dpid>>(EventEntry.Type.ENTRY_ADD, pair);
+ flowEntryIdEvents.add(eventEntry);
+ }
+
// Process the initial events (if any)
synchronized (allFlowPaths) {
processEvents();
}
- if (refreshTopology) {
- refreshTopologyTimer = new Timer();
- refreshTopologyTimer.schedule(new TimerTask() {
- @Override
- public void run() {
- PerfMon pm = PerfMon.getInstance();
- log.debug("[BEFORE] {}", topology);
- long begin, end;
- synchronized(topology) {
- begin = System.nanoTime();
- pm.read_whole_topology_start();
- topology.readFromDatabase(flowManager.dbHandlerInner);
- pm.read_whole_topology_end();
- end = System.nanoTime();
- }
- // FIXME level raised for measurement. Was debug
- log.error("[AFTER] {}", topology);
- log.error("refresh takes : {}[us]", (end - begin) / 1000.0);
- }
- }, refreshTopologyDelay, refreshTopologyInterval);
- }
}
/**
@@ -199,12 +213,15 @@
// - EventEntry<TopologyElement>
// - EventEntry<FlowPath>
// - EventEntry<FlowEntry>
+ // - EventEntry<Pair<FlowId, Dpid>>
+ // - EventEntry<Pair<FlowEntryId, Dpid>>
//
for (EventEntry<?> event : collection) {
// Topology event
if (event.eventData() instanceof TopologyElement) {
EventEntry<TopologyElement> topologyEventEntry =
(EventEntry<TopologyElement>)event;
+
topologyEvents.add(topologyEventEntry);
continue;
}
@@ -224,6 +241,34 @@
flowEntryEvents.add(flowEntryEventEntry);
continue;
}
+
+ // FlowId event
+ if (event.eventData() instanceof Pair) {
+ EventEntry<Pair<FlowId, Dpid>> flowIdEventEntry =
+ (EventEntry<Pair<FlowId, Dpid>>)event;
+ flowIdEvents.add(flowIdEventEntry);
+ continue;
+ }
+
+ // Switch Dpid event
+ if (event.eventData() instanceof Dpid) {
+ EventEntry<Dpid> switchDpidEventEntry =
+ (EventEntry<Dpid>)event;
+ switchDpidEvents.add(switchDpidEventEntry);
+ continue;
+ }
+
+ // FlowEntryId event
+ // TODO: Fix the code below if we need again to handle
+ // the FlowEntryId events
+ /*
+ if (event.eventData() instanceof Pair) {
+ EventEntry<Pair<FlowEntryId, Dpid>> flowEntryIdEventEntry =
+ (EventEntry<Pair<FlowEntryId, Dpid>>)event;
+ flowEntryIdEvents.add(flowEntryIdEventEntry);
+ continue;
+ }
+ */
}
collection.clear();
@@ -236,13 +281,137 @@
log.debug("Exception processing Network Events: ", exception);
}
}
-
+
/**
* Process the events (if any)
*/
private void processEvents() {
Collection<FlowEntry> modifiedFlowEntries;
+ if (enableOnrc2014MeasurementsFlows) {
+
+ PerformanceMonitor.start("EventHandler.ProcessAllEvents");
+
+ if (topologyEvents.isEmpty() && flowIdEvents.isEmpty() &&
+ switchDpidEvents.isEmpty()) {
+ return; // Nothing to do
+ }
+
+ Map<Long, IOFSwitch> mySwitches = flowManager.getMySwitches();
+
+ // Process the Switch Dpid events
+ PerformanceMonitor.start("EventHandler.SwitchDpidEvents");
+ processSwitchDpidEvents();
+ PerformanceMonitor.stop("EventHandler.SwitchDpidEvents");
+
+ // Process the Flow ID events
+ PerformanceMonitor.start("EventHandler.FlowIdEvents");
+ processFlowIdEvents(mySwitches);
+ PerformanceMonitor.stop("EventHandler.FlowIdEvents");
+
+ // Fetch the topology
+ PerformanceMonitor.start("EventHandler.ReadTopology");
+ processTopologyEvents();
+ PerformanceMonitor.stop("EventHandler.ReadTopology");
+
+ // Recompute all affected Flow Paths and keep only the modified
+ PerformanceMonitor.start("EventHandler.RecomputeFlows");
+ for (FlowPath flowPath : shouldRecomputeFlowPaths.values()) {
+ if (recomputeFlowPath(flowPath))
+ modifiedFlowPaths.put(flowPath.flowId().value(), flowPath);
+ }
+
+ // Assign the Flow Entry ID as needed
+ for (FlowPath flowPath : modifiedFlowPaths.values()) {
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ if (! flowEntry.isValidFlowEntryId()) {
+ long id = flowManager.getNextFlowEntryId();
+ flowEntry.setFlowEntryId(new FlowEntryId(id));
+ }
+ }
+ }
+ PerformanceMonitor.stop("EventHandler.RecomputeFlows");
+
+ //
+ // Push the modified state to the database
+ //
+ PerformanceMonitor.start("EventHandler.WriteFlowsToDb");
+ for (FlowPath flowPath : modifiedFlowPaths.values()) {
+ //
+ // Delete the Flow Path from the Network Map
+ //
+ if (flowPath.flowPathUserState() ==
+ FlowPathUserState.FP_USER_DELETE) {
+ log.debug("Deleting Flow Path From Database: {}", flowPath);
+ // TODO: For now the deleting of a Flow Path is blocking
+ ParallelFlowDatabaseOperation.deleteFlow(dbHandler,
+ flowPath.flowId());
+ //
+ // NOTE: For now the sending of the notifications
+ // is outside of this loop, so the performance measurements
+ // are more accurate.
+ //
+ /*
+ // Send the notifications for the deleted Flow Entries
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ datagridService.notificationSendFlowEntryRemoved(flowEntry.flowEntryId());
+ }
+ */
+
+ continue;
+ }
+
+ log.debug("Pushing Flow Path To Database: {}", flowPath);
+ //
+ // Write the Flow Path to the Network Map
+ //
+ ParallelFlowDatabaseOperation.addFlow(dbHandler, flowPath,
+ datagridService);
+ }
+ PerformanceMonitor.stop("EventHandler.WriteFlowsToDb");
+
+ //
+ // Send the notifications for the deleted Flow Entries
+ // NOTE: This code was pulled outside of the above loop,
+ // so the performance measurements are more accurate.
+ //
+ PerformanceMonitor.start("EventHandler.NotificationSend.FlowEntryRemoved");
+ for (FlowPath flowPath : modifiedFlowPaths.values()) {
+ if (flowPath.flowPathUserState() ==
+ FlowPathUserState.FP_USER_DELETE) {
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ datagridService.notificationSendFlowEntryRemoved(flowEntry.flowEntryId());
+ }
+ }
+ }
+ PerformanceMonitor.stop("EventHandler.NotificationSend.FlowEntryRemoved");
+
+ // Cleanup
+ topologyEvents.clear();
+ flowIdEvents.clear();
+ switchDpidEvents.clear();
+ //
+ // NOTE: Keep a cache with my Flow Paths
+ // allFlowPaths.clear();
+ shouldRecomputeFlowPaths.clear();
+ modifiedFlowPaths.clear();
+
+ PerformanceMonitor.stop("EventHandler.ProcessAllEvents");
+
+
+// PerformanceMonitor.report("EventHandler.SwitchDpidEvents");
+// PerformanceMonitor.report("EventHandler.FlowIdEvents");
+// PerformanceMonitor.report("EventHandler.ReadTopology");
+// PerformanceMonitor.report("EventHandler.RecomputeFlows");
+// PerformanceMonitor.report("EventHandler.WriteFlowsToDb");
+// PerformanceMonitor.report("EventHandler.NotificationSend.FlowEntryRemoved");
+// PerformanceMonitor.report("EventHandler.ProcessAllEvents");
+// PerformanceMonitor.report();
+// PerformanceMonitor.clear();
+
+ return;
+ }
+
if (topologyEvents.isEmpty() && flowPathEvents.isEmpty() &&
flowEntryEvents.isEmpty()) {
return; // Nothing to do
@@ -383,6 +552,246 @@
}
/**
+ * Fix a flow fetched from the database.
+ *
+ * @param flowPath the Flow to fix.
+ */
+ private void fixFlowFromDatabase(FlowPath flowPath) {
+ //
+ // TODO: Bug workaround / fix :
+ // method FlowDatabaseOperation.extractFlowEntry() doesn't
+ // fetch the inPort and outPort, hence we assign them here.
+ //
+ // Assign the inPort and outPort for the Flow Entries
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ // Set the inPort
+ do {
+ if (flowEntry.inPort() != null)
+ break;
+ if (flowEntry.flowEntryMatch() == null)
+ break;
+ Port inPort = new Port(flowEntry.flowEntryMatch().inPort().value());
+ flowEntry.setInPort(inPort);
+ } while (false);
+
+ // Set the outPort
+ do {
+ if (flowEntry.outPort() != null)
+ break;
+ for (FlowEntryAction fa : flowEntry.flowEntryActions().actions()) {
+ if (fa.actionOutput() != null) {
+ Port outPort = new Port(fa.actionOutput().port().value());
+ flowEntry.setOutPort(outPort);
+ break;
+ }
+ }
+ } while (false);
+ }
+ }
+
+ /**
+ * Process the Switch Dpid events.
+ */
+ private void processSwitchDpidEvents() {
+ Map<Long, Dpid> addedSwitches = new HashMap<Long, Dpid>();
+ Map<Long, Dpid> removedSwitches = new HashMap<Long, Dpid>();
+
+ //
+ // Process all Switch Dpid events and update the appropriate state
+ //
+ for (EventEntry<Dpid> eventEntry : switchDpidEvents) {
+ Dpid dpid = eventEntry.eventData();
+
+ log.debug("SwitchDpid Event: {} {}", eventEntry.eventType(), dpid);
+
+ // Compute the final set of added and removed switches
+ switch (eventEntry.eventType()) {
+ case ENTRY_ADD:
+ addedSwitches.put(dpid.value(), dpid);
+ removedSwitches.remove(dpid.value());
+ break;
+ case ENTRY_REMOVE:
+ addedSwitches.remove(dpid.value());
+ removedSwitches.put(dpid.value(), dpid);
+ break;
+ }
+ }
+
+ //
+ // Remove the Flows from the local cache if the removed
+ // switch is the Source Switch.
+ //
+ // TODO: This search can be expensive for a large number of flows
+ // and should be optmized.
+ //
+ List<FlowId> deleteFlowIds = new LinkedList<FlowId>();
+ for (Dpid switchDpid : removedSwitches.values()) {
+ for (FlowPath flowPath : allFlowPaths.values()) {
+ Dpid srcDpid = flowPath.dataPath().srcPort().dpid();
+ if (srcDpid.value() == switchDpid.value())
+ deleteFlowIds.add(flowPath.flowId());
+ }
+ }
+ //
+ // Remove the Flows from the local cache
+ //
+ for (FlowId flowId : deleteFlowIds)
+ allFlowPaths.remove(flowId.value());
+
+ // Get the Flows for the added switches
+ Collection<FlowPath> flowPaths =
+ ParallelFlowDatabaseOperation.getFlowsForSwitches(dbHandler,
+ addedSwitches.values());
+ for (FlowPath flowPath : flowPaths) {
+ allFlowPaths.put(flowPath.flowId().value(), flowPath);
+ }
+ }
+
+ /**
+ * Process the Flow ID events.
+ *
+ * @param mySwitches the collection of my switches.
+ */
+ private void processFlowIdEvents(Map<Long, IOFSwitch> mySwitches) {
+ List<FlowId> shouldFetchMyFlowIds = new LinkedList<FlowId>();
+
+ //
+ // Process all Flow Id events and update the appropriate state
+ //
+ for (EventEntry<Pair<FlowId, Dpid>> eventEntry : flowIdEvents) {
+ Pair<FlowId, Dpid> pair = eventEntry.eventData();
+ FlowId flowId = pair.first;
+ Dpid dpid = pair.second;
+
+ log.debug("Flow ID Event: {} {} {}", eventEntry.eventType(),
+ flowId, dpid);
+
+ //
+ // Ignore Flows if the Source Switch is not controlled by this
+ // instance.
+ //
+ if (mySwitches.get(dpid.value()) == null)
+ continue;
+
+ switch (eventEntry.eventType()) {
+ case ENTRY_ADD: {
+ //
+ // Add a new Flow Path
+ //
+ if (allFlowPaths.get(flowId.value()) != null) {
+ //
+ // TODO: What to do if the Flow Path already exists?
+ // Fow now, we just re-add it.
+ //
+ }
+ shouldFetchMyFlowIds.add(flowId);
+
+ break;
+ }
+
+ case ENTRY_REMOVE: {
+ //
+ // Remove an existing Flow Path.
+ //
+ // Find the Flow Path, and mark the Flow and its Flow Entries
+ // for deletion.
+ //
+ FlowPath existingFlowPath =
+ allFlowPaths.get(flowId.value());
+ if (existingFlowPath == null)
+ continue; // Nothing to do
+
+ existingFlowPath.setFlowPathUserState(FlowPathUserState.FP_USER_DELETE);
+ for (FlowEntry flowEntry : existingFlowPath.flowEntries()) {
+ flowEntry.setFlowEntryUserState(FlowEntryUserState.FE_USER_DELETE);
+ flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
+ }
+
+ // Remove the Flow Path from the internal state
+ Long key = existingFlowPath.flowId().value();
+ allFlowPaths.remove(key);
+ shouldRecomputeFlowPaths.remove(key);
+ modifiedFlowPaths.put(key, existingFlowPath);
+
+ break;
+ }
+ }
+ }
+
+ // Get my Flows
+ Collection<FlowPath> myFlows =
+ ParallelFlowDatabaseOperation.getFlows(dbHandler,
+ shouldFetchMyFlowIds);
+
+ for (FlowPath flowPath : myFlows) {
+ fixFlowFromDatabase(flowPath);
+
+ switch (flowPath.flowPathType()) {
+ case FP_TYPE_SHORTEST_PATH:
+ //
+ // Reset the Data Path, in case it was set already, because
+ // we are going to recompute it anyway.
+ //
+ flowPath.flowEntries().clear();
+ shouldRecomputeFlowPaths.put(flowPath.flowId().value(),
+ flowPath);
+ break;
+ case FP_TYPE_EXPLICIT_PATH:
+ //
+ // Mark all Flow Entries for installation in the switches.
+ //
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
+ }
+ modifiedFlowPaths.put(flowPath.flowId().value(), flowPath);
+ break;
+ case FP_TYPE_UNKNOWN:
+ log.error("FlowPath event with unknown type");
+ break;
+ }
+ allFlowPaths.put(flowPath.flowId().value(), flowPath);
+ }
+ }
+
+ /**
+ * Process the Flow Entry ID events.
+ *
+ * @param mySwitches the collection of my switches.
+ * @return a collection of modified Flow Entries this instance needs
+ * to push to its own switches.
+ */
+ private Collection<FlowEntry> processFlowEntryIdEvents(Map<Long, IOFSwitch> mySwitches) {
+ List<FlowEntry> modifiedFlowEntries = new LinkedList<FlowEntry>();
+
+ //
+ // Process all Flow ID events and update the appropriate state
+ //
+ for (EventEntry<Pair<FlowEntryId, Dpid>> eventEntry : flowEntryIdEvents) {
+ Pair<FlowEntryId, Dpid> pair = eventEntry.eventData();
+ FlowEntryId flowEntryId = pair.first;
+ Dpid dpid = pair.second;
+
+ log.debug("Flow Entry ID Event: {} {} {}", eventEntry.eventType(),
+ flowEntryId, dpid);
+
+ if (mySwitches.get(dpid.value()) == null)
+ continue;
+
+ // Fetch the Flow Entry
+ FlowEntry flowEntry = FlowDatabaseOperation.getFlowEntry(dbHandler,
+ flowEntryId);
+ if (flowEntry == null) {
+ log.debug("Flow Entry ID {} : Flow Entry not found!",
+ flowEntryId);
+ continue;
+ }
+ modifiedFlowEntries.add(flowEntry);
+ }
+
+ return modifiedFlowEntries;
+ }
+
+ /**
* Process the Flow Path events.
*/
private void processFlowPathEvents() {
@@ -468,15 +877,34 @@
* Process the Topology events.
*/
private void processTopologyEvents() {
+ boolean isTopologyModified = false;
+
+ if (enableOnrc2014MeasurementsTopology) {
+ if (topologyEvents.isEmpty())
+ return;
+
+ // TODO: Code for debugging purpose only
+ for (EventEntry<TopologyElement> eventEntry : topologyEvents) {
+ TopologyElement topologyElement = eventEntry.eventData();
+ log.debug("Topology Event: {} {}", eventEntry.eventType(),
+ topologyElement.toString());
+ }
+
+ log.debug("[BEFORE] {}", topology.toString());
+ topology.readFromDatabase(dbHandler);
+ log.debug("[AFTER] {}", topology.toString());
+ shouldRecomputeFlowPaths.putAll(allFlowPaths);
+ return;
+ }
+
//
// Process all Topology events and update the appropriate state
//
- boolean isTopologyModified = false;
for (EventEntry<TopologyElement> eventEntry : topologyEvents) {
TopologyElement topologyElement = eventEntry.eventData();
-
+
log.debug("Topology Event: {} {}", eventEntry.eventType(),
- topologyElement);
+ topologyElement.toString());
switch (eventEntry.eventType()) {
case ENTRY_ADD:
@@ -756,6 +1184,19 @@
private boolean recomputeFlowPath(FlowPath flowPath) {
boolean hasChanged = false;
+ if (enableOnrc2014MeasurementsFlows) {
+ // Cleanup the deleted Flow Entries from the earlier iteration
+ flowPath.dataPath().removeDeletedFlowEntries();
+
+ //
+ // TODO: Fake it that the Flow Entries have been already pushed
+ // into the switches, so we don't push them again.
+ //
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_UPDATED);
+ }
+ }
+
//
// Test whether the Flow Path needs to be recomputed
//
@@ -776,7 +1217,6 @@
newDataPath = TopologyManager.computeNetworkPath(topology,
flowPath);
}
-
if (newDataPath == null) {
// We need the DataPath to compare the paths
newDataPath = new DataPath();
@@ -986,6 +1426,19 @@
*/
@Override
public void notificationRecvFlowEntryAdded(FlowEntry flowEntry) {
+ if (enableOnrc2014MeasurementsFlows) {
+// String tag = "EventHandler.AddFlowEntryToSwitch." + flowEntry.flowEntryId();
+ String tag = "EventHandler.AddFlowEntryToSwitch";
+ PerformanceMonitor.Measurement m = PerformanceMonitor.start(tag);
+ Collection entries = new ArrayList();
+ entries.add(flowEntry);
+ flowManager.pushModifiedFlowEntriesToSwitches(entries);
+// PerformanceMonitor.stop(tag);
+ m.stop();
+// PerformanceMonitor.report(tag);
+ return;
+ }
+
EventEntry<FlowEntry> eventEntry =
new EventEntry<FlowEntry>(EventEntry.Type.ENTRY_ADD, flowEntry);
networkEvents.add(eventEntry);
@@ -998,6 +1451,25 @@
*/
@Override
public void notificationRecvFlowEntryRemoved(FlowEntry flowEntry) {
+ if (enableOnrc2014MeasurementsFlows) {
+// String tag = "EventHandler.RemoveFlowEntryFromSwitch." + flowEntry.flowEntryId();
+ String tag = "EventHandler.RemoveFlowEntryFromSwitch";
+ PerformanceMonitor.Measurement m = PerformanceMonitor.start(tag);
+ //
+ // NOTE: Must update the state to DELETE, because
+ // the notification contains the original state.
+ //
+ flowEntry.setFlowEntryUserState(FlowEntryUserState.FE_USER_DELETE);
+
+ Collection entries = new ArrayList();
+ entries.add(flowEntry);
+ flowManager.pushModifiedFlowEntriesToSwitches(entries);
+// PerformanceMonitor.stop(tag);
+ m.stop();
+// PerformanceMonitor.report(tag);
+ return;
+ }
+
EventEntry<FlowEntry> eventEntry =
new EventEntry<FlowEntry>(EventEntry.Type.ENTRY_REMOVE, flowEntry);
networkEvents.add(eventEntry);
@@ -1010,6 +1482,13 @@
*/
@Override
public void notificationRecvFlowEntryUpdated(FlowEntry flowEntry) {
+ if (enableOnrc2014MeasurementsFlows) {
+ Collection entries = new ArrayList();
+ entries.add(flowEntry);
+ flowManager.pushModifiedFlowEntriesToSwitches(entries);
+ return;
+ }
+
// NOTE: The ADD and UPDATE events are processed in same way
EventEntry<FlowEntry> eventEntry =
new EventEntry<FlowEntry>(EventEntry.Type.ENTRY_ADD, flowEntry);
@@ -1017,6 +1496,101 @@
}
/**
+ * Receive a notification that a FlowId is added.
+ *
+ * @param flowId the FlowId that is added.
+ * @param dpid the Source Switch Dpid for the corresponding Flow.
+ */
+ @Override
+ public void notificationRecvFlowIdAdded(FlowId flowId, Dpid dpid) {
+ Pair flowIdPair = new Pair(flowId, dpid);
+
+ EventEntry<Pair<FlowId, Dpid>> eventEntry =
+ new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_ADD, flowIdPair);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a FlowId is removed.
+ *
+ * @param flowId the FlowId that is removed.
+ * @param dpid the Source Switch Dpid for the corresponding Flow.
+ */
+ @Override
+ public void notificationRecvFlowIdRemoved(FlowId flowId, Dpid dpid) {
+ Pair flowIdPair = new Pair(flowId, dpid);
+
+ EventEntry<Pair<FlowId, Dpid>> eventEntry =
+ new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_REMOVE, flowIdPair);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a FlowId is updated.
+ *
+ * @param flowId the FlowId that is updated.
+ * @param dpid the Source Switch Dpid for the corresponding Flow.
+ */
+ @Override
+ public void notificationRecvFlowIdUpdated(FlowId flowId, Dpid dpid) {
+ Pair flowIdPair = new Pair(flowId, dpid);
+
+ // NOTE: The ADD and UPDATE events are processed in same way
+ EventEntry<Pair<FlowId, Dpid>> eventEntry =
+ new EventEntry<Pair<FlowId, Dpid>>(EventEntry.Type.ENTRY_ADD, flowIdPair);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a FlowEntryId is added.
+ *
+ * @param flowEntryId the FlowEntryId that is added.
+ * @param dpid the Switch Dpid for the corresponding Flow Entry.
+ */
+ @Override
+ public void notificationRecvFlowEntryIdAdded(FlowEntryId flowEntryId,
+ Dpid dpid) {
+ Pair flowEntryIdPair = new Pair(flowEntryId, dpid);
+
+ EventEntry<Pair<FlowEntryId, Dpid>> eventEntry =
+ new EventEntry<Pair<FlowEntryId, Dpid>>(EventEntry.Type.ENTRY_ADD, flowEntryIdPair);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a FlowEntryId is removed.
+ *
+ * @param flowEntryId the FlowEntryId that is removed.
+ * @param dpid the Switch Dpid for the corresponding Flow Entry.
+ */
+ @Override
+ public void notificationRecvFlowEntryIdRemoved(FlowEntryId flowEntryId,
+ Dpid dpid) {
+ Pair flowEntryIdPair = new Pair(flowEntryId, dpid);
+
+ EventEntry<Pair<FlowEntryId, Dpid>> eventEntry =
+ new EventEntry<Pair<FlowEntryId, Dpid>>(EventEntry.Type.ENTRY_REMOVE, flowEntryIdPair);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a FlowEntryId is updated.
+ *
+ * @param flowEntryId the FlowEntryId that is updated.
+ * @param dpid the Switch Dpid for the corresponding Flow Entry.
+ */
+ @Override
+ public void notificationRecvFlowEntryIdUpdated(FlowEntryId flowEntryId,
+ Dpid dpid) {
+ Pair flowEntryIdPair = new Pair(flowEntryId, dpid);
+
+ // NOTE: The ADD and UPDATE events are processed in same way
+ EventEntry<Pair<FlowEntryId, Dpid>> eventEntry =
+ new EventEntry<Pair<FlowEntryId, Dpid>>(EventEntry.Type.ENTRY_ADD, flowEntryIdPair);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
* Receive a notification that a Topology Element is added.
*
* @param topologyElement the Topology Element that is added.
@@ -1054,6 +1628,40 @@
}
/**
+ * Receive a notification that a switch is added to this instance.
+ *
+ * @param sw the switch that is added.
+ */
+ @Override
+ public void addedSwitch(IOFSwitch sw) {
+ Dpid dpid = new Dpid(sw.getId());
+ EventEntry<Dpid> eventEntry =
+ new EventEntry<Dpid>(EventEntry.Type.ENTRY_ADD, dpid);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a switch is removed from this instance.
+ *
+ * @param sw the switch that is removed.
+ */
+ @Override
+ public void removedSwitch(IOFSwitch sw) {
+ Dpid dpid = new Dpid(sw.getId());
+ EventEntry<Dpid> eventEntry =
+ new EventEntry<Dpid>(EventEntry.Type.ENTRY_REMOVE, dpid);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that the ports on a switch have changed.
+ */
+ @Override
+ public void switchPortChanged(Long switchId) {
+ // Nothing to do
+ }
+
+ /**
* Get a sorted copy of all Flow Paths.
*
* @return a sorted copy of all Flow Paths.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index 0d8cb38..de847de 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -8,6 +8,7 @@
import java.util.Map;
import java.util.Random;
import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -50,8 +51,9 @@
* Flow Manager class for handling the network flows.
*/
public class FlowManager implements IFloodlightModule, IFlowService, INetMapStorage {
- // flag to use FlowPusher instead of FlowSwitchOperation/MessageDamper
- private final static boolean enableFlowPusher = false;
+
+ private boolean enableOnrc2014MeasurementsFlows = true;
+
protected DBOperation dbHandlerApi;
protected DBOperation dbHandlerInner;
@@ -103,6 +105,7 @@
*/
@Override
public void close() {
+ floodlightProvider.removeOFSwitchListener(flowEventHandler);
datagridService.deregisterFlowEventHandlerService(flowEventHandler);
dbHandlerApi.close();
dbHandlerInner.close();
@@ -229,6 +232,7 @@
// - startup
//
flowEventHandler = new FlowEventHandler(this, datagridService);
+ floodlightProvider.addOFSwitchListener(flowEventHandler);
datagridService.registerFlowEventHandlerService(flowEventHandler);
flowEventHandler.start();
}
@@ -269,7 +273,13 @@
}
if (FlowDatabaseOperation.addFlow(dbHandlerApi, flowPath)) {
- datagridService.notificationSendFlowAdded(flowPath);
+ if (enableOnrc2014MeasurementsFlows) {
+ datagridService.notificationSendFlowIdAdded(flowPath.flowId(),
+ flowPath.dataPath().srcPort().dpid());
+ } else {
+ datagridService.notificationSendFlowAdded(flowPath);
+ }
+
return flowPath.flowId();
}
return null;
@@ -283,7 +293,11 @@
@Override
public boolean deleteAllFlows() {
if (FlowDatabaseOperation.deleteAllFlows(dbHandlerApi)) {
- datagridService.notificationSendAllFlowsRemoved();
+ if (enableOnrc2014MeasurementsFlows) {
+ datagridService.notificationSendAllFlowIdsRemoved();
+ } else {
+ datagridService.notificationSendAllFlowsRemoved();
+ }
return true;
}
return false;
@@ -298,7 +312,11 @@
@Override
public boolean deleteFlow(FlowId flowId) {
if (FlowDatabaseOperation.deleteFlow(dbHandlerApi, flowId)) {
- datagridService.notificationSendFlowRemoved(flowId);
+ if (enableOnrc2014MeasurementsFlows) {
+ datagridService.notificationSendFlowIdRemoved(flowId);
+ } else {
+ datagridService.notificationSendFlowRemoved(flowId);
+ }
return true;
}
return false;
@@ -312,10 +330,40 @@
*/
@Override
public FlowPath getFlow(FlowId flowId) {
+ log.debug("FlowID: {}", flowId);
+ if(flowId.value() == -100) {
+ log.debug("Printing results...");
+ PerformanceMonitor.report();
+ PerformanceMonitor.clear();
+ }
+ else if(flowId.value() == -200) {
+ log.debug("Clearing results...");
+ PerformanceMonitor.clear();
+ }
return FlowDatabaseOperation.getFlow(dbHandlerApi, flowId);
}
/**
+ * Get a previously added flow entry.
+ *
+ * @param flowEntryId the Flow Entry ID of the flow entry to get.
+ * @return the Flow Entry if found, otherwise null.
+ */
+ public FlowEntry getFlowEntry(FlowEntryId flowEntryId) {
+ return FlowDatabaseOperation.getFlowEntry(dbHandlerApi, flowEntryId);
+ }
+
+ /**
+ * Get the source switch DPID of a previously added flow.
+ *
+ * @param flowId the Flow ID of the flow to get.
+ * @return the source switch DPID if found, otherwise null.
+ */
+ public Dpid getFlowSourceDpid(FlowId flowId) {
+ return FlowDatabaseOperation.getFlowSourceDpid(dbHandlerApi, flowId);
+ }
+
+ /**
* Get all installed flows by all installers.
*
* @return the Flow Paths if found, otherwise null.
@@ -326,6 +374,18 @@
}
/**
+ * Get all installed flows whose Source Switch is controlled by this
+ * instance.
+ *
+ * @param mySwitches the collection of the switches controlled by this
+ * instance.
+ * @return the Flow Paths if found, otherwise null.
+ */
+ public ArrayList<FlowPath> getAllMyFlows(Map<Long, IOFSwitch> mySwitches) {
+ return FlowDatabaseOperation.getAllMyFlows(dbHandlerApi, mySwitches);
+ }
+
+ /**
* Get summary of all installed flows by all installers in a given range.
*
* @param flowId the Flow ID of the first flow in the flow range to get.
@@ -337,7 +397,17 @@
int maxFlows) {
ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
SortedMap<Long, FlowPath> sortedFlowPaths =
- flowEventHandler.getAllFlowPathsCopy();
+ new TreeMap<Long, FlowPath>();
+
+ if (enableOnrc2014MeasurementsFlows) {
+ Collection<FlowPath> databaseFlowPaths =
+ ParallelFlowDatabaseOperation.getAllFlows(dbHandlerApi);
+ for (FlowPath flowPath : databaseFlowPaths) {
+ sortedFlowPaths.put(flowPath.flowId().value(), flowPath);
+ }
+ } else {
+ sortedFlowPaths = flowEventHandler.getAllFlowPathsCopy();
+ }
//
// Truncate each Flow Path and Flow Entry
@@ -422,6 +492,9 @@
public void flowEntriesPushedToSwitch(
Collection<Pair<IOFSwitch, FlowEntry>> entries) {
+ if (enableOnrc2014MeasurementsFlows)
+ return;
+
//
// Process all entries
//
@@ -505,8 +578,12 @@
// - Flow Paths to the database
//
pushModifiedFlowEntriesToSwitches(modifiedFlowEntries);
- pushModifiedFlowPathsToDatabase(modifiedFlowPaths);
- cleanupDeletedFlowEntriesFromDatagrid(modifiedFlowEntries);
+ if (enableOnrc2014MeasurementsFlows) {
+ writeModifiedFlowPathsToDatabase(modifiedFlowPaths);
+ } else {
+ pushModifiedFlowPathsToDatabase(modifiedFlowPaths);
+ cleanupDeletedFlowEntriesFromDatagrid(modifiedFlowEntries);
+ }
}
/**
@@ -517,7 +594,7 @@
*
* @param modifiedFlowEntries the collection of modified Flow Entries.
*/
- private void pushModifiedFlowEntriesToSwitches(
+ void pushModifiedFlowEntriesToSwitches(
Collection<FlowEntry> modifiedFlowEntries) {
if (modifiedFlowEntries.isEmpty())
return;
@@ -694,7 +771,7 @@
*
* @param modifiedFlowPaths the collection of Flow Paths to write.
*/
- private void writeModifiedFlowPathsToDatabase(
+ void writeModifiedFlowPathsToDatabase(
Collection<FlowPath> modifiedFlowPaths) {
if (modifiedFlowPaths.isEmpty())
return;
@@ -753,10 +830,12 @@
allValid = false;
break;
}
- if (flowEntry.flowEntrySwitchState() !=
- FlowEntrySwitchState.FE_SWITCH_UPDATED) {
- allValid = false;
- break;
+ if (! enableOnrc2014MeasurementsFlows) {
+ if (flowEntry.flowEntrySwitchState() !=
+ FlowEntrySwitchState.FE_SWITCH_UPDATED) {
+ allValid = false;
+ break;
+ }
}
}
if (! allValid)
@@ -792,6 +871,36 @@
//log.error("Performance %% Flow path total time {} : {}", endTime - startTime, flowPath.toString());
}
} while (retry);
+
+ if (enableOnrc2014MeasurementsFlows) {
+ // Send the notifications
+
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ if (flowEntry.flowEntrySwitchState() !=
+ FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED) {
+ continue;
+ }
+ // datagridService.notificationSendFlowEntryIdAdded(flowEntry.flowEntryId(), flowEntry.dpid());
+
+ //
+ // Write the Flow Entry to the Datagrid
+ //
+ switch (flowEntry.flowEntryUserState()) {
+ case FE_USER_ADD:
+ datagridService.notificationSendFlowEntryAdded(flowEntry);
+ break;
+ case FE_USER_MODIFY:
+ datagridService.notificationSendFlowEntryUpdated(flowEntry);
+ break;
+ case FE_USER_DELETE:
+ datagridService.notificationSendFlowEntryRemoved(flowEntry.flowEntryId());
+ break;
+ case FE_USER_UNKNOWN:
+ assert(false);
+ break;
+ }
+ }
+ }
}
}
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
index 78562e1..a44a898 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
@@ -1,7 +1,10 @@
package net.onrc.onos.ofcontroller.flowmanager;
import net.onrc.onos.ofcontroller.topology.TopologyElement;
+import net.onrc.onos.ofcontroller.util.Dpid;
import net.onrc.onos.ofcontroller.util.FlowEntry;
+import net.onrc.onos.ofcontroller.util.FlowEntryId;
+import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
/**
@@ -51,6 +54,56 @@
void notificationRecvFlowEntryUpdated(FlowEntry flowEntry);
/**
+ * Receive a notification that a FlowId is added.
+ *
+ * @param flowId the FlowId that is added.
+ * @param dpid the Source Switch Dpid for the corresponding Flow.
+ */
+ void notificationRecvFlowIdAdded(FlowId flowId, Dpid dpid);
+
+ /**
+ * Receive a notification that a FlowId is removed.
+ *
+ * @param flowId the FlowId that is removed.
+ * @param dpid the Source Switch Dpid for the corresponding Flow.
+ */
+ void notificationRecvFlowIdRemoved(FlowId flowId, Dpid dpid);
+
+ /**
+ * Receive a notification that a FlowId is updated.
+ *
+ * @param flowId the FlowId that is updated.
+ * @param dpid the Source Switch Dpid for the corresponding Flow.
+ */
+ void notificationRecvFlowIdUpdated(FlowId flowId, Dpid dpid);
+
+ /**
+ * Receive a notification that a FlowEntryId is added.
+ *
+ * @param flowEntryId the FlowEntryId that is added.
+ * @param dpid the Switch Dpid for the corresponding Flow Entry.
+ */
+ void notificationRecvFlowEntryIdAdded(FlowEntryId flowEntryId, Dpid dpid);
+
+ /**
+ * Receive a notification that a FlowEntryId is removed.
+ *
+ * @param flowEntryId the FlowEntryId that is removed.
+ * @param dpid the Switch Dpid for the corresponding Flow Entry.
+ */
+ void notificationRecvFlowEntryIdRemoved(FlowEntryId flowEntryId,
+ Dpid dpid);
+
+ /**
+ * Receive a notification that a FlowEntryId is updated.
+ *
+ * @param flowEntryId the FlowEntryId that is updated.
+ * @param dpid the Switch Dpid for the corresponding Flow Entry.
+ */
+ void notificationRecvFlowEntryIdUpdated(FlowEntryId flowEntryId,
+ Dpid dpid);
+
+ /**
* Receive a notification that a Topology Element is added.
*
* @param topologyElement the Topology Element that is added.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/ParallelFlowDatabaseOperation.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/ParallelFlowDatabaseOperation.java
new file mode 100644
index 0000000..ec03d09
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/ParallelFlowDatabaseOperation.java
@@ -0,0 +1,394 @@
+package net.onrc.onos.ofcontroller.flowmanager;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.onrc.onos.datagrid.IDatagridService;
+import net.onrc.onos.graph.DBOperation;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
+import net.onrc.onos.ofcontroller.util.Dpid;
+import net.onrc.onos.ofcontroller.util.FlowEntry;
+import net.onrc.onos.ofcontroller.util.FlowEntrySwitchState;
+import net.onrc.onos.ofcontroller.util.FlowId;
+import net.onrc.onos.ofcontroller.util.FlowPath;
+import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
+
+import com.esotericsoftware.kryo2.Kryo;
+
+/**
+ * Class for performing parallel Flow-related operations on the Database.
+ *
+ * This class is mostly a wrapper of FlowDatabaseOperation with a thread pool
+ * for parallelization.
+ *
+ * @author Brian O'Connor <brian@onlab.us>
+ */
+public class ParallelFlowDatabaseOperation extends FlowDatabaseOperation {
+ private final static Logger log = LoggerFactory.getLogger(FlowDatabaseOperation.class);
+
+ private final static int numThreads = Integer.valueOf(System.getProperty("parallelFlowDatabase.numThreads", "32"));
+ private final static ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+
+ private static KryoFactory kryoFactory = new KryoFactory();
+
+ /**
+ * Get all installed flows by first querying the database for all FlowPaths
+ * and then populating them from the database in parallel.
+ *
+ * @param dbHandler the Graph Database handler to use.
+ * @return the Flow Paths if found, otherwise an empty list.
+ */
+ static ArrayList<FlowPath> getAllFlows(DBOperation dbHandler) {
+ Iterable<IFlowPath> flowPathsObj = null;
+ ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
+
+ try {
+ flowPathsObj = dbHandler.getAllFlowPaths();
+ } catch (Exception e) {
+ // TODO: handle exceptions
+ dbHandler.rollback();
+ log.error(":getAllFlowPaths failed");
+ return flowPaths;
+ }
+ if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
+ dbHandler.commit();
+ return flowPaths; // No Flows found
+ }
+
+ CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
+ int numTasks = 0;
+ for(IFlowPath flowObj : flowPathsObj) {
+ tasks.submit(new ExtractFlowTask(flowObj));
+ numTasks++;
+ }
+ for(int i = 0; i < numTasks; i++) {
+ try {
+ FlowPath flowPath = tasks.take().get();
+ if(flowPath != null) {
+ flowPaths.add(flowPath);
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("Error reading FlowPath from IFlowPath object");
+ }
+ }
+ dbHandler.commit();
+ return flowPaths;
+ }
+
+ /**
+ * Query the database for all flow paths that have their source switch
+ * in the provided collection
+ *
+ * Note: this function is implemented naively and inefficiently
+ *
+ * @param dbHandler the Graph Database handler to use.
+ * @param switches a collection of switches whose flow paths you want
+ * @return the Flow Paths if found, otherwise an empty list.
+ */
+ static ArrayList<FlowPath> getFlowsForSwitches(DBOperation dbHandler, Collection<Dpid> switches) {
+ Iterable<IFlowPath> flowPathsObj = null;
+ ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
+
+ try {
+ flowPathsObj = dbHandler.getAllFlowPaths();
+ } catch (Exception e) {
+ // TODO: handle exceptions
+ dbHandler.rollback();
+ log.error(":getAllFlowPaths failed");
+ return flowPaths;
+ }
+ if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
+ dbHandler.commit();
+ return flowPaths; // No Flows found
+ }
+
+ // convert the collection of switch dpids into a set of strings
+ Set<String> switchSet = new HashSet<>();
+ for(Dpid dpid : switches) {
+ switchSet.add(dpid.toString());
+ }
+
+ CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
+ int numTasks = 0;
+ for(IFlowPath flowObj : flowPathsObj) {
+ if(switchSet.contains(flowObj.getSrcSwitch())) {
+ tasks.submit(new ExtractFlowTask(flowObj));
+ numTasks++;
+ }
+ }
+ for(int i = 0; i < numTasks; i++) {
+ try {
+ FlowPath flowPath = tasks.take().get();
+ if(flowPath != null) {
+ flowPaths.add(flowPath);
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("Error reading FlowPath from IFlowPath object");
+ }
+ }
+ dbHandler.commit();
+ return flowPaths;
+ }
+
+ /**
+ * The basic parallelization unit for extracting FlowEntries from the database.
+ *
+ * This is simply a wrapper for FlowDatabaseOperation.extractFlowPath()
+ */
+ private final static class ExtractFlowTask implements Callable<FlowPath> {
+ private final IFlowPath flowObj;
+
+ ExtractFlowTask(IFlowPath flowObj){
+ this.flowObj = flowObj;
+ }
+ @Override
+ public FlowPath call() throws Exception {
+ return extractFlowPath(flowObj);
+ }
+ }
+
+ /**
+ * Get a subset of installed flows in parallel.
+ *
+ * @param dbHandler the Graph Database handler to use.
+ * @param flowIds the collection of Flow IDs to get.
+ * @return the Flow Paths if found, otherwise an empty list.
+ */
+ static ArrayList<FlowPath> getFlows(DBOperation dbHandler,
+ Collection<FlowId> flowIds) {
+ ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
+
+ CompletionService<FlowPath> tasks = new ExecutorCompletionService<>(executor);
+ int numTasks = 0;
+ for (FlowId flowId : flowIds) {
+ tasks.submit(new GetFlowTask(dbHandler, flowId));
+ numTasks++;
+ }
+ for(int i = 0; i < numTasks; i++) {
+ try {
+ FlowPath flowPath = tasks.take().get();
+ if(flowPath != null) {
+ flowPaths.add(flowPath);
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("Error reading FlowPath from database");
+ }
+ }
+ // TODO: should we commit?
+ //dbHandler.commit();
+ return flowPaths;
+ }
+
+ /**
+ * The basic parallelization unit for getting FlowEntries.
+ *
+ * This is simply a wrapper for FlowDatabaseOperation.getFlow()
+ */
+ private final static class GetFlowTask implements Callable<FlowPath> {
+ private final DBOperation dbHandler;
+ private final FlowId flowId;
+
+ GetFlowTask(DBOperation dbHandler, FlowId flowId) {
+ this.dbHandler = dbHandler;
+ this.flowId = flowId;
+ }
+ @Override
+ public FlowPath call() throws Exception{
+ return getFlow(dbHandler, flowId);
+ }
+ }
+
+ /**
+ * Add a flow by creating a database task, then waiting for the result.
+ * Mostly, a wrapper for FlowDatabaseOperation.addFlow() which overs little
+ * performance benefit.
+ *
+ * @param dbHandler the Graph Database handler to use.
+ * @param flowPath the Flow Path to install.
+ * @return true on success, otherwise false.
+ */
+ static boolean addFlow(DBOperation dbHandler, FlowPath flowPath) {
+ Future<Boolean> result = executor.submit(new AddFlowTask(dbHandler, flowPath, null));
+ // NOTE: This function is blocking
+ try {
+ return result.get();
+ } catch (InterruptedException | ExecutionException e) {
+ return false;
+ }
+ }
+
+ /**
+ * Add a flow asynchronously by creating a database task.
+ *
+ * @param dbHandler the Graph Database handler to use.
+ * @param flowPath the Flow Path to install.
+ * @param datagridService the notification service for when the task is completed
+ * @return true always
+ */
+ static boolean addFlow(DBOperation dbHandler, FlowPath flowPath, IDatagridService datagridService) {
+ executor.submit(new AddFlowTask(dbHandler, flowPath, datagridService));
+ // TODO: If we need the results, submit returns a Future that contains
+ // the result.
+ return true;
+
+ }
+
+ /**
+ * The basic parallelization unit for adding FlowPaths.
+ *
+ * This is simply a wrapper for FlowDatabaseOperation.addFlow(),
+ * which also sends a notification if a datagrid services is provided
+ */
+ private final static class AddFlowTask implements Callable<Boolean> {
+ private final DBOperation dbHandler;
+ private final FlowPath flowPath;
+ private final IDatagridService datagridService;
+
+ AddFlowTask(DBOperation dbHandler,
+ FlowPath flowPath,
+ IDatagridService datagridService) {
+ this.dbHandler = dbHandler;
+
+ // Create a copy of the FlowPath object
+ Kryo kryo = kryoFactory.newKryo();
+ this.flowPath = kryo.copy(flowPath);
+ kryoFactory.deleteKryo(kryo);
+
+ this.datagridService = datagridService;
+ }
+
+ @Override
+ public Boolean call() throws Exception {
+// String tag1 = "FlowDatabaseOperation.AddFlow." + flowPath.flowId();
+ String tag1 = "FlowDatabaseOperation.AddFlow";
+// String tag2 = "FlowDatabaseOperation.NotificationSend.FlowEntry." + flowPath.flowId();
+ String tag2 = "FlowDatabaseOperation.NotificationSend.FlowEntry";
+ PerformanceMonitor.Measurement m;
+ m = PerformanceMonitor.start(tag1);
+ boolean success = FlowDatabaseOperation.addFlow(dbHandler, flowPath);
+// PerformanceMonitor.stop(tag1);
+ m.stop();
+ m = PerformanceMonitor.start(tag2);
+ if(success) {
+ if(datagridService != null) {
+ // Send notifications for each Flow Entry
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ if (flowEntry.flowEntrySwitchState() !=
+ FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED) {
+ continue;
+ }
+ //
+ // Write the Flow Entry to the Datagrid
+ //
+ switch (flowEntry.flowEntryUserState()) {
+ case FE_USER_ADD:
+ datagridService.notificationSendFlowEntryAdded(flowEntry);
+ break;
+ case FE_USER_MODIFY:
+ datagridService.notificationSendFlowEntryUpdated(flowEntry);
+ break;
+ case FE_USER_DELETE:
+ datagridService.notificationSendFlowEntryRemoved(flowEntry.flowEntryId());
+ break;
+ case FE_USER_UNKNOWN:
+ assert(false);
+ break;
+ }
+ }
+ }
+ }
+ else {
+ log.error("Error adding flow path {} to database", flowPath);
+ }
+ m.stop();
+// PerformanceMonitor.report(tag1);
+// PerformanceMonitor.report(tag2);
+ return success;
+
+ }
+ }
+
+ /**
+ * Delete a previously added flow by creating a database task, then waiting
+ * for the result.
+ *
+ * Mostly, a wrapper for FlowDatabaseOperation.addFlow() which overs little
+ * performance benefit.
+ *
+ * @param dbHandler the Graph Database handler to use.
+ * @param flowId the Flow ID of the flow to delete.
+ * @return true on success, otherwise false.
+ */
+ static boolean deleteFlow(DBOperation dbHandler, FlowId flowId) {
+ Future<Boolean> result = executor.submit(new DeleteFlowTask(dbHandler, flowId, null));
+ // NOTE: This function is blocking
+ try {
+ return result.get();
+ } catch (InterruptedException | ExecutionException e) {
+ return false;
+ }
+ }
+
+ /**
+ * Delete a previously added flow asynchronously by creating a database task.
+ *
+ * @param dbHandler the Graph Database handler to use.
+ * @param flowId the Flow ID of the flow to delete.
+ * @param datagridService the notification service for when the task is completed
+ * @return true always
+ */
+ static boolean deleteFlow(DBOperation dbHandler, FlowId flowId, IDatagridService datagridService) {
+ executor.submit(new DeleteFlowTask(dbHandler, flowId, datagridService));
+ // TODO: If we need the results, submit returns a Future that contains
+ // the result.
+ return true;
+ }
+
+ /**
+ * The basic parallelization unit for deleting FlowPaths.
+ *
+ * This is simply a wrapper for FlowDatabaseOperation.deleteFlow(),
+ * which also sends a notification if a datagrid services is provided
+ */
+ private final static class DeleteFlowTask implements Callable<Boolean> {
+ private final DBOperation dbHandler;
+ private final FlowId flowId;
+ private final IDatagridService datagridService;
+
+ DeleteFlowTask(DBOperation dbHandler, FlowId flowId, IDatagridService datagridService) {
+ this.dbHandler = dbHandler;
+
+ // Create a copy of the FlowId object
+ Kryo kryo = kryoFactory.newKryo();
+ this.flowId = kryo.copy(flowId);
+ kryoFactory.deleteKryo(kryo);
+
+ this.datagridService = datagridService;
+ }
+ @Override
+ public Boolean call() throws Exception{
+ boolean success = FlowDatabaseOperation.deleteFlow(dbHandler, flowId);
+ if(success) {
+ if(datagridService != null) {
+ datagridService.notificationSendFlowIdRemoved(flowId);
+ }
+ }
+ else {
+ log.error("Error removing flow path {} from database", flowId);
+ }
+ return success;
+ }
+ }
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PerformanceMonitor.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PerformanceMonitor.java
new file mode 100644
index 0000000..fa75460
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PerformanceMonitor.java
@@ -0,0 +1,244 @@
+package net.onrc.onos.ofcontroller.flowmanager;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for collecting performance measurements
+ */
+public class PerformanceMonitor {
+ private final static Map<String, List<Measurement>> map = new ConcurrentHashMap<>();;
+ private final static Logger log = LoggerFactory.getLogger(PerformanceMonitor.class);
+ private static long overhead;
+ private static long experimentStart = Long.MAX_VALUE;
+ private final static double normalization = Math.pow(10, 6);
+
+ /**
+ * Start a performance measurement, identified by a tag
+ *
+ * Note: Only a single measurement can use the same tag at a time.
+ * ..... not true anymore.
+ *
+ * @param tag for performance measurement
+ */
+ public static Measurement start(String tag) {
+ long start = System.nanoTime();
+ if(start < experimentStart) {
+ experimentStart = start;
+ }
+ List<Measurement> list = map.get(tag);
+ if(list == null) {
+ list = new ArrayList<Measurement>();
+ map.put(tag, list);
+ }
+ Measurement m = new Measurement();
+ list.add(m);
+ m.start();
+ overhead += System.nanoTime() - start;
+ return m;
+ }
+
+ /**
+ * Stop a performance measurement.
+ *
+ * You must have already started a measurement with tag.
+ *
+ * @param tag for performance measurement
+ */
+ public static void stop(String tag) {
+ long time = System.nanoTime();
+ List<Measurement> list = map.get(tag);
+ if(list == null || list.size() == 0) {
+ log.error("Tag {} does not exist", tag);
+ }
+ list.get(0).stop(time);
+ if(list.size() > 1) {
+ log.error("Tag {} has multiple measurements", tag);
+ }
+ overhead += System.nanoTime() - time;
+ }
+
+ /**
+ * Clear all performance measurements.
+ */
+ public static void clear() {
+ map.clear();
+ overhead = 0;
+ experimentStart = Long.MAX_VALUE;
+ }
+
+ /**
+ * Write all performance measurements to the log
+ */
+ public static void report() {
+ String result = "Performance Results: (avg/start/stop/count)\n";
+ if(map.size() == 0) {
+ result += "No Measurements";
+ log.error(result);
+ return;
+ }
+ long experimentEnd = -1;
+ for(Entry<String, List<Measurement>> e : map.entrySet()) {
+ String key = e.getKey();
+ List<Measurement> list = e.getValue();
+ int total = 0, count = 0;
+ long start = Long.MAX_VALUE, stop = -1;
+ for(Measurement m : list) {
+ if(m.stop < 0) {
+ continue; // measurement has not been stopped
+ }
+ // Collect overall start and end times
+ if(m.start < start) {
+ start = m.start;
+ }
+ if(m.stop > stop) {
+ stop = m.stop;
+ if(stop > experimentEnd) {
+ experimentEnd = stop;
+ }
+ }
+ // Collect statistics for average
+ total += m.elapsed();
+ count++;
+ }
+ double avg = (double) total / count;
+ // Normalize start/stop
+ start -= experimentStart;
+ stop -= experimentStart;
+ result += key + '=' +
+ (avg / normalization) + '/' +
+ (start / normalization) + '/' +
+ (stop / normalization) + '/' +
+ count + '\n';
+ }
+ double overheadMs = overhead / normalization;
+ double experimentElapsed = (experimentEnd - experimentStart) / normalization;
+ result += "TotalTime:" + experimentElapsed + "/Overhead:" + overheadMs;
+ log.error(result);
+// log.error("Performance Results: {} with measurement overhead: {} ms", map, overheadMilli);
+ }
+
+ /**
+ * Write the performance measurement for a tag to the log
+ *
+ * @param tag the tag name.
+ */
+ public static void report(String tag) {
+ List<Measurement> list = map.get(tag);
+ if(list == null) {
+ return; //TODO
+ }
+ //TODO: fix this;
+ Measurement m = list.get(0);
+ if (m != null) {
+ log.error("Performance Result: tag = {} start = {} stop = {} elapsed = {}",
+ tag, m.start, m.stop, m.toString());
+ } else {
+ log.error("Performance Result: unknown tag {}", tag);
+ }
+ }
+
+ /**
+ * A single performance measurement
+ */
+ public static class Measurement {
+ long start;
+ long stop = -1;
+
+ /**
+ * Start the measurement
+ */
+ public void start() {
+ if(start <= 0) {
+ start = System.nanoTime();
+ }
+ }
+
+ /**
+ * Stop the measurement
+ */
+ public void stop() {
+ long now = System.nanoTime();
+ stop(now);
+ }
+
+ /**
+ * Stop the measurement at a specific time
+ * @param time to stop
+ */
+ public void stop(long time){
+ if(stop <= 0) {
+ stop = time;
+ }
+ }
+
+ /**
+ * Compute the elapsed time of the measurement in nanoseconds
+ *
+ * @return the measurement time in nanoseconds, or -1 if the measurement is stil running.
+ */
+ public long elapsed() {
+ if(stop <= 0) {
+ return -1;
+ }
+ else {
+ return stop - start;
+ }
+ }
+
+ /**
+ * Returns the number of milliseconds for the measurement as a String.
+ */
+ public String toString() {
+ double milli = elapsed() / normalization;
+ double startMs = start / normalization;
+ double stopMs = stop / normalization;
+
+ return milli + "ms/" + startMs + '/' + stopMs;
+ }
+ }
+
+ public static void main(String args[]){
+ // test the measurement overhead
+ String tag;
+ for(int i = 0; i < 2; i++){
+ tag = "foo foo foo";
+ Measurement m;
+ m = start(tag); System.out.println(tag); m.stop();
+ m = start(tag); System.out.println(tag); m.stop();
+ m = start(tag); System.out.println(tag); m.stop();
+ m = start(tag); System.out.println(tag); m.stop();
+ tag = "bar";
+ start(tag); stop(tag);
+ tag = "baz";
+ start(tag); stop(tag);
+ report();
+ clear();
+ }
+ for(int i = 0; i < 100; i++){
+ tag = "a";
+ start(tag); stop(tag);
+ start(tag); stop(tag);
+
+ start(tag); stop(tag);
+ start(tag); stop(tag);
+ start(tag); stop(tag);
+ start(tag); stop(tag);
+ start(tag); stop(tag);
+ start(tag); stop(tag);
+
+ tag = "b";
+ start(tag); stop(tag);
+ tag = "c";
+ start(tag); stop(tag);
+ report();
+ clear();
+ }
+ }
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetFlowByIdResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetFlowByIdResource.java
index 1cbeece..de625d7 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetFlowByIdResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetFlowByIdResource.java
@@ -1,6 +1,7 @@
package net.onrc.onos.ofcontroller.flowmanager.web;
import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
+import net.onrc.onos.ofcontroller.flowmanager.PerformanceMonitor;
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
index a718728..bd98527 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
@@ -250,12 +250,15 @@
*
*/
class FlowEntryWrapper {
- FlowEntryId flowEntryId;
- OFFlowStatisticsReply statisticsReply;
+ FlowEntryId flowEntryId;
+ IFlowEntry iFlowEntry;
+ OFFlowStatisticsReply statisticsReply;
+
public FlowEntryWrapper(IFlowEntry entry) {
flowEntryId = new FlowEntryId(entry.getFlowEntryId());
- }
+ iFlowEntry = entry;
+ }
public FlowEntryWrapper(OFFlowStatisticsReply entry) {
flowEntryId = new FlowEntryId(entry.getCookie());
@@ -276,18 +279,14 @@
double startDB = System.nanoTime();
// Get the Flow Entry state from the Network Graph
- IFlowEntry iFlowEntry = null;
- try {
- iFlowEntry = dbHandler.searchFlowEntry(flowEntryId);
- } catch (Exception e) {
- log.error("Error finding flow entry {} in Network Graph",
- flowEntryId);
- return;
- }
if (iFlowEntry == null) {
- log.error("Cannot add flow entry {} to sw {} : flow entry not found",
- flowEntryId, sw.getId());
- return;
+ try {
+ iFlowEntry = dbHandler.searchFlowEntry(flowEntryId);
+ } catch (Exception e) {
+ log.error("Error finding flow entry {} in Network Graph",
+ flowEntryId);
+ return;
+ }
}
dbTime = System.nanoTime() - startDB;
diff --git a/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java b/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
index 1674cf0..2d4f590 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/topology/Topology.java
@@ -1,6 +1,5 @@
package net.onrc.onos.ofcontroller.topology;
-import java.util.ArrayList;
import java.util.List;
import java.util.LinkedList;
import java.util.Map;
@@ -12,7 +11,6 @@
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.ISwitchObject;
import net.onrc.onos.ofcontroller.core.ISwitchStorage.SwitchState;
-import org.apache.commons.lang.StringUtils;
import org.openflow.util.HexString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -195,7 +193,10 @@
*/
public class Topology {
private final static Logger log = LoggerFactory.getLogger(Topology.class);
-
+
+ // flag to use optimized readFromDatabase() method.
+ private static final boolean enableOptimizedRead = true;
+
private Map<Long, Node> nodesMap; // The dpid->Node mapping
/**
@@ -392,289 +393,173 @@
* @param dbHandler the Graph Database handler to use.
*/
public void readFromDatabase(DBOperation dbHandler) {
- //
- // Fetch the relevant info from the Switch and Port vertices
- // from the Titan Graph.
- //
- nodesMap.clear();
+ if (enableOptimizedRead) {
+ readFromDatabaseBodyOptimized(dbHandler);
+ } else {
+ readFromDatabaseBody(dbHandler);
+ }
- // Load all switches into Map
- Iterable<ISwitchObject> switches = dbHandler.getAllSwitches();
- for (ISwitchObject switchObj : switches) {
- // Ignore inactive ports
- if (!switchObj.getState().equals(SwitchState.ACTIVE.toString())) {
- continue;
- }
- Vertex nodeVertex = switchObj.asVertex();
- //
- // The Switch info
- //
- String nodeDpid = nodeVertex.getProperty("dpid").toString();
- long nodeId = HexString.toLong(nodeDpid);
- addNode(nodeId);
- }
-
- //
- // Get All Ports
- //
- Iterable<IPortObject> ports = dbHandler.getAllPorts(); //TODO: Add to DB operations
- for (IPortObject myPortObj : ports) {
- Vertex myPortVertex = myPortObj.asVertex();
-
- // Ignore inactive ports
- if (! myPortVertex.getProperty("state").toString().equals("ACTIVE")) {
- continue;
- }
-
- short myPort = 0;
- String idStr = myPortObj.getPortId();
- String[] splitter = idStr.split(IDBOperation.PORT_ID_DELIM);
- if (splitter.length != 2) {
- log.error("Invalid port_id : {}", idStr);
- continue;
- }
- String myDpid = splitter[0];
- myPort = Short.parseShort(splitter[1]);
- long myId = HexString.toLong(myDpid);
- Node me = nodesMap.get(myId);
-
- if (me == null) {
- // cannot proceed ports and switches are out of sync
- //TODO: Restart the whole read
- continue;
- }
-
- if (me.getPort(myPort) == null) {
- me.addPort(myPort);
- } else if (me.getLink(myPort) != null) {
- // Link already added..probably by neighbor
- continue;
- }
-
- //
- // The neighbor Port info
- //
- for (Vertex neighborPortVertex : myPortVertex.getVertices(Direction.OUT, "link")) {
-// log.debug("state : {}", neighborPortVertex.getProperty("state"));
-// log.debug("port id : {}", neighborPortVertex.getProperty("port_id"));
- // Ignore inactive ports
- if (! neighborPortVertex.getProperty("state").toString().equals("ACTIVE")) {
- continue;
- }
- int neighborPort = 0;
- idStr = neighborPortVertex.getProperty("port_id").toString();
- splitter = idStr.split(IDBOperation.PORT_ID_DELIM);
- if (splitter.length != 2) {
- log.error("Invalid port_id : {}", idStr);
- continue;
- }
- String neighborDpid = splitter[0];
- neighborPort = Short.parseShort(splitter[1]);
- long neighborId = HexString.toLong(neighborDpid);
- Node neighbor = nodesMap.get(neighborId);
-// log.debug("dpid {},{} port {}", neighborDpid, neighborId, neighborPort);
- if (neighbor == null) {
- continue;
- }
- me.addLink(myPort, neighbor, neighborPort);
- }
- }
- dbHandler.commit();
}
-
- // Only for debug use
- List<Long> logGetSw = new ArrayList<Long>(100);
- List<Long> logGetPt = new ArrayList<Long>(100);
- List<Long> logAddSw = new ArrayList<Long>(100);
- List<Long> logAddPt = new ArrayList<Long>(100);
- List<Long> logAddLk = new ArrayList<Long>(100);
- List<Long> logCommit = new ArrayList<Long>(100);
- List<Integer> logGetVertices = new ArrayList<Integer>(100);
- List<Integer> logGetProperty = new ArrayList<Integer>(100);
- public void readFromDatabaseBreakdown(DBOperation dbHandler) {
- int getVerticesCount = 0;
- int getPropertyCount = 0;
- int getVCount_sw = 0;
- int getVCount_pt = 0;
- int getVCount_lk = 0;
- int getPCount_sw = 0;
- int getPCount_pt = 0;
- int getPCount_lk = 0;
-
- //
- // Fetch the relevant info from the Switch and Port vertices
- // from the Titan Graph.
- //
+ private void readFromDatabaseBody(DBOperation dbHandler) {
+ //
+ // Fetch the relevant info from the Switch and Port vertices
+ // from the Titan Graph.
+ //
nodesMap.clear();
- long t1 = System.nanoTime();
+ Iterable<ISwitchObject> activeSwitches = dbHandler.getActiveSwitches();
+ for (ISwitchObject switchObj : activeSwitches) {
+ Vertex nodeVertex = switchObj.asVertex();
+ //
+ // The Switch info
+ //
+ String nodeDpid = nodeVertex.getProperty("dpid").toString();
+ long nodeId = HexString.toLong(nodeDpid);
+ Node me = nodesMap.get(nodeId);
+ if (me == null)
+ me = addNode(nodeId);
- // Load all switches into Map
- Iterable<ISwitchObject> switches = dbHandler.getAllSwitches();
+ //
+ // The local Port info
+ //
+ for (Vertex myPortVertex : nodeVertex.getVertices(Direction.OUT, "on")) {
+ // Ignore inactive ports
+ if (! myPortVertex.getProperty("state").toString().equals("ACTIVE"))
+ continue;
- long t2 = System.nanoTime();
+ int myPort = 0;
+ Object obj = myPortVertex.getProperty("number");
+ if (obj instanceof Short) {
+ myPort = (Short)obj;
+ } else if (obj instanceof Integer) {
+ myPort = (Integer)obj;
+ }
+ me.addPort(myPort);
- long t_addSw = 0;
- for (ISwitchObject switchObj : switches) {
- long t3 = System.nanoTime();
- long t4;
+ for (Vertex neighborPortVertex : myPortVertex.getVertices(Direction.OUT, "link")) {
+ // Ignore inactive ports
+ if (! neighborPortVertex.getProperty("state").toString().equals("ACTIVE")) {
+ continue;
+ }
- // Ignore inactive ports
- ++getPropertyCount;
- ++getPCount_sw;
- if (!switchObj.getState().equals(SwitchState.ACTIVE.toString())) {
- t4 = System.nanoTime();
- t_addSw += t4 - t3;
- continue;
- }
- Vertex nodeVertex = switchObj.asVertex();
- //
- // The Switch info
- //
- ++getPropertyCount;
- ++getPCount_sw;
- String nodeDpid = nodeVertex.getProperty("dpid").toString();
- long nodeId = HexString.toLong(nodeDpid);
- addNode(nodeId);
- t4 = System.nanoTime();
- t_addSw += t4 - t3;
- }
+ int neighborPort = 0;
+ obj = neighborPortVertex.getProperty("number");
+ if (obj instanceof Short) {
+ neighborPort = (Short)obj;
+ } else if (obj instanceof Integer) {
+ neighborPort = (Integer)obj;
+ }
+ //
+ // The neighbor Switch info
+ //
+ for (Vertex neighborVertex : neighborPortVertex.getVertices(Direction.IN, "on")) {
+ // Ignore inactive switches
+ String state = neighborVertex.getProperty("state").toString();
+ if (! state.equals(SwitchState.ACTIVE.toString()))
+ continue;
- long t5 = System.nanoTime();
- //
- // Get All Ports
- //
- Iterable<IPortObject> ports = dbHandler.getAllPorts(); //TODO: Add to DB operations
-
- long t6 = System.nanoTime();
- long t_addPort = 0;
- long t_addLink = 0;
-
- for (IPortObject myPortObj : ports) {
- long t7 = System.nanoTime();
- long t8;
- Vertex myPortVertex = myPortObj.asVertex();
-
- // Ignore inactive ports
- ++getPropertyCount;
- ++getPCount_pt;
- if (! myPortVertex.getProperty("state").toString().equals("ACTIVE")) {
- t8 = System.nanoTime();
- t_addPort += t8 - t7;
- continue;
- }
-
- short myPort = 0;
- ++getPropertyCount;
- ++getPCount_pt;
- String idStr = myPortObj.getPortId();
- String[] splitter = idStr.split(IDBOperation.PORT_ID_DELIM);
- if (splitter.length != 2) {
- log.error("Invalid port_id : {}", idStr);
- t8 = System.nanoTime();
- t_addPort += t8 - t7;
- continue;
- }
- String myDpid = splitter[0];
- myPort = Short.parseShort(splitter[1]);
- long myId = HexString.toLong(myDpid);
- Node me = nodesMap.get(myId);
-
- if (me == null) {
- // cannot proceed ports and switches are out of sync
- //TODO: Restart the whole read
- t8 = System.nanoTime();
- t_addPort += t8 - t7;
- continue;
- }
-
- if (me.getPort(myPort) == null) {
- me.addPort(myPort);
- } else if (me.getLink(myPort) != null) {
- // Link already added..probably by neighbor
- t8 = System.nanoTime();
- t_addPort += t8 - t7;
- continue;
- }
- t8 = System.nanoTime();
- t_addPort += t8 - t7;
-
- //
- // The neighbor Port info
- //
- ++getVerticesCount;
- ++getVCount_pt;
- for (Vertex neighborPortVertex : myPortVertex.getVertices(Direction.OUT, "link")) {
-// log.debug("state : {}", neighborPortVertex.getProperty("state"));
-// log.debug("port id : {}", neighborPortVertex.getProperty("port_id"));
-
- long t9 = System.nanoTime();
- long t10;
-
- // Ignore inactive ports
- ++getPropertyCount;
- ++getPCount_lk;
- if (! neighborPortVertex.getProperty("state").toString().equals("ACTIVE")) {
- t10 = System.nanoTime();
- t_addLink += t10 - t9;
- continue;
- }
- int neighborPort = 0;
- ++getPropertyCount;
- ++getPCount_lk;
- idStr = neighborPortVertex.getProperty("port_id").toString();
- splitter = idStr.split(IDBOperation.PORT_ID_DELIM);
- if (splitter.length != 2) {
- log.error("Invalid port_id : {}", idStr);
- t10 = System.nanoTime();
- t_addLink += t10 - t9;
- continue;
- }
- String neighborDpid = splitter[0];
- neighborPort = Short.parseShort(splitter[1]);
- long neighborId = HexString.toLong(neighborDpid);
- Node neighbor = nodesMap.get(neighborId);
-// log.debug("dpid {},{} port {}", neighborDpid, neighborId, neighborPort);
- if (neighbor == null) {
- t10 = System.nanoTime();
- t_addLink += t10 - t9;
- continue;
- }
- me.addLink(myPort, neighbor, neighborPort);
-
- t10 = System.nanoTime();
- t_addLink += t10 - t9;
- }
- }
- long t11 = System.nanoTime();
- dbHandler.commit();
- long t12 = System.nanoTime();
-
- logGetSw.add((t2-t1)/1000);
- logGetPt.add((t6-t5)/1000);
- logAddSw.add(t_addSw/1000);
- logAddPt.add(t_addPort/1000);
- logAddLk.add(t_addLink/1000);
- logCommit.add((t12-t11)/1000);
- logGetVertices.add(getVerticesCount);
- logGetProperty.add(getPropertyCount);
- log.debug("getVertices[N({}),P({}),L({})] getProperty[N({}),P({}),L({})]",
- new Object[]{getVCount_sw,getVCount_pt,getVCount_lk,
- getPCount_sw,getPCount_pt,getPCount_lk});
+ String neighborDpid = neighborVertex.getProperty("dpid").toString();
+ long neighborId = HexString.toLong(neighborDpid);
+ Node neighbor = nodesMap.get(neighborId);
+ if (neighbor == null)
+ neighbor = addNode(neighborId);
+ neighbor.addPort(neighborPort);
+ me.addLink(myPort, neighbor, neighborPort);
+ }
+ }
+ }
+ }
+ dbHandler.commit();
}
- public void printMeasuredLog() {
- log.debug("getsw: {}", StringUtils.join(logGetSw, ","));
- log.debug("getpt: {}", StringUtils.join(logGetPt, ","));
- log.debug("addsw: {}", StringUtils.join(logAddSw, ","));
- log.debug("addpt: {}", StringUtils.join(logAddPt, ","));
- log.debug("addlk: {}", StringUtils.join(logAddLk, ","));
- log.debug("commit: {}", StringUtils.join(logCommit, ","));
- log.debug("getvertices: {}", StringUtils.join(logGetVertices, ","));
- log.debug("getproperty: {}", StringUtils.join(logGetProperty, ","));
+ private void readFromDatabaseBodyOptimized(DBOperation dbHandler) {
+ nodesMap.clear();
+
+ // Load all switches into Map
+ Iterable<ISwitchObject> switches = dbHandler.getAllSwitches();
+ for (ISwitchObject switchObj : switches) {
+ // Ignore inactive ports
+ if (!switchObj.getState().equals(SwitchState.ACTIVE.toString())) {
+ continue;
+ }
+ Vertex nodeVertex = switchObj.asVertex();
+ //
+ // The Switch info
+ //
+ String nodeDpid = nodeVertex.getProperty("dpid").toString();
+ long nodeId = HexString.toLong(nodeDpid);
+ addNode(nodeId);
+ }
+
+ //
+ // Get All Ports
+ //
+ Iterable<IPortObject> ports = dbHandler.getAllPorts(); //TODO: Add to DB operations
+ for (IPortObject myPortObj : ports) {
+ Vertex myPortVertex = myPortObj.asVertex();
+
+ // Ignore inactive ports
+ if (! myPortVertex.getProperty("state").toString().equals("ACTIVE")) {
+ continue;
+ }
+
+ short myPort = 0;
+ String idStr = myPortObj.getPortId();
+ String[] splitter = idStr.split(IDBOperation.PORT_ID_DELIM);
+ if (splitter.length != 2) {
+ log.error("Invalid port_id : {}", idStr);
+ continue;
+ }
+ String myDpid = splitter[0];
+ myPort = Short.parseShort(splitter[1]);
+ long myId = HexString.toLong(myDpid);
+ Node me = nodesMap.get(myId);
+
+ if (me == null) {
+ // cannot proceed ports and switches are out of sync
+ //TODO: Restart the whole read
+ continue;
+ }
+
+ if (me.getPort((int)myPort) == null) {
+ me.addPort((int)myPort);
+ } else if (me.getLink((int)myPort) != null) {
+ // Link already added..probably by neighbor
+ continue;
+ }
+
+ //
+ // The neighbor Port info
+ //
+ for (Vertex neighborPortVertex : myPortVertex.getVertices(Direction.OUT, "link")) {
+ // Ignore inactive ports
+ if (! neighborPortVertex.getProperty("state").toString().equals("ACTIVE")) {
+ continue;
+ }
+ int neighborPort = 0;
+ idStr = neighborPortVertex.getProperty("port_id").toString();
+ splitter = idStr.split(IDBOperation.PORT_ID_DELIM);
+ if (splitter.length != 2) {
+ log.error("Invalid port_id : {}", idStr);
+ continue;
+ }
+ String neighborDpid = splitter[0];
+ neighborPort = Short.parseShort(splitter[1]);
+ long neighborId = HexString.toLong(neighborDpid);
+ Node neighbor = nodesMap.get(neighborId);
+ if (neighbor == null) {
+ continue;
+ }
+ if (neighbor.getPort(neighborPort) == null) {
+ neighbor.addPort(neighborPort);
+ }
+ me.addLink(myPort, neighbor, neighborPort);
+ }
+ }
+ dbHandler.commit();
}
-
+
// Only for debug use
@Override
public String toString() {