Use a separate thread to write the Flow Entries to the Titan Graph DB.
This fixes issue ONOS-737.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index 174d5d7..7e46b54 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -7,7 +7,9 @@
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -79,6 +81,11 @@
/** The logger. */
private final static Logger log = LoggerFactory.getLogger(FlowManager.class);
+ // The queue to write Flow Entries to the database
+ private BlockingQueue<FlowPathEntryPair> flowEntriesToDatabaseQueue =
+ new LinkedBlockingQueue<FlowPathEntryPair>();
+ FlowDatabaseWriter flowDatabaseWriter;
+
// The periodic task(s)
private ScheduledExecutorService mapReaderScheduler;
private ScheduledExecutorService shortestPathReconcileScheduler;
@@ -515,7 +522,14 @@
// Initialize the Flow Entry ID generator
nextFlowEntryIdPrefix = randomGenerator.nextInt();
-
+
+ //
+ // The thread to write to the database
+ //
+ flowDatabaseWriter = new FlowDatabaseWriter(this,
+ flowEntriesToDatabaseQueue);
+ flowDatabaseWriter.start();
+
//
// Create the Flow Event Handler thread and register it with the
// Datagrid Service
@@ -1020,6 +1034,49 @@
}
/**
+ * Class to implement writing to the database in a separate thread.
+ */
+ class FlowDatabaseWriter extends Thread {
+ private FlowManager flowManager;
+ private BlockingQueue<FlowPathEntryPair> blockingQueue;
+
+ /**
+ * Constructor.
+ *
+ * @param flowManager the Flow Manager to use.
+ * @param blockingQueue the blocking queue to use.
+ */
+ FlowDatabaseWriter(FlowManager flowManager,
+ BlockingQueue<FlowPathEntryPair> blockingQueue) {
+ this.flowManager = flowManager;
+ this.blockingQueue = blockingQueue;
+ }
+
+ /**
+ * Run the thread.
+ */
+ @Override
+ public void run() {
+ //
+ // The main loop
+ //
+ Collection<FlowPathEntryPair> collection =
+ new LinkedList<FlowPathEntryPair>();
+ try {
+ while (true) {
+ FlowPathEntryPair entryPair = blockingQueue.take();
+ collection.add(entryPair);
+ blockingQueue.drainTo(collection);
+ flowManager.writeModifiedFlowEntriesToDatabase(collection);
+ collection.clear();
+ }
+ } catch (Exception exception) {
+ log.debug("Exception writing to the Database: ", exception);
+ }
+ }
+ }
+
+ /**
* Push Flow Entries to the Network MAP.
*
* NOTE: The Flow Entries are pushed only on the instance responsible
@@ -1028,7 +1085,29 @@
*
* @param modifiedFlowEntries the collection of Flow Entries to push.
*/
- public void pushModifiedFlowEntriesToDatabase(
+ void pushModifiedFlowEntriesToDatabase(
+ Collection<FlowPathEntryPair> modifiedFlowEntries) {
+ // TODO: For now, the pushing of Flow Entries is disabled
+ if (! enableNotifications)
+ return;
+
+ //
+ // We only add the Flow Entries to the Database Queue.
+ // The FlowDatabaseWriter thread is responsible for the actual writing.
+ //
+ flowEntriesToDatabaseQueue.addAll(modifiedFlowEntries);
+ }
+
+ /**
+ * Write Flow Entries to the Network MAP.
+ *
+ * NOTE: The Flow Entries are written only on the instance responsible
+ * for the first switch. This is to avoid database errors when multiple
+ * instances are writing Flow Entries for the same Flow Path.
+ *
+ * @param modifiedFlowEntries the collection of Flow Entries to write.
+ */
+ private void writeModifiedFlowEntriesToDatabase(
Collection<FlowPathEntryPair> modifiedFlowEntries) {
// TODO: For now, the pushing of Flow Entries is disabled
if (! enableNotifications)