Merge pull request #469 from jonohart/master
A collection of bug fixes and improvements
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
index a96f5dc..73db675 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
@@ -3,7 +3,6 @@
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
@@ -496,52 +495,6 @@
}
/**
- * Get summary of all installed flows by all installers in a given range.
- *
- * @param dbHandler the Graph Database handler to use.
- * @param flowId the Flow ID of the first flow in the flow range to get.
- * @param maxFlows the maximum number of flows to be returned.
- * @return the Flow Paths if found, otherwise null.
- */
- static ArrayList<FlowPath> getAllFlowsSummary(GraphDBOperation dbHandler,
- FlowId flowId,
- int maxFlows) {
- //
- // TODO: The implementation below is not optimal:
- // We fetch all flows, and then return only the subset that match
- // the query conditions.
- // We should use the appropriate Titan/Gremlin query to filter-out
- // the flows as appropriate.
- //
- ArrayList<FlowPath> flowPaths = getAllFlowsWithDataPathSummary(dbHandler);
- Collections.sort(flowPaths);
- return flowPaths;
- }
-
- /**
- * Get all Flows information, with Data Path summary for the Flow Entries.
- *
- * @param dbHandler the Graph Database handler to use.
- * @return all Flows information, with Data Path summary for the Flow
- * Entries.
- */
- static ArrayList<FlowPath> getAllFlowsWithDataPathSummary(GraphDBOperation dbHandler) {
- ArrayList<FlowPath> flowPaths = getAllFlows(dbHandler);
-
- // Truncate each Flow Path and Flow Entry
- for (FlowPath flowPath : flowPaths) {
- flowPath.setFlowEntryMatch(null);
- flowPath.setFlowEntryActions(null);
- for (FlowEntry flowEntry : flowPath.flowEntries()) {
- flowEntry.setFlowEntryMatch(null);
- flowEntry.setFlowEntryActions(null);
- }
- }
-
- return flowPaths;
- }
-
- /**
* Extract Flow Path State from a Titan Database Object @ref IFlowPath.
*
* @param flowObj the object to extract the Flow Path State from.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index 47ef3b7..6c200fa 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -7,6 +7,8 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -27,6 +29,9 @@
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
import net.onrc.onos.ofcontroller.util.FlowPathUserState;
+import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
+
+import com.esotericsoftware.kryo2.Kryo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +50,7 @@
private FlowManager flowManager; // The Flow Manager to use
private IDatagridService datagridService; // The Datagrid Service to use
private Topology topology; // The network topology
+ private KryoFactory kryoFactory = new KryoFactory();
// The queue with Flow Path and Topology Element updates
private BlockingQueue<EventEntry<?>> networkEvents =
@@ -129,7 +135,9 @@
}
// Process the initial events (if any)
- processEvents();
+ synchronized (allFlowPaths) {
+ processEvents();
+ }
}
/**
@@ -183,7 +191,9 @@
collection.clear();
// Process the events (if any)
- processEvents();
+ synchronized (allFlowPaths) {
+ processEvents();
+ }
}
} catch (Exception exception) {
log.debug("Exception processing Network Events: ", exception);
@@ -852,4 +862,31 @@
new EventEntry<TopologyElement>(EventEntry.Type.ENTRY_ADD, topologyElement);
networkEvents.add(eventEntry);
}
+
+ /**
+ * Get a sorted copy of all Flow Paths.
+ *
+ * Each Flow Path is copied with Kryo while holding the allFlowPaths lock.
+ *
+ * @return a sorted copy of all Flow Paths, keyed as in allFlowPaths.
+ */
+ synchronized SortedMap<Long, FlowPath> getAllFlowPathsCopy() {
+ SortedMap<Long, FlowPath> sortedFlowPaths = new TreeMap<Long, FlowPath>();
+
+ // TODO: For now we use Kryo to create a copy of each Flow Path.
+ // In the future, we should use proper copy constructors.
+ Kryo kryo = kryoFactory.newKryo();
+ try {
+ synchronized (allFlowPaths) {
+ for (Map.Entry<Long, FlowPath> entry : allFlowPaths.entrySet()) {
+ sortedFlowPaths.put(entry.getKey(), kryo.copy(entry.getValue()));
+ }
+ }
+ } finally {
+ // Hand the Kryo instance back even if copying a Flow Path throws.
+ kryoFactory.deleteKryo(kryo);
+ }
+
+ return sortedFlowPaths;
+ }
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index f4b1f8f..3fe47a0 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -7,6 +7,7 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.SortedMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
@@ -217,7 +218,7 @@
public FlowId addFlow(FlowPath flowPath) {
// Allocate the Flow ID if necessary
- if (! flowPath.flowId().isValid()) {
+ if (! flowPath.isValidFlowId()) {
long id = getNextFlowEntryId();
flowPath.setFlowId(new FlowId(id));
}
@@ -302,8 +303,28 @@
@Override
public ArrayList<FlowPath> getAllFlowsSummary(FlowId flowId,
int maxFlows) {
- return FlowDatabaseOperation.getAllFlowsSummary(dbHandlerApi, flowId,
- maxFlows);
+ SortedMap<Long, FlowPath> copiedFlowPaths =
+ flowEventHandler.getAllFlowPathsCopy();
+ ArrayList<FlowPath> summaries = new ArrayList<FlowPath>();
+
+ // Strip the match and actions from every copied Flow Path and from
+ // each of its Flow Entries; results keep the sorted-map order.
+ // NOTE: flowId and maxFlows are currently not used here.
+ for (FlowPath summary : copiedFlowPaths.values()) {
+ //
+ // TODO: Add only the Flow Paths that have been successfully
+ // installed.
+ //
+ summaries.add(summary);
+ summary.setFlowEntryMatch(null);
+ summary.setFlowEntryActions(null);
+ for (FlowEntry entry : summary.flowEntries()) {
+ entry.setFlowEntryMatch(null);
+ entry.setFlowEntryActions(null);
+ }
+ }
+
+ return summaries;
}
/**
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/FlowPath.java b/src/main/java/net/onrc/onos/ofcontroller/util/FlowPath.java
index a720fc6..ab3edb1 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/FlowPath.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/FlowPath.java
@@ -6,6 +6,7 @@
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
+import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.annotate.JsonProperty;
/**
@@ -210,6 +211,18 @@
}
/**
+ * Test whether the Flow ID is valid.
+ *
+ * A Flow Path without a Flow ID is treated as not having a valid one.
+ *
+ * @return true if the Flow ID is set and valid, otherwise false.
+ */
+ @JsonIgnore
+ public boolean isValidFlowId() {
+ return (this.flowId != null) && this.flowId.isValid();
+ }
+
+ /**
* Get the Caller ID of the flow path installer.
*
* @return the Caller ID of the flow path installer.