moved Hazelcast based Mastership+Cluster store to onos-code-dist bundle

Change-Id: I304f916f3a400eaf050a5351825634349790e1bf
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterDefinitionStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterDefinitionStore.java
index 4dc67d4..f38df5a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterDefinitionStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterDefinitionStore.java
@@ -16,6 +16,7 @@
 import java.util.Iterator;
 import java.util.Set;
 
+//Not used right now
 /**
  * Allows for reading and writing cluster definition as a JSON file.
  */
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterManagementMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterManagementMessageSubjects.java
index 74c22f1..6ab594e 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterManagementMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterManagementMessageSubjects.java
@@ -2,6 +2,7 @@
 
 import org.onlab.onos.store.cluster.messaging.MessageSubject;
 
+//Not used right now
 public final class ClusterManagementMessageSubjects {
     // avoid instantiation
     private ClusterManagementMessageSubjects() {}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEvent.java
index 30b847f..1a65798 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEvent.java
@@ -2,6 +2,7 @@
 
 import org.onlab.onos.cluster.ControllerNode;
 
+//Not used right now
 /**
  * Contains information that will be published when a cluster membership event occurs.
  */
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEventType.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEventType.java
index cdfd145..07c6c31 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEventType.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEventType.java
@@ -1,5 +1,6 @@
 package org.onlab.onos.store.cluster.impl;
 
+//Not used right now
 public enum ClusterMembershipEventType {
     NEW_MEMBER,
     LEAVING_MEMBER,
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterNodesDelegate.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterNodesDelegate.java
index b82a835..daef7bd 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterNodesDelegate.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterNodesDelegate.java
@@ -4,6 +4,7 @@
 import org.onlab.onos.cluster.NodeId;
 import org.onlab.packet.IpPrefix;
 
+// Not used right now
 /**
  * Simple back interface through which connection manager can interact with
  * the cluster store.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
new file mode 100644
index 0000000..59267d1
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
@@ -0,0 +1,164 @@
+package org.onlab.onos.store.cluster.impl;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableSet;
+import com.hazelcast.core.IMap;
+import com.hazelcast.core.Member;
+import com.hazelcast.core.MemberAttributeEvent;
+import com.hazelcast.core.MembershipEvent;
+import com.hazelcast.core.MembershipListener;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterEvent;
+import org.onlab.onos.cluster.ClusterStore;
+import org.onlab.onos.cluster.ClusterStoreDelegate;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.hz.AbsentInvalidatingLoadingCache;
+import org.onlab.onos.store.hz.AbstractHazelcastStore;
+import org.onlab.onos.store.hz.OptionalCacheLoader;
+import org.onlab.packet.IpPrefix;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static com.google.common.cache.CacheBuilder.newBuilder;
+import static org.onlab.onos.cluster.ClusterEvent.Type.INSTANCE_ACTIVATED;
+import static org.onlab.onos.cluster.ClusterEvent.Type.INSTANCE_DEACTIVATED;
+import static org.onlab.onos.cluster.ControllerNode.State;
+
+/**
+ * Distributed implementation of the cluster nodes store.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedClusterStore
+        extends AbstractHazelcastStore<ClusterEvent, ClusterStoreDelegate>
+        implements ClusterStore {
+
+    private IMap<byte[], byte[]> rawNodes;
+    private LoadingCache<NodeId, Optional<DefaultControllerNode>> nodes;
+
+    private String listenerId;
+    private final MembershipListener listener = new InternalMembershipListener();
+    private final Map<NodeId, State> states = new ConcurrentHashMap<>();
+
+    @Override
+    @Activate
+    public void activate() {
+        super.activate();
+        listenerId = theInstance.getCluster().addMembershipListener(listener);
+
+        rawNodes = theInstance.getMap("nodes");
+        OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader
+                = new OptionalCacheLoader<>(serializer, rawNodes);
+        nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
+        rawNodes.addEntryListener(new RemoteCacheEventHandler<>(nodes), true);
+
+        loadClusterNodes();
+
+        log.info("Started");
+    }
+
+    // Loads the initial set of cluster nodes
+    private void loadClusterNodes() {
+        for (Member member : theInstance.getCluster().getMembers()) {
+            addNode(node(member));
+        }
+    }
+
+    @Deactivate
+    public void deactivate() {
+        theInstance.getCluster().removeMembershipListener(listenerId);
+        log.info("Stopped");
+    }
+
+    @Override
+    public ControllerNode getLocalNode() {
+        return node(theInstance.getCluster().getLocalMember());
+    }
+
+    @Override
+    public Set<ControllerNode> getNodes() {
+        ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
+        for (Optional<DefaultControllerNode> optional : nodes.asMap().values()) {
+            builder.add(optional.get());
+        }
+        return builder.build();
+    }
+
+    @Override
+    public ControllerNode getNode(NodeId nodeId) {
+        return nodes.getUnchecked(nodeId).orNull();
+    }
+
+    @Override
+    public State getState(NodeId nodeId) {
+        State state = states.get(nodeId);
+        return state == null ? State.INACTIVE : state;
+    }
+
+    @Override
+    public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
+        return addNode(new DefaultControllerNode(nodeId, ip, tcpPort));
+    }
+
+    @Override
+    public void removeNode(NodeId nodeId) {
+        synchronized (this) {
+            rawNodes.remove(serialize(nodeId));
+            nodes.invalidate(nodeId);
+        }
+    }
+
+    // Adds a new node based on the specified member
+    private synchronized ControllerNode addNode(DefaultControllerNode node) {
+        rawNodes.put(serialize(node.id()), serialize(node));
+        nodes.put(node.id(), Optional.of(node));
+        states.put(node.id(), State.ACTIVE);
+        return node;
+    }
+
+    // Creates a controller node descriptor from the Hazelcast member.
+    private DefaultControllerNode node(Member member) {
+        IpPrefix ip = memberAddress(member);
+        return new DefaultControllerNode(new NodeId(ip.toString()), ip);
+    }
+
+    private IpPrefix memberAddress(Member member) {
+        byte[] address = member.getSocketAddress().getAddress().getAddress();
+        return IpPrefix.valueOf(address);
+    }
+
+    // Interceptor for membership events.
+    private class InternalMembershipListener implements MembershipListener {
+        @Override
+        public void memberAdded(MembershipEvent membershipEvent) {
+            log.info("Member {} added", membershipEvent.getMember());
+            ControllerNode node = addNode(node(membershipEvent.getMember()));
+            notifyDelegate(new ClusterEvent(INSTANCE_ACTIVATED, node));
+        }
+
+        @Override
+        public void memberRemoved(MembershipEvent membershipEvent) {
+            log.info("Member {} removed", membershipEvent.getMember());
+            NodeId nodeId = new NodeId(memberAddress(membershipEvent.getMember()).toString());
+            states.put(nodeId, State.INACTIVE);
+            notifyDelegate(new ClusterEvent(INSTANCE_DEACTIVATED, getNode(nodeId)));
+        }
+
+        @Override
+        public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
+            log.info("Member {} attribute {} changed to {}",
+                     memberAttributeEvent.getMember(),
+                     memberAttributeEvent.getKey(),
+                     memberAttributeEvent.getValue());
+        }
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java.bak b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java.bak
deleted file mode 100644
index c781b23..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java.bak
+++ /dev/null
@@ -1,185 +0,0 @@
-package org.onlab.onos.store.cluster.impl;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.onlab.onos.cluster.ClusterEvent;
-import org.onlab.onos.cluster.ClusterStore;
-import org.onlab.onos.cluster.ClusterStoreDelegate;
-import org.onlab.onos.cluster.ControllerNode;
-import org.onlab.onos.cluster.DefaultControllerNode;
-import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.store.AbstractStore;
-import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
-import org.onlab.packet.IpPrefix;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
-import static org.onlab.onos.cluster.ControllerNode.State;
-import static org.onlab.packet.IpPrefix.valueOf;
-
-/**
- * Distributed implementation of the cluster nodes store.
- */
-//@Component(immediate = true)
-//@Service
-public class DistributedClusterStore
-        extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
-        implements ClusterStore {
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    private DefaultControllerNode localNode;
-    private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>();
-    private final Map<NodeId, State> states = new ConcurrentHashMap<>();
-    private final Cache<NodeId, ControllerNode> livenessCache = CacheBuilder.newBuilder()
-            .maximumSize(1000)
-            .expireAfterWrite(/*ClusterCommunicationManager.HEART_BEAT_INTERVAL_MILLIS * */3, TimeUnit.MILLISECONDS)
-            .removalListener(new LivenessCacheRemovalListener()).build();
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private ClusterCommunicationAdminService clusterCommunicationAdminService;
-
-    private final ClusterNodesDelegate nodesDelegate = new InternalNodesDelegate();
-
-    @Activate
-    public void activate() throws IOException {
-        loadClusterDefinition();
-        establishSelfIdentity();
-
-        // Start-up the comm service and prime it with the loaded nodes.
-        clusterCommunicationAdminService.initialize(localNode, nodesDelegate);
-        for (DefaultControllerNode node : nodes.values()) {
-            clusterCommunicationAdminService.addNode(node);
-        }
-        log.info("Started");
-    }
-
-    @Deactivate
-    public void deactivate() {
-        log.info("Stopped");
-    }
-
-    /**
-     * Loads the cluster definition file.
-     */
-    private void loadClusterDefinition() {
-        ClusterDefinitionStore cds = new ClusterDefinitionStore("../config/cluster.json");
-        try {
-            Set<DefaultControllerNode> storedNodes = cds.read();
-            for (DefaultControllerNode node : storedNodes) {
-                nodes.put(node.id(), node);
-            }
-        } catch (IOException e) {
-            log.error("Unable to read cluster definitions", e);
-        }
-    }
-
-    /**
-     * Determines who the local controller node is.
-     */
-    private void establishSelfIdentity() {
-        // Establishes the controller's own identity.
-        IpPrefix ip = valueOf(System.getProperty("onos.ip", "127.0.1.1"));
-        localNode = nodes.get(new NodeId(ip.toString()));
-
-        // As a fall-back, let's make sure we at least know who we are.
-        if (localNode == null) {
-            localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
-            nodes.put(localNode.id(), localNode);
-        }
-        states.put(localNode.id(), State.ACTIVE);
-    }
-
-    @Override
-    public ControllerNode getLocalNode() {
-        return localNode;
-    }
-
-    @Override
-    public Set<ControllerNode> getNodes() {
-        ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
-        return builder.addAll(nodes.values()).build();
-    }
-
-    @Override
-    public ControllerNode getNode(NodeId nodeId) {
-        return nodes.get(nodeId);
-    }
-
-    @Override
-    public State getState(NodeId nodeId) {
-        State state = states.get(nodeId);
-        return state == null ? State.INACTIVE : state;
-    }
-
-    @Override
-    public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
-        DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
-        nodes.put(nodeId, node);
-        clusterCommunicationAdminService.addNode(node);
-        return node;
-    }
-
-    @Override
-    public void removeNode(NodeId nodeId) {
-        if (nodeId.equals(localNode.id())) {
-            nodes.clear();
-            nodes.put(localNode.id(), localNode);
-
-        } else {
-            // Remove the other node.
-            DefaultControllerNode node = nodes.remove(nodeId);
-            if (node != null) {
-                clusterCommunicationAdminService.removeNode(node);
-            }
-        }
-    }
-
-    // Entity to handle back calls from the connection manager.
-    private class InternalNodesDelegate implements ClusterNodesDelegate {
-        @Override
-        public DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort) {
-            DefaultControllerNode node = nodes.get(nodeId);
-            if (node == null) {
-                node = (DefaultControllerNode) addNode(nodeId, ip, tcpPort);
-            }
-            states.put(nodeId, State.ACTIVE);
-            livenessCache.put(nodeId, node);
-            return node;
-        }
-
-        @Override
-        public void nodeVanished(NodeId nodeId) {
-            states.put(nodeId, State.INACTIVE);
-        }
-
-        @Override
-        public void nodeRemoved(NodeId nodeId) {
-            removeNode(nodeId);
-        }
-    }
-
-    private class LivenessCacheRemovalListener implements RemovalListener<NodeId, ControllerNode> {
-
-        @Override
-        public void onRemoval(RemovalNotification<NodeId, ControllerNode> entry) {
-            NodeId nodeId = entry.getKey();
-            log.warn("Failed to receive heartbeats from controller: " + nodeId);
-            nodesDelegate.nodeVanished(nodeId);
-        }
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java
index 8d273ac..9f36b88 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java
@@ -1,4 +1,4 @@
 /**
- * Distributed cluster store and messaging subsystem implementation.
+ * Implementation of a distributed cluster node store using Hazelcast.
  */
 package org.onlab.onos.store.cluster.impl;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/hz/AbsentInvalidatingLoadingCache.java b/core/store/dist/src/main/java/org/onlab/onos/store/hz/AbsentInvalidatingLoadingCache.java
new file mode 100644
index 0000000..8164467
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/hz/AbsentInvalidatingLoadingCache.java
@@ -0,0 +1,76 @@
+package org.onlab.onos.store.hz;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.ForwardingLoadingCache.SimpleForwardingLoadingCache;
+import com.google.common.cache.LoadingCache;
+
+/**
+ * Wrapper around LoadingCache to handle negative hit scenario.
+ * <p>
+ * When the LoadingCache returned Absent,
+ * this implementation will invalidate the entry immediately to avoid
+ * caching negative hits.
+ *
+ * @param <K> Cache key type
+ * @param <V> Cache value type. (Optional{@literal <V>})
+ */
+public class AbsentInvalidatingLoadingCache<K, V> extends
+        SimpleForwardingLoadingCache<K, Optional<V>> {
+
+    /**
+     * Constructor.
+     *
+     * @param delegate actual {@link LoadingCache} to delegate loading.
+     */
+    public AbsentInvalidatingLoadingCache(LoadingCache<K, Optional<V>> delegate) {
+        super(delegate);
+    }
+
+    @Override
+    public Optional<V> get(K key) throws ExecutionException {
+        Optional<V> v = super.get(key);
+        if (!v.isPresent()) {
+            invalidate(key);
+        }
+        return v;
+    }
+
+    @Override
+    public Optional<V> getUnchecked(K key) {
+        Optional<V> v = super.getUnchecked(key);
+        if (!v.isPresent()) {
+            invalidate(key);
+        }
+        return v;
+    }
+
+    @Override
+    public Optional<V> apply(K key) {
+        return getUnchecked(key);
+    }
+
+    @Override
+    public Optional<V> getIfPresent(Object key) {
+        Optional<V> v = super.getIfPresent(key);
+        if (!v.isPresent()) {
+            invalidate(key);
+        }
+        return v;
+    }
+
+    @Override
+    public Optional<V> get(K key, Callable<? extends Optional<V>> valueLoader)
+            throws ExecutionException {
+
+        Optional<V> v = super.get(key, valueLoader);
+        if (!v.isPresent()) {
+            invalidate(key);
+        }
+        return v;
+    }
+
+    // TODO should we be also checking getAll, etc.
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/hz/AbstractHazelcastStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/hz/AbstractHazelcastStore.java
new file mode 100644
index 0000000..e42ec45
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/hz/AbstractHazelcastStore.java
@@ -0,0 +1,244 @@
+package org.onlab.onos.store.hz;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.LoadingCache;
+import com.hazelcast.core.EntryAdapter;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.MapEvent;
+import com.hazelcast.core.Member;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.onos.event.Event;
+import org.onlab.onos.store.AbstractStore;
+import org.onlab.onos.store.StoreDelegate;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.serializers.StoreSerializer;
+import org.slf4j.Logger;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Abstraction of a distributed store based on Hazelcast.
+ */
+@Component
+public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDelegate<E>>
+        extends AbstractStore<E, D> {
+
+    protected final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StoreService storeService;
+
+    protected StoreSerializer serializer;
+
+    protected HazelcastInstance theInstance;
+
+    @Activate
+    public void activate() {
+        serializer = new KryoSerializer();
+        theInstance = storeService.getHazelcastInstance();
+    }
+
+    /**
+     * Serializes the specified object using the backing store service.
+     *
+     * @param obj object to be serialized
+     * @return serialized object
+     */
+    protected byte[] serialize(Object obj) {
+        return serializer.encode(obj);
+    }
+
+    /**
+     * Deserializes the specified object using the backing store service.
+     *
+     * @param bytes bytes to be deserialized
+     * @param <T>   type of object
+     * @return deserialized object
+     */
+    protected <T> T deserialize(byte[] bytes) {
+        return serializer.decode(bytes);
+    }
+
+
+    /**
+     * An IMap entry listener, which reflects each remote event to the cache.
+     *
+     * @param <K> IMap key type after deserialization
+     * @param <V> IMap value type after deserialization
+     */
+    public class RemoteCacheEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
+
+        private final Member localMember;
+        private LoadingCache<K, Optional<V>> cache;
+
+        /**
+         * Constructor.
+         *
+         * @param cache cache to update
+         */
+        public RemoteCacheEventHandler(LoadingCache<K, Optional<V>> cache) {
+            this.localMember = theInstance.getCluster().getLocalMember();
+            this.cache = checkNotNull(cache);
+        }
+
+        @Override
+        public void mapCleared(MapEvent event) {
+            if (localMember.equals(event.getMember())) {
+                // ignore locally triggered event
+                return;
+            }
+            cache.invalidateAll();
+        }
+
+        @Override
+        public void entryAdded(EntryEvent<byte[], byte[]> event) {
+            if (localMember.equals(event.getMember())) {
+                // ignore locally triggered event
+                return;
+            }
+            K key = deserialize(event.getKey());
+            V newVal = deserialize(event.getValue());
+            Optional<V> newValue = Optional.of(newVal);
+            cache.asMap().putIfAbsent(key, newValue);
+            onAdd(key, newVal);
+        }
+
+        @Override
+        public void entryUpdated(EntryEvent<byte[], byte[]> event) {
+            if (localMember.equals(event.getMember())) {
+                // ignore locally triggered event
+                return;
+            }
+            K key = deserialize(event.getKey());
+            V oldVal = deserialize(event.getOldValue());
+            Optional<V> oldValue = Optional.fromNullable(oldVal);
+            V newVal = deserialize(event.getValue());
+            Optional<V> newValue = Optional.of(newVal);
+            cache.asMap().replace(key, oldValue, newValue);
+            onUpdate(key, oldVal, newVal);
+        }
+
+        @Override
+        public void entryRemoved(EntryEvent<byte[], byte[]> event) {
+            if (localMember.equals(event.getMember())) {
+                // ignore locally triggered event
+                return;
+            }
+            K key = deserialize(event.getKey());
+            V val = deserialize(event.getOldValue());
+            cache.invalidate(key);
+            onRemove(key, val);
+        }
+
+        /**
+         * Cache entry addition hook.
+         *
+         * @param key    new key
+         * @param newVal new value
+         */
+        protected void onAdd(K key, V newVal) {
+        }
+
+        /**
+         * Cache entry update hook.
+         *
+         * @param key    new key
+         * @param oldValue old value
+         * @param newVal new value
+         */
+        protected void onUpdate(K key, V oldValue, V newVal) {
+        }
+
+        /**
+         * Cache entry remove hook.
+         *
+         * @param key new key
+         * @param val old value
+         */
+        protected void onRemove(K key, V val) {
+        }
+    }
+
+    /**
+     * Distributed object remote event entry listener.
+     *
+     * @param <K> Entry key type after deserialization
+     * @param <V> Entry value type after deserialization
+     */
+    public class RemoteEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
+
+        private final Member localMember;
+
+        public RemoteEventHandler() {
+            this.localMember = theInstance.getCluster().getLocalMember();
+        }
+        @Override
+        public void entryAdded(EntryEvent<byte[], byte[]> event) {
+            if (localMember.equals(event.getMember())) {
+                // ignore locally triggered event
+                return;
+            }
+            K key = deserialize(event.getKey());
+            V newVal = deserialize(event.getValue());
+            onAdd(key, newVal);
+        }
+
+        @Override
+        public void entryRemoved(EntryEvent<byte[], byte[]> event) {
+            if (localMember.equals(event.getMember())) {
+                // ignore locally triggered event
+                return;
+            }
+            K key = deserialize(event.getKey());
+            V val = deserialize(event.getValue());
+            onRemove(key, val);
+        }
+
+        @Override
+        public void entryUpdated(EntryEvent<byte[], byte[]> event) {
+            if (localMember.equals(event.getMember())) {
+                // ignore locally triggered event
+                return;
+            }
+            K key = deserialize(event.getKey());
+            V oldVal = deserialize(event.getOldValue());
+            V newVal = deserialize(event.getValue());
+            onUpdate(key, oldVal, newVal);
+        }
+
+        /**
+         * Remote entry addition hook.
+         *
+         * @param key    new key
+         * @param newVal new value
+         */
+        protected void onAdd(K key, V newVal) {
+        }
+
+        /**
+         * Remote entry update hook.
+         *
+         * @param key    new key
+         * @param oldValue old value
+         * @param newVal new value
+         */
+        protected void onUpdate(K key, V oldValue, V newVal) {
+        }
+
+        /**
+         * Remote entry remove hook.
+         *
+         * @param key new key
+         * @param val old value
+         */
+        protected void onRemove(K key, V val) {
+        }
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/hz/OptionalCacheLoader.java b/core/store/dist/src/main/java/org/onlab/onos/store/hz/OptionalCacheLoader.java
new file mode 100644
index 0000000..e555a51
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/hz/OptionalCacheLoader.java
@@ -0,0 +1,45 @@
+package org.onlab.onos.store.hz;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.onlab.onos.store.serializers.StoreSerializer;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.CacheLoader;
+import com.hazelcast.core.IMap;
+
+/**
+ * CacheLoader to wrap Map value with Optional,
+ * to handle negative hit on underlying IMap.
+ *
+ * @param <K> IMap key type after deserialization
+ * @param <V> IMap value type after deserialization
+ */
+public final class OptionalCacheLoader<K, V> extends
+        CacheLoader<K, Optional<V>> {
+
+    private final StoreSerializer serializer;
+    private IMap<byte[], byte[]> rawMap;
+
+    /**
+     * Constructor.
+     *
+     * @param serializer to use for serialization
+     * @param rawMap underlying IMap
+     */
+    public OptionalCacheLoader(StoreSerializer serializer, IMap<byte[], byte[]> rawMap) {
+        this.serializer = checkNotNull(serializer);
+        this.rawMap = checkNotNull(rawMap);
+    }
+
+    @Override
+    public Optional<V> load(K key) throws Exception {
+        byte[] keyBytes = serializer.encode(key);
+        byte[] valBytes = rawMap.get(keyBytes);
+        if (valBytes == null) {
+            return Optional.absent();
+        }
+        V dev = serializer.decode(valBytes);
+        return Optional.of(dev);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/hz/SMap.java b/core/store/dist/src/main/java/org/onlab/onos/store/hz/SMap.java
new file mode 100644
index 0000000..16063ff
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/hz/SMap.java
@@ -0,0 +1,615 @@
+package org.onlab.onos.store.hz;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.onos.store.serializers.StoreSerializer;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
+import com.hazelcast.core.EntryView;
+import com.hazelcast.core.ExecutionCallback;
+import com.hazelcast.core.IMap;
+import com.hazelcast.core.MapEvent;
+import com.hazelcast.map.EntryProcessor;
+import com.hazelcast.map.MapInterceptor;
+import com.hazelcast.mapreduce.JobTracker;
+import com.hazelcast.mapreduce.aggregation.Aggregation;
+import com.hazelcast.mapreduce.aggregation.Supplier;
+import com.hazelcast.monitor.LocalMapStats;
+import com.hazelcast.query.Predicate;
+
+// TODO: implement Predicate, etc. if we need them.
+/**
+ * Wrapper around IMap<byte[], byte[]> which serializes/deserializes
+ * Key and Value using StoreSerializer.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class SMap<K, V> implements IMap<K, V> {
+
+    private final IMap<byte[], byte[]> m;
+    private final StoreSerializer serializer;
+
+    /**
+     * Creates a SMap instance.
+     *
+     * @param baseMap base IMap to use
+     * @param serializer serializer to use for both key and value
+     */
+    public SMap(IMap<byte[], byte[]> baseMap, StoreSerializer serializer) {
+        this.m = checkNotNull(baseMap);
+        this.serializer = checkNotNull(serializer);
+    }
+
+    @Override
+    public int size() {
+        return m.size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return m.isEmpty();
+    }
+
+    @Override
+    public void putAll(Map<? extends K, ? extends V> map) {
+        Map<byte[], byte[]> sm = new IdentityHashMap<>(map.size());
+        for (java.util.Map.Entry<? extends K, ? extends V> e : map.entrySet()) {
+            sm.put(serializeKey(e.getKey()), serializeVal(e.getValue()));
+        }
+        m.putAll(sm);
+    }
+
+    @Deprecated
+    @Override
+    public Object getId() {
+        return m.getId();
+    }
+
+    @Override
+    public String getPartitionKey() {
+        return m.getPartitionKey();
+    }
+
+    @Override
+    public String getName() {
+        return m.getName();
+    }
+
+    @Override
+    public String getServiceName() {
+        return m.getServiceName();
+    }
+
+    @Override
+    public void destroy() {
+        m.destroy();
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+        return m.containsKey(serializeKey(key));
+    }
+
+    @Override
+    public boolean containsValue(Object value) {
+        return m.containsValue(serializeVal(value));
+    }
+
+    @Override
+    public V get(Object key) {
+        return deserializeVal(m.get(serializeKey(key)));
+    }
+
+    @Override
+    public V put(K key, V value) {
+        return deserializeVal(m.put(serializeKey(key), serializeVal(value)));
+    }
+
+    @Override
+    public V remove(Object key) {
+        return deserializeVal(m.remove(serializeKey(key)));
+    }
+
+    @Override
+    public boolean remove(Object key, Object value) {
+        return m.remove(serializeKey(key), serializeVal(value));
+    }
+
+    @Override
+    public void delete(Object key) {
+        m.delete(serializeKey(key));
+    }
+
+    @Override
+    public void flush() {
+        m.flush();
+    }
+
+    @Override
+    public Map<K, V> getAll(Set<K> keys) {
+        Set<byte[]> sk = serializeKeySet(keys);
+        Map<byte[], byte[]> bm = m.getAll(sk);
+        Map<K, V> dsm = new HashMap<>(bm.size());
+        for (java.util.Map.Entry<byte[], byte[]> e : bm.entrySet()) {
+            dsm.put(deserializeKey(e.getKey()), deserializeVal(e.getValue()));
+        }
+        return dsm;
+    }
+
+    @Override
+    public void loadAll(boolean replaceExistingValues) {
+        m.loadAll(replaceExistingValues);
+    }
+
+    @Override
+    public void loadAll(Set<K> keys, boolean replaceExistingValues) {
+        Set<byte[]> sk = serializeKeySet(keys);
+        m.loadAll(sk, replaceExistingValues);
+    }
+
+    @Override
+    public void clear() {
+        m.clear();
+    }
+
+    @Override
+    public Future<V> getAsync(K key) {
+        Future<byte[]> f = m.getAsync(serializeKey(key));
+        return Futures.lazyTransform(f, new DeserializeVal());
+    }
+
+    @Override
+    public Future<V> putAsync(K key, V value) {
+        Future<byte[]> f = m.putAsync(serializeKey(key), serializeVal(value));
+        return Futures.lazyTransform(f, new DeserializeVal());
+    }
+
+    @Override
+    public Future<V> putAsync(K key, V value, long ttl, TimeUnit timeunit) {
+        Future<byte[]> f = m.putAsync(serializeKey(key), serializeVal(value), ttl, timeunit);
+        return Futures.lazyTransform(f, new DeserializeVal());
+    }
+
+    @Override
+    public Future<V> removeAsync(K key) {
+        Future<byte[]> f = m.removeAsync(serializeKey(key));
+        return Futures.lazyTransform(f, new DeserializeVal());
+    }
+
+    @Override
+    public boolean tryRemove(K key, long timeout, TimeUnit timeunit) {
+        return m.tryRemove(serializeKey(key), timeout, timeunit);
+    }
+
+    @Override
+    public boolean tryPut(K key, V value, long timeout, TimeUnit timeunit) {
+        return m.tryPut(serializeKey(key), serializeVal(value), timeout, timeunit);
+    }
+
+    @Override
+    public V put(K key, V value, long ttl, TimeUnit timeunit) {
+        return deserializeVal(m.put(serializeKey(key), serializeVal(value), ttl, timeunit));
+    }
+
+    @Override
+    public void putTransient(K key, V value, long ttl, TimeUnit timeunit) {
+        m.putTransient(serializeKey(key), serializeVal(value), ttl, timeunit);
+    }
+
+    @Override
+    public V putIfAbsent(K key, V value) {
+        return deserializeVal(m.putIfAbsent(serializeKey(key), serializeVal(value)));
+    }
+
+    @Override
+    public V putIfAbsent(K key, V value, long ttl, TimeUnit timeunit) {
+        return deserializeVal(m.putIfAbsent(serializeKey(key), serializeVal(value), ttl, timeunit));
+    }
+
+    @Override
+    public boolean replace(K key, V oldValue, V newValue) {
+        return m.replace(serializeKey(key), serializeVal(oldValue), serializeVal(newValue));
+    }
+
+    @Override
+    public V replace(K key, V value) {
+        return deserializeVal(m.replace(serializeKey(key), serializeVal(value)));
+    }
+
+    @Override
+    public void set(K key, V value) {
+        m.set(serializeKey(key), serializeVal(value));
+    }
+
+    @Override
+    public void set(K key, V value, long ttl, TimeUnit timeunit) {
+        m.set(serializeKey(key), serializeVal(value), ttl, timeunit);
+    }
+
+    @Override
+    public void lock(K key) {
+        m.lock(serializeKey(key));
+     }
+
+    @Override
+    public void lock(K key, long leaseTime, TimeUnit timeUnit) {
+        m.lock(serializeKey(key), leaseTime, timeUnit);
+    }
+
+    @Override
+    public boolean isLocked(K key) {
+        return m.isLocked(serializeKey(key));
+    }
+
+    @Override
+    public boolean tryLock(K key) {
+        return m.tryLock(serializeKey(key));
+    }
+
+    @Override
+    public boolean tryLock(K key, long time, TimeUnit timeunit)
+            throws InterruptedException {
+        return m.tryLock(serializeKey(key), time, timeunit);
+    }
+
+    @Override
+    public void unlock(K key) {
+        m.unlock(serializeKey(key));
+    }
+
+    @Override
+    public void forceUnlock(K key) {
+        m.forceUnlock(serializeKey(key));
+    }
+
+    @Override
+    public String addLocalEntryListener(EntryListener<K, V> listener) {
+        return m.addLocalEntryListener(new BaseEntryListener(listener));
+    }
+
+    @Deprecated // marking method not implemented
+    @Override
+    public String addLocalEntryListener(EntryListener<K, V> listener,
+            Predicate<K, V> predicate, boolean includeValue) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @Override
+    public String addLocalEntryListener(EntryListener<K, V> listener,
+            Predicate<K, V> predicate, K key, boolean includeValue) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @Override
+    public String addInterceptor(MapInterceptor interceptor) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void removeInterceptor(String id) {
+        m.removeInterceptor(id);
+    }
+
+    @Override
+    public String addEntryListener(EntryListener<K, V> listener,
+            boolean includeValue) {
+        return m.addEntryListener(new BaseEntryListener(listener), includeValue);
+    }
+
+    @Override
+    public boolean removeEntryListener(String id) {
+        return m.removeEntryListener(id);
+    }
+
+    @Override
+    public String addEntryListener(EntryListener<K, V> listener, K key,
+            boolean includeValue) {
+        return m.addEntryListener(new BaseEntryListener(listener),
+                serializeKey(key), includeValue);
+    }
+
+    @Deprecated // marking method not implemented
+    @Override
+    public String addEntryListener(EntryListener<K, V> listener,
+            Predicate<K, V> predicate, boolean includeValue) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @Override
+    public String addEntryListener(EntryListener<K, V> listener,
+            Predicate<K, V> predicate, K key, boolean includeValue) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @Override
+    public EntryView<K, V> getEntryView(K key) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean evict(K key) {
+        return m.evict(serializeKey(key));
+    }
+
+    @Override
+    public void evictAll() {
+        m.evictAll();
+    }
+
+    @Override
+    public Set<K> keySet() {
+        return deserializeKeySet(m.keySet());
+    }
+
+    @Override
+    public Collection<V> values() {
+        return deserializeVal(m.values());
+    }
+
+    @Override
+    public Set<java.util.Map.Entry<K, V>> entrySet() {
+        return deserializeEntrySet(m.entrySet());
+    }
+
+    @Deprecated // marking method not implemented
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Set<K> keySet(Predicate predicate) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Set<java.util.Map.Entry<K, V>> entrySet(Predicate predicate) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Collection<V> values(Predicate predicate) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Set<K> localKeySet() {
+        return deserializeKeySet(m.localKeySet());
+    }
+
+    @Deprecated // marking method not implemented
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Set<K> localKeySet(Predicate predicate) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @Override
+    public void addIndex(String attribute, boolean ordered) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public LocalMapStats getLocalMapStats() {
+        return m.getLocalMapStats();
+    }
+
+    @Deprecated // marking method not implemented
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Object executeOnKey(K key, EntryProcessor entryProcessor) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Map<K, Object> executeOnKeys(Set<K> keys,
+            EntryProcessor entryProcessor) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void submitToKey(K key, EntryProcessor entryProcessor,
+            ExecutionCallback callback) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Future submitToKey(K key, EntryProcessor entryProcessor) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Map<K, Object> executeOnEntries(EntryProcessor entryProcessor) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Map<K, Object> executeOnEntries(EntryProcessor entryProcessor,
+            Predicate predicate) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @Override
+    public <SuppliedValue, Result> Result aggregate(
+            Supplier<K, V, SuppliedValue> supplier,
+            Aggregation<K, SuppliedValue, Result> aggregation) {
+
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @Override
+    public <SuppliedValue, Result> Result aggregate(
+            Supplier<K, V, SuppliedValue> supplier,
+            Aggregation<K, SuppliedValue, Result> aggregation,
+            JobTracker jobTracker) {
+
+        throw new UnsupportedOperationException();
+    }
+
+    private byte[] serializeKey(Object key) {
+        return serializer.encode(key);
+    }
+
+    private K deserializeKey(byte[] key) {
+        return serializer.decode(key);
+    }
+
+    private byte[] serializeVal(Object val) {
+        return serializer.encode(val);
+    }
+
+    private V deserializeVal(byte[] val) {
+        if (val == null) {
+            return null;
+        }
+        return serializer.decode(val.clone());
+    }
+
+    private Set<byte[]> serializeKeySet(Set<K> keys) {
+        Set<byte[]> sk = Collections.newSetFromMap(new IdentityHashMap<byte[], Boolean>(keys.size()));
+        for (K key : keys) {
+            sk.add(serializeKey(key));
+        }
+        return sk;
+    }
+
+    private Set<K> deserializeKeySet(Set<byte[]> keys) {
+        Set<K> dsk = new HashSet<>(keys.size());
+        for (byte[] key : keys) {
+            dsk.add(deserializeKey(key));
+        }
+        return dsk;
+    }
+
+    private Collection<V> deserializeVal(Collection<byte[]> vals) {
+        Collection<V> dsl = new ArrayList<>(vals.size());
+        for (byte[] val : vals) {
+            dsl.add(deserializeVal(val));
+        }
+        return dsl;
+    }
+
+    private Set<java.util.Map.Entry<K, V>> deserializeEntrySet(
+                        Set<java.util.Map.Entry<byte[], byte[]>> entries) {
+
+        Set<java.util.Map.Entry<K, V>> dse = new HashSet<>(entries.size());
+        for (java.util.Map.Entry<byte[], byte[]> entry : entries) {
+            dse.add(Pair.of(deserializeKey(entry.getKey()),
+                            deserializeVal(entry.getValue())));
+        }
+        return dse;
+    }
+
+    private final class BaseEntryListener
+        implements EntryListener<byte[], byte[]> {
+
+            private final EntryListener<K, V> listener;
+
+        public BaseEntryListener(EntryListener<K, V> listener) {
+            this.listener = listener;
+        }
+
+        @Override
+        public void mapEvicted(MapEvent event) {
+            listener.mapEvicted(event);
+        }
+
+        @Override
+        public void mapCleared(MapEvent event) {
+            listener.mapCleared(event);
+        }
+
+        @Override
+        public void entryUpdated(EntryEvent<byte[], byte[]> event) {
+            EntryEvent<K, V> evt = new EntryEvent<K, V>(
+                    event.getSource(),
+                    event.getMember(),
+                    event.getEventType().getType(),
+                    deserializeKey(event.getKey()),
+                    deserializeVal(event.getOldValue()),
+                    deserializeVal(event.getValue()));
+
+            listener.entryUpdated(evt);
+        }
+
+        @Override
+        public void entryRemoved(EntryEvent<byte[], byte[]> event) {
+            EntryEvent<K, V> evt = new EntryEvent<K, V>(
+                    event.getSource(),
+                    event.getMember(),
+                    event.getEventType().getType(),
+                    deserializeKey(event.getKey()),
+                    deserializeVal(event.getOldValue()),
+                    null);
+
+            listener.entryRemoved(evt);
+        }
+
+        @Override
+        public void entryEvicted(EntryEvent<byte[], byte[]> event) {
+            EntryEvent<K, V> evt = new EntryEvent<K, V>(
+                    event.getSource(),
+                    event.getMember(),
+                    event.getEventType().getType(),
+                    deserializeKey(event.getKey()),
+                    deserializeVal(event.getOldValue()),
+                    deserializeVal(event.getValue()));
+
+            listener.entryEvicted(evt);
+        }
+
+        @Override
+        public void entryAdded(EntryEvent<byte[], byte[]> event) {
+            EntryEvent<K, V> evt = new EntryEvent<K, V>(
+                    event.getSource(),
+                    event.getMember(),
+                    event.getEventType().getType(),
+                    deserializeKey(event.getKey()),
+                    null,
+                    deserializeVal(event.getValue()));
+
+            listener.entryAdded(evt);
+        }
+    }
+
+    private final class DeserializeVal implements Function<byte[], V> {
+        @Override
+        public V apply(byte[] input) {
+            return deserializeVal(input);
+        }
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/hz/StoreManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/hz/StoreManager.java
new file mode 100644
index 0000000..6674989
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/hz/StoreManager.java
@@ -0,0 +1,52 @@
+package org.onlab.onos.store.hz;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.FileSystemXmlConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+
+/**
+ * Auxiliary bootstrap of distributed store.
+ */
+@Component(immediate = true)
+@Service
+public class StoreManager implements StoreService {
+
+    protected static final String HAZELCAST_XML_FILE = "etc/hazelcast.xml";
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    protected HazelcastInstance instance;
+
+    @Activate
+    public void activate() {
+        try {
+            Config config = new FileSystemXmlConfig(HAZELCAST_XML_FILE);
+            instance = Hazelcast.newHazelcastInstance(config);
+            log.info("Started");
+        } catch (FileNotFoundException e) {
+            log.error("Unable to configure Hazelcast", e);
+        }
+    }
+
+    @Deactivate
+    public void deactivate() {
+        instance.shutdown();
+        log.info("Stopped");
+    }
+
+    @Override
+    public HazelcastInstance getHazelcastInstance() {
+        return instance;
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/hz/StoreService.java b/core/store/dist/src/main/java/org/onlab/onos/store/hz/StoreService.java
new file mode 100644
index 0000000..d79e51c
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/hz/StoreService.java
@@ -0,0 +1,18 @@
+package org.onlab.onos.store.hz;
+
+import com.hazelcast.core.HazelcastInstance;
+
+/**
+ * Bootstrap service to get a handle on a share Hazelcast instance.
+ */
+public interface StoreService {
+
+    /**
+     * Returns the shared Hazelcast instance for use as a distributed store
+     * backing.
+     *
+     * @return shared Hazelcast instance
+     */
+    HazelcastInstance getHazelcastInstance();
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/hz/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/hz/package-info.java
new file mode 100644
index 0000000..d92f543
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/hz/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Common abstractions and facilities for implementing distributed store
+ * using Hazelcast.
+ */
+package org.onlab.onos.store.hz;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
new file mode 100644
index 0000000..ecc8982
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
@@ -0,0 +1,372 @@
+package org.onlab.onos.store.mastership.impl;
+
+import static org.onlab.onos.mastership.MastershipEvent.Type.MASTER_CHANGED;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.cluster.RoleInfo;
+import org.onlab.onos.mastership.MastershipEvent;
+import org.onlab.onos.mastership.MastershipStore;
+import org.onlab.onos.mastership.MastershipStoreDelegate;
+import org.onlab.onos.mastership.MastershipTerm;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.MastershipRole;
+import org.onlab.onos.store.hz.AbstractHazelcastStore;
+import org.onlab.onos.store.hz.SMap;
+import org.onlab.onos.store.serializers.KryoNamespaces;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.util.KryoNamespace;
+
+import com.google.common.collect.ImmutableSet;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
+import com.hazelcast.core.IAtomicLong;
+import com.hazelcast.core.MapEvent;
+
+import static org.onlab.onos.net.MastershipRole.*;
+
+/**
+ * Distributed implementation of the mastership store. The store is
+ * responsible for the master selection process.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedMastershipStore
+extends AbstractHazelcastStore<MastershipEvent, MastershipStoreDelegate>
+implements MastershipStore {
+
+    //initial term/TTL value
+    private static final Integer INIT = 0;
+
+    //device to node roles
+    protected SMap<DeviceId, RoleValue> roleMap;
+    //devices to terms
+    protected SMap<DeviceId, Integer> terms;
+    //last-known cluster size, used for tie-breaking when partitioning occurs
+    protected IAtomicLong clusterSize;
+
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Override
+    @Activate
+    public void activate() {
+        super.activate();
+
+        this.serializer = new KryoSerializer() {
+            @Override
+            protected void setupKryoPool() {
+                serializerPool = KryoNamespace.newBuilder()
+                        .register(KryoNamespaces.API)
+
+                        .register(RoleValue.class, new RoleValueSerializer())
+                        .build()
+                        .populate(1);
+            }
+        };
+
+        roleMap = new SMap<>(theInstance.<byte[], byte[]>getMap("nodeRoles"), this.serializer);
+        roleMap.addEntryListener((new RemoteMasterShipEventHandler()), true);
+        terms = new SMap<>(theInstance.<byte[], byte[]>getMap("terms"), this.serializer);
+        clusterSize = theInstance.getAtomicLong("clustersize");
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        log.info("Stopped");
+    }
+
+    @Override
+    public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
+        final RoleValue roleInfo = getRoleValue(deviceId);
+        if (roleInfo.contains(MASTER, nodeId)) {
+            return MASTER;
+        }
+        if (roleInfo.contains(STANDBY, nodeId)) {
+            return STANDBY;
+        }
+        return NONE;
+    }
+
+    @Override
+    public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
+
+        MastershipRole role = getRole(nodeId, deviceId);
+        roleMap.lock(deviceId);
+        try {
+            RoleValue rv = getRoleValue(deviceId);
+            switch (role) {
+                case MASTER:
+                    //reinforce mastership
+                    rv.reassign(nodeId, STANDBY, NONE);
+                    roleMap.put(deviceId, rv);
+                    return null;
+                case STANDBY:
+                case NONE:
+                    NodeId current = rv.get(MASTER);
+                    if (current != null) {
+                        //backup and replace current master
+                        rv.reassign(current, NONE, STANDBY);
+                        rv.replace(current, nodeId, MASTER);
+                    } else {
+                        //no master before so just add.
+                        rv.add(MASTER, nodeId);
+                    }
+                    rv.reassign(nodeId, STANDBY, NONE);
+                    roleMap.put(deviceId, rv);
+                    updateTerm(deviceId);
+                    return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
+                default:
+                    log.warn("unknown Mastership Role {}", role);
+                    return null;
+            }
+        } finally {
+            roleMap.unlock(deviceId);
+        }
+    }
+
+    @Override
+    public NodeId getMaster(DeviceId deviceId) {
+        return getNode(MASTER, deviceId);
+    }
+
+
+    @Override
+    public RoleInfo getNodes(DeviceId deviceId) {
+        roleMap.lock(deviceId);
+        try {
+            RoleValue rv = getRoleValue(deviceId);
+            return rv.roleInfo();
+        } finally {
+            roleMap.unlock(deviceId);
+        }
+    }
+
+    @Override
+    public Set<DeviceId> getDevices(NodeId nodeId) {
+        ImmutableSet.Builder<DeviceId> builder = ImmutableSet.builder();
+
+        for (Map.Entry<DeviceId, RoleValue> el : roleMap.entrySet()) {
+            if (nodeId.equals(el.getValue().get(MASTER))) {
+                builder.add(el.getKey());
+            }
+        }
+
+        return builder.build();
+    }
+
+    @Override
+    public MastershipRole requestRole(DeviceId deviceId) {
+        NodeId local = clusterService.getLocalNode().id();
+
+        roleMap.lock(deviceId);
+        try {
+            RoleValue rv = getRoleValue(deviceId);
+            MastershipRole role = getRole(local, deviceId);
+            switch (role) {
+                case MASTER:
+                    rv.reassign(local, STANDBY, NONE);
+                    terms.putIfAbsent(deviceId, INIT);
+                    roleMap.put(deviceId, rv);
+                    break;
+                case STANDBY:
+                    rv.reassign(local, NONE, STANDBY);
+                    roleMap.put(deviceId, rv);
+                    terms.putIfAbsent(deviceId, INIT);
+                    break;
+                case NONE:
+                    //either we're the first standby, or first to device.
+                    //for latter, claim mastership.
+                    if (rv.get(MASTER) == null) {
+                        rv.add(MASTER, local);
+                        rv.reassign(local, STANDBY, NONE);
+                        updateTerm(deviceId);
+                        role = MastershipRole.MASTER;
+                    } else {
+                        rv.add(STANDBY, local);
+                        rv.reassign(local, NONE, STANDBY);
+                        role = MastershipRole.STANDBY;
+                    }
+                    roleMap.put(deviceId, rv);
+                    break;
+                default:
+                    log.warn("unknown Mastership Role {}", role);
+            }
+            return role;
+        } finally {
+            roleMap.unlock(deviceId);
+        }
+    }
+
+    @Override
+    public MastershipTerm getTermFor(DeviceId deviceId) {
+        RoleValue rv = getRoleValue(deviceId);
+        if ((rv.get(MASTER) == null) || (terms.get(deviceId) == null)) {
+            return null;
+        }
+        return MastershipTerm.of(rv.get(MASTER), terms.get(deviceId));
+    }
+
+    @Override
+    public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
+        MastershipEvent event = null;
+
+        roleMap.lock(deviceId);
+        try {
+            RoleValue rv = getRoleValue(deviceId);
+            MastershipRole role = getRole(nodeId, deviceId);
+            switch (role) {
+                case MASTER:
+                    event = reelect(nodeId, deviceId, rv);
+                    //fall through to reinforce role
+                case STANDBY:
+                    //fall through to reinforce role
+                case NONE:
+                    rv.reassign(nodeId, NONE, STANDBY);
+                    roleMap.put(deviceId, rv);
+                    break;
+                default:
+                    log.warn("unknown Mastership Role {}", role);
+            }
+            return event;
+        } finally {
+            roleMap.unlock(deviceId);
+        }
+    }
+
+    @Override
+    public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
+        MastershipEvent event = null;
+
+        roleMap.lock(deviceId);
+        try {
+            RoleValue rv = getRoleValue(deviceId);
+            MastershipRole role = getRole(nodeId, deviceId);
+            switch (role) {
+                case MASTER:
+                    event = reelect(nodeId, deviceId, rv);
+                    //fall through to reinforce relinquishment
+                case STANDBY:
+                    //fall through to reinforce relinquishment
+                case NONE:
+                    rv.reassign(nodeId, STANDBY, NONE);
+                    roleMap.put(deviceId, rv);
+                    break;
+                default:
+                log.warn("unknown Mastership Role {}", role);
+            }
+            return event;
+        } finally {
+            roleMap.unlock(deviceId);
+        }
+    }
+
+    //helper to fetch a new master candidate for a given device.
+    private MastershipEvent reelect(
+            NodeId current, DeviceId deviceId, RoleValue rv) {
+
+        //if this is an queue it'd be neater.
+        NodeId backup = null;
+        for (NodeId n : rv.nodesOfRole(STANDBY)) {
+            if (!current.equals(n)) {
+                backup = n;
+                break;
+            }
+        }
+
+        if (backup == null) {
+            log.info("{} giving up and going to NONE for {}", current, deviceId);
+            rv.remove(MASTER, current);
+            roleMap.put(deviceId, rv);
+            return null;
+        } else {
+            log.info("{} trying to pass mastership for {} to {}", current, deviceId, backup);
+            rv.replace(current, backup, MASTER);
+            rv.reassign(backup, STANDBY, NONE);
+            roleMap.put(deviceId, rv);
+            Integer term = terms.get(deviceId);
+            terms.put(deviceId, ++term);
+            return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
+        }
+    }
+
+    //return the RoleValue structure for a device, or create one
+    private RoleValue getRoleValue(DeviceId deviceId) {
+        RoleValue value = roleMap.get(deviceId);
+        if (value == null) {
+            value = new RoleValue();
+            RoleValue concurrentlyAdded = roleMap.putIfAbsent(deviceId, value);
+            if (concurrentlyAdded != null) {
+                return concurrentlyAdded;
+            }
+        }
+        return value;
+    }
+
+    //get first applicable node out of store-unique structure.
+    private NodeId getNode(MastershipRole role, DeviceId deviceId) {
+        RoleValue value = roleMap.get(deviceId);
+        if (value != null) {
+            return value.get(role);
+        }
+        return null;
+    }
+
+    //adds or updates term information.
+    private void updateTerm(DeviceId deviceId) {
+        terms.lock(deviceId);
+        try {
+            Integer term = terms.get(deviceId);
+            if (term == null) {
+                terms.put(deviceId, INIT);
+            } else {
+                terms.put(deviceId, ++term);
+            }
+        } finally {
+            terms.unlock(deviceId);
+        }
+    }
+
+    private class RemoteMasterShipEventHandler implements EntryListener<DeviceId, RoleValue> {
+
+        @Override
+        public void entryAdded(EntryEvent<DeviceId, RoleValue> event) {
+        }
+
+        @Override
+        public void entryRemoved(EntryEvent<DeviceId, RoleValue> event) {
+        }
+
+        @Override
+        public void entryUpdated(EntryEvent<DeviceId, RoleValue> event) {
+
+            notifyDelegate(new MastershipEvent(
+                    MASTER_CHANGED, event.getKey(), event.getValue().roleInfo()));
+        }
+
+        @Override
+        public void entryEvicted(EntryEvent<DeviceId, RoleValue> event) {
+        }
+
+        @Override
+        public void mapEvicted(MapEvent event) {
+        }
+
+        @Override
+        public void mapCleared(MapEvent event) {
+        }
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/RoleValue.java b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/RoleValue.java
new file mode 100644
index 0000000..7447161
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/RoleValue.java
@@ -0,0 +1,122 @@
+package org.onlab.onos.store.mastership.impl;
+
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.cluster.RoleInfo;
+import org.onlab.onos.net.MastershipRole;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.MoreObjects.ToStringHelper;
+
+/**
+ * A structure that holds node mastership roles associated with a
+ * {@link DeviceId}. This structure needs to be locked through IMap.
+ */
+final class RoleValue {
+
+    protected final Map<MastershipRole, List<NodeId>> value = new EnumMap<>(MastershipRole.class);
+
+    public RoleValue() {
+        value.put(MastershipRole.MASTER, new LinkedList<NodeId>());
+        value.put(MastershipRole.STANDBY, new LinkedList<NodeId>());
+        value.put(MastershipRole.NONE, new LinkedList<NodeId>());
+    }
+
+    // exposing internals for serialization purpose only
+    Map<MastershipRole, List<NodeId>> value() {
+        return Collections.unmodifiableMap(value);
+    }
+
+    public List<NodeId> nodesOfRole(MastershipRole type) {
+        return value.get(type);
+    }
+
+    public NodeId get(MastershipRole type) {
+        return value.get(type).isEmpty() ? null : value.get(type).get(0);
+    }
+
+    public boolean contains(MastershipRole type, NodeId nodeId) {
+        return value.get(type).contains(nodeId);
+    }
+
+    /**
+     * Associates a node to a certain role.
+     *
+     * @param type the role
+     * @param nodeId the node ID of the node to associate
+     */
+    public void add(MastershipRole type, NodeId nodeId) {
+        List<NodeId> nodes = value.get(type);
+
+        if (!nodes.contains(nodeId)) {
+            nodes.add(nodeId);
+        }
+    }
+
+    /**
+     * Removes a node from a certain role.
+     *
+     * @param type the role
+     * @param nodeId the ID of the node to remove
+     * @return
+     */
+    public boolean remove(MastershipRole type, NodeId nodeId) {
+        List<NodeId> nodes = value.get(type);
+        if (!nodes.isEmpty()) {
+            return nodes.remove(nodeId);
+        } else {
+            return false;
+        }
+    }
+
+    /**
+     * Reassigns a node from one role to another. If the node was not of the
+     * old role, it will still be assigned the new role.
+     *
+     * @param nodeId the Node ID of node changing roles
+     * @param from the old role
+     * @param to the new role
+     */
+    public void reassign(NodeId nodeId, MastershipRole from, MastershipRole to) {
+        remove(from, nodeId);
+        add(to, nodeId);
+    }
+
+    /**
+     * Replaces a node in one role with another node. Even if there is no node to
+     * replace, the new node is associated to the role.
+     *
+     * @param from the old NodeId to replace
+     * @param to the new NodeId
+     * @param type the role associated with the old NodeId
+     */
+    public void replace(NodeId from, NodeId to, MastershipRole type) {
+        remove(type, from);
+        add(type, to);
+    }
+
+    /**
+     * Summarizes this RoleValue as a RoleInfo. Note that master and/or backups
+     * may be empty, so the values should be checked for safety.
+     *
+     * @return the RoleInfo.
+     */
+    public RoleInfo roleInfo() {
+        return new RoleInfo(
+                get(MastershipRole.MASTER), nodesOfRole(MastershipRole.STANDBY));
+    }
+
+    @Override
+    public String toString() {
+        ToStringHelper helper = MoreObjects.toStringHelper(this.getClass());
+        for (Map.Entry<MastershipRole, List<NodeId>> el : value.entrySet()) {
+            helper.add(el.getKey().toString(), el.getValue());
+        }
+        return helper.toString();
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/RoleValueSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/RoleValueSerializer.java
new file mode 100644
index 0000000..4450e5b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/RoleValueSerializer.java
@@ -0,0 +1,52 @@
+package org.onlab.onos.store.mastership.impl;
+
+import java.util.List;
+import java.util.Map;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.MastershipRole;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Serializer for RoleValues used by {@link DistributedMastershipStore}.
+ */
+public class RoleValueSerializer extends Serializer<RoleValue> {
+
+    //RoleValues are assumed to hold a Map of MastershipRoles (an enum)
+    //to a List of NodeIds.
+
+    @Override
+    public RoleValue read(Kryo kryo, Input input, Class<RoleValue> type) {
+        RoleValue rv = new RoleValue();
+        int size = input.readInt();
+        for (int i = 0; i < size; i++) {
+            MastershipRole role = MastershipRole.values()[input.readInt()];
+            int s = input.readInt();
+            for (int j = 0; j < s; j++) {
+                rv.add(role, new NodeId(input.readString()));
+            }
+        }
+        return rv;
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, RoleValue type) {
+        final Map<MastershipRole, List<NodeId>> map = type.value();
+        output.writeInt(map.size());
+
+        for (Map.Entry<MastershipRole, List<NodeId>> el : map.entrySet()) {
+            output.writeInt(el.getKey().ordinal());
+
+            List<NodeId> nodes = el.getValue();
+            output.writeInt(nodes.size());
+            for (NodeId n : nodes) {
+                output.writeString(n.toString());
+            }
+        }
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/package-info.java
new file mode 100644
index 0000000..308c9ef
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Implementation of a distributed mastership store using Hazelcast.
+ */
+package org.onlab.onos.store.mastership.impl;
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/hz/TestStoreManager.java b/core/store/dist/src/test/java/org/onlab/onos/store/hz/TestStoreManager.java
new file mode 100644
index 0000000..0740e4e
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/hz/TestStoreManager.java
@@ -0,0 +1,53 @@
+package org.onlab.onos.store.hz;
+
+import java.io.FileNotFoundException;
+import java.util.UUID;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.FileSystemXmlConfig;
+import com.hazelcast.core.HazelcastInstance;
+
+/**
+ * Dummy StoreManager to use specified Hazelcast instance.
+ */
+public class TestStoreManager extends StoreManager {
+
+    /**
+     * Gets the Hazelcast Config for testing.
+     *
+     * @return
+     */
+    public static Config getTestConfig() {
+        Config config;
+        try {
+            config = new FileSystemXmlConfig(HAZELCAST_XML_FILE);
+        } catch (FileNotFoundException e) {
+            // falling back to default
+            config = new Config();
+        }
+        // avoid accidentally joining other cluster
+        config.getGroupConfig().setName(UUID.randomUUID().toString());
+        // quickly form single node cluster
+        config.getNetworkConfig().getJoin()
+            .getTcpIpConfig()
+            .setEnabled(true).setConnectionTimeoutSeconds(0);
+        config.getNetworkConfig().getJoin()
+            .getMulticastConfig()
+            .setEnabled(false);
+        return config;
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param instance Hazelast instance to return on #getHazelcastInstance()
+     */
+    public TestStoreManager(HazelcastInstance instance) {
+        this.instance = instance;
+    }
+
+    @Override
+    public void activate() {
+        // Hazelcast setup removed from original code.
+    }
+}
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStoreTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStoreTest.java
new file mode 100644
index 0000000..4ecb8db
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStoreTest.java
@@ -0,0 +1,325 @@
+package org.onlab.onos.store.mastership.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.onlab.onos.net.MastershipRole.*;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.onlab.onos.cluster.ClusterEventListener;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.ControllerNode.State;
+import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.mastership.MastershipEvent;
+import org.onlab.onos.mastership.MastershipStoreDelegate;
+import org.onlab.onos.mastership.MastershipTerm;
+import org.onlab.onos.mastership.MastershipEvent.Type;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.MastershipRole;
+import org.onlab.onos.store.hz.StoreManager;
+import org.onlab.onos.store.hz.StoreService;
+import org.onlab.onos.store.hz.TestStoreManager;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.packet.IpPrefix;
+
+import com.google.common.collect.Sets;
+import com.hazelcast.config.Config;
+import com.hazelcast.core.Hazelcast;
+
+/**
+ * Test of the Hazelcast-based distributed MastershipStore implementation.
+ */
+public class DistributedMastershipStoreTest {
+
+    private static final DeviceId DID1 = DeviceId.deviceId("of:01");
+    private static final DeviceId DID2 = DeviceId.deviceId("of:02");
+    private static final DeviceId DID3 = DeviceId.deviceId("of:03");
+
+    private static final IpPrefix IP = IpPrefix.valueOf("127.0.0.1");
+
+    private static final NodeId N1 = new NodeId("node1");
+    private static final NodeId N2 = new NodeId("node2");
+
+    private static final ControllerNode CN1 = new DefaultControllerNode(N1, IP);
+    private static final ControllerNode CN2 = new DefaultControllerNode(N2, IP);
+
+    private DistributedMastershipStore dms;
+    private TestDistributedMastershipStore testStore;
+    private KryoSerializer serializationMgr;
+    private StoreManager storeMgr;
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        // TODO should find a way to clean Hazelcast instance without shutdown.
+        Config config = TestStoreManager.getTestConfig();
+
+        storeMgr = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
+        storeMgr.activate();
+
+        serializationMgr = new KryoSerializer();
+
+        dms = new TestDistributedMastershipStore(storeMgr, serializationMgr);
+        dms.clusterService = new TestClusterService();
+        dms.activate();
+
+        testStore = (TestDistributedMastershipStore) dms;
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        dms.deactivate();
+
+        storeMgr.deactivate();
+    }
+
+    @Test
+    public void getRole() {
+        assertEquals("wrong role:", NONE, dms.getRole(N1, DID1));
+        testStore.put(DID1, N1, true, false, true);
+        assertEquals("wrong role:", MASTER, dms.getRole(N1, DID1));
+        testStore.put(DID1, N2, false, true, false);
+        assertEquals("wrong role:", STANDBY, dms.getRole(N2, DID1));
+    }
+
+    @Test
+    public void getMaster() {
+        assertTrue("wrong store state:", dms.roleMap.isEmpty());
+
+        testStore.put(DID1, N1, true, false, false);
+        assertEquals("wrong master:", N1, dms.getMaster(DID1));
+        assertNull("wrong master:", dms.getMaster(DID2));
+    }
+
+    @Test
+    public void getDevices() {
+        assertTrue("wrong store state:", dms.roleMap.isEmpty());
+
+        testStore.put(DID1, N1, true, false, false);
+        testStore.put(DID2, N1, true, false, false);
+        testStore.put(DID3, N2, true, false, false);
+        assertEquals("wrong devices",
+                Sets.newHashSet(DID1, DID2), dms.getDevices(N1));
+    }
+
+    @Test
+    public void requestRoleAndTerm() {
+        //CN1 is "local"
+        testStore.setCurrent(CN1);
+
+        //if already MASTER, nothing should happen
+        testStore.put(DID2, N1, true, false, false);
+        assertEquals("wrong role for MASTER:", MASTER, dms.requestRole(DID2));
+
+        //populate maps with DID1, N1 thru NONE case
+        assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+        assertTrue("wrong state for store:", !dms.terms.isEmpty());
+        assertEquals("wrong term",
+                MastershipTerm.of(N1, 0), dms.getTermFor(DID1));
+
+        //CN2 now local. DID2 has N1 as MASTER so N2 is STANDBY
+        testStore.setCurrent(CN2);
+        assertEquals("wrong role for STANDBY:", STANDBY, dms.requestRole(DID2));
+        assertEquals("wrong number of entries:", 2, dms.terms.size());
+
+        //change term and requestRole() again; should persist
+        testStore.increment(DID2);
+        assertEquals("wrong role for STANDBY:", STANDBY, dms.requestRole(DID2));
+        assertEquals("wrong term", MastershipTerm.of(N1, 1), dms.getTermFor(DID2));
+    }
+
+    @Test
+    public void setMaster() {
+        //populate maps with DID1, N1 as MASTER thru NONE case
+        testStore.setCurrent(CN1);
+        assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+        assertNull("wrong event:", dms.setMaster(N1, DID1));
+
+        //switch over to N2
+        assertEquals("wrong event:", Type.MASTER_CHANGED, dms.setMaster(N2, DID1).type());
+        System.out.println(dms.getTermFor(DID1).master() + ":" + dms.getTermFor(DID1).termNumber());
+        assertEquals("wrong term", MastershipTerm.of(N2, 1), dms.getTermFor(DID1));
+
+        //orphan switch - should be rare case
+        assertEquals("wrong event:", Type.MASTER_CHANGED, dms.setMaster(N2, DID2).type());
+        assertEquals("wrong term", MastershipTerm.of(N2, 0), dms.getTermFor(DID2));
+        //disconnect and reconnect - sign of failing re-election or single-instance channel
+        dms.roleMap.clear();
+        dms.setMaster(N2, DID2);
+        assertEquals("wrong term", MastershipTerm.of(N2, 1), dms.getTermFor(DID2));
+    }
+
+    @Test
+    public void relinquishRole() {
+        //populate maps with DID1, N1 as MASTER thru NONE case
+        testStore.setCurrent(CN1);
+        assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+        //no backup, no new MASTER/event
+        assertNull("wrong event:", dms.relinquishRole(N1, DID1));
+
+        dms.requestRole(DID1);
+
+        //add backup CN2, get it elected MASTER by relinquishing
+        testStore.setCurrent(CN2);
+        assertEquals("wrong role for NONE:", STANDBY, dms.requestRole(DID1));
+        assertEquals("wrong event:", Type.MASTER_CHANGED, dms.relinquishRole(N1, DID1).type());
+        assertEquals("wrong master", N2, dms.getMaster(DID1));
+
+        //all nodes "give up" on device, which goes back to NONE.
+        assertNull("wrong event:", dms.relinquishRole(N2, DID1));
+        assertEquals("wrong role for node:", NONE, dms.getRole(N2, DID1));
+
+        assertEquals("wrong number of retired nodes", 2,
+                dms.roleMap.get(DID1).nodesOfRole(NONE).size());
+
+        //bring nodes back
+        assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+        testStore.setCurrent(CN1);
+        assertEquals("wrong role for NONE:", STANDBY, dms.requestRole(DID1));
+        assertEquals("wrong number of backup nodes", 1,
+                dms.roleMap.get(DID1).nodesOfRole(STANDBY).size());
+
+        //If STANDBY, should drop to NONE
+        assertNull("wrong event:", dms.relinquishRole(N1, DID1));
+        assertEquals("wrong role for node:", NONE, dms.getRole(N1, DID1));
+
+        //NONE - nothing happens
+        assertNull("wrong event:", dms.relinquishRole(N1, DID2));
+        assertEquals("wrong role for node:", NONE, dms.getRole(N1, DID2));
+
+    }
+
+    @Ignore("Ignore until Delegate spec. is clear.")
+    @Test
+    public void testEvents() throws InterruptedException {
+        //shamelessly copy other distributed store tests
+        final CountDownLatch addLatch = new CountDownLatch(1);
+
+        MastershipStoreDelegate checkAdd = new MastershipStoreDelegate() {
+            @Override
+            public void notify(MastershipEvent event) {
+                assertEquals("wrong event:", Type.MASTER_CHANGED, event.type());
+                assertEquals("wrong subject", DID1, event.subject());
+                assertEquals("wrong subject", N1, event.roleInfo().master());
+                addLatch.countDown();
+            }
+        };
+
+        dms.setDelegate(checkAdd);
+        dms.setMaster(N1, DID1);
+        //this will fail until we do something about single-instance-ness
+        assertTrue("Add event fired", addLatch.await(1, TimeUnit.SECONDS));
+    }
+
+    private class TestDistributedMastershipStore extends
+            DistributedMastershipStore {
+        public TestDistributedMastershipStore(StoreService storeService,
+                KryoSerializer kryoSerialization) {
+            this.storeService = storeService;
+            this.serializer = kryoSerialization;
+        }
+
+        //helper to populate master/backup structures
+        public void put(DeviceId dev, NodeId node,
+                boolean master, boolean backup, boolean term) {
+            RoleValue rv = dms.roleMap.get(dev);
+            if (rv == null) {
+                rv = new RoleValue();
+            }
+
+            if (master) {
+                rv.add(MASTER, node);
+                rv.reassign(node, STANDBY, NONE);
+            }
+            if (backup) {
+                rv.add(STANDBY, node);
+                rv.remove(MASTER, node);
+                rv.remove(NONE, node);
+            }
+            if (term) {
+                dms.terms.put(dev, 0);
+            }
+            dms.roleMap.put(dev, rv);
+        }
+
+        //a dumb utility function.
+        public void dump() {
+            for (Map.Entry<DeviceId, RoleValue> el : dms.roleMap.entrySet()) {
+                System.out.println("DID: " + el.getKey());
+                for (MastershipRole role : MastershipRole.values()) {
+                    System.out.println("\t" + role.toString() + ":");
+                    for (NodeId n : el.getValue().nodesOfRole(role)) {
+                        System.out.println("\t\t" + n);
+                    }
+                }
+            }
+        }
+
+        //increment term for a device
+        public void increment(DeviceId dev) {
+            Integer t = dms.terms.get(dev);
+            if (t != null) {
+                dms.terms.put(dev, ++t);
+            }
+        }
+
+        //sets the "local" node
+        public void setCurrent(ControllerNode node) {
+            ((TestClusterService) clusterService).current = node;
+        }
+    }
+
+    private class TestClusterService implements ClusterService {
+
+        protected ControllerNode current;
+
+        @Override
+        public ControllerNode getLocalNode() {
+            return current;
+        }
+
+        @Override
+        public Set<ControllerNode> getNodes() {
+            return Sets.newHashSet(CN1, CN2);
+        }
+
+        @Override
+        public ControllerNode getNode(NodeId nodeId) {
+            return null;
+        }
+
+        @Override
+        public State getState(NodeId nodeId) {
+            return null;
+        }
+
+        @Override
+        public void addListener(ClusterEventListener listener) {
+        }
+
+        @Override
+        public void removeListener(ClusterEventListener listener) {
+        }
+
+    }
+
+}
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/mastership/impl/RoleValueTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/mastership/impl/RoleValueTest.java
new file mode 100644
index 0000000..93741b7
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/mastership/impl/RoleValueTest.java
@@ -0,0 +1,31 @@
+package org.onlab.onos.store.mastership.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.onlab.onos.net.MastershipRole.*;
+
+import org.junit.Test;
+import org.onlab.onos.cluster.NodeId;
+
+import com.google.common.collect.Sets;
+
+public class RoleValueTest {
+
+    private static final RoleValue RV = new RoleValue();
+
+    private static final NodeId NID1 = new NodeId("node1");
+    private static final NodeId NID2 = new NodeId("node2");
+    private static final NodeId NID3 = new NodeId("node3");
+
+    @Test
+    public void add() {
+        assertEquals("faulty initialization: ", 3, RV.value.size());
+        RV.add(MASTER, NID1);
+        RV.add(STANDBY, NID2);
+        RV.add(STANDBY, NID3);
+
+        assertEquals("wrong nodeID: ", NID1, RV.get(MASTER));
+        assertTrue("wrong nodeIDs: ",
+                Sets.newHashSet(NID3, NID2).containsAll(RV.nodesOfRole(STANDBY)));
+    }
+}