Added an interface for the TopologyPublisher to publish operations to
the Topology Replication Writer

ONOS-1906

 * Added new interface ITopologyReplicationWriter that should be implemented
   by the (forthcoming) Topology Replication Writer. After this
   writer is implemented, it should be created inside class
   TopologyModule (similar to class TopologyManager), and
   method TopologyModule.publish() should be modified to make the appropriate
   call.

 * Added new interface ITopologyPublisherService that exposes the
   TopologyPublisher to other modules.
   Currently, it includes only method publish() that can be used to
   pre-populate the topology (e.g., by topology operations from a
   configuration file).

 * Updated class TopologyBatchOperation:
   - The operation target type is now TopologyEvent (was TopologyBatchTarget)
   - Renamed methods addAddTopologyOperation and addRemoveTopologyOperation
     to appendAddOperation() and appendRemoveOperation() respectively

 * Updated the TopologyPublisher internals to generalize the publishing
   operation inside method TopologyPublisher.publishTopologyOperations()
   NOTE: Inside that method there is a temporary flag "isGlobalLogWriter"
   (set to false by default) that can be used to start experimenting with
   the forthcoming Global Log-based mechanism (when the rest of the components
   are ready).
   NOTE: As a short-term solution, added class DelayedOperationsHandler
   as a hack to deal with Link Events that need to be delayed
   (when the Global Log-based mechanism is used).

Change-Id: I72bd3c4bea46020be283e3bd518ba3bf900d6f0a
diff --git a/src/main/java/net/onrc/onos/core/topology/ITopologyPublisherService.java b/src/main/java/net/onrc/onos/core/topology/ITopologyPublisherService.java
new file mode 100644
index 0000000..47f5a17
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/topology/ITopologyPublisherService.java
@@ -0,0 +1,16 @@
+package net.onrc.onos.core.topology;
+
+import net.floodlightcontroller.core.module.IFloodlightService;
+
+/**
+ * Interface for providing the topology publisher service to other modules.
+ */
+public interface ITopologyPublisherService extends IFloodlightService {
+    /**
+     * Publishes Topology Batch Operations.
+     *
+     * @param tbo the Topology Batch Operations to publish
+     * @return true on success, otherwise false
+     */
+    public boolean publish(TopologyBatchOperation tbo);
+}
diff --git a/src/main/java/net/onrc/onos/core/topology/ITopologyReplicationWriter.java b/src/main/java/net/onrc/onos/core/topology/ITopologyReplicationWriter.java
new file mode 100644
index 0000000..8b51e1b
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/topology/ITopologyReplicationWriter.java
@@ -0,0 +1,15 @@
+package net.onrc.onos.core.topology;
+
+/**
+ * Interface for publishing Topology Operations to the Topology Replication
+ * Writer.
+ */
+public interface ITopologyReplicationWriter {
+    /**
+     * Publishes Topology Batch Operations.
+     *
+     * @param tbo the Topology Batch Operations to publish
+     * @return true on success, otherwise false
+     */
+    public boolean publish(TopologyBatchOperation tbo);
+}
diff --git a/src/main/java/net/onrc/onos/core/topology/ITopologyService.java b/src/main/java/net/onrc/onos/core/topology/ITopologyService.java
index 26b6212..ab32130 100644
--- a/src/main/java/net/onrc/onos/core/topology/ITopologyService.java
+++ b/src/main/java/net/onrc/onos/core/topology/ITopologyService.java
@@ -5,7 +5,8 @@
 /**
  * Interface for providing the topology service to other modules.
  */
-public interface ITopologyService extends IFloodlightService {
+public interface ITopologyService extends IFloodlightService,
+                                        ITopologyReplicationWriter {
     /**
      * Allows a module to get a reference to the global topology object.
      *
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyBatchOperation.java b/src/main/java/net/onrc/onos/core/topology/TopologyBatchOperation.java
index f05e9d0..f7d7049 100644
--- a/src/main/java/net/onrc/onos/core/topology/TopologyBatchOperation.java
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyBatchOperation.java
@@ -7,7 +7,7 @@
  * A list of topology operations.
  */
 public class TopologyBatchOperation extends
-        BatchOperation<BatchOperationEntry<TopologyBatchOperation.Operator, TopologyBatchTarget>> {
+        BatchOperation<BatchOperationEntry<TopologyBatchOperation.Operator, TopologyEvent>> {
 
     /**
      * The topology operations' operators.
@@ -25,28 +25,28 @@
     }
 
     /**
-     * Adds an add-TopologyEvent operation.
+     * Appends an ADD-TopologyEvent operation.
      *
-     * @param topologyEvent the Topology Event to be added
+     * @param topologyEvent the Topology Event to be appended
      * @return the TopologyBatchOperation object
      */
-    public TopologyBatchOperation addAddTopologyOperation(
+    public TopologyBatchOperation appendAddOperation(
                                         TopologyEvent topologyEvent) {
         return (TopologyBatchOperation) addOperation(
-                new BatchOperationEntry<Operator, TopologyBatchTarget>(
+                new BatchOperationEntry<Operator, TopologyEvent>(
                                         Operator.ADD, topologyEvent));
     }
 
     /**
-     * Adds a remove-TopologyEvent operation.
+     * Appends a REMOVE-TopologyEvent operation.
      *
-     * @param topologyEvent the Topology Event to be removed
+     * @param topologyEvent the Topology Event to be appended
      * @return the TopologyBatchOperation object
      */
-    public TopologyBatchOperation addRemoveTopologyOperation(
+    public TopologyBatchOperation appendRemoveOperation(
                                         TopologyEvent topologyEvent) {
         return (TopologyBatchOperation) addOperation(
-                new BatchOperationEntry<Operator, TopologyBatchTarget>(
+                new BatchOperationEntry<Operator, TopologyEvent>(
                                         Operator.REMOVE, topologyEvent));
     }
 }
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyModule.java b/src/main/java/net/onrc/onos/core/topology/TopologyModule.java
index 7913fc5..5ce9b6b 100644
--- a/src/main/java/net/onrc/onos/core/topology/TopologyModule.java
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyModule.java
@@ -81,4 +81,11 @@
     public void removeListener(ITopologyListener listener) {
         topologyManager.removeListener(listener);
     }
+
+    @Override
+    public boolean publish(TopologyBatchOperation tbo) {
+        // TODO: Replace with a call to the new (log-based) TopologyReplicator
+        // return topologyReplicator.publish(tbo);
+        return false;
+    }
 }
diff --git a/src/main/java/net/onrc/onos/core/topology/TopologyPublisher.java b/src/main/java/net/onrc/onos/core/topology/TopologyPublisher.java
index d27d68e..ff05cc3 100644
--- a/src/main/java/net/onrc/onos/core/topology/TopologyPublisher.java
+++ b/src/main/java/net/onrc/onos/core/topology/TopologyPublisher.java
@@ -3,11 +3,14 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import net.floodlightcontroller.core.IFloodlightProviderService;
@@ -21,6 +24,8 @@
 import net.floodlightcontroller.core.module.IFloodlightService;
 import net.floodlightcontroller.core.util.SingletonTask;
 import net.floodlightcontroller.threadpool.IThreadPoolService;
+
+import net.onrc.onos.api.batchoperation.BatchOperationEntry;
 import net.onrc.onos.core.datagrid.IDatagridService;
 import net.onrc.onos.core.datagrid.IEventChannel;
 import net.onrc.onos.core.hostmanager.Host;
@@ -57,7 +62,8 @@
 public class TopologyPublisher implements IOFSwitchListener,
         ILinkDiscoveryListener,
         IFloodlightModule,
-        IHostListener {
+        IHostListener,
+        ITopologyPublisherService {
     private static final Logger log =
             LoggerFactory.getLogger(TopologyPublisher.class);
 
@@ -75,6 +81,8 @@
     private boolean cleanupEnabled = true;
     private static final int CLEANUP_TASK_INTERVAL = 60; // in seconds
     private SingletonTask cleanupTask;
+    private DelayedOperationsHandler delayedOperationsHandler =
+        new DelayedOperationsHandler();
 
     private IEventChannel<byte[], TopologyEvent> eventChannel;
 
@@ -102,6 +110,9 @@
     private ConcurrentMap<Dpid, ConcurrentMap<ByteBuffer, HostEvent>>
         publishedHostEvents = new ConcurrentHashMap<>();
 
+    private BlockingQueue<TopologyBatchOperation> delayedOperations =
+        new LinkedBlockingQueue<>();
+
 
     /**
      * Gets the ONOS Instance ID.
@@ -191,6 +202,90 @@
         }
     }
 
+    /**
+     * A class to deal with Topology Operations that couldn't be pushed
+     * to the Global Log writer, because they need to be delayed.
+     * For example, a link cannot be pushed before the switches on both
+     * ends are in the Global Log.
+     *
+     * TODO: This is an ugly hack that should go away: right now we have to
+     * keep trying periodically.
+     * TODO: Currently, we retry only ADD Link Events, everything else
+     * is thrown away.
+     */
+    private class DelayedOperationsHandler extends Thread {
+        private static final long RETRY_INTERVAL_MS = 10;       // 10ms
+
+        @Override
+        public void run() {
+            List<TopologyBatchOperation> operations = new LinkedList<>();
+
+            this.setName("TopologyPublisher.DelayedOperationsHandler " +
+                         this.getId());
+            //
+            // The main loop
+            //
+            while (true) {
+                try {
+                    //
+                    // Block until an operation is queued, then sleep
+                    // and try to publish it again.
+                    //
+                    TopologyBatchOperation firstTbo = delayedOperations.take();
+                    Thread.sleep(RETRY_INTERVAL_MS);
+                    operations.add(firstTbo);
+                    delayedOperations.drainTo(operations);
+
+                    // Retry only the appropriate operations
+                    for (TopologyBatchOperation tbo : operations) {
+                        for (BatchOperationEntry<
+                                TopologyBatchOperation.Operator,
+                                TopologyEvent> boe : tbo.getOperations()) {
+                            TopologyBatchOperation.Operator oper =
+                                boe.getOperator();
+                            switch (oper) {
+                            case ADD:
+                                TopologyEvent topologyEvent = boe.getTarget();
+                                LinkEvent linkEvent =
+                                    topologyEvent.getLinkEvent();
+                                //
+                                // Test whether the Link Event still can be
+                                // published.
+                                // TODO: The implementation below has a bug:
+                                // If it happens that the same Link Event was
+                                // removed in the middle of checking, we might
+                                // incorrectly publish it again from here.
+                                //
+                                if (linkEvent == null) {
+                                    break;
+                                }
+                                ConcurrentMap<ByteBuffer, LinkEvent>
+                                    linkEvents = publishedLinkEvents.get(
+                                                linkEvent.getDst().getDpid());
+                                if (linkEvents == null) {
+                                    break;
+                                }
+                                if (linkEvents.get(linkEvent.getIDasByteBuffer()) == null) {
+                                    break;
+                                }
+                                publishAddLinkEvent(linkEvent);
+                                break;
+                            case REMOVE:
+                                break;
+                            default:
+                                log.error("Unknown Topology Batch Operation {}", oper);
+                                break;
+                            }
+                        }
+                    }
+                } catch (InterruptedException exception) {
+                    log.debug("Exception processing delayed operations: ",
+                              exception);
+                }
+            }
+        }
+    }
+
     @Override
     public void linkAdded(Link link) {
         LinkEvent linkEvent = new LinkEvent(
@@ -394,13 +489,19 @@
 
     @Override
     public Collection<Class<? extends IFloodlightService>> getModuleServices() {
-        return null;
+        List<Class<? extends IFloodlightService>> services =
+                new ArrayList<Class<? extends IFloodlightService>>();
+        services.add(ITopologyPublisherService.class);
+        return services;
     }
 
     @Override
     public Map<Class<? extends IFloodlightService>, IFloodlightService>
             getServiceImpls() {
-        return null;
+        Map<Class<? extends IFloodlightService>, IFloodlightService> impls =
+                new HashMap<Class<? extends IFloodlightService>, IFloodlightService>();
+        impls.put(ITopologyPublisherService.class, this);
+        return impls;
     }
 
     @Override
@@ -460,6 +561,9 @@
             // Run the cleanup task immediately on startup
             cleanupTask.reschedule(0, TimeUnit.SECONDS);
         }
+
+        // Run the Delayed Operations Handler thread
+        delayedOperationsHandler.start();
     }
 
     @Override
@@ -506,6 +610,12 @@
         }
     }
 
+    @Override
+    public boolean publish(TopologyBatchOperation tbo) {
+        publishTopologyOperations(tbo);
+        return true;
+    }
+
     /**
      * Prepares the Controller role changed event for a switch.
      *
@@ -534,10 +644,11 @@
     private void publishAddSwitchMastershipEvent(
                         MastershipEvent mastershipEvent) {
         // Publish the information
+        TopologyBatchOperation tbo = new TopologyBatchOperation();
         TopologyEvent topologyEvent =
             new TopologyEvent(mastershipEvent, getOnosInstanceId());
-        log.debug("Publishing add mastership: {}", topologyEvent);
-        eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+        tbo.appendAddOperation(topologyEvent);
+        publishTopologyOperations(tbo);
         publishedMastershipEvents.put(mastershipEvent.getDpid(),
                                       mastershipEvent);
     }
@@ -554,10 +665,11 @@
         }
 
         // Publish the information
+        TopologyBatchOperation tbo = new TopologyBatchOperation();
         TopologyEvent topologyEvent =
             new TopologyEvent(mastershipEvent, getOnosInstanceId());
-        log.debug("Publishing remove mastership: {}", topologyEvent);
-        eventChannel.removeEntry(topologyEvent.getID());
+        tbo.appendRemoveOperation(topologyEvent);
+        publishTopologyOperations(tbo);
         publishedMastershipEvents.remove(mastershipEvent.getDpid());
     }
 
@@ -584,11 +696,10 @@
         }
 
         // Publish the information for the switch
+        TopologyBatchOperation tbo = new TopologyBatchOperation();
         TopologyEvent topologyEvent =
             new TopologyEvent(switchEvent, getOnosInstanceId());
-        log.debug("Publishing add switch: {}", topologyEvent);
-        eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
-        publishedSwitchEvents.put(switchEvent.getDpid(), switchEvent);
+        tbo.appendAddOperation(topologyEvent);
 
         // Publish the information for each port
         ConcurrentMap<ByteBuffer, PortEvent> newPortEvents =
@@ -596,13 +707,14 @@
         for (PortEvent portEvent : portEvents) {
             topologyEvent =
                 new TopologyEvent(portEvent, getOnosInstanceId());
-            log.debug("Publishing add port: {}", topologyEvent);
-            eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+            tbo.appendAddOperation(topologyEvent);
 
             ByteBuffer id = portEvent.getIDasByteBuffer();
             newPortEvents.put(id, portEvent);
             oldPortEvents.remove(id);
         }
+        publishTopologyOperations(tbo);
+        publishedSwitchEvents.put(switchEvent.getDpid(), switchEvent);
         publishedPortEvents.put(switchEvent.getDpid(), newPortEvents);
 
         // Cleanup for each of the old removed port
@@ -629,10 +741,11 @@
         */
 
         // Publish the information
+        TopologyBatchOperation tbo = new TopologyBatchOperation();
         TopologyEvent topologyEvent =
             new TopologyEvent(switchEvent, getOnosInstanceId());
-        log.debug("Publishing remove switch: {}", topologyEvent);
-        eventChannel.removeEntry(topologyEvent.getID());
+        tbo.appendRemoveOperation(topologyEvent);
+        publishTopologyOperations(tbo);
         publishedSwitchEvents.remove(switchEvent.getDpid());
 
         // Cleanup for each port
@@ -662,10 +775,11 @@
         }
 
         // Publish the information
+        TopologyBatchOperation tbo = new TopologyBatchOperation();
         TopologyEvent topologyEvent =
             new TopologyEvent(portEvent, getOnosInstanceId());
-        log.debug("Publishing add port: {}", topologyEvent);
-        eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+        tbo.appendAddOperation(topologyEvent);
+        publishTopologyOperations(tbo);
 
         // Store the new Port Event in the local cache
         ConcurrentMap<ByteBuffer, PortEvent> portEvents =
@@ -691,10 +805,11 @@
         }
 
         // Publish the information
+        TopologyBatchOperation tbo = new TopologyBatchOperation();
         TopologyEvent topologyEvent =
             new TopologyEvent(portEvent, getOnosInstanceId());
-        log.debug("Publishing remove port: {}", topologyEvent);
-        eventChannel.removeEntry(topologyEvent.getID());
+        tbo.appendRemoveOperation(topologyEvent);
+        publishTopologyOperations(tbo);
 
         // Cleanup for the incoming link(s)
         ConcurrentMap<ByteBuffer, LinkEvent> linkEvents =
@@ -736,10 +851,11 @@
         }
 
         // Publish the information
+        TopologyBatchOperation tbo = new TopologyBatchOperation();
         TopologyEvent topologyEvent =
             new TopologyEvent(linkEvent, getOnosInstanceId());
-        log.debug("Publishing add link: {}", topologyEvent);
-        eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+        tbo.appendAddOperation(topologyEvent);
+        publishTopologyOperations(tbo);
 
         // Store the new Link Event in the local cache
         ConcurrentMap<ByteBuffer, LinkEvent> linkEvents =
@@ -765,10 +881,11 @@
         }
 
         // Publish the information
+        TopologyBatchOperation tbo = new TopologyBatchOperation();
         TopologyEvent topologyEvent =
             new TopologyEvent(linkEvent, getOnosInstanceId());
-        log.debug("Publishing remove link: {}", topologyEvent);
-        eventChannel.removeEntry(topologyEvent.getID());
+        tbo.appendRemoveOperation(topologyEvent);
+        publishTopologyOperations(tbo);
 
         linkEvents.remove(linkEvent.getIDasByteBuffer());
     }
@@ -793,10 +910,11 @@
         }
 
         // Publish the information
+        TopologyBatchOperation tbo = new TopologyBatchOperation();
         TopologyEvent topologyEvent =
             new TopologyEvent(hostEvent, getOnosInstanceId());
-        log.debug("Publishing add host: {}", topologyEvent);
-        eventChannel.addEntry(topologyEvent.getID(), topologyEvent);
+        tbo.appendAddOperation(topologyEvent);
+        publishTopologyOperations(tbo);
 
         // Store the new Host Event in the local cache
         ConcurrentMap<ByteBuffer, HostEvent> hostEvents =
@@ -822,11 +940,50 @@
         }
 
         // Publish the information
+        TopologyBatchOperation tbo = new TopologyBatchOperation();
         TopologyEvent topologyEvent =
             new TopologyEvent(hostEvent, getOnosInstanceId());
-        log.debug("Publishing remove host: {}", topologyEvent);
-        eventChannel.removeEntry(topologyEvent.getID());
+        tbo.appendRemoveOperation(topologyEvent);
+        publishTopologyOperations(tbo);
 
         hostEvents.remove(hostEvent.getIDasByteBuffer());
     }
+
+    /**
+     * Publishes Topology Operations.
+     *
+     * @param tbo the Topology Operations to publish
+     */
+    private void publishTopologyOperations(TopologyBatchOperation tbo) {
+        // TODO: This flag should be configurable
+        boolean isGlobalLogWriter = false;
+
+        log.debug("Publishing: {}", tbo);
+
+        if (isGlobalLogWriter) {
+            if (!topologyService.publish(tbo)) {
+                log.debug("Cannot publish: {}", tbo);
+                delayedOperations.add(tbo);
+            }
+        } else {
+            // TODO: For now we publish each TopologyEvent independently
+            for (BatchOperationEntry<TopologyBatchOperation.Operator,
+                     TopologyEvent> boe : tbo.getOperations()) {
+                TopologyBatchOperation.Operator oper = boe.getOperator();
+                TopologyEvent topologyEvent = boe.getTarget();
+                switch (oper) {
+                case ADD:
+                    eventChannel.addEntry(topologyEvent.getID(),
+                                          topologyEvent);
+                    break;
+                case REMOVE:
+                    eventChannel.removeEntry(topologyEvent.getID());
+                    break;
+                default:
+                    log.error("Unknown Topology Batch Operation {}", oper);
+                    break;
+                }
+            }
+        }
+    }
 }
diff --git a/src/test/java/net/onrc/onos/core/util/serializers/KryoFactoryTest.java b/src/test/java/net/onrc/onos/core/util/serializers/KryoFactoryTest.java
index 01b27ea..732174d 100644
--- a/src/test/java/net/onrc/onos/core/util/serializers/KryoFactoryTest.java
+++ b/src/test/java/net/onrc/onos/core/util/serializers/KryoFactoryTest.java
@@ -350,7 +350,7 @@
                 SwitchEvent switchEvent = new SwitchEvent(dpid);
                 TopologyEvent topologyEvent =
                     swConst.newInstance(switchEvent, onosInstanceId);
-                tbo.addAddTopologyOperation(topologyEvent);
+                tbo.appendAddOperation(topologyEvent);
             }
 
             Result result = benchType(tbo, EqualityCheck.EQUALS);