Injecting topology through the JSON ConfigProvider now works in multi-instance clusters (ONOS-490).

Change-Id: Ib977f4cf9a59ddec360072891fd803c6f9ee84f1

Injecting optical device annotations and ports now works in multi-instance clusters (ONOS-870).

Change-Id: Icdde16ef72fc4e47eec7213250b04902083f0537
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
index 6cc45ea..6131190 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
@@ -32,6 +32,7 @@
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.AnnotationsUtil;
 import org.onosproject.net.ConnectPoint;
@@ -90,9 +91,7 @@
 import static org.onosproject.net.Link.Type.DIRECT;
 import static org.onosproject.net.Link.Type.INDIRECT;
 import static org.onosproject.net.LinkKey.linkKey;
-import static org.onosproject.net.link.LinkEvent.Type.LINK_ADDED;
-import static org.onosproject.net.link.LinkEvent.Type.LINK_REMOVED;
-import static org.onosproject.net.link.LinkEvent.Type.LINK_UPDATED;
+import static org.onosproject.net.link.LinkEvent.Type.*;
 import static org.onosproject.store.link.impl.GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -106,6 +105,9 @@
         extends AbstractStore<LinkEvent, LinkStoreDelegate>
         implements LinkStore {
 
+    // Timeout in milliseconds to process links on remote master node
+    private static final int REMOTE_MASTER_TIMEOUT = 1000;
+
     private final Logger log = getLogger(getClass());
 
     // Link inventory
@@ -131,6 +133,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MastershipService mastershipService;
+
     protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
         @Override
         protected void setupKryoPool() {
@@ -141,6 +146,7 @@
                     .register(InternalLinkRemovedEvent.class)
                     .register(LinkAntiEntropyAdvertisement.class)
                     .register(LinkFragmentId.class)
+                    .register(LinkInjectedEvent.class)
                     .build();
         }
     };
@@ -161,6 +167,9 @@
         clusterCommunicator.addSubscriber(
                 GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
                 new InternalLinkAntiEntropyAdvertisementListener());
+        clusterCommunicator.addSubscriber(
+                GossipLinkStoreMessageSubjects.LINK_INJECTED,
+                new LinkInjectedEventListener());
 
         executor = Executors.newCachedThreadPool(namedThreads("onos-link-fg-%d"));
 
@@ -270,27 +279,52 @@
     public LinkEvent createOrUpdateLink(ProviderId providerId,
                                         LinkDescription linkDescription) {
 
-        DeviceId dstDeviceId = linkDescription.dst().deviceId();
-        Timestamp newTimestamp = deviceClockService.getTimestamp(dstDeviceId);
+        final DeviceId dstDeviceId = linkDescription.dst().deviceId();
+        final NodeId localNode = clusterService.getLocalNode().id();
+        final NodeId dstNode = mastershipService.getMasterFor(dstDeviceId);
 
-        final Timestamped<LinkDescription> deltaDesc = new Timestamped<>(linkDescription, newTimestamp);
+        // Process link update only if we're the master of the destination device,
+        // otherwise signal the actual master.
+        LinkEvent linkEvent = null;
+        if (localNode.equals(dstNode)) {
 
-        LinkKey key = linkKey(linkDescription.src(), linkDescription.dst());
-        final LinkEvent event;
-        final Timestamped<LinkDescription> mergedDesc;
-        Map<ProviderId, Timestamped<LinkDescription>> map = getOrCreateLinkDescriptions(key);
-        synchronized (map) {
-            event = createOrUpdateLinkInternal(providerId, deltaDesc);
-            mergedDesc = map.get(providerId);
+            Timestamp newTimestamp = deviceClockService.getTimestamp(dstDeviceId);
+
+            final Timestamped<LinkDescription> deltaDesc = new Timestamped<>(linkDescription, newTimestamp);
+
+            LinkKey key = linkKey(linkDescription.src(), linkDescription.dst());
+            final Timestamped<LinkDescription> mergedDesc;
+            Map<ProviderId, Timestamped<LinkDescription>> map = getOrCreateLinkDescriptions(key);
+
+            synchronized (map) {
+                linkEvent = createOrUpdateLinkInternal(providerId, deltaDesc);
+                mergedDesc = map.get(providerId);
+            }
+
+            if (linkEvent != null) {
+                log.info("Notifying peers of a link update topology event from providerId: "
+                                + "{}  between src: {} and dst: {}",
+                        providerId, linkDescription.src(), linkDescription.dst());
+                notifyPeers(new InternalLinkEvent(providerId, mergedDesc));
+            }
+
+        } else {
+
+            LinkInjectedEvent linkInjectedEvent = new LinkInjectedEvent(providerId, linkDescription);
+            ClusterMessage linkInjectedMessage = new ClusterMessage(localNode,
+                    GossipLinkStoreMessageSubjects.LINK_INJECTED, SERIALIZER.encode(linkInjectedEvent));
+
+            try {
+                clusterCommunicator.unicast(linkInjectedMessage, dstNode);
+            } catch (IOException e) {
+                log.warn("Failed to process link update between src: {} and dst: {} " +
+                                "(cluster messaging failed: {})",
+                        linkDescription.src(), linkDescription.dst(), e);
+            }
+
         }
 
-        if (event != null) {
-            log.info("Notifying peers of a link update topology event from providerId: "
-                    + "{}  between src: {} and dst: {}",
-                    providerId, linkDescription.src(), linkDescription.dst());
-            notifyPeers(new InternalLinkEvent(providerId, mergedDesc));
-        }
-        return event;
+        return linkEvent;
     }
 
     @Override
@@ -318,7 +352,7 @@
             Timestamped<LinkDescription> linkDescription) {
 
         final LinkKey key = linkKey(linkDescription.value().src(),
-                                    linkDescription.value().dst());
+                linkDescription.value().dst());
         Map<ProviderId, Timestamped<LinkDescription>> descs = getOrCreateLinkDescriptions(key);
 
         synchronized (descs) {
@@ -397,7 +431,7 @@
             !AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
 
             links.put(key, newLink);
-            // strictly speaking following can be ommitted
+            // strictly speaking following can be omitted
             srcLinks.put(oldLink.src().deviceId(), key);
             dstLinks.put(oldLink.dst().deviceId(), key);
             return new LinkEvent(LINK_UPDATED, newLink);
@@ -848,4 +882,25 @@
             });
         }
     }
+
+    private final class LinkInjectedEventListener
+            implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            // Decodes a LinkInjectedEvent sent by a peer and replays it through createOrUpdateLink.
+            log.trace("Received injected link event from peer: {}", message.sender());
+            LinkInjectedEvent linkInjectedEvent = SERIALIZER.decode(message.payload());
+            // Pull out the two arguments createOrUpdateLink needs; the Runnable below captures them.
+            ProviderId providerId = linkInjectedEvent.providerId();
+            LinkDescription linkDescription = linkInjectedEvent.linkDescription();
+
+            executor.submit(new Runnable() {
+                // Runs on the store's executor rather than on the thread that invoked handle().
+                @Override
+                public void run() {
+                    createOrUpdateLink(providerId, linkDescription);
+                }
+            });
+        }
+    }
 }