Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

Conflicts:
	core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
	core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java

Change-Id: Ia1274657b27e01366a4a87196a13068d7104ee80
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterManagementMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterManagementMessageSubjects.java
new file mode 100644
index 0000000..74c22f1
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterManagementMessageSubjects.java
@@ -0,0 +1,10 @@
+package org.onlab.onos.store.cluster.impl;
+
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+
+public final class ClusterManagementMessageSubjects {
+    // avoid instantiation
+    private ClusterManagementMessageSubjects() {}
+
+    public static final MessageSubject CLUSTER_MEMBERSHIP_EVENT = new MessageSubject("CLUSTER_MEMBERSHIP_EVENT");
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMembershipEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEvent.java
similarity index 91%
rename from core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMembershipEvent.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEvent.java
index 961ed4f..30b847f 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMembershipEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEvent.java
@@ -1,4 +1,4 @@
-package org.onlab.onos.store.cluster.messaging.impl;
+package org.onlab.onos.store.cluster.impl;
 
 import org.onlab.onos.cluster.ControllerNode;
 
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMembershipEventType.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEventType.java
similarity index 69%
rename from core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMembershipEventType.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEventType.java
index 1f5fd3f..cdfd145 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMembershipEventType.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEventType.java
@@ -1,4 +1,4 @@
-package org.onlab.onos.store.cluster.messaging.impl;
+package org.onlab.onos.store.cluster.impl;
 
 public enum ClusterMembershipEventType {
     NEW_MEMBER,
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
index 9408cc9..5e64a39 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
@@ -7,11 +7,9 @@
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
 import org.onlab.onos.cluster.ClusterEvent;
 import org.onlab.onos.cluster.ClusterStore;
 import org.onlab.onos.cluster.ClusterStoreDelegate;
@@ -37,8 +35,8 @@
 /**
  * Distributed implementation of the cluster nodes store.
  */
-@Component(immediate = true)
-@Service
+//@Component(immediate = true)
+//@Service
 public class DistributedClusterStore
         extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
         implements ClusterStore {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java
deleted file mode 100644
index 98e80f7..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java
+++ /dev/null
@@ -1,117 +0,0 @@
-package org.onlab.onos.store.cluster.impl;
-
-import de.javakaffee.kryoserializers.URISerializer;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.onos.cluster.ControllerNode;
-import org.onlab.onos.cluster.DefaultControllerNode;
-import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.net.ConnectPoint;
-import org.onlab.onos.net.DefaultDevice;
-import org.onlab.onos.net.DefaultLink;
-import org.onlab.onos.net.DefaultPort;
-import org.onlab.onos.net.Device;
-import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.Element;
-import org.onlab.onos.net.Link;
-import org.onlab.onos.net.LinkKey;
-import org.onlab.onos.net.MastershipRole;
-import org.onlab.onos.net.Port;
-import org.onlab.onos.net.PortNumber;
-import org.onlab.onos.net.provider.ProviderId;
-import org.onlab.onos.store.cluster.messaging.MessageSubject;
-import org.onlab.onos.store.cluster.messaging.SerializationService;
-import org.onlab.onos.store.serializers.ConnectPointSerializer;
-import org.onlab.onos.store.serializers.DefaultLinkSerializer;
-import org.onlab.onos.store.serializers.DefaultPortSerializer;
-import org.onlab.onos.store.serializers.DeviceIdSerializer;
-import org.onlab.onos.store.serializers.IpPrefixSerializer;
-import org.onlab.onos.store.serializers.LinkKeySerializer;
-import org.onlab.onos.store.serializers.NodeIdSerializer;
-import org.onlab.onos.store.serializers.PortNumberSerializer;
-import org.onlab.onos.store.serializers.ProviderIdSerializer;
-import org.onlab.packet.IpPrefix;
-import org.onlab.util.KryoPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-/**
- * Factory for parsing messages sent between cluster members.
- */
-@Component(immediate = true)
-@Service
-public class MessageSerializer implements SerializationService {
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    private static final int METADATA_LENGTH = 12; // 8 + 4
-    private static final int LENGTH_OFFSET = 8;
-
-    private static final long MARKER = 0xfeedcafebeaddeadL;
-
-    private KryoPool serializerPool;
-
-    @Activate
-    public void activate() {
-        setupKryoPool();
-        log.info("Started");
-    }
-
-    @Deactivate
-    public void deactivate() {
-        log.info("Stopped");
-    }
-
-    /**
-     * Sets up the common serialzers pool.
-     */
-    protected void setupKryoPool() {
-        // FIXME Slice out types used in common to separate pool/namespace.
-        serializerPool = KryoPool.newBuilder()
-                .register(ArrayList.class,
-                          HashMap.class,
-
-                          ControllerNode.State.class,
-                          Device.Type.class,
-
-                          DefaultControllerNode.class,
-                          DefaultDevice.class,
-                          MastershipRole.class,
-                          Port.class,
-                          Element.class,
-
-                          Link.Type.class,
-
-                          MessageSubject.class
-                )
-                .register(IpPrefix.class, new IpPrefixSerializer())
-                .register(URI.class, new URISerializer())
-                .register(NodeId.class, new NodeIdSerializer())
-                .register(ProviderId.class, new ProviderIdSerializer())
-                .register(DeviceId.class, new DeviceIdSerializer())
-                .register(PortNumber.class, new PortNumberSerializer())
-                .register(DefaultPort.class, new DefaultPortSerializer())
-                .register(LinkKey.class, new LinkKeySerializer())
-                .register(ConnectPoint.class, new ConnectPointSerializer())
-                .register(DefaultLink.class, new DefaultLinkSerializer())
-                .build()
-                .populate(1);
-    }
-
-
-    @Override
-    public Object decode(byte[] data) {
-        return serializerPool.deserialize(data);
-    }
-
-    @Override
-    public byte[] encode(Object payload) {
-        return serializerPool.serialize(payload);
-    }
-}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationAdminService.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationAdminService.java
index 5966f12..0bc31fa 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationAdminService.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationAdminService.java
@@ -3,6 +3,8 @@
 import org.onlab.onos.cluster.ControllerNode;
 import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
 
+// TODO: This service interface can be removed, once we properly start
+// using ClusterService
 /**
  * Service for administering communications manager.
  */
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
index ee558dd..b74f887 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
@@ -2,6 +2,7 @@
 
 import org.onlab.onos.cluster.NodeId;
 
+// TODO: Should payload type be ByteBuffer?
 /**
  * Base message for cluster-wide communications.
  */
@@ -9,14 +10,14 @@
 
     private final NodeId sender;
     private final MessageSubject subject;
-    private final Object payload;
+    private final byte[] payload;
 
     /**
      * Creates a cluster message.
      *
      * @param subject message subject
      */
-    public ClusterMessage(NodeId sender, MessageSubject subject, Object payload) {
+    public ClusterMessage(NodeId sender, MessageSubject subject, byte[] payload) {
         this.sender = sender;
         this.subject = subject;
         this.payload = payload;
@@ -45,7 +46,7 @@
      *
      * @return message payload.
      */
-    public Object payload() {
+    public byte[] payload() {
         return payload;
     }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageHandler.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageHandler.java
index 7ec27ec..4dd7bc2 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageHandler.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageHandler.java
@@ -10,4 +10,4 @@
      * @param message cluster message.
      */
     public void handle(ClusterMessage message);
-}
\ No newline at end of file
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
index ee8d9c1..43df15f 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
@@ -1,5 +1,9 @@
 package org.onlab.onos.store.cluster.messaging;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Objects;
+
 /**
  * Representation of a message subject.
  * Cluster messages have associated subjects that dictate how they get handled
@@ -10,7 +14,7 @@
     private final String value;
 
     public MessageSubject(String value) {
-        this.value = value;
+        this.value = checkNotNull(value);
     }
 
     public String value() {
@@ -21,4 +25,29 @@
     public String toString() {
         return value;
     }
+
+    @Override
+    public int hashCode() {
+        return value.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        MessageSubject that = (MessageSubject) obj;
+        return Objects.equals(this.value, that.value);
+    }
+
+    // for serializer
+    protected MessageSubject() {
+        this.value = "";
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java
index d85f488..4d76ce3 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java
@@ -1,7 +1,7 @@
 package org.onlab.onos.store.cluster.messaging;
 
 /**
- * Service for encoding &amp; decoding intra-cluster messages.
+ * Service for encoding &amp; decoding intra-cluster message payload.
  */
 public interface SerializationService {
 
@@ -11,7 +11,7 @@
      * @param buffer byte buffer with message(s)
      * @return parsed message
      */
-    Object decode(byte[] data);
+    <T> T decode(byte[] data);
 
     /**
      * Encodes the specified message into the given byte buffer.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index d4fd9c0..1b11873 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -3,30 +3,36 @@
 import static com.google.common.base.Preconditions.checkArgument;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
-
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
 import org.onlab.onos.cluster.ControllerNode;
 import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
+import org.onlab.onos.store.cluster.impl.ClusterMembershipEventType;
 import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
 import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.serializers.ClusterMessageSerializer;
+import org.onlab.onos.store.serializers.KryoPoolUtil;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.serializers.MessageSubjectSerializer;
+import org.onlab.util.KryoPool;
 import org.onlab.netty.Endpoint;
 import org.onlab.netty.Message;
 import org.onlab.netty.MessageHandler;
 import org.onlab.netty.MessagingService;
+import org.onlab.netty.NettyMessagingService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,28 +44,57 @@
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private ControllerNode localNode;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ClusterService clusterService;
+
     private ClusterNodesDelegate nodesDelegate;
-    private Map<NodeId, ControllerNode> members = new HashMap<>();
     private final Timer timer = new Timer("onos-controller-heatbeats");
     public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    // TODO: This probably should not be an OSGi service.
     private MessagingService messagingService;
 
+    private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        @Override
+        protected void setupKryoPool() {
+            serializerPool = KryoPool.newBuilder()
+                    .register(KryoPoolUtil.API)
+                    .register(ClusterMessage.class, new ClusterMessageSerializer())
+                    .register(ClusterMembershipEvent.class)
+                    .register(byte[].class)
+                    .register(MessageSubject.class, new MessageSubjectSerializer())
+                    .build()
+                    .populate(1);
+        }
+
+    };
+
     @Activate
     public void activate() {
+        localNode = clusterService.getLocalNode();
+        NettyMessagingService netty = new NettyMessagingService(localNode.tcpPort());
+        // FIXME: workaround until it becomes a service.
+        try {
+            netty.activate();
+        } catch (Exception e) {
+            // TODO: activation failure is only logged here; decide whether it should abort startup.
+            log.error("NettyMessagingService#activate", e);
+        }
+        messagingService = netty;
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
+        // TODO: clean up messagingService if needed.
         log.info("Stopped");
     }
 
     @Override
     public boolean broadcast(ClusterMessage message) {
         boolean ok = true;
-        for (ControllerNode node : members.values()) {
+        for (ControllerNode node : clusterService.getNodes()) {
             if (!node.equals(localNode)) {
                 ok = unicast(message, node.id()) && ok;
             }
@@ -80,11 +115,12 @@
 
     @Override
     public boolean unicast(ClusterMessage message, NodeId toNodeId) {
-        ControllerNode node = members.get(toNodeId);
+        ControllerNode node = clusterService.getNode(toNodeId);
         checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
         Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
         try {
-            messagingService.sendAsync(nodeEp, message.subject().value(), message);
+            messagingService.sendAsync(nodeEp,
+                    message.subject().value(), SERIALIZER.encode(message));
             return true;
         } catch (IOException e) {
             log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
@@ -110,7 +146,7 @@
 
     @Override
     public void addNode(ControllerNode node) {
-        members.put(node.id(), node);
+        //members.put(node.id(), node);
     }
 
     @Override
@@ -118,8 +154,8 @@
         broadcast(new ClusterMessage(
                 localNode.id(),
                 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
-                new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node)));
-        members.remove(node.id());
+                SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
+        //members.remove(node.id());
     }
 
     // Sends a heart beat to all peers.
@@ -130,7 +166,7 @@
             broadcast(new ClusterMessage(
                 localNode.id(),
                 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
-                new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode)));
+                SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode))));
         }
     }
 
@@ -139,7 +175,7 @@
         @Override
         public void handle(ClusterMessage message) {
 
-            ClusterMembershipEvent event = (ClusterMembershipEvent) message.payload();
+            ClusterMembershipEvent event = SERIALIZER.decode(message.payload());
             ControllerNode node = event.node();
             if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
                 log.info("Node {} sent a hearbeat", node.id());
@@ -154,7 +190,7 @@
         }
     }
 
-    private static class InternalClusterMessageHandler implements MessageHandler {
+    private final class InternalClusterMessageHandler implements MessageHandler {
 
         private final ClusterMessageHandler handler;
 
@@ -164,7 +200,13 @@
 
         @Override
         public void handle(Message message) {
-            handler.handle((ClusterMessage) message.payload());
+            try {
+                ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
+                handler.handle(clusterMessage);
+            } catch (Exception e) {
+                log.error("Exception caught during ClusterMessageHandler", e);
+                throw e;
+            }
         }
     }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMessageSubjects.java
deleted file mode 100644
index d1f75ae..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMessageSubjects.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package org.onlab.onos.store.cluster.messaging.impl;
-
-import org.onlab.onos.store.cluster.messaging.MessageSubject;
-
-public final class ClusterMessageSubjects {
-    private ClusterMessageSubjects() {}
-    public static final MessageSubject CLUSTER_MEMBERSHIP_EVENT = new MessageSubject("CLUSTER_MEMBERSHIP_EVENT");
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java
new file mode 100644
index 0000000..bf47f49
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java
@@ -0,0 +1,63 @@
+package org.onlab.onos.store.cluster.messaging.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.cluster.messaging.SerializationService;
+import org.onlab.onos.store.serializers.KryoPoolUtil;
+import org.onlab.util.KryoPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory for parsing messages sent between cluster members.
+ */
+@Component(immediate = true)
+@Service
+public class MessageSerializer implements SerializationService {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private static final int METADATA_LENGTH = 12; // 8 + 4
+    private static final int LENGTH_OFFSET = 8;
+
+    private static final long MARKER = 0xfeedcafebeaddeadL;
+
+    private KryoPool serializerPool;
+
+    @Activate
+    public void activate() {
+        setupKryoPool();
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        log.info("Stopped");
+    }
+
+    /**
+     * Sets up the common serializers pool.
+     */
+    protected void setupKryoPool() {
+        serializerPool = KryoPool.newBuilder()
+                .register(KryoPoolUtil.API)
+                // TODO: Should MessageSubject be in API bundle?
+                .register(MessageSubject.class)
+                .build()
+                .populate(1);
+    }
+
+
+    @Override
+    public <T> T decode(byte[] data) {
+        return serializerPool.deserialize(data);
+    }
+
+    @Override
+    public byte[] encode(Object payload) {
+        return serializerPool.serialize(payload);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/package-info.java
new file mode 100644
index 0000000..6c1e71b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Implementation of the cluster messaging mechanism.
+ */
+package org.onlab.onos.store.cluster.messaging.impl;
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyAdvertisement.java
similarity index 95%
rename from core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyAdvertisement.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyAdvertisement.java
index b70da73..132f27a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyAdvertisement.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyAdvertisement.java
@@ -1,4 +1,4 @@
-package org.onlab.onos.store.cluster.messaging;
+package org.onlab.onos.store.common.impl;
 
 import java.util.Map;
 
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyReply.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyReply.java
similarity index 97%
rename from core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyReply.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyReply.java
index 095752b..033a1de 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyReply.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyReply.java
@@ -1,4 +1,4 @@
-package org.onlab.onos.store.cluster.messaging;
+package org.onlab.onos.store.common.impl;
 
 import java.util.Map;
 import java.util.Set;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/impl/OnosTimestamp.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/MastershipBasedTimestamp.java
similarity index 73%
rename from core/store/dist/src/main/java/org/onlab/onos/store/impl/OnosTimestamp.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/common/impl/MastershipBasedTimestamp.java
index 2005582..0f4f894 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/impl/OnosTimestamp.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/MastershipBasedTimestamp.java
@@ -1,4 +1,4 @@
-package org.onlab.onos.store.impl;
+package org.onlab.onos.store.common.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
@@ -9,12 +9,11 @@
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ComparisonChain;
 
-// If it is store specific, implement serializable interfaces?
 /**
  * Default implementation of Timestamp.
  * TODO: Better documentation.
  */
-public final class OnosTimestamp implements Timestamp {
+public final class MastershipBasedTimestamp implements Timestamp {
 
     private final int termNumber;
     private final int sequenceNumber;
@@ -25,15 +24,16 @@
      * @param termNumber the mastership termNumber
      * @param sequenceNumber  the sequenceNumber number within the termNumber
      */
-    public OnosTimestamp(int termNumber, int sequenceNumber) {
+    public MastershipBasedTimestamp(int termNumber, int sequenceNumber) {
         this.termNumber = termNumber;
         this.sequenceNumber = sequenceNumber;
     }
 
     @Override
     public int compareTo(Timestamp o) {
-        checkArgument(o instanceof OnosTimestamp, "Must be OnosTimestamp", o);
-        OnosTimestamp that = (OnosTimestamp) o;
+        checkArgument(o instanceof MastershipBasedTimestamp,
+                "Must be MastershipBasedTimestamp", o);
+        MastershipBasedTimestamp that = (MastershipBasedTimestamp) o;
 
         return ComparisonChain.start()
                 .compare(this.termNumber, that.termNumber)
@@ -51,10 +51,10 @@
         if (this == obj) {
             return true;
         }
-        if (!(obj instanceof OnosTimestamp)) {
+        if (!(obj instanceof MastershipBasedTimestamp)) {
             return false;
         }
-        OnosTimestamp that = (OnosTimestamp) obj;
+        MastershipBasedTimestamp that = (MastershipBasedTimestamp) obj;
         return Objects.equals(this.termNumber, that.termNumber) &&
                 Objects.equals(this.sequenceNumber, that.sequenceNumber);
     }
@@ -84,4 +84,11 @@
     public int sequenceNumber() {
         return sequenceNumber;
     }
+
+    // Default constructor for serialization
+    @Deprecated
+    protected MastershipBasedTimestamp() {
+        this.termNumber = -1;
+        this.sequenceNumber = -1;
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/Timestamped.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/Timestamped.java
new file mode 100644
index 0000000..77b0a87
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/Timestamped.java
@@ -0,0 +1,89 @@
+package org.onlab.onos.store.common.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Objects;
+
+import org.onlab.onos.store.Timestamp;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Wrapper class that pairs a value with its timestamp.
+ * @param <T> type of the wrapped value
+ */
+public final class Timestamped<T> {
+
+    private final Timestamp timestamp;
+    private final T value;
+
+    /**
+     * Creates a time stamped value.
+     *
+     * @param value value to be timestamped
+     * @param timestamp the timestamp
+     */
+    public Timestamped(T value, Timestamp timestamp) {
+        this.value = checkNotNull(value);
+        this.timestamp = checkNotNull(timestamp);
+    }
+
+    /**
+     * Returns the value.
+     * @return value
+     */
+    public T value() {
+        return value;
+    }
+
+    /**
+     * Returns the time stamp.
+     * @return time stamp
+     */
+    public Timestamp timestamp() {
+        return timestamp;
+    }
+
+    /**
+     * Tests if this timestamped value is newer than the other.
+     *
+     * @param other timestamped value
+     * @return true if this instance is newer.
+     */
+    public boolean isNewer(Timestamped<T> other) {
+        return this.timestamp.compareTo(checkNotNull(other).timestamp()) > 0;
+    }
+
+    @Override
+    public int hashCode() {
+        return timestamp.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof Timestamped)) {
+            return false;
+        }
+        @SuppressWarnings("unchecked")
+        Timestamped<T> that = (Timestamped<T>) obj;
+        return Objects.equals(this.timestamp, that.timestamp);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                    .add("timestamp", timestamp)
+                    .add("value", value)
+                    .toString();
+    }
+
+    // Default constructor for serialization
+    @Deprecated
+    private Timestamped() {
+        this.value = null;
+        this.timestamp = null;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/package-info.java
new file mode 100644
index 0000000..992fd49
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Common abstractions and facilities for implementing a distributed store
+ * using a gossip protocol.
+ */
+package org.onlab.onos.store.common.impl;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java
index 301884c..d05659b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java
@@ -8,7 +8,7 @@
 import org.onlab.onos.net.Device;
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.store.Timestamp;
-import org.onlab.onos.store.cluster.messaging.AntiEntropyAdvertisement;
+import org.onlab.onos.store.common.impl.AntiEntropyAdvertisement;
 
 // TODO DeviceID needs to be changed to something like (ProviderID, DeviceID)
 // TODO: Handle Port as part of these messages, or separate messages for Ports?
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java
index 011713e..e7a4d0a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java
@@ -10,7 +10,7 @@
 import org.onlab.onos.net.Device;
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.store.Timestamp;
-import org.onlab.onos.store.cluster.messaging.AntiEntropyReply;
+import org.onlab.onos.store.common.impl.AntiEntropyReply;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosClockService.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceClockManager.java
similarity index 81%
rename from core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosClockService.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceClockManager.java
index a99482f..e1e3692 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosClockService.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceClockManager.java
@@ -12,14 +12,18 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.onos.cluster.MastershipTerm;
 import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.ClockProviderService;
 import org.onlab.onos.store.ClockService;
 import org.onlab.onos.store.Timestamp;
-import org.onlab.onos.store.impl.OnosTimestamp;
+import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
 import org.slf4j.Logger;
 
+/**
+ * Clock service to issue Timestamp based on Device Mastership.
+ */
 @Component(immediate = true)
 @Service
-public class OnosClockService implements ClockService {
+public class DeviceClockManager implements ClockService, ClockProviderService {
 
     private final Logger log = getLogger(getClass());
 
@@ -43,7 +47,7 @@
         if (term == null) {
             throw new IllegalStateException("Requesting timestamp for a deviceId without mastership");
         }
-        return new OnosTimestamp(term.termNumber(), ticker.incrementAndGet());
+        return new MastershipBasedTimestamp(term.termNumber(), ticker.incrementAndGet());
     }
 
     @Override
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
new file mode 100644
index 0000000..f39413b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -0,0 +1,945 @@
+package org.onlab.onos.store.device.impl;
+
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.net.AnnotationsUtil;
+import org.onlab.onos.net.DefaultAnnotations;
+import org.onlab.onos.net.DefaultDevice;
+import org.onlab.onos.net.DefaultPort;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.Device.Type;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.Port;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.SparseAnnotations;
+import org.onlab.onos.net.device.DefaultDeviceDescription;
+import org.onlab.onos.net.device.DefaultPortDescription;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.net.device.DeviceEvent;
+import org.onlab.onos.net.device.DeviceStore;
+import org.onlab.onos.net.device.DeviceStoreDelegate;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.AbstractStore;
+import org.onlab.onos.store.ClockService;
+import org.onlab.onos.store.Timestamp;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
+import org.onlab.onos.store.common.impl.Timestamped;
+import org.onlab.onos.store.serializers.KryoPoolUtil;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.serializers.MastershipBasedTimestampSerializer;
+import org.onlab.util.KryoPool;
+import org.onlab.util.NewConcurrentHashMap;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Predicates.notNull;
+import static org.onlab.onos.net.device.DeviceEvent.Type.*;
+import static org.slf4j.LoggerFactory.getLogger;
+import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
+import static org.onlab.onos.net.DefaultAnnotations.merge;
+import static org.onlab.onos.net.DefaultAnnotations.union;
+import static com.google.common.base.Verify.verify;
+
+// TODO: give me a better name
+/**
+ * Manages inventory of infrastructure devices using gossip protocol to distribute
+ * information.
+ */
+@Component(immediate = true)
+@Service
+public class GossipDeviceStore
+        extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
+        implements DeviceStore {
+
+    private final Logger log = getLogger(getClass());
+
+    public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
+
+    // TODO: Check if inner Map can be replaced with plain Map
+    // innerMap is used to lock a Device, thus instance should never be replaced.
+    // collection of Description given from various providers
+    private final ConcurrentMap<DeviceId,
+                            ConcurrentMap<ProviderId, DeviceDescriptions>>
+                                deviceDescs = Maps.newConcurrentMap();
+
+    // cache of Device and Ports generated by compositing descriptions from providers
+    private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
+    private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
+
+    // to be updated under Device lock; NOTE(review): maps are shared across per-device locks - confirm safety
+    private final Map<DeviceId, Timestamp> offline = Maps.newHashMap(); // last accepted offline timestamp
+    private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap(); // last accepted removal timestamp
+
+    // available(=UP) devices
+    private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClockService clockService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterCommunicationService clusterCommunicator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    private static final KryoSerializer SERIALIZER = new KryoSerializer() { // serializer for the types registered below
+        @Override
+        protected void setupKryoPool() {
+            serializerPool = KryoPool.newBuilder()
+                    .register(KryoPoolUtil.API)
+                    .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
+                    .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
+                    .register(InternalDeviceRemovedEvent.class)
+                    .register(InternalPortEvent.class, new InternalPortEventSerializer())
+                    .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
+                    .register(Timestamp.class)
+                    .register(Timestamped.class)
+                    .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
+                    .build()
+                    .populate(1);
+        }
+
+    };
+
+    @Activate
+    public void activate() { // registers one subscriber per GossipDeviceStoreMessageSubjects subject
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener());
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
+        clusterCommunicator.addSubscriber(
+                GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() { // drops the description, device, port and availability caches
+        deviceDescs.clear();
+        devices.clear();
+        devicePorts.clear();
+        availableDevices.clear(); // NOTE(review): offline/removalRequest and subscribers are left as-is - confirm
+        log.info("Stopped");
+    }
+
+    @Override
+    public int getDeviceCount() {
+        return devices.size(); // number of entries in the composed Device cache
+    }
+
+    @Override
+    public Iterable<Device> getDevices() {
+        return Collections.unmodifiableCollection(devices.values()); // read-only view of the cache, not a copy
+    }
+
+    @Override
+    public Device getDevice(DeviceId deviceId) {
+        return devices.get(deviceId); // null when no Device is cached under this id
+    }
+
+    @Override
+    public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
+                                     DeviceId deviceId,
+                                     DeviceDescription deviceDescription) {
+        Timestamp newTimestamp = clockService.getTimestamp(deviceId); // stamp taken before the local apply
+        final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
+        DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
+        if (event != null) { // peers are notified only when the local apply produced an event
+            log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
+                providerId, deviceId);
+            try {
+                notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
+            } catch (IOException e) { // logged only; the local event is still returned
+                log.error("Failed to notify peers of a device update topology event for providerId: "
+                        + providerId + " and deviceId: " + deviceId, e);
+            }
+        }
+        return event;
+    }
+
+    private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
+                                    DeviceId deviceId,
+                                    Timestamped<DeviceDescription> deltaDesc) {
+        // Applies the description locally; returns null if it is stale or changes nothing.
+        // Collection of DeviceDescriptions for a Device
+        ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
+            = getDeviceDescriptions(deviceId);
+
+        synchronized (providerDescs) {
+            // locking per device
+
+            if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
+                log.debug("Ignoring outdated event: {}", deltaDesc);
+                return null;
+            }
+
+            DeviceDescriptions descs
+                = createIfAbsentUnchecked(providerDescs, providerId,
+                    new InitDeviceDescs(deltaDesc));
+
+            final Device oldDevice = devices.get(deviceId);
+            final Device newDevice;
+
+            if (deltaDesc == descs.getDeviceDesc() || // same instance: presumably just stored by InitDeviceDescs
+                deltaDesc.isNewer(descs.getDeviceDesc())) {
+                // on new device or valid update
+                descs.putDeviceDesc(deltaDesc);
+                newDevice = composeDevice(deviceId, providerDescs);
+            } else {
+                // outdated event, ignored.
+                return null;
+            }
+            if (oldDevice == null) {
+                // ADD
+                return createDevice(providerId, newDevice, deltaDesc.timestamp());
+            } else {
+                // UPDATE or ignore (no change or stale)
+                return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
+            }
+        }
+    }
+
+    // Caches the newly composed device and returns a DEVICE_ADDED event.
+    // Guarded by deviceDescs value (=Device lock)
+    private DeviceEvent createDevice(ProviderId providerId,
+                                     Device newDevice, Timestamp timestamp) {
+
+        // update composed device cache
+        Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
+        verify(oldDevice == null,
+                "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
+                providerId, oldDevice, newDevice);
+
+        if (!providerId.isAncillary()) { // only a non-ancillary provider attempts to mark the device on-line
+            markOnline(newDevice.id(), timestamp);
+        }
+
+        return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
+    }
+
+    // Replaces the cached device when tracked attributes changed; returns the matching event or null.
+    // Guarded by deviceDescs value (=Device lock)
+    private DeviceEvent updateDevice(ProviderId providerId,
+                                     Device oldDevice,
+                                     Device newDevice, Timestamp newTimestamp) {
+
+        // We allow only certain attributes to trigger update
+        if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
+            !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
+            !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) {
+
+            boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
+            if (!replaced) { // cache no longer holds oldDevice; verify() below reports the mismatch
+                verify(replaced,
+                        "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
+                        providerId, oldDevice, devices.get(newDevice.id())
+                        , newDevice);
+            }
+            if (!providerId.isAncillary()) {
+                markOnline(newDevice.id(), newTimestamp); // result ignored: DEVICE_UPDATED is returned either way
+            }
+            return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
+        }
+
+        // Otherwise merely attempt to change availability if primary provider
+        if (!providerId.isAncillary()) {
+            boolean added = markOnline(newDevice.id(), newTimestamp);
+            return !added ? null : // no event when the device was already available or the request was stale
+                    new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
+        }
+        return null;
+    }
+
+    @Override
+    public DeviceEvent markOffline(DeviceId deviceId) {
+        Timestamp timestamp = clockService.getTimestamp(deviceId); // stamp taken before the local apply
+        DeviceEvent event = markOfflineInternal(deviceId, timestamp);
+        if (event != null) { // peers are notified only when the local apply produced an event
+            log.info("Notifying peers of a device offline topology event for deviceId: {}",
+                    deviceId);
+            try {
+                notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
+            } catch (IOException e) { // logged only; the local event is still returned
+                log.error("Failed to notify peers of a device offline topology event for deviceId: {}",
+                     deviceId, e); // trailing Throwable keeps the stack trace in the log
+            }
+        }
+        return event;
+    }
+
+    private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
+        // Applies an offline request locally; returns an event only if the device was available.
+        Map<ProviderId, DeviceDescriptions> providerDescs
+            = getDeviceDescriptions(deviceId);
+
+        // locking device
+        synchronized (providerDescs) {
+
+            // accept off-line if given timestamp is newer than
+            // the latest Timestamp from Primary provider
+            DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
+            Timestamp lastTimestamp = primDescs.getLatestTimestamp(); // NOTE(review): assumes primDescs != null - confirm
+            if (timestamp.compareTo(lastTimestamp) <= 0) {
+                // outdated event ignore
+                return null;
+            }
+
+            offline.put(deviceId, timestamp); // recorded even when no Device is cached (see null check below)
+
+            Device device = devices.get(deviceId);
+            if (device == null) {
+                return null;
+            }
+            boolean removed = availableDevices.remove(deviceId);
+            if (removed) {
+                // TODO: broadcast ... DOWN only?
+                return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Marks the device as available if the given timestamp is not outdated,
+     * compared to the time the device has been marked offline.
+     *
+     * @param deviceId identifier of the device
+     * @param timestamp of the event triggering this change.
+     * @return true if availability change request was accepted and changed the state
+     */
+    // Guarded by deviceDescs value (=Device lock)
+    private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
+        // reject the request when an offline mark exists that is
+        // at least as recent as the given timestamp
+        final Timestamp lastOffline = offline.get(deviceId);
+        if (lastOffline != null &&
+            lastOffline.compareTo(timestamp) >= 0) {
+            return false;
+        }
+        // offline mark (if any) is superseded: drop it and flag the device as available
+        offline.remove(deviceId);
+        return availableDevices.add(deviceId);
+    }
+
+    @Override
+    public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
+                                       DeviceId deviceId,
+                                       List<PortDescription> portDescriptions) {
+        Timestamp newTimestamp = clockService.getTimestamp(deviceId); // one stamp shared by the whole port list
+
+        Timestamped<List<PortDescription>> timestampedPortDescriptions =
+            new Timestamped<>(portDescriptions, newTimestamp);
+
+        List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, timestampedPortDescriptions);
+        if (!events.isEmpty()) { // peers are notified only when the local apply produced events
+            log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
+                    providerId, deviceId);
+            try {
+                notifyPeers(new InternalPortEvent(providerId, deviceId, timestampedPortDescriptions));
+            } catch (IOException e) { // logged only; the local events are still returned
+                log.error("Failed to notify peers of a port update topology event or providerId: "
+                    + providerId + " and deviceId: " + deviceId, e);
+            }
+        }
+        return events;
+    }
+
+    private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
+                                DeviceId deviceId,
+                                Timestamped<List<PortDescription>> portDescriptions) {
+        // Applies a timestamped port list; checkArgument rejects unknown devices/providers.
+        Device device = devices.get(deviceId);
+        checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
+
+        ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
+        checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
+
+        List<DeviceEvent> events = new ArrayList<>();
+        synchronized (descsMap) {
+            // locking per device
+            if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
+                log.debug("Ignoring outdated events: {}", portDescriptions);
+                return Collections.emptyList(); // never null: updatePorts() calls isEmpty() on the result
+            }
+
+            DeviceDescriptions descs = descsMap.get(providerId);
+            // every provider must provide DeviceDescription.
+            checkArgument(descs != null,
+                    "Device description for Device ID %s from Provider %s was not found",
+                    deviceId, providerId);
+
+            Map<PortNumber, Port> ports = getPortMap(deviceId);
+
+            final Timestamp newTimestamp = portDescriptions.timestamp();
+
+            // Add new ports
+            Set<PortNumber> processed = new HashSet<>();
+            for (PortDescription portDescription : portDescriptions.value()) {
+                final PortNumber number = portDescription.portNumber();
+                processed.add(number);
+
+                final Port oldPort = ports.get(number);
+                final Port newPort;
+
+                // accept when the port is new or the list is at least as recent as the stored description
+                final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
+                if (existingPortDesc == null ||
+                    newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
+                    // on new port or valid update
+                    // update description
+                    descs.putPortDesc(new Timestamped<>(portDescription,
+                                            portDescriptions.timestamp()));
+                    newPort = composePort(device, number, descsMap);
+                } else {
+                    // outdated event, ignored.
+                    continue;
+                }
+
+                events.add(oldPort == null ?
+                                   createPort(device, newPort, ports) :
+                                   updatePort(device, oldPort, newPort, ports));
+            }
+            // ports absent from this list are pruned from the cache
+            events.addAll(pruneOldPorts(device, ports, processed));
+        }
+        return FluentIterable.from(events).filter(notNull()).toList(); // drops nulls from unchanged ports
+    }
+
+    // Registers the newly composed port in the given port map and
+    // returns the corresponding PORT_ADDED event.
+    // Guarded by deviceDescs value (=Device lock)
+    private DeviceEvent createPort(Device device, Port newPort,
+                                   Map<PortNumber, Port> ports) {
+        ports.put(newPort.number(), newPort);
+        return new DeviceEvent(PORT_ADDED, device, newPort);
+    }
+
+    // Checks if the specified port requires update and if so, it replaces the
+    // existing entry in the map and returns corresponding event.
+    // Guarded by deviceDescs value (=Device lock)
+    private DeviceEvent updatePort(Device device, Port oldPort,
+                                   Port newPort,
+                                   Map<PortNumber, Port> ports) {
+        final boolean unchanged = oldPort.isEnabled() == newPort.isEnabled() &&
+            AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations());
+        if (unchanged) {
+            return null; // same enabled state and annotations: nothing to replace
+        }
+        ports.put(oldPort.number(), newPort);
+        return new DeviceEvent(PORT_UPDATED, device, newPort);
+    }
+
+    // Prunes the specified list of ports based on which ports are in the
+    // processed list and returns list of corresponding events.
+    // Guarded by deviceDescs value (=Device lock)
+    private List<DeviceEvent> pruneOldPorts(Device device,
+                                            Map<PortNumber, Port> ports,
+                                            Set<PortNumber> processed) {
+        final List<DeviceEvent> removals = new ArrayList<>();
+        final Iterator<Entry<PortNumber, Port>> it = ports.entrySet().iterator();
+        while (it.hasNext()) {
+            final Entry<PortNumber, Port> entry = it.next();
+            if (processed.contains(entry.getKey())) {
+                continue; // port was part of the processed set: keep it
+            }
+            removals.add(new DeviceEvent(PORT_REMOVED, device, entry.getValue()));
+            it.remove();
+        }
+        return removals;
+    }
+
+    // Gets the map of ports for the specified device; if one does not already
+    // exist, it creates a new one and registers it in devicePorts.
+    private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
+        return createIfAbsentUnchecked(devicePorts, deviceId,
+                NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
+    }
+
+    private ConcurrentMap<ProviderId, DeviceDescriptions> getDeviceDescriptions(
+            DeviceId deviceId) { // per-device provider map from deviceDescs, created on first request
+        return createIfAbsentUnchecked(deviceDescs, deviceId,
+                NewConcurrentHashMap.<ProviderId, DeviceDescriptions>ifNeeded());
+    }
+
+    @Override
+    public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
+            PortDescription portDescription) {
+        Timestamp newTimestamp = clockService.getTimestamp(deviceId); // stamp taken before the local apply
+        final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
+        DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
+        if (event != null) { // peers are notified only when the local apply produced an event
+            log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
+                        providerId, deviceId);
+            try {
+                notifyPeers(new InternalPortStatusEvent(providerId, deviceId, deltaDesc));
+            } catch (IOException e) { // logged only; the local event is still returned
+                log.error("Failed to notify peers of a port status update topology event or providerId: "
+                        + providerId + " and deviceId: " + deviceId, e);
+            }
+        }
+        return event;
+    }
+
+    private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
+                Timestamped<PortDescription> deltaDesc) {
+        // Applies a single timestamped port description; returns null if it is stale or changes nothing.
+        Device device = devices.get(deviceId);
+        checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
+
+        ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
+        checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
+
+        synchronized (descsMap) {
+            // locking per device
+            if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
+                log.debug("Ignoring outdated event: {}", deltaDesc);
+                return null;
+            }
+
+            DeviceDescriptions descs = descsMap.get(providerId);
+            // assuming every provider must give a DeviceDescription
+            checkArgument(descs != null,
+                    "Device description for Device ID %s from Provider %s was not found",
+                    deviceId, providerId);
+
+            ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
+            final PortNumber number = deltaDesc.value().portNumber();
+            final Port oldPort = ports.get(number);
+            final Port newPort;
+
+            final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
+            if (existingPortDesc == null ||
+                deltaDesc == existingPortDesc ||
+                deltaDesc.isNewer(existingPortDesc)) {
+                // on new port or valid update
+                // update description
+                descs.putPortDesc(deltaDesc);
+                newPort = composePort(device, number, descsMap);
+            } else {
+                // outdated event, ignored.
+                return null;
+            }
+
+            if (oldPort == null) {
+                return createPort(device, newPort, ports);
+            } else {
+                return updatePort(device, oldPort, newPort, ports);
+            }
+        }
+    }
+
+    @Override
+    public List<Port> getPorts(DeviceId deviceId) {
+        Map<PortNumber, Port> ports = devicePorts.get(deviceId);
+        if (ports == null) { // no port map registered for this device
+            return Collections.emptyList();
+        }
+        return ImmutableList.copyOf(ports.values()); // copy of the current ports, not a live view
+    }
+
+    @Override
+    public Port getPort(DeviceId deviceId, PortNumber portNumber) {
+        Map<PortNumber, Port> ports = devicePorts.get(deviceId);
+        return ports == null ? null : ports.get(portNumber); // null when no port map is registered for the device
+    }
+
+    @Override
+    public boolean isAvailable(DeviceId deviceId) {
+        return availableDevices.contains(deviceId); // membership in the available(=UP) device set
+    }
+
+    @Override
+    public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
+        Timestamp timestamp = clockService.getTimestamp(deviceId); // stamp taken before the local apply
+        DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
+        if (event != null) { // peers are notified only when the local apply produced an event
+            log.info("Notifying peers of a device removed topology event for deviceId: {}",
+                    deviceId);
+            try {
+                notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
+            } catch (IOException e) { // logged only; the local event is still returned
+                log.error("Failed to notify peers of a device removed topology event for deviceId: {}",
+                     deviceId, e); // trailing Throwable keeps the stack trace in the log
+            }
+        }
+        return event;
+    }
+
+    private DeviceEvent removeDeviceInternal(DeviceId deviceId,
+                                             Timestamp timestamp) {
+        // Applies a removal request locally; returns DEVICE_REMOVED only if a Device was cached.
+        Map<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId);
+        synchronized (descs) {
+            // accept removal request if given timestamp is newer than
+            // the latest Timestamp from Primary provider
+            DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
+            Timestamp lastTimestamp = primDescs.getLatestTimestamp();
+            if (timestamp.compareTo(lastTimestamp) <= 0) {
+                // outdated event ignore
+                return null;
+            }
+            removalRequest.put(deviceId, timestamp);
+            // must run while the Device is still cached, otherwise availableDevices is never cleared
+            markOfflineInternal(deviceId, timestamp);
+            Device device = devices.remove(deviceId);
+            // should DEVICE_REMOVED carry removed ports?
+            Map<PortNumber, Port> ports = devicePorts.get(deviceId);
+            if (ports != null) {
+                ports.clear();
+            }
+            descs.clear();
+            return device == null ? null :
+                new DeviceEvent(DEVICE_REMOVED, device, null);
+        }
+    }
+
+    private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
+        final Timestamp removedAt = removalRequest.get(deviceId);
+        if (removedAt == null) {
+            // no removal request recorded for this device
+            return false;
+        }
+        // removed when the removal request is at least as recent as the checked timestamp
+        return removedAt.compareTo(timestampToCheck) >= 0;
+    }
+
+    /**
+     * Returns a Device, merging description given from multiple Providers.
+     *
+     * @param deviceId device identifier
+     * @param providerDescs Collection of Descriptions from multiple providers; must not be empty
+     * @return Device built from the primary provider's description plus merged annotations
+     */
+    private Device composeDevice(DeviceId deviceId,
+            ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
+
+        checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
+
+        ProviderId primary = pickPrimaryPID(providerDescs);
+
+        DeviceDescriptions desc = providerDescs.get(primary);
+
+        final DeviceDescription base = desc.getDeviceDesc().value(); // non-annotation fields come from here only
+        Type type = base.type();
+        String manufacturer = base.manufacturer();
+        String hwVersion = base.hwVersion();
+        String swVersion = base.swVersion();
+        String serialNumber = base.serialNumber();
+        DefaultAnnotations annotations = DefaultAnnotations.builder().build();
+        annotations = merge(annotations, base.annotations());
+
+        for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
+            if (e.getKey().equals(primary)) { // primary annotations were merged above
+                continue;
+            }
+            // TODO: should keep track of Description timestamp
+            // and only merge conflicting keys when timestamp is newer
+            // Currently assuming there will never be a key conflict between
+            // providers
+
+            // annotation merging. not so efficient, should revisit later
+            annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
+        }
+
+        return new DefaultDevice(primary, deviceId , type, manufacturer,
+                            hwVersion, swVersion, serialNumber, annotations);
+    }
+
+    /**
+     * Builds a Port by combining the descriptions supplied by all providers.
+     * The enabled state comes from the primary provider's description of the
+     * port; annotations are merged from every provider describing the port.
+     *
+     * @param device device the port is on
+     * @param number port number
+     * @param providerDescs Collection of Descriptions from multiple providers
+     * @return Port instance
+     */
+    private Port composePort(Device device, PortNumber number,
+                ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
+        final ProviderId primaryPid = pickPrimaryPID(providerDescs);
+        final Timestamped<PortDescription> primaryPort =
+                providerDescs.get(primaryPid).getPortDesc(number);
+
+        // if no primary, assume not enabled
+        // TODO: revisit this default port enabled/disabled behavior
+        final boolean isEnabled = primaryPort != null && primaryPort.value().isEnabled();
+
+        DefaultAnnotations merged = DefaultAnnotations.builder().build();
+        if (primaryPort != null) {
+            merged = merge(merged, primaryPort.value().annotations());
+        }
+
+        // TODO: should keep track of Description timestamp
+        // and only merge conflicting keys when timestamp is newer
+        // Currently assuming there will never be a key conflict between
+        // providers
+        for (Entry<ProviderId, DeviceDescriptions> entry : providerDescs.entrySet()) {
+            if (entry.getKey().equals(primaryPid)) {
+                continue;
+            }
+            // annotation merging. not so efficient, should revisit later
+            final Timestamped<PortDescription> secondary = entry.getValue().getPortDesc(number);
+            if (secondary != null) {
+                merged = merge(merged, secondary.value().annotations());
+            }
+        }
+
+        return new DefaultPort(device, number, isEnabled, merged);
+    }
+
+    /**
+     * @return first non-ancillary ProviderId, else the first ancillary one seen, or null for an empty map
+     */
+    private ProviderId pickPrimaryPID(Map<ProviderId, DeviceDescriptions> providerDescs) {
+        ProviderId firstAncillary = null;
+        for (ProviderId pid : providerDescs.keySet()) {
+            if (!pid.isAncillary()) {
+                return pid;
+            }
+            if (firstAncillary == null) {
+                // remember as a fallback in case there is no primary
+                firstAncillary = pid;
+            }
+        }
+        return firstAncillary;
+    }
+
+    // Returns the descriptions of the provider picked by pickPrimaryPID, or null when the map has no provider.
+    private DeviceDescriptions getPrimaryDescriptions(Map<ProviderId, DeviceDescriptions> providerDescs) {
+        ProviderId pid = pickPrimaryPID(providerDescs);
+        return (pid == null) ? null : providerDescs.get(pid); // avoid a null-key lookup
+    }
+
+    /** Initializer that creates a DeviceDescriptions from the device description given at construction. */
+    public static final class InitDeviceDescs implements ConcurrentInitializer<DeviceDescriptions> {
+        // description handed to every DeviceDescriptions this initializer creates
+        private final Timestamped<DeviceDescription> deviceDesc;
+
+        public InitDeviceDescs(Timestamped<DeviceDescription> deviceDesc) {
+            this.deviceDesc = checkNotNull(deviceDesc);
+        }
+        @Override
+        public DeviceDescriptions get() throws ConcurrentException {
+            return new DeviceDescriptions(deviceDesc);
+        }
+    }
+
+
+    /**
+     * Collection of Descriptions of a Device and its Ports given by a Provider.
+     */
+    public static class DeviceDescriptions {
+
+        private final AtomicReference<Timestamped<DeviceDescription>> deviceDesc;
+        private final ConcurrentMap<PortNumber, Timestamped<PortDescription>> portDescs;
+
+        public DeviceDescriptions(Timestamped<DeviceDescription> desc) {
+            this.deviceDesc = new AtomicReference<>(checkNotNull(desc));
+            this.portDescs = new ConcurrentHashMap<>();
+        }
+
+        // newest timestamp among the device description and all port descriptions
+        Timestamp getLatestTimestamp() {
+            Timestamp newest = deviceDesc.get().timestamp();
+            for (Timestamped<PortDescription> port : portDescs.values()) {
+                final Timestamp candidate = port.timestamp();
+                if (candidate.compareTo(newest) > 0) {
+                    newest = candidate;
+                }
+            }
+            return newest;
+        }
+
+        public Timestamped<DeviceDescription> getDeviceDesc() {
+            return deviceDesc.get();
+        }
+
+        public Timestamped<PortDescription> getPortDesc(PortNumber number) {
+            return portDescs.get(number);
+        }
+
+        /**
+         * Stores a DeviceDescription, combining its annotations with those of the one it replaces.
+         *
+         * @param newDesc new DeviceDescription
+         * @return previous DeviceDescription
+         */
+        public synchronized Timestamped<DeviceDescription> putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
+            final Timestamped<DeviceDescription> previous = deviceDesc.get();
+            Timestamped<DeviceDescription> replacement = newDesc;
+            if (previous != null) {
+                final SparseAnnotations merged = union(previous.value().annotations(),
+                                                       newDesc.value().annotations());
+                final DeviceDescription combined = new DefaultDeviceDescription(newDesc.value(), merged);
+                replacement = new Timestamped<>(combined, newDesc.timestamp());
+            }
+            return deviceDesc.getAndSet(replacement);
+        }
+
+        /**
+         * Stores a PortDescription, combining its annotations with those of the one it replaces.
+         *
+         * @param newDesc new PortDescription
+         * @return previous PortDescription
+         */
+        public synchronized Timestamped<PortDescription> putPortDesc(Timestamped<PortDescription> newDesc) {
+            final Timestamped<PortDescription> previous = portDescs.get(newDesc.value().portNumber());
+            Timestamped<PortDescription> replacement = newDesc;
+            if (previous != null) {
+                final SparseAnnotations merged = union(previous.value().annotations(),
+                                                       newDesc.value().annotations());
+                final PortDescription combined = new DefaultPortDescription(newDesc.value(), merged);
+                replacement = new Timestamped<>(combined, newDesc.timestamp());
+            }
+            return portDescs.put(replacement.value().portNumber(), replacement);
+        }
+    }
+
+    /** Broadcasts the encoded device update on the DEVICE_UPDATE subject. */
+    private void notifyPeers(InternalDeviceEvent event) throws IOException {
+        clusterCommunicator.broadcast(new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
+                SERIALIZER.encode(event)));
+    }
+
+    /** Broadcasts the encoded device-offline event on the DEVICE_OFFLINE subject. */
+    private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
+        clusterCommunicator.broadcast(new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
+                SERIALIZER.encode(event)));
+    }
+
+    /** Broadcasts the encoded device-removed event on the DEVICE_REMOVED subject. */
+    private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
+        clusterCommunicator.broadcast(new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                GossipDeviceStoreMessageSubjects.DEVICE_REMOVED,
+                SERIALIZER.encode(event)));
+    }
+
+    /** Broadcasts the encoded port update on the PORT_UPDATE subject. */
+    private void notifyPeers(InternalPortEvent event) throws IOException {
+        clusterCommunicator.broadcast(new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                GossipDeviceStoreMessageSubjects.PORT_UPDATE,
+                SERIALIZER.encode(event)));
+    }
+
+    /** Broadcasts the encoded port status update on the PORT_STATUS_UPDATE subject. */
+    private void notifyPeers(InternalPortStatusEvent event) throws IOException {
+        clusterCommunicator.broadcast(new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
+                SERIALIZER.encode(event)));
+    }
+
+    /** Feeds device updates received from peers into createOrUpdateDeviceInternal. */
+    private class InternalDeviceEventListener implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            log.info("Received device update event from peer: {}", message.sender());
+
+            final InternalDeviceEvent update =
+                    (InternalDeviceEvent) SERIALIZER.decode(message.payload());
+            createOrUpdateDeviceInternal(
+                    update.providerId(),
+                    update.deviceId(),
+                    update.deviceDescription());
+        }
+    }
+
+    /** Feeds device-offline events received from peers into markOfflineInternal. */
+    private class InternalDeviceOfflineEventListener implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            log.info("Received device offline event from peer: {}", message.sender());
+
+            final InternalDeviceOfflineEvent offline =
+                    (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload());
+            markOfflineInternal(
+                    offline.deviceId(),
+                    offline.timestamp());
+        }
+    }
+
+    /** Feeds device-removed events received from peers into removeDeviceInternal. */
+    private class InternalDeviceRemovedEventListener implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            log.info("Received device removed event from peer: {}", message.sender());
+
+            final InternalDeviceRemovedEvent removal =
+                    (InternalDeviceRemovedEvent) SERIALIZER.decode(message.payload());
+
+            // NOTE(review): the DeviceEvent returned here is discarded - confirm intended
+            removeDeviceInternal(removal.deviceId(), removal.timestamp());
+        }
+    }
+
+    /** Feeds port list updates received from peers into updatePortsInternal. */
+    private class InternalPortEventListener implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            log.info("Received port update event from peer: {}", message.sender());
+
+            final InternalPortEvent update =
+                    (InternalPortEvent) SERIALIZER.decode(message.payload());
+            updatePortsInternal(
+                    update.providerId(),
+                    update.deviceId(),
+                    update.portDescriptions());
+        }
+    }
+
+    /** Feeds port status updates received from peers into updatePortStatusInternal. */
+    private class InternalPortStatusEventListener implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            log.info("Received port status update event from peer: {}", message.sender());
+
+            final InternalPortStatusEvent update =
+                    (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
+            updatePortStatusInternal(
+                    update.providerId(),
+                    update.deviceId(),
+                    update.portDescription());
+        }
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java
new file mode 100644
index 0000000..5272182
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java
@@ -0,0 +1,17 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+
+/**
+ * Subjects of the messages GossipDeviceStore exchanges with its peers.
+ */
+public final class GossipDeviceStoreMessageSubjects {
+    public static final MessageSubject DEVICE_UPDATE = new MessageSubject("peer-device-update");
+    public static final MessageSubject DEVICE_OFFLINE = new MessageSubject("peer-device-offline");
+    public static final MessageSubject DEVICE_REMOVED = new MessageSubject("peer-device-removed");
+    public static final MessageSubject PORT_UPDATE = new MessageSubject("peer-port-update");
+    public static final MessageSubject PORT_STATUS_UPDATE = new MessageSubject("peer-port-status-update");
+
+    // constants holder; not meant to be instantiated
+    private GossipDeviceStoreMessageSubjects() {}
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
new file mode 100644
index 0000000..4214384
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
@@ -0,0 +1,45 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+/**
+ * Message payload GossipDeviceStore publishes to tell its peers about a
+ * device change event.
+ */
+public class InternalDeviceEvent {
+
+    private final ProviderId providerId;
+    private final DeviceId deviceId;
+    private final Timestamped<DeviceDescription> deviceDescription;
+
+    // for serializer
+    protected InternalDeviceEvent() {
+        this(null, null, null);
+    }
+
+    /**
+     * Creates an event carrying a provider's timestamped description of a device.
+     * Arguments are stored as given, without validation.
+     */
+    protected InternalDeviceEvent(ProviderId providerId, DeviceId deviceId,
+                                  Timestamped<DeviceDescription> deviceDescription) {
+        this.providerId = providerId;
+        this.deviceId = deviceId;
+        this.deviceDescription = deviceDescription;
+    }
+
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    public ProviderId providerId() {
+        return providerId;
+    }
+
+    public Timestamped<DeviceDescription> deviceDescription() {
+        return deviceDescription;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEventSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEventSerializer.java
new file mode 100644
index 0000000..0d3d013
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEventSerializer.java
@@ -0,0 +1,43 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo Serializer for {@link InternalDeviceEvent}.
+ */
+public class InternalDeviceEventSerializer extends Serializer<InternalDeviceEvent> {
+
+    /**
+     * Creates a serializer for {@link InternalDeviceEvent}.
+     */
+    public InternalDeviceEventSerializer() {
+        // does not accept null
+        super(false);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, InternalDeviceEvent event) {
+        kryo.writeClassAndObject(output, event.providerId());
+        kryo.writeClassAndObject(output, event.deviceId());
+        kryo.writeClassAndObject(output, event.deviceDescription());
+    }
+
+    @Override
+    public InternalDeviceEvent read(Kryo kryo, Input input, Class<InternalDeviceEvent> type) {
+        // fields come back in the order write() emitted them
+        final ProviderId provider = (ProviderId) kryo.readClassAndObject(input);
+        final DeviceId device = (DeviceId) kryo.readClassAndObject(input);
+        @SuppressWarnings("unchecked") // third field is what write() stored as deviceDescription()
+        final Timestamped<DeviceDescription> description =
+                (Timestamped<DeviceDescription>) kryo.readClassAndObject(input);
+        return new InternalDeviceEvent(provider, device, description);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEvent.java
new file mode 100644
index 0000000..d8942d6
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEvent.java
@@ -0,0 +1,39 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.Timestamp;
+
+/**
+ * Message payload GossipDeviceStore publishes to tell its peers about a
+ * device going offline.
+ */
+public class InternalDeviceOfflineEvent {
+
+    private final DeviceId deviceId;
+    private final Timestamp timestamp;
+
+    // for serializer
+    @SuppressWarnings("unused")
+    private InternalDeviceOfflineEvent() {
+        this(null, null);
+    }
+
+    /**
+     * Creates a InternalDeviceOfflineEvent.
+     *
+     * @param deviceId identifier of device going offline.
+     * @param timestamp timestamp of when the device went offline.
+     */
+    public InternalDeviceOfflineEvent(DeviceId deviceId, Timestamp timestamp) {
+        this.deviceId = deviceId;
+        this.timestamp = timestamp;
+    }
+
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    public Timestamp timestamp() {
+        return timestamp;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEventSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEventSerializer.java
new file mode 100644
index 0000000..7059636
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEventSerializer.java
@@ -0,0 +1,38 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.Timestamp;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo Serializer for {@link InternalDeviceOfflineEvent}.
+ */
+public class InternalDeviceOfflineEventSerializer extends Serializer<InternalDeviceOfflineEvent> {
+
+    /**
+     * Creates a serializer for {@link InternalDeviceOfflineEvent}.
+     */
+    public InternalDeviceOfflineEventSerializer() {
+        // does not accept null
+        super(false);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, InternalDeviceOfflineEvent event) {
+        kryo.writeClassAndObject(output, event.deviceId());
+        kryo.writeClassAndObject(output, event.timestamp());
+    }
+
+    @Override
+    public InternalDeviceOfflineEvent read(Kryo kryo, Input input, Class<InternalDeviceOfflineEvent> type) {
+        // fields come back in the order write() emitted them
+        final DeviceId device = (DeviceId) kryo.readClassAndObject(input);
+        final Timestamp offlineAt = (Timestamp) kryo.readClassAndObject(input);
+
+        return new InternalDeviceOfflineEvent(device, offlineAt);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceRemovedEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceRemovedEvent.java
new file mode 100644
index 0000000..6c8b905
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceRemovedEvent.java
@@ -0,0 +1,39 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.Timestamp;
+
+/**
+ * Message payload GossipDeviceStore publishes to tell its peers about a
+ * device being administratively removed.
+ */
+public class InternalDeviceRemovedEvent {
+
+    private final DeviceId deviceId;
+    private final Timestamp timestamp;
+
+    // for serializer
+    @SuppressWarnings("unused")
+    private InternalDeviceRemovedEvent() {
+        this(null, null);
+    }
+
+    /**
+     * Creates a InternalDeviceRemovedEvent.
+     *
+     * @param deviceId identifier of the removed device.
+     * @param timestamp timestamp of when the device was administratively removed.
+     */
+    public InternalDeviceRemovedEvent(DeviceId deviceId, Timestamp timestamp) {
+        this.deviceId = deviceId;
+        this.timestamp = timestamp;
+    }
+
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    public Timestamp timestamp() {
+        return timestamp;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
new file mode 100644
index 0000000..64e77ca
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
@@ -0,0 +1,47 @@
+package org.onlab.onos.store.device.impl;
+
+import java.util.List;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+/**
+ * Message payload GossipDeviceStore publishes to tell its peers about a
+ * port change event.
+ */
+public class InternalPortEvent {
+
+    private final ProviderId providerId;
+    private final DeviceId deviceId;
+    private final Timestamped<List<PortDescription>> portDescriptions;
+
+    // for serializer
+    protected InternalPortEvent() {
+        this(null, null, null);
+    }
+
+    /**
+     * Creates an event carrying a provider's timestamped list of port descriptions.
+     * Arguments are stored as given, without validation.
+     */
+    protected InternalPortEvent(ProviderId providerId, DeviceId deviceId,
+                                Timestamped<List<PortDescription>> portDescriptions) {
+        this.providerId = providerId;
+        this.deviceId = deviceId;
+        this.portDescriptions = portDescriptions;
+    }
+
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    public ProviderId providerId() {
+        return providerId;
+    }
+
+    public Timestamped<List<PortDescription>> portDescriptions() {
+        return portDescriptions;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEventSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEventSerializer.java
new file mode 100644
index 0000000..6fff395
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEventSerializer.java
@@ -0,0 +1,45 @@
+package org.onlab.onos.store.device.impl;
+
+import java.util.List;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo Serializer for {@link InternalPortEvent}.
+ */
+public class InternalPortEventSerializer extends Serializer<InternalPortEvent> {
+
+    /**
+     * Creates a serializer for {@link InternalPortEvent}.
+     */
+    public InternalPortEventSerializer() {
+        // does not accept null
+        super(false);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, InternalPortEvent event) {
+        kryo.writeClassAndObject(output, event.providerId());
+        kryo.writeClassAndObject(output, event.deviceId());
+        kryo.writeClassAndObject(output, event.portDescriptions());
+    }
+
+    @Override
+    public InternalPortEvent read(Kryo kryo, Input input, Class<InternalPortEvent> type) {
+        // fields come back in the order write() emitted them
+        final ProviderId provider = (ProviderId) kryo.readClassAndObject(input);
+        final DeviceId device = (DeviceId) kryo.readClassAndObject(input);
+        @SuppressWarnings("unchecked") // third field is what write() stored as portDescriptions()
+        final Timestamped<List<PortDescription>> descriptions =
+                (Timestamped<List<PortDescription>>) kryo.readClassAndObject(input);
+        return new InternalPortEvent(provider, device, descriptions);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
new file mode 100644
index 0000000..7d3854b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
@@ -0,0 +1,45 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+/**
+ * Message payload GossipDeviceStore publishes to tell its peers about a
+ * port status change event.
+ */
+public class InternalPortStatusEvent {
+
+    private final ProviderId providerId;
+    private final DeviceId deviceId;
+    private final Timestamped<PortDescription> portDescription;
+
+    // for serializer
+    protected InternalPortStatusEvent() {
+        this(null, null, null);
+    }
+
+    /**
+     * Creates an event carrying a provider's timestamped description of one port.
+     * Arguments are stored as given, without validation.
+     */
+    protected InternalPortStatusEvent(ProviderId providerId, DeviceId deviceId,
+                                      Timestamped<PortDescription> portDescription) {
+        this.providerId = providerId;
+        this.deviceId = deviceId;
+        this.portDescription = portDescription;
+    }
+
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    public ProviderId providerId() {
+        return providerId;
+    }
+
+    public Timestamped<PortDescription> portDescription() {
+        return portDescription;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java
new file mode 100644
index 0000000..6ec4122
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java
@@ -0,0 +1,42 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo Serializer for {@link InternalPortStatusEvent}.
+ */
+public class InternalPortStatusEventSerializer extends Serializer<InternalPortStatusEvent> {
+
+    /**
+     * Creates a serializer for {@link InternalPortStatusEvent}.
+     */
+    public InternalPortStatusEventSerializer() {
+        // does not accept null
+        super(false);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, InternalPortStatusEvent event) {
+        kryo.writeClassAndObject(output, event.providerId());
+        kryo.writeClassAndObject(output, event.deviceId());
+        kryo.writeClassAndObject(output, event.portDescription());
+    }
+
+    @Override
+    public InternalPortStatusEvent read(Kryo kryo, Input input, Class<InternalPortStatusEvent> type) {
+        final ProviderId provider = (ProviderId) kryo.readClassAndObject(input);
+        final DeviceId device = (DeviceId) kryo.readClassAndObject(input);
+        @SuppressWarnings("unchecked") // third field is what write() stored as portDescription()
+        final Timestamped<PortDescription> description =
+                (Timestamped<PortDescription>) kryo.readClassAndObject(input);
+        return new InternalPortStatusEvent(provider, device, description);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java
deleted file mode 100644
index d912983..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java
+++ /dev/null
@@ -1,339 +0,0 @@
-package org.onlab.onos.store.device.impl;
-
-import static com.google.common.base.Predicates.notNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSet.Builder;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.onos.net.DefaultDevice;
-import org.onlab.onos.net.DefaultPort;
-import org.onlab.onos.net.Device;
-import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.Port;
-import org.onlab.onos.net.PortNumber;
-import org.onlab.onos.net.device.DeviceDescription;
-import org.onlab.onos.net.device.DeviceEvent;
-import org.onlab.onos.net.device.DeviceStore;
-import org.onlab.onos.net.device.DeviceStoreDelegate;
-import org.onlab.onos.net.device.PortDescription;
-import org.onlab.onos.net.provider.ProviderId;
-import org.onlab.onos.store.AbstractStore;
-import org.onlab.onos.store.ClockService;
-import org.onlab.onos.store.Timestamp;
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.onlab.onos.net.device.DeviceEvent.Type.*;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Manages inventory of infrastructure devices using a protocol that takes into consideration
- * the order in which device events occur.
- */
-@Component(immediate = true)
-@Service
-public class OnosDistributedDeviceStore
-        extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
-        implements DeviceStore {
-
-    private final Logger log = getLogger(getClass());
-
-    public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
-
-    private ConcurrentMap<DeviceId, VersionedValue<Device>> devices;
-    private ConcurrentMap<DeviceId, Map<PortNumber, VersionedValue<Port>>> devicePorts;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClockService clockService;
-
-    @Activate
-    public void activate() {
-
-        devices = new ConcurrentHashMap<>();
-        devicePorts = new ConcurrentHashMap<>();
-
-        log.info("Started");
-    }
-
-    @Deactivate
-    public void deactivate() {
-        log.info("Stopped");
-    }
-
-    @Override
-    public int getDeviceCount() {
-        return devices.size();
-    }
-
-    @Override
-    public Iterable<Device> getDevices() {
-        Builder<Device> builder = ImmutableSet.builder();
-        synchronized (this) {
-            for (VersionedValue<Device> device : devices.values()) {
-                builder.add(device.entity());
-            }
-            return builder.build();
-        }
-    }
-
-    @Override
-    public Device getDevice(DeviceId deviceId) {
-        VersionedValue<Device> device = devices.get(deviceId);
-        checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
-        return device.entity();
-    }
-
-    @Override
-    public DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
-                                            DeviceDescription deviceDescription) {
-        Timestamp newTimestamp = clockService.getTimestamp(deviceId);
-        VersionedValue<Device> device = devices.get(deviceId);
-
-        if (device == null) {
-            return createDevice(providerId, deviceId, deviceDescription, newTimestamp);
-        }
-
-        checkState(newTimestamp.compareTo(device.timestamp()) > 0,
-                "Existing device has a timestamp in the future!");
-
-        return updateDevice(providerId, device.entity(), deviceDescription, newTimestamp);
-    }
-
-    // Creates the device and returns the appropriate event if necessary.
-    private DeviceEvent createDevice(ProviderId providerId, DeviceId deviceId,
-                                     DeviceDescription desc, Timestamp timestamp) {
-        Device device = new DefaultDevice(providerId, deviceId, desc.type(),
-                                                 desc.manufacturer(),
-                                                 desc.hwVersion(), desc.swVersion(),
-                                                 desc.serialNumber());
-
-        devices.put(deviceId, new VersionedValue<>(device, true, timestamp));
-        // TODO,FIXME: broadcast a message telling peers of a device event.
-        return new DeviceEvent(DEVICE_ADDED, device, null);
-    }
-
-    // Updates the device and returns the appropriate event if necessary.
-    private DeviceEvent updateDevice(ProviderId providerId, Device device,
-                                     DeviceDescription desc, Timestamp timestamp) {
-        // We allow only certain attributes to trigger update
-        if (!Objects.equals(device.hwVersion(), desc.hwVersion()) ||
-                !Objects.equals(device.swVersion(), desc.swVersion())) {
-
-            Device updated = new DefaultDevice(providerId, device.id(),
-                                                      desc.type(),
-                                                      desc.manufacturer(),
-                                                      desc.hwVersion(),
-                                                      desc.swVersion(),
-                                                      desc.serialNumber());
-            devices.put(device.id(), new VersionedValue<Device>(updated, true, timestamp));
-            // FIXME: broadcast a message telling peers of a device event.
-            return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, updated, null);
-        }
-
-        // Otherwise merely attempt to change availability
-        Device updated = new DefaultDevice(providerId, device.id(),
-                desc.type(),
-                desc.manufacturer(),
-                desc.hwVersion(),
-                desc.swVersion(),
-                desc.serialNumber());
-
-        VersionedValue<Device> oldDevice = devices.put(device.id(),
-                new VersionedValue<Device>(updated, true, timestamp));
-        if (!oldDevice.isUp()) {
-            return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public DeviceEvent markOffline(DeviceId deviceId) {
-        VersionedValue<Device> device = devices.get(deviceId);
-        boolean willRemove = device != null && device.isUp();
-        if (!willRemove) {
-            return null;
-        }
-        Timestamp timestamp = clockService.getTimestamp(deviceId);
-        if (replaceIfLatest(device.entity(), false, timestamp)) {
-            return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device.entity(), null);
-        }
-        return null;
-    }
-
-    // Replace existing value if its timestamp is older.
-    private synchronized boolean replaceIfLatest(Device device, boolean isUp, Timestamp timestamp) {
-        VersionedValue<Device> existingValue = devices.get(device.id());
-        if (timestamp.compareTo(existingValue.timestamp()) > 0) {
-            devices.put(device.id(), new VersionedValue<Device>(device, isUp, timestamp));
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
-                                         List<PortDescription> portDescriptions) {
-        List<DeviceEvent> events = new ArrayList<>();
-        synchronized (this) {
-            VersionedValue<Device> device = devices.get(deviceId);
-            checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
-            Map<PortNumber, VersionedValue<Port>> ports = getPortMap(deviceId);
-            Timestamp newTimestamp = clockService.getTimestamp(deviceId);
-
-            // Add new ports
-            Set<PortNumber> processed = new HashSet<>();
-            for (PortDescription portDescription : portDescriptions) {
-                VersionedValue<Port> port = ports.get(portDescription.portNumber());
-                if (port == null) {
-                    events.add(createPort(device, portDescription, ports, newTimestamp));
-                }
-                checkState(newTimestamp.compareTo(port.timestamp()) > 0,
-                        "Existing port state has a timestamp in the future!");
-                events.add(updatePort(device.entity(), port.entity(), portDescription, ports, newTimestamp));
-                processed.add(portDescription.portNumber());
-            }
-
-            updatePortMap(deviceId, ports);
-
-            events.addAll(pruneOldPorts(device.entity(), ports, processed));
-        }
-        return FluentIterable.from(events).filter(notNull()).toList();
-    }
-
-    // Creates a new port based on the port description adds it to the map and
-    // Returns corresponding event.
-    //@GuardedBy("this")
-    private DeviceEvent createPort(VersionedValue<Device> device, PortDescription portDescription,
-                                   Map<PortNumber, VersionedValue<Port>> ports, Timestamp timestamp) {
-        Port port = new DefaultPort(device.entity(), portDescription.portNumber(),
-                                           portDescription.isEnabled());
-        ports.put(port.number(), new VersionedValue<Port>(port, true, timestamp));
-        updatePortMap(device.entity().id(), ports);
-        return new DeviceEvent(PORT_ADDED, device.entity(), port);
-    }
-
-    // Checks if the specified port requires update and if so, it replaces the
-    // existing entry in the map and returns corresponding event.
-    //@GuardedBy("this")
-    private DeviceEvent updatePort(Device device, Port port,
-                                   PortDescription portDescription,
-                                   Map<PortNumber, VersionedValue<Port>> ports,
-                                   Timestamp timestamp) {
-        if (port.isEnabled() != portDescription.isEnabled()) {
-            VersionedValue<Port> updatedPort = new VersionedValue<Port>(
-                    new DefaultPort(device, portDescription.portNumber(),
-                                    portDescription.isEnabled()),
-                    portDescription.isEnabled(),
-                    timestamp);
-            ports.put(port.number(), updatedPort);
-            updatePortMap(device.id(), ports);
-            return new DeviceEvent(PORT_UPDATED, device, updatedPort.entity());
-        }
-        return null;
-    }
-
-    // Prunes the specified list of ports based on which ports are in the
-    // processed list and returns list of corresponding events.
-    //@GuardedBy("this")
-    private List<DeviceEvent> pruneOldPorts(Device device,
-                                            Map<PortNumber, VersionedValue<Port>> ports,
-                                            Set<PortNumber> processed) {
-        List<DeviceEvent> events = new ArrayList<>();
-        Iterator<PortNumber> iterator = ports.keySet().iterator();
-        while (iterator.hasNext()) {
-            PortNumber portNumber = iterator.next();
-            if (!processed.contains(portNumber)) {
-                events.add(new DeviceEvent(PORT_REMOVED, device,
-                                           ports.get(portNumber).entity()));
-                iterator.remove();
-            }
-        }
-        if (!events.isEmpty()) {
-            updatePortMap(device.id(), ports);
-        }
-        return events;
-    }
-
-    // Gets the map of ports for the specified device; if one does not already
-    // exist, it creates and registers a new one.
-    // WARN: returned value is a copy, changes made to the Map
-    //       needs to be written back using updatePortMap
-    //@GuardedBy("this")
-    private Map<PortNumber, VersionedValue<Port>> getPortMap(DeviceId deviceId) {
-        Map<PortNumber, VersionedValue<Port>> ports = devicePorts.get(deviceId);
-        if (ports == null) {
-            ports = new HashMap<>();
-            // this probably is waste of time in most cases.
-            updatePortMap(deviceId, ports);
-        }
-        return ports;
-    }
-
-    //@GuardedBy("this")
-    private void updatePortMap(DeviceId deviceId, Map<PortNumber, VersionedValue<Port>> ports) {
-        devicePorts.put(deviceId, ports);
-    }
-
-    @Override
-    public DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
-                                        PortDescription portDescription) {
-        VersionedValue<Device> device = devices.get(deviceId);
-        checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
-        Map<PortNumber, VersionedValue<Port>> ports = getPortMap(deviceId);
-        VersionedValue<Port> port = ports.get(portDescription.portNumber());
-        Timestamp timestamp = clockService.getTimestamp(deviceId);
-        return updatePort(device.entity(), port.entity(), portDescription, ports, timestamp);
-    }
-
-    @Override
-    public List<Port> getPorts(DeviceId deviceId) {
-        Map<PortNumber, VersionedValue<Port>> versionedPorts = devicePorts.get(deviceId);
-        if (versionedPorts == null) {
-            return Collections.emptyList();
-        }
-        List<Port> ports = new ArrayList<>();
-        for (VersionedValue<Port> port : versionedPorts.values()) {
-            ports.add(port.entity());
-        }
-        return ports;
-    }
-
-    @Override
-    public Port getPort(DeviceId deviceId, PortNumber portNumber) {
-        Map<PortNumber, VersionedValue<Port>> ports = devicePorts.get(deviceId);
-        return ports == null ? null : ports.get(portNumber).entity();
-    }
-
-    @Override
-    public boolean isAvailable(DeviceId deviceId) {
-        return devices.get(deviceId).isUp();
-    }
-
-    @Override
-    public DeviceEvent removeDevice(DeviceId deviceId) {
-        VersionedValue<Device> previousDevice = devices.remove(deviceId);
-        return previousDevice == null ? null :
-            new DeviceEvent(DEVICE_REMOVED, previousDevice.entity(), null);
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/package-info.java
index aa644db..c1f5aad 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/package-info.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/package-info.java
@@ -1,4 +1,4 @@
 /**
  * Implementation of device store using distributed distributed p2p synchronization protocol.
  */
-package org.onlab.onos.store.device.impl;
\ No newline at end of file
+package org.onlab.onos.store.device.impl;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index 5a5592a..d49e00b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -1,6 +1,5 @@
 package org.onlab.onos.store.flow.impl;
 
-import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_ADDED;
 import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -13,9 +12,10 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.onos.ApplicationId;
 import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.flow.DefaultFlowRule;
+import org.onlab.onos.net.flow.DefaultFlowEntry;
+import org.onlab.onos.net.flow.FlowEntry;
+import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
 import org.onlab.onos.net.flow.FlowRule;
-import org.onlab.onos.net.flow.FlowRule.FlowRuleState;
 import org.onlab.onos.net.flow.FlowRuleEvent;
 import org.onlab.onos.net.flow.FlowRuleEvent.Type;
 import org.onlab.onos.net.flow.FlowRuleStore;
@@ -30,18 +30,18 @@
 /**
  * Manages inventory of flow rules using trivial in-memory implementation.
  */
-//FIXME: I LIE I AM NOT DISTRIBUTED
+//FIXME: despite its name, this store is NOT distributed; it is a local in-memory implementation.
 @Component(immediate = true)
 @Service
 public class DistributedFlowRuleStore
-extends AbstractStore<FlowRuleEvent, FlowRuleStoreDelegate>
-implements FlowRuleStore {
+        extends AbstractStore<FlowRuleEvent, FlowRuleStoreDelegate>
+        implements FlowRuleStore {
 
     private final Logger log = getLogger(getClass());
 
     // store entries as a pile of rules, no info about device tables
-    private final Multimap<DeviceId, FlowRule> flowEntries =
-            ArrayListMultimap.<DeviceId, FlowRule>create();
+    private final Multimap<DeviceId, FlowEntry> flowEntries =
+            ArrayListMultimap.<DeviceId, FlowEntry>create();
 
     private final Multimap<ApplicationId, FlowRule> flowEntriesById =
             ArrayListMultimap.<ApplicationId, FlowRule>create();
@@ -58,8 +58,13 @@
 
 
     @Override
-    public synchronized FlowRule getFlowRule(FlowRule rule) {
-        for (FlowRule f : flowEntries.get(rule.deviceId())) {
+    public int getFlowRuleCount() {
+        return flowEntries.size();
+    }
+
+    @Override
+    public synchronized FlowEntry getFlowEntry(FlowRule rule) {
+        for (FlowEntry f : flowEntries.get(rule.deviceId())) {
             if (f.equals(rule)) {
                 return f;
             }
@@ -68,8 +73,8 @@
     }
 
     @Override
-    public synchronized Iterable<FlowRule> getFlowEntries(DeviceId deviceId) {
-        Collection<FlowRule> rules = flowEntries.get(deviceId);
+    public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
+        Collection<FlowEntry> rules = flowEntries.get(deviceId);
         if (rules == null) {
             return Collections.emptyList();
         }
@@ -77,7 +82,7 @@
     }
 
     @Override
-    public synchronized Iterable<FlowRule> getFlowEntriesByAppId(ApplicationId appId) {
+    public synchronized Iterable<FlowRule> getFlowRulesByAppId(ApplicationId appId) {
         Collection<FlowRule> rules = flowEntriesById.get(appId);
         if (rules == null) {
             return Collections.emptyList();
@@ -87,7 +92,7 @@
 
     @Override
     public synchronized void storeFlowRule(FlowRule rule) {
-        FlowRule f = new DefaultFlowRule(rule, FlowRuleState.PENDING_ADD);
+        FlowEntry f = new DefaultFlowEntry(rule);
         DeviceId did = f.deviceId();
         if (!flowEntries.containsEntry(did, f)) {
             flowEntries.put(did, f);
@@ -97,57 +102,41 @@
 
     @Override
     public synchronized void deleteFlowRule(FlowRule rule) {
-        FlowRule f = new DefaultFlowRule(rule, FlowRuleState.PENDING_REMOVE);
-        DeviceId did = f.deviceId();
-
-        /*
-         *  find the rule and mark it for deletion.
-         *  Ultimately a flow removed will come remove it.
-         */
-
-        if (flowEntries.containsEntry(did, f)) {
-            //synchronized (flowEntries) {
-            flowEntries.remove(did, f);
-            flowEntries.put(did, f);
-            flowEntriesById.remove(rule.appId(), rule);
-            //}
+        FlowEntry entry = getFlowEntry(rule);
+        if (entry == null) {
+            return;
         }
+        entry.setState(FlowEntryState.PENDING_REMOVE);
     }
 
     @Override
-    public synchronized FlowRuleEvent addOrUpdateFlowRule(FlowRule rule) {
+    public synchronized FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
         DeviceId did = rule.deviceId();
 
         // check if this new rule is an update to an existing entry
-        if (flowEntries.containsEntry(did, rule)) {
-            //synchronized (flowEntries) {
-            // Multimaps support duplicates so we have to remove our rule
-            // and replace it with the current version.
-            flowEntries.remove(did, rule);
-            flowEntries.put(did, rule);
-            //}
+        FlowEntry stored = getFlowEntry(rule);
+        if (stored != null) {
+            stored.setBytes(rule.bytes());
+            stored.setLife(rule.life());
+            stored.setPackets(rule.packets());
+            if (stored.state() == FlowEntryState.PENDING_ADD) {
+                stored.setState(FlowEntryState.ADDED);
+                return new FlowRuleEvent(Type.RULE_ADDED, rule);
+            }
             return new FlowRuleEvent(Type.RULE_UPDATED, rule);
         }
 
         flowEntries.put(did, rule);
-        return new FlowRuleEvent(RULE_ADDED, rule);
+        return null;
     }
 
     @Override
-    public synchronized FlowRuleEvent removeFlowRule(FlowRule rule) {
-        //synchronized (this) {
+    public synchronized FlowRuleEvent removeFlowRule(FlowEntry rule) {
+        // This is where one could mark a rule as removed and still keep it in the store.
         if (flowEntries.remove(rule.deviceId(), rule)) {
             return new FlowRuleEvent(RULE_REMOVED, rule);
         } else {
             return null;
         }
-        //}
     }
-
-
-
-
-
-
-
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java
index 5df25b4..a59b151 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java
@@ -42,6 +42,7 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
+//TODO: Add support for multiple providers and annotations
 /**
  * Manages inventory of infrastructure links using a protocol that takes into consideration
  * the order in which events occur.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/package-info.java
index 127dc84..c675f84 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/package-info.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/package-info.java
@@ -1,4 +1,4 @@
 /**
  * Implementation of link store using distributed p2p synchronization protocol.
  */
-package org.onlab.onos.store.link.impl;
\ No newline at end of file
+package org.onlab.onos.store.link.impl;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
new file mode 100644
index 0000000..c0cefd6
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
@@ -0,0 +1,62 @@
+package org.onlab.onos.store.serializers;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo Serializer for {@link ClusterMessage}.
+ * <p>
+ * Wire layout: class-tagged sender, class-tagged subject, payload length
+ * as an int, then the raw payload bytes.
+ */
+public final class ClusterMessageSerializer extends Serializer<ClusterMessage> {
+
+    /**
+     * Creates a serializer for {@link ClusterMessage}.
+     */
+    public ClusterMessageSerializer() {
+        // does not accept null
+        super(false);
+    }
+
+    /**
+     * Writes the sender, the subject and the length-prefixed payload.
+     *
+     * @param kryo    Kryo instance used to encode sender and subject
+     * @param output  stream to write to
+     * @param message message to serialize
+     */
+    @Override
+    public void write(Kryo kryo, Output output, ClusterMessage message) {
+        kryo.writeClassAndObject(output, message.sender());
+        kryo.writeClassAndObject(output, message.subject());
+        // Fetch the payload once so the length prefix and the bytes written
+        // always come from the same array.
+        final byte[] payload = message.payload();
+        output.writeInt(payload.length);
+        output.writeBytes(payload);
+    }
+
+    /**
+     * Reads back a message in the layout produced by {@link #write}.
+     *
+     * @param kryo  Kryo instance used to decode sender and subject
+     * @param input stream to read from
+     * @param type  class being deserialized
+     * @return reconstructed message
+     */
+    @Override
+    public ClusterMessage read(Kryo kryo, Input input,
+                               Class<ClusterMessage> type) {
+        NodeId sender = (NodeId) kryo.readClassAndObject(input);
+        MessageSubject subject = (MessageSubject) kryo.readClassAndObject(input);
+        int payloadSize = input.readInt();
+        byte[] payload = input.readBytes(payloadSize);
+        return new ClusterMessage(sender, subject, payload);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MastershipBasedTimestampSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MastershipBasedTimestampSerializer.java
new file mode 100644
index 0000000..516915e
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MastershipBasedTimestampSerializer.java
@@ -0,0 +1,45 @@
+package org.onlab.onos.store.serializers;
+
+import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+// To be used if Timestamp ever needs to cross bundle boundary.
+/**
+ * Kryo Serializer for {@link MastershipBasedTimestamp}.
+ * <p>
+ * A timestamp is encoded as two ints: the term number followed by the
+ * sequence number.
+ */
+public class MastershipBasedTimestampSerializer extends Serializer<MastershipBasedTimestamp> {
+
+    /**
+     * Creates a serializer for {@link MastershipBasedTimestamp}.
+     */
+    public MastershipBasedTimestampSerializer() {
+        // non-null, immutable
+        super(false, true);
+    }
+
+    /**
+     * Writes the term number, then the sequence number.
+     */
+    @Override
+    public void write(Kryo kryo, Output output, MastershipBasedTimestamp object) {
+        output.writeInt(object.termNumber());
+        output.writeInt(object.sequenceNumber());
+    }
+
+    /**
+     * Reads the term number, then the sequence number, and rebuilds the timestamp.
+     */
+    @Override
+    public MastershipBasedTimestamp read(Kryo kryo, Input input, Class<MastershipBasedTimestamp> type) {
+        int termNumber = input.readInt();
+        int sequenceNumber = input.readInt();
+        return new MastershipBasedTimestamp(termNumber, sequenceNumber);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MessageSubjectSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MessageSubjectSerializer.java
new file mode 100644
index 0000000..bb6b292
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MessageSubjectSerializer.java
@@ -0,0 +1,41 @@
+package org.onlab.onos.store.serializers;
+
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo Serializer for {@link MessageSubject}.
+ * <p>
+ * A subject is encoded as the single string returned by {@code value()}.
+ */
+public final class MessageSubjectSerializer extends Serializer<MessageSubject> {
+
+    /**
+     * Creates a serializer for {@link MessageSubject}.
+     */
+    public MessageSubjectSerializer() {
+        // non-null, immutable
+        super(false, true);
+    }
+
+    /**
+     * Writes the string value of the subject.
+     */
+    @Override
+    public void write(Kryo kryo, Output output, MessageSubject object) {
+        output.writeString(object.value());
+    }
+
+    /**
+     * Reads one string and wraps it in a new {@link MessageSubject}.
+     */
+    @Override
+    public MessageSubject read(Kryo kryo, Input input, Class<MessageSubject> type) {
+        String subjectValue = input.readString();
+        return new MessageSubject(subjectValue);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/OnosTimestampSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/OnosTimestampSerializer.java
deleted file mode 100644
index 192e035..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/OnosTimestampSerializer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.onlab.onos.store.serializers;
-
-import org.onlab.onos.store.impl.OnosTimestamp;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-/**
- * Kryo Serializer for {@link OnosTimestamp}.
- */
-public class OnosTimestampSerializer extends Serializer<OnosTimestamp> {
-
-    /**
-     * Default constructor.
-     */
-    public OnosTimestampSerializer() {
-        // non-null, immutable
-        super(false, true);
-    }
-
-    @Override
-    public void write(Kryo kryo, Output output, OnosTimestamp object) {
-        output.writeInt(object.termNumber());
-        output.writeInt(object.sequenceNumber());
-    }
-
-    @Override
-    public OnosTimestamp read(Kryo kryo, Input input, Class<OnosTimestamp> type) {
-        final int term = input.readInt();
-        final int sequence = input.readInt();
-        return new OnosTimestamp(term, sequence);
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/topology/impl/DistributedTopologyStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/topology/impl/DistributedTopologyStore.java
index 567861e..9ae9d7b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/topology/impl/DistributedTopologyStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/topology/impl/DistributedTopologyStore.java
@@ -125,7 +125,8 @@
         // Promote the new topology to current and return a ready-to-send event.
         synchronized (this) {
             current = newTopology;
-            return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, current);
+            return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED,
+                                     current, reasons);
         }
     }
 
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java
index bba12f2..e63fcaa 100644
--- a/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java
@@ -7,6 +7,7 @@
 import org.onlab.onos.cluster.DefaultControllerNode;
 import org.onlab.onos.cluster.NodeId;
 import org.onlab.onos.store.cluster.messaging.impl.ClusterCommunicationManager;
+import org.onlab.onos.store.cluster.messaging.impl.MessageSerializer;
 import org.onlab.netty.NettyMessagingService;
 import org.onlab.packet.IpPrefix;
 
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/common/impl/MastershipBasedTimestampTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/common/impl/MastershipBasedTimestampTest.java
new file mode 100644
index 0000000..ea63ef8
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/common/impl/MastershipBasedTimestampTest.java
@@ -0,0 +1,95 @@
+package org.onlab.onos.store.common.impl;
+
+import static org.junit.Assert.*;
+
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+import org.onlab.onos.store.Timestamp;
+import org.onlab.onos.store.serializers.MastershipBasedTimestampSerializer;
+import org.onlab.util.KryoPool;
+
+import com.google.common.testing.EqualsTester;
+
+/**
+ * Test of {@link MastershipBasedTimestamp}.
+ */
+public class MastershipBasedTimestampTest {
+
+    private static final Timestamp TS_1_1 = new MastershipBasedTimestamp(1, 1);
+    private static final Timestamp TS_1_2 = new MastershipBasedTimestamp(1, 2);
+    private static final Timestamp TS_2_1 = new MastershipBasedTimestamp(2, 1);
+    private static final Timestamp TS_2_2 = new MastershipBasedTimestamp(2, 2);
+
+    @Test
+    public final void testBasic() {
+        final int termNumber = 5;
+        final int sequenceNumber = 6;
+        MastershipBasedTimestamp ts = new MastershipBasedTimestamp(termNumber,
+                                                sequenceNumber);
+
+        assertEquals(termNumber, ts.termNumber());
+        assertEquals(sequenceNumber, ts.sequenceNumber());
+    }
+
+    @Test
+    public final void testCompareTo() {
+        assertTrue(TS_1_1.compareTo(TS_1_1) == 0);
+        assertTrue(TS_1_1.compareTo(new MastershipBasedTimestamp(1, 1)) == 0);
+
+        assertTrue(TS_1_1.compareTo(TS_1_2) < 0);
+        assertTrue(TS_1_2.compareTo(TS_1_1) > 0);
+
+        assertTrue(TS_1_2.compareTo(TS_2_1) < 0);
+        assertTrue(TS_1_2.compareTo(TS_2_2) < 0);
+        assertTrue(TS_2_1.compareTo(TS_1_1) > 0);
+        assertTrue(TS_2_2.compareTo(TS_1_1) > 0);
+    }
+
+    @Test
+    public final void testEqualsObject() {
+        new EqualsTester()
+        .addEqualityGroup(new MastershipBasedTimestamp(1, 1),
+                          new MastershipBasedTimestamp(1, 1), TS_1_1)
+        .addEqualityGroup(new MastershipBasedTimestamp(1, 2),
+                          new MastershipBasedTimestamp(1, 2), TS_1_2)
+        .addEqualityGroup(new MastershipBasedTimestamp(2, 1),
+                          new MastershipBasedTimestamp(2, 1), TS_2_1)
+        .addEqualityGroup(new MastershipBasedTimestamp(2, 2),
+                          new MastershipBasedTimestamp(2, 2), TS_2_2)
+        .testEquals();
+    }
+
+    @Test
+    public final void testKryoSerializable() {
+        final ByteBuffer buffer = ByteBuffer.allocate(1 * 1024 * 1024);
+        final KryoPool kryos = KryoPool.newBuilder()
+                .register(MastershipBasedTimestamp.class)
+                .build();
+
+        kryos.serialize(TS_2_1, buffer);
+        buffer.flip();
+        Timestamp copy = kryos.deserialize(buffer);
+
+        new EqualsTester()
+            .addEqualityGroup(TS_2_1, copy)
+            .testEquals();
+    }
+
+    @Test
+    public final void testKryoSerializableWithHandcraftedSerializer() {
+        final ByteBuffer buffer = ByteBuffer.allocate(1 * 1024 * 1024);
+        final KryoPool kryos = KryoPool.newBuilder()
+                .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
+                .build();
+
+        kryos.serialize(TS_1_2, buffer);
+        buffer.flip();
+        Timestamp copy = kryos.deserialize(buffer);
+
+        new EqualsTester()
+            .addEqualityGroup(TS_1_2, copy)
+            .testEquals();
+    }
+
+}
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/common/impl/TimestampedTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/common/impl/TimestampedTest.java
new file mode 100644
index 0000000..4b44d40
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/common/impl/TimestampedTest.java
@@ -0,0 +1,94 @@
+package org.onlab.onos.store.common.impl;
+
+import static org.junit.Assert.*;
+
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+import org.onlab.onos.store.Timestamp;
+import org.onlab.util.KryoPool;
+
+import com.google.common.testing.EqualsTester;
+
+/**
+ * Test of {@link Timestamped}.
+ */
+public class TimestampedTest {
+
+    private static final Timestamp TS_1_1 = new MastershipBasedTimestamp(1, 1);
+    private static final Timestamp TS_1_2 = new MastershipBasedTimestamp(1, 2);
+    private static final Timestamp TS_2_1 = new MastershipBasedTimestamp(2, 1);
+
+    @Test
+    public final void testHashCode() {
+        Timestamped<String> a = new Timestamped<>("a", TS_1_1);
+        Timestamped<String> b = new Timestamped<>("b", TS_1_1);
+        assertTrue("value does not impact hashCode",
+                a.hashCode() == b.hashCode());
+    }
+
+    @Test
+    public final void testEquals() {
+        Timestamped<String> a = new Timestamped<>("a", TS_1_1);
+        Timestamped<String> b = new Timestamped<>("b", TS_1_1);
+        assertTrue("value does not impact equality",
+                a.equals(b));
+
+        new EqualsTester()
+        .addEqualityGroup(new Timestamped<>("a", TS_1_1),
+                          new Timestamped<>("b", TS_1_1),
+                          new Timestamped<>("c", TS_1_1))
+        .addEqualityGroup(new Timestamped<>("a", TS_1_2),
+                          new Timestamped<>("b", TS_1_2),
+                          new Timestamped<>("c", TS_1_2))
+        .addEqualityGroup(new Timestamped<>("a", TS_2_1),
+                          new Timestamped<>("b", TS_2_1),
+                          new Timestamped<>("c", TS_2_1))
+        .testEquals();
+
+    }
+
+    @Test
+    public final void testValue() {
+       final Integer n = Integer.valueOf(42);
+       Timestamped<Integer> tsv = new Timestamped<>(n, TS_1_1);
+       assertSame(n, tsv.value());
+
+    }
+
+    @Test(expected = NullPointerException.class)
+    public final void testValueNonNull() {
+        new Timestamped<>(null, TS_1_1);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public final void testTimestampNonNull() {
+        new Timestamped<>("Foo", null);
+    }
+
+    @Test
+    public final void testIsNewer() {
+        Timestamped<String> a = new Timestamped<>("a", TS_1_2);
+        Timestamped<String> b = new Timestamped<>("b", TS_1_1);
+        assertTrue(a.isNewer(b));
+        assertFalse(b.isNewer(a));
+    }
+
+    @Test
+    public final void testKryoSerializable() {
+        final ByteBuffer buffer = ByteBuffer.allocate(1 * 1024 * 1024);
+        final KryoPool kryos = KryoPool.newBuilder()
+                .register(Timestamped.class,
+                        MastershipBasedTimestamp.class)
+                .build();
+
+        Timestamped<String> original = new Timestamped<>("foobar", TS_1_1);
+        kryos.serialize(original, buffer);
+        buffer.flip();
+        Timestamped<String> copy = kryos.deserialize(buffer);
+
+        new EqualsTester()
+            .addEqualityGroup(original, copy)
+            .testEquals();
+    }
+}
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
new file mode 100644
index 0000000..fa42a6b
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
@@ -0,0 +1,619 @@
+package org.onlab.onos.store.device.impl;
+
+import static org.junit.Assert.*;
+import static org.onlab.onos.net.Device.Type.SWITCH;
+import static org.onlab.onos.net.DeviceId.deviceId;
+import static org.onlab.onos.net.device.DeviceEvent.Type.*;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.onlab.onos.cluster.ClusterEventListener;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.ControllerNode.State;
+import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.MastershipTerm;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.Annotations;
+import org.onlab.onos.net.DefaultAnnotations;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.Port;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.SparseAnnotations;
+import org.onlab.onos.net.device.DefaultDeviceDescription;
+import org.onlab.onos.net.device.DefaultPortDescription;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.net.device.DeviceEvent;
+import org.onlab.onos.net.device.DeviceStore;
+import org.onlab.onos.net.device.DeviceStoreDelegate;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.ClockService;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.packet.IpPrefix;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+
+// TODO add tests for remote replication
+/**
+ * Test of the gossip based distributed DeviceStore implementation.
+ */
+public class GossipDeviceStoreTest {
+
+    private static final ProviderId PID = new ProviderId("of", "foo");
+    private static final ProviderId PIDA = new ProviderId("of", "bar", true);
+    private static final DeviceId DID1 = deviceId("of:foo");
+    private static final DeviceId DID2 = deviceId("of:bar");
+    private static final String MFR = "whitebox";
+    private static final String HW = "1.1.x";
+    private static final String SW1 = "3.8.1";
+    private static final String SW2 = "3.9.5";
+    private static final String SN = "43311-12345";
+
+    private static final PortNumber P1 = PortNumber.portNumber(1);
+    private static final PortNumber P2 = PortNumber.portNumber(2);
+    private static final PortNumber P3 = PortNumber.portNumber(3);
+
+    private static final SparseAnnotations A1 = DefaultAnnotations.builder()
+            .set("A1", "a1")
+            .set("B1", "b1")
+            .build();
+    private static final SparseAnnotations A1_2 = DefaultAnnotations.builder()
+            .remove("A1")
+            .set("B3", "b3")
+            .build();
+    private static final SparseAnnotations A2 = DefaultAnnotations.builder()
+            .set("A2", "a2")
+            .set("B2", "b2")
+            .build();
+    private static final SparseAnnotations A2_2 = DefaultAnnotations.builder()
+            .remove("A2")
+            .set("B4", "b4")
+            .build();
+
+    private static final NodeId MYSELF = new NodeId("myself");
+
+    private GossipDeviceStore gossipDeviceStore;
+    private DeviceStore deviceStore;
+
+    private DeviceClockManager deviceClockManager;
+    private ClockService clockService;
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+    }
+
+
+    @Before
+    public void setUp() throws Exception {
+        deviceClockManager = new DeviceClockManager();
+        deviceClockManager.activate();
+        clockService = deviceClockManager;
+
+        deviceClockManager.setMastershipTerm(DID1, MastershipTerm.of(MYSELF, 1));
+        deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(MYSELF, 2));
+
+        ClusterCommunicationService clusterCommunicator = new TestClusterCommunicationService();
+        ClusterService clusterService = new TestClusterService();
+
+        gossipDeviceStore = new TestGossipDeviceStore(clockService, clusterService, clusterCommunicator);
+        gossipDeviceStore.activate();
+        deviceStore = gossipDeviceStore;
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        gossipDeviceStore.deactivate();
+        deviceClockManager.deactivate();
+    }
+
+    private void putDevice(DeviceId deviceId, String swVersion,
+                           SparseAnnotations... annotations) {
+        DeviceDescription description =
+                new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
+                        HW, swVersion, SN, annotations);
+        deviceStore.createOrUpdateDevice(PID, deviceId, description);
+    }
+
+    private void putDeviceAncillary(DeviceId deviceId, String swVersion,
+                                    SparseAnnotations... annotations) {
+        DeviceDescription description =
+                new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
+                        HW, swVersion, SN, annotations);
+        deviceStore.createOrUpdateDevice(PIDA, deviceId, description);
+    }
+
+    private static void assertDevice(DeviceId id, String swVersion, Device device) {
+        assertNotNull(device);
+        assertEquals(id, device.id());
+        assertEquals(MFR, device.manufacturer());
+        assertEquals(HW, device.hwVersion());
+        assertEquals(swVersion, device.swVersion());
+        assertEquals(SN, device.serialNumber());
+    }
+
+    /**
+     * Verifies that the Annotations created by merging {@code annotations}
+     * are equal to the actual Annotations.
+     *
+     * @param actual Annotations to check
+     * @param annotations SparseAnnotations merged, in order, to build the expected result
+     */
+    private static void assertAnnotationsEquals(Annotations actual, SparseAnnotations... annotations) {
+        DefaultAnnotations expected = DefaultAnnotations.builder().build();
+        for (SparseAnnotations a : annotations) {
+            expected = DefaultAnnotations.merge(expected, a);
+        }
+        assertEquals(expected.keys(), actual.keys());
+        for (String key : expected.keys()) {
+            assertEquals(expected.value(key), actual.value(key));
+        }
+    }
+
+    @Test
+    public final void testGetDeviceCount() {
+        assertEquals("initialy empty", 0, deviceStore.getDeviceCount());
+
+        putDevice(DID1, SW1);
+        putDevice(DID2, SW2);
+        putDevice(DID1, SW1);
+
+        assertEquals("expect 2 uniq devices", 2, deviceStore.getDeviceCount());
+    }
+
+    @Test
+    public final void testGetDevices() {
+        assertEquals("initialy empty", 0, Iterables.size(deviceStore.getDevices()));
+
+        putDevice(DID1, SW1);
+        putDevice(DID2, SW2);
+        putDevice(DID1, SW1);
+
+        assertEquals("expect 2 uniq devices",
+                2, Iterables.size(deviceStore.getDevices()));
+
+        Map<DeviceId, Device> devices = new HashMap<>();
+        for (Device device : deviceStore.getDevices()) {
+            devices.put(device.id(), device);
+        }
+
+        assertDevice(DID1, SW1, devices.get(DID1));
+        assertDevice(DID2, SW2, devices.get(DID2));
+
+        // add case for new node?
+    }
+
+    @Test
+    public final void testGetDevice() {
+
+        putDevice(DID1, SW1);
+
+        assertDevice(DID1, SW1, deviceStore.getDevice(DID1));
+        assertNull("DID2 shouldn't be there", deviceStore.getDevice(DID2));
+    }
+
+    @Test
+    public final void testCreateOrUpdateDevice() {
+        DeviceDescription description =
+                new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
+                        HW, SW1, SN);
+        DeviceEvent event = deviceStore.createOrUpdateDevice(PID, DID1, description);
+        assertEquals(DEVICE_ADDED, event.type());
+        assertDevice(DID1, SW1, event.subject());
+
+        DeviceDescription description2 =
+                new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
+                        HW, SW2, SN);
+        DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
+        assertEquals(DEVICE_UPDATED, event2.type());
+        assertDevice(DID1, SW2, event2.subject());
+
+        assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
+    }
+
+    @Test
+    public final void testCreateOrUpdateDeviceAncillary() {
+        DeviceDescription description =
+                new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
+                        HW, SW1, SN, A2);
+        DeviceEvent event = deviceStore.createOrUpdateDevice(PIDA, DID1, description);
+        assertEquals(DEVICE_ADDED, event.type());
+        assertDevice(DID1, SW1, event.subject());
+        assertEquals(PIDA, event.subject().providerId());
+        assertAnnotationsEquals(event.subject().annotations(), A2);
+        assertFalse("Ancillary will not bring device up", deviceStore.isAvailable(DID1));
+
+        DeviceDescription description2 =
+                new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
+                        HW, SW2, SN, A1);
+        DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
+        assertEquals(DEVICE_UPDATED, event2.type());
+        assertDevice(DID1, SW2, event2.subject());
+        assertEquals(PID, event2.subject().providerId());
+        assertAnnotationsEquals(event2.subject().annotations(), A1, A2);
+        assertTrue(deviceStore.isAvailable(DID1));
+
+        assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
+
+        // For now, Ancillary is ignored once primary appears
+        assertNull("No change expected", deviceStore.createOrUpdateDevice(PIDA, DID1, description));
+
+        // But, Ancillary annotations will be in effect
+        DeviceDescription description3 =
+                new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
+                        HW, SW1, SN, A2_2);
+        DeviceEvent event3 = deviceStore.createOrUpdateDevice(PIDA, DID1, description3);
+        assertEquals(DEVICE_UPDATED, event3.type());
+        // basic information will be the one from Primary
+        assertDevice(DID1, SW2, event3.subject());
+        assertEquals(PID, event3.subject().providerId());
+        // but annotation from Ancillary will be merged
+        assertAnnotationsEquals(event3.subject().annotations(), A1, A2, A2_2);
+        assertTrue(deviceStore.isAvailable(DID1));
+    }
+
+
+    @Test
+    public final void testMarkOffline() {
+
+        putDevice(DID1, SW1);
+        assertTrue(deviceStore.isAvailable(DID1));
+
+        DeviceEvent event = deviceStore.markOffline(DID1);
+        assertEquals(DEVICE_AVAILABILITY_CHANGED, event.type());
+        assertDevice(DID1, SW1, event.subject());
+        assertFalse(deviceStore.isAvailable(DID1));
+
+        DeviceEvent event2 = deviceStore.markOffline(DID1);
+        assertNull("No change, no event", event2);
+}
+
+    @Test
+    public final void testUpdatePorts() {
+        putDevice(DID1, SW1);
+        List<PortDescription> pds = Arrays.<PortDescription>asList(
+                new DefaultPortDescription(P1, true),
+                new DefaultPortDescription(P2, true)
+                );
+
+        List<DeviceEvent> events = deviceStore.updatePorts(PID, DID1, pds);
+
+        Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
+        for (DeviceEvent event : events) {
+            assertEquals(PORT_ADDED, event.type());
+            assertDevice(DID1, SW1, event.subject());
+            assertTrue("PortNumber is one of expected",
+                    expectedPorts.remove(event.port().number()));
+            assertTrue("Port is enabled", event.port().isEnabled());
+        }
+        assertTrue("Event for all expectedport appeared", expectedPorts.isEmpty());
+
+
+        List<PortDescription> pds2 = Arrays.<PortDescription>asList(
+                new DefaultPortDescription(P1, false),
+                new DefaultPortDescription(P2, true),
+                new DefaultPortDescription(P3, true)
+                );
+
+        events = deviceStore.updatePorts(PID, DID1, pds2);
+        assertFalse("event should be triggered", events.isEmpty());
+        for (DeviceEvent event : events) {
+            PortNumber num = event.port().number();
+            if (P1.equals(num)) {
+                assertEquals(PORT_UPDATED, event.type());
+                assertDevice(DID1, SW1, event.subject());
+                assertFalse("Port is disabled", event.port().isEnabled());
+            } else if (P2.equals(num)) {
+                fail("P2 event not expected.");
+            } else if (P3.equals(num)) {
+                assertEquals(PORT_ADDED, event.type());
+                assertDevice(DID1, SW1, event.subject());
+                assertTrue("Port is enabled", event.port().isEnabled());
+            } else {
+                fail("Unknown port number encountered: " + num);
+            }
+        }
+
+        List<PortDescription> pds3 = Arrays.<PortDescription>asList(
+                new DefaultPortDescription(P1, false),
+                new DefaultPortDescription(P2, true)
+                );
+        events = deviceStore.updatePorts(PID, DID1, pds3);
+        assertFalse("event should be triggered", events.isEmpty());
+        for (DeviceEvent event : events) {
+            PortNumber num = event.port().number();
+            if (P1.equals(num)) {
+                fail("P1 event not expected.");
+            } else if (P2.equals(num)) {
+                fail("P2 event not expected.");
+            } else if (P3.equals(num)) {
+                assertEquals(PORT_REMOVED, event.type());
+                assertDevice(DID1, SW1, event.subject());
+                assertTrue("Port was enabled", event.port().isEnabled());
+            } else {
+                fail("Unknown port number encountered: " + num);
+            }
+        }
+
+    }
+
+    @Test
+    public final void testUpdatePortStatus() {
+        putDevice(DID1, SW1);
+        List<PortDescription> pds = Arrays.<PortDescription>asList(
+                new DefaultPortDescription(P1, true)
+                );
+        deviceStore.updatePorts(PID, DID1, pds);
+
+        DeviceEvent event = deviceStore.updatePortStatus(PID, DID1,
+                new DefaultPortDescription(P1, false));
+        assertEquals(PORT_UPDATED, event.type());
+        assertDevice(DID1, SW1, event.subject());
+        assertEquals(P1, event.port().number());
+        assertFalse("Port is disabled", event.port().isEnabled());
+
+    }
+    @Test
+    public final void testUpdatePortStatusAncillary() {
+        putDeviceAncillary(DID1, SW1);
+        putDevice(DID1, SW1);
+        List<PortDescription> pds = Arrays.<PortDescription>asList(
+                new DefaultPortDescription(P1, true, A1)
+                );
+        deviceStore.updatePorts(PID, DID1, pds);
+
+        DeviceEvent event = deviceStore.updatePortStatus(PID, DID1,
+                new DefaultPortDescription(P1, false, A1_2));
+        assertEquals(PORT_UPDATED, event.type());
+        assertDevice(DID1, SW1, event.subject());
+        assertEquals(P1, event.port().number());
+        assertAnnotationsEquals(event.port().annotations(), A1, A1_2);
+        assertFalse("Port is disabled", event.port().isEnabled());
+
+        DeviceEvent event2 = deviceStore.updatePortStatus(PIDA, DID1,
+                new DefaultPortDescription(P1, true));
+        assertNull("Ancillary is ignored if primary exists", event2);
+
+        // but, Ancillary annotation update will be notified
+        DeviceEvent event3 = deviceStore.updatePortStatus(PIDA, DID1,
+                new DefaultPortDescription(P1, true, A2));
+        assertEquals(PORT_UPDATED, event3.type());
+        assertDevice(DID1, SW1, event3.subject());
+        assertEquals(P1, event3.port().number());
+        assertAnnotationsEquals(event3.port().annotations(), A1, A1_2, A2);
+        assertFalse("Port is disabled", event3.port().isEnabled());
+
+        // a port reported only by Ancillary will be notified as down (disabled)
+        DeviceEvent event4 = deviceStore.updatePortStatus(PIDA, DID1,
+                new DefaultPortDescription(P2, true));
+        assertEquals(PORT_ADDED, event4.type());
+        assertDevice(DID1, SW1, event4.subject());
+        assertEquals(P2, event4.port().number());
+        assertAnnotationsEquals(event4.port().annotations());
+        assertFalse("Port is disabled if not given from primary provider",
+                        event4.port().isEnabled());
+    }
+
+    @Test
+    public final void testGetPorts() {
+        putDevice(DID1, SW1);
+        putDevice(DID2, SW1);
+        List<PortDescription> pds = Arrays.<PortDescription>asList(
+                new DefaultPortDescription(P1, true),
+                new DefaultPortDescription(P2, true)
+                );
+        deviceStore.updatePorts(PID, DID1, pds);
+
+        Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
+        List<Port> ports = deviceStore.getPorts(DID1);
+        for (Port port : ports) {
+            assertTrue("Port is enabled", port.isEnabled());
+            assertTrue("PortNumber is one of expected",
+                    expectedPorts.remove(port.number()));
+        }
+        assertTrue("Event for all expectedport appeared", expectedPorts.isEmpty());
+
+
+        assertTrue("DID2 has no ports", deviceStore.getPorts(DID2).isEmpty());
+    }
+
+    @Test
+    public final void testGetPort() {
+        putDevice(DID1, SW1);
+        putDevice(DID2, SW1);
+        List<PortDescription> pds = Arrays.<PortDescription>asList(
+                new DefaultPortDescription(P1, true),
+                new DefaultPortDescription(P2, false)
+                );
+        deviceStore.updatePorts(PID, DID1, pds);
+
+        Port port1 = deviceStore.getPort(DID1, P1);
+        assertEquals(P1, port1.number());
+        assertTrue("Port is enabled", port1.isEnabled());
+
+        Port port2 = deviceStore.getPort(DID1, P2);
+        assertEquals(P2, port2.number());
+        assertFalse("Port is disabled", port2.isEnabled());
+
+        Port port3 = deviceStore.getPort(DID1, P3);
+        assertNull("P3 not expected", port3);
+    }
+
+    @Test
+    public final void testRemoveDevice() {
+        putDevice(DID1, SW1, A1);
+        List<PortDescription> pds = Arrays.<PortDescription>asList(
+                new DefaultPortDescription(P1, true, A2)
+                );
+        deviceStore.updatePorts(PID, DID1, pds);
+        putDevice(DID2, SW1);
+
+        assertEquals(2, deviceStore.getDeviceCount());
+        assertEquals(1, deviceStore.getPorts(DID1).size());
+        assertAnnotationsEquals(deviceStore.getDevice(DID1).annotations(), A1);
+        assertAnnotationsEquals(deviceStore.getPort(DID1, P1).annotations(), A2);
+
+        DeviceEvent event = deviceStore.removeDevice(DID1);
+        assertEquals(DEVICE_REMOVED, event.type());
+        assertDevice(DID1, SW1, event.subject());
+
+        assertEquals(1, deviceStore.getDeviceCount());
+        assertEquals(0, deviceStore.getPorts(DID1).size());
+
+        // put back Device and Port without annotations
+        putDevice(DID1, SW1);
+        List<PortDescription> pds2 = Arrays.<PortDescription>asList(
+                new DefaultPortDescription(P1, true)
+                );
+        deviceStore.updatePorts(PID, DID1, pds2);
+
+        // annotations should not survive
+        assertEquals(2, deviceStore.getDeviceCount());
+        assertEquals(1, deviceStore.getPorts(DID1).size());
+        assertAnnotationsEquals(deviceStore.getDevice(DID1).annotations());
+        assertAnnotationsEquals(deviceStore.getPort(DID1, P1).annotations());
+    }
+
+    // If Delegates should be called only on remote events,
+    // then Simple* should never call them, thus no test is required.
+    // TODO add test for Port events when we have them
+    @Ignore("Ignore until Delegate spec. is clear.")
+    @Test
+    public final void testEvents() throws InterruptedException {
+        final CountDownLatch addLatch = new CountDownLatch(1);
+        DeviceStoreDelegate checkAdd = new DeviceStoreDelegate() {
+            @Override
+            public void notify(DeviceEvent event) {
+                assertEquals(DEVICE_ADDED, event.type());
+                assertDevice(DID1, SW1, event.subject());
+                addLatch.countDown();
+            }
+        };
+        final CountDownLatch updateLatch = new CountDownLatch(1);
+        DeviceStoreDelegate checkUpdate = new DeviceStoreDelegate() {
+            @Override
+            public void notify(DeviceEvent event) {
+                assertEquals(DEVICE_UPDATED, event.type());
+                assertDevice(DID1, SW2, event.subject());
+                updateLatch.countDown();
+            }
+        };
+        final CountDownLatch removeLatch = new CountDownLatch(1);
+        DeviceStoreDelegate checkRemove = new DeviceStoreDelegate() {
+            @Override
+            public void notify(DeviceEvent event) {
+                assertEquals(DEVICE_REMOVED, event.type());
+                assertDevice(DID1, SW2, event.subject());
+                removeLatch.countDown();
+            }
+        };
+
+        DeviceDescription description =
+                new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
+                        HW, SW1, SN);
+        deviceStore.setDelegate(checkAdd);
+        deviceStore.createOrUpdateDevice(PID, DID1, description);
+        assertTrue("Add event fired", addLatch.await(1, TimeUnit.SECONDS));
+
+
+        DeviceDescription description2 =
+                new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
+                        HW, SW2, SN);
+        deviceStore.unsetDelegate(checkAdd);
+        deviceStore.setDelegate(checkUpdate);
+        deviceStore.createOrUpdateDevice(PID, DID1, description2);
+        assertTrue("Update event fired", updateLatch.await(1, TimeUnit.SECONDS));
+
+        deviceStore.unsetDelegate(checkUpdate);
+        deviceStore.setDelegate(checkRemove);
+        deviceStore.removeDevice(DID1);
+        assertTrue("Remove event fired", removeLatch.await(1, TimeUnit.SECONDS));
+    }
+
+    private static final class TestGossipDeviceStore extends GossipDeviceStore {
+
+        public TestGossipDeviceStore(
+                ClockService clockService,
+                ClusterService clusterService,
+                ClusterCommunicationService clusterCommunicator) {
+            this.clockService = clockService;
+            this.clusterService = clusterService;
+            this.clusterCommunicator = clusterCommunicator;
+        }
+    }
+
+    private static final class TestClusterCommunicationService implements ClusterCommunicationService {
+        @Override
+        public boolean broadcast(ClusterMessage message) throws IOException { return true; }
+        @Override
+        public boolean unicast(ClusterMessage message, NodeId nodeId) throws IOException { return true; }
+        @Override
+        public boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) throws IOException { return true; }
+        @Override
+        public void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber) {}
+    }
+
+    private static final class TestClusterService implements ClusterService {
+
+        private static final ControllerNode ONOS1 =
+            new DefaultControllerNode(new NodeId("N1"), IpPrefix.valueOf("127.0.0.1"));
+        private final Map<NodeId, ControllerNode> nodes = new HashMap<>();
+        private final Map<NodeId, ControllerNode.State> nodeStates = new HashMap<>();
+
+        public TestClusterService() {
+            nodes.put(new NodeId("N1"), ONOS1);
+            nodeStates.put(new NodeId("N1"), ControllerNode.State.ACTIVE);
+        }
+
+        @Override
+        public ControllerNode getLocalNode() {
+            return ONOS1;
+        }
+
+        @Override
+        public Set<ControllerNode> getNodes() {
+            return Sets.newHashSet(nodes.values());
+        }
+
+        @Override
+        public ControllerNode getNode(NodeId nodeId) {
+            return nodes.get(nodeId);
+        }
+
+        @Override
+        public State getState(NodeId nodeId) {
+            return nodeStates.get(nodeId);
+        }
+
+        @Override
+        public void addListener(ClusterEventListener listener) {
+        }
+
+        @Override
+        public void removeListener(ClusterEventListener listener) {
+        }
+    }
+}
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
index f83ac59..61a7374 100644
--- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
@@ -57,7 +57,7 @@
 
         rawNodes = theInstance.getMap("nodes");
         OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader
-                = new OptionalCacheLoader<>(kryoSerializationService, rawNodes);
+                = new OptionalCacheLoader<>(serializer, rawNodes);
         nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
         rawNodes.addEntryListener(new RemoteCacheEventHandler<>(nodes), true);
 
diff --git a/core/store/hz/cluster/src/test/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStoreTest.java b/core/store/hz/cluster/src/test/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStoreTest.java
index 9178e90..bf1bb38 100644
--- a/core/store/hz/cluster/src/test/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStoreTest.java
+++ b/core/store/hz/cluster/src/test/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStoreTest.java
@@ -30,8 +30,7 @@
 import org.onlab.onos.store.common.StoreManager;
 import org.onlab.onos.store.common.StoreService;
 import org.onlab.onos.store.common.TestStoreManager;
-import org.onlab.onos.store.serializers.KryoSerializationManager;
-import org.onlab.onos.store.serializers.KryoSerializationService;
+import org.onlab.onos.store.serializers.KryoSerializer;
 import org.onlab.packet.IpPrefix;
 
 import com.google.common.collect.Sets;
@@ -57,7 +56,7 @@
 
     private DistributedMastershipStore dms;
     private TestDistributedMastershipStore testStore;
-    private KryoSerializationManager serializationMgr;
+    private KryoSerializer serializationMgr;
     private StoreManager storeMgr;
 
     @BeforeClass
@@ -76,8 +75,7 @@
         storeMgr = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
         storeMgr.activate();
 
-        serializationMgr = new KryoSerializationManager();
-        serializationMgr.activate();
+        serializationMgr = new KryoSerializer();
 
         dms = new TestDistributedMastershipStore(storeMgr, serializationMgr);
         dms.clusterService = new TestClusterService();
@@ -90,8 +88,6 @@
     public void tearDown() throws Exception {
         dms.deactivate();
 
-        serializationMgr.deactivate();
-
         storeMgr.deactivate();
     }
 
@@ -234,9 +230,9 @@
     private class TestDistributedMastershipStore extends
             DistributedMastershipStore {
         public TestDistributedMastershipStore(StoreService storeService,
-                KryoSerializationService kryoSerializationService) {
+                KryoSerializer kryoSerialization) {
             this.storeService = storeService;
-            this.kryoSerializationService = kryoSerializationService;
+            this.serializer = kryoSerialization;
         }
 
         //helper to populate master/backup structures
@@ -260,6 +256,7 @@
             }
         }
 
+        // Debug helper: prints internal state (e.g. standbys) to stdout.
         public void dump() {
             System.out.println("standbys");
             for (Map.Entry<byte [], byte []> e : standbys.entrySet()) {
diff --git a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/AbstractHazelcastStore.java b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/AbstractHazelcastStore.java
index 0302105..a22dd89 100644
--- a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/AbstractHazelcastStore.java
+++ b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/AbstractHazelcastStore.java
@@ -15,7 +15,8 @@
 import org.onlab.onos.event.Event;
 import org.onlab.onos.store.AbstractStore;
 import org.onlab.onos.store.StoreDelegate;
-import org.onlab.onos.store.serializers.KryoSerializationService;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.serializers.StoreSerializer;
 import org.slf4j.Logger;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -24,7 +25,7 @@
 /**
  * Abstraction of a distributed store based on Hazelcast.
  */
-@Component(componentAbstract = true)
+@Component
 public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDelegate<E>>
         extends AbstractStore<E, D> {
 
@@ -33,13 +34,13 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected StoreService storeService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected KryoSerializationService kryoSerializationService;
+    protected StoreSerializer serializer;
 
     protected HazelcastInstance theInstance;
 
     @Activate
     public void activate() {
+        serializer = new KryoSerializer();
         theInstance = storeService.getHazelcastInstance();
     }
 
@@ -50,7 +51,7 @@
      * @return serialized object
      */
     protected byte[] serialize(Object obj) {
-        return kryoSerializationService.serialize(obj);
+        return serializer.encode(obj);
     }
 
     /**
@@ -61,7 +62,7 @@
      * @return deserialized object
      */
     protected <T> T deserialize(byte[] bytes) {
-        return kryoSerializationService.deserialize(bytes);
+        return serializer.decode(bytes);
     }
 
 
diff --git a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/OptionalCacheLoader.java b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/OptionalCacheLoader.java
index f96fdd8..d5fc380 100644
--- a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/OptionalCacheLoader.java
+++ b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/OptionalCacheLoader.java
@@ -2,7 +2,7 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import org.onlab.onos.store.serializers.KryoSerializationService;
+import org.onlab.onos.store.serializers.StoreSerializer;
 
 import com.google.common.base.Optional;
 import com.google.common.cache.CacheLoader;
@@ -18,28 +18,28 @@
 public final class OptionalCacheLoader<K, V> extends
         CacheLoader<K, Optional<V>> {
 
-    private final KryoSerializationService kryoSerializationService;
+    private final StoreSerializer serializer;
     private IMap<byte[], byte[]> rawMap;
 
     /**
      * Constructor.
      *
-     * @param kryoSerializationService to use for serialization
+     * @param serializer to use for serialization
      * @param rawMap underlying IMap
      */
-    public OptionalCacheLoader(KryoSerializationService kryoSerializationService, IMap<byte[], byte[]> rawMap) {
-        this.kryoSerializationService = checkNotNull(kryoSerializationService);
+    public OptionalCacheLoader(StoreSerializer serializer, IMap<byte[], byte[]> rawMap) {
+        this.serializer = checkNotNull(serializer);
         this.rawMap = checkNotNull(rawMap);
     }
 
     @Override
     public Optional<V> load(K key) throws Exception {
-        byte[] keyBytes = kryoSerializationService.serialize(key);
+        byte[] keyBytes = serializer.encode(key);
         byte[] valBytes = rawMap.get(keyBytes);
         if (valBytes == null) {
             return Optional.absent();
         }
-        V dev = kryoSerializationService.deserialize(valBytes);
+        V dev = serializer.decode(valBytes);
         return Optional.of(dev);
     }
 }
diff --git a/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/DistributedDeviceStore.java b/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/DistributedDeviceStore.java
index 5feb1ba..0016939 100644
--- a/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/DistributedDeviceStore.java
+++ b/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/DistributedDeviceStore.java
@@ -47,6 +47,7 @@
 import static org.onlab.onos.net.device.DeviceEvent.Type.*;
 import static org.slf4j.LoggerFactory.getLogger;
 
+//TODO: Add support for multiple providers and annotations
 /**
  * Manages inventory of infrastructure devices using Hazelcast-backed map.
  */
@@ -87,7 +88,7 @@
         // TODO decide on Map name scheme to avoid collision
         rawDevices = theInstance.getMap("devices");
         final OptionalCacheLoader<DeviceId, DefaultDevice> deviceLoader
-                = new OptionalCacheLoader<>(kryoSerializationService, rawDevices);
+                = new OptionalCacheLoader<>(serializer, rawDevices);
         devices = new AbsentInvalidatingLoadingCache<>(newBuilder().build(deviceLoader));
         // refresh/populate cache based on notification from other instance
         devicesListener = rawDevices.addEntryListener(new RemoteDeviceEventHandler(devices), includeValue);
@@ -97,7 +98,7 @@
 
         rawDevicePorts = theInstance.getMap("devicePorts");
         final OptionalCacheLoader<DeviceId, Map<PortNumber, Port>> devicePortLoader
-                = new OptionalCacheLoader<>(kryoSerializationService, rawDevicePorts);
+                = new OptionalCacheLoader<>(serializer, rawDevicePorts);
         devicePorts = new AbsentInvalidatingLoadingCache<>(newBuilder().build(devicePortLoader));
         // refresh/populate cache based on notification from other instance
         portsListener = rawDevicePorts.addEntryListener(new RemotePortEventHandler(devicePorts), includeValue);
diff --git a/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/NoOpClockProviderService.java b/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/NoOpClockProviderService.java
new file mode 100644
index 0000000..b68620a
--- /dev/null
+++ b/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/NoOpClockProviderService.java
@@ -0,0 +1,20 @@
+package org.onlab.onos.store.device.impl;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.MastershipTerm;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.ClockProviderService;
+
+// FIXME: Code clone in onos-core-trivial, onos-core-hz-net
+/**
+ * Dummy implementation of {@link ClockProviderService} that discards every mastership term it is given.
+ */
+@Component(immediate = true)
+@Service
+public class NoOpClockProviderService implements ClockProviderService {
+
+    @Override
+    public void setMastershipTerm(DeviceId deviceId, MastershipTerm term) {
+    }
+}
diff --git a/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/NoOpClockService.java b/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/NoOpClockService.java
deleted file mode 100644
index 2c443e9..0000000
--- a/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/NoOpClockService.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package org.onlab.onos.store.device.impl;
-
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.onos.cluster.MastershipTerm;
-import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.store.ClockService;
-import org.onlab.onos.store.Timestamp;
-
-// FIXME: Code clone in onos-core-trivial, onos-core-hz-net
-/**
- * Dummy implementation of {@link ClockService}.
- */
-@Component(immediate = true)
-@Service
-public class NoOpClockService implements ClockService {
-
-    @Override
-    public Timestamp getTimestamp(DeviceId deviceId) {
-        return new Timestamp() {
-
-            @Override
-            public int compareTo(Timestamp o) {
-                throw new IllegalStateException("Never expected to be used.");
-            }
-        };
-    }
-
-    @Override
-    public void setMastershipTerm(DeviceId deviceId, MastershipTerm term) {
-    }
-}
diff --git a/core/store/hz/net/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/hz/net/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index 6ec7c51..d49e00b 100644
--- a/core/store/hz/net/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/hz/net/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -1,6 +1,5 @@
 package org.onlab.onos.store.flow.impl;
 
-import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_ADDED;
 import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -13,9 +12,10 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.onos.ApplicationId;
 import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.flow.DefaultFlowRule;
+import org.onlab.onos.net.flow.DefaultFlowEntry;
+import org.onlab.onos.net.flow.FlowEntry;
+import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
 import org.onlab.onos.net.flow.FlowRule;
-import org.onlab.onos.net.flow.FlowRule.FlowRuleState;
 import org.onlab.onos.net.flow.FlowRuleEvent;
 import org.onlab.onos.net.flow.FlowRuleEvent.Type;
 import org.onlab.onos.net.flow.FlowRuleStore;
@@ -28,20 +28,20 @@
 import com.google.common.collect.Multimap;
 
 /**
- * TEMPORARY: Manages inventory of flow rules using distributed store implementation.
+ * Manages inventory of flow rules using trivial in-memory implementation.
  */
-//FIXME: I LIE I AM NOT DISTRIBUTED
+//FIXME: despite the name, this store is NOT distributed; rules live only in local in-memory maps.
 @Component(immediate = true)
 @Service
 public class DistributedFlowRuleStore
-extends AbstractStore<FlowRuleEvent, FlowRuleStoreDelegate>
-implements FlowRuleStore {
+        extends AbstractStore<FlowRuleEvent, FlowRuleStoreDelegate>
+        implements FlowRuleStore {
 
     private final Logger log = getLogger(getClass());
 
     // store entries as a pile of rules, no info about device tables
-    private final Multimap<DeviceId, FlowRule> flowEntries =
-            ArrayListMultimap.<DeviceId, FlowRule>create();
+    private final Multimap<DeviceId, FlowEntry> flowEntries =
+            ArrayListMultimap.<DeviceId, FlowEntry>create();
 
     private final Multimap<ApplicationId, FlowRule> flowEntriesById =
             ArrayListMultimap.<ApplicationId, FlowRule>create();
@@ -58,8 +58,13 @@
 
 
     @Override
-    public synchronized FlowRule getFlowRule(FlowRule rule) {
-        for (FlowRule f : flowEntries.get(rule.deviceId())) {
+    public synchronized int getFlowRuleCount() {
+        return flowEntries.size(); // same lock as the mutators; the multimap is not thread-safe
+    }
+
+    @Override
+    public synchronized FlowEntry getFlowEntry(FlowRule rule) {
+        for (FlowEntry f : flowEntries.get(rule.deviceId())) {
             if (f.equals(rule)) {
                 return f;
             }
@@ -68,8 +73,8 @@
     }
 
     @Override
-    public synchronized Iterable<FlowRule> getFlowEntries(DeviceId deviceId) {
-        Collection<FlowRule> rules = flowEntries.get(deviceId);
+    public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
+        Collection<FlowEntry> rules = flowEntries.get(deviceId);
         if (rules == null) {
             return Collections.emptyList();
         }
@@ -77,7 +82,7 @@
     }
 
     @Override
-    public synchronized Iterable<FlowRule> getFlowEntriesByAppId(ApplicationId appId) {
+    public synchronized Iterable<FlowRule> getFlowRulesByAppId(ApplicationId appId) {
         Collection<FlowRule> rules = flowEntriesById.get(appId);
         if (rules == null) {
             return Collections.emptyList();
@@ -87,7 +92,7 @@
 
     @Override
     public synchronized void storeFlowRule(FlowRule rule) {
-        FlowRule f = new DefaultFlowRule(rule, FlowRuleState.PENDING_ADD);
+        FlowEntry f = new DefaultFlowEntry(rule);
         DeviceId did = f.deviceId();
         if (!flowEntries.containsEntry(did, f)) {
             flowEntries.put(did, f);
@@ -97,57 +102,41 @@
 
     @Override
     public synchronized void deleteFlowRule(FlowRule rule) {
-        FlowRule f = new DefaultFlowRule(rule, FlowRuleState.PENDING_REMOVE);
-        DeviceId did = f.deviceId();
-
-        /*
-         *  find the rule and mark it for deletion.
-         *  Ultimately a flow removed will come remove it.
-         */
-
-        if (flowEntries.containsEntry(did, f)) {
-            //synchronized (flowEntries) {
-            flowEntries.remove(did, f);
-            flowEntries.put(did, f);
-            flowEntriesById.remove(rule.appId(), rule);
-            //}
+        FlowEntry entry = getFlowEntry(rule);
+        if (entry == null) {
+            return;
         }
+        entry.setState(FlowEntryState.PENDING_REMOVE);
     }
 
     @Override
-    public synchronized FlowRuleEvent addOrUpdateFlowRule(FlowRule rule) {
+    public synchronized FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
         DeviceId did = rule.deviceId();
 
         // check if this new rule is an update to an existing entry
-        if (flowEntries.containsEntry(did, rule)) {
-            //synchronized (flowEntries) {
-            // Multimaps support duplicates so we have to remove our rule
-            // and replace it with the current version.
-            flowEntries.remove(did, rule);
-            flowEntries.put(did, rule);
-            //}
+        FlowEntry stored = getFlowEntry(rule);
+        if (stored != null) {
+            stored.setBytes(rule.bytes());
+            stored.setLife(rule.life());
+            stored.setPackets(rule.packets());
+            if (stored.state() == FlowEntryState.PENDING_ADD) {
+                stored.setState(FlowEntryState.ADDED);
+                return new FlowRuleEvent(Type.RULE_ADDED, rule);
+            }
             return new FlowRuleEvent(Type.RULE_UPDATED, rule);
         }
 
         flowEntries.put(did, rule);
-        return new FlowRuleEvent(RULE_ADDED, rule);
+        return null;
     }
 
     @Override
-    public synchronized FlowRuleEvent removeFlowRule(FlowRule rule) {
-        //synchronized (this) {
+    public synchronized FlowRuleEvent removeFlowRule(FlowEntry rule) {
+        // This is where one could mark a rule as removed and still keep it in the store.
         if (flowEntries.remove(rule.deviceId(), rule)) {
             return new FlowRuleEvent(RULE_REMOVED, rule);
         } else {
             return null;
         }
-        //}
     }
-
-
-
-
-
-
-
 }
diff --git a/core/store/hz/net/src/main/java/org/onlab/onos/store/link/impl/DistributedLinkStore.java b/core/store/hz/net/src/main/java/org/onlab/onos/store/link/impl/DistributedLinkStore.java
index 5161f2f..3dd42a3 100644
--- a/core/store/hz/net/src/main/java/org/onlab/onos/store/link/impl/DistributedLinkStore.java
+++ b/core/store/hz/net/src/main/java/org/onlab/onos/store/link/impl/DistributedLinkStore.java
@@ -38,6 +38,7 @@
 import com.google.common.collect.ImmutableSet.Builder;
 import com.hazelcast.core.IMap;
 
+//TODO: Add support for multiple providers and annotations
 /**
  * Manages inventory of infrastructure links using Hazelcast-backed map.
  */
@@ -70,7 +71,7 @@
         // TODO decide on Map name scheme to avoid collision
         rawLinks = theInstance.getMap("links");
         final OptionalCacheLoader<LinkKey, DefaultLink> linkLoader
-                = new OptionalCacheLoader<>(kryoSerializationService, rawLinks);
+                = new OptionalCacheLoader<>(serializer, rawLinks);
         links = new AbsentInvalidatingLoadingCache<>(newBuilder().build(linkLoader));
         // refresh/populate cache based on notification from other instance
         linksListener = rawLinks.addEntryListener(new RemoteLinkEventHandler(links), includeValue);
diff --git a/core/store/hz/net/src/main/java/org/onlab/onos/store/topology/impl/DistributedTopologyStore.java b/core/store/hz/net/src/main/java/org/onlab/onos/store/topology/impl/DistributedTopologyStore.java
index 4728850..04f5fce 100644
--- a/core/store/hz/net/src/main/java/org/onlab/onos/store/topology/impl/DistributedTopologyStore.java
+++ b/core/store/hz/net/src/main/java/org/onlab/onos/store/topology/impl/DistributedTopologyStore.java
@@ -125,7 +125,8 @@
         // Promote the new topology to current and return a ready-to-send event.
         synchronized (this) {
             current = newTopology;
-            return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, current);
+            return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED,
+                                     current, reasons);
         }
     }
 
diff --git a/core/store/hz/net/src/test/java/org/onlab/onos/store/device/impl/DistributedDeviceStoreTest.java b/core/store/hz/net/src/test/java/org/onlab/onos/store/device/impl/DistributedDeviceStoreTest.java
index 80c9464..7e2924b 100644
--- a/core/store/hz/net/src/test/java/org/onlab/onos/store/device/impl/DistributedDeviceStoreTest.java
+++ b/core/store/hz/net/src/test/java/org/onlab/onos/store/device/impl/DistributedDeviceStoreTest.java
@@ -36,9 +36,6 @@
 import org.onlab.onos.store.common.StoreManager;
 import org.onlab.onos.store.common.StoreService;
 import org.onlab.onos.store.common.TestStoreManager;
-import org.onlab.onos.store.serializers.KryoSerializationManager;
-import org.onlab.onos.store.serializers.KryoSerializationService;
-
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.hazelcast.config.Config;
@@ -63,7 +60,6 @@
     private static final PortNumber P3 = PortNumber.portNumber(3);
 
     private DistributedDeviceStore deviceStore;
-    private KryoSerializationManager serializationMgr;
 
     private StoreManager storeManager;
 
@@ -85,10 +81,7 @@
         storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
         storeManager.activate();
 
-        serializationMgr = new KryoSerializationManager();
-        serializationMgr.activate();
-
-        deviceStore = new TestDistributedDeviceStore(storeManager, serializationMgr);
+        deviceStore = new TestDistributedDeviceStore(storeManager);
         deviceStore.activate();
     }
 
@@ -96,8 +89,6 @@
     public void tearDown() throws Exception {
         deviceStore.deactivate();
 
-        serializationMgr.deactivate();
-
         storeManager.deactivate();
     }
 
@@ -392,10 +383,8 @@
     }
 
     private class TestDistributedDeviceStore extends DistributedDeviceStore {
-        public TestDistributedDeviceStore(StoreService storeService,
-                                KryoSerializationService kryoSerializationService) {
+        public TestDistributedDeviceStore(StoreService storeService) {
             this.storeService = storeService;
-            this.kryoSerializationService = kryoSerializationService;
         }
     }
 }
diff --git a/core/store/hz/net/src/test/java/org/onlab/onos/store/link/impl/DistributedLinkStoreTest.java b/core/store/hz/net/src/test/java/org/onlab/onos/store/link/impl/DistributedLinkStoreTest.java
index a76e901..dd959b5 100644
--- a/core/store/hz/net/src/test/java/org/onlab/onos/store/link/impl/DistributedLinkStoreTest.java
+++ b/core/store/hz/net/src/test/java/org/onlab/onos/store/link/impl/DistributedLinkStoreTest.java
@@ -30,9 +30,6 @@
 import org.onlab.onos.store.common.StoreManager;
 import org.onlab.onos.store.common.StoreService;
 import org.onlab.onos.store.common.TestStoreManager;
-import org.onlab.onos.store.serializers.KryoSerializationManager;
-import org.onlab.onos.store.serializers.KryoSerializationService;
-
 import com.google.common.collect.Iterables;
 import com.hazelcast.config.Config;
 import com.hazelcast.core.Hazelcast;
@@ -51,7 +48,6 @@
     private static final PortNumber P3 = PortNumber.portNumber(3);
 
     private StoreManager storeManager;
-    private KryoSerializationManager serializationMgr;
 
     private DistributedLinkStore linkStore;
 
@@ -71,17 +67,13 @@
         storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
         storeManager.activate();
 
-        serializationMgr = new KryoSerializationManager();
-        serializationMgr.activate();
-
-        linkStore = new TestDistributedLinkStore(storeManager, serializationMgr);
+        linkStore = new TestDistributedLinkStore(storeManager);
         linkStore.activate();
     }
 
     @After
     public void tearDown() throws Exception {
         linkStore.deactivate();
-        serializationMgr.deactivate();
         storeManager.deactivate();
     }
 
@@ -361,10 +353,8 @@
 
 
     class TestDistributedLinkStore extends DistributedLinkStore {
-        TestDistributedLinkStore(StoreService storeService,
-                            KryoSerializationService kryoSerializationService) {
+        TestDistributedLinkStore(StoreService storeService) {
             this.storeService = storeService;
-            this.kryoSerializationService = kryoSerializationService;
         }
     }
 }
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ConnectPointSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ConnectPointSerializer.java
index 46badcb..14a64d2 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ConnectPointSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ConnectPointSerializer.java
@@ -3,7 +3,6 @@
 import org.onlab.onos.net.ConnectPoint;
 import org.onlab.onos.net.ElementId;
 import org.onlab.onos.net.PortNumber;
-
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
@@ -15,7 +14,7 @@
 public class ConnectPointSerializer extends Serializer<ConnectPoint> {
 
     /**
-     * Default constructor.
+     * Creates {@link ConnectPoint} serializer instance.
      */
     public ConnectPointSerializer() {
         // non-null, immutable
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DefaultLinkSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DefaultLinkSerializer.java
index 5ee273d..06d01b5 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DefaultLinkSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DefaultLinkSerializer.java
@@ -16,7 +16,7 @@
 public class DefaultLinkSerializer extends Serializer<DefaultLink> {
 
     /**
-     * Default constructor.
+     * Creates {@link DefaultLink} serializer instance.
      */
     public DefaultLinkSerializer() {
         // non-null, immutable
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DefaultPortSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DefaultPortSerializer.java
index 8455e80..5dc310b 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DefaultPortSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DefaultPortSerializer.java
@@ -16,7 +16,7 @@
         Serializer<DefaultPort> {
 
     /**
-     * Default constructor.
+     * Creates {@link DefaultPort} serializer instance.
      */
     public DefaultPortSerializer() {
         // non-null, immutable
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DeviceIdSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DeviceIdSerializer.java
index c63b676..36d0a21 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DeviceIdSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DeviceIdSerializer.java
@@ -14,6 +14,14 @@
 */
 public final class DeviceIdSerializer extends Serializer<DeviceId> {
 
+    /**
+     * Creates {@link DeviceId} serializer instance.
+     */
+    public DeviceIdSerializer() {
+        // Serializer(acceptsNull = false, immutable = true)
+        super(false, true);
+    }
+
     @Override
     public void write(Kryo kryo, Output output, DeviceId object) {
         kryo.writeObject(output, object.uri());
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableMapSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableMapSerializer.java
index 244cc57..734033f 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableMapSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableMapSerializer.java
@@ -19,6 +19,9 @@
 
     private final MapSerializer mapSerializer = new MapSerializer();
 
+    /**
+     * Creates {@link ImmutableMap} serializer instance.
+     */
     public ImmutableMapSerializer() {
         // non-null, immutable
         super(false, true);
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableSetSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableSetSerializer.java
index c08bf9a..051a843 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableSetSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableSetSerializer.java
@@ -18,6 +18,9 @@
 
     private final CollectionSerializer serializer = new CollectionSerializer();
 
+    /**
+     * Creates {@link ImmutableSet} serializer instance.
+     */
     public ImmutableSetSerializer() {
         // non-null, immutable
         super(false, true);
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/IpAddressSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/IpAddressSerializer.java
new file mode 100644
index 0000000..b923df7
--- /dev/null
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/IpAddressSerializer.java
@@ -0,0 +1,41 @@
+package org.onlab.onos.store.serializers;
+
+import org.onlab.packet.IpAddress;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo Serializer for {@link IpAddress}.
+ * Encodes the octet count, the raw octets, then the prefix length.
+ */
+public class IpAddressSerializer extends Serializer<IpAddress> {
+
+    /**
+     * Creates {@link IpAddress} serializer instance.
+     */
+    public IpAddressSerializer() {
+        // non-null, immutable
+        super(false, true);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, IpAddress object) {
+        byte[] octs = object.toOctets();
+        output.writeInt(octs.length);
+        output.writeBytes(octs);
+        output.writeInt(object.prefixLength());
+    }
+
+    @Override
+    public IpAddress read(Kryo kryo, Input input, Class<IpAddress> type) {
+        int octLen = input.readInt();
+        byte[] octs = new byte[octLen];
+        // readBytes() fills the whole array; InputStream-style read() may
+        // return after fewer bytes, leaving the tail zeroed and the stream misaligned.
+        input.readBytes(octs);
+        int prefLen = input.readInt();
+        return IpAddress.valueOf(octs, prefLen);
+    }
+}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/IpPrefixSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/IpPrefixSerializer.java
index 2dbec57..2e92692 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/IpPrefixSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/IpPrefixSerializer.java
@@ -13,7 +13,7 @@
 public final class IpPrefixSerializer extends Serializer<IpPrefix> {
 
     /**
-     * Default constructor.
+     * Creates {@link IpPrefix} serializer instance.
      */
     public IpPrefixSerializer() {
         // non-null, immutable
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
new file mode 100644
index 0000000..0c33cfe
--- /dev/null
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
@@ -0,0 +1,85 @@
+package org.onlab.onos.store.serializers;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.MastershipTerm;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.DefaultAnnotations;
+import org.onlab.onos.net.DefaultDevice;
+import org.onlab.onos.net.DefaultLink;
+import org.onlab.onos.net.DefaultPort;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.Element;
+import org.onlab.onos.net.Link;
+import org.onlab.onos.net.LinkKey;
+import org.onlab.onos.net.MastershipRole;
+import org.onlab.onos.net.Port;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.device.DefaultDeviceDescription;
+import org.onlab.onos.net.device.DefaultPortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpPrefix;
+import org.onlab.util.KryoPool;
+
+import de.javakaffee.kryoserializers.URISerializer;
+
+/** Holds predefined {@link KryoPool} instances that other pools can register as a base. */
+public final class KryoPoolUtil {
+
+    /**
+     * KryoPool which can serialize ON.lab misc classes.
+     */
+    public static final KryoPool MISC = KryoPool.newBuilder()
+            .register(IpPrefix.class, new IpPrefixSerializer())
+            .register(IpAddress.class, new IpAddressSerializer())
+            .build();
+
+    // TODO: Populate other classes
+    /**
+     * KryoPool which can serialize API bundle classes.
+     */
+    public static final KryoPool API = KryoPool.newBuilder()
+            .register(MISC)
+            .register(
+                    // JDK collection types
+                    ArrayList.class,
+                    Arrays.asList().getClass(), // runtime class of the list Arrays.asList returns
+                    HashMap.class,
+                    // API types registered without an explicit serializer
+                    ControllerNode.State.class,
+                    Device.Type.class,
+                    DefaultAnnotations.class,
+                    DefaultControllerNode.class,
+                    DefaultDevice.class,
+                    DefaultDeviceDescription.class,
+                    MastershipRole.class,
+                    Port.class,
+                    DefaultPortDescription.class,
+                    Element.class,
+                    Link.Type.class
+                    )
+            .register(URI.class, new URISerializer())
+            .register(NodeId.class, new NodeIdSerializer())
+            .register(ProviderId.class, new ProviderIdSerializer())
+            .register(DeviceId.class, new DeviceIdSerializer())
+            .register(PortNumber.class, new PortNumberSerializer())
+            .register(DefaultPort.class, new DefaultPortSerializer())
+            .register(LinkKey.class, new LinkKeySerializer())
+            .register(ConnectPoint.class, new ConnectPointSerializer())
+            .register(DefaultLink.class, new DefaultLinkSerializer())
+            .register(MastershipTerm.class, new MastershipTermSerializer())
+            // NOTE(review): MastershipRole is also in the serializer-less list above; confirm which wins
+            .register(MastershipRole.class, new MastershipRoleSerializer())
+            .build();
+
+    // not to be instantiated
+    private KryoPoolUtil() {}
+}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationManager.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationManager.java
deleted file mode 100644
index 04d1a88..0000000
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationManager.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package org.onlab.onos.store.serializers;
-
-import de.javakaffee.kryoserializers.URISerializer;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.onos.cluster.ControllerNode;
-import org.onlab.onos.cluster.DefaultControllerNode;
-import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.net.ConnectPoint;
-import org.onlab.onos.net.DefaultAnnotations;
-import org.onlab.onos.net.DefaultDevice;
-import org.onlab.onos.net.DefaultLink;
-import org.onlab.onos.net.DefaultPort;
-import org.onlab.onos.net.Device;
-import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.Element;
-import org.onlab.onos.net.Link;
-import org.onlab.onos.net.LinkKey;
-import org.onlab.onos.net.MastershipRole;
-import org.onlab.onos.net.Port;
-import org.onlab.onos.net.PortNumber;
-import org.onlab.onos.net.provider.ProviderId;
-import org.onlab.packet.IpPrefix;
-import org.onlab.util.KryoPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-/**
- * Serialization service using Kryo.
- */
-@Component(immediate = true)
-@Service
-public class KryoSerializationManager implements KryoSerializationService {
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-    private KryoPool serializerPool;
-
-
-    @Activate
-    public void activate() {
-        setupKryoPool();
-        log.info("Started");
-    }
-
-    @Deactivate
-    public void deactivate() {
-        log.info("Stopped");
-    }
-
-    /**
-     * Sets up the common serialzers pool.
-     */
-    protected void setupKryoPool() {
-        // FIXME Slice out types used in common to separate pool/namespace.
-        serializerPool = KryoPool.newBuilder()
-                .register(ArrayList.class,
-                          HashMap.class,
-
-                          ControllerNode.State.class,
-                          Device.Type.class,
-
-                          DefaultAnnotations.class,
-                          DefaultControllerNode.class,
-                          DefaultDevice.class,
-                          MastershipRole.class,
-                          Port.class,
-                          Element.class,
-
-                          Link.Type.class
-                )
-                .register(IpPrefix.class, new IpPrefixSerializer())
-                .register(URI.class, new URISerializer())
-                .register(NodeId.class, new NodeIdSerializer())
-                .register(ProviderId.class, new ProviderIdSerializer())
-                .register(DeviceId.class, new DeviceIdSerializer())
-                .register(PortNumber.class, new PortNumberSerializer())
-                .register(DefaultPort.class, new DefaultPortSerializer())
-                .register(LinkKey.class, new LinkKeySerializer())
-                .register(ConnectPoint.class, new ConnectPointSerializer())
-                .register(DefaultLink.class, new DefaultLinkSerializer())
-                .build()
-                .populate(1);
-    }
-
-    @Override
-    public byte[] serialize(final Object obj) {
-        return serializerPool.serialize(obj);
-    }
-
-    @Override
-    public <T> T deserialize(final byte[] bytes) {
-        if (bytes == null) {
-            return null;
-        }
-        return serializerPool.deserialize(bytes);
-    }
-
-    @Override
-    public void serialize(Object obj, ByteBuffer buffer) {
-        serializerPool.serialize(obj, buffer);
-    }
-
-    @Override
-    public <T> T deserialize(ByteBuffer buffer) {
-        return serializerPool.deserialize(buffer);
-    }
-
-}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationService.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationService.java
deleted file mode 100644
index 385128c..0000000
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationService.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package org.onlab.onos.store.serializers;
-
-import java.nio.ByteBuffer;
-
-// TODO: To be replaced with SerializationService from IOLoop activity
-/**
- * Service to serialize Objects into byte array.
- */
-public interface KryoSerializationService {
-
-    /**
-     * Serializes the specified object into bytes using one of the
-     * pre-registered serializers.
-     *
-     * @param obj object to be serialized
-     * @return serialized bytes
-     */
-    public byte[] serialize(final Object obj);
-
-    /**
-     * Serializes the specified object into bytes using one of the
-     * pre-registered serializers.
-     *
-     * @param obj object to be serialized
-     * @param buffer to write serialized bytes
-     */
-    public void serialize(final Object obj, ByteBuffer buffer);
-
-    /**
-     * Deserializes the specified bytes into an object using one of the
-     * pre-registered serializers.
-     *
-     * @param bytes bytes to be deserialized
-     * @return deserialized object
-     */
-    public <T> T deserialize(final byte[] bytes);
-
-    /**
-     * Deserializes the specified bytes into an object using one of the
-     * pre-registered serializers.
-     *
-     * @param buffer bytes to be deserialized
-     * @return deserialized object
-     */
-    public <T> T deserialize(final ByteBuffer buffer);
-}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java
new file mode 100644
index 0000000..738086e
--- /dev/null
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java
@@ -0,0 +1,55 @@
+package org.onlab.onos.store.serializers;
+
+import org.onlab.util.KryoPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * {@link StoreSerializer} backed by a {@link KryoPool}.
+ */
+public class KryoSerializer implements StoreSerializer {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    protected KryoPool serializerPool;
+
+    /** Creates a serializer; the pool is prepared by {@link #setupKryoPool()}. */
+    public KryoSerializer() {
+        setupKryoPool();
+    }
+
+    /**
+     * Sets up the common serializers pool.
+     */
+    protected void setupKryoPool() {
+        final KryoPool apiPool = KryoPool.newBuilder().register(KryoPoolUtil.API).build();
+        // populate(1) presumably pre-creates one Kryo instance -- confirm against KryoPool
+        serializerPool = apiPool.populate(1);
+    }
+
+    @Override
+    public byte[] encode(final Object obj) {
+        return serializerPool.serialize(obj);
+    }
+
+    @Override
+    public <T> T decode(final byte[] bytes) {
+        // a null array decodes to null without touching the pool
+        if (bytes != null) {
+            return serializerPool.deserialize(bytes);
+        }
+        return null;
+    }
+
+    @Override
+    public void encode(Object obj, ByteBuffer buffer) {
+        serializerPool.serialize(obj, buffer);
+    }
+
+    @Override
+    public <T> T decode(ByteBuffer buffer) {
+        return serializerPool.deserialize(buffer);
+    }
+
+}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/LinkKeySerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/LinkKeySerializer.java
index f635f3c..bafee4f 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/LinkKeySerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/LinkKeySerializer.java
@@ -2,6 +2,7 @@
 
 import org.onlab.onos.net.ConnectPoint;
 import org.onlab.onos.net.LinkKey;
+
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
@@ -13,7 +14,7 @@
 public class LinkKeySerializer extends Serializer<LinkKey> {
 
     /**
-     * Default constructor.
+     * Creates {@link LinkKey} serializer instance.
      */
     public LinkKeySerializer() {
         // non-null, immutable
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MastershipRoleSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MastershipRoleSerializer.java
index 3903491..dab5aa8 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MastershipRoleSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MastershipRoleSerializer.java
@@ -12,6 +12,14 @@
  */
 public class MastershipRoleSerializer extends Serializer<MastershipRole> {
 
+    /**
+     * Creates {@link MastershipRole} serializer instance.
+     */
+    public MastershipRoleSerializer() {
+        // non-null, immutable
+        super(false, true);
+    }
+
     @Override
     public MastershipRole read(Kryo kryo, Input input, Class<MastershipRole> type) {
         final String role = kryo.readObject(input, String.class);
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MastershipTermSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MastershipTermSerializer.java
index a5d6198..0ac61a8 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MastershipTermSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MastershipTermSerializer.java
@@ -2,7 +2,6 @@
 
 import org.onlab.onos.cluster.MastershipTerm;
 import org.onlab.onos.cluster.NodeId;
-
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
@@ -13,9 +12,17 @@
  */
 public class MastershipTermSerializer extends Serializer<MastershipTerm> {
 
+    /**
+     * Creates {@link MastershipTerm} serializer instance.
+     */
+    public MastershipTermSerializer() {
+        // non-null, immutable
+        super(false, true);
+    }
+
     @Override
     public MastershipTerm read(Kryo kryo, Input input, Class<MastershipTerm> type) {
-        final NodeId node = new NodeId(kryo.readObject(input, String.class));
+        final NodeId node = new NodeId(input.readString());
         final int term = input.readInt();
         return MastershipTerm.of(node, term);
     }
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/NodeIdSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/NodeIdSerializer.java
index ef9d3f1..460b63d 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/NodeIdSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/NodeIdSerializer.java
@@ -4,6 +4,7 @@
 import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
+
 import org.onlab.onos.cluster.NodeId;
 
 /**
@@ -11,14 +12,22 @@
  */
 public final class NodeIdSerializer extends Serializer<NodeId> {
 
+    /**
+     * Creates {@link NodeId} serializer instance.
+     */
+    public NodeIdSerializer() {
+        // non-null, immutable
+        super(false, true);
+    }
+
     @Override
     public void write(Kryo kryo, Output output, NodeId object) {
-        kryo.writeObject(output, object.toString());
+        output.writeString(object.toString());
     }
 
     @Override
     public NodeId read(Kryo kryo, Input input, Class<NodeId> type) {
-        final String id = kryo.readObject(input, String.class);
+        final String id = input.readString();
         return new NodeId(id);
     }
 }
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/PortNumberSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/PortNumberSerializer.java
index 02805bb..3792966 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/PortNumberSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/PortNumberSerializer.java
@@ -14,7 +14,7 @@
         Serializer<PortNumber> {
 
     /**
-     * Default constructor.
+     * Creates {@link PortNumber} serializer instance.
      */
     public PortNumberSerializer() {
         // non-null, immutable
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ProviderIdSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ProviderIdSerializer.java
index 1a1c6f6..060ac7d 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ProviderIdSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ProviderIdSerializer.java
@@ -13,7 +13,7 @@
 public class ProviderIdSerializer extends Serializer<ProviderId> {
 
     /**
-     * Default constructor.
+     * Creates {@link ProviderId} serializer instance.
      */
     public ProviderIdSerializer() {
         // non-null, immutable
@@ -24,13 +24,15 @@
     public void write(Kryo kryo, Output output, ProviderId object) {
         output.writeString(object.scheme());
         output.writeString(object.id());
+        output.writeBoolean(object.isAncillary());
     }
 
     @Override
     public ProviderId read(Kryo kryo, Input input, Class<ProviderId> type) {
         String scheme = input.readString();
         String id = input.readString();
-        return new ProviderId(scheme, id);
+        boolean isAncillary = input.readBoolean();
+        return new ProviderId(scheme, id, isAncillary);
     }
 
 }
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/StoreSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/StoreSerializer.java
new file mode 100644
index 0000000..6c43a1b
--- /dev/null
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/StoreSerializer.java
@@ -0,0 +1,42 @@
+package org.onlab.onos.store.serializers;
+
+import java.nio.ByteBuffer;
+
+// TODO: To be replaced with SerializationService from IOLoop activity
+/**
+ * Converts objects to and from their serialized byte form.
+ */
+public interface StoreSerializer {
+
+    /**
+     * Encodes the given object and returns the resulting bytes.
+     *
+     * @param obj object to encode
+     * @return encoded bytes
+     */
+    byte[] encode(Object obj);
+
+    /**
+     * Encodes the given object, writing the result into the supplied buffer.
+     *
+     * @param obj object to encode
+     * @param buffer destination for the encoded bytes
+     */
+    void encode(Object obj, ByteBuffer buffer);
+
+    /**
+     * Decodes an object from the given byte array.
+     *
+     * @param bytes encoded form of the object
+     * @return decoded object
+     */
+    <T> T decode(byte[] bytes);
+
+    /**
+     * Decodes an object from the given buffer.
+     *
+     * @param buffer buffer holding the encoded form of the object
+     * @return decoded object
+     */
+    <T> T decode(ByteBuffer buffer);
+}
diff --git a/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTests.java b/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTest.java
similarity index 61%
rename from core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTests.java
rename to core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTest.java
index c972d1a..d651d56 100644
--- a/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTests.java
+++ b/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTest.java
@@ -1,12 +1,10 @@
 package org.onlab.onos.store.serializers;
 
+import static org.junit.Assert.assertEquals;
 import static org.onlab.onos.net.DeviceId.deviceId;
 import static org.onlab.onos.net.PortNumber.portNumber;
 
-import java.net.URI;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
 
 import org.junit.After;
 import org.junit.Before;
@@ -14,7 +12,9 @@
 import org.junit.Test;
 import org.onlab.onos.cluster.MastershipTerm;
 import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.Annotations;
 import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.DefaultAnnotations;
 import org.onlab.onos.net.DefaultDevice;
 import org.onlab.onos.net.DefaultLink;
 import org.onlab.onos.net.DefaultPort;
@@ -24,7 +24,9 @@
 import org.onlab.onos.net.LinkKey;
 import org.onlab.onos.net.MastershipRole;
 import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.SparseAnnotations;
 import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpPrefix;
 import org.onlab.util.KryoPool;
 
@@ -32,10 +34,10 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.testing.EqualsTester;
 
-import de.javakaffee.kryoserializers.URISerializer;
+public class KryoSerializerTest {
 
-public class KryoSerializerTests {
     private static final ProviderId PID = new ProviderId("of", "foo");
+    private static final ProviderId PIDA = new ProviderId("of", "foo", true);
     private static final DeviceId DID1 = deviceId("of:foo");
     private static final DeviceId DID2 = deviceId("of:bar");
     private static final PortNumber P1 = portNumber(1);
@@ -48,44 +50,23 @@
     private static final String SW2 = "3.9.5";
     private static final String SN = "43311-12345";
     private static final Device DEV1 = new DefaultDevice(PID, DID1, Device.Type.SWITCH, MFR, HW, SW1, SN);
+    private static final SparseAnnotations A1 = DefaultAnnotations.builder()
+            .set("A1", "a1")
+            .set("B1", "b1")
+            .build();
+    private static final SparseAnnotations A1_2 = DefaultAnnotations.builder()
+            .remove("A1")
+            .set("B3", "b3")
+            .build();
 
     private static KryoPool kryos;
 
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
         kryos = KryoPool.newBuilder()
-                .register(
-                        ArrayList.class,
-                        HashMap.class
-                        )
-                .register(
-                        Device.Type.class,
-                        Link.Type.class
-
-//                      ControllerNode.State.class,
-//                        DefaultControllerNode.class,
-//                        MastershipRole.class,
-//                        Port.class,
-//                        Element.class,
-                        )
-                .register(ConnectPoint.class, new ConnectPointSerializer())
-                .register(DefaultLink.class, new DefaultLinkSerializer())
-                .register(DefaultPort.class, new DefaultPortSerializer())
-                .register(DeviceId.class, new DeviceIdSerializer())
+                .register(KryoPoolUtil.API)
                 .register(ImmutableMap.class, new ImmutableMapSerializer())
                 .register(ImmutableSet.class, new ImmutableSetSerializer())
-                .register(IpPrefix.class, new IpPrefixSerializer())
-                .register(LinkKey.class, new LinkKeySerializer())
-                .register(NodeId.class, new NodeIdSerializer())
-                .register(PortNumber.class, new PortNumberSerializer())
-                .register(ProviderId.class, new ProviderIdSerializer())
-
-                .register(DefaultDevice.class)
-
-                .register(URI.class, new URISerializer())
-
-                .register(MastershipRole.class, new MastershipRoleSerializer())
-                .register(MastershipTerm.class, new MastershipTermSerializer())
                 .build();
     }
 
@@ -112,10 +93,12 @@
 
 
     @Test
-    public final void test() {
+    public final void testSerialization() {
         testSerialized(new ConnectPoint(DID1, P1));
         testSerialized(new DefaultLink(PID, CP1, CP2, Link.Type.DIRECT));
         testSerialized(new DefaultPort(DEV1, P1, true));
+        testSerialized(new DefaultLink(PID, CP1, CP2, Link.Type.DIRECT, A1));
+        testSerialized(new DefaultPort(DEV1, P1, true, A1_2));
         testSerialized(DID1);
         testSerialized(ImmutableMap.of(DID1, DEV1, DID2, DEV1));
         testSerialized(ImmutableMap.of(DID1, DEV1));
@@ -124,10 +107,41 @@
         testSerialized(ImmutableSet.of(DID1));
         testSerialized(ImmutableSet.of());
         testSerialized(IpPrefix.valueOf("192.168.0.1/24"));
+        testSerialized(IpAddress.valueOf("192.168.0.1"));
         testSerialized(new LinkKey(CP1, CP2));
         testSerialized(new NodeId("SomeNodeIdentifier"));
         testSerialized(P1);
         testSerialized(PID);
+        testSerialized(PIDA);
+        testSerialized(new NodeId("bar"));
+        testSerialized(MastershipTerm.of(new NodeId("foo"), 2));
+        for (MastershipRole role : MastershipRole.values()) {
+            testSerialized(role);
+        }
+    }
+
+    @Test
+    public final void testAnnotations() {
+        // Annotations does not have equals defined, so each round-tripped copy
+        // is compared against its source through assertAnnotationsEquals
+        for (SparseAnnotations source : new SparseAnnotations[] {A1, A1_2}) {
+            // serialize, then read the bytes back through the same pool
+            final byte[] bytes = kryos.serialize(source);
+            SparseAnnotations copy = kryos.deserialize(bytes);
+            assertAnnotationsEquals(copy, source);
+        }
+    }
+
+    // code clone: asserts that actual matches the union of the given annotations, key by key
+    public static void assertAnnotationsEquals(Annotations actual, SparseAnnotations... annotations) {
+        SparseAnnotations merged = DefaultAnnotations.builder().build();
+        for (SparseAnnotations layer : annotations) {
+            merged = DefaultAnnotations.union(merged, layer);
+        }
+        assertEquals(merged.keys(), actual.keys());
+        for (String name : merged.keys()) {
+            assertEquals(merged.value(name), actual.value(name));
+        }
+    }
 
 }
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/NoOpClockProviderService.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/NoOpClockProviderService.java
new file mode 100644
index 0000000..ff4b31a
--- /dev/null
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/NoOpClockProviderService.java
@@ -0,0 +1,20 @@
+package org.onlab.onos.store.trivial.impl;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.MastershipTerm;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.ClockProviderService;
+
+//FIXME: Code clone in onos-core-trivial, onos-core-hz-net
+/**
+ * Dummy implementation of {@link ClockProviderService} whose only method does nothing.
+ */
+@Component(immediate = true)
+@Service
+public class NoOpClockProviderService implements ClockProviderService {
+    @Override
+    public void setMastershipTerm(DeviceId deviceId, MastershipTerm term) {
+        // intentionally empty: the supplied device id and term are discarded
+    }
+}
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/NoOpClockService.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/NoOpClockService.java
deleted file mode 100644
index b3f8320..0000000
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/NoOpClockService.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package org.onlab.onos.store.trivial.impl;
-
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.onos.cluster.MastershipTerm;
-import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.store.ClockService;
-import org.onlab.onos.store.Timestamp;
-
-//FIXME: Code clone in onos-core-trivial, onos-core-hz-net
-/**
- * Dummy implementation of {@link ClockService}.
- */
-@Component(immediate = true)
-@Service
-public class NoOpClockService implements ClockService {
-
-    @Override
-    public Timestamp getTimestamp(DeviceId deviceId) {
-        return new Timestamp() {
-
-            @Override
-            public int compareTo(Timestamp o) {
-                throw new IllegalStateException("Never expected to be used.");
-            }
-        };
-    }
-
-    @Override
-    public void setMastershipTerm(DeviceId deviceId, MastershipTerm term) {
-    }
-}
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStore.java
index 0b0ae37..514a22e 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStore.java
@@ -2,6 +2,8 @@
 
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 import org.apache.commons.lang3.concurrent.ConcurrentException;
 import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
@@ -9,7 +11,7 @@
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Service;
-import org.onlab.onos.net.Annotations;
+import org.onlab.onos.net.AnnotationsUtil;
 import org.onlab.onos.net.DefaultAnnotations;
 import org.onlab.onos.net.DefaultDevice;
 import org.onlab.onos.net.DefaultPort;
@@ -28,10 +30,10 @@
 import org.onlab.onos.net.device.PortDescription;
 import org.onlab.onos.net.provider.ProviderId;
 import org.onlab.onos.store.AbstractStore;
+import org.onlab.util.NewConcurrentHashMap;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -47,12 +49,13 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Predicates.notNull;
+import static com.google.common.base.Verify.verify;
 import static org.onlab.onos.net.device.DeviceEvent.Type.*;
 import static org.slf4j.LoggerFactory.getLogger;
 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
+import static org.onlab.onos.net.DefaultAnnotations.union;
 import static org.onlab.onos.net.DefaultAnnotations.merge;
 
-// TODO: synchronization should be done in more fine-grained manner.
 /**
  * Manages inventory of infrastructure devices using trivial in-memory
  * structures implementation.
@@ -70,14 +73,14 @@
     // collection of Description given from various providers
     private final ConcurrentMap<DeviceId,
                             ConcurrentMap<ProviderId, DeviceDescriptions>>
-                                deviceDescs = new ConcurrentHashMap<>();
+                                deviceDescs = Maps.newConcurrentMap();
 
     // cache of Device and Ports generated by compositing descriptions from providers
-    private final ConcurrentMap<DeviceId, Device> devices = new ConcurrentHashMap<>();
-    private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = new ConcurrentHashMap<>();
+    private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
+    private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
 
     // available(=UP) devices
-    private final Set<DeviceId> availableDevices = new HashSet<>();
+    private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
 
 
     @Activate
@@ -87,6 +90,10 @@
 
     @Deactivate
     public void deactivate() {
+        deviceDescs.clear();
+        devices.clear();
+        devicePorts.clear();
+        availableDevices.clear();
         log.info("Stopped");
     }
 
@@ -106,117 +113,142 @@
     }
 
     @Override
-    public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
+    public DeviceEvent createOrUpdateDevice(ProviderId providerId,
+                                     DeviceId deviceId,
                                      DeviceDescription deviceDescription) {
+
         ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
-            = createIfAbsentUnchecked(deviceDescs, deviceId,
-                    new InitConcurrentHashMap<ProviderId, DeviceDescriptions>());
+            = getDeviceDescriptions(deviceId);
 
-        Device oldDevice = devices.get(deviceId);
+        synchronized (providerDescs) {
+            // locking per device
 
-        DeviceDescriptions descs
-            = createIfAbsentUnchecked(providerDescs, providerId,
-                    new InitDeviceDescs(deviceDescription));
+            DeviceDescriptions descs
+                = createIfAbsentUnchecked(providerDescs, providerId,
+                        new InitDeviceDescs(deviceDescription));
 
-        // update description
-        descs.putDeviceDesc(deviceDescription);
-        Device newDevice = composeDevice(deviceId, providerDescs);
+            Device oldDevice = devices.get(deviceId);
+            // update description
+            descs.putDeviceDesc(deviceDescription);
+            Device newDevice = composeDevice(deviceId, providerDescs);
 
-        if (oldDevice == null) {
-            // ADD
-            return createDevice(providerId, newDevice);
-        } else {
-            // UPDATE or ignore (no change or stale)
-            return updateDevice(providerId, oldDevice, newDevice);
+            if (oldDevice == null) {
+                // ADD
+                return createDevice(providerId, newDevice);
+            } else {
+                // UPDATE or ignore (no change or stale)
+                return updateDevice(providerId, oldDevice, newDevice);
+            }
         }
     }
 
     // Creates the device and returns the appropriate event if necessary.
+    // Guarded by deviceDescs value (=Device lock)
     private DeviceEvent createDevice(ProviderId providerId, Device newDevice) {
 
         // update composed device cache
-        synchronized (this) {
-            devices.putIfAbsent(newDevice.id(), newDevice);
-            if (!providerId.isAncillary()) {
-                availableDevices.add(newDevice.id());
-            }
+        Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
+        verify(oldDevice == null,
+                "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
+                providerId, oldDevice, newDevice);
+
+        if (!providerId.isAncillary()) {
+            availableDevices.add(newDevice.id());
         }
 
         return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
     }
 
     // Updates the device and returns the appropriate event if necessary.
+    // Guarded by deviceDescs value (=Device lock)
     private DeviceEvent updateDevice(ProviderId providerId, Device oldDevice, Device newDevice) {
 
         // We allow only certain attributes to trigger update
         if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
             !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
-            !isAnnotationsEqual(oldDevice.annotations(), newDevice.annotations())) {
+            !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) {
 
-            synchronized (this) {
-                devices.replace(newDevice.id(), oldDevice, newDevice);
-                if (!providerId.isAncillary()) {
-                    availableDevices.add(newDevice.id());
-                }
+            boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
+            if (!replaced) {
+                verify(replaced,
+                        "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
+                        providerId, oldDevice, devices.get(newDevice.id())
+                        , newDevice);
+            }
+            if (!providerId.isAncillary()) {
+                availableDevices.add(newDevice.id());
             }
             return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
         }
 
         // Otherwise merely attempt to change availability if primary provider
         if (!providerId.isAncillary()) {
-            synchronized (this) {
             boolean added = availableDevices.add(newDevice.id());
             return !added ? null :
                     new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
-            }
         }
         return null;
     }
 
     @Override
     public DeviceEvent markOffline(DeviceId deviceId) {
-        synchronized (this) {
+        ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
+            = getDeviceDescriptions(deviceId);
+
+        // locking device
+        synchronized (providerDescs) {
             Device device = devices.get(deviceId);
-            boolean removed = (device != null) && availableDevices.remove(deviceId);
-            return !removed ? null :
-                    new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
+            if (device == null) {
+                return null;
+            }
+            boolean removed = availableDevices.remove(deviceId);
+            if (removed) {
+                // TODO: broadcast ... DOWN only?
+                return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
+            }
+            return null;
         }
     }
 
     @Override
-    public synchronized List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
-                                  List<PortDescription> portDescriptions) {
+    public List<DeviceEvent> updatePorts(ProviderId providerId,
+                                      DeviceId deviceId,
+                                      List<PortDescription> portDescriptions) {
 
-        // TODO: implement multi-provider
         Device device = devices.get(deviceId);
         checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
 
         ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
         checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
 
-        DeviceDescriptions descs = descsMap.get(providerId);
-        checkArgument(descs != null,
-                "Device description for Device ID %s from Provider %s was not found",
-                deviceId, providerId);
-
-
         List<DeviceEvent> events = new ArrayList<>();
-        synchronized (this) {
-            ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
+        synchronized (descsMap) {
+            DeviceDescriptions descs = descsMap.get(providerId);
+            // every provider must provide DeviceDescription.
+            checkArgument(descs != null,
+                    "Device description for Device ID %s from Provider %s was not found",
+                    deviceId, providerId);
+
+            Map<PortNumber, Port> ports = getPortMap(deviceId);
 
             // Add new ports
             Set<PortNumber> processed = new HashSet<>();
             for (PortDescription portDescription : portDescriptions) {
-                PortNumber number = portDescription.portNumber();
-                Port oldPort = ports.get(number);
+                final PortNumber number = portDescription.portNumber();
+                processed.add(portDescription.portNumber());
+
+                final Port oldPort = ports.get(number);
+                final Port newPort;
+
+// event suppression hook?
+
                 // update description
                 descs.putPortDesc(portDescription);
-                Port newPort = composePort(device, number, descsMap);
+                newPort = composePort(device, number, descsMap);
 
                 events.add(oldPort == null ?
-                                   createPort(device, newPort, ports) :
-                                   updatePort(device, oldPort, newPort, ports));
-                processed.add(portDescription.portNumber());
+                        createPort(device, newPort, ports) :
+                        updatePort(device, oldPort, newPort, ports));
             }
 
             events.addAll(pruneOldPorts(device, ports, processed));
@@ -226,19 +258,21 @@
 
     // Creates a new port based on the port description adds it to the map and
     // Returns corresponding event.
+    // Guarded by deviceDescs value (=Device lock)
     private DeviceEvent createPort(Device device, Port newPort,
-                                   ConcurrentMap<PortNumber, Port> ports) {
+                                   Map<PortNumber, Port> ports) {
         ports.put(newPort.number(), newPort);
         return new DeviceEvent(PORT_ADDED, device, newPort);
     }
 
     // Checks if the specified port requires update and if so, it replaces the
     // existing entry in the map and returns corresponding event.
+    // Guarded by deviceDescs value (=Device lock)
     private DeviceEvent updatePort(Device device, Port oldPort,
                                    Port newPort,
-                                   ConcurrentMap<PortNumber, Port> ports) {
+                                   Map<PortNumber, Port> ports) {
         if (oldPort.isEnabled() != newPort.isEnabled() ||
-            !isAnnotationsEqual(oldPort.annotations(), newPort.annotations())) {
+            !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
 
             ports.put(oldPort.number(), newPort);
             return new DeviceEvent(PORT_UPDATED, device, newPort);
@@ -248,6 +282,7 @@
 
     // Prunes the specified list of ports based on which ports are in the
     // processed list and returns list of corresponding events.
+    // Guarded by deviceDescs value (=Device lock)
     private List<DeviceEvent> pruneOldPorts(Device device,
                                             Map<PortNumber, Port> ports,
                                             Set<PortNumber> processed) {
@@ -268,11 +303,17 @@
     // exist, it creates and registers a new one.
     private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
         return createIfAbsentUnchecked(devicePorts, deviceId,
-                new InitConcurrentHashMap<PortNumber, Port>());
+                NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
+    }
+
+    private ConcurrentMap<ProviderId, DeviceDescriptions> getDeviceDescriptions(
+            DeviceId deviceId) {
+        return createIfAbsentUnchecked(deviceDescs, deviceId,
+                NewConcurrentHashMap.<ProviderId, DeviceDescriptions>ifNeeded());
     }
 
     @Override
-    public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
+    public DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
                                  PortDescription portDescription) {
         Device device = devices.get(deviceId);
         checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
@@ -280,19 +321,22 @@
         ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
         checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
 
-        DeviceDescriptions descs = descsMap.get(providerId);
-        // assuming all providers must to give DeviceDescription
-        checkArgument(descs != null,
-                "Device description for Device ID %s from Provider %s was not found",
-                deviceId, providerId);
+        synchronized (descsMap) {
+            DeviceDescriptions descs = descsMap.get(providerId);
+            // assuming all providers must give DeviceDescription
+            checkArgument(descs != null,
+                    "Device description for Device ID %s from Provider %s was not found",
+                    deviceId, providerId);
 
-        synchronized (this) {
             ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
             final PortNumber number = portDescription.portNumber();
-            Port oldPort = ports.get(number);
+            final Port oldPort = ports.get(number);
+            final Port newPort;
+
             // update description
             descs.putPortDesc(portDescription);
-            Port newPort = composePort(device, number, descsMap);
+            newPort = composePort(device, number, descsMap);
+
             if (oldPort == null) {
                 return createPort(device, newPort, ports);
             } else {
@@ -323,31 +367,19 @@
 
     @Override
     public DeviceEvent removeDevice(DeviceId deviceId) {
-        synchronized (this) {
+        ConcurrentMap<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId);
+        synchronized (descs) {
             Device device = devices.remove(deviceId);
-            return device == null ? null :
-                    new DeviceEvent(DEVICE_REMOVED, device, null);
-        }
-    }
-
-    private static boolean isAnnotationsEqual(Annotations lhs, Annotations rhs) {
-        if (lhs == rhs) {
-            return true;
-        }
-        if (lhs == null || rhs == null) {
-            return false;
-        }
-
-        if (!lhs.keys().equals(rhs.keys())) {
-            return false;
-        }
-
-        for (String key : lhs.keys()) {
-            if (!lhs.value(key).equals(rhs.value(key))) {
-                return false;
+            // should DEVICE_REMOVED carry removed ports?
+            Map<PortNumber, Port> ports = devicePorts.get(deviceId);
+            if (ports != null) {
+                ports.clear();
             }
+            availableDevices.remove(deviceId);
+            descs.clear();
+            return device == null ? null :
+                new DeviceEvent(DEVICE_REMOVED, device, null);
         }
-        return true;
     }
 
     /**
@@ -366,14 +398,14 @@
 
         DeviceDescriptions desc = providerDescs.get(primary);
 
-        // base
-        Type type = desc.getDeviceDesc().type();
-        String manufacturer = desc.getDeviceDesc().manufacturer();
-        String hwVersion = desc.getDeviceDesc().hwVersion();
-        String swVersion = desc.getDeviceDesc().swVersion();
-        String serialNumber = desc.getDeviceDesc().serialNumber();
+        final DeviceDescription base = desc.getDeviceDesc();
+        Type type = base.type();
+        String manufacturer = base.manufacturer();
+        String hwVersion = base.hwVersion();
+        String swVersion = base.swVersion();
+        String serialNumber = base.serialNumber();
         DefaultAnnotations annotations = DefaultAnnotations.builder().build();
-        annotations = merge(annotations, desc.getDeviceDesc().annotations());
+        annotations = merge(annotations, base.annotations());
 
         for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
             if (e.getKey().equals(primary)) {
@@ -392,7 +424,14 @@
                             hwVersion, swVersion, serialNumber, annotations);
     }
 
-    // probably want composePort"s" also
+    /**
+     * Returns a Port, merging the descriptions given by multiple Providers.
+     *
+     * @param device device the port is on
+     * @param number port number
+     * @param providerDescs Collection of Descriptions from multiple providers
+     * @return Port instance
+     */
     private Port composePort(Device device, PortNumber number,
                 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
 
@@ -445,18 +484,11 @@
         return fallBackPrimary;
     }
 
-    // TODO: can be made generic
-    private static final class InitConcurrentHashMap<K, V> implements
-            ConcurrentInitializer<ConcurrentMap<K, V>> {
-        @Override
-        public ConcurrentMap<K, V> get() throws ConcurrentException {
-            return new ConcurrentHashMap<>();
-        }
-    }
-
     public static final class InitDeviceDescs
         implements ConcurrentInitializer<DeviceDescriptions> {
+
         private final DeviceDescription deviceDesc;
+
         public InitDeviceDescs(DeviceDescription deviceDesc) {
             this.deviceDesc = checkNotNull(deviceDesc);
         }
@@ -471,8 +503,6 @@
      * Collection of Description of a Device and it's Ports given from a Provider.
      */
     private static class DeviceDescriptions {
-        //        private final DeviceId id;
-        //        private final ProviderId pid;
 
         private final AtomicReference<DeviceDescription> deviceDesc;
         private final ConcurrentMap<PortNumber, PortDescription> portDescs;
@@ -490,10 +520,6 @@
             return portDescs.get(number);
         }
 
-        public Collection<PortDescription> getPortDescs() {
-            return Collections.unmodifiableCollection(portDescs.values());
-        }
-
         /**
          * Puts DeviceDescription, merging annotations as necessary.
          *
@@ -504,7 +530,7 @@
             DeviceDescription oldOne = deviceDesc.get();
             DeviceDescription newOne = newDesc;
             if (oldOne != null) {
-                SparseAnnotations merged = merge(oldOne.annotations(),
+                SparseAnnotations merged = union(oldOne.annotations(),
                                                  newDesc.annotations());
                 newOne = new DefaultDeviceDescription(newOne, merged);
             }
@@ -521,7 +547,7 @@
             PortDescription oldOne = portDescs.get(newDesc.portNumber());
             PortDescription newOne = newDesc;
             if (oldOne != null) {
-                SparseAnnotations merged = merge(oldOne.annotations(),
+                SparseAnnotations merged = union(oldOne.annotations(),
                                                  newDesc.annotations());
                 newOne = new DefaultPortDescription(newOne, merged);
             }
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
index d12d00e..7ff797c 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
@@ -12,9 +12,10 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.onos.ApplicationId;
 import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.flow.DefaultFlowRule;
+import org.onlab.onos.net.flow.DefaultFlowEntry;
+import org.onlab.onos.net.flow.FlowEntry;
+import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
 import org.onlab.onos.net.flow.FlowRule;
-import org.onlab.onos.net.flow.FlowRule.FlowRuleState;
 import org.onlab.onos.net.flow.FlowRuleEvent;
 import org.onlab.onos.net.flow.FlowRuleEvent.Type;
 import org.onlab.onos.net.flow.FlowRuleStore;
@@ -38,8 +39,8 @@
     private final Logger log = getLogger(getClass());
 
     // store entries as a pile of rules, no info about device tables
-    private final Multimap<DeviceId, FlowRule> flowEntries =
-            ArrayListMultimap.<DeviceId, FlowRule>create();
+    private final Multimap<DeviceId, FlowEntry> flowEntries =
+            ArrayListMultimap.<DeviceId, FlowEntry>create();
 
     private final Multimap<ApplicationId, FlowRule> flowEntriesById =
             ArrayListMultimap.<ApplicationId, FlowRule>create();
@@ -56,8 +57,13 @@
 
 
     @Override
-    public synchronized FlowRule getFlowRule(FlowRule rule) {
-        for (FlowRule f : flowEntries.get(rule.deviceId())) {
+    public int getFlowRuleCount() {
+        return flowEntries.size();
+    }
+
+    @Override
+    public synchronized FlowEntry getFlowEntry(FlowRule rule) {
+        for (FlowEntry f : flowEntries.get(rule.deviceId())) {
             if (f.equals(rule)) {
                 return f;
             }
@@ -66,8 +72,8 @@
     }
 
     @Override
-    public synchronized Iterable<FlowRule> getFlowEntries(DeviceId deviceId) {
-        Collection<FlowRule> rules = flowEntries.get(deviceId);
+    public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
+        Collection<FlowEntry> rules = flowEntries.get(deviceId);
         if (rules == null) {
             return Collections.emptyList();
         }
@@ -75,7 +81,7 @@
     }
 
     @Override
-    public synchronized Iterable<FlowRule> getFlowEntriesByAppId(ApplicationId appId) {
+    public synchronized Iterable<FlowRule> getFlowRulesByAppId(ApplicationId appId) {
         Collection<FlowRule> rules = flowEntriesById.get(appId);
         if (rules == null) {
             return Collections.emptyList();
@@ -85,7 +91,7 @@
 
     @Override
     public synchronized void storeFlowRule(FlowRule rule) {
-        FlowRule f = new DefaultFlowRule(rule, FlowRuleState.PENDING_ADD);
+        FlowEntry f = new DefaultFlowEntry(rule);
         DeviceId did = f.deviceId();
         if (!flowEntries.containsEntry(did, f)) {
             flowEntries.put(did, f);
@@ -95,51 +101,42 @@
 
     @Override
     public synchronized void deleteFlowRule(FlowRule rule) {
-        FlowRule f = new DefaultFlowRule(rule, FlowRuleState.PENDING_REMOVE);
-        DeviceId did = f.deviceId();
-
-        /*
-         *  find the rule and mark it for deletion.
-         *  Ultimately a flow removed will come remove it.
-         */
-
-        if (flowEntries.containsEntry(did, f)) {
-            flowEntries.remove(did, f);
-            flowEntries.put(did, f);
-            flowEntriesById.remove(rule.appId(), rule);
+        FlowEntry entry = getFlowEntry(rule);
+        if (entry == null) {
+            //log.warn("Cannot find rule {}", rule);
+            return;
         }
+        entry.setState(FlowEntryState.PENDING_REMOVE);
     }
 
     @Override
-    public synchronized FlowRuleEvent addOrUpdateFlowRule(FlowRule rule) {
+    public synchronized FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
         DeviceId did = rule.deviceId();
 
         // check if this new rule is an update to an existing entry
-        FlowRule stored = getFlowRule(rule);
+        FlowEntry stored = getFlowEntry(rule);
         if (stored != null) {
-            // Multimaps support duplicates so we have to remove our rule
-            // and replace it with the current version.
-            flowEntries.remove(did, rule);
-            flowEntries.put(did, rule);
-
-            if (stored.state() == FlowRuleState.PENDING_ADD) {
+            stored.setBytes(rule.bytes());
+            stored.setLife(rule.life());
+            stored.setPackets(rule.packets());
+            if (stored.state() == FlowEntryState.PENDING_ADD) {
+                stored.setState(FlowEntryState.ADDED);
                 return new FlowRuleEvent(Type.RULE_ADDED, rule);
             }
             return new FlowRuleEvent(Type.RULE_UPDATED, rule);
         }
 
-        flowEntries.put(did, rule);
+        //flowEntries.put(did, rule);
         return null;
     }
 
     @Override
-    public synchronized FlowRuleEvent removeFlowRule(FlowRule rule) {
-        //synchronized (this) {
+    public synchronized FlowRuleEvent removeFlowRule(FlowEntry rule) {
+        // This is where one could mark a rule as removed and still keep it in the store.
         if (flowEntries.remove(rule.deviceId(), rule)) {
             return new FlowRuleEvent(RULE_REMOVED, rule);
         } else {
             return null;
         }
-        //}
     }
 }
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentStore.java
new file mode 100644
index 0000000..732d753
--- /dev/null
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentStore.java
@@ -0,0 +1,108 @@
+package org.onlab.onos.store.trivial.impl;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.net.intent.InstallableIntent;
+import org.onlab.onos.net.intent.Intent;
+import org.onlab.onos.net.intent.IntentEvent;
+import org.onlab.onos.net.intent.IntentId;
+import org.onlab.onos.net.intent.IntentState;
+import org.onlab.onos.net.intent.IntentStore;
+import org.onlab.onos.net.intent.IntentStoreDelegate;
+import org.onlab.onos.store.AbstractStore;
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.onlab.onos.net.intent.IntentState.*;
+import static org.slf4j.LoggerFactory.getLogger;
+
+@Component(immediate = true)
+@Service
+public class SimpleIntentStore
+        extends AbstractStore<IntentEvent, IntentStoreDelegate>
+        implements IntentStore {
+
+    private final Logger log = getLogger(getClass());
+    private final Map<IntentId, Intent> intents = new HashMap<>();
+    private final Map<IntentId, IntentState> states = new HashMap<>();
+    private final Map<IntentId, List<InstallableIntent>> installable = new HashMap<>();
+
+    @Activate
+    public void activate() {
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        log.info("Stopped");
+    }
+
+    @Override
+    public IntentEvent createIntent(Intent intent) {
+        intents.put(intent.id(), intent);
+        return this.setState(intent, IntentState.SUBMITTED);
+    }
+
+    @Override
+    public IntentEvent removeIntent(IntentId intentId) {
+        Intent intent = intents.remove(intentId);
+        installable.remove(intentId);
+        IntentEvent event = this.setState(intent, WITHDRAWN);
+        states.remove(intentId);
+        return event;
+    }
+
+    @Override
+    public long getIntentCount() {
+        return intents.size();
+    }
+
+    @Override
+    public Iterable<Intent> getIntents() {
+        return ImmutableSet.copyOf(intents.values());
+    }
+
+    @Override
+    public Intent getIntent(IntentId intentId) {
+        return intents.get(intentId);
+    }
+
+    @Override
+    public IntentState getIntentState(IntentId id) {
+        return states.get(id);
+    }
+
+    @Override
+    public IntentEvent setState(Intent intent, IntentState state) {
+        IntentId id = intent.id();
+        states.put(id, state);
+        IntentEvent.Type type = (state == SUBMITTED ? IntentEvent.Type.SUBMITTED :
+                (state == INSTALLED ? IntentEvent.Type.INSTALLED :
+                        (state == FAILED ? IntentEvent.Type.FAILED :
+                                state == WITHDRAWN ? IntentEvent.Type.WITHDRAWN :
+                                        null)));
+        return type == null ? null : new IntentEvent(type, intent);
+    }
+
+    @Override
+    public void addInstallableIntents(IntentId intentId, List<InstallableIntent> result) {
+        installable.put(intentId, result);
+    }
+
+    @Override
+    public List<InstallableIntent> getInstallableIntents(IntentId intentId) {
+        return installable.get(intentId);
+    }
+
+    @Override
+    public void removeInstalledIntents(IntentId intentId) {
+        installable.remove(intentId);
+    }
+
+}
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleLinkStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleLinkStore.java
index a0e569d..daf28df 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleLinkStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleLinkStore.java
@@ -1,36 +1,51 @@
 package org.onlab.onos.store.trivial.impl;
 
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.SetMultimap;
 
+import org.apache.commons.lang3.concurrent.ConcurrentUtils;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.net.AnnotationsUtil;
 import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.DefaultAnnotations;
 import org.onlab.onos.net.DefaultLink;
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.net.Link;
+import org.onlab.onos.net.SparseAnnotations;
+import org.onlab.onos.net.Link.Type;
 import org.onlab.onos.net.LinkKey;
+import org.onlab.onos.net.Provided;
+import org.onlab.onos.net.link.DefaultLinkDescription;
 import org.onlab.onos.net.link.LinkDescription;
 import org.onlab.onos.net.link.LinkEvent;
 import org.onlab.onos.net.link.LinkStore;
 import org.onlab.onos.net.link.LinkStoreDelegate;
 import org.onlab.onos.net.provider.ProviderId;
 import org.onlab.onos.store.AbstractStore;
+import org.onlab.util.NewConcurrentHashMap;
 import org.slf4j.Logger;
 
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
+import static org.onlab.onos.net.DefaultAnnotations.union;
+import static org.onlab.onos.net.DefaultAnnotations.merge;
 import static org.onlab.onos.net.Link.Type.DIRECT;
 import static org.onlab.onos.net.Link.Type.INDIRECT;
 import static org.onlab.onos.net.link.LinkEvent.Type.*;
 import static org.slf4j.LoggerFactory.getLogger;
+import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
+import static com.google.common.base.Predicates.notNull;
 
 /**
  * Manages inventory of infrastructure links using trivial in-memory structures
@@ -45,11 +60,17 @@
     private final Logger log = getLogger(getClass());
 
     // Link inventory
-    private final Map<LinkKey, DefaultLink> links = new ConcurrentHashMap<>();
+    private final ConcurrentMap<LinkKey,
+            ConcurrentMap<ProviderId, LinkDescription>>
+                    linkDescs = new ConcurrentHashMap<>();
+
+    // Link instance cache
+    private final ConcurrentMap<LinkKey, Link> links = new ConcurrentHashMap<>();
 
     // Egress and ingress link sets
-    private final Multimap<DeviceId, Link> srcLinks = HashMultimap.create();
-    private final Multimap<DeviceId, Link> dstLinks = HashMultimap.create();
+    private final SetMultimap<DeviceId, LinkKey> srcLinks = createSynchronizedHashMultiMap();
+    private final SetMultimap<DeviceId, LinkKey> dstLinks = createSynchronizedHashMultiMap();
+
 
     @Activate
     public void activate() {
@@ -58,6 +79,10 @@
 
     @Deactivate
     public void deactivate() {
+        linkDescs.clear();
+        links.clear();
+        srcLinks.clear();
+        dstLinks.clear();
         log.info("Stopped");
     }
 
@@ -68,17 +93,29 @@
 
     @Override
     public Iterable<Link> getLinks() {
-        return Collections.unmodifiableSet(new HashSet<Link>(links.values()));
+        return Collections.unmodifiableCollection(links.values());
     }
 
     @Override
     public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
-        return ImmutableSet.copyOf(srcLinks.get(deviceId));
+        // lock for iteration
+        synchronized (srcLinks) {
+            return FluentIterable.from(srcLinks.get(deviceId))
+            .transform(lookupLink())
+            .filter(notNull())
+            .toSet();
+        }
     }
 
     @Override
     public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
-        return ImmutableSet.copyOf(dstLinks.get(deviceId));
+        // lock for iteration
+        synchronized (dstLinks) {
+            return FluentIterable.from(dstLinks.get(deviceId))
+            .transform(lookupLink())
+            .filter(notNull())
+            .toSet();
+        }
     }
 
     @Override
@@ -89,9 +126,9 @@
     @Override
     public Set<Link> getEgressLinks(ConnectPoint src) {
         Set<Link> egress = new HashSet<>();
-        for (Link link : srcLinks.get(src.deviceId())) {
-            if (link.src().equals(src)) {
-                egress.add(link);
+        for (LinkKey linkKey : srcLinks.get(src.deviceId())) {
+            if (linkKey.src().equals(src)) {
+                egress.add(links.get(linkKey));
             }
         }
         return egress;
@@ -100,9 +137,9 @@
     @Override
     public Set<Link> getIngressLinks(ConnectPoint dst) {
         Set<Link> ingress = new HashSet<>();
-        for (Link link : dstLinks.get(dst.deviceId())) {
-            if (link.dst().equals(dst)) {
-                ingress.add(link);
+        for (LinkKey linkKey : dstLinks.get(dst.deviceId())) {
+            if (linkKey.dst().equals(dst)) {
+                ingress.add(links.get(linkKey));
             }
         }
         return ingress;
@@ -112,56 +149,172 @@
     public LinkEvent createOrUpdateLink(ProviderId providerId,
                                         LinkDescription linkDescription) {
         LinkKey key = new LinkKey(linkDescription.src(), linkDescription.dst());
-        DefaultLink link = links.get(key);
-        if (link == null) {
-            return createLink(providerId, key, linkDescription);
+
+        ConcurrentMap<ProviderId, LinkDescription> descs = getLinkDescriptions(key);
+        synchronized (descs) {
+            final Link oldLink = links.get(key);
+            // update description
+            createOrUpdateLinkDescription(descs, providerId, linkDescription);
+            final Link newLink = composeLink(descs);
+            if (oldLink == null) {
+                return createLink(key, newLink);
+            }
+            return updateLink(key, oldLink, newLink);
         }
-        return updateLink(providerId, link, key, linkDescription);
+    }
+
+    // Guarded by linkDescs value (=locking each Link)
+    private LinkDescription createOrUpdateLinkDescription(
+                             ConcurrentMap<ProviderId, LinkDescription> descs,
+                             ProviderId providerId,
+                             LinkDescription linkDescription) {
+
+        // merge the existing annotations with the incoming ones
+        LinkDescription oldDesc = descs.get(providerId);
+        LinkDescription newDesc = linkDescription;
+        if (oldDesc != null) {
+            SparseAnnotations merged = union(oldDesc.annotations(),
+                    linkDescription.annotations());
+            newDesc = new DefaultLinkDescription(
+                        linkDescription.src(),
+                        linkDescription.dst(),
+                        linkDescription.type(), merged);
+        }
+        return descs.put(providerId, newDesc);
     }
 
     // Creates and stores the link and returns the appropriate event.
-    private LinkEvent createLink(ProviderId providerId, LinkKey key,
-                                 LinkDescription linkDescription) {
-        DefaultLink link = new DefaultLink(providerId, key.src(), key.dst(),
-                                           linkDescription.type());
-        synchronized (this) {
-            links.put(key, link);
-            srcLinks.put(link.src().deviceId(), link);
-            dstLinks.put(link.dst().deviceId(), link);
+    // Guarded by linkDescs value (=locking each Link)
+    private LinkEvent createLink(LinkKey key, Link newLink) {
+
+        if (newLink.providerId().isAncillary()) {
+            // TODO: revisit ancillary only Link handling
+
+            // currently treating ancillary only as down (not visible outside)
+            return null;
         }
-        return new LinkEvent(LINK_ADDED, link);
+
+        links.put(key, newLink);
+        srcLinks.put(newLink.src().deviceId(), key);
+        dstLinks.put(newLink.dst().deviceId(), key);
+        return new LinkEvent(LINK_ADDED, newLink);
     }
 
     // Updates, if necessary the specified link and returns the appropriate event.
-    private LinkEvent updateLink(ProviderId providerId, DefaultLink link,
-                                 LinkKey key, LinkDescription linkDescription) {
-        if (link.type() == INDIRECT && linkDescription.type() == DIRECT) {
-            synchronized (this) {
-                srcLinks.remove(link.src().deviceId(), link);
-                dstLinks.remove(link.dst().deviceId(), link);
+    // Guarded by linkDescs value (=locking each Link)
+    private LinkEvent updateLink(LinkKey key, Link oldLink, Link newLink) {
 
-                DefaultLink updated =
-                        new DefaultLink(providerId, link.src(), link.dst(),
-                                        linkDescription.type());
-                links.put(key, updated);
-                srcLinks.put(link.src().deviceId(), updated);
-                dstLinks.put(link.dst().deviceId(), updated);
-                return new LinkEvent(LINK_UPDATED, updated);
-            }
+        if (newLink.providerId().isAncillary()) {
+            // TODO: revisit ancillary only Link handling
+
+            // currently treating ancillary only as down (not visible outside)
+            return null;
+        }
+
+        if ((oldLink.type() == INDIRECT && newLink.type() == DIRECT) ||
+            !AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
+
+            links.put(key, newLink);
+            // strictly speaking, the following can be omitted
+            srcLinks.put(oldLink.src().deviceId(), key);
+            dstLinks.put(oldLink.dst().deviceId(), key);
+            return new LinkEvent(LINK_UPDATED, newLink);
         }
         return null;
     }
 
     @Override
     public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
-        synchronized (this) {
-            Link link = links.remove(new LinkKey(src, dst));
+        final LinkKey key = new LinkKey(src, dst);
+        ConcurrentMap<ProviderId, LinkDescription> descs = getLinkDescriptions(key);
+        synchronized (descs) {
+            Link link = links.remove(key);
+            descs.clear();
             if (link != null) {
-                srcLinks.remove(link.src().deviceId(), link);
-                dstLinks.remove(link.dst().deviceId(), link);
+                srcLinks.remove(link.src().deviceId(), key);
+                dstLinks.remove(link.dst().deviceId(), key);
                 return new LinkEvent(LINK_REMOVED, link);
             }
             return null;
         }
     }
+
+    private static <K, V> SetMultimap<K, V> createSynchronizedHashMultiMap() {
+        return synchronizedSetMultimap(HashMultimap.<K, V>create());
+    }
+
+    /**
+     * @return primary ProviderId if present, else an arbitrary ancillary one ({@code null} if empty)
+     */
+    private ProviderId pickPrimaryPID(
+            ConcurrentMap<ProviderId, LinkDescription> providerDescs) {
+
+        ProviderId fallBackPrimary = null;
+        for (Entry<ProviderId, LinkDescription> e : providerDescs.entrySet()) {
+            if (!e.getKey().isAncillary()) {
+                return e.getKey();
+            } else if (fallBackPrimary == null) {
+                // pick randomly as a fallback in case there is no primary
+                fallBackPrimary = e.getKey();
+            }
+        }
+        return fallBackPrimary;
+    }
+
+    private Link composeLink(ConcurrentMap<ProviderId, LinkDescription> descs) {
+        ProviderId primary = pickPrimaryPID(descs);
+        LinkDescription base = descs.get(primary);
+
+        ConnectPoint src = base.src();
+        ConnectPoint dst = base.dst();
+        Type type = base.type();
+        DefaultAnnotations annotations = DefaultAnnotations.builder().build();
+        annotations = merge(annotations, base.annotations());
+
+        for (Entry<ProviderId, LinkDescription> e : descs.entrySet()) {
+            if (primary.equals(e.getKey())) {
+                continue;
+            }
+
+            // TODO: should keep track of Description timestamp
+            // and only merge conflicting keys when timestamp is newer
+            // Currently assuming there will never be a key conflict between
+            // providers
+
+            // annotation merging. not so efficient, should revisit later
+            annotations = merge(annotations, e.getValue().annotations());
+        }
+
+        return new DefaultLink(primary , src, dst, type, annotations);
+    }
+
+    private ConcurrentMap<ProviderId, LinkDescription> getLinkDescriptions(LinkKey key) {
+        return ConcurrentUtils.createIfAbsentUnchecked(linkDescs, key,
+                NewConcurrentHashMap.<ProviderId, LinkDescription>ifNeeded());
+    }
+
+    private final Function<LinkKey, Link> lookupLink = new LookupLink();
+    private Function<LinkKey, Link> lookupLink() {
+        return lookupLink;
+    }
+
+    private final class LookupLink implements Function<LinkKey, Link> {
+        @Override
+        public Link apply(LinkKey input) {
+            return links.get(input);
+        }
+    }
+
+    private static final Predicate<Provided> IS_PRIMARY = new IsPrimary();
+    private static final Predicate<Provided> isPrimary() {
+        return IS_PRIMARY;
+    }
+
+    private static final class IsPrimary implements Predicate<Provided> {
+
+        @Override
+        public boolean apply(Provided input) {
+            return !input.providerId().isAncillary();
+        }
+    }
 }
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleTopologyStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleTopologyStore.java
index 4e7d5ed..bd7db8a 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleTopologyStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleTopologyStore.java
@@ -124,7 +124,8 @@
         // Promote the new topology to current and return a ready-to-send event.
         synchronized (this) {
             current = newTopology;
-            return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, current);
+            return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED,
+                                     current, reasons);
         }
     }
 
diff --git a/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStoreTest.java b/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStoreTest.java
index a0d6e1c..146086a 100644
--- a/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStoreTest.java
+++ b/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStoreTest.java
@@ -103,17 +103,19 @@
         simpleDeviceStore.deactivate();
     }
 
-    private void putDevice(DeviceId deviceId, String swVersion) {
+    private void putDevice(DeviceId deviceId, String swVersion,
+                           SparseAnnotations... annotations) {
         DeviceDescription description =
                 new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
-                        HW, swVersion, SN);
+                        HW, swVersion, SN, annotations);
         deviceStore.createOrUpdateDevice(PID, deviceId, description);
     }
 
-    private void putDeviceAncillary(DeviceId deviceId, String swVersion) {
+    private void putDeviceAncillary(DeviceId deviceId, String swVersion,
+                                    SparseAnnotations... annotations) {
         DeviceDescription description =
                 new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
-                        HW, swVersion, SN);
+                        HW, swVersion, SN, annotations);
         deviceStore.createOrUpdateDevice(PIDA, deviceId, description);
     }
 
@@ -126,6 +128,7 @@
         assertEquals(SN, device.serialNumber());
     }
 
+    // TODO slice this out somewhere
     /**
      * Verifies that Annotations created by merging {@code annotations} is
      * equal to actual Annotations.
@@ -133,7 +136,7 @@
      * @param actual Annotations to check
      * @param annotations
      */
-    private static void assertAnnotationsEquals(Annotations actual, SparseAnnotations... annotations) {
+    public static void assertAnnotationsEquals(Annotations actual, SparseAnnotations... annotations) {
         DefaultAnnotations expected = DefaultAnnotations.builder().build();
         for (SparseAnnotations a : annotations) {
             expected = DefaultAnnotations.merge(expected, a);
@@ -347,6 +350,7 @@
         assertFalse("Port is disabled", event.port().isEnabled());
 
     }
+
     @Test
     public final void testUpdatePortStatusAncillary() {
         putDeviceAncillary(DID1, SW1);
@@ -435,16 +439,37 @@
 
     @Test
     public final void testRemoveDevice() {
-        putDevice(DID1, SW1);
+        putDevice(DID1, SW1, A1);
+        List<PortDescription> pds = Arrays.<PortDescription>asList(
+                new DefaultPortDescription(P1, true, A2)
+                );
+        deviceStore.updatePorts(PID, DID1, pds);
         putDevice(DID2, SW1);
 
         assertEquals(2, deviceStore.getDeviceCount());
+        assertEquals(1, deviceStore.getPorts(DID1).size());
+        assertAnnotationsEquals(deviceStore.getDevice(DID1).annotations(), A1);
+        assertAnnotationsEquals(deviceStore.getPort(DID1, P1).annotations(), A2);
 
         DeviceEvent event = deviceStore.removeDevice(DID1);
         assertEquals(DEVICE_REMOVED, event.type());
         assertDevice(DID1, SW1, event.subject());
 
         assertEquals(1, deviceStore.getDeviceCount());
+        assertEquals(0, deviceStore.getPorts(DID1).size());
+
+        // put back Device and Port without annotations
+        putDevice(DID1, SW1);
+        List<PortDescription> pds2 = Arrays.<PortDescription>asList(
+                new DefaultPortDescription(P1, true)
+                );
+        deviceStore.updatePorts(PID, DID1, pds2);
+
+        // annotations should not survive
+        assertEquals(2, deviceStore.getDeviceCount());
+        assertEquals(1, deviceStore.getPorts(DID1).size());
+        assertAnnotationsEquals(deviceStore.getDevice(DID1).annotations());
+        assertAnnotationsEquals(deviceStore.getPort(DID1, P1).annotations());
     }
 
     // If Delegates should be called only on remote events,
diff --git a/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleLinkStoreTest.java b/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleLinkStoreTest.java
index eb4a312..8a16609 100644
--- a/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleLinkStoreTest.java
+++ b/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleLinkStoreTest.java
@@ -4,7 +4,9 @@
 import static org.onlab.onos.net.DeviceId.deviceId;
 import static org.onlab.onos.net.Link.Type.*;
 import static org.onlab.onos.net.link.LinkEvent.Type.*;
+import static org.onlab.onos.store.trivial.impl.SimpleDeviceStoreTest.assertAnnotationsEquals;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -18,10 +20,12 @@
 import org.junit.Ignore;
 import org.junit.Test;
 import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.DefaultAnnotations;
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.net.Link;
 import org.onlab.onos.net.LinkKey;
 import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.SparseAnnotations;
 import org.onlab.onos.net.Link.Type;
 import org.onlab.onos.net.link.DefaultLinkDescription;
 import org.onlab.onos.net.link.LinkEvent;
@@ -37,6 +41,7 @@
 public class SimpleLinkStoreTest {
 
     private static final ProviderId PID = new ProviderId("of", "foo");
+    private static final ProviderId PIDA = new ProviderId("of", "bar", true);
     private static final DeviceId DID1 = deviceId("of:foo");
     private static final DeviceId DID2 = deviceId("of:bar");
 
@@ -44,6 +49,23 @@
     private static final PortNumber P2 = PortNumber.portNumber(2);
     private static final PortNumber P3 = PortNumber.portNumber(3);
 
+    private static final SparseAnnotations A1 = DefaultAnnotations.builder()
+            .set("A1", "a1")
+            .set("B1", "b1")
+            .build();
+    private static final SparseAnnotations A1_2 = DefaultAnnotations.builder()
+            .remove("A1")
+            .set("B3", "b3")
+            .build();
+    private static final SparseAnnotations A2 = DefaultAnnotations.builder()
+            .set("A2", "a2")
+            .set("B2", "b2")
+            .build();
+    private static final SparseAnnotations A2_2 = DefaultAnnotations.builder()
+            .remove("A2")
+            .set("B4", "b4")
+            .build();
+
 
     private SimpleLinkStore simpleLinkStore;
     private LinkStore linkStore;
@@ -69,16 +91,17 @@
     }
 
     private void putLink(DeviceId srcId, PortNumber srcNum,
-                         DeviceId dstId, PortNumber dstNum, Type type) {
+                         DeviceId dstId, PortNumber dstNum, Type type,
+                         SparseAnnotations... annotations) {
         ConnectPoint src = new ConnectPoint(srcId, srcNum);
         ConnectPoint dst = new ConnectPoint(dstId, dstNum);
-        linkStore.createOrUpdateLink(PID, new DefaultLinkDescription(src, dst, type));
+        linkStore.createOrUpdateLink(PID, new DefaultLinkDescription(src, dst, type, annotations));
     }
 
-    private void putLink(LinkKey key, Type type) {
+    private void putLink(LinkKey key, Type type, SparseAnnotations... annotations) {
         putLink(key.src().deviceId(), key.src().port(),
                 key.dst().deviceId(), key.dst().port(),
-                type);
+                type, annotations);
     }
 
     private static void assertLink(DeviceId srcId, PortNumber srcNum,
@@ -270,14 +293,67 @@
     }
 
     @Test
+    public final void testCreateOrUpdateLinkAncillary() {
+        ConnectPoint src = new ConnectPoint(DID1, P1);
+        ConnectPoint dst = new ConnectPoint(DID2, P2);
+
+        // add Ancillary link
+        LinkEvent event = linkStore.createOrUpdateLink(PIDA,
+                    new DefaultLinkDescription(src, dst, INDIRECT, A1));
+
+        assertNull("Ancillary only link is ignored", event);
+
+        // add Primary link
+        LinkEvent event2 = linkStore.createOrUpdateLink(PID,
+                new DefaultLinkDescription(src, dst, INDIRECT, A2));
+
+        assertLink(DID1, P1, DID2, P2, INDIRECT, event2.subject());
+        assertAnnotationsEquals(event2.subject().annotations(), A2, A1);
+        assertEquals(LINK_ADDED, event2.type());
+
+        // update link type
+        LinkEvent event3 = linkStore.createOrUpdateLink(PID,
+                new DefaultLinkDescription(src, dst, DIRECT, A2));
+        assertLink(DID1, P1, DID2, P2, DIRECT, event3.subject());
+        assertAnnotationsEquals(event3.subject().annotations(), A2, A1);
+        assertEquals(LINK_UPDATED, event3.type());
+
+
+        // no change
+        LinkEvent event4 = linkStore.createOrUpdateLink(PID,
+                new DefaultLinkDescription(src, dst, DIRECT));
+        assertNull("No change event expected", event4);
+
+        // update link annotation (Primary)
+        LinkEvent event5 = linkStore.createOrUpdateLink(PID,
+                new DefaultLinkDescription(src, dst, DIRECT, A2_2));
+        assertLink(DID1, P1, DID2, P2, DIRECT, event5.subject());
+        assertAnnotationsEquals(event5.subject().annotations(), A2, A2_2, A1);
+        assertEquals(LINK_UPDATED, event5.type());
+
+        // update link annotation (Ancillary)
+        LinkEvent event6 = linkStore.createOrUpdateLink(PIDA,
+                new DefaultLinkDescription(src, dst, DIRECT, A1_2));
+        assertLink(DID1, P1, DID2, P2, DIRECT, event6.subject());
+        assertAnnotationsEquals(event6.subject().annotations(), A2, A2_2, A1, A1_2);
+        assertEquals(LINK_UPDATED, event6.type());
+
+        // update link type (Ancillary) : ignored
+        LinkEvent event7 = linkStore.createOrUpdateLink(PIDA,
+                new DefaultLinkDescription(src, dst, EDGE));
+        assertNull("Ancillary change other than annotation is ignored", event7);
+    }
+
+
+    @Test
     public final void testRemoveLink() {
         final ConnectPoint d1P1 = new ConnectPoint(DID1, P1);
         final ConnectPoint d2P2 = new ConnectPoint(DID2, P2);
         LinkKey linkId1 = new LinkKey(d1P1, d2P2);
         LinkKey linkId2 = new LinkKey(d2P2, d1P1);
 
-        putLink(linkId1, DIRECT);
-        putLink(linkId2, DIRECT);
+        putLink(linkId1, DIRECT, A1);
+        putLink(linkId2, DIRECT, A2);
 
         // DID1,P1 => DID2,P2
         // DID2,P2 => DID1,P1
@@ -285,10 +361,41 @@
 
         LinkEvent event = linkStore.removeLink(d1P1, d2P2);
         assertEquals(LINK_REMOVED, event.type());
+        assertAnnotationsEquals(event.subject().annotations(), A1);
         LinkEvent event2 = linkStore.removeLink(d1P1, d2P2);
         assertNull(event2);
 
         assertLink(linkId2, DIRECT, linkStore.getLink(d2P2, d1P1));
+        assertAnnotationsEquals(linkStore.getLink(d2P2, d1P1).annotations(), A2);
+
+        // annotations, etc. should not survive remove
+        putLink(linkId1, DIRECT);
+        assertLink(linkId1, DIRECT, linkStore.getLink(d1P1, d2P2));
+        assertAnnotationsEquals(linkStore.getLink(d1P1, d2P2).annotations());
+    }
+
+    @Test
+    public final void testAncillaryOnlyNotVisible() {
+        ConnectPoint src = new ConnectPoint(DID1, P1);
+        ConnectPoint dst = new ConnectPoint(DID2, P2);
+
+        // add Ancillary link
+        linkStore.createOrUpdateLink(PIDA,
+                    new DefaultLinkDescription(src, dst, INDIRECT, A1));
+
+        // Ancillary only link should not be visible
+        assertEquals(0, linkStore.getLinkCount());
+
+        assertTrue(Iterables.isEmpty(linkStore.getLinks()));
+
+        assertNull(linkStore.getLink(src, dst));
+
+        assertEquals(Collections.emptySet(), linkStore.getIngressLinks(dst));
+
+        assertEquals(Collections.emptySet(), linkStore.getEgressLinks(src));
+
+        assertEquals(Collections.emptySet(), linkStore.getDeviceEgressLinks(DID1));
+        assertEquals(Collections.emptySet(), linkStore.getDeviceIngressLinks(DID2));
     }
 
     // If Delegates should be called only on remote events,