DeviceStore update

- Add off-line/remove handling to GossipDeviceStore
- Backport lock scope to SimpleDeviceStore

Change-Id: I5b4c8e12738ef78920341fb8699c4b07bde8712a
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index 85d9b07..8316769 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -2,7 +2,8 @@
 
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
-
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang3.concurrent.ConcurrentException;
 import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
 import org.apache.felix.scr.annotations.Activate;
@@ -59,7 +60,7 @@
 import static org.onlab.onos.net.DefaultAnnotations.union;
 import static com.google.common.base.Verify.verify;
 
-// TODO: implement remove event handling and call *Internal
+// TODO: give me a better name
 /**
  * Manages inventory of infrastructure devices using gossip protocol to distribute
  * information.
@@ -79,14 +80,18 @@
     // collection of Description given from various providers
     private final ConcurrentMap<DeviceId,
                             ConcurrentMap<ProviderId, DeviceDescriptions>>
-                                deviceDescs = new ConcurrentHashMap<>();
+                                deviceDescs = Maps.newConcurrentMap();
 
     // cache of Device and Ports generated by compositing descriptions from providers
-    private final ConcurrentMap<DeviceId, Device> devices = new ConcurrentHashMap<>();
-    private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = new ConcurrentHashMap<>();
+    private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
+    private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
+
+    // to be updated under Device lock
+    private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
+    private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
 
     // available(=UP) devices
-    private final Set<DeviceId> availableDevices = new HashSet<>();
+    private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClockService clockService;
@@ -121,7 +126,8 @@
     }
 
     @Override
-    public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
+    public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
+                                     DeviceId deviceId,
                                      DeviceDescription deviceDescription) {
         Timestamp newTimestamp = clockService.getTimestamp(deviceId);
         final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
@@ -133,22 +139,26 @@
         return event;
     }
 
-    private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId, DeviceId deviceId,
-                Timestamped<DeviceDescription> deltaDesc) {
+    private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
+                                    DeviceId deviceId,
+                                    Timestamped<DeviceDescription> deltaDesc) {
 
         // Collection of DeviceDescriptions for a Device
         ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
             = getDeviceDescriptions(deviceId);
 
-
-        DeviceDescriptions descs
-            = createIfAbsentUnchecked(providerDescs, providerId,
-                    new InitDeviceDescs(deltaDesc));
-
-        // update description
         synchronized (providerDescs) {
             // locking per device
 
+            if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
+                log.debug("Ignoring outdated event: {}", deltaDesc);
+                return null;
+            }
+
+            DeviceDescriptions descs
+                = createIfAbsentUnchecked(providerDescs, providerId,
+                    new InitDeviceDescs(deltaDesc));
+
             final Device oldDevice = devices.get(deviceId);
             final Device newDevice;
 
@@ -163,18 +173,18 @@
             }
             if (oldDevice == null) {
                 // ADD
-                return createDevice(providerId, newDevice);
+                return createDevice(providerId, newDevice, deltaDesc.timestamp());
             } else {
                 // UPDATE or ignore (no change or stale)
-                return updateDevice(providerId, oldDevice, newDevice);
+                return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
             }
         }
     }
 
     // Creates the device and returns the appropriate event if necessary.
-    // Guarded by deviceDescs value (=locking Device)
+    // Guarded by deviceDescs value (=Device lock)
     private DeviceEvent createDevice(ProviderId providerId,
-                                    Device newDevice) {
+                                     Device newDevice, Timestamp timestamp) {
 
         // update composed device cache
         Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
@@ -183,16 +193,17 @@
                 providerId, oldDevice, newDevice);
 
         if (!providerId.isAncillary()) {
-            availableDevices.add(newDevice.id());
+            markOnline(newDevice.id(), timestamp);
         }
 
         return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
     }
 
     // Updates the device and returns the appropriate event if necessary.
-    // Guarded by deviceDescs value (=locking Device)
+    // Guarded by deviceDescs value (=Device lock)
     private DeviceEvent updateDevice(ProviderId providerId,
-                                     Device oldDevice, Device newDevice) {
+                                     Device oldDevice,
+                                     Device newDevice, Timestamp newTimestamp) {
 
         // We allow only certain attributes to trigger update
         if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
@@ -207,14 +218,14 @@
                         , newDevice);
             }
             if (!providerId.isAncillary()) {
-                availableDevices.add(newDevice.id());
+                markOnline(newDevice.id(), newTimestamp);
             }
             return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
         }
 
         // Otherwise merely attempt to change availability if primary provider
         if (!providerId.isAncillary()) {
-            boolean added = availableDevices.add(newDevice.id());
+            boolean added = markOnline(newDevice.id(), newTimestamp);
             return !added ? null :
                     new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
         }
@@ -223,11 +234,29 @@
 
     @Override
     public DeviceEvent markOffline(DeviceId deviceId) {
-        ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
+        Timestamp timestamp = clockService.getTimestamp(deviceId);
+        return markOfflineInternal(deviceId, timestamp);
+    }
+
+    private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
+
+        Map<ProviderId, DeviceDescriptions> providerDescs
             = getDeviceDescriptions(deviceId);
 
         // locking device
         synchronized (providerDescs) {
+
+            // accept off-line if given timestamp is newer than
+            // the latest Timestamp from Primary provider
+            DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
+            Timestamp lastTimestamp = primDescs.getLatestTimestamp();
+            if (timestamp.compareTo(lastTimestamp) <= 0) {
+                // outdated event, ignore
+                return null;
+            }
+
+            offline.put(deviceId, timestamp);
+
             Device device = devices.get(deviceId);
             if (device == null) {
                 return null;
@@ -236,15 +265,37 @@
             if (removed) {
                 // TODO: broadcast ... DOWN only?
                 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
-
             }
             return null;
         }
     }
 
+    /**
+     * Marks the device as available if the given timestamp is not outdated,
+     * compared to the time the device has been marked offline.
+     *
+     * @param deviceId identifier of the device
+     * @param timestamp timestamp of the event triggering this change
+     * @return true if availability change request was accepted and changed the state
+     */
+    // Guarded by deviceDescs value (=Device lock)
+    private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
+        // accept on-line if given timestamp is newer than
+        // the latest offline request Timestamp
+        Timestamp offlineTimestamp = offline.get(deviceId);
+        if (offlineTimestamp == null ||
+            offlineTimestamp.compareTo(timestamp) < 0) {
+
+            offline.remove(deviceId);
+            return availableDevices.add(deviceId);
+        }
+        return false;
+    }
+
     @Override
-    public synchronized List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
-            List<PortDescription> portDescriptions) {
+    public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
+                                       DeviceId deviceId,
+                                       List<PortDescription> portDescriptions) {
         Timestamp newTimestamp = clockService.getTimestamp(deviceId);
 
         List<Timestamped<PortDescription>> deltaDescs = new ArrayList<>(portDescriptions.size());
@@ -252,7 +303,8 @@
             deltaDescs.add(new Timestamped<PortDescription>(e, newTimestamp));
         }
 
-        List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, deltaDescs);
+        List<DeviceEvent> events = updatePortsInternal(providerId, deviceId,
+                          new Timestamped<>(portDescriptions, newTimestamp));
         if (!events.isEmpty()) {
             // FIXME: broadcast deltaDesc, UP
             log.debug("broadcast deltaDesc");
@@ -261,8 +313,9 @@
 
     }
 
-    private List<DeviceEvent> updatePortsInternal(ProviderId providerId, DeviceId deviceId,
-                List<Timestamped<PortDescription>> deltaDescs) {
+    private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
+                                DeviceId deviceId,
+                                Timestamped<List<PortDescription>> portDescriptions) {
 
         Device device = devices.get(deviceId);
         checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
@@ -270,30 +323,41 @@
         ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
         checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
 
-        DeviceDescriptions descs = descsMap.get(providerId);
-        // every provider must provide DeviceDescription.
-        checkArgument(descs != null,
-                "Device description for Device ID %s from Provider %s was not found",
-                deviceId, providerId);
-
         List<DeviceEvent> events = new ArrayList<>();
         synchronized (descsMap) {
+
+            if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
+                log.debug("Ignoring outdated events: {}", portDescriptions);
+                return null;
+            }
+
+            DeviceDescriptions descs = descsMap.get(providerId);
+            // every provider must provide DeviceDescription.
+            checkArgument(descs != null,
+                    "Device description for Device ID %s from Provider %s was not found",
+                    deviceId, providerId);
+
             Map<PortNumber, Port> ports = getPortMap(deviceId);
 
+            final Timestamp newTimestamp = portDescriptions.timestamp();
+
             // Add new ports
             Set<PortNumber> processed = new HashSet<>();
-            for (Timestamped<PortDescription> deltaDesc : deltaDescs) {
-                final PortNumber number = deltaDesc.value().portNumber();
+            for (PortDescription portDescription : portDescriptions.value()) {
+                final PortNumber number = portDescription.portNumber();
+                processed.add(number);
+
                 final Port oldPort = ports.get(number);
                 final Port newPort;
 
+
                 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
                 if (existingPortDesc == null ||
-                    deltaDesc == existingPortDesc ||
-                    deltaDesc.isNewer(existingPortDesc)) {
+                    newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
                     // on new port or valid update
                     // update description
-                    descs.putPortDesc(deltaDesc);
+                    descs.putPortDesc(new Timestamped<>(portDescription,
+                                            portDescriptions.timestamp()));
                     newPort = composePort(device, number, descsMap);
                 } else {
                     // outdated event, ignored.
@@ -303,7 +367,6 @@
                 events.add(oldPort == null ?
                                    createPort(device, newPort, ports) :
                                    updatePort(device, oldPort, newPort, ports));
-                processed.add(number);
             }
 
             events.addAll(pruneOldPorts(device, ports, processed));
@@ -313,7 +376,7 @@
 
     // Creates a new port based on the port description adds it to the map and
     // Returns corresponding event.
-    // Guarded by deviceDescs value (=locking Device)
+    // Guarded by deviceDescs value (=Device lock)
     private DeviceEvent createPort(Device device, Port newPort,
                                    Map<PortNumber, Port> ports) {
         ports.put(newPort.number(), newPort);
@@ -322,7 +385,7 @@
 
     // Checks if the specified port requires update and if so, it replaces the
     // existing entry in the map and returns corresponding event.
-    // Guarded by deviceDescs value (=locking Device)
+    // Guarded by deviceDescs value (=Device lock)
     private DeviceEvent updatePort(Device device, Port oldPort,
                                    Port newPort,
                                    Map<PortNumber, Port> ports) {
@@ -337,7 +400,7 @@
 
     // Prunes the specified list of ports based on which ports are in the
     // processed list and returns list of corresponding events.
-    // Guarded by deviceDescs value (=locking Device)
+    // Guarded by deviceDescs value (=Device lock)
     private List<DeviceEvent> pruneOldPorts(Device device,
                                             Map<PortNumber, Port> ports,
                                             Set<PortNumber> processed) {
@@ -389,13 +452,19 @@
         ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
         checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
 
-        DeviceDescriptions descs = descsMap.get(providerId);
-        // assuming all providers must to give DeviceDescription
-        checkArgument(descs != null,
-                "Device description for Device ID %s from Provider %s was not found",
-                deviceId, providerId);
-
         synchronized (descsMap) {
+
+            if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
+                log.debug("Ignoring outdated event: {}", deltaDesc);
+                return null;
+            }
+
+            DeviceDescriptions descs = descsMap.get(providerId);
+            // assuming all providers must give DeviceDescription
+            checkArgument(descs != null,
+                    "Device description for Device ID %s from Provider %s was not found",
+                    deviceId, providerId);
+
             ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
             final PortNumber number = deltaDesc.value().portNumber();
             final Port oldPort = ports.get(number);
@@ -443,19 +512,51 @@
     }
 
     @Override
-    public DeviceEvent removeDevice(DeviceId deviceId) {
-        ConcurrentMap<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId);
+    public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
+        Timestamp timestamp = clockService.getTimestamp(deviceId);
+        DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
+        // TODO: broadcast removal event
+        return event;
+    }
+
+    private DeviceEvent removeDeviceInternal(DeviceId deviceId,
+                                             Timestamp timestamp) {
+
+        Map<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId);
         synchronized (descs) {
+            // accept removal request if given timestamp is newer than
+            // the latest Timestamp from Primary provider
+            DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
+            Timestamp lastTimestamp = primDescs.getLatestTimestamp();
+            if (timestamp.compareTo(lastTimestamp) <= 0) {
+                // outdated event, ignore
+                return null;
+            }
+            removalRequest.put(deviceId, timestamp);
+
             Device device = devices.remove(deviceId);
             // should DEVICE_REMOVED carry removed ports?
-            devicePorts.get(deviceId).clear();
-            availableDevices.remove(deviceId);
+            Map<PortNumber, Port> ports = devicePorts.get(deviceId);
+            if (ports != null) {
+                ports.clear();
+            }
+            markOfflineInternal(deviceId, timestamp);
             descs.clear();
             return device == null ? null :
                 new DeviceEvent(DEVICE_REMOVED, device, null);
         }
     }
 
+    private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
+        Timestamp removalTimestamp = removalRequest.get(deviceId);
+        if (removalTimestamp != null &&
+            removalTimestamp.compareTo(timestampToCheck) >= 0) {
+            // removalRequest is at least as recent as the checked timestamp
+            return true;
+        }
+        return false;
+    }
+
     /**
      * Returns a Device, merging description given from multiple Providers.
      *
@@ -472,7 +573,7 @@
 
         DeviceDescriptions desc = providerDescs.get(primary);
 
-        DeviceDescription base = desc.getDeviceDesc().value();
+        final DeviceDescription base = desc.getDeviceDesc().value();
         Type type = base.type();
         String manufacturer = base.manufacturer();
         String hwVersion = base.hwVersion();
@@ -545,7 +646,7 @@
      * @return primary ProviderID, or randomly chosen one if none exists
      */
     private ProviderId pickPrimaryPID(
-            ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
+            Map<ProviderId, DeviceDescriptions> providerDescs) {
         ProviderId fallBackPrimary = null;
         for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
             if (!e.getKey().isAncillary()) {
@@ -558,6 +659,12 @@
         return fallBackPrimary;
     }
 
+    private DeviceDescriptions getPrimaryDescriptions(
+                            Map<ProviderId, DeviceDescriptions> providerDescs) {
+        ProviderId pid = pickPrimaryPID(providerDescs);
+        return providerDescs.get(pid);
+    }
+
     public static final class InitDeviceDescs
         implements ConcurrentInitializer<DeviceDescriptions> {
 
@@ -586,6 +693,16 @@
             this.portDescs = new ConcurrentHashMap<>();
         }
 
+        Timestamp getLatestTimestamp() {
+            Timestamp latest = deviceDesc.get().timestamp();
+            for (Timestamped<PortDescription> desc : portDescs.values()) {
+                if (desc.timestamp().compareTo(latest) > 0) {
+                    latest = desc.timestamp();
+                }
+            }
+            return latest;
+        }
+
         public Timestamped<DeviceDescription> getDeviceDesc() {
             return deviceDesc.get();
         }