Injecting topology through JSON ConfigProvider works for multi-instance (ONOS-490).

Change-Id: Ib977f4cf9a59ddec360072891fd803c6f9ee84f1

Injecting optical device annotations and ports works for multi-instance (ONOS-870).

Change-Id: Icdde16ef72fc4e47eec7213250b04902083f0537
diff --git a/apps/optical/src/main/java/org/onosproject/optical/OpticalPathProvisioner.java b/apps/optical/src/main/java/org/onosproject/optical/OpticalPathProvisioner.java
index fc33693..13796f9 100644
--- a/apps/optical/src/main/java/org/onosproject/optical/OpticalPathProvisioner.java
+++ b/apps/optical/src/main/java/org/onosproject/optical/OpticalPathProvisioner.java
@@ -179,7 +179,7 @@
 
             if (!IntentState.FAILED.equals(intentService.getIntentState(intent.key()))) {
                    return;
-             }
+            }
 
             List<Intent> intents = Lists.newArrayList();
             if (intent instanceof HostToHostIntent) {
diff --git a/core/api/src/main/java/org/onosproject/event/AbstractEvent.java b/core/api/src/main/java/org/onosproject/event/AbstractEvent.java
index 51d1caa..8d4e522 100644
--- a/core/api/src/main/java/org/onosproject/event/AbstractEvent.java
+++ b/core/api/src/main/java/org/onosproject/event/AbstractEvent.java
@@ -15,10 +15,10 @@
  */
 package org.onosproject.event;
 
-import static com.google.common.base.MoreObjects.toStringHelper;
-
 import org.joda.time.LocalDateTime;
 
+import static com.google.common.base.MoreObjects.toStringHelper;
+
 /**
  * Base event implementation.
  */
@@ -75,5 +75,4 @@
                 .add("subject", subject())
                 .toString();
     }
-
 }
diff --git a/core/api/src/main/java/org/onosproject/net/device/DefaultDeviceDescription.java b/core/api/src/main/java/org/onosproject/net/device/DefaultDeviceDescription.java
index 5315994..fac6204 100644
--- a/core/api/src/main/java/org/onosproject/net/device/DefaultDeviceDescription.java
+++ b/core/api/src/main/java/org/onosproject/net/device/DefaultDeviceDescription.java
@@ -47,7 +47,7 @@
      * @param hwVersion    device HW version
      * @param swVersion    device SW version
      * @param serialNumber device serial number
-     * @param chassis      chasis id
+     * @param chassis      chassis id
      * @param annotations  optional key/value annotations map
      */
     public DefaultDeviceDescription(URI uri, Type type, String manufacturer,
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index e8cccf4..69ef991 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -306,18 +306,16 @@
                 // TODO: Do we need to explicitly tell the Provider that
                 // this instance is not the MASTER
                 applyRole(deviceId, MastershipRole.STANDBY);
-                return;
+            } else {
+                log.info("Role of this node is MASTER for {}", deviceId);
+                // tell clock provider if this instance is the master
+                deviceClockProviderService.setMastershipTerm(deviceId, term);
+                applyRole(deviceId, MastershipRole.MASTER);
             }
-            log.info("Role of this node is MASTER for {}", deviceId);
-
-            // tell clock provider if this instance is the master
-            deviceClockProviderService.setMastershipTerm(deviceId, term);
 
             DeviceEvent event = store.createOrUpdateDevice(provider().id(),
                                                            deviceId, deviceDescription);
 
-            applyRole(deviceId, MastershipRole.MASTER);
-
             // If there was a change of any kind, tell the provider
             // that this instance is the master.
             if (event != null) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceInjectedEvent.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceInjectedEvent.java
new file mode 100644
index 0000000..6f93963
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/DeviceInjectedEvent.java
@@ -0,0 +1,49 @@
+package org.onosproject.store.device.impl;
+
+import com.google.common.base.MoreObjects;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceDescription;
+import org.onosproject.net.provider.ProviderId;
+
+public class DeviceInjectedEvent {
+    private final ProviderId providerId;
+    private final DeviceId deviceId;
+    private final DeviceDescription deviceDescription;
+
+    protected DeviceInjectedEvent(
+            ProviderId providerId,
+            DeviceId deviceId,
+            DeviceDescription deviceDescription) {
+        this.providerId = providerId;
+        this.deviceId = deviceId;
+        this.deviceDescription = deviceDescription;
+    }
+
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    public ProviderId providerId() {
+        return providerId;
+    }
+
+    public DeviceDescription deviceDescription() {
+        return deviceDescription;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("providerId", providerId)
+                .add("deviceId", deviceId)
+                .add("deviceDescription", deviceDescription)
+                .toString();
+    }
+
+    // for serializer
+    protected DeviceInjectedEvent() {
+        this.providerId = null;
+        this.deviceId = null;
+        this.deviceDescription = null;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index 3a94087..d29fadb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -21,7 +21,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -29,6 +28,9 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.packet.ChassisId;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.NewConcurrentHashMap;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.NodeId;
@@ -61,9 +63,6 @@
 import org.onosproject.store.impl.Timestamped;
 import org.onosproject.store.serializers.KryoSerializer;
 import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
-import org.onlab.packet.ChassisId;
-import org.onlab.util.KryoNamespace;
-import org.onlab.util.NewConcurrentHashMap;
 import org.slf4j.Logger;
 
 import java.io.IOException;
@@ -86,17 +85,17 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Predicates.notNull;
-import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
-import static org.onosproject.net.device.DeviceEvent.Type.*;
-import static org.slf4j.LoggerFactory.getLogger;
-import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
-import static org.onosproject.net.DefaultAnnotations.merge;
 import static com.google.common.base.Verify.verify;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
 import static org.onlab.util.Tools.minPriority;
 import static org.onlab.util.Tools.namedThreads;
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
-import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
-import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
+import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
+import static org.onosproject.net.DefaultAnnotations.merge;
+import static org.onosproject.net.device.DeviceEvent.Type.*;
+import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED;
+import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.*;
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Manages inventory of infrastructure devices using gossip protocol to distribute
@@ -111,6 +110,8 @@
     private final Logger log = getLogger(getClass());
 
     private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
+    // Timeout in milliseconds to process device or ports on remote master node
+    private static final int REMOTE_MASTER_TIMEOUT = 1000;
 
     // innerMap is used to lock a Device, thus instance should never be replaced.
     // collection of Description given from various providers
@@ -158,6 +159,8 @@
                     .register(DeviceAntiEntropyAdvertisement.class)
                     .register(DeviceFragmentId.class)
                     .register(PortFragmentId.class)
+                    .register(DeviceInjectedEvent.class)
+                    .register(PortInjectedEvent.class)
                     .build();
         }
     };
@@ -186,6 +189,10 @@
                 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
         clusterCommunicator.addSubscriber(
                 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener());
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener());
 
         executor = Executors.newCachedThreadPool(namedThreads("onos-device-fg-%d"));
 
@@ -251,21 +258,48 @@
     public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
                                      DeviceId deviceId,
                                      DeviceDescription deviceDescription) {
-        final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
-        final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
-        final DeviceEvent event;
-        final Timestamped<DeviceDescription> mergedDesc;
-        final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
-        synchronized (device) {
-            event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
-            mergedDesc = device.get(providerId).getDeviceDesc();
+        NodeId localNode = clusterService.getLocalNode().id();
+        NodeId deviceNode = mastershipService.getMasterFor(deviceId);
+
+        // Process device update only if we're the master,
+        // otherwise signal the actual master.
+        DeviceEvent deviceEvent = null;
+        if (localNode.equals(deviceNode)) {
+
+            final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
+            final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
+            final Timestamped<DeviceDescription> mergedDesc;
+            final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
+
+            synchronized (device) {
+                deviceEvent = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
+                mergedDesc = device.get(providerId).getDeviceDesc();
+            }
+
+            if (deviceEvent != null) {
+                log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
+                        providerId, deviceId);
+                notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
+            }
+
+        } else {
+
+            DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(
+                    providerId, deviceId, deviceDescription);
+            ClusterMessage clusterMessage = new ClusterMessage(localNode, DEVICE_INJECTED,
+                    SERIALIZER.encode(deviceInjectedEvent));
+
+            try {
+                clusterCommunicator.unicast(clusterMessage, deviceNode);
+            } catch (IOException e) {
+                log.warn("Failed to process injected device id: {} desc: {} " +
+                                "(cluster messaging failed: {})",
+                        deviceId, deviceDescription, e);
+            }
+
         }
-        if (event != null) {
-            log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
-                providerId, deviceId);
-            notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
-        }
-        return event;
+
+        return deviceEvent;
     }
 
     private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
@@ -434,52 +468,81 @@
                                        DeviceId deviceId,
                                        List<PortDescription> portDescriptions) {
 
-        final Timestamp newTimestamp;
-        try {
-            newTimestamp = deviceClockService.getTimestamp(deviceId);
-        } catch (IllegalStateException e) {
-            log.info("Timestamp was not available for device {}", deviceId);
-            log.debug("  discarding {}", portDescriptions);
-            // Failed to generate timestamp.
+        NodeId localNode = clusterService.getLocalNode().id();
+        // TODO: It might be negligible, but this will have negative impact to topology discovery performance,
+        // since it will trigger distributed store read.
+        // Also, it'll probably be better if side-way communication happened on ConfigurationProvider, etc.
+        // outside Device subsystem. so that we don't have to modify both Device and Link stores.
+        // If we don't care much about topology performance, then it might be OK.
+        NodeId deviceNode = mastershipService.getMasterFor(deviceId);
 
-            // Possible situation:
-            //  Device connected and became master for short period of time,
-            // but lost mastership before this instance had the chance to
-            // retrieve term information.
+        // Process port update only if we're the master of the device,
+        // otherwise signal the actual master.
+        List<DeviceEvent> deviceEvents = null;
+        if (localNode.equals(deviceNode)) {
 
-            // Information dropped here is expected to be recoverable by
-            // device probing after mastership change
+            final Timestamp newTimestamp;
+            try {
+                newTimestamp = deviceClockService.getTimestamp(deviceId);
+            } catch (IllegalStateException e) {
+                log.info("Timestamp was not available for device {}", deviceId);
+                log.debug("  discarding {}", portDescriptions);
+                // Failed to generate timestamp.
 
-            return Collections.emptyList();
+                // Possible situation:
+                //  Device connected and became master for short period of time,
+                // but lost mastership before this instance had the chance to
+                // retrieve term information.
+
+                // Information dropped here is expected to be recoverable by
+                // device probing after mastership change
+
+                return Collections.emptyList();
+            }
+            log.debug("timestamp for {} {}", deviceId, newTimestamp);
+
+            final Timestamped<List<PortDescription>> timestampedInput
+                    = new Timestamped<>(portDescriptions, newTimestamp);
+            final Timestamped<List<PortDescription>> merged;
+
+            final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
+
+            synchronized (device) {
+                deviceEvents = updatePortsInternal(providerId, deviceId, timestampedInput);
+                final DeviceDescriptions descs = device.get(providerId);
+                List<PortDescription> mergedList =
+                        FluentIterable.from(portDescriptions)
+                                .transform(new Function<PortDescription, PortDescription>() {
+                                    @Override
+                                    public PortDescription apply(PortDescription input) {
+                                        // lookup merged port description
+                                        return descs.getPortDesc(input.portNumber()).value();
+                                    }
+                                }).toList();
+                merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
+            }
+
+            if (!deviceEvents.isEmpty()) {
+                log.info("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
+                        providerId, deviceId);
+                notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
+            }
+
+        } else {
+
+            PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
+            ClusterMessage clusterMessage = new ClusterMessage(
+                    localNode, PORT_INJECTED, SERIALIZER.encode(portInjectedEvent));
+            try {
+                clusterCommunicator.unicast(clusterMessage, deviceNode);
+            } catch (IOException e) {
+                log.warn("Failed to process injected ports of device id: {} " +
+                                "(cluster messaging failed: {})",
+                        deviceId, e);
+            }
         }
-        log.debug("timestamp for {} {}", deviceId, newTimestamp);
 
-        final Timestamped<List<PortDescription>> timestampedInput
-                = new Timestamped<>(portDescriptions, newTimestamp);
-        final List<DeviceEvent> events;
-        final Timestamped<List<PortDescription>> merged;
-
-        final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
-        synchronized (device) {
-            events = updatePortsInternal(providerId, deviceId, timestampedInput);
-            final DeviceDescriptions descs = device.get(providerId);
-            List<PortDescription> mergedList =
-                    FluentIterable.from(portDescriptions)
-                .transform(new Function<PortDescription, PortDescription>() {
-                    @Override
-                    public PortDescription apply(PortDescription input) {
-                        // lookup merged port description
-                        return descs.getPortDesc(input.portNumber()).value();
-                    }
-                }).toList();
-            merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
-        }
-        if (!events.isEmpty()) {
-            log.info("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
-                    providerId, deviceId);
-            notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
-        }
-        return events;
+        return deviceEvents;
     }
 
     private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
@@ -1431,4 +1494,48 @@
             });
         }
     }
+
+    private final class DeviceInjectedEventListener
+            implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+
+            log.debug("Received injected device event from peer: {}", message.sender());
+            DeviceInjectedEvent event = SERIALIZER.decode(message.payload());
+
+            ProviderId providerId = event.providerId();
+            DeviceId deviceId = event.deviceId();
+            DeviceDescription deviceDescription = event.deviceDescription();
+
+            executor.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    createOrUpdateDevice(providerId, deviceId, deviceDescription);
+                }
+            });
+        }
+    }
+
+    private final class PortInjectedEventListener
+            implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+
+            log.debug("Received injected port event from peer: {}", message.sender());
+            PortInjectedEvent event = SERIALIZER.decode(message.payload());
+
+            ProviderId providerId = event.providerId();
+            DeviceId deviceId = event.deviceId();
+            List<PortDescription> portDescriptions = event.portDescriptions();
+
+            executor.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    updatePorts(providerId, deviceId, portDescriptions);
+                }
+            });
+        }
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStoreMessageSubjects.java
index 83fdf98..9f7361b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStoreMessageSubjects.java
@@ -34,4 +34,8 @@
     public static final MessageSubject DEVICE_ADVERTISE = new MessageSubject("peer-device-advertisements");
     // to be used with 3-way anti-entropy process
     public static final MessageSubject DEVICE_REQUEST = new MessageSubject("peer-device-request");
+
+    // Network elements injected (not discovered) by ConfigProvider
+    public static final MessageSubject DEVICE_INJECTED = new MessageSubject("peer-device-injected");
+    public static final MessageSubject PORT_INJECTED = new MessageSubject("peer-port-injected");
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/PortInjectedEvent.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/PortInjectedEvent.java
new file mode 100644
index 0000000..c80f810
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/PortInjectedEvent.java
@@ -0,0 +1,50 @@
+package org.onosproject.store.device.impl;
+
+import com.google.common.base.MoreObjects;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.PortDescription;
+import org.onosproject.net.provider.ProviderId;
+
+import java.util.List;
+
+public class PortInjectedEvent {
+
+    private ProviderId providerId;
+    private DeviceId deviceId;
+    private List<PortDescription> portDescriptions;
+
+    protected PortInjectedEvent(ProviderId providerId, DeviceId deviceId, List<PortDescription> portDescriptions) {
+        this.providerId = providerId;
+        this.deviceId = deviceId;
+        this.portDescriptions = portDescriptions;
+    }
+
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    public ProviderId providerId() {
+        return providerId;
+    }
+
+    public List<PortDescription> portDescriptions() {
+        return portDescriptions;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("providerId", providerId)
+                .add("deviceId", deviceId)
+                .add("portDescriptions", portDescriptions)
+                .toString();
+    }
+
+    // for serializer
+    protected PortInjectedEvent() {
+        this.providerId = null;
+        this.deviceId = null;
+        this.portDescriptions = null;
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
index 6cc45ea..6131190 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
@@ -32,6 +32,7 @@
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.AnnotationsUtil;
 import org.onosproject.net.ConnectPoint;
@@ -90,9 +91,7 @@
 import static org.onosproject.net.Link.Type.DIRECT;
 import static org.onosproject.net.Link.Type.INDIRECT;
 import static org.onosproject.net.LinkKey.linkKey;
-import static org.onosproject.net.link.LinkEvent.Type.LINK_ADDED;
-import static org.onosproject.net.link.LinkEvent.Type.LINK_REMOVED;
-import static org.onosproject.net.link.LinkEvent.Type.LINK_UPDATED;
+import static org.onosproject.net.link.LinkEvent.Type.*;
 import static org.onosproject.store.link.impl.GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -106,6 +105,9 @@
         extends AbstractStore<LinkEvent, LinkStoreDelegate>
         implements LinkStore {
 
+    // Timeout in milliseconds to process links on remote master node
+    private static final int REMOTE_MASTER_TIMEOUT = 1000;
+
     private final Logger log = getLogger(getClass());
 
     // Link inventory
@@ -131,6 +133,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MastershipService mastershipService;
+
     protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
         @Override
         protected void setupKryoPool() {
@@ -141,6 +146,7 @@
                     .register(InternalLinkRemovedEvent.class)
                     .register(LinkAntiEntropyAdvertisement.class)
                     .register(LinkFragmentId.class)
+                    .register(LinkInjectedEvent.class)
                     .build();
         }
     };
@@ -161,6 +167,9 @@
         clusterCommunicator.addSubscriber(
                 GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
                 new InternalLinkAntiEntropyAdvertisementListener());
+        clusterCommunicator.addSubscriber(
+                GossipLinkStoreMessageSubjects.LINK_INJECTED,
+                new LinkInjectedEventListener());
 
         executor = Executors.newCachedThreadPool(namedThreads("onos-link-fg-%d"));
 
@@ -270,27 +279,52 @@
     public LinkEvent createOrUpdateLink(ProviderId providerId,
                                         LinkDescription linkDescription) {
 
-        DeviceId dstDeviceId = linkDescription.dst().deviceId();
-        Timestamp newTimestamp = deviceClockService.getTimestamp(dstDeviceId);
+        final DeviceId dstDeviceId = linkDescription.dst().deviceId();
+        final NodeId localNode = clusterService.getLocalNode().id();
+        final NodeId dstNode = mastershipService.getMasterFor(dstDeviceId);
 
-        final Timestamped<LinkDescription> deltaDesc = new Timestamped<>(linkDescription, newTimestamp);
+        // Process link update only if we're the master of the destination node,
+        // otherwise signal the actual master.
+        LinkEvent linkEvent = null;
+        if (localNode.equals(dstNode)) {
 
-        LinkKey key = linkKey(linkDescription.src(), linkDescription.dst());
-        final LinkEvent event;
-        final Timestamped<LinkDescription> mergedDesc;
-        Map<ProviderId, Timestamped<LinkDescription>> map = getOrCreateLinkDescriptions(key);
-        synchronized (map) {
-            event = createOrUpdateLinkInternal(providerId, deltaDesc);
-            mergedDesc = map.get(providerId);
+            Timestamp newTimestamp = deviceClockService.getTimestamp(dstDeviceId);
+
+            final Timestamped<LinkDescription> deltaDesc = new Timestamped<>(linkDescription, newTimestamp);
+
+            LinkKey key = linkKey(linkDescription.src(), linkDescription.dst());
+            final Timestamped<LinkDescription> mergedDesc;
+            Map<ProviderId, Timestamped<LinkDescription>> map = getOrCreateLinkDescriptions(key);
+
+            synchronized (map) {
+                linkEvent = createOrUpdateLinkInternal(providerId, deltaDesc);
+                mergedDesc = map.get(providerId);
+            }
+
+            if (linkEvent != null) {
+                log.info("Notifying peers of a link update topology event from providerId: "
+                                + "{}  between src: {} and dst: {}",
+                        providerId, linkDescription.src(), linkDescription.dst());
+                notifyPeers(new InternalLinkEvent(providerId, mergedDesc));
+            }
+
+        } else {
+
+            LinkInjectedEvent linkInjectedEvent = new LinkInjectedEvent(providerId, linkDescription);
+            ClusterMessage linkInjectedMessage = new ClusterMessage(localNode,
+                    GossipLinkStoreMessageSubjects.LINK_INJECTED, SERIALIZER.encode(linkInjectedEvent));
+
+            try {
+                clusterCommunicator.unicast(linkInjectedMessage, dstNode);
+            } catch (IOException e) {
+                log.warn("Failed to process link update between src: {} and dst: {} " +
+                                "(cluster messaging failed: {})",
+                        linkDescription.src(), linkDescription.dst(), e);
+            }
+
         }
 
-        if (event != null) {
-            log.info("Notifying peers of a link update topology event from providerId: "
-                    + "{}  between src: {} and dst: {}",
-                    providerId, linkDescription.src(), linkDescription.dst());
-            notifyPeers(new InternalLinkEvent(providerId, mergedDesc));
-        }
-        return event;
+        return linkEvent;
     }
 
     @Override
@@ -318,7 +352,7 @@
             Timestamped<LinkDescription> linkDescription) {
 
         final LinkKey key = linkKey(linkDescription.value().src(),
-                                    linkDescription.value().dst());
+                linkDescription.value().dst());
         Map<ProviderId, Timestamped<LinkDescription>> descs = getOrCreateLinkDescriptions(key);
 
         synchronized (descs) {
@@ -397,7 +431,7 @@
             !AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
 
             links.put(key, newLink);
-            // strictly speaking following can be ommitted
+            // strictly speaking following can be omitted
             srcLinks.put(oldLink.src().deviceId(), key);
             dstLinks.put(oldLink.dst().deviceId(), key);
             return new LinkEvent(LINK_UPDATED, newLink);
@@ -848,4 +882,25 @@
             });
         }
     }
+
+    private final class LinkInjectedEventListener
+            implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+
+            log.trace("Received injected link event from peer: {}", message.sender());
+            LinkInjectedEvent linkInjectedEvent = SERIALIZER.decode(message.payload());
+
+            ProviderId providerId = linkInjectedEvent.providerId();
+            LinkDescription linkDescription = linkInjectedEvent.linkDescription();
+
+            executor.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    createOrUpdateLink(providerId, linkDescription);
+                }
+            });
+        }
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStoreMessageSubjects.java
index 6b583ef..1d261b5 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStoreMessageSubjects.java
@@ -15,7 +15,7 @@
  */
 package org.onosproject.store.link.impl;
 
-import org.onosproject.store.cluster.messaging.MessageSubject;
+        import org.onosproject.store.cluster.messaging.MessageSubject;
 
 /**
  * MessageSubjects used by GossipLinkStore peer-peer communication.
@@ -30,4 +30,6 @@
             new MessageSubject("peer-link-removed");
     public static final MessageSubject LINK_ANTI_ENTROPY_ADVERTISEMENT =
             new MessageSubject("link-enti-entropy-advertisement");
+    public static final MessageSubject LINK_INJECTED =
+            new MessageSubject("peer-link-injected");
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/LinkInjectedEvent.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/LinkInjectedEvent.java
new file mode 100644
index 0000000..356033b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/LinkInjectedEvent.java
@@ -0,0 +1,38 @@
+package org.onosproject.store.link.impl;
+
+import com.google.common.base.MoreObjects;
+import org.onosproject.net.link.LinkDescription;
+import org.onosproject.net.provider.ProviderId;
+
+public class LinkInjectedEvent {
+
+    ProviderId providerId;
+    LinkDescription linkDescription;
+
+    public LinkInjectedEvent(ProviderId providerId, LinkDescription linkDescription) {
+        this.providerId = providerId;
+        this.linkDescription = linkDescription;
+    }
+
+    public ProviderId providerId() {
+        return providerId;
+    }
+
+    public LinkDescription linkDescription() {
+        return linkDescription;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("providerId", providerId)
+                .add("linkDescription", linkDescription)
+                .toString();
+    }
+
+    // for serializer
+    protected LinkInjectedEvent() {
+        this.providerId = null;
+        this.linkDescription = null;
+    }
+}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java
index 1251e96..eeca2a8b 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java
@@ -27,6 +27,8 @@
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.DefaultControllerNode;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.mastership.MastershipServiceAdapter;
 import org.onosproject.mastership.MastershipTerm;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DefaultAnnotations;
@@ -115,7 +117,7 @@
     private DeviceClockManager deviceClockManager;
     private DeviceClockService deviceClockService;
     private ClusterCommunicationService clusterCommunicator;
-
+    private MastershipService mastershipService;
 
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
@@ -146,11 +148,13 @@
         linkStoreImpl.deviceClockService = deviceClockService;
         linkStoreImpl.clusterCommunicator = clusterCommunicator;
         linkStoreImpl.clusterService = new TestClusterService();
+        linkStoreImpl.mastershipService = new TestMastershipService();
         linkStoreImpl.activate();
         linkStore = linkStoreImpl;
 
         verify(clusterCommunicator);
         reset(clusterCommunicator);
+
     }
 
     @After
@@ -602,4 +606,11 @@
             nodeStates.put(NID2, ACTIVE);
         }
     }
+
+    private final class TestMastershipService extends MastershipServiceAdapter {
+        @Override
+        public NodeId getMasterFor(DeviceId deviceId) {
+            return NID1;
+        }
+    }
 }