Refactored ClusterManager as proper fix for Karaf clean issue (Topic phi-fd-on)

Change-Id: Ibb328d73412855dd2d44ca6b734f738ae2996873
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterManager.java
deleted file mode 100644
index f8202fc..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterManager.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.impl;
-
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.util.Enumeration;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.netty.Endpoint;
-import org.onlab.netty.Message;
-import org.onlab.netty.MessageHandler;
-import org.onlab.netty.NettyMessagingService;
-import org.onlab.packet.IpAddress;
-import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.ClusterAdminService;
-import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.ClusterEventListener;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.ControllerNode.State;
-import org.onosproject.cluster.DefaultControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.event.AbstractListenerRegistry;
-import org.onosproject.event.EventDeliveryService;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.serializers.KryoSerializer;
-import org.slf4j.Logger;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import com.hazelcast.util.AddressUtil;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkArgument;
-
-/**
- * ClusterService implementation that employs an accrual failure
- * detector to identify cluster member up/down status.
- */
-@Component(immediate = true, enabled = false)
-@Service
-public class ClusterManager implements ClusterService, ClusterAdminService {
-
-    private final Logger log = getLogger(getClass());
-
-    protected final AbstractListenerRegistry<ClusterEvent, ClusterEventListener>
-        listenerRegistry = new AbstractListenerRegistry<>();
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected EventDeliveryService eventDispatcher;
-
-    // TODO: make these configurable.
-    private static final int HEARTBEAT_FD_PORT = 2419;
-    private static final int HEARTBEAT_INTERVAL_MS = 100;
-    private static final int PHI_FAILURE_THRESHOLD = 10;
-
-    private static final String CONFIG_DIR = "../config";
-    private static final String CLUSTER_DEFINITION_FILE = "cluster.json";
-
-    private ClusterDefinition clusterDefinition;
-
-    private Set<ControllerNode> seedNodes;
-    private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
-    private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
-    private NettyMessagingService messagingService = new NettyMessagingService();
-    private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
-            groupedThreads("onos/cluster/membership", "heartbeat-sender"));
-    private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
-            groupedThreads("onos/cluster/membership", "heartbeat-receiver"));
-
-    private static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
-
-
-    private PhiAccrualFailureDetector failureDetector;
-
-    private ControllerNode localNode;
-
-    private static final KryoSerializer SERIALIZER = new KryoSerializer() {
-        @Override
-        protected void setupKryoPool() {
-            serializerPool = KryoNamespace.newBuilder()
-                .register(KryoNamespaces.API)
-                .register(HeartbeatMessage.class)
-                .build()
-                .populate(1);
-        }
-    };
-
-    private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
-
-    @Activate
-    public void activate() {
-
-        File clusterDefinitionFile = new File(CONFIG_DIR, CLUSTER_DEFINITION_FILE);
-
-        try {
-            clusterDefinition = new ClusterDefinitionStore(clusterDefinitionFile.getPath()).read();
-            seedNodes = ImmutableSet.copyOf(clusterDefinition.getNodes())
-                            .stream()
-                            .map(nodeInfo -> new DefaultControllerNode(
-                                        new NodeId(nodeInfo.getId()),
-                                        IpAddress.valueOf(nodeInfo.getIp()),
-                                        nodeInfo.getTcpPort()))
-                            .collect(Collectors.toSet());
-        } catch (IOException e) {
-            throw new IllegalStateException("Failed to read cluster definition.", e);
-        }
-
-        seedNodes.forEach(node -> {
-            allNodes.put(node.id(), node);
-            nodeStates.put(node.id(), State.INACTIVE);
-        });
-
-        establishSelfIdentity();
-
-        messagingService = new NettyMessagingService(HEARTBEAT_FD_PORT);
-
-        try {
-            messagingService.activate();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new IllegalStateException("Failed to cleanly initialize membership and"
-                    + " failure detector communication channel.", e);
-        }
-        messagingService.registerHandler(
-                HEARTBEAT_MESSAGE,
-                new HeartbeatMessageHandler(),
-                heartBeatMessageHandler);
-
-        eventDispatcher.addSink(ClusterEvent.class, listenerRegistry);
-        failureDetector = new PhiAccrualFailureDetector();
-
-        heartBeatSender.scheduleWithFixedDelay(
-                this::heartbeat,
-                0,
-                HEARTBEAT_INTERVAL_MS,
-                TimeUnit.MILLISECONDS);
-
-        log.info("Started");
-    }
-
-    @Deactivate
-    public void deactivate() {
-        try {
-            messagingService.deactivate();
-        } catch (Exception e) {
-            log.trace("Failed to cleanly shutdown cluster membership messaging", e);
-        }
-
-        heartBeatSender.shutdownNow();
-        heartBeatMessageHandler.shutdownNow();
-        eventDispatcher.removeSink(ClusterEvent.class);
-
-        log.info("Stopped");
-    }
-
-    @Override
-    public ControllerNode getLocalNode() {
-        return localNode;
-    }
-
-    @Override
-    public Set<ControllerNode> getNodes() {
-        return ImmutableSet.copyOf(allNodes.values());
-    }
-
-    @Override
-    public ControllerNode getNode(NodeId nodeId) {
-        checkNotNull(nodeId, INSTANCE_ID_NULL);
-        return allNodes.get(nodeId);
-    }
-
-    @Override
-    public State getState(NodeId nodeId) {
-        checkNotNull(nodeId, INSTANCE_ID_NULL);
-        return nodeStates.get(nodeId);
-    }
-
-    @Override
-    public void addListener(ClusterEventListener listener) {
-        checkNotNull(listener, "Listener must not be null");
-        listenerRegistry.addListener(listener);
-    }
-
-    @Override
-    public void removeListener(ClusterEventListener listener) {
-        checkNotNull(listener, "Listener must not be null");
-        listenerRegistry.removeListener(listener);
-    }
-
-    @Override
-    public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
-        checkNotNull(nodeId, INSTANCE_ID_NULL);
-        checkNotNull(ip, "IP address must not be null");
-        checkArgument(tcpPort > 5000, "Tcp port must be greater than 5000");
-        ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
-        allNodes.put(node.id(), node);
-        nodeStates.put(nodeId, State.INACTIVE);
-        eventDispatcher.post(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
-        return node;
-    }
-
-    @Override
-    public void removeNode(NodeId nodeId) {
-        checkNotNull(nodeId, INSTANCE_ID_NULL);
-        ControllerNode node = allNodes.remove(nodeId);
-        if (node != null) {
-            nodeStates.remove(nodeId);
-            eventDispatcher.post(new ClusterEvent(ClusterEvent.Type.INSTANCE_REMOVED, node));
-        }
-    }
-
-    private void establishSelfIdentity() {
-        try {
-            IpAddress ip = findLocalIp();
-            localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
-            allNodes.put(localNode.id(), localNode);
-            nodeStates.put(localNode.id(), State.ACTIVE);
-            log.info("Local Node: {}", localNode);
-        } catch (SocketException e) {
-            throw new IllegalStateException("Cannot determine local IP", e);
-        }
-    }
-
-    private void heartbeat() {
-        try {
-            Set<ControllerNode> peers = allNodes.values()
-                    .stream()
-                    .filter(node -> !(node.id().equals(localNode.id())))
-                    .collect(Collectors.toSet());
-            byte[] hbMessagePayload = SERIALIZER.encode(new HeartbeatMessage(localNode, peers));
-            peers.forEach((node) -> {
-                heartbeatToPeer(hbMessagePayload, node);
-                State currentState = nodeStates.get(node.id());
-                double phi = failureDetector.phi(node.id());
-                if (phi >= PHI_FAILURE_THRESHOLD) {
-                    if (currentState == State.ACTIVE) {
-                        nodeStates.put(node.id(), State.INACTIVE);
-                        notifyStateChange(node.id(), State.ACTIVE, State.INACTIVE);
-                    }
-                } else {
-                    if (currentState == State.INACTIVE) {
-                        nodeStates.put(node.id(), State.ACTIVE);
-                        notifyStateChange(node.id(), State.INACTIVE, State.ACTIVE);
-                    }
-                }
-            });
-        } catch (Exception e) {
-            log.debug("Failed to send heartbeat", e);
-        }
-    }
-
-    private void notifyStateChange(NodeId nodeId, State oldState, State newState) {
-        if (newState == State.ACTIVE) {
-            eventDispatcher.post(new ClusterEvent(ClusterEvent.Type.INSTANCE_ACTIVATED, allNodes.get(nodeId)));
-        } else {
-            eventDispatcher.post(new ClusterEvent(ClusterEvent.Type.INSTANCE_DEACTIVATED, allNodes.get(nodeId)));
-        }
-    }
-
-    private void heartbeatToPeer(byte[] messagePayload, ControllerNode peer) {
-        Endpoint remoteEp = new Endpoint(peer.ip(), HEARTBEAT_FD_PORT);
-        try {
-            messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload);
-        } catch (IOException e) {
-            log.debug("Sending heartbeat to {} failed", remoteEp, e);
-        }
-    }
-
-    private class HeartbeatMessageHandler implements MessageHandler {
-        @Override
-        public void handle(Message message) throws IOException {
-            HeartbeatMessage hb = SERIALIZER.decode(message.payload());
-            failureDetector.report(hb.source().id());
-            hb.knownPeers().forEach(node -> {
-                allNodes.put(node.id(), node);
-            });
-        }
-    }
-
-    private static class HeartbeatMessage {
-        private ControllerNode source;
-        private Set<ControllerNode> knownPeers;
-
-        public HeartbeatMessage(ControllerNode source, Set<ControllerNode> members) {
-            this.source = source;
-            this.knownPeers = ImmutableSet.copyOf(members);
-        }
-
-        public ControllerNode source() {
-            return source;
-        }
-
-        public Set<ControllerNode> knownPeers() {
-            return knownPeers;
-        }
-    }
-
-    private IpAddress findLocalIp() throws SocketException {
-        Enumeration<NetworkInterface> interfaces =
-                NetworkInterface.getNetworkInterfaces();
-        while (interfaces.hasMoreElements()) {
-            NetworkInterface iface = interfaces.nextElement();
-            Enumeration<InetAddress> inetAddresses =  iface.getInetAddresses();
-            while (inetAddresses.hasMoreElements()) {
-                IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
-                if (AddressUtil.matchInterface(ip.toString(), clusterDefinition.getIpPrefix())) {
-                    return ip;
-                }
-            }
-        }
-        throw new IllegalStateException("Unable to determine local ip");
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index 4da1b46..213d0f4 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -15,167 +15,323 @@
  */
 package org.onosproject.store.cluster.impl;
 
-import com.google.common.base.Optional;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableSet;
-import com.hazelcast.core.IMap;
-import com.hazelcast.core.Member;
-import com.hazelcast.core.MemberAttributeEvent;
-import com.hazelcast.core.MembershipEvent;
-import com.hazelcast.core.MembershipListener;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.netty.Endpoint;
+import org.onlab.netty.Message;
+import org.onlab.netty.MessageHandler;
+import org.onlab.netty.NettyMessagingService;
+import org.onlab.packet.IpAddress;
+import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterEvent;
 import org.onosproject.cluster.ClusterStore;
 import org.onosproject.cluster.ClusterStoreDelegate;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.cluster.ControllerNode.State;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.store.hz.AbsentInvalidatingLoadingCache;
-import org.onosproject.store.hz.AbstractHazelcastStore;
-import org.onosproject.store.hz.OptionalCacheLoader;
-import org.onlab.packet.IpAddress;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.slf4j.Logger;
 
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.hazelcast.util.AddressUtil;
 
-import static com.google.common.cache.CacheBuilder.newBuilder;
-import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_ACTIVATED;
-import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_DEACTIVATED;
-import static org.onosproject.cluster.ControllerNode.State;
-
-/**
- * Distributed implementation of the cluster nodes store.
- */
 @Component(immediate = true)
 @Service
+/**
+ * Distributed cluster nodes store that employs an accrual failure
+ * detector to identify cluster member up/down status.
+ */
 public class DistributedClusterStore
-        extends AbstractHazelcastStore<ClusterEvent, ClusterStoreDelegate>
+        extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
         implements ClusterStore {
 
-    private IMap<byte[], byte[]> rawNodes;
-    private LoadingCache<NodeId, Optional<DefaultControllerNode>> nodes;
+    private final Logger log = getLogger(DistributedClusterStore.class);
 
-    private String listenerId;
-    private final MembershipListener listener = new InternalMembershipListener();
-    private final Map<NodeId, State> states = new ConcurrentHashMap<>();
+    // TODO: make these configurable.
+    private static final int HEARTBEAT_FD_PORT = 2419;
+    private static final int HEARTBEAT_INTERVAL_MS = 100;
+    private static final int PHI_FAILURE_THRESHOLD = 10;
 
-    private String nodesListenerId;
+    private static final String CONFIG_DIR = "../config";
+    private static final String CLUSTER_DEFINITION_FILE = "cluster.json";
+    private static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
 
-    @Override
+    private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        @Override
+        protected void setupKryoPool() {
+            serializerPool = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(HeartbeatMessage.class)
+                .build()
+                .populate(1);
+        }
+    };
+
+    private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
+
+    private ClusterDefinition clusterDefinition;
+
+    private Set<ControllerNode> seedNodes;
+    private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
+    private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
+    private NettyMessagingService messagingService = new NettyMessagingService();
+    private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
+            groupedThreads("onos/cluster/membership", "heartbeat-sender"));
+    private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
+            groupedThreads("onos/cluster/membership", "heartbeat-receiver"));
+
+    private PhiAccrualFailureDetector failureDetector;
+
+    private ControllerNode localNode;
+
     @Activate
     public void activate() {
-        super.activate();
-        listenerId = theInstance.getCluster().addMembershipListener(listener);
+        File clusterDefinitionFile = new File(CONFIG_DIR,
+                CLUSTER_DEFINITION_FILE);
 
-        rawNodes = theInstance.getMap("nodes");
-        OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader
-                = new OptionalCacheLoader<>(serializer, rawNodes);
-        nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
-        nodesListenerId = rawNodes.addEntryListener(new RemoteCacheEventHandler<>(nodes), true);
+        try {
+            clusterDefinition = new ClusterDefinitionStore(
+                    clusterDefinitionFile.getPath()).read();
+            seedNodes = ImmutableSet
+                    .copyOf(clusterDefinition.getNodes())
+                    .stream()
+                    .map(nodeInfo -> new DefaultControllerNode(new NodeId(
+                            nodeInfo.getId()), IpAddress.valueOf(nodeInfo
+                            .getIp()), nodeInfo.getTcpPort()))
+                    .collect(Collectors.toSet());
+        } catch (IOException e) {
+            throw new IllegalStateException(
+                    "Failed to read cluster definition.", e);
+        }
 
-        loadClusterNodes();
+        seedNodes.forEach(node -> {
+            allNodes.put(node.id(), node);
+            nodeStates.put(node.id(), State.INACTIVE);
+        });
+
+        establishSelfIdentity();
+
+        messagingService = new NettyMessagingService(HEARTBEAT_FD_PORT);
+
+        try {
+            messagingService.activate();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException(
+                    "Failed to cleanly initialize membership and"
+                            + " failure detector communication channel.", e);
+        }
+        messagingService.registerHandler(HEARTBEAT_MESSAGE,
+                new HeartbeatMessageHandler(), heartBeatMessageHandler);
+
+        failureDetector = new PhiAccrualFailureDetector();
+
+        heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
+                HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
 
         log.info("Started");
     }
 
-    // Loads the initial set of cluster nodes
-    private void loadClusterNodes() {
-        for (Member member : theInstance.getCluster().getMembers()) {
-            addNode(node(member));
-        }
-    }
-
     @Deactivate
     public void deactivate() {
-        rawNodes.removeEntryListener(nodesListenerId);
-        theInstance.getCluster().removeMembershipListener(listenerId);
+        try {
+            messagingService.deactivate();
+        } catch (Exception e) {
+            log.trace("Failed to cleanly shutdown cluster membership messaging", e);
+        }
+
+        heartBeatSender.shutdownNow();
+        heartBeatMessageHandler.shutdownNow();
+
         log.info("Stopped");
     }
 
     @Override
+    public void setDelegate(ClusterStoreDelegate delegate) {
+        checkNotNull(delegate, "Delegate cannot be null");
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void unsetDelegate(ClusterStoreDelegate delegate) {
+        this.delegate = null;
+    }
+
+    @Override
+    public boolean hasDelegate() {
+        return this.delegate != null;
+    }
+
+    @Override
     public ControllerNode getLocalNode() {
-        return node(theInstance.getCluster().getLocalMember());
+        return localNode;
     }
 
     @Override
     public Set<ControllerNode> getNodes() {
-        ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
-        for (Optional<DefaultControllerNode> optional : nodes.asMap().values()) {
-            builder.add(optional.get());
-        }
-        return builder.build();
+        return ImmutableSet.copyOf(allNodes.values());
     }
 
     @Override
     public ControllerNode getNode(NodeId nodeId) {
-        return nodes.getUnchecked(nodeId).orNull();
+        checkNotNull(nodeId, INSTANCE_ID_NULL);
+        return allNodes.get(nodeId);
     }
 
     @Override
     public State getState(NodeId nodeId) {
-        State state = states.get(nodeId);
-        return state == null ? State.INACTIVE : state;
+        checkNotNull(nodeId, INSTANCE_ID_NULL);
+        return nodeStates.get(nodeId);
     }
 
     @Override
     public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
-        return addNode(new DefaultControllerNode(nodeId, ip, tcpPort));
+        checkNotNull(nodeId, INSTANCE_ID_NULL);
+        checkNotNull(ip, "IP address must not be null");
+        checkArgument(tcpPort > 5000, "Tcp port must be greater than 5000");
+        ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
+        allNodes.put(node.id(), node);
+        nodeStates.put(nodeId, State.INACTIVE);
+        delegate.notify(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
+        return node;
     }
 
     @Override
     public void removeNode(NodeId nodeId) {
-        synchronized (this) {
-            rawNodes.remove(serialize(nodeId));
-            nodes.invalidate(nodeId);
+        checkNotNull(nodeId, INSTANCE_ID_NULL);
+        ControllerNode node = allNodes.remove(nodeId);
+        if (node != null) {
+            nodeStates.remove(nodeId);
+            delegate.notify(new ClusterEvent(ClusterEvent.Type.INSTANCE_REMOVED, node));
         }
     }
 
-    // Adds a new node based on the specified member
-    private synchronized ControllerNode addNode(DefaultControllerNode node) {
-        rawNodes.put(serialize(node.id()), serialize(node));
-        nodes.put(node.id(), Optional.of(node));
-        states.put(node.id(), State.ACTIVE);
-        return node;
+    private void establishSelfIdentity() {
+        try {
+            IpAddress ip = findLocalIp();
+            localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
+            allNodes.put(localNode.id(), localNode);
+            nodeStates.put(localNode.id(), State.ACTIVE);
+            log.info("Local Node: {}", localNode);
+        } catch (SocketException e) {
+            throw new IllegalStateException("Cannot determine local IP", e);
+        }
     }
 
-    // Creates a controller node descriptor from the Hazelcast member.
-    private DefaultControllerNode node(Member member) {
-        IpAddress ip = memberAddress(member);
-        return new DefaultControllerNode(new NodeId(ip.toString()), ip);
+    private void heartbeat() {
+        try {
+            Set<ControllerNode> peers = allNodes.values()
+                    .stream()
+                    .filter(node -> !(node.id().equals(localNode.id())))
+                    .collect(Collectors.toSet());
+            byte[] hbMessagePayload = SERIALIZER.encode(new HeartbeatMessage(localNode, peers));
+            peers.forEach((node) -> {
+                heartbeatToPeer(hbMessagePayload, node);
+                State currentState = nodeStates.get(node.id());
+                double phi = failureDetector.phi(node.id());
+                if (phi >= PHI_FAILURE_THRESHOLD) {
+                    if (currentState == State.ACTIVE) {
+                        nodeStates.put(node.id(), State.INACTIVE);
+                        notifyStateChange(node.id(), State.ACTIVE, State.INACTIVE);
+                    }
+                } else {
+                    if (currentState == State.INACTIVE) {
+                        nodeStates.put(node.id(), State.ACTIVE);
+                        notifyStateChange(node.id(), State.INACTIVE, State.ACTIVE);
+                    }
+                }
+            });
+        } catch (Exception e) {
+            log.debug("Failed to send heartbeat", e);
+        }
     }
 
-    private IpAddress memberAddress(Member member) {
-        return IpAddress.valueOf(member.getSocketAddress().getAddress());
+    private void notifyStateChange(NodeId nodeId, State oldState, State newState) {
+        ControllerNode node = allNodes.get(nodeId);
+        if (newState == State.ACTIVE) {
+            delegate.notify(new ClusterEvent(ClusterEvent.Type.INSTANCE_ACTIVATED, node));
+        } else {
+            delegate.notify(new ClusterEvent(ClusterEvent.Type.INSTANCE_DEACTIVATED, node));
+        }
     }
 
-    // Interceptor for membership events.
-    private class InternalMembershipListener implements MembershipListener {
+    private void heartbeatToPeer(byte[] messagePayload, ControllerNode peer) {
+        Endpoint remoteEp = new Endpoint(peer.ip(), HEARTBEAT_FD_PORT);
+        try {
+            messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload);
+        } catch (IOException e) {
+            log.debug("Sending heartbeat to {} failed", remoteEp, e);
+        }
+    }
+
+    private IpAddress findLocalIp() throws SocketException {
+        Enumeration<NetworkInterface> interfaces =
+                NetworkInterface.getNetworkInterfaces();
+        while (interfaces.hasMoreElements()) {
+            NetworkInterface iface = interfaces.nextElement();
+            Enumeration<InetAddress> inetAddresses =  iface.getInetAddresses();
+            while (inetAddresses.hasMoreElements()) {
+                IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
+                if (AddressUtil.matchInterface(ip.toString(), clusterDefinition.getIpPrefix())) {
+                    return ip;
+                }
+            }
+        }
+        throw new IllegalStateException("Unable to determine local ip");
+    }
+
+    private class HeartbeatMessageHandler implements MessageHandler {
         @Override
-        public void memberAdded(MembershipEvent membershipEvent) {
-            log.info("Member {} added", membershipEvent.getMember());
-            ControllerNode node = addNode(node(membershipEvent.getMember()));
-            notifyDelegate(new ClusterEvent(INSTANCE_ACTIVATED, node));
-        }
-
-        @Override
-        public void memberRemoved(MembershipEvent membershipEvent) {
-            log.info("Member {} removed", membershipEvent.getMember());
-            NodeId nodeId = new NodeId(memberAddress(membershipEvent.getMember()).toString());
-            states.put(nodeId, State.INACTIVE);
-            notifyDelegate(new ClusterEvent(INSTANCE_DEACTIVATED, getNode(nodeId)));
-        }
-
-        @Override
-        public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
-            log.info("Member {} attribute {} changed to {}",
-                     memberAttributeEvent.getMember(),
-                     memberAttributeEvent.getKey(),
-                     memberAttributeEvent.getValue());
+        public void handle(Message message) throws IOException {
+            HeartbeatMessage hb = SERIALIZER.decode(message.payload());
+            failureDetector.report(hb.source().id());
+            hb.knownPeers().forEach(node -> {
+                allNodes.put(node.id(), node);
+            });
         }
     }
+
+    private static class HeartbeatMessage {
+        private ControllerNode source;
+        private Set<ControllerNode> knownPeers;
+
+        public HeartbeatMessage(ControllerNode source, Set<ControllerNode> members) {
+            this.source = source;
+            this.knownPeers = ImmutableSet.copyOf(members);
+        }
+
+        public ControllerNode source() {
+            return source;
+        }
+
+        public Set<ControllerNode> knownPeers() {
+            return knownPeers;
+        }
+    }
+
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastClusterStore.java
new file mode 100644
index 0000000..d32f7a8
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastClusterStore.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.cluster.impl;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableSet;
+import com.hazelcast.core.IMap;
+import com.hazelcast.core.Member;
+import com.hazelcast.core.MemberAttributeEvent;
+import com.hazelcast.core.MembershipEvent;
+import com.hazelcast.core.MembershipListener;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterStore;
+import org.onosproject.cluster.ClusterStoreDelegate;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.hz.AbsentInvalidatingLoadingCache;
+import org.onosproject.store.hz.AbstractHazelcastStore;
+import org.onosproject.store.hz.OptionalCacheLoader;
+import org.onlab.packet.IpAddress;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static com.google.common.cache.CacheBuilder.newBuilder;
+import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_ACTIVATED;
+import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_DEACTIVATED;
+import static org.onosproject.cluster.ControllerNode.State;
+
+/**
+ * Distributed, Hazelcast-based implementation of the cluster nodes store.
+ */
+@Component(immediate = true, enabled = false)
+@Service
+public class HazelcastClusterStore
+        extends AbstractHazelcastStore<ClusterEvent, ClusterStoreDelegate>
+        implements ClusterStore {
+
+    private IMap<byte[], byte[]> rawNodes;
+    private LoadingCache<NodeId, Optional<DefaultControllerNode>> nodes;
+
+    private String listenerId;
+    private final MembershipListener listener = new InternalMembershipListener();
+    private final Map<NodeId, State> states = new ConcurrentHashMap<>();
+
+    private String nodesListenerId;
+
+    @Override
+    @Activate
+    public void activate() {
+        super.activate();
+        listenerId = theInstance.getCluster().addMembershipListener(listener);
+
+        rawNodes = theInstance.getMap("nodes");
+        OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader
+                = new OptionalCacheLoader<>(serializer, rawNodes);
+        nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
+        nodesListenerId = rawNodes.addEntryListener(new RemoteCacheEventHandler<>(nodes), true);
+
+        loadClusterNodes();
+
+        log.info("Started");
+    }
+
+    // Loads the initial set of cluster nodes
+    private void loadClusterNodes() {
+        for (Member member : theInstance.getCluster().getMembers()) {
+            addNode(node(member));
+        }
+    }
+
+    @Deactivate
+    public void deactivate() {
+        rawNodes.removeEntryListener(nodesListenerId);
+        theInstance.getCluster().removeMembershipListener(listenerId);
+        log.info("Stopped");
+    }
+
+    @Override
+    public ControllerNode getLocalNode() {
+        return node(theInstance.getCluster().getLocalMember());
+    }
+
+    @Override
+    public Set<ControllerNode> getNodes() {
+        ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
+        for (Optional<DefaultControllerNode> optional : nodes.asMap().values()) {
+            builder.add(optional.get());
+        }
+        return builder.build();
+    }
+
+    @Override
+    public ControllerNode getNode(NodeId nodeId) {
+        return nodes.getUnchecked(nodeId).orNull();
+    }
+
+    @Override
+    public State getState(NodeId nodeId) {
+        State state = states.get(nodeId);
+        return state == null ? State.INACTIVE : state;
+    }
+
+    @Override
+    public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
+        return addNode(new DefaultControllerNode(nodeId, ip, tcpPort));
+    }
+
+    @Override
+    public void removeNode(NodeId nodeId) {
+        synchronized (this) {
+            rawNodes.remove(serialize(nodeId));
+            nodes.invalidate(nodeId);
+        }
+    }
+
+    // Adds a new node based on the specified member
+    private synchronized ControllerNode addNode(DefaultControllerNode node) {
+        rawNodes.put(serialize(node.id()), serialize(node));
+        nodes.put(node.id(), Optional.of(node));
+        states.put(node.id(), State.ACTIVE);
+        return node;
+    }
+
+    // Creates a controller node descriptor from the Hazelcast member.
+    private DefaultControllerNode node(Member member) {
+        IpAddress ip = memberAddress(member);
+        return new DefaultControllerNode(new NodeId(ip.toString()), ip);
+    }
+
+    private IpAddress memberAddress(Member member) {
+        return IpAddress.valueOf(member.getSocketAddress().getAddress());
+    }
+
+    // Interceptor for membership events.
+    private class InternalMembershipListener implements MembershipListener {
+        @Override
+        public void memberAdded(MembershipEvent membershipEvent) {
+            log.info("Member {} added", membershipEvent.getMember());
+            ControllerNode node = addNode(node(membershipEvent.getMember()));
+            notifyDelegate(new ClusterEvent(INSTANCE_ACTIVATED, node));
+        }
+
+        @Override
+        public void memberRemoved(MembershipEvent membershipEvent) {
+            log.info("Member {} removed", membershipEvent.getMember());
+            NodeId nodeId = new NodeId(memberAddress(membershipEvent.getMember()).toString());
+            states.put(nodeId, State.INACTIVE);
+            notifyDelegate(new ClusterEvent(INSTANCE_DEACTIVATED, getNode(nodeId)));
+        }
+
+        @Override
+        public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
+            log.info("Member {} attribute {} changed to {}",
+                     memberAttributeEvent.getMember(),
+                     memberAttributeEvent.getKey(),
+                     memberAttributeEvent.getValue());
+        }
+    }
+}