Revert "Refactored code in an attempt to break dependency cycles"

This reverts commit 195af6e6b27c23c7beb98f4cd425e7d7ffff1ecd.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterManagementMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterManagementMessageSubjects.java
index aa0dc83..74c22f1 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterManagementMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterManagementMessageSubjects.java
@@ -6,5 +6,5 @@
     // avoid instantiation
     private ClusterManagementMessageSubjects() {}
 
-    public static final MessageSubject CLUSTER_MEMBERSHIP_EVENT = new MessageSubject("cluster-membership-event");
+    public static final MessageSubject CLUSTER_MEMBERSHIP_EVENT = new MessageSubject("CLUSTER_MEMBERSHIP_EVENT");
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterNodesDelegate.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterNodesDelegate.java
index 2ad2666..b82a835 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterNodesDelegate.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterNodesDelegate.java
@@ -1,6 +1,6 @@
 package org.onlab.onos.store.cluster.impl;
 
-import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.DefaultControllerNode;
 import org.onlab.onos.cluster.NodeId;
 import org.onlab.packet.IpPrefix;
 
@@ -18,7 +18,7 @@
      * @param tcpPort node TCP listen port
      * @return the controller node
      */
-    ControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort);
+    DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort);
 
     /**
      * Notifies about cluster node going offline.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
index 9433130..5e64a39 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
@@ -1,5 +1,9 @@
 package org.onlab.onos.store.cluster.impl;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.felix.scr.annotations.Activate;
@@ -14,16 +18,8 @@
 import org.onlab.onos.cluster.NodeId;
 import org.onlab.onos.store.AbstractStore;
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
-import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
-import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
-import org.onlab.onos.store.cluster.messaging.MessageSubject;
-import org.onlab.onos.store.cluster.messaging.impl.ClusterMessageSerializer;
-import org.onlab.onos.store.cluster.messaging.impl.MessageSubjectSerializer;
-import org.onlab.onos.store.serializers.KryoPoolUtil;
-import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.cluster.messaging.impl.ClusterCommunicationManager;
 import org.onlab.packet.IpPrefix;
-import org.onlab.util.KryoPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,6 +27,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
 import static org.onlab.onos.cluster.ControllerNode.State;
 import static org.onlab.packet.IpPrefix.valueOf;
@@ -46,33 +43,17 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private ControllerNode localNode;
-    private final Map<NodeId, ControllerNode> nodes = new ConcurrentHashMap<>();
+    private DefaultControllerNode localNode;
+    private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>();
     private final Map<NodeId, State> states = new ConcurrentHashMap<>();
-
-    private static final KryoSerializer SERIALIZER = new KryoSerializer() {
-        @Override
-        protected void setupKryoPool() {
-            serializerPool = KryoPool.newBuilder()
-                    .register(KryoPoolUtil.API)
-                    .register(ClusterMessage.class, new ClusterMessageSerializer())
-                    .register(ClusterMembershipEvent.class)
-                    .register(byte[].class)
-                    .register(MessageSubject.class, new MessageSubjectSerializer())
-                    .build()
-                    .populate(1);
-        }
-    };
+    private final Cache<NodeId, ControllerNode> livenessCache = CacheBuilder.newBuilder()
+            .maximumSize(1000)
+            .expireAfterWrite(ClusterCommunicationManager.HEART_BEAT_INTERVAL_MILLIS * 3, TimeUnit.MILLISECONDS)
+            .removalListener(new LivenessCacheRemovalListener()).build();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private ClusterCommunicationAdminService clusterCommunicationAdminService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private ClusterCommunicationService clusterCommunicator;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private ClusterMonitorService clusterMonitor;
-
     private final ClusterNodesDelegate nodesDelegate = new InnerNodesDelegate();
 
     @Activate
@@ -80,15 +61,10 @@
         loadClusterDefinition();
         establishSelfIdentity();
 
-        clusterCommunicator.addSubscriber(
-                ClusterManagementMessageSubjects.CLUSTER_MEMBERSHIP_EVENT,
-                new ClusterMembershipEventListener());
-
-        // Start-up the monitor service and prime it with the loaded nodes.
-        clusterMonitor.initialize(localNode, nodesDelegate);
-
-        for (ControllerNode node : nodes.values()) {
-            clusterMonitor.addNode(node);
+        // Start-up the comm service and prime it with the loaded nodes.
+        clusterCommunicationAdminService.initialize(localNode, nodesDelegate);
+        for (DefaultControllerNode node : nodes.values()) {
+            clusterCommunicationAdminService.addNode(node);
         }
         log.info("Started");
     }
@@ -154,78 +130,22 @@
     @Override
     public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
         DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
-        addNodeInternal(node);
-
-        try {
-            clusterCommunicator.broadcast(
-                    new ClusterMessage(
-                            localNode.id(),
-                            ClusterManagementMessageSubjects.CLUSTER_MEMBERSHIP_EVENT,
-                            SERIALIZER.encode(
-                                    new ClusterMembershipEvent(
-                                            ClusterMembershipEventType.NEW_MEMBER,
-                                            node))));
-        } catch (IOException e) {
-            // TODO: In a setup where cluster membership is not static (i.e. not everything has the same picture)
-            // we'll need a more consistent/dependable way to replicate membership events.
-            log.error("Failed to notify peers of a new cluster member", e);
-        }
-
+        nodes.put(nodeId, node);
+        clusterCommunicationAdminService.addNode(node);
         return node;
     }
 
-    private void addNodeInternal(ControllerNode node) {
-        nodes.put(node.id(), node);
-    }
-
     @Override
     public void removeNode(NodeId nodeId) {
-        ControllerNode node = removeNodeInternal(nodeId);
-
-        if (node != null) {
-            try {
-                clusterCommunicator.broadcast(
-                        new ClusterMessage(
-                                localNode.id(),
-                                ClusterManagementMessageSubjects.CLUSTER_MEMBERSHIP_EVENT,
-                                SERIALIZER.encode(
-                                        new ClusterMembershipEvent(
-                                                ClusterMembershipEventType.LEAVING_MEMBER,
-                                                node))));
-            } catch (IOException e) {
-                // TODO: In a setup where cluster membership is not static (i.e. not everything has the same picture)
-                // we'll need a more consistent/dependable way to replicate membership events.
-                log.error("Failed to notify peers of a existing cluster member leaving.", e);
-            }
-        }
-
-    }
-
-    private ControllerNode removeNodeInternal(NodeId nodeId) {
         if (nodeId.equals(localNode.id())) {
             nodes.clear();
             nodes.put(localNode.id(), localNode);
-            return localNode;
 
-        }
-        // Remove the other node.
-        ControllerNode node = nodes.remove(nodeId);
-        return node;
-    }
-
-    private class ClusterMembershipEventListener implements ClusterMessageHandler {
-        @Override
-        public void handle(ClusterMessage message) {
-
-            log.info("Received cluster membership event from peer: {}", message.sender());
-            ClusterMembershipEvent event = (ClusterMembershipEvent) SERIALIZER.decode(message.payload());
-            if (event.type() == ClusterMembershipEventType.NEW_MEMBER) {
-                log.info("Node {} is added", event.node().id());
-                addNodeInternal(event.node());
-            }
-            if (event.type() == ClusterMembershipEventType.LEAVING_MEMBER) {
-                log.info("Node {} is removed ", event.node().id());
-                removeNodeInternal(event.node().id());
+        } else {
+            // Remove the other node.
+            DefaultControllerNode node = nodes.remove(nodeId);
+            if (node != null) {
+                clusterCommunicationAdminService.removeNode(node);
             }
         }
     }
@@ -233,12 +153,13 @@
     // Entity to handle back calls from the connection manager.
     private class InnerNodesDelegate implements ClusterNodesDelegate {
         @Override
-        public ControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort) {
-            ControllerNode node = nodes.get(nodeId);
+        public DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort) {
+            DefaultControllerNode node = nodes.get(nodeId);
             if (node == null) {
                 node = (DefaultControllerNode) addNode(nodeId, ip, tcpPort);
             }
             states.put(nodeId, State.ACTIVE);
+            livenessCache.put(nodeId, node);
             return node;
         }
 
@@ -252,4 +173,14 @@
             removeNode(nodeId);
         }
     }
+
+    private class LivenessCacheRemovalListener implements RemovalListener<NodeId, ControllerNode> {
+
+        @Override
+        public void onRemoval(RemovalNotification<NodeId, ControllerNode> entry) {
+            NodeId nodeId = entry.getKey();
+            log.warn("Failed to receive heartbeats from controller: " + nodeId);
+            nodesDelegate.nodeVanished(nodeId);
+        }
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 0e6b1b8..c72fae8 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -4,6 +4,8 @@
 
 import java.io.IOException;
 import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -14,6 +16,10 @@
 import org.onlab.onos.cluster.ClusterService;
 import org.onlab.onos.cluster.ControllerNode;
 import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
+import org.onlab.onos.store.cluster.impl.ClusterMembershipEventType;
+import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
@@ -32,7 +38,7 @@
 @Component(immediate = true)
 @Service
 public class ClusterCommunicationManager
-        implements ClusterCommunicationService {
+        implements ClusterCommunicationService, ClusterCommunicationAdminService {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -41,6 +47,10 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private ClusterService clusterService;
 
+    private ClusterNodesDelegate nodesDelegate;
+    private final Timer timer = new Timer("onos-controller-heatbeats");
+    public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
+
     // TODO: This probably should not be a OSGi service.
     private MessagingService messagingService;
 
@@ -50,6 +60,7 @@
             serializerPool = KryoPool.newBuilder()
                     .register(KryoPoolUtil.API)
                     .register(ClusterMessage.class, new ClusterMessageSerializer())
+                    .register(ClusterMembershipEvent.class)
                     .register(byte[].class)
                     .register(MessageSubject.class, new MessageSubjectSerializer())
                     .build()
@@ -123,6 +134,61 @@
         messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
     }
 
+    @Override
+    public void initialize(ControllerNode localNode,
+            ClusterNodesDelegate delegate) {
+        this.localNode = localNode;
+        this.nodesDelegate = delegate;
+        this.addSubscriber(new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"), new ClusterMemebershipEventHandler());
+        timer.schedule(new KeepAlive(), 0, HEART_BEAT_INTERVAL_MILLIS);
+    }
+
+    @Override
+    public void addNode(ControllerNode node) {
+        //members.put(node.id(), node);
+    }
+
+    @Override
+    public void removeNode(ControllerNode node) {
+        broadcast(new ClusterMessage(
+                localNode.id(),
+                new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
+                SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
+        //members.remove(node.id());
+    }
+
+    // Sends a heart beat to all peers.
+    private class KeepAlive extends TimerTask {
+
+        @Override
+        public void run() {
+            broadcast(new ClusterMessage(
+                localNode.id(),
+                new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
+                SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode))));
+        }
+    }
+
+    private class ClusterMemebershipEventHandler implements ClusterMessageHandler {
+
+        @Override
+        public void handle(ClusterMessage message) {
+
+            ClusterMembershipEvent event = SERIALIZER.decode(message.payload());
+            ControllerNode node = event.node();
+            if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
+                log.info("Node {} sent a hearbeat", node.id());
+                nodesDelegate.nodeDetected(node.id(), node.ip(), node.tcpPort());
+            } else if (event.type() == ClusterMembershipEventType.LEAVING_MEMBER) {
+                log.info("Node {} is leaving", node.id());
+                nodesDelegate.nodeRemoved(node.id());
+            } else if (event.type() == ClusterMembershipEventType.UNREACHABLE_MEMBER) {
+                log.info("Node {} is unreachable", node.id());
+                nodesDelegate.nodeVanished(node.id());
+            }
+        }
+    }
+
     private final class InternalClusterMessageHandler implements MessageHandler {
 
         private final ClusterMessageHandler handler;
@@ -142,4 +208,4 @@
             }
         }
     }
-}
\ No newline at end of file
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyAdvertisement.java
index 5ab3e6f..132f27a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyAdvertisement.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyAdvertisement.java
@@ -3,7 +3,7 @@
 import java.util.Map;
 
 import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.net.device.Timestamp;
+import org.onlab.onos.store.Timestamp;
 
 import com.google.common.collect.ImmutableMap;
 
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyReply.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyReply.java
index 2a0ef60..46a2333 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyReply.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyReply.java
@@ -4,7 +4,7 @@
 import java.util.Set;
 
 import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.net.device.VersionedValue;
+import org.onlab.onos.store.VersionedValue;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/DeviceMastershipBasedTimestamp.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/DeviceMastershipBasedTimestamp.java
index 30b3db6..4bc41ab 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/DeviceMastershipBasedTimestamp.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/DeviceMastershipBasedTimestamp.java
@@ -4,7 +4,7 @@
 
 import java.util.Objects;
 
-import org.onlab.onos.net.device.Timestamp;
+import org.onlab.onos.store.Timestamp;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ComparisonChain;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java
index 6bb9f01..c9f232a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java
@@ -7,8 +7,8 @@
 import org.onlab.onos.cluster.NodeId;
 import org.onlab.onos.net.Device;
 import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.device.Timestamp;
-import org.onlab.onos.net.device.VersionedValue;
+import org.onlab.onos.store.Timestamp;
+import org.onlab.onos.store.VersionedValue;
 import org.onlab.onos.store.common.impl.AntiEntropyAdvertisement;
 
 // TODO DeviceID needs to be changed to something like (ProviderID, DeviceID)
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java
index 4bb9dda..34dc873 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java
@@ -9,8 +9,8 @@
 import org.onlab.onos.cluster.NodeId;
 import org.onlab.onos.net.Device;
 import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.device.Timestamp;
-import org.onlab.onos.net.device.VersionedValue;
+import org.onlab.onos.store.Timestamp;
+import org.onlab.onos.store.VersionedValue;
 import org.onlab.onos.store.common.impl.AntiEntropyReply;
 
 import com.google.common.collect.ImmutableMap;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceClockManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceClockManager.java
index a8d023c..6f4fb96 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceClockManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceClockManager.java
@@ -11,10 +11,10 @@
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.device.DeviceClockProviderService;
-import org.onlab.onos.net.device.DeviceClockService;
 import org.onlab.onos.net.device.DeviceMastershipTerm;
-import org.onlab.onos.net.device.Timestamp;
+import org.onlab.onos.store.ClockProviderService;
+import org.onlab.onos.store.ClockService;
+import org.onlab.onos.store.Timestamp;
 import org.onlab.onos.store.common.impl.DeviceMastershipBasedTimestamp;
 import org.slf4j.Logger;
 
@@ -23,7 +23,7 @@
  */
 @Component(immediate = true)
 @Service
-public class DeviceClockManager implements DeviceClockService, DeviceClockProviderService {
+public class DeviceClockManager implements ClockService, ClockProviderService {
 
     private final Logger log = getLogger(getClass());
 
@@ -51,7 +51,7 @@
     }
 
     @Override
-    public void setDeviceMastershipTerm(DeviceId deviceId, DeviceMastershipTerm term) {
+    public void setMastershipTerm(DeviceId deviceId, DeviceMastershipTerm term) {
         deviceMastershipTerms.put(deviceId, term);
     }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index ebcbb0c..06c70a0 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -26,16 +26,16 @@
 import org.onlab.onos.net.SparseAnnotations;
 import org.onlab.onos.net.device.DefaultDeviceDescription;
 import org.onlab.onos.net.device.DefaultPortDescription;
-import org.onlab.onos.net.device.DeviceClockService;
 import org.onlab.onos.net.device.DeviceDescription;
 import org.onlab.onos.net.device.DeviceEvent;
 import org.onlab.onos.net.device.DeviceStore;
 import org.onlab.onos.net.device.DeviceStoreDelegate;
 import org.onlab.onos.net.device.PortDescription;
-import org.onlab.onos.net.device.Timestamp;
-import org.onlab.onos.net.device.Timestamped;
 import org.onlab.onos.net.provider.ProviderId;
 import org.onlab.onos.store.AbstractStore;
+import org.onlab.onos.store.ClockService;
+import org.onlab.onos.store.Timestamp;
+import org.onlab.onos.store.Timestamped;
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
@@ -105,7 +105,7 @@
     private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected DeviceClockService clockService;
+    protected ClockService clockService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterCommunicationService clusterCommunicator;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
index 55da310..1deb5d5 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
@@ -2,8 +2,8 @@
 
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.net.device.DeviceDescription;
-import org.onlab.onos.net.device.Timestamped;
 import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.Timestamped;
 
 /**
  * Information published by GossipDeviceStore to notify peers of a device
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEventSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEventSerializer.java
index f17a602..63ddda3 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEventSerializer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEventSerializer.java
@@ -2,8 +2,8 @@
 
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.net.device.DeviceDescription;
-import org.onlab.onos.net.device.Timestamped;
 import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.Timestamped;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEvent.java
index 806a9fb..d8942d6 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEvent.java
@@ -1,7 +1,7 @@
 package org.onlab.onos.store.device.impl;
 
 import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.device.Timestamp;
+import org.onlab.onos.store.Timestamp;
 
 /**
  * Information published by GossipDeviceStore to notify peers of a device
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEventSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEventSerializer.java
index e014cac..7059636 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEventSerializer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEventSerializer.java
@@ -1,7 +1,7 @@
 package org.onlab.onos.store.device.impl;
 
 import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.device.Timestamp;
+import org.onlab.onos.store.Timestamp;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceRemovedEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceRemovedEvent.java
index 5800583..6c8b905 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceRemovedEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceRemovedEvent.java
@@ -1,7 +1,7 @@
 package org.onlab.onos.store.device.impl;
 
 import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.device.Timestamp;
+import org.onlab.onos.store.Timestamp;
 
 /**
  * Information published by GossipDeviceStore to notify peers of a device
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
index b9acb2c..ad4e380 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
@@ -4,8 +4,8 @@
 
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.net.device.PortDescription;
-import org.onlab.onos.net.device.Timestamped;
 import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.Timestamped;
 
 /**
  * Information published by GossipDeviceStore to notify peers of a port
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEventSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEventSerializer.java
index a2737b4..ede88f1 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEventSerializer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEventSerializer.java
@@ -4,8 +4,8 @@
 
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.net.device.PortDescription;
-import org.onlab.onos.net.device.Timestamped;
 import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.Timestamped;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
index 07424eb..00f9a9f 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
@@ -2,8 +2,8 @@
 
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.net.device.PortDescription;
-import org.onlab.onos.net.device.Timestamped;
 import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.Timestamped;
 
 /**
  * Information published by GossipDeviceStore to notify peers of a port
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java
index 434787c..82fbb89 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java
@@ -2,8 +2,8 @@
 
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.net.device.PortDescription;
-import org.onlab.onos.net.device.Timestamped;
 import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.Timestamped;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java
index 17fbe10..d00d76b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java
@@ -23,15 +23,15 @@
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.net.Link;
 import org.onlab.onos.net.LinkKey;
-import org.onlab.onos.net.device.DeviceClockService;
-import org.onlab.onos.net.device.Timestamp;
-import org.onlab.onos.net.device.VersionedValue;
 import org.onlab.onos.net.link.LinkDescription;
 import org.onlab.onos.net.link.LinkEvent;
 import org.onlab.onos.net.link.LinkStore;
 import org.onlab.onos.net.link.LinkStoreDelegate;
 import org.onlab.onos.net.provider.ProviderId;
 import org.onlab.onos.store.AbstractStore;
+import org.onlab.onos.store.ClockService;
+import org.onlab.onos.store.Timestamp;
+import org.onlab.onos.store.VersionedValue;
 import org.slf4j.Logger;
 
 import com.google.common.collect.HashMultimap;
@@ -71,7 +71,7 @@
     private final Multimap<DeviceId, VersionedValue<Link>> dstLinks = HashMultimap.create();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected DeviceClockService clockService;
+    protected ClockService clockService;
 
     @Activate
     public void activate() {
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java
index c78f78a..e63fcaa 100644
--- a/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java
@@ -2,6 +2,8 @@
 
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
 import org.onlab.onos.cluster.DefaultControllerNode;
 import org.onlab.onos.cluster.NodeId;
 import org.onlab.onos.store.cluster.messaging.impl.ClusterCommunicationManager;
@@ -9,6 +11,12 @@
 import org.onlab.netty.NettyMessagingService;
 import org.onlab.packet.IpPrefix;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Tests of the cluster communication manager.
  */
@@ -25,6 +33,9 @@
     private ClusterCommunicationManager ccm1;
     private ClusterCommunicationManager ccm2;
 
+    private TestDelegate cnd1 = new TestDelegate();
+    private TestDelegate cnd2 = new TestDelegate();
+
     private DefaultControllerNode node1 = new DefaultControllerNode(N1, IP, P1);
     private DefaultControllerNode node2 = new DefaultControllerNode(N2, IP, P2);
 
@@ -37,10 +48,15 @@
         messagingService.activate();
 
         ccm1 = new ClusterCommunicationManager();
+//        ccm1.serializationService = messageSerializer;
         ccm1.activate();
 
         ccm2 = new ClusterCommunicationManager();
+//        ccm2.serializationService = messageSerializer;
         ccm2.activate();
+
+        ccm1.initialize(node1, cnd1);
+        ccm2.initialize(node2, cnd2);
     }
 
     @After
@@ -48,4 +64,70 @@
         ccm1.deactivate();
         ccm2.deactivate();
     }
-}
\ No newline at end of file
+
+    @Ignore("FIXME: failing randomly?")
+    @Test
+    public void connect() throws Exception {
+        cnd1.latch = new CountDownLatch(1);
+        cnd2.latch = new CountDownLatch(1);
+
+        ccm1.addNode(node2);
+        validateDelegateEvent(cnd1, Op.DETECTED, node2.id());
+        validateDelegateEvent(cnd2, Op.DETECTED, node1.id());
+    }
+
+    @Test
+    @Ignore
+    public void disconnect() throws Exception {
+        cnd1.latch = new CountDownLatch(1);
+        cnd2.latch = new CountDownLatch(1);
+
+        ccm1.addNode(node2);
+        validateDelegateEvent(cnd1, Op.DETECTED, node2.id());
+        validateDelegateEvent(cnd2, Op.DETECTED, node1.id());
+
+        cnd1.latch = new CountDownLatch(1);
+        cnd2.latch = new CountDownLatch(1);
+        ccm1.deactivate();
+//
+//        validateDelegateEvent(cnd2, Op.VANISHED, node1.id());
+    }
+
+    private void validateDelegateEvent(TestDelegate delegate, Op op, NodeId nodeId)
+            throws InterruptedException {
+        assertTrue("did not connect in time", delegate.latch.await(2500, TimeUnit.MILLISECONDS));
+        assertEquals("incorrect event", op, delegate.op);
+        assertEquals("incorrect event node", nodeId, delegate.nodeId);
+    }
+
+    enum Op { DETECTED, VANISHED, REMOVED };
+
+    private class TestDelegate implements ClusterNodesDelegate {
+
+        Op op;
+        CountDownLatch latch;
+        NodeId nodeId;
+
+        @Override
+        public DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort) {
+            latch(nodeId, Op.DETECTED);
+            return new DefaultControllerNode(nodeId, ip, tcpPort);
+        }
+
+        @Override
+        public void nodeVanished(NodeId nodeId) {
+            latch(nodeId, Op.VANISHED);
+        }
+
+        @Override
+        public void nodeRemoved(NodeId nodeId) {
+            latch(nodeId, Op.REMOVED);
+        }
+
+        private void latch(NodeId nodeId, Op op) {
+            this.op = op;
+            this.nodeId = nodeId;
+            latch.countDown();
+        }
+    }
+}
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/common/impl/MastershipBasedTimestampTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/common/impl/MastershipBasedTimestampTest.java
index 154c2f6..2c8ff35 100644
--- a/core/store/dist/src/test/java/org/onlab/onos/store/common/impl/MastershipBasedTimestampTest.java
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/common/impl/MastershipBasedTimestampTest.java
@@ -5,7 +5,7 @@
 import java.nio.ByteBuffer;
 
 import org.junit.Test;
-import org.onlab.onos.net.device.Timestamp;
+import org.onlab.onos.store.Timestamp;
 import org.onlab.util.KryoPool;
 
 import com.google.common.testing.EqualsTester;
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/common/impl/TimestampedTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/common/impl/TimestampedTest.java
index 9b6c3e6..2a0faa8 100644
--- a/core/store/dist/src/test/java/org/onlab/onos/store/common/impl/TimestampedTest.java
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/common/impl/TimestampedTest.java
@@ -5,8 +5,8 @@
 import java.nio.ByteBuffer;
 
 import org.junit.Test;
-import org.onlab.onos.net.device.Timestamp;
-import org.onlab.onos.net.device.Timestamped;
+import org.onlab.onos.store.Timestamp;
+import org.onlab.onos.store.Timestamped;
 import org.onlab.util.KryoPool;
 
 import com.google.common.testing.EqualsTester;
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
index a683f91..d1060cc 100644
--- a/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
@@ -35,7 +35,6 @@
 import org.onlab.onos.net.SparseAnnotations;
 import org.onlab.onos.net.device.DefaultDeviceDescription;
 import org.onlab.onos.net.device.DefaultPortDescription;
-import org.onlab.onos.net.device.DeviceClockService;
 import org.onlab.onos.net.device.DeviceDescription;
 import org.onlab.onos.net.device.DeviceEvent;
 import org.onlab.onos.net.device.DeviceStore;
@@ -43,6 +42,7 @@
 import org.onlab.onos.net.device.DeviceMastershipTerm;
 import org.onlab.onos.net.device.PortDescription;
 import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.ClockService;
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
@@ -96,7 +96,7 @@
     private DeviceStore deviceStore;
 
     private DeviceClockManager deviceClockManager;
-    private DeviceClockService clockService;
+    private ClockService clockService;
 
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
@@ -113,8 +113,8 @@
         deviceClockManager.activate();
         clockService = deviceClockManager;
 
-        deviceClockManager.setDeviceMastershipTerm(DID1, DeviceMastershipTerm.of(MYSELF, 1));
-        deviceClockManager.setDeviceMastershipTerm(DID2, DeviceMastershipTerm.of(MYSELF, 2));
+        deviceClockManager.setMastershipTerm(DID1, DeviceMastershipTerm.of(MYSELF, 1));
+        deviceClockManager.setMastershipTerm(DID2, DeviceMastershipTerm.of(MYSELF, 2));
 
         ClusterCommunicationService clusterCommunicator = new TestClusterCommunicationService();
         ClusterService clusterService = new TestClusterService();
@@ -556,7 +556,7 @@
     private static final class TestGossipDeviceStore extends GossipDeviceStore {
 
         public TestGossipDeviceStore(
-                DeviceClockService clockService,
+                ClockService clockService,
                 ClusterService clusterService,
                 ClusterCommunicationService clusterCommunicator) {
             this.clockService = clockService;