Injecting topology through JSON ConfigProvider works for multi-instance (ONOS-490).

Change-Id: Ib977f4cf9a59ddec360072891fd803c6f9ee84f1

Injecting optical device annotations and ports works for multi-instance (ONOS-870).

Change-Id: Icdde16ef72fc4e47eec7213250b04902083f0537
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index 3a94087..d29fadb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -21,7 +21,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -29,6 +28,9 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.packet.ChassisId;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.NewConcurrentHashMap;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.NodeId;
@@ -61,9 +63,6 @@
 import org.onosproject.store.impl.Timestamped;
 import org.onosproject.store.serializers.KryoSerializer;
 import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
-import org.onlab.packet.ChassisId;
-import org.onlab.util.KryoNamespace;
-import org.onlab.util.NewConcurrentHashMap;
 import org.slf4j.Logger;
 
 import java.io.IOException;
@@ -86,17 +85,17 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Predicates.notNull;
-import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
-import static org.onosproject.net.device.DeviceEvent.Type.*;
-import static org.slf4j.LoggerFactory.getLogger;
-import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
-import static org.onosproject.net.DefaultAnnotations.merge;
 import static com.google.common.base.Verify.verify;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
 import static org.onlab.util.Tools.minPriority;
 import static org.onlab.util.Tools.namedThreads;
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
-import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
-import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
+import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
+import static org.onosproject.net.DefaultAnnotations.merge;
+import static org.onosproject.net.device.DeviceEvent.Type.*;
+import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED;
+import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.*;
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Manages inventory of infrastructure devices using gossip protocol to distribute
@@ -111,6 +110,8 @@
     private final Logger log = getLogger(getClass());
 
     private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
+    // Timeout in milliseconds to process device or ports on remote master node
+    private static final int REMOTE_MASTER_TIMEOUT = 1000;
 
     // innerMap is used to lock a Device, thus instance should never be replaced.
     // collection of Description given from various providers
@@ -158,6 +159,8 @@
                     .register(DeviceAntiEntropyAdvertisement.class)
                     .register(DeviceFragmentId.class)
                     .register(PortFragmentId.class)
+                    .register(DeviceInjectedEvent.class)
+                    .register(PortInjectedEvent.class)
                     .build();
         }
     };
@@ -186,6 +189,10 @@
                 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
         clusterCommunicator.addSubscriber(
                 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener());
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener());
 
         executor = Executors.newCachedThreadPool(namedThreads("onos-device-fg-%d"));
 
@@ -251,21 +258,48 @@
     public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
                                      DeviceId deviceId,
                                      DeviceDescription deviceDescription) {
-        final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
-        final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
-        final DeviceEvent event;
-        final Timestamped<DeviceDescription> mergedDesc;
-        final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
-        synchronized (device) {
-            event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
-            mergedDesc = device.get(providerId).getDeviceDesc();
+        NodeId localNode = clusterService.getLocalNode().id();
+        NodeId deviceNode = mastershipService.getMasterFor(deviceId);
+
+        // Process device update only if we're the master,
+        // otherwise signal the actual master.
+        DeviceEvent deviceEvent = null;
+        if (localNode.equals(deviceNode)) {
+
+            final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
+            final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
+            final Timestamped<DeviceDescription> mergedDesc;
+            final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
+
+            synchronized (device) {
+                deviceEvent = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
+                mergedDesc = device.get(providerId).getDeviceDesc();
+            }
+
+            if (deviceEvent != null) {
+                log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
+                        providerId, deviceId);
+                notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
+            }
+
+        } else {
+
+            DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(
+                    providerId, deviceId, deviceDescription);
+            ClusterMessage clusterMessage = new ClusterMessage(localNode, DEVICE_INJECTED,
+                    SERIALIZER.encode(deviceInjectedEvent));
+
+            try {
+                clusterCommunicator.unicast(clusterMessage, deviceNode);
+            } catch (IOException e) {
+                log.warn("Failed to process injected device id: {} desc: {} " +
+                                "(cluster messaging failed: {})",
+                        deviceId, deviceDescription, e);
+            }
+
         }
-        if (event != null) {
-            log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
-                providerId, deviceId);
-            notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
-        }
-        return event;
+
+        return deviceEvent;
     }
 
     private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
@@ -434,52 +468,81 @@
                                        DeviceId deviceId,
                                        List<PortDescription> portDescriptions) {
 
-        final Timestamp newTimestamp;
-        try {
-            newTimestamp = deviceClockService.getTimestamp(deviceId);
-        } catch (IllegalStateException e) {
-            log.info("Timestamp was not available for device {}", deviceId);
-            log.debug("  discarding {}", portDescriptions);
-            // Failed to generate timestamp.
+        NodeId localNode = clusterService.getLocalNode().id();
+        // TODO: It might be negligible, but this will have negative impact to topology discovery performance,
+        // since it will trigger distributed store read.
+        // Also, it'll probably be better if side-way communication happened on ConfigurationProvider, etc.
+        // outside Device subsystem. so that we don't have to modify both Device and Link stores.
+        // If we don't care much about topology performance, then it might be OK.
+        NodeId deviceNode = mastershipService.getMasterFor(deviceId);
 
-            // Possible situation:
-            //  Device connected and became master for short period of time,
-            // but lost mastership before this instance had the chance to
-            // retrieve term information.
+        // Process port update only if we're the master of the device,
+        // otherwise signal the actual master.
+        List<DeviceEvent> deviceEvents = null;
+        if (localNode.equals(deviceNode)) {
 
-            // Information dropped here is expected to be recoverable by
-            // device probing after mastership change
+            final Timestamp newTimestamp;
+            try {
+                newTimestamp = deviceClockService.getTimestamp(deviceId);
+            } catch (IllegalStateException e) {
+                log.info("Timestamp was not available for device {}", deviceId);
+                log.debug("  discarding {}", portDescriptions);
+                // Failed to generate timestamp.
 
-            return Collections.emptyList();
+                // Possible situation:
+                //  Device connected and became master for short period of time,
+                // but lost mastership before this instance had the chance to
+                // retrieve term information.
+
+                // Information dropped here is expected to be recoverable by
+                // device probing after mastership change
+
+                return Collections.emptyList();
+            }
+            log.debug("timestamp for {} {}", deviceId, newTimestamp);
+
+            final Timestamped<List<PortDescription>> timestampedInput
+                    = new Timestamped<>(portDescriptions, newTimestamp);
+            final Timestamped<List<PortDescription>> merged;
+
+            final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
+
+            synchronized (device) {
+                deviceEvents = updatePortsInternal(providerId, deviceId, timestampedInput);
+                final DeviceDescriptions descs = device.get(providerId);
+                List<PortDescription> mergedList =
+                        FluentIterable.from(portDescriptions)
+                                .transform(new Function<PortDescription, PortDescription>() {
+                                    @Override
+                                    public PortDescription apply(PortDescription input) {
+                                        // lookup merged port description
+                                        return descs.getPortDesc(input.portNumber()).value();
+                                    }
+                                }).toList();
+                merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
+            }
+
+            if (!deviceEvents.isEmpty()) {
+                log.info("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
+                        providerId, deviceId);
+                notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
+            }
+
+        } else {
+
+            PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
+            ClusterMessage clusterMessage = new ClusterMessage(
+                    localNode, PORT_INJECTED, SERIALIZER.encode(portInjectedEvent));
+            try {
+                clusterCommunicator.unicast(clusterMessage, deviceNode);
+            } catch (IOException e) {
+                log.warn("Failed to process injected ports of device id: {} " +
+                                "(cluster messaging failed: {})",
+                        deviceId, e);
+            }
         }
-        log.debug("timestamp for {} {}", deviceId, newTimestamp);
 
-        final Timestamped<List<PortDescription>> timestampedInput
-                = new Timestamped<>(portDescriptions, newTimestamp);
-        final List<DeviceEvent> events;
-        final Timestamped<List<PortDescription>> merged;
-
-        final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
-        synchronized (device) {
-            events = updatePortsInternal(providerId, deviceId, timestampedInput);
-            final DeviceDescriptions descs = device.get(providerId);
-            List<PortDescription> mergedList =
-                    FluentIterable.from(portDescriptions)
-                .transform(new Function<PortDescription, PortDescription>() {
-                    @Override
-                    public PortDescription apply(PortDescription input) {
-                        // lookup merged port description
-                        return descs.getPortDesc(input.portNumber()).value();
-                    }
-                }).toList();
-            merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
-        }
-        if (!events.isEmpty()) {
-            log.info("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
-                    providerId, deviceId);
-            notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
-        }
-        return events;
+        return deviceEvents;
     }
 
     private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
@@ -1431,4 +1494,48 @@
             });
         }
     }
+
+    private final class DeviceInjectedEventListener
+            implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+
+            log.debug("Received injected device event from peer: {}", message.sender());
+            DeviceInjectedEvent event = SERIALIZER.decode(message.payload());
+
+            ProviderId providerId = event.providerId();
+            DeviceId deviceId = event.deviceId();
+            DeviceDescription deviceDescription = event.deviceDescription();
+
+            executor.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    createOrUpdateDevice(providerId, deviceId, deviceDescription);
+                }
+            });
+        }
+    }
+
+    private final class PortInjectedEventListener
+            implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+
+            log.debug("Received injected port event from peer: {}", message.sender());
+            PortInjectedEvent event = SERIALIZER.decode(message.payload());
+
+            ProviderId providerId = event.providerId();
+            DeviceId deviceId = event.deviceId();
+            List<PortDescription> portDescriptions = event.portDescriptions();
+
+            executor.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    updatePorts(providerId, deviceId, portDescriptions);
+                }
+            });
+        }
+    }
 }