Added an interface for the TopologyPublisher to publish operations to
the Topology Replication Writer
ONOS-1906
* Added new interface ITopologyReplicationWriter that should be implemented
by the (forthcoming) Topology Replication Writer. After this
writer is implemented, it should be created inside class
TopologyModule (similar to class TopologyManager), and
method TopologyModule.publish() should be modified to make the appropriate
call.
* Added new interface ITopologyPublisherService that exposes the
TopologyPublisher to other modules.
Currently, it includes only method publish() that can be used to
pre-populate the topology (e.g., by topology operations from a
configuration file).
* Updated class TopologyBatchOperation:
- The batch entry target type is now TopologyEvent (was TopologyBatchTarget)
- Renamed methods addAddTopologyOperation and addRemoveTopologyOperation
to appendAddOperation() and appendRemoveOperation() respectively
* Updated the TopologyPublisher internals to generalize the publishing
operation inside method TopologyPublisher.publishTopologyOperations()
NOTE: Inside that method there is a temporary flag "isGlobalLogWriter"
(set to false by default) that can be used to start experimenting with
the forthcoming Global Log-based mechanism (when the rest of the components
are ready).
NOTE: As a short-term solution, added class DelayedOperationsHandler
as a hack to deal with Link Events that need to be delayed
(in case of GlobalLog mechanism).
Change-Id: I72bd3c4bea46020be283e3bd518ba3bf900d6f0a
diff --git a/src/main/java/net/onrc/onos/core/topology/ITopologyPublisherService.java b/src/main/java/net/onrc/onos/core/topology/ITopologyPublisherService.java
new file mode 100644
index 0000000..47f5a17
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/topology/ITopologyPublisherService.java
@@ -0,0 +1,16 @@
+package net.onrc.onos.core.topology;
+
+import net.floodlightcontroller.core.module.IFloodlightService;
+
+/**
+ * Service interface that exposes the topology publisher to other modules.
+ */
+public interface ITopologyPublisherService extends IFloodlightService {
+    /**
+     * Publishes a batch of Topology Operations.
+     *
+     * @param tbo the batch of Topology Operations to publish
+     * @return true on success, otherwise false
+     */
+    boolean publish(TopologyBatchOperation tbo);
+}
diff --git a/src/main/java/net/onrc/onos/core/topology/ITopologyReplicationWriter.java b/src/main/java/net/onrc/onos/core/topology/ITopologyReplicationWriter.java
new file mode 100644
index 0000000..8b51e1b
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/topology/ITopologyReplicationWriter.java
@@ -0,0 +1,15 @@
+package net.onrc.onos.core.topology;
+
+/**
+ * Interface through which Topology Operations are published to the
+ * Topology Replication Writer.
+ */
+public interface ITopologyReplicationWriter {
+    /**
+     * Publishes a batch of Topology Operations.
+     *
+     * @param tbo the batch of Topology Operations to publish
+     * @return true on success, otherwise false
+     */
+    boolean publish(TopologyBatchOperation tbo);
+}
diff --git a/src/main/java/net/onrc/onos/core/topology/ITopologyService.java b/src/main/java/net/onrc/onos/core/topology/ITopologyService.java
index 26b6212..ab32130 100644
--- a/src/main/java/net/onrc/onos/core/topology/ITopologyService.java
+++ b/src/main/java/net/onrc/onos/core/topology/ITopologyService.java
@@ -5,7 +5,8 @@
/**
* Interface for providing the topology service to other modules.
*/
-public interface ITopologyService extends IFloodlightService {
+public interface ITopologyService extends IFloodlightService,
+ ITopologyReplicationWriter {
/**
* Allows a module to get a reference to the global topology object.
*
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyBatchOperation.java b/src/main/java/net/onrc/onos/core/topology/TopologyBatchOperation.java
index f05e9d0..f7d7049 100644
--- a/src/main/java/net/onrc/onos/core/topology/TopologyBatchOperation.java
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyBatchOperation.java
@@ -7,7 +7,7 @@
* A list of topology operations.
*/
public class TopologyBatchOperation extends
- BatchOperation<BatchOperationEntry<TopologyBatchOperation.Operator, TopologyBatchTarget>> {
+ BatchOperation<BatchOperationEntry<TopologyBatchOperation.Operator, TopologyEvent>> {
/**
* The topology operations' operators.
@@ -25,28 +25,28 @@
}
/**
- * Adds an add-TopologyEvent operation.
+ * Appends an ADD-TopologyEvent operation.
*
- * @param topologyEvent the Topology Event to be added
+ * @param topologyEvent the Topology Event to be appended
* @return the TopologyBatchOperation object
*/
- public TopologyBatchOperation addAddTopologyOperation(
+ public TopologyBatchOperation appendAddOperation(
TopologyEvent topologyEvent) {
return (TopologyBatchOperation) addOperation(
- new BatchOperationEntry<Operator, TopologyBatchTarget>(
+ new BatchOperationEntry<Operator, TopologyEvent>(
Operator.ADD, topologyEvent));
}
/**
- * Adds a remove-TopologyEvent operation.
+ * Appends a REMOVE-TopologyEvent operation.
*
- * @param topologyEvent the Topology Event to be removed
+ * @param topologyEvent the Topology Event to be appended
* @return the TopologyBatchOperation object
*/
- public TopologyBatchOperation addRemoveTopologyOperation(
+ public TopologyBatchOperation appendRemoveOperation(
TopologyEvent topologyEvent) {
return (TopologyBatchOperation) addOperation(
- new BatchOperationEntry<Operator, TopologyBatchTarget>(
+ new BatchOperationEntry<Operator, TopologyEvent>(
Operator.REMOVE, topologyEvent));
}
}
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyModule.java b/src/main/java/net/onrc/onos/core/topology/TopologyModule.java
index 7913fc5..5ce9b6b 100644
--- a/src/main/java/net/onrc/onos/core/topology/TopologyModule.java
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyModule.java
@@ -81,4 +81,11 @@
public void removeListener(ITopologyListener listener) {
topologyManager.removeListener(listener);
}
+
+ @Override
+ public boolean publish(TopologyBatchOperation tbo) {
+ // TODO: Replace with a call to the new (log-based) TopologyReplicator,
+ // i.e. topologyReplicator.publish(tbo). Until then nothing is published.
+ return false; // placeholder: always reports failure to the caller
+ }
}
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyPublisher.java b/src/main/java/net/onrc/onos/core/topology/TopologyPublisher.java
index d27d68e..ff05cc3 100644
--- a/src/main/java/net/onrc/onos/core/topology/TopologyPublisher.java
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyPublisher.java
@@ -3,11 +3,14 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import net.floodlightcontroller.core.IFloodlightProviderService;
@@ -21,6 +24,8 @@
import net.floodlightcontroller.core.module.IFloodlightService;
import net.floodlightcontroller.core.util.SingletonTask;
import net.floodlightcontroller.threadpool.IThreadPoolService;
+
+import net.onrc.onos.api.batchoperation.BatchOperationEntry;
import net.onrc.onos.core.datagrid.IDatagridService;
import net.onrc.onos.core.datagrid.IEventChannel;
import net.onrc.onos.core.hostmanager.Host;
@@ -57,7 +62,8 @@
public class TopologyPublisher implements IOFSwitchListener,
ILinkDiscoveryListener,
IFloodlightModule,
- IHostListener {
+ IHostListener,
+ ITopologyPublisherService {
private static final Logger log =
LoggerFactory.getLogger(TopologyPublisher.class);
@@ -75,6 +81,8 @@
private boolean cleanupEnabled = true;
private static final int CLEANUP_TASK_INTERVAL = 60; // in seconds
private SingletonTask cleanupTask;
+ private DelayedOperationsHandler delayedOperationsHandler =
+ new DelayedOperationsHandler();
private IEventChannel<byte[], TopologyEvent> eventChannel;
@@ -102,6 +110,9 @@
private ConcurrentMap<Dpid, ConcurrentMap<ByteBuffer, HostEvent>>
publishedHostEvents = new ConcurrentHashMap<>();
+ private BlockingQueue<TopologyBatchOperation> delayedOperations =
+ new LinkedBlockingQueue<>();
+
/**
* Gets the ONOS Instance ID.
@@ -191,6 +202,90 @@
}
}
+ /**
+ * A class to deal with Topology Operations that couldn't be pushed
+ * to the Global Log writer, because they need to be delayed.
+ * For example, a link cannot be pushed before the switches on both
+ * ends are in the Global Log.
+ *
+ * TODO: This is an ugly hack that should go away: right now we have to
+ * keep trying periodically.
+ * TODO: Currently, we retry only ADD Link Events, everything else
+ * is thrown away.
+ */
+ private class DelayedOperationsHandler extends Thread {
+ private static final long RETRY_INTERVAL_MS = 10; // 10ms
+
+ @Override
+ public void run() {
+ List<TopologyBatchOperation> operations = new LinkedList<>();
+
+ this.setName("TopologyPublisher.DelayedOperationsHandler " +
+ this.getId());
+ //
+ // The main loop
+ //
+ while (true) {
+ try {
+ //
+ // Block-waiting for an operation to be added, sleep
+ // and try to publish it again.
+ //
+ TopologyBatchOperation firstTbo = delayedOperations.take();
+ Thread.sleep(RETRY_INTERVAL_MS);
+ operations.add(firstTbo);
+ delayedOperations.drainTo(operations);
+
+ // Retry only the appropriate operations
+ for (TopologyBatchOperation tbo : operations) {
+ for (BatchOperationEntry<
+ TopologyBatchOperation.Operator,
+ TopologyEvent> boe : tbo.getOperations()) {
+ TopologyBatchOperation.Operator oper =
+ boe.getOperator();
+ switch (oper) {
+ case ADD:
+ TopologyEvent topologyEvent = boe.getTarget();
+ LinkEvent linkEvent =
+ topologyEvent.getLinkEvent();
+ //
+ // Test whether the Link Event still can be
+ // published.
+ // TODO: The implementation below has a bug:
+ // If it happens that the same Link Event was
+ // removed in the middle of checking, we might
+ // incorrectly publish it again from here.
+ //
+ if (linkEvent == null) {
+ break;
+ }
+ ConcurrentMap<ByteBuffer, LinkEvent>
+ linkEvents = publishedLinkEvents.get(
+ linkEvent.getDst().getDpid());
+ if (linkEvents == null) {
+ break;
+ }
+ if (linkEvents.get(linkEvent.getIDasByteBuffer()) == null) {
+ break;
+ }
+ publishAddLinkEvent(linkEvent);
+ break;
+ case REMOVE:
+ break;
+ default:
+ log.error("Unknown Topology Batch Operation {}", oper);
+ break;
+ }
+ }
+ }
+ } catch (InterruptedException exception) {
+ log.debug("Exception processing delayed operations: ",
+ exception);
+ }
+ }
+ }
+ }
+
@Override
public void linkAdded(Link link) {
LinkEvent linkEvent = new LinkEvent(
@@ -394,13 +489,19 @@
@Override
public Collection<Class<? extends IFloodlightService>> getModuleServices() {
- return null;
+ List<Class<? extends IFloodlightService>> services =
+ new ArrayList<Class<? extends IFloodlightService>>();
+ services.add(ITopologyPublisherService.class);
+ return services;
}
@Override
public Map<Class<? extends IFloodlightService>, IFloodlightService>
getServiceImpls() {
- return null;
+ Map<Class<? extends IFloodlightService>, IFloodlightService> impls =
+ new HashMap<Class<? extends IFloodlightService>, IFloodlightService>();
+ impls.put(ITopologyPublisherService.class, this);
+ return impls;
}
@Override
@@ -460,6 +561,9 @@
// Run the cleanup task immediately on startup
cleanupTask.reschedule(0, TimeUnit.SECONDS);
}
+
+ // Run the Delayed Operations Handler thread
+ delayedOperationsHandler.start();
}
@Override
@@ -506,6 +610,12 @@
}
}
+ @Override
+ public boolean publish(TopologyBatchOperation tbo) {
+ publishTopologyOperations(tbo);
+ return true; // always true: publishTopologyOperations() reports no status
+ }
+
/**
* Prepares the Controller role changed event for a switch.
*
@@ -534,10 +644,11 @@
private void publishAddSwitchMastershipEvent(
MastershipEvent mastershipEvent) {
// Publish the information
+ TopologyBatchOperation tbo = new TopologyBatchOperation();
TopologyEvent topologyEvent =
new TopologyEvent(mastershipEvent, getOnosInstanceId());
- log.debug("Publishing add mastership: {}", topologyEvent);
- eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+ tbo.appendAddOperation(topologyEvent);
+ publishTopologyOperations(tbo);
publishedMastershipEvents.put(mastershipEvent.getDpid(),
mastershipEvent);
}
@@ -554,10 +665,11 @@
}
// Publish the information
+ TopologyBatchOperation tbo = new TopologyBatchOperation();
TopologyEvent topologyEvent =
new TopologyEvent(mastershipEvent, getOnosInstanceId());
- log.debug("Publishing remove mastership: {}", topologyEvent);
- eventChannel.removeEntry(topologyEvent.getID());
+ tbo.appendRemoveOperation(topologyEvent);
+ publishTopologyOperations(tbo);
publishedMastershipEvents.remove(mastershipEvent.getDpid());
}
@@ -584,11 +696,10 @@
}
// Publish the information for the switch
+ TopologyBatchOperation tbo = new TopologyBatchOperation();
TopologyEvent topologyEvent =
new TopologyEvent(switchEvent, getOnosInstanceId());
- log.debug("Publishing add switch: {}", topologyEvent);
- eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
- publishedSwitchEvents.put(switchEvent.getDpid(), switchEvent);
+ tbo.appendAddOperation(topologyEvent);
// Publish the information for each port
ConcurrentMap<ByteBuffer, PortEvent> newPortEvents =
@@ -596,13 +707,14 @@
for (PortEvent portEvent : portEvents) {
topologyEvent =
new TopologyEvent(portEvent, getOnosInstanceId());
- log.debug("Publishing add port: {}", topologyEvent);
- eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+ tbo.appendAddOperation(topologyEvent);
ByteBuffer id = portEvent.getIDasByteBuffer();
newPortEvents.put(id, portEvent);
oldPortEvents.remove(id);
}
+ publishTopologyOperations(tbo);
+ publishedSwitchEvents.put(switchEvent.getDpid(), switchEvent);
publishedPortEvents.put(switchEvent.getDpid(), newPortEvents);
// Cleanup for each of the old removed port
@@ -629,10 +741,11 @@
*/
// Publish the information
+ TopologyBatchOperation tbo = new TopologyBatchOperation();
TopologyEvent topologyEvent =
new TopologyEvent(switchEvent, getOnosInstanceId());
- log.debug("Publishing remove switch: {}", topologyEvent);
- eventChannel.removeEntry(topologyEvent.getID());
+ tbo.appendRemoveOperation(topologyEvent);
+ publishTopologyOperations(tbo);
publishedSwitchEvents.remove(switchEvent.getDpid());
// Cleanup for each port
@@ -662,10 +775,11 @@
}
// Publish the information
+ TopologyBatchOperation tbo = new TopologyBatchOperation();
TopologyEvent topologyEvent =
new TopologyEvent(portEvent, getOnosInstanceId());
- log.debug("Publishing add port: {}", topologyEvent);
- eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+ tbo.appendAddOperation(topologyEvent);
+ publishTopologyOperations(tbo);
// Store the new Port Event in the local cache
ConcurrentMap<ByteBuffer, PortEvent> portEvents =
@@ -691,10 +805,11 @@
}
// Publish the information
+ TopologyBatchOperation tbo = new TopologyBatchOperation();
TopologyEvent topologyEvent =
new TopologyEvent(portEvent, getOnosInstanceId());
- log.debug("Publishing remove port: {}", topologyEvent);
- eventChannel.removeEntry(topologyEvent.getID());
+ tbo.appendRemoveOperation(topologyEvent);
+ publishTopologyOperations(tbo);
// Cleanup for the incoming link(s)
ConcurrentMap<ByteBuffer, LinkEvent> linkEvents =
@@ -736,10 +851,11 @@
}
// Publish the information
+ TopologyBatchOperation tbo = new TopologyBatchOperation();
TopologyEvent topologyEvent =
new TopologyEvent(linkEvent, getOnosInstanceId());
- log.debug("Publishing add link: {}", topologyEvent);
- eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+ tbo.appendAddOperation(topologyEvent);
+ publishTopologyOperations(tbo);
// Store the new Link Event in the local cache
ConcurrentMap<ByteBuffer, LinkEvent> linkEvents =
@@ -765,10 +881,11 @@
}
// Publish the information
+ TopologyBatchOperation tbo = new TopologyBatchOperation();
TopologyEvent topologyEvent =
new TopologyEvent(linkEvent, getOnosInstanceId());
- log.debug("Publishing remove link: {}", topologyEvent);
- eventChannel.removeEntry(topologyEvent.getID());
+ tbo.appendRemoveOperation(topologyEvent);
+ publishTopologyOperations(tbo);
linkEvents.remove(linkEvent.getIDasByteBuffer());
}
@@ -793,10 +910,11 @@
}
// Publish the information
+ TopologyBatchOperation tbo = new TopologyBatchOperation();
TopologyEvent topologyEvent =
new TopologyEvent(hostEvent, getOnosInstanceId());
- log.debug("Publishing add host: {}", topologyEvent);
- eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+ tbo.appendAddOperation(topologyEvent);
+ publishTopologyOperations(tbo);
// Store the new Host Event in the local cache
ConcurrentMap<ByteBuffer, HostEvent> hostEvents =
@@ -822,11 +940,50 @@
}
// Publish the information
+ TopologyBatchOperation tbo = new TopologyBatchOperation();
TopologyEvent topologyEvent =
new TopologyEvent(hostEvent, getOnosInstanceId());
- log.debug("Publishing remove host: {}", topologyEvent);
- eventChannel.removeEntry(topologyEvent.getID());
+ tbo.appendRemoveOperation(topologyEvent);
+ publishTopologyOperations(tbo);
hostEvents.remove(hostEvent.getIDasByteBuffer());
}
+
+    /**
+     * Publishes a batch of Topology Operations.
+     *
+     * @param tbo the batch of Topology Operations to publish
+     */
+    private void publishTopologyOperations(TopologyBatchOperation tbo) {
+        // TODO: This flag should be configurable
+        boolean isGlobalLogWriter = false;
+
+        log.debug("Publishing: {}", tbo);
+
+        if (isGlobalLogWriter) {
+            boolean published = topologyService.publish(tbo);
+            if (!published) {
+                log.debug("Cannot publish: {}", tbo);
+                delayedOperations.add(tbo);
+            }
+            return;
+        }
+        // TODO: For now we publish each TopologyEvent independently
+        for (BatchOperationEntry<TopologyBatchOperation.Operator,
+                TopologyEvent> entry : tbo.getOperations()) {
+            TopologyBatchOperation.Operator operator = entry.getOperator();
+            TopologyEvent event = entry.getTarget();
+            switch (operator) {
+            case ADD:
+                eventChannel.addEntry(event.getID(), event);
+                break;
+            case REMOVE:
+                eventChannel.removeEntry(event.getID());
+                break;
+            default:
+                log.error("Unknown Topology Batch Operation {}", operator);
+                break;
+            }
+        }
+    }
}
diff --git a/src/test/java/net/onrc/onos/core/util/serializers/KryoFactoryTest.java b/src/test/java/net/onrc/onos/core/util/serializers/KryoFactoryTest.java
index 01b27ea..732174d 100644
--- a/src/test/java/net/onrc/onos/core/util/serializers/KryoFactoryTest.java
+++ b/src/test/java/net/onrc/onos/core/util/serializers/KryoFactoryTest.java
@@ -350,7 +350,7 @@
SwitchEvent switchEvent = new SwitchEvent(dpid);
TopologyEvent topologyEvent =
swConst.newInstance(switchEvent, onosInstanceId);
- tbo.addAddTopologyOperation(topologyEvent);
+ tbo.appendAddOperation(topologyEvent);
}
Result result = benchType(tbo, EqualityCheck.EQUALS);