implemented GossipDeviceStore with multi-provider, annotation support

Change-Id: I1953bdc37b28af79703ebcfc9201a71a2af49ab2
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosClockService.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceClockManager.java
similarity index 92%
rename from core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosClockService.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceClockManager.java
index a99482f..4dfc88b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosClockService.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceClockManager.java
@@ -17,9 +17,12 @@
 import org.onlab.onos.store.impl.OnosTimestamp;
 import org.slf4j.Logger;
 
+/**
+ * Clock service to issue Timestamp based on Device Mastership.
+ */
 @Component(immediate = true)
 @Service
-public class OnosClockService implements ClockService {
+public class DeviceClockManager implements ClockService {
 
     private final Logger log = getLogger(getClass());
 
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
new file mode 100644
index 0000000..0edbc21
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -0,0 +1,652 @@
+package org.onlab.onos.store.device.impl;
+
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.net.Annotations;
+import org.onlab.onos.net.DefaultAnnotations;
+import org.onlab.onos.net.DefaultDevice;
+import org.onlab.onos.net.DefaultPort;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.Device.Type;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.Port;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.SparseAnnotations;
+import org.onlab.onos.net.device.DefaultDeviceDescription;
+import org.onlab.onos.net.device.DefaultPortDescription;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.net.device.DeviceEvent;
+import org.onlab.onos.net.device.DeviceStore;
+import org.onlab.onos.net.device.DeviceStoreDelegate;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.AbstractStore;
+import org.onlab.onos.store.ClockService;
+import org.onlab.onos.store.Timestamp;
+import org.onlab.onos.store.common.impl.Timestamped;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Predicates.notNull;
+import static org.onlab.onos.net.device.DeviceEvent.Type.*;
+import static org.slf4j.LoggerFactory.getLogger;
+import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
+import static org.onlab.onos.net.DefaultAnnotations.merge;
+import static com.google.common.base.Verify.verify;
+
+// TODO: implement remove event handling and call *Internal
+/**
+ * Manages inventory of infrastructure devices using gossip protocol to distribute
+ * information.
+ */
+@Component(immediate = true)
+@Service
+public class GossipDeviceStore
+        extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
+        implements DeviceStore {
+
+    private final Logger log = getLogger(getClass());
+
+    public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
+
+    // TODO: Check if inner Map can be replaced with plain Map
+    // The inner map doubles as the per-device lock, so its instance must never be replaced.
+    // Collection of Descriptions given by the various providers, keyed by device.
+    private final ConcurrentMap<DeviceId,
+                            ConcurrentMap<ProviderId, DeviceDescriptions>>
+                                deviceDescs = new ConcurrentHashMap<>();
+
+    // cache of Device and Ports generated by compositing descriptions from providers
+    private final ConcurrentMap<DeviceId, Device> devices = new ConcurrentHashMap<>();
+    private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = new ConcurrentHashMap<>();
+
+    private final Set<DeviceId> availableDevices = // available(=UP) devices; concurrent: touched under different locks
+            Collections.newSetFromMap(new ConcurrentHashMap<DeviceId, Boolean>());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClockService clockService;
+
+    @Activate
+    public void activate() {
+        log.info("Started"); // nothing to initialise; only announces component start
+    }
+
+    @Deactivate
+    public void deactivate() {
+        deviceDescs.clear(); // drop provider-supplied descriptions
+        devices.clear(); // drop composed Device cache
+        devicePorts.clear(); // drop composed Port cache
+        availableDevices.clear(); // forget availability state
+        log.info("Stopped");
+    }
+
+    @Override
+    public int getDeviceCount() {
+        return devices.size(); // counts composed devices, regardless of availability
+    }
+
+    @Override
+    public Iterable<Device> getDevices() {
+        return Collections.unmodifiableCollection(devices.values()); // read-only live view, not a copy
+    }
+
+    @Override
+    public Device getDevice(DeviceId deviceId) {
+        return devices.get(deviceId); // null when the device is not in the cache
+    }
+
+    @Override
+    public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
+                                     DeviceDescription deviceDescription) {
+        final Timestamped<DeviceDescription> stamped =
+                new Timestamped<>(deviceDescription, clockService.getTimestamp(deviceId));
+        final DeviceEvent result = createOrUpdateDeviceInternal(providerId, deviceId, stamped);
+        if (result != null) {
+            // FIXME: broadcast deltaDesc, UP
+            log.debug("broadcast deltaDesc");
+        }
+        return result;
+    }
+
+    private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId, DeviceId deviceId,
+                Timestamped<DeviceDescription> deltaDesc) {
+        // Applies a timestamped description from one provider; returns the resulting
+        // event, or null when the description is stale or changes nothing visible.
+
+        // per-device map of provider -> descriptions; its instance also serves as the device lock
+        final ConcurrentMap<ProviderId, DeviceDescriptions> perProvider =
+                createIfAbsentUnchecked(deviceDescs, deviceId,
+                        new InitConcurrentHashMap<ProviderId, DeviceDescriptions>());
+
+        // descriptions from this provider; seeded with deltaDesc itself on first sight
+        final DeviceDescriptions fromProvider =
+                createIfAbsentUnchecked(perProvider, providerId, new InitDeviceDescs(deltaDesc));
+
+        // locking per device
+        synchronized (perProvider) {
+            final Device cached = devices.get(deviceId);
+            final Timestamped<DeviceDescription> current = fromProvider.getDeviceDesc();
+
+            // identity match happens when the entry was just seeded from deltaDesc above,
+            // i.e. the first description seen from this provider
+            final boolean applicable = deltaDesc == current || deltaDesc.isNewer(current);
+            if (!applicable) {
+                // outdated event, ignored.
+                return null;
+            }
+
+            // record the description, then rebuild the composite view of the device
+            fromProvider.putDeviceDesc(deltaDesc);
+            final Device composed = composeDevice(deviceId, perProvider);
+
+            // ADD when nothing was cached, otherwise UPDATE or ignore (no change)
+            if (cached == null) {
+                return createDevice(providerId, composed);
+            }
+            return updateDevice(providerId, cached, composed);
+        }
+    }
+
+    // Registers a freshly composed device in the cache and reports DEVICE_ADDED.
+    // Caller must hold the per-device lock (the deviceDescs value for this device).
+    private DeviceEvent createDevice(ProviderId providerId,
+                                    Device newDevice) {
+        final DeviceId id = newDevice.id();
+        // the cache must not already hold this device; anything else is a logic error
+        final Device previous = devices.putIfAbsent(id, newDevice);
+        verify(previous == null,
+                "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
+                providerId, previous, newDevice);
+
+        // only a primary (non-ancillary) provider makes the device available
+        if (!providerId.isAncillary()) {
+            availableDevices.add(id);
+        }
+        return new DeviceEvent(DEVICE_ADDED, newDevice, null);
+    }
+
+    // Swaps the cached device for the recomposed one when a tracked attribute
+    // changed; otherwise only (re-)marks it available for a primary provider.
+    // Guarded by deviceDescs value (=locking Device)
+    private DeviceEvent updateDevice(ProviderId providerId,
+                                     Device oldDevice, Device newDevice) {
+        final DeviceId id = newDevice.id();
+        final boolean primary = !providerId.isAncillary();
+
+        // only hw/sw version and annotations are allowed to trigger an update
+        final boolean unchanged =
+                Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion())
+                && Objects.equals(oldDevice.swVersion(), newDevice.swVersion())
+                && isAnnotationsEqual(oldDevice.annotations(), newDevice.annotations());
+
+        if (unchanged) {
+            // nothing to publish beyond a possible offline -> online transition
+            if (primary && availableDevices.add(id)) {
+                return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
+            }
+            return null;
+        }
+        if (!devices.replace(id, oldDevice, newDevice)) {
+            verify(false,
+                    "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
+                    providerId, oldDevice, devices.get(id), newDevice);
+        }
+        if (primary) {
+            availableDevices.add(id);
+        }
+        return new DeviceEvent(DEVICE_UPDATED, newDevice, null);
+    }
+
+    @Override
+    public DeviceEvent markOffline(DeviceId deviceId) {
+        // the per-device descriptions map is the lock object, so obtain (or create) it first
+        final ConcurrentMap<ProviderId, DeviceDescriptions> deviceLock =
+                createIfAbsentUnchecked(deviceDescs, deviceId,
+                        new InitConcurrentHashMap<ProviderId, DeviceDescriptions>());
+        synchronized (deviceLock) {
+            final Device known = devices.get(deviceId);
+            if (known == null) {
+                // never heard of this device
+                return null;
+            }
+
+            // remove() reports whether the device was actually marked available
+            if (!availableDevices.remove(deviceId)) {
+                return null;
+            }
+            // TODO: broadcast ... DOWN only?
+            return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, known, null);
+        }
+    }
+
+    @Override
+    public synchronized List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
+            List<PortDescription> portDescriptions) {
+        // one timestamp is taken up front and shared by every port in this batch
+        final Timestamp batchStamp = clockService.getTimestamp(deviceId);
+        final List<Timestamped<PortDescription>> stamped = new ArrayList<>(portDescriptions.size());
+        for (PortDescription description : portDescriptions) {
+            stamped.add(new Timestamped<PortDescription>(description, batchStamp));
+        }
+
+        final List<DeviceEvent> result = updatePortsInternal(providerId, deviceId, stamped);
+        if (result.isEmpty()) {
+            return result;
+        }
+        // FIXME: broadcast deltaDesc, UP
+        log.debug("broadcast deltaDesc");
+        return result;
+    }
+
+    private List<DeviceEvent> updatePortsInternal(ProviderId providerId, DeviceId deviceId,
+                List<Timestamped<PortDescription>> deltaDescs) {
+        // Applies a full port list from one provider; returns the non-null events it produced.
+        Device device = devices.get(deviceId);
+        checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
+
+        ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
+        checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
+
+        DeviceDescriptions descs = descsMap.get(providerId);
+        // every provider must provide DeviceDescription.
+        checkArgument(descs != null,
+                "Device description for Device ID %s from Provider %s was not found",
+                deviceId, providerId);
+
+        List<DeviceEvent> events = new ArrayList<>();
+        synchronized (descsMap) {
+            Map<PortNumber, Port> ports = getPortMap(deviceId);
+
+            // Add new ports
+            Set<PortNumber> processed = new HashSet<>();
+            for (Timestamped<PortDescription> deltaDesc : deltaDescs) {
+                final PortNumber number = deltaDesc.value().portNumber();
+                // Every reported port counts as processed, even when its description turns
+                // out to be stale; otherwise pruneOldPorts() would remove a live port.
+                processed.add(number);
+                final Port oldPort = ports.get(number);
+                final Port newPort;
+
+                final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
+                if (existingPortDesc == null ||
+                    deltaDesc == existingPortDesc ||
+                    deltaDesc.isNewer(existingPortDesc)) {
+                    // on new port or valid update: store the description and recompose the port
+                    descs.putPortDesc(deltaDesc);
+                    newPort = composePort(device, number, descsMap);
+                } else {
+                    // outdated event, ignored.
+                    continue;
+                }
+                events.add(oldPort == null ?
+                                   createPort(device, newPort, ports) :
+                                   updatePort(device, oldPort, newPort, ports));
+            }
+
+            events.addAll(pruneOldPorts(device, ports, processed));
+        }
+        return FluentIterable.from(events).filter(notNull()).toList();
+    }
+
+    // Adds the newly composed port to the device's port map and
+    // returns the corresponding PORT_ADDED event.
+    // Guarded by deviceDescs value (=locking Device)
+    private DeviceEvent createPort(Device device, Port newPort,
+                                   Map<PortNumber, Port> ports) {
+        ports.put(newPort.number(), newPort);
+        return new DeviceEvent(PORT_ADDED, device, newPort);
+    }
+
+    // Replaces the stored port with the recomposed one if its enabled state or
+    // annotations differ, and returns PORT_UPDATED; returns null when nothing changed.
+    // Guarded by deviceDescs value (=locking Device)
+    private DeviceEvent updatePort(Device device, Port oldPort,
+                                   Port newPort,
+                                   Map<PortNumber, Port> ports) {
+        final boolean sameState = oldPort.isEnabled() == newPort.isEnabled();
+        if (sameState && isAnnotationsEqual(oldPort.annotations(), newPort.annotations())) {
+            return null;
+        }
+        // the map entry is keyed by the existing port's number
+        ports.put(oldPort.number(), newPort);
+        return new DeviceEvent(PORT_UPDATED, device, newPort);
+    }
+
+    // Removes every port that is absent from the processed set and returns
+    // a PORT_REMOVED event for each port dropped.
+    // Guarded by deviceDescs value (=locking Device)
+    private List<DeviceEvent> pruneOldPorts(Device device,
+                                            Map<PortNumber, Port> ports,
+                                            Set<PortNumber> processed) {
+        final List<DeviceEvent> removals = new ArrayList<>();
+        final Iterator<Entry<PortNumber, Port>> cursor = ports.entrySet().iterator();
+        while (cursor.hasNext()) {
+            final Entry<PortNumber, Port> entry = cursor.next();
+            if (processed.contains(entry.getKey())) {
+                continue;
+            }
+            removals.add(new DeviceEvent(PORT_REMOVED, device, entry.getValue()));
+            cursor.remove();
+        }
+        return removals;
+    }
+
+    // Gets the map of ports for the specified device; if one does not already
+    // exist, it creates and registers a new (empty) ConcurrentHashMap in devicePorts.
+    private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
+        return createIfAbsentUnchecked(devicePorts, deviceId,
+                new InitConcurrentHashMap<PortNumber, Port>());
+    }
+
+    @Override
+    public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
+            PortDescription portDescription) {
+        final Timestamped<PortDescription> stamped =
+                new Timestamped<>(portDescription, clockService.getTimestamp(deviceId));
+        final DeviceEvent result = updatePortStatusInternal(providerId, deviceId, stamped);
+        if (result != null) {
+            // FIXME: broadcast deltaDesc
+            log.debug("broadcast deltaDesc");
+        }
+        return result;
+    }
+
+    private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
+                Timestamped<PortDescription> deltaDesc) {
+
+        // the device itself must already be known to this store
+        final Device owner = devices.get(deviceId);
+        checkArgument(owner != null, DEVICE_NOT_FOUND, deviceId);
+
+        final ConcurrentMap<ProviderId, DeviceDescriptions> perProvider = deviceDescs.get(deviceId);
+        checkArgument(perProvider != null, DEVICE_NOT_FOUND, deviceId);
+
+        // assuming every provider has supplied a DeviceDescription before reporting ports
+        final DeviceDescriptions fromProvider = perProvider.get(providerId);
+        checkArgument(fromProvider != null,
+                "Device description for Device ID %s from Provider %s was not found",
+                deviceId, providerId);
+
+        // the per-device descriptions map doubles as the device lock
+        synchronized (perProvider) {
+            final ConcurrentMap<PortNumber, Port> portMap = getPortMap(deviceId);
+            final PortNumber portNo = deltaDesc.value().portNumber();
+            final Port cached = portMap.get(portNo);
+
+            final Timestamped<PortDescription> known = fromProvider.getPortDesc(portNo);
+            final boolean applicable = known == null
+                    || deltaDesc == known
+                    || deltaDesc.isNewer(known);
+            if (!applicable) {
+                // outdated event, ignored.
+                return null;
+            }
+
+            // first report or valid update: store it and recompose the port
+            fromProvider.putPortDesc(deltaDesc);
+            final Port composed = composePort(owner, portNo, perProvider);
+
+            // new port -> PORT_ADDED; existing port -> PORT_UPDATED or null
+            return cached == null
+                    ? createPort(owner, composed, portMap)
+                    : updatePort(owner, cached, composed, portMap);
+        }
+    }
+
+    @Override
+    public List<Port> getPorts(DeviceId deviceId) {
+        final Map<PortNumber, Port> known = devicePorts.get(deviceId);
+        // immutable snapshot of the ports, or an empty list when the device has no port map
+        return known == null
+                ? Collections.<Port>emptyList()
+                : ImmutableList.copyOf(known.values());
+    }
+
+    @Override
+    public Port getPort(DeviceId deviceId, PortNumber portNumber) {
+        Map<PortNumber, Port> ports = devicePorts.get(deviceId);
+        return ports == null ? null : ports.get(portNumber); // null if device or port is unknown
+    }
+
+    @Override
+    public boolean isAvailable(DeviceId deviceId) {
+        return availableDevices.contains(deviceId); // membership in the available(=UP) set
+    }
+
+    @Override
+    public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
+        // also drop derived state, so a removed device is neither "available" nor still exposes ports
+        devicePorts.remove(deviceId);
+        availableDevices.remove(deviceId);
+        Device device = devices.remove(deviceId);
+        return device == null ? null : new DeviceEvent(DEVICE_REMOVED, device, null);
+    }
+
+    // Two annotation sets are equal when they are the same instance, or are both
+    // non-null and carry identical key sets with equal values for every key.
+    private static boolean isAnnotationsEqual(Annotations lhs, Annotations rhs) {
+        if (lhs == rhs) {
+            return true;
+        }
+        // a single null side, or differing key sets, can never match
+        if (lhs == null || rhs == null || !lhs.keys().equals(rhs.keys())) {
+            return false;
+        }
+
+        // key sets match, so walking one side covers both
+        for (String name : lhs.keys()) {
+            if (!lhs.value(name).equals(rhs.value(name))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Builds the composite Device from the descriptions of all providers.
+     * <p>
+     * Type, manufacturer, versions and serial number are taken from the primary
+     * provider's description; annotations of all providers are merged in.
+     *
+     * @param deviceId device identifier
+     * @param providerDescs Collection of Descriptions from multiple providers
+     * @return Device instance
+     */
+    private Device composeDevice(DeviceId deviceId,
+            ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
+
+        // at least one description is required to choose a primary provider from
+        checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
+
+        final ProviderId primary = pickPrimaryPID(providerDescs);
+        final DeviceDescriptions primaryDescs = providerDescs.get(primary);
+        final DeviceDescription base = primaryDescs.getDeviceDesc().value();
+
+        // start from the primary provider's annotations
+        DefaultAnnotations merged = merge(DefaultAnnotations.builder().build(), base.annotations());
+
+        // then fold in the annotations of every other provider
+        // TODO: should keep track of Description timestamp
+        // and only merge conflicting keys when timestamp is newer
+        // Currently assuming there will never be a key conflict between
+        // providers
+        // annotation merging. not so efficient, should revisit later
+        for (Entry<ProviderId, DeviceDescriptions> other : providerDescs.entrySet()) {
+            if (other.getKey().equals(primary)) {
+                continue; // already applied as the base
+            }
+            merged = merge(merged, other.getValue().getDeviceDesc().value().annotations());
+        }
+
+        return new DefaultDevice(primary, deviceId,
+                                 base.type(), base.manufacturer(),
+                                 base.hwVersion(), base.swVersion(),
+                                 base.serialNumber(), merged);
+    }
+
+    /**
+     * Builds the composite Port from the descriptions of all providers.
+     * <p>
+     * The enabled flag comes from the primary provider's port description
+     * (false when it has none); annotations of all providers are merged.
+     *
+     * @param device device the port is on
+     * @param number port number
+     * @param providerDescs Collection of Descriptions from multiple providers
+     * @return Port instance
+     */
+    private Port composePort(Device device, PortNumber number,
+                ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
+        final ProviderId primary = pickPrimaryPID(providerDescs);
+        final Timestamped<PortDescription> primaryDesc = providerDescs.get(primary).getPortDesc(number);
+
+        // if the primary has no description for this port, assume not enabled
+        // TODO: revisit this default port enabled/disabled behavior
+        final boolean enabled = primaryDesc != null && primaryDesc.value().isEnabled();
+
+        DefaultAnnotations merged = DefaultAnnotations.builder().build();
+        if (primaryDesc != null) {
+            merged = merge(merged, primaryDesc.value().annotations());
+        }
+
+        // TODO: should keep track of Description timestamp
+        // and only merge conflicting keys when timestamp is newer
+        // Currently assuming there will never be a key conflict between
+        // providers
+        // annotation merging. not so efficient, should revisit later
+        for (Entry<ProviderId, DeviceDescriptions> other : providerDescs.entrySet()) {
+            if (other.getKey().equals(primary)) {
+                continue; // already applied above
+            }
+            final Timestamped<PortDescription> extra = other.getValue().getPortDesc(number);
+            if (extra != null) {
+                merged = merge(merged, extra.value().annotations());
+            }
+        }
+
+        return new DefaultPort(device, number, enabled, merged);
+    }
+
+    /**
+     * @return first non-ancillary ProviderId found, else the first one iterated (null if empty)
+     */
+    private ProviderId pickPrimaryPID(
+            ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
+        ProviderId firstSeen = null;
+        for (ProviderId candidate : providerDescs.keySet()) {
+            if (!candidate.isAncillary()) {
+                return candidate;
+            }
+            if (firstSeen == null) {
+                firstSeen = candidate;
+            }
+        }
+        return firstSeen;
+    }
+
+    private static final class InitConcurrentHashMap<K, V> implements
+            ConcurrentInitializer<ConcurrentMap<K, V>> {
+        @Override
+        public ConcurrentMap<K, V> get() throws ConcurrentException {
+            return new ConcurrentHashMap<>(); // a fresh, empty map on every call
+        }
+    }
+
+    public static final class InitDeviceDescs
+        implements ConcurrentInitializer<DeviceDescriptions> {
+
+        private final Timestamped<DeviceDescription> deviceDesc; // seed description, never null
+
+        public InitDeviceDescs(Timestamped<DeviceDescription> deviceDesc) {
+            this.deviceDesc = checkNotNull(deviceDesc);
+        }
+        @Override
+        public DeviceDescriptions get() throws ConcurrentException {
+            return new DeviceDescriptions(deviceDesc); // new holder seeded with the description
+        }
+    }
+
+
+    /**
+     * Descriptions of a Device and its Ports as supplied by a single Provider.
+     */
+    public static class DeviceDescriptions {
+
+        private final AtomicReference<Timestamped<DeviceDescription>> deviceDesc;
+        private final ConcurrentMap<PortNumber, Timestamped<PortDescription>> portDescs;
+
+        public DeviceDescriptions(Timestamped<DeviceDescription> desc) {
+            // a device description is mandatory; port descriptions arrive later
+            this.deviceDesc = new AtomicReference<>(checkNotNull(desc));
+            this.portDescs = new ConcurrentHashMap<>();
+        }
+
+        public Timestamped<DeviceDescription> getDeviceDesc() {
+            return deviceDesc.get();
+        }
+
+        public Timestamped<PortDescription> getPortDesc(PortNumber number) {
+            return portDescs.get(number);
+        }
+
+        /**
+         * Stores a DeviceDescription, carrying over annotations of the previous one.
+         *
+         * @param newDesc new DeviceDescription
+         * @return previous DeviceDescription
+         */
+        public synchronized Timestamped<DeviceDescription> putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
+            final Timestamped<DeviceDescription> previous = deviceDesc.get();
+            if (previous == null) {
+                // nothing to merge with; store as given
+                return deviceDesc.getAndSet(newDesc);
+            }
+            // merge old and new annotations, keeping the new description's timestamp
+            final DeviceDescription incoming = newDesc.value();
+            final SparseAnnotations combined = merge(previous.value().annotations(), incoming.annotations());
+            final DeviceDescription withMerged = new DefaultDeviceDescription(incoming, combined);
+            return deviceDesc.getAndSet(new Timestamped<DeviceDescription>(withMerged, newDesc.timestamp()));
+        }
+
+        /**
+         * Stores a PortDescription, carrying over annotations of the previous one.
+         *
+         * @param newDesc new PortDescription
+         * @return previous PortDescription
+         */
+        public synchronized Timestamped<PortDescription> putPortDesc(Timestamped<PortDescription> newDesc) {
+            final PortDescription incoming = newDesc.value();
+            final Timestamped<PortDescription> previous = portDescs.get(incoming.portNumber());
+            if (previous == null) {
+                return portDescs.put(incoming.portNumber(), newDesc);
+            }
+            final SparseAnnotations combined = merge(previous.value().annotations(), incoming.annotations());
+            final PortDescription withMerged = new DefaultPortDescription(incoming, combined);
+            // keyed by the merged description's port number
+            return portDescs.put(withMerged.portNumber(),
+                                 new Timestamped<PortDescription>(withMerged, newDesc.timestamp()));
+        }
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java
deleted file mode 100644
index c858aba..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java
+++ /dev/null
@@ -1,340 +0,0 @@
-package org.onlab.onos.store.device.impl;
-
-import static com.google.common.base.Predicates.notNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSet.Builder;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.onos.net.DefaultDevice;
-import org.onlab.onos.net.DefaultPort;
-import org.onlab.onos.net.Device;
-import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.Port;
-import org.onlab.onos.net.PortNumber;
-import org.onlab.onos.net.device.DeviceDescription;
-import org.onlab.onos.net.device.DeviceEvent;
-import org.onlab.onos.net.device.DeviceStore;
-import org.onlab.onos.net.device.DeviceStoreDelegate;
-import org.onlab.onos.net.device.PortDescription;
-import org.onlab.onos.net.provider.ProviderId;
-import org.onlab.onos.store.AbstractStore;
-import org.onlab.onos.store.ClockService;
-import org.onlab.onos.store.Timestamp;
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.onlab.onos.net.device.DeviceEvent.Type.*;
-import static org.slf4j.LoggerFactory.getLogger;
-
-//TODO: Add support for multiple provider and annotations
-/**
- * Manages inventory of infrastructure devices using a protocol that takes into consideration
- * the order in which device events occur.
- */
-@Component(immediate = true)
-@Service
-public class OnosDistributedDeviceStore
-        extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
-        implements DeviceStore {
-
-    private final Logger log = getLogger(getClass());
-
-    public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
-
-    private ConcurrentMap<DeviceId, VersionedValue<Device>> devices;
-    private ConcurrentMap<DeviceId, Map<PortNumber, VersionedValue<Port>>> devicePorts;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClockService clockService;
-
-    @Activate
-    public void activate() {
-
-        devices = new ConcurrentHashMap<>();
-        devicePorts = new ConcurrentHashMap<>();
-
-        log.info("Started");
-    }
-
-    @Deactivate
-    public void deactivate() {
-        log.info("Stopped");
-    }
-
-    @Override
-    public int getDeviceCount() {
-        return devices.size();
-    }
-
-    @Override
-    public Iterable<Device> getDevices() {
-        Builder<Device> builder = ImmutableSet.builder();
-        synchronized (this) {
-            for (VersionedValue<Device> device : devices.values()) {
-                builder.add(device.entity());
-            }
-            return builder.build();
-        }
-    }
-
-    @Override
-    public Device getDevice(DeviceId deviceId) {
-        VersionedValue<Device> device = devices.get(deviceId);
-        checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
-        return device.entity();
-    }
-
-    @Override
-    public DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
-                                            DeviceDescription deviceDescription) {
-        Timestamp newTimestamp = clockService.getTimestamp(deviceId);
-        VersionedValue<Device> device = devices.get(deviceId);
-
-        if (device == null) {
-            return createDevice(providerId, deviceId, deviceDescription, newTimestamp);
-        }
-
-        checkState(newTimestamp.compareTo(device.timestamp()) > 0,
-                "Existing device has a timestamp in the future!");
-
-        return updateDevice(providerId, device.entity(), deviceDescription, newTimestamp);
-    }
-
-    // Creates the device and returns the appropriate event if necessary.
-    private DeviceEvent createDevice(ProviderId providerId, DeviceId deviceId,
-                                     DeviceDescription desc, Timestamp timestamp) {
-        Device device = new DefaultDevice(providerId, deviceId, desc.type(),
-                                                 desc.manufacturer(),
-                                                 desc.hwVersion(), desc.swVersion(),
-                                                 desc.serialNumber());
-
-        devices.put(deviceId, new VersionedValue<>(device, true, timestamp));
-        // TODO,FIXME: broadcast a message telling peers of a device event.
-        return new DeviceEvent(DEVICE_ADDED, device, null);
-    }
-
-    // Updates the device and returns the appropriate event if necessary.
-    private DeviceEvent updateDevice(ProviderId providerId, Device device,
-                                     DeviceDescription desc, Timestamp timestamp) {
-        // We allow only certain attributes to trigger update
-        if (!Objects.equals(device.hwVersion(), desc.hwVersion()) ||
-                !Objects.equals(device.swVersion(), desc.swVersion())) {
-
-            Device updated = new DefaultDevice(providerId, device.id(),
-                                                      desc.type(),
-                                                      desc.manufacturer(),
-                                                      desc.hwVersion(),
-                                                      desc.swVersion(),
-                                                      desc.serialNumber());
-            devices.put(device.id(), new VersionedValue<Device>(updated, true, timestamp));
-            // FIXME: broadcast a message telling peers of a device event.
-            return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, updated, null);
-        }
-
-        // Otherwise merely attempt to change availability
-        Device updated = new DefaultDevice(providerId, device.id(),
-                desc.type(),
-                desc.manufacturer(),
-                desc.hwVersion(),
-                desc.swVersion(),
-                desc.serialNumber());
-
-        VersionedValue<Device> oldDevice = devices.put(device.id(),
-                new VersionedValue<Device>(updated, true, timestamp));
-        if (!oldDevice.isUp()) {
-            return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public DeviceEvent markOffline(DeviceId deviceId) {
-        VersionedValue<Device> device = devices.get(deviceId);
-        boolean willRemove = device != null && device.isUp();
-        if (!willRemove) {
-            return null;
-        }
-        Timestamp timestamp = clockService.getTimestamp(deviceId);
-        if (replaceIfLatest(device.entity(), false, timestamp)) {
-            return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device.entity(), null);
-        }
-        return null;
-    }
-
-    // Replace existing value if its timestamp is older.
-    private synchronized boolean replaceIfLatest(Device device, boolean isUp, Timestamp timestamp) {
-        VersionedValue<Device> existingValue = devices.get(device.id());
-        if (timestamp.compareTo(existingValue.timestamp()) > 0) {
-            devices.put(device.id(), new VersionedValue<Device>(device, isUp, timestamp));
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
-                                         List<PortDescription> portDescriptions) {
-        List<DeviceEvent> events = new ArrayList<>();
-        synchronized (this) {
-            VersionedValue<Device> device = devices.get(deviceId);
-            checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
-            Map<PortNumber, VersionedValue<Port>> ports = getPortMap(deviceId);
-            Timestamp newTimestamp = clockService.getTimestamp(deviceId);
-
-            // Add new ports
-            Set<PortNumber> processed = new HashSet<>();
-            for (PortDescription portDescription : portDescriptions) {
-                VersionedValue<Port> port = ports.get(portDescription.portNumber());
-                if (port == null) {
-                    events.add(createPort(device, portDescription, ports, newTimestamp));
-                }
-                checkState(newTimestamp.compareTo(port.timestamp()) > 0,
-                        "Existing port state has a timestamp in the future!");
-                events.add(updatePort(device.entity(), port.entity(), portDescription, ports, newTimestamp));
-                processed.add(portDescription.portNumber());
-            }
-
-            updatePortMap(deviceId, ports);
-
-            events.addAll(pruneOldPorts(device.entity(), ports, processed));
-        }
-        return FluentIterable.from(events).filter(notNull()).toList();
-    }
-
-    // Creates a new port based on the port description adds it to the map and
-    // Returns corresponding event.
-    //@GuardedBy("this")
-    private DeviceEvent createPort(VersionedValue<Device> device, PortDescription portDescription,
-                                   Map<PortNumber, VersionedValue<Port>> ports, Timestamp timestamp) {
-        Port port = new DefaultPort(device.entity(), portDescription.portNumber(),
-                                           portDescription.isEnabled());
-        ports.put(port.number(), new VersionedValue<Port>(port, true, timestamp));
-        updatePortMap(device.entity().id(), ports);
-        return new DeviceEvent(PORT_ADDED, device.entity(), port);
-    }
-
-    // Checks if the specified port requires update and if so, it replaces the
-    // existing entry in the map and returns corresponding event.
-    //@GuardedBy("this")
-    private DeviceEvent updatePort(Device device, Port port,
-                                   PortDescription portDescription,
-                                   Map<PortNumber, VersionedValue<Port>> ports,
-                                   Timestamp timestamp) {
-        if (port.isEnabled() != portDescription.isEnabled()) {
-            VersionedValue<Port> updatedPort = new VersionedValue<Port>(
-                    new DefaultPort(device, portDescription.portNumber(),
-                                    portDescription.isEnabled()),
-                    portDescription.isEnabled(),
-                    timestamp);
-            ports.put(port.number(), updatedPort);
-            updatePortMap(device.id(), ports);
-            return new DeviceEvent(PORT_UPDATED, device, updatedPort.entity());
-        }
-        return null;
-    }
-
-    // Prunes the specified list of ports based on which ports are in the
-    // processed list and returns list of corresponding events.
-    //@GuardedBy("this")
-    private List<DeviceEvent> pruneOldPorts(Device device,
-                                            Map<PortNumber, VersionedValue<Port>> ports,
-                                            Set<PortNumber> processed) {
-        List<DeviceEvent> events = new ArrayList<>();
-        Iterator<PortNumber> iterator = ports.keySet().iterator();
-        while (iterator.hasNext()) {
-            PortNumber portNumber = iterator.next();
-            if (!processed.contains(portNumber)) {
-                events.add(new DeviceEvent(PORT_REMOVED, device,
-                                           ports.get(portNumber).entity()));
-                iterator.remove();
-            }
-        }
-        if (!events.isEmpty()) {
-            updatePortMap(device.id(), ports);
-        }
-        return events;
-    }
-
-    // Gets the map of ports for the specified device; if one does not already
-    // exist, it creates and registers a new one.
-    // WARN: returned value is a copy, changes made to the Map
-    //       needs to be written back using updatePortMap
-    //@GuardedBy("this")
-    private Map<PortNumber, VersionedValue<Port>> getPortMap(DeviceId deviceId) {
-        Map<PortNumber, VersionedValue<Port>> ports = devicePorts.get(deviceId);
-        if (ports == null) {
-            ports = new HashMap<>();
-            // this probably is waste of time in most cases.
-            updatePortMap(deviceId, ports);
-        }
-        return ports;
-    }
-
-    //@GuardedBy("this")
-    private void updatePortMap(DeviceId deviceId, Map<PortNumber, VersionedValue<Port>> ports) {
-        devicePorts.put(deviceId, ports);
-    }
-
-    @Override
-    public DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
-                                        PortDescription portDescription) {
-        VersionedValue<Device> device = devices.get(deviceId);
-        checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
-        Map<PortNumber, VersionedValue<Port>> ports = getPortMap(deviceId);
-        VersionedValue<Port> port = ports.get(portDescription.portNumber());
-        Timestamp timestamp = clockService.getTimestamp(deviceId);
-        return updatePort(device.entity(), port.entity(), portDescription, ports, timestamp);
-    }
-
-    @Override
-    public List<Port> getPorts(DeviceId deviceId) {
-        Map<PortNumber, VersionedValue<Port>> versionedPorts = devicePorts.get(deviceId);
-        if (versionedPorts == null) {
-            return Collections.emptyList();
-        }
-        List<Port> ports = new ArrayList<>();
-        for (VersionedValue<Port> port : versionedPorts.values()) {
-            ports.add(port.entity());
-        }
-        return ports;
-    }
-
-    @Override
-    public Port getPort(DeviceId deviceId, PortNumber portNumber) {
-        Map<PortNumber, VersionedValue<Port>> ports = devicePorts.get(deviceId);
-        return ports == null ? null : ports.get(portNumber).entity();
-    }
-
-    @Override
-    public boolean isAvailable(DeviceId deviceId) {
-        return devices.get(deviceId).isUp();
-    }
-
-    @Override
-    public DeviceEvent removeDevice(DeviceId deviceId) {
-        VersionedValue<Device> previousDevice = devices.remove(deviceId);
-        return previousDevice == null ? null :
-            new DeviceEvent(DEVICE_REMOVED, previousDevice.entity(), null);
-    }
-}