implemented GossipDeviceStore with multi-provider, annotation support

Change-Id: I1953bdc37b28af79703ebcfc9201a71a2af49ab2
diff --git a/core/api/src/main/java/org/onlab/onos/store/Timestamp.java b/core/api/src/main/java/org/onlab/onos/store/Timestamp.java
index b9d3648..b3caf85 100644
--- a/core/api/src/main/java/org/onlab/onos/store/Timestamp.java
+++ b/core/api/src/main/java/org/onlab/onos/store/Timestamp.java
@@ -2,7 +2,15 @@
 
 /**
  * Opaque version structure.
+ * <p>
+ * Classes implementing this interface must also implement
+ * {@link #hashCode()} and {@link #equals(Object)}.
  */
 public interface Timestamp extends Comparable<Timestamp> {
 
+    @Override
+    public abstract int hashCode();
+
+    @Override
+    public abstract boolean equals(Object obj);
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/Timestamped.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/Timestamped.java
new file mode 100644
index 0000000..2908b16
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/Timestamped.java
@@ -0,0 +1,79 @@
+package org.onlab.onos.store.common.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Objects;
+
+import org.onlab.onos.store.Timestamp;
+
+/**
+ * Wrapper class to store Timestamped value.
+ * @param <T>
+ */
+public final class Timestamped<T> {
+
+    private final Timestamp timestamp;
+    private final T value;
+
+    /**
+     * Creates a time stamped value.
+     *
+     * @param value to be timestamp
+     * @param timestamp the timestamp
+     */
+    public Timestamped(T value, Timestamp timestamp) {
+        this.value = checkNotNull(value);
+        this.timestamp = checkNotNull(timestamp);
+    }
+
+    /**
+     * Returns the value.
+     * @return value
+     */
+    public T value() {
+        return value;
+    }
+
+    /**
+     * Returns the time stamp.
+     * @return time stamp
+     */
+    public Timestamp timestamp() {
+        return timestamp;
+    }
+
+    /**
+     * Tests if this timestamped value is newer than the other.
+     *
+     * @param other timestamped value
+     * @return true if this instance is newer.
+     */
+    public boolean isNewer(Timestamped<T> other) {
+        return this.timestamp.compareTo(checkNotNull(other).timestamp()) > 0;
+    }
+
+    @Override
+    public int hashCode() {
+        return timestamp.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof Timestamped)) {
+            return false;
+        }
+        @SuppressWarnings("unchecked")
+        Timestamped<T> that = (Timestamped<T>) obj;
+        return Objects.equals(this.timestamp, that.timestamp);
+    }
+
+    // Default constructor for serialization
+    @Deprecated
+    protected Timestamped() {
+        this.value = null;
+        this.timestamp = null;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/package-info.java
new file mode 100644
index 0000000..992fd49
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Common abstractions and facilities for implementing distributed store
+ * using gossip protocol.
+ */
+package org.onlab.onos.store.common.impl;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosClockService.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceClockManager.java
similarity index 92%
rename from core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosClockService.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceClockManager.java
index a99482f..4dfc88b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosClockService.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceClockManager.java
@@ -17,9 +17,12 @@
 import org.onlab.onos.store.impl.OnosTimestamp;
 import org.slf4j.Logger;
 
+/**
+ * Clock service to issue Timestamp based on Device Mastership.
+ */
 @Component(immediate = true)
 @Service
-public class OnosClockService implements ClockService {
+public class DeviceClockManager implements ClockService {
 
     private final Logger log = getLogger(getClass());
 
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
new file mode 100644
index 0000000..0edbc21
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -0,0 +1,652 @@
+package org.onlab.onos.store.device.impl;
+
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.net.Annotations;
+import org.onlab.onos.net.DefaultAnnotations;
+import org.onlab.onos.net.DefaultDevice;
+import org.onlab.onos.net.DefaultPort;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.Device.Type;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.Port;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.SparseAnnotations;
+import org.onlab.onos.net.device.DefaultDeviceDescription;
+import org.onlab.onos.net.device.DefaultPortDescription;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.net.device.DeviceEvent;
+import org.onlab.onos.net.device.DeviceStore;
+import org.onlab.onos.net.device.DeviceStoreDelegate;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.AbstractStore;
+import org.onlab.onos.store.ClockService;
+import org.onlab.onos.store.Timestamp;
+import org.onlab.onos.store.common.impl.Timestamped;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Predicates.notNull;
+import static org.onlab.onos.net.device.DeviceEvent.Type.*;
+import static org.slf4j.LoggerFactory.getLogger;
+import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
+import static org.onlab.onos.net.DefaultAnnotations.merge;
+import static com.google.common.base.Verify.verify;
+
+// TODO: implement remove event handling and call *Internal
+/**
+ * Manages inventory of infrastructure devices using gossip protocol to distribute
+ * information.
+ */
+@Component(immediate = true)
+@Service
+public class GossipDeviceStore
+        extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
+        implements DeviceStore {
+
+    private final Logger log = getLogger(getClass());
+
+    public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
+
+    // TODO: Check if inner Map can be replaced with plain Map
+    // innerMap is used to lock a Device, thus instance should never be replaced.
+    // collection of Description given from various providers
+    private final ConcurrentMap<DeviceId,
+                            ConcurrentMap<ProviderId, DeviceDescriptions>>
+                                deviceDescs = new ConcurrentHashMap<>();
+
+    // cache of Device and Ports generated by compositing descriptions from providers
+    private final ConcurrentMap<DeviceId, Device> devices = new ConcurrentHashMap<>();
+    private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = new ConcurrentHashMap<>();
+
+    // available(=UP) devices
+    private final Set<DeviceId> availableDevices = new HashSet<>();
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClockService clockService;
+
+    @Activate
+    public void activate() {
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        deviceDescs.clear();
+        devices.clear();
+        devicePorts.clear();
+        availableDevices.clear();
+        log.info("Stopped");
+    }
+
+    @Override
+    public int getDeviceCount() {
+        return devices.size();
+    }
+
+    @Override
+    public Iterable<Device> getDevices() {
+        return Collections.unmodifiableCollection(devices.values());
+    }
+
+    @Override
+    public Device getDevice(DeviceId deviceId) {
+        return devices.get(deviceId);
+    }
+
+    @Override
+    public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
+                                     DeviceDescription deviceDescription) {
+        Timestamp newTimestamp = clockService.getTimestamp(deviceId);
+        final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
+        DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
+        if (event != null) {
+            // FIXME: broadcast deltaDesc, UP
+            log.debug("broadcast deltaDesc");
+        }
+        return event;
+    }
+
+    private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId, DeviceId deviceId,
+                Timestamped<DeviceDescription> deltaDesc) {
+
+        // Collection of DeviceDescriptions for a Device
+        ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
+            = createIfAbsentUnchecked(deviceDescs, deviceId,
+                    new InitConcurrentHashMap<ProviderId, DeviceDescriptions>());
+
+
+        DeviceDescriptions descs
+            = createIfAbsentUnchecked(providerDescs, providerId,
+                    new InitDeviceDescs(deltaDesc));
+
+        // update description
+        synchronized (providerDescs) {
+            // locking per device
+
+            final Device oldDevice = devices.get(deviceId);
+            final Device newDevice;
+
+            if (deltaDesc == descs.getDeviceDesc() ||
+                deltaDesc.isNewer(descs.getDeviceDesc())) {
+                // on new device or valid update
+                descs.putDeviceDesc(deltaDesc);
+                newDevice = composeDevice(deviceId, providerDescs);
+            } else {
+                // outdated event, ignored.
+                return null;
+            }
+            if (oldDevice == null) {
+                // ADD
+                return createDevice(providerId, newDevice);
+            } else {
+                // UPDATE or ignore (no change or stale)
+                return updateDevice(providerId, oldDevice, newDevice);
+            }
+        }
+    }
+
+    // Creates the device and returns the appropriate event if necessary.
+    // Guarded by deviceDescs value (=locking Device)
+    private DeviceEvent createDevice(ProviderId providerId,
+                                    Device newDevice) {
+
+        // update composed device cache
+        Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
+        verify(oldDevice == null,
+                "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
+                providerId, oldDevice, newDevice);
+
+        if (!providerId.isAncillary()) {
+            availableDevices.add(newDevice.id());
+        }
+
+        return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
+    }
+
+    // Updates the device and returns the appropriate event if necessary.
+    // Guarded by deviceDescs value (=locking Device)
+    private DeviceEvent updateDevice(ProviderId providerId,
+                                     Device oldDevice, Device newDevice) {
+
+        // We allow only certain attributes to trigger update
+        if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
+            !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
+            !isAnnotationsEqual(oldDevice.annotations(), newDevice.annotations())) {
+
+            boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
+            if (!replaced) {
+                verify(replaced,
+                        "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
+                        providerId, oldDevice, devices.get(newDevice.id())
+                        , newDevice);
+            }
+            if (!providerId.isAncillary()) {
+                availableDevices.add(newDevice.id());
+            }
+            return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
+        }
+
+        // Otherwise merely attempt to change availability if primary provider
+        if (!providerId.isAncillary()) {
+            boolean added = availableDevices.add(newDevice.id());
+            return !added ? null :
+                    new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
+        }
+        return null;
+    }
+
+    @Override
+    public DeviceEvent markOffline(DeviceId deviceId) {
+        ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
+            = createIfAbsentUnchecked(deviceDescs, deviceId,
+                    new InitConcurrentHashMap<ProviderId, DeviceDescriptions>());
+
+        // locking device
+        synchronized (providerDescs) {
+            Device device = devices.get(deviceId);
+            if (device == null) {
+                return null;
+            }
+            boolean removed = availableDevices.remove(deviceId);
+            if (removed) {
+                // TODO: broadcast ... DOWN only?
+                return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
+
+            }
+            return null;
+        }
+    }
+
+    @Override
+    public synchronized List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
+            List<PortDescription> portDescriptions) {
+        Timestamp newTimestamp = clockService.getTimestamp(deviceId);
+
+        List<Timestamped<PortDescription>> deltaDescs = new ArrayList<>(portDescriptions.size());
+        for (PortDescription e : portDescriptions) {
+            deltaDescs.add(new Timestamped<PortDescription>(e, newTimestamp));
+        }
+
+        List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, deltaDescs);
+        if (!events.isEmpty()) {
+            // FIXME: broadcast deltaDesc, UP
+            log.debug("broadcast deltaDesc");
+        }
+        return events;
+
+    }
+
+    private List<DeviceEvent> updatePortsInternal(ProviderId providerId, DeviceId deviceId,
+                List<Timestamped<PortDescription>> deltaDescs) {
+
+        Device device = devices.get(deviceId);
+        checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
+
+        ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
+        checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
+
+        DeviceDescriptions descs = descsMap.get(providerId);
+        // every provider must provide DeviceDescription.
+        checkArgument(descs != null,
+                "Device description for Device ID %s from Provider %s was not found",
+                deviceId, providerId);
+
+        List<DeviceEvent> events = new ArrayList<>();
+        synchronized (descsMap) {
+            Map<PortNumber, Port> ports = getPortMap(deviceId);
+
+            // Add new ports
+            Set<PortNumber> processed = new HashSet<>();
+            for (Timestamped<PortDescription> deltaDesc : deltaDescs) {
+                final PortNumber number = deltaDesc.value().portNumber();
+                final Port oldPort = ports.get(number);
+                final Port newPort;
+
+                final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
+                if (existingPortDesc == null ||
+                    deltaDesc == existingPortDesc ||
+                    deltaDesc.isNewer(existingPortDesc)) {
+                    // on new port or valid update
+                    // update description
+                    descs.putPortDesc(deltaDesc);
+                    newPort = composePort(device, number, descsMap);
+                } else {
+                    // outdated event, ignored.
+                    continue;
+                }
+
+                events.add(oldPort == null ?
+                                   createPort(device, newPort, ports) :
+                                   updatePort(device, oldPort, newPort, ports));
+                processed.add(number);
+            }
+
+            events.addAll(pruneOldPorts(device, ports, processed));
+        }
+        return FluentIterable.from(events).filter(notNull()).toList();
+    }
+
+    // Creates a new port based on the port description adds it to the map and
+    // Returns corresponding event.
+    // Guarded by deviceDescs value (=locking Device)
+    private DeviceEvent createPort(Device device, Port newPort,
+                                   Map<PortNumber, Port> ports) {
+        ports.put(newPort.number(), newPort);
+        return new DeviceEvent(PORT_ADDED, device, newPort);
+    }
+
+    // Checks if the specified port requires update and if so, it replaces the
+    // existing entry in the map and returns corresponding event.
+    // Guarded by deviceDescs value (=locking Device)
+    private DeviceEvent updatePort(Device device, Port oldPort,
+                                   Port newPort,
+                                   Map<PortNumber, Port> ports) {
+        if (oldPort.isEnabled() != newPort.isEnabled() ||
+            !isAnnotationsEqual(oldPort.annotations(), newPort.annotations())) {
+
+            ports.put(oldPort.number(), newPort);
+            return new DeviceEvent(PORT_UPDATED, device, newPort);
+        }
+        return null;
+    }
+
+    // Prunes the specified list of ports based on which ports are in the
+    // processed list and returns list of corresponding events.
+    // Guarded by deviceDescs value (=locking Device)
+    private List<DeviceEvent> pruneOldPorts(Device device,
+                                            Map<PortNumber, Port> ports,
+                                            Set<PortNumber> processed) {
+        List<DeviceEvent> events = new ArrayList<>();
+        Iterator<PortNumber> iterator = ports.keySet().iterator();
+        while (iterator.hasNext()) {
+            PortNumber portNumber = iterator.next();
+            if (!processed.contains(portNumber)) {
+                events.add(new DeviceEvent(PORT_REMOVED, device,
+                                           ports.get(portNumber)));
+                iterator.remove();
+            }
+        }
+        return events;
+    }
+
+    // Gets the map of ports for the specified device; if one does not already
+    // exist, it creates and registers a new one.
+    private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
+        return createIfAbsentUnchecked(devicePorts, deviceId,
+                new InitConcurrentHashMap<PortNumber, Port>());
+    }
+
+    @Override
+    public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
+            PortDescription portDescription) {
+        Timestamp newTimestamp = clockService.getTimestamp(deviceId);
+        final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
+        DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
+        if (event != null) {
+            // FIXME: broadcast deltaDesc
+            log.debug("broadcast deltaDesc");
+        }
+        return event;
+    }
+
+    private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
+                Timestamped<PortDescription> deltaDesc) {
+
+        Device device = devices.get(deviceId);
+        checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
+
+        ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
+        checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
+
+        DeviceDescriptions descs = descsMap.get(providerId);
+        // assuming all providers must to give DeviceDescription
+        checkArgument(descs != null,
+                "Device description for Device ID %s from Provider %s was not found",
+                deviceId, providerId);
+
+        synchronized (descsMap) {
+            ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
+            final PortNumber number = deltaDesc.value().portNumber();
+            final Port oldPort = ports.get(number);
+            final Port newPort;
+
+            final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
+            if (existingPortDesc == null ||
+                deltaDesc == existingPortDesc ||
+                deltaDesc.isNewer(existingPortDesc)) {
+                // on new port or valid update
+                // update description
+                descs.putPortDesc(deltaDesc);
+                newPort = composePort(device, number, descsMap);
+            } else {
+                // outdated event, ignored.
+                return null;
+            }
+
+            if (oldPort == null) {
+                return createPort(device, newPort, ports);
+            } else {
+                return updatePort(device, oldPort, newPort, ports);
+            }
+        }
+    }
+
+    @Override
+    public List<Port> getPorts(DeviceId deviceId) {
+        Map<PortNumber, Port> ports = devicePorts.get(deviceId);
+        if (ports == null) {
+            return Collections.emptyList();
+        }
+        return ImmutableList.copyOf(ports.values());
+    }
+
+    @Override
+    public Port getPort(DeviceId deviceId, PortNumber portNumber) {
+        Map<PortNumber, Port> ports = devicePorts.get(deviceId);
+        return ports == null ? null : ports.get(portNumber);
+    }
+
+    @Override
+    public boolean isAvailable(DeviceId deviceId) {
+        return availableDevices.contains(deviceId);
+    }
+
+    @Override
+    public DeviceEvent removeDevice(DeviceId deviceId) {
+        synchronized (this) {
+            Device device = devices.remove(deviceId);
+            return device == null ? null :
+                    new DeviceEvent(DEVICE_REMOVED, device, null);
+        }
+    }
+
+    private static boolean isAnnotationsEqual(Annotations lhs, Annotations rhs) {
+        if (lhs == rhs) {
+            return true;
+        }
+        if (lhs == null || rhs == null) {
+            return false;
+        }
+
+        if (!lhs.keys().equals(rhs.keys())) {
+            return false;
+        }
+
+        for (String key : lhs.keys()) {
+            if (!lhs.value(key).equals(rhs.value(key))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Returns a Device, merging description given from multiple Providers.
+     *
+     * @param deviceId device identifier
+     * @param providerDescs Collection of Descriptions from multiple providers
+     * @return Device instance
+     */
+    private Device composeDevice(DeviceId deviceId,
+            ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
+
+        checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
+
+        ProviderId primary = pickPrimaryPID(providerDescs);
+
+        DeviceDescriptions desc = providerDescs.get(primary);
+
+        DeviceDescription base = desc.getDeviceDesc().value();
+        Type type = base.type();
+        String manufacturer = base.manufacturer();
+        String hwVersion = base.hwVersion();
+        String swVersion = base.swVersion();
+        String serialNumber = base.serialNumber();
+        DefaultAnnotations annotations = DefaultAnnotations.builder().build();
+        annotations = merge(annotations, base.annotations());
+
+        for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
+            if (e.getKey().equals(primary)) {
+                continue;
+            }
+            // TODO: should keep track of Description timestamp
+            // and only merge conflicting keys when timestamp is newer
+            // Currently assuming there will never be a key conflict between
+            // providers
+
+            // annotation merging. not so efficient, should revisit later
+            annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
+        }
+
+        return new DefaultDevice(primary, deviceId , type, manufacturer,
+                            hwVersion, swVersion, serialNumber, annotations);
+    }
+
+    /**
+     * Returns a Port, merging description given from multiple Providers.
+     *
+     * @param device device the port is on
+     * @param number port number
+     * @param providerDescs Collection of Descriptions from multiple providers
+     * @return Port instance
+     */
+    private Port composePort(Device device, PortNumber number,
+                ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
+
+        ProviderId primary = pickPrimaryPID(providerDescs);
+        DeviceDescriptions primDescs = providerDescs.get(primary);
+        // if no primary, assume not enabled
+        // TODO: revisit this default port enabled/disabled behavior
+        boolean isEnabled = false;
+        DefaultAnnotations annotations = DefaultAnnotations.builder().build();
+
+        final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
+        if (portDesc != null) {
+            isEnabled = portDesc.value().isEnabled();
+            annotations = merge(annotations, portDesc.value().annotations());
+        }
+
+        for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
+            if (e.getKey().equals(primary)) {
+                continue;
+            }
+            // TODO: should keep track of Description timestamp
+            // and only merge conflicting keys when timestamp is newer
+            // Currently assuming there will never be a key conflict between
+            // providers
+
+            // annotation merging. not so efficient, should revisit later
+            final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
+            if (otherPortDesc != null) {
+                annotations = merge(annotations, otherPortDesc.value().annotations());
+            }
+        }
+
+        return new DefaultPort(device, number, isEnabled, annotations);
+    }
+
+    /**
+     * @return primary ProviderID, or randomly chosen one if none exists
+     */
+    private ProviderId pickPrimaryPID(
+            ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
+        ProviderId fallBackPrimary = null;
+        for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
+            if (!e.getKey().isAncillary()) {
+                return e.getKey();
+            } else if (fallBackPrimary == null) {
+                // pick randomly as a fallback in case there is no primary
+                fallBackPrimary = e.getKey();
+            }
+        }
+        return fallBackPrimary;
+    }
+
+    private static final class InitConcurrentHashMap<K, V> implements
+            ConcurrentInitializer<ConcurrentMap<K, V>> {
+        @Override
+        public ConcurrentMap<K, V> get() throws ConcurrentException {
+            return new ConcurrentHashMap<>();
+        }
+    }
+
+    public static final class InitDeviceDescs
+        implements ConcurrentInitializer<DeviceDescriptions> {
+
+        private final Timestamped<DeviceDescription> deviceDesc;
+
+        public InitDeviceDescs(Timestamped<DeviceDescription> deviceDesc) {
+            this.deviceDesc = checkNotNull(deviceDesc);
+        }
+        @Override
+        public DeviceDescriptions get() throws ConcurrentException {
+            return new DeviceDescriptions(deviceDesc);
+        }
+    }
+
+
+    /**
+     * Collection of Description of a Device and it's Ports given from a Provider.
+     */
+    public static class DeviceDescriptions {
+
+        private final AtomicReference<Timestamped<DeviceDescription>> deviceDesc;
+        private final ConcurrentMap<PortNumber, Timestamped<PortDescription>> portDescs;
+
+        public DeviceDescriptions(Timestamped<DeviceDescription> desc) {
+            this.deviceDesc = new AtomicReference<>(checkNotNull(desc));
+            this.portDescs = new ConcurrentHashMap<>();
+        }
+
+        public Timestamped<DeviceDescription> getDeviceDesc() {
+            return deviceDesc.get();
+        }
+
+        public Timestamped<PortDescription> getPortDesc(PortNumber number) {
+            return portDescs.get(number);
+        }
+
+        /**
+         * Puts DeviceDescription, merging annotations as necessary.
+         *
+         * @param newDesc new DeviceDescription
+         * @return previous DeviceDescription
+         */
+        public synchronized Timestamped<DeviceDescription> putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
+            Timestamped<DeviceDescription> oldOne = deviceDesc.get();
+            Timestamped<DeviceDescription> newOne = newDesc;
+            if (oldOne != null) {
+                SparseAnnotations merged = merge(oldOne.value().annotations(),
+                                                 newDesc.value().annotations());
+                newOne = new Timestamped<DeviceDescription>(
+                        new DefaultDeviceDescription(newDesc.value(), merged),
+                        newDesc.timestamp());
+            }
+            return deviceDesc.getAndSet(newOne);
+        }
+
+        /**
+         * Puts PortDescription, merging annotations as necessary.
+         *
+         * @param newDesc new PortDescription
+         * @return previous PortDescription
+         */
+        public synchronized Timestamped<PortDescription> putPortDesc(Timestamped<PortDescription> newDesc) {
+            Timestamped<PortDescription> oldOne = portDescs.get(newDesc.value().portNumber());
+            Timestamped<PortDescription> newOne = newDesc;
+            if (oldOne != null) {
+                SparseAnnotations merged = merge(oldOne.value().annotations(),
+                                                 newDesc.value().annotations());
+                newOne = new Timestamped<PortDescription>(
+                        new DefaultPortDescription(newDesc.value(), merged),
+                        newDesc.timestamp());
+            }
+            return portDescs.put(newOne.value().portNumber(), newOne);
+        }
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java
deleted file mode 100644
index c858aba..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java
+++ /dev/null
@@ -1,340 +0,0 @@
-package org.onlab.onos.store.device.impl;
-
-import static com.google.common.base.Predicates.notNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSet.Builder;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.onos.net.DefaultDevice;
-import org.onlab.onos.net.DefaultPort;
-import org.onlab.onos.net.Device;
-import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.Port;
-import org.onlab.onos.net.PortNumber;
-import org.onlab.onos.net.device.DeviceDescription;
-import org.onlab.onos.net.device.DeviceEvent;
-import org.onlab.onos.net.device.DeviceStore;
-import org.onlab.onos.net.device.DeviceStoreDelegate;
-import org.onlab.onos.net.device.PortDescription;
-import org.onlab.onos.net.provider.ProviderId;
-import org.onlab.onos.store.AbstractStore;
-import org.onlab.onos.store.ClockService;
-import org.onlab.onos.store.Timestamp;
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.onlab.onos.net.device.DeviceEvent.Type.*;
-import static org.slf4j.LoggerFactory.getLogger;
-
-//TODO: Add support for multiple provider and annotations
-/**
- * Manages inventory of infrastructure devices using a protocol that takes into consideration
- * the order in which device events occur.
- */
-@Component(immediate = true)
-@Service
-public class OnosDistributedDeviceStore
-        extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
-        implements DeviceStore {
-
-    private final Logger log = getLogger(getClass());
-
-    public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
-
-    private ConcurrentMap<DeviceId, VersionedValue<Device>> devices;
-    private ConcurrentMap<DeviceId, Map<PortNumber, VersionedValue<Port>>> devicePorts;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClockService clockService;
-
-    @Activate
-    public void activate() {
-
-        devices = new ConcurrentHashMap<>();
-        devicePorts = new ConcurrentHashMap<>();
-
-        log.info("Started");
-    }
-
-    @Deactivate
-    public void deactivate() {
-        log.info("Stopped");
-    }
-
-    @Override
-    public int getDeviceCount() {
-        return devices.size();
-    }
-
-    @Override
-    public Iterable<Device> getDevices() {
-        Builder<Device> builder = ImmutableSet.builder();
-        synchronized (this) {
-            for (VersionedValue<Device> device : devices.values()) {
-                builder.add(device.entity());
-            }
-            return builder.build();
-        }
-    }
-
-    @Override
-    public Device getDevice(DeviceId deviceId) {
-        VersionedValue<Device> device = devices.get(deviceId);
-        checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
-        return device.entity();
-    }
-
-    @Override
-    public DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
-                                            DeviceDescription deviceDescription) {
-        Timestamp newTimestamp = clockService.getTimestamp(deviceId);
-        VersionedValue<Device> device = devices.get(deviceId);
-
-        if (device == null) {
-            return createDevice(providerId, deviceId, deviceDescription, newTimestamp);
-        }
-
-        checkState(newTimestamp.compareTo(device.timestamp()) > 0,
-                "Existing device has a timestamp in the future!");
-
-        return updateDevice(providerId, device.entity(), deviceDescription, newTimestamp);
-    }
-
-    // Creates the device and returns the appropriate event if necessary.
-    private DeviceEvent createDevice(ProviderId providerId, DeviceId deviceId,
-                                     DeviceDescription desc, Timestamp timestamp) {
-        Device device = new DefaultDevice(providerId, deviceId, desc.type(),
-                                                 desc.manufacturer(),
-                                                 desc.hwVersion(), desc.swVersion(),
-                                                 desc.serialNumber());
-
-        devices.put(deviceId, new VersionedValue<>(device, true, timestamp));
-        // TODO,FIXME: broadcast a message telling peers of a device event.
-        return new DeviceEvent(DEVICE_ADDED, device, null);
-    }
-
-    // Updates the device and returns the appropriate event if necessary.
-    private DeviceEvent updateDevice(ProviderId providerId, Device device,
-                                     DeviceDescription desc, Timestamp timestamp) {
-        // We allow only certain attributes to trigger update
-        if (!Objects.equals(device.hwVersion(), desc.hwVersion()) ||
-                !Objects.equals(device.swVersion(), desc.swVersion())) {
-
-            Device updated = new DefaultDevice(providerId, device.id(),
-                                                      desc.type(),
-                                                      desc.manufacturer(),
-                                                      desc.hwVersion(),
-                                                      desc.swVersion(),
-                                                      desc.serialNumber());
-            devices.put(device.id(), new VersionedValue<Device>(updated, true, timestamp));
-            // FIXME: broadcast a message telling peers of a device event.
-            return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, updated, null);
-        }
-
-        // Otherwise merely attempt to change availability
-        Device updated = new DefaultDevice(providerId, device.id(),
-                desc.type(),
-                desc.manufacturer(),
-                desc.hwVersion(),
-                desc.swVersion(),
-                desc.serialNumber());
-
-        VersionedValue<Device> oldDevice = devices.put(device.id(),
-                new VersionedValue<Device>(updated, true, timestamp));
-        if (!oldDevice.isUp()) {
-            return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public DeviceEvent markOffline(DeviceId deviceId) {
-        VersionedValue<Device> device = devices.get(deviceId);
-        boolean willRemove = device != null && device.isUp();
-        if (!willRemove) {
-            return null;
-        }
-        Timestamp timestamp = clockService.getTimestamp(deviceId);
-        if (replaceIfLatest(device.entity(), false, timestamp)) {
-            return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device.entity(), null);
-        }
-        return null;
-    }
-
-    // Replace existing value if its timestamp is older.
-    private synchronized boolean replaceIfLatest(Device device, boolean isUp, Timestamp timestamp) {
-        VersionedValue<Device> existingValue = devices.get(device.id());
-        if (timestamp.compareTo(existingValue.timestamp()) > 0) {
-            devices.put(device.id(), new VersionedValue<Device>(device, isUp, timestamp));
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
-                                         List<PortDescription> portDescriptions) {
-        List<DeviceEvent> events = new ArrayList<>();
-        synchronized (this) {
-            VersionedValue<Device> device = devices.get(deviceId);
-            checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
-            Map<PortNumber, VersionedValue<Port>> ports = getPortMap(deviceId);
-            Timestamp newTimestamp = clockService.getTimestamp(deviceId);
-
-            // Add new ports
-            Set<PortNumber> processed = new HashSet<>();
-            for (PortDescription portDescription : portDescriptions) {
-                VersionedValue<Port> port = ports.get(portDescription.portNumber());
-                if (port == null) {
-                    events.add(createPort(device, portDescription, ports, newTimestamp));
-                }
-                checkState(newTimestamp.compareTo(port.timestamp()) > 0,
-                        "Existing port state has a timestamp in the future!");
-                events.add(updatePort(device.entity(), port.entity(), portDescription, ports, newTimestamp));
-                processed.add(portDescription.portNumber());
-            }
-
-            updatePortMap(deviceId, ports);
-
-            events.addAll(pruneOldPorts(device.entity(), ports, processed));
-        }
-        return FluentIterable.from(events).filter(notNull()).toList();
-    }
-
-    // Creates a new port based on the port description adds it to the map and
-    // Returns corresponding event.
-    //@GuardedBy("this")
-    private DeviceEvent createPort(VersionedValue<Device> device, PortDescription portDescription,
-                                   Map<PortNumber, VersionedValue<Port>> ports, Timestamp timestamp) {
-        Port port = new DefaultPort(device.entity(), portDescription.portNumber(),
-                                           portDescription.isEnabled());
-        ports.put(port.number(), new VersionedValue<Port>(port, true, timestamp));
-        updatePortMap(device.entity().id(), ports);
-        return new DeviceEvent(PORT_ADDED, device.entity(), port);
-    }
-
-    // Checks if the specified port requires update and if so, it replaces the
-    // existing entry in the map and returns corresponding event.
-    //@GuardedBy("this")
-    private DeviceEvent updatePort(Device device, Port port,
-                                   PortDescription portDescription,
-                                   Map<PortNumber, VersionedValue<Port>> ports,
-                                   Timestamp timestamp) {
-        if (port.isEnabled() != portDescription.isEnabled()) {
-            VersionedValue<Port> updatedPort = new VersionedValue<Port>(
-                    new DefaultPort(device, portDescription.portNumber(),
-                                    portDescription.isEnabled()),
-                    portDescription.isEnabled(),
-                    timestamp);
-            ports.put(port.number(), updatedPort);
-            updatePortMap(device.id(), ports);
-            return new DeviceEvent(PORT_UPDATED, device, updatedPort.entity());
-        }
-        return null;
-    }
-
-    // Prunes the specified list of ports based on which ports are in the
-    // processed list and returns list of corresponding events.
-    //@GuardedBy("this")
-    private List<DeviceEvent> pruneOldPorts(Device device,
-                                            Map<PortNumber, VersionedValue<Port>> ports,
-                                            Set<PortNumber> processed) {
-        List<DeviceEvent> events = new ArrayList<>();
-        Iterator<PortNumber> iterator = ports.keySet().iterator();
-        while (iterator.hasNext()) {
-            PortNumber portNumber = iterator.next();
-            if (!processed.contains(portNumber)) {
-                events.add(new DeviceEvent(PORT_REMOVED, device,
-                                           ports.get(portNumber).entity()));
-                iterator.remove();
-            }
-        }
-        if (!events.isEmpty()) {
-            updatePortMap(device.id(), ports);
-        }
-        return events;
-    }
-
-    // Gets the map of ports for the specified device; if one does not already
-    // exist, it creates and registers a new one.
-    // WARN: returned value is a copy, changes made to the Map
-    //       needs to be written back using updatePortMap
-    //@GuardedBy("this")
-    private Map<PortNumber, VersionedValue<Port>> getPortMap(DeviceId deviceId) {
-        Map<PortNumber, VersionedValue<Port>> ports = devicePorts.get(deviceId);
-        if (ports == null) {
-            ports = new HashMap<>();
-            // this probably is waste of time in most cases.
-            updatePortMap(deviceId, ports);
-        }
-        return ports;
-    }
-
-    //@GuardedBy("this")
-    private void updatePortMap(DeviceId deviceId, Map<PortNumber, VersionedValue<Port>> ports) {
-        devicePorts.put(deviceId, ports);
-    }
-
-    @Override
-    public DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
-                                        PortDescription portDescription) {
-        VersionedValue<Device> device = devices.get(deviceId);
-        checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
-        Map<PortNumber, VersionedValue<Port>> ports = getPortMap(deviceId);
-        VersionedValue<Port> port = ports.get(portDescription.portNumber());
-        Timestamp timestamp = clockService.getTimestamp(deviceId);
-        return updatePort(device.entity(), port.entity(), portDescription, ports, timestamp);
-    }
-
-    @Override
-    public List<Port> getPorts(DeviceId deviceId) {
-        Map<PortNumber, VersionedValue<Port>> versionedPorts = devicePorts.get(deviceId);
-        if (versionedPorts == null) {
-            return Collections.emptyList();
-        }
-        List<Port> ports = new ArrayList<>();
-        for (VersionedValue<Port> port : versionedPorts.values()) {
-            ports.add(port.entity());
-        }
-        return ports;
-    }
-
-    @Override
-    public Port getPort(DeviceId deviceId, PortNumber portNumber) {
-        Map<PortNumber, VersionedValue<Port>> ports = devicePorts.get(deviceId);
-        return ports == null ? null : ports.get(portNumber).entity();
-    }
-
-    @Override
-    public boolean isAvailable(DeviceId deviceId) {
-        return devices.get(deviceId).isUp();
-    }
-
-    @Override
-    public DeviceEvent removeDevice(DeviceId deviceId) {
-        VersionedValue<Device> previousDevice = devices.remove(deviceId);
-        return previousDevice == null ? null :
-            new DeviceEvent(DEVICE_REMOVED, previousDevice.entity(), null);
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/impl/OnosTimestamp.java b/core/store/dist/src/main/java/org/onlab/onos/store/impl/OnosTimestamp.java
index 2005582..3039f32 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/impl/OnosTimestamp.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/impl/OnosTimestamp.java
@@ -84,4 +84,11 @@
     public int sequenceNumber() {
         return sequenceNumber;
     }
+
+    // Default constructor for serialization
+    @Deprecated
+    protected OnosTimestamp() {
+        this.termNumber = -1;
+        this.sequenceNumber = -1;
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/package-info.java
index 127dc84..c675f84 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/package-info.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/package-info.java
@@ -1,4 +1,4 @@
 /**
  * Implementation of link store using distributed p2p synchronization protocol.
  */
-package org.onlab.onos.store.link.impl;
\ No newline at end of file
+package org.onlab.onos.store.link.impl;
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/common/impl/TimestampedTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/common/impl/TimestampedTest.java
new file mode 100644
index 0000000..f7fc291
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/common/impl/TimestampedTest.java
@@ -0,0 +1,112 @@
+package org.onlab.onos.store.common.impl;
+
+import static org.junit.Assert.*;
+
+import java.nio.ByteBuffer;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onlab.onos.store.Timestamp;
+import org.onlab.onos.store.impl.OnosTimestamp;
+import org.onlab.util.KryoPool;
+
+import com.google.common.testing.EqualsTester;
+
+public class TimestampedTest {
+
+    private static final Timestamp TS_1_1 = new OnosTimestamp(1, 1);
+    private static final Timestamp TS_1_2 = new OnosTimestamp(1, 2);
+    private static final Timestamp TS_2_1 = new OnosTimestamp(2, 1);
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+    }
+
+    @Before
+    public void setUp() throws Exception {
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    @Test
+    public final void testHashCode() {
+        Timestamped<String> a = new Timestamped<>("a", TS_1_1);
+        Timestamped<String> b = new Timestamped<>("b", TS_1_1);
+        assertTrue("value does not impact hashCode",
+                a.hashCode() == b.hashCode());
+    }
+
+    @Test
+    public final void testEquals() {
+        Timestamped<String> a = new Timestamped<>("a", TS_1_1);
+        Timestamped<String> b = new Timestamped<>("b", TS_1_1);
+        assertTrue("value does not impact equality",
+                a.equals(b));
+
+        new EqualsTester()
+        .addEqualityGroup(new Timestamped<>("a", TS_1_1),
+                          new Timestamped<>("b", TS_1_1),
+                          new Timestamped<>("c", TS_1_1))
+        .addEqualityGroup(new Timestamped<>("a", TS_1_2),
+                          new Timestamped<>("b", TS_1_2),
+                          new Timestamped<>("c", TS_1_2))
+        .addEqualityGroup(new Timestamped<>("a", TS_2_1),
+                          new Timestamped<>("b", TS_2_1),
+                          new Timestamped<>("c", TS_2_1))
+        .testEquals();
+
+    }
+
+    @Test
+    public final void testValue() {
+       final Integer n = Integer.valueOf(42);
+       Timestamped<Integer> tsv = new Timestamped<>(n, TS_1_1);
+       assertSame(n, tsv.value());
+
+    }
+
+    @Test(expected = NullPointerException.class)
+    public final void testValueNonNull() {
+        new Timestamped<>(null, TS_1_1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public final void testTimestampNonNull() {
+        new Timestamped<>("Foo", null);
+    }
+
+    @Test
+    public final void testIsNewer() {
+        Timestamped<String> a = new Timestamped<>("a", TS_1_2);
+        Timestamped<String> b = new Timestamped<>("b", TS_1_1);
+        assertTrue(a.isNewer(b));
+        assertFalse(b.isNewer(a));
+    }
+
+    @Test
+    public final void testKryoSerializable() {
+        final ByteBuffer buffer = ByteBuffer.allocate(1 * 1024 * 1024);
+        final KryoPool kryos = KryoPool.newBuilder()
+                .register(Timestamped.class,
+                        OnosTimestamp.class)
+                .build();
+
+        Timestamped<String> original = new Timestamped<>("foobar", TS_1_1);
+        kryos.serialize(original, buffer);
+        buffer.flip();
+        Timestamped<String> copy = kryos.deserialize(buffer);
+
+        new EqualsTester()
+            .addEqualityGroup(original, copy)
+            .testEquals();
+    }
+}
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
new file mode 100644
index 0000000..09c3098
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
@@ -0,0 +1,525 @@
+package org.onlab.onos.store.device.impl;
+
+import static org.junit.Assert.*;
+import static org.onlab.onos.net.Device.Type.SWITCH;
+import static org.onlab.onos.net.DeviceId.deviceId;
+import static org.onlab.onos.net.device.DeviceEvent.Type.*;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.onlab.onos.cluster.MastershipTerm;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.Annotations;
+import org.onlab.onos.net.DefaultAnnotations;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.Port;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.SparseAnnotations;
+import org.onlab.onos.net.device.DefaultDeviceDescription;
+import org.onlab.onos.net.device.DefaultPortDescription;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.net.device.DeviceEvent;
+import org.onlab.onos.net.device.DeviceStore;
+import org.onlab.onos.net.device.DeviceStoreDelegate;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.ClockService;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+
+// TODO add tests for remote replication
+/**
+ * Test of the gossip based distributed DeviceStore implementation.
+ */
+public class GossipDeviceStoreTest {
+
+    private static final ProviderId PID = new ProviderId("of", "foo");
+    private static final ProviderId PIDA = new ProviderId("of", "bar", true);
+    private static final DeviceId DID1 = deviceId("of:foo");
+    private static final DeviceId DID2 = deviceId("of:bar");
+    private static final String MFR = "whitebox";
+    private static final String HW = "1.1.x";
+    private static final String SW1 = "3.8.1";
+    private static final String SW2 = "3.9.5";
+    private static final String SN = "43311-12345";
+
+    private static final PortNumber P1 = PortNumber.portNumber(1);
+    private static final PortNumber P2 = PortNumber.portNumber(2);
+    private static final PortNumber P3 = PortNumber.portNumber(3);
+
+    private static final SparseAnnotations A1 = DefaultAnnotations.builder()
+            .set("A1", "a1")
+            .set("B1", "b1")
+            .build();
+    private static final SparseAnnotations A1_2 = DefaultAnnotations.builder()
+            .remove("A1")
+            .set("B3", "b3")
+            .build();
+    private static final SparseAnnotations A2 = DefaultAnnotations.builder()
+            .set("A2", "a2")
+            .set("B2", "b2")
+            .build();
+    private static final SparseAnnotations A2_2 = DefaultAnnotations.builder()
+            .remove("A2")
+            .set("B4", "b4")
+            .build();
+
+    private static final NodeId MYSELF = new NodeId("myself");
+
+    private GossipDeviceStore gossipDeviceStore;
+    private DeviceStore deviceStore;
+
+    private DeviceClockManager deviceClockManager;
+    private ClockService clockService;
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+    }
+
+
+    @Before
+    public void setUp() throws Exception {
+        deviceClockManager = new DeviceClockManager();
+        deviceClockManager.activate();
+        clockService = deviceClockManager;
+
+        deviceClockManager.setMastershipTerm(DID1, MastershipTerm.of(MYSELF, 1));
+        deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(MYSELF, 2));
+
+        gossipDeviceStore = new TestGossipDeviceStore(clockService);
+        gossipDeviceStore.activate();
+        deviceStore = gossipDeviceStore;
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        gossipDeviceStore.deactivate();
+        deviceClockManager.deactivate();
+    }
+
+    private void putDevice(DeviceId deviceId, String swVersion) {
+        DeviceDescription description =
+                new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
+                        HW, swVersion, SN);
+        deviceStore.createOrUpdateDevice(PID, deviceId, description);
+    }
+
+    private void putDeviceAncillary(DeviceId deviceId, String swVersion) {
+        DeviceDescription description =
+                new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
+                        HW, swVersion, SN);
+        deviceStore.createOrUpdateDevice(PIDA, deviceId, description);
+    }
+
+    private static void assertDevice(DeviceId id, String swVersion, Device device) {
+        assertNotNull(device);
+        assertEquals(id, device.id());
+        assertEquals(MFR, device.manufacturer());
+        assertEquals(HW, device.hwVersion());
+        assertEquals(swVersion, device.swVersion());
+        assertEquals(SN, device.serialNumber());
+    }
+
+    /**
+     * Verifies that Annotations created by merging {@code annotations} is
+     * equal to actual Annotations.
+     *
+     * @param actual Annotations to check
+     * @param annotations
+     */
+    private static void assertAnnotationsEquals(Annotations actual, SparseAnnotations... annotations) {
+        DefaultAnnotations expected = DefaultAnnotations.builder().build();
+        for (SparseAnnotations a : annotations) {
+            expected = DefaultAnnotations.merge(expected, a);
+        }
+        assertEquals(expected.keys(), actual.keys());
+        for (String key : expected.keys()) {
+            assertEquals(expected.value(key), actual.value(key));
+        }
+    }
+
+    @Test
+    public final void testGetDeviceCount() {
+        assertEquals("initialy empty", 0, deviceStore.getDeviceCount());
+
+        putDevice(DID1, SW1);
+        putDevice(DID2, SW2);
+        putDevice(DID1, SW1);
+
+        assertEquals("expect 2 uniq devices", 2, deviceStore.getDeviceCount());
+    }
+
+    @Test
+    public final void testGetDevices() {
+        assertEquals("initialy empty", 0, Iterables.size(deviceStore.getDevices()));
+
+        putDevice(DID1, SW1);
+        putDevice(DID2, SW2);
+        putDevice(DID1, SW1);
+
+        assertEquals("expect 2 uniq devices",
+                2, Iterables.size(deviceStore.getDevices()));
+
+        Map<DeviceId, Device> devices = new HashMap<>();
+        for (Device device : deviceStore.getDevices()) {
+            devices.put(device.id(), device);
+        }
+
+        assertDevice(DID1, SW1, devices.get(DID1));
+        assertDevice(DID2, SW2, devices.get(DID2));
+
+        // add case for new node?
+    }
+
+    @Test
+    public final void testGetDevice() {
+
+        putDevice(DID1, SW1);
+
+        assertDevice(DID1, SW1, deviceStore.getDevice(DID1));
+        assertNull("DID2 shouldn't be there", deviceStore.getDevice(DID2));
+    }
+
+    @Test
+    public final void testCreateOrUpdateDevice() {
+        DeviceDescription description =
+                new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
+                        HW, SW1, SN);
+        DeviceEvent event = deviceStore.createOrUpdateDevice(PID, DID1, description);
+        assertEquals(DEVICE_ADDED, event.type());
+        assertDevice(DID1, SW1, event.subject());
+
+        DeviceDescription description2 =
+                new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
+                        HW, SW2, SN);
+        DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
+        assertEquals(DEVICE_UPDATED, event2.type());
+        assertDevice(DID1, SW2, event2.subject());
+
+        assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
+    }
+
+    @Test
+    public final void testCreateOrUpdateDeviceAncillary() {
+        DeviceDescription description =
+                new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
+                        HW, SW1, SN, A2);
+        DeviceEvent event = deviceStore.createOrUpdateDevice(PIDA, DID1, description);
+        assertEquals(DEVICE_ADDED, event.type());
+        assertDevice(DID1, SW1, event.subject());
+        assertEquals(PIDA, event.subject().providerId());
+        assertAnnotationsEquals(event.subject().annotations(), A2);
+        assertFalse("Ancillary will not bring device up", deviceStore.isAvailable(DID1));
+
+        DeviceDescription description2 =
+                new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
+                        HW, SW2, SN, A1);
+        DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
+        assertEquals(DEVICE_UPDATED, event2.type());
+        assertDevice(DID1, SW2, event2.subject());
+        assertEquals(PID, event2.subject().providerId());
+        assertAnnotationsEquals(event2.subject().annotations(), A1, A2);
+        assertTrue(deviceStore.isAvailable(DID1));
+
+        assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
+
+        // For now, Ancillary is ignored once primary appears
+        assertNull("No change expected", deviceStore.createOrUpdateDevice(PIDA, DID1, description));
+
+        // But, Ancillary annotations will be in effect
+        DeviceDescription description3 =
+                new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
+                        HW, SW1, SN, A2_2);
+        DeviceEvent event3 = deviceStore.createOrUpdateDevice(PIDA, DID1, description3);
+        assertEquals(DEVICE_UPDATED, event3.type());
+        // basic information will be the one from Primary
+        assertDevice(DID1, SW2, event3.subject());
+        assertEquals(PID, event3.subject().providerId());
+        // but annotation from Ancillary will be merged
+        assertAnnotationsEquals(event3.subject().annotations(), A1, A2, A2_2);
+        assertTrue(deviceStore.isAvailable(DID1));
+    }
+
+
+    @Test
+    public final void testMarkOffline() {
+
+        putDevice(DID1, SW1);
+        assertTrue(deviceStore.isAvailable(DID1));
+
+        DeviceEvent event = deviceStore.markOffline(DID1);
+        assertEquals(DEVICE_AVAILABILITY_CHANGED, event.type());
+        assertDevice(DID1, SW1, event.subject());
+        assertFalse(deviceStore.isAvailable(DID1));
+
+        DeviceEvent event2 = deviceStore.markOffline(DID1);
+        assertNull("No change, no event", event2);
+}
+
+    @Test
+    public final void testUpdatePorts() {
+        putDevice(DID1, SW1);
+        List<PortDescription> pds = Arrays.<PortDescription>asList(
+                new DefaultPortDescription(P1, true),
+                new DefaultPortDescription(P2, true)
+                );
+
+        List<DeviceEvent> events = deviceStore.updatePorts(PID, DID1, pds);
+
+        Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
+        for (DeviceEvent event : events) {
+            assertEquals(PORT_ADDED, event.type());
+            assertDevice(DID1, SW1, event.subject());
+            assertTrue("PortNumber is one of expected",
+                    expectedPorts.remove(event.port().number()));
+            assertTrue("Port is enabled", event.port().isEnabled());
+        }
+        assertTrue("Event for all expectedport appeared", expectedPorts.isEmpty());
+
+
+        List<PortDescription> pds2 = Arrays.<PortDescription>asList(
+                new DefaultPortDescription(P1, false),
+                new DefaultPortDescription(P2, true),
+                new DefaultPortDescription(P3, true)
+                );
+
+        events = deviceStore.updatePorts(PID, DID1, pds2);
+        assertFalse("event should be triggered", events.isEmpty());
+        for (DeviceEvent event : events) {
+            PortNumber num = event.port().number();
+            if (P1.equals(num)) {
+                assertEquals(PORT_UPDATED, event.type());
+                assertDevice(DID1, SW1, event.subject());
+                assertFalse("Port is disabled", event.port().isEnabled());
+            } else if (P2.equals(num)) {
+                fail("P2 event not expected.");
+            } else if (P3.equals(num)) {
+                assertEquals(PORT_ADDED, event.type());
+                assertDevice(DID1, SW1, event.subject());
+                assertTrue("Port is enabled", event.port().isEnabled());
+            } else {
+                fail("Unknown port number encountered: " + num);
+            }
+        }
+
+        List<PortDescription> pds3 = Arrays.<PortDescription>asList(
+                new DefaultPortDescription(P1, false),
+                new DefaultPortDescription(P2, true)
+                );
+        events = deviceStore.updatePorts(PID, DID1, pds3);
+        assertFalse("event should be triggered", events.isEmpty());
+        for (DeviceEvent event : events) {
+            PortNumber num = event.port().number();
+            if (P1.equals(num)) {
+                fail("P1 event not expected.");
+            } else if (P2.equals(num)) {
+                fail("P2 event not expected.");
+            } else if (P3.equals(num)) {
+                assertEquals(PORT_REMOVED, event.type());
+                assertDevice(DID1, SW1, event.subject());
+                assertTrue("Port was enabled", event.port().isEnabled());
+            } else {
+                fail("Unknown port number encountered: " + num);
+            }
+        }
+
+    }
+
+    @Test
+    public final void testUpdatePortStatus() {
+        putDevice(DID1, SW1);
+        List<PortDescription> pds = Arrays.<PortDescription>asList(
+                new DefaultPortDescription(P1, true)
+                );
+        deviceStore.updatePorts(PID, DID1, pds);
+
+        DeviceEvent event = deviceStore.updatePortStatus(PID, DID1,
+                new DefaultPortDescription(P1, false));
+        assertEquals(PORT_UPDATED, event.type());
+        assertDevice(DID1, SW1, event.subject());
+        assertEquals(P1, event.port().number());
+        assertFalse("Port is disabled", event.port().isEnabled());
+
+    }
+    @Test
+    public final void testUpdatePortStatusAncillary() {
+        putDeviceAncillary(DID1, SW1);
+        putDevice(DID1, SW1);
+        List<PortDescription> pds = Arrays.<PortDescription>asList(
+                new DefaultPortDescription(P1, true, A1)
+                );
+        deviceStore.updatePorts(PID, DID1, pds);
+
+        DeviceEvent event = deviceStore.updatePortStatus(PID, DID1,
+                new DefaultPortDescription(P1, false, A1_2));
+        assertEquals(PORT_UPDATED, event.type());
+        assertDevice(DID1, SW1, event.subject());
+        assertEquals(P1, event.port().number());
+        assertAnnotationsEquals(event.port().annotations(), A1, A1_2);
+        assertFalse("Port is disabled", event.port().isEnabled());
+
+        DeviceEvent event2 = deviceStore.updatePortStatus(PIDA, DID1,
+                new DefaultPortDescription(P1, true));
+        assertNull("Ancillary is ignored if primary exists", event2);
+
+        // but, Ancillary annotation update will be notified
+        DeviceEvent event3 = deviceStore.updatePortStatus(PIDA, DID1,
+                new DefaultPortDescription(P1, true, A2));
+        assertEquals(PORT_UPDATED, event3.type());
+        assertDevice(DID1, SW1, event3.subject());
+        assertEquals(P1, event3.port().number());
+        assertAnnotationsEquals(event3.port().annotations(), A1, A1_2, A2);
+        assertFalse("Port is disabled", event3.port().isEnabled());
+
+        // port only reported from Ancillary will be notified as down
+        DeviceEvent event4 = deviceStore.updatePortStatus(PIDA, DID1,
+                new DefaultPortDescription(P2, true));
+        assertEquals(PORT_ADDED, event4.type());
+        assertDevice(DID1, SW1, event4.subject());
+        assertEquals(P2, event4.port().number());
+        assertAnnotationsEquals(event4.port().annotations());
+        assertFalse("Port is disabled if not given from primary provider",
+                        event4.port().isEnabled());
+    }
+
+    @Test
+    public final void testGetPorts() {
+        putDevice(DID1, SW1);
+        putDevice(DID2, SW1);
+        List<PortDescription> pds = Arrays.<PortDescription>asList(
+                new DefaultPortDescription(P1, true),
+                new DefaultPortDescription(P2, true)
+                );
+        deviceStore.updatePorts(PID, DID1, pds);
+
+        Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
+        List<Port> ports = deviceStore.getPorts(DID1);
+        for (Port port : ports) {
+            assertTrue("Port is enabled", port.isEnabled());
+            assertTrue("PortNumber is one of expected",
+                    expectedPorts.remove(port.number()));
+        }
+        assertTrue("Event for all expectedport appeared", expectedPorts.isEmpty());
+
+
+        assertTrue("DID2 has no ports", deviceStore.getPorts(DID2).isEmpty());
+    }
+
+    @Test
+    public final void testGetPort() {
+        putDevice(DID1, SW1);
+        putDevice(DID2, SW1);
+        List<PortDescription> pds = Arrays.<PortDescription>asList(
+                new DefaultPortDescription(P1, true),
+                new DefaultPortDescription(P2, false)
+                );
+        deviceStore.updatePorts(PID, DID1, pds);
+
+        Port port1 = deviceStore.getPort(DID1, P1);
+        assertEquals(P1, port1.number());
+        assertTrue("Port is enabled", port1.isEnabled());
+
+        Port port2 = deviceStore.getPort(DID1, P2);
+        assertEquals(P2, port2.number());
+        assertFalse("Port is disabled", port2.isEnabled());
+
+        Port port3 = deviceStore.getPort(DID1, P3);
+        assertNull("P3 not expected", port3);
+    }
+
+    @Test
+    public final void testRemoveDevice() {
+        putDevice(DID1, SW1);
+        putDevice(DID2, SW1);
+
+        assertEquals(2, deviceStore.getDeviceCount());
+
+        DeviceEvent event = deviceStore.removeDevice(DID1);
+        assertEquals(DEVICE_REMOVED, event.type());
+        assertDevice(DID1, SW1, event.subject());
+
+        assertEquals(1, deviceStore.getDeviceCount());
+    }
+
+    // If Delegates should be called only on remote events,
+    // then Simple* should never call them, thus not test required.
+    // TODO add test for Port events when we have them
+    @Ignore("Ignore until Delegate spec. is clear.")
+    @Test
+    public final void testEvents() throws InterruptedException {
+        final CountDownLatch addLatch = new CountDownLatch(1);
+        DeviceStoreDelegate checkAdd = new DeviceStoreDelegate() {
+            @Override
+            public void notify(DeviceEvent event) {
+                assertEquals(DEVICE_ADDED, event.type());
+                assertDevice(DID1, SW1, event.subject());
+                addLatch.countDown();
+            }
+        };
+        final CountDownLatch updateLatch = new CountDownLatch(1);
+        DeviceStoreDelegate checkUpdate = new DeviceStoreDelegate() {
+            @Override
+            public void notify(DeviceEvent event) {
+                assertEquals(DEVICE_UPDATED, event.type());
+                assertDevice(DID1, SW2, event.subject());
+                updateLatch.countDown();
+            }
+        };
+        final CountDownLatch removeLatch = new CountDownLatch(1);
+        DeviceStoreDelegate checkRemove = new DeviceStoreDelegate() {
+            @Override
+            public void notify(DeviceEvent event) {
+                assertEquals(DEVICE_REMOVED, event.type());
+                assertDevice(DID1, SW2, event.subject());
+                removeLatch.countDown();
+            }
+        };
+
+        DeviceDescription description =
+                new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
+                        HW, SW1, SN);
+        deviceStore.setDelegate(checkAdd);
+        deviceStore.createOrUpdateDevice(PID, DID1, description);
+        assertTrue("Add event fired", addLatch.await(1, TimeUnit.SECONDS));
+
+
+        DeviceDescription description2 =
+                new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
+                        HW, SW2, SN);
+        deviceStore.unsetDelegate(checkAdd);
+        deviceStore.setDelegate(checkUpdate);
+        deviceStore.createOrUpdateDevice(PID, DID1, description2);
+        assertTrue("Update event fired", updateLatch.await(1, TimeUnit.SECONDS));
+
+        deviceStore.unsetDelegate(checkUpdate);
+        deviceStore.setDelegate(checkRemove);
+        deviceStore.removeDevice(DID1);
+        assertTrue("Remove event fired", removeLatch.await(1, TimeUnit.SECONDS));
+    }
+
+    private static final class TestGossipDeviceStore extends GossipDeviceStore {
+
+        public TestGossipDeviceStore(ClockService clockService) {
+            this.clockService = clockService;
+        }
+    }
+}