moved Hazelcast based Mastership+Cluster store to onos-core-dist bundle

Change-Id: I304f916f3a400eaf050a5351825634349790e1bf
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/hz/AbsentInvalidatingLoadingCache.java b/core/store/dist/src/main/java/org/onlab/onos/store/hz/AbsentInvalidatingLoadingCache.java
new file mode 100644
index 0000000..8164467
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/hz/AbsentInvalidatingLoadingCache.java
@@ -0,0 +1,76 @@
+package org.onlab.onos.store.hz;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.ForwardingLoadingCache.SimpleForwardingLoadingCache;
+import com.google.common.cache.LoadingCache;
+
+/**
+ * Wrapper around LoadingCache to handle negative hit scenario.
+ * <p>
+ * When the LoadingCache returned Absent,
+ * this implementation will invalidate the entry immediately to avoid
+ * caching negative hits.
+ * A key that is not cached at all still yields {@code null} from
+ * {@link #getIfPresent(Object)}.
+ *
+ * @param <K> Cache key type
+ * @param <V> Cache value type. (Optional{@literal <V>})
+ */
+public class AbsentInvalidatingLoadingCache<K, V> extends
+        SimpleForwardingLoadingCache<K, Optional<V>> {
+
+    /**
+     * Constructor.
+     *
+     * @param delegate actual {@link LoadingCache} to delegate loading.
+     */
+    public AbsentInvalidatingLoadingCache(LoadingCache<K, Optional<V>> delegate) {
+        super(delegate);
+    }
+
+    @Override
+    public Optional<V> get(K key) throws ExecutionException {
+        return dropIfAbsent(key, super.get(key));
+    }
+
+    @Override
+    public Optional<V> getUnchecked(K key) {
+        return dropIfAbsent(key, super.getUnchecked(key));
+    }
+
+    @Override
+    public Optional<V> apply(K key) {
+        return getUnchecked(key);
+    }
+
+    @Override
+    public Optional<V> getIfPresent(Object key) {
+        // the delegate answers null, not Absent, when nothing is cached
+        return dropIfAbsent(key, super.getIfPresent(key));
+    }
+
+    @Override
+    public Optional<V> get(K key, Callable<? extends Optional<V>> valueLoader)
+            throws ExecutionException {
+        return dropIfAbsent(key, super.get(key, valueLoader));
+    }
+
+    /**
+     * Invalidates the entry of {@code key} if {@code v} is an absent value.
+     *
+     * @param key key the value was looked up with
+     * @param v   value handed back by the delegate, {@code null} on a miss
+     * @return {@code v}, unchanged
+     */
+    private Optional<V> dropIfAbsent(Object key, Optional<V> v) {
+        if (v != null && !v.isPresent()) {
+            invalidate(key);
+        }
+        return v;
+    }
+
+    // TODO should we be also checking getAll, etc.
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/hz/AbstractHazelcastStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/hz/AbstractHazelcastStore.java
new file mode 100644
index 0000000..e42ec45
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/hz/AbstractHazelcastStore.java
@@ -0,0 +1,244 @@
+package org.onlab.onos.store.hz;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.LoadingCache;
+import com.hazelcast.core.EntryAdapter;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.MapEvent;
+import com.hazelcast.core.Member;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.onos.event.Event;
+import org.onlab.onos.store.AbstractStore;
+import org.onlab.onos.store.StoreDelegate;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.serializers.StoreSerializer;
+import org.slf4j.Logger;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Abstraction of a distributed store based on Hazelcast.
+ */
+@Component
+public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDelegate<E>>
+        extends AbstractStore<E, D> {
+
+    protected final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StoreService storeService;
+
+    protected StoreSerializer serializer;
+
+    protected HazelcastInstance theInstance;
+
+    @Activate
+    public void activate() {
+        serializer = new KryoSerializer();
+        theInstance = storeService.getHazelcastInstance();
+    }
+
+    /**
+     * Serializes the specified object using this store's serializer.
+     *
+     * @param obj object to be serialized
+     * @return serialized object
+     */
+    protected byte[] serialize(Object obj) {
+        return serializer.encode(obj);
+    }
+
+    /**
+     * Deserializes the specified bytes using this store's serializer.
+     *
+     * @param bytes bytes to be deserialized
+     * @param <T>   type of object
+     * @return deserialized object
+     */
+    protected <T> T deserialize(byte[] bytes) {
+        return serializer.decode(bytes);
+    }
+
+
+    /**
+     * An IMap entry listener which mirrors events raised by other cluster members into a cache.
+     *
+     * @param <K> IMap key type after deserialization
+     * @param <V> IMap value type after deserialization
+     */
+    public class RemoteCacheEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
+
+        private final Member localMember;
+        private LoadingCache<K, Optional<V>> cache;
+
+        /**
+         * Creates a handler bound to the local member of the store's Hazelcast instance.
+         *
+         * @param cache cache to update, must not be null
+         */
+        public RemoteCacheEventHandler(LoadingCache<K, Optional<V>> cache) {
+            this.localMember = theInstance.getCluster().getLocalMember();
+            this.cache = checkNotNull(cache);
+        }
+
+        @Override
+        public void mapCleared(MapEvent event) {
+            if (localMember.equals(event.getMember())) {
+                // ignore locally triggered event
+                return;
+            }
+            cache.invalidateAll();
+        }
+
+        @Override
+        public void entryAdded(EntryEvent<byte[], byte[]> event) {
+            if (localMember.equals(event.getMember())) {
+                // ignore locally triggered event
+                return;
+            }
+            K key = deserialize(event.getKey());
+            V newVal = deserialize(event.getValue());
+            Optional<V> newValue = Optional.of(newVal);
+            cache.asMap().putIfAbsent(key, newValue); // keeps a value that is already cached
+            onAdd(key, newVal);
+        }
+
+        @Override
+        public void entryUpdated(EntryEvent<byte[], byte[]> event) {
+            if (localMember.equals(event.getMember())) {
+                // ignore locally triggered event
+                return;
+            }
+            K key = deserialize(event.getKey());
+            V oldVal = deserialize(event.getOldValue());
+            Optional<V> oldValue = Optional.fromNullable(oldVal);
+            V newVal = deserialize(event.getValue());
+            Optional<V> newValue = Optional.of(newVal);
+            cache.asMap().replace(key, oldValue, newValue); // swaps only if oldValue is still cached
+            onUpdate(key, oldVal, newVal);
+        }
+
+        @Override
+        public void entryRemoved(EntryEvent<byte[], byte[]> event) {
+            if (localMember.equals(event.getMember())) {
+                // ignore locally triggered event
+                return;
+            }
+            K key = deserialize(event.getKey());
+            V val = deserialize(event.getOldValue());
+            cache.invalidate(key);
+            onRemove(key, val);
+        }
+
+        /**
+         * Hook called after a remote addition has been processed; does nothing by default.
+         *
+         * @param key    new key
+         * @param newVal new value
+         */
+        protected void onAdd(K key, V newVal) {
+        }
+
+        /**
+         * Hook called after a remote update has been processed; does nothing by default.
+         *
+         * @param key    key of the updated entry
+         * @param oldValue old value
+         * @param newVal new value
+         */
+        protected void onUpdate(K key, V oldValue, V newVal) {
+        }
+
+        /**
+         * Hook called after a remote removal has been processed; does nothing by default.
+         *
+         * @param key key of the removed entry
+         * @param val old value
+         */
+        protected void onRemove(K key, V val) {
+        }
+    }
+
+    /**
+     * Entry listener that relays events raised by other cluster members to overridable hooks.
+     *
+     * @param <K> Entry key type after deserialization
+     * @param <V> Entry value type after deserialization
+     */
+    public class RemoteEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
+
+        private final Member localMember;
+
+        /** Binds the handler to the local member of the store's Hazelcast instance. */
+        public RemoteEventHandler() {
+            this.localMember = theInstance.getCluster().getLocalMember();
+        }
+
+        @Override
+        public void entryAdded(EntryEvent<byte[], byte[]> event) {
+            if (localMember.equals(event.getMember())) {
+                return; // ignore locally triggered event
+            }
+            K key = deserialize(event.getKey());
+            V newVal = deserialize(event.getValue());
+            onAdd(key, newVal);
+        }
+
+        @Override
+        public void entryRemoved(EntryEvent<byte[], byte[]> event) {
+            if (localMember.equals(event.getMember())) {
+                return; // ignore locally triggered event
+            }
+            // the removed value is the event's old value, as in RemoteCacheEventHandler
+            K key = deserialize(event.getKey());
+            V val = deserialize(event.getOldValue());
+            onRemove(key, val);
+        }
+
+        @Override
+        public void entryUpdated(EntryEvent<byte[], byte[]> event) {
+            if (localMember.equals(event.getMember())) {
+                return; // ignore locally triggered event
+            }
+            K key = deserialize(event.getKey());
+            V oldVal = deserialize(event.getOldValue());
+            V newVal = deserialize(event.getValue());
+            onUpdate(key, oldVal, newVal);
+        }
+
+        /**
+         * Remote entry addition hook; does nothing by default.
+         *
+         * @param key    new key
+         * @param newVal new value
+         */
+        protected void onAdd(K key, V newVal) {
+        }
+
+        /**
+         * Remote entry update hook; does nothing by default.
+         *
+         * @param key    key of the updated entry
+         * @param oldValue old value
+         * @param newVal new value
+         */
+        protected void onUpdate(K key, V oldValue, V newVal) {
+        }
+
+        /**
+         * Remote entry remove hook; does nothing by default.
+         *
+         * @param key key of the removed entry
+         * @param val old value
+         */
+        protected void onRemove(K key, V val) {
+        }
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/hz/OptionalCacheLoader.java b/core/store/dist/src/main/java/org/onlab/onos/store/hz/OptionalCacheLoader.java
new file mode 100644
index 0000000..e555a51
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/hz/OptionalCacheLoader.java
@@ -0,0 +1,45 @@
+package org.onlab.onos.store.hz;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.onlab.onos.store.serializers.StoreSerializer;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.CacheLoader;
+import com.hazelcast.core.IMap;
+
+/**
+ * CacheLoader which reads through to an underlying IMap and wraps the
+ * result in Optional, so that a negative hit can be represented as a value.
+ *
+ * @param <K> IMap key type after deserialization
+ * @param <V> IMap value type after deserialization
+ */
+public final class OptionalCacheLoader<K, V> extends
+        CacheLoader<K, Optional<V>> {
+
+    private final StoreSerializer serializer;
+    private IMap<byte[], byte[]> rawMap;
+
+    /**
+     * Creates a loader backed by the given raw map.
+     *
+     * @param serializer serializer applied to keys and values, must not be null
+     * @param rawMap underlying IMap holding serialized entries, must not be null
+     */
+    public OptionalCacheLoader(StoreSerializer serializer, IMap<byte[], byte[]> rawMap) {
+        this.serializer = checkNotNull(serializer);
+        this.rawMap = checkNotNull(rawMap);
+    }
+
+    @Override
+    public Optional<V> load(K key) throws Exception {
+        final byte[] rawValue = rawMap.get(serializer.encode(key));
+        if (rawValue != null) {
+            final V value = serializer.decode(rawValue);
+            return Optional.of(value);
+        }
+        // nothing stored under this key: report a negative hit
+        return Optional.absent();
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/hz/SMap.java b/core/store/dist/src/main/java/org/onlab/onos/store/hz/SMap.java
new file mode 100644
index 0000000..16063ff
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/hz/SMap.java
@@ -0,0 +1,615 @@
+package org.onlab.onos.store.hz;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.onos.store.serializers.StoreSerializer;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
+import com.hazelcast.core.EntryView;
+import com.hazelcast.core.ExecutionCallback;
+import com.hazelcast.core.IMap;
+import com.hazelcast.core.MapEvent;
+import com.hazelcast.map.EntryProcessor;
+import com.hazelcast.map.MapInterceptor;
+import com.hazelcast.mapreduce.JobTracker;
+import com.hazelcast.mapreduce.aggregation.Aggregation;
+import com.hazelcast.mapreduce.aggregation.Supplier;
+import com.hazelcast.monitor.LocalMapStats;
+import com.hazelcast.query.Predicate;
+
+// TODO: implement Predicate, etc. if we need them.
+/**
+ * Wrapper around {@code IMap<byte[], byte[]>} which serializes/deserializes
+ * Key and Value using StoreSerializer.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class SMap<K, V> implements IMap<K, V> {
+
+    private final IMap<byte[], byte[]> m;
+    private final StoreSerializer serializer;
+
+    /**
+     * Creates a SMap instance.
+     *
+     * @param baseMap base IMap to use
+     * @param serializer serializer to use for both key and value
+     */
+    public SMap(IMap<byte[], byte[]> baseMap, StoreSerializer serializer) {
+        this.m = checkNotNull(baseMap);
+        this.serializer = checkNotNull(serializer);
+    }
+
+    @Override
+    public int size() {
+        return m.size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return m.isEmpty();
+    }
+
+    @Override
+    public void putAll(Map<? extends K, ? extends V> map) {
+        Map<byte[], byte[]> encoded = new IdentityHashMap<>(map.size()); // arrays compare by identity anyway
+        for (Map.Entry<? extends K, ? extends V> pair : map.entrySet()) {
+            encoded.put(serializeKey(pair.getKey()), serializeVal(pair.getValue()));
+        }
+        m.putAll(encoded);
+    }
+
+    @Deprecated
+    @Override
+    public Object getId() {
+        return m.getId();
+    }
+
+    @Override
+    public String getPartitionKey() {
+        return m.getPartitionKey();
+    }
+
+    @Override
+    public String getName() {
+        return m.getName();
+    }
+
+    @Override
+    public String getServiceName() {
+        return m.getServiceName();
+    }
+
+    @Override
+    public void destroy() {
+        m.destroy();
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+        return m.containsKey(serializeKey(key));
+    }
+
+    @Override
+    public boolean containsValue(Object value) {
+        return m.containsValue(serializeVal(value));
+    }
+
+    @Override
+    public V get(Object key) {
+        return deserializeVal(m.get(serializeKey(key)));
+    }
+
+    @Override
+    public V put(K key, V value) {
+        return deserializeVal(m.put(serializeKey(key), serializeVal(value)));
+    }
+
+    @Override
+    public V remove(Object key) {
+        return deserializeVal(m.remove(serializeKey(key)));
+    }
+
+    @Override
+    public boolean remove(Object key, Object value) {
+        return m.remove(serializeKey(key), serializeVal(value));
+    }
+
+    @Override
+    public void delete(Object key) {
+        m.delete(serializeKey(key));
+    }
+
+    @Override
+    public void flush() {
+        m.flush();
+    }
+
+    @Override
+    public Map<K, V> getAll(Set<K> keys) {
+        Map<byte[], byte[]> raw = m.getAll(serializeKeySet(keys));
+        Map<K, V> decoded = new HashMap<>(raw.size());
+        // decode every key/value pair handed back by the base map
+        for (Map.Entry<byte[], byte[]> pair : raw.entrySet()) {
+            decoded.put(deserializeKey(pair.getKey()), deserializeVal(pair.getValue()));
+        }
+        return decoded;
+    }
+
+    @Override
+    public void loadAll(boolean replaceExistingValues) {
+        m.loadAll(replaceExistingValues);
+    }
+
+    @Override
+    public void loadAll(Set<K> keys, boolean replaceExistingValues) {
+        Set<byte[]> sk = serializeKeySet(keys);
+        m.loadAll(sk, replaceExistingValues);
+    }
+
+    @Override
+    public void clear() {
+        m.clear();
+    }
+
+    @Override
+    public Future<V> getAsync(K key) {
+        Future<byte[]> f = m.getAsync(serializeKey(key));
+        return Futures.lazyTransform(f, new DeserializeVal());
+    }
+
+    @Override
+    public Future<V> putAsync(K key, V value) {
+        Future<byte[]> f = m.putAsync(serializeKey(key), serializeVal(value));
+        return Futures.lazyTransform(f, new DeserializeVal());
+    }
+
+    @Override
+    public Future<V> putAsync(K key, V value, long ttl, TimeUnit timeunit) {
+        Future<byte[]> f = m.putAsync(serializeKey(key), serializeVal(value), ttl, timeunit);
+        return Futures.lazyTransform(f, new DeserializeVal());
+    }
+
+    @Override
+    public Future<V> removeAsync(K key) {
+        Future<byte[]> f = m.removeAsync(serializeKey(key));
+        return Futures.lazyTransform(f, new DeserializeVal());
+    }
+
+    @Override
+    public boolean tryRemove(K key, long timeout, TimeUnit timeunit) {
+        return m.tryRemove(serializeKey(key), timeout, timeunit);
+    }
+
+    @Override
+    public boolean tryPut(K key, V value, long timeout, TimeUnit timeunit) {
+        return m.tryPut(serializeKey(key), serializeVal(value), timeout, timeunit);
+    }
+
+    @Override
+    public V put(K key, V value, long ttl, TimeUnit timeunit) {
+        return deserializeVal(m.put(serializeKey(key), serializeVal(value), ttl, timeunit));
+    }
+
+    @Override
+    public void putTransient(K key, V value, long ttl, TimeUnit timeunit) {
+        m.putTransient(serializeKey(key), serializeVal(value), ttl, timeunit);
+    }
+
+    @Override
+    public V putIfAbsent(K key, V value) {
+        return deserializeVal(m.putIfAbsent(serializeKey(key), serializeVal(value)));
+    }
+
+    @Override
+    public V putIfAbsent(K key, V value, long ttl, TimeUnit timeunit) {
+        return deserializeVal(m.putIfAbsent(serializeKey(key), serializeVal(value), ttl, timeunit));
+    }
+
+    @Override
+    public boolean replace(K key, V oldValue, V newValue) {
+        return m.replace(serializeKey(key), serializeVal(oldValue), serializeVal(newValue));
+    }
+
+    @Override
+    public V replace(K key, V value) {
+        return deserializeVal(m.replace(serializeKey(key), serializeVal(value)));
+    }
+
+    @Override
+    public void set(K key, V value) {
+        m.set(serializeKey(key), serializeVal(value));
+    }
+
+    @Override
+    public void set(K key, V value, long ttl, TimeUnit timeunit) {
+        m.set(serializeKey(key), serializeVal(value), ttl, timeunit);
+    }
+
+    @Override
+    public void lock(K key) {
+        m.lock(serializeKey(key));
+     }
+
+    @Override
+    public void lock(K key, long leaseTime, TimeUnit timeUnit) {
+        m.lock(serializeKey(key), leaseTime, timeUnit);
+    }
+
+    @Override
+    public boolean isLocked(K key) {
+        return m.isLocked(serializeKey(key));
+    }
+
+    @Override
+    public boolean tryLock(K key) {
+        return m.tryLock(serializeKey(key));
+    }
+
+    @Override
+    public boolean tryLock(K key, long time, TimeUnit timeunit)
+            throws InterruptedException {
+        return m.tryLock(serializeKey(key), time, timeunit);
+    }
+
+    @Override
+    public void unlock(K key) {
+        m.unlock(serializeKey(key));
+    }
+
+    @Override
+    public void forceUnlock(K key) {
+        m.forceUnlock(serializeKey(key));
+    }
+
+    @Override
+    public String addLocalEntryListener(EntryListener<K, V> listener) {
+        return m.addLocalEntryListener(new BaseEntryListener(listener));
+    }
+
+    @Deprecated // marking method not implemented
+    @Override
+    public String addLocalEntryListener(EntryListener<K, V> listener,
+            Predicate<K, V> predicate, boolean includeValue) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @Override
+    public String addLocalEntryListener(EntryListener<K, V> listener,
+            Predicate<K, V> predicate, K key, boolean includeValue) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @Override
+    public String addInterceptor(MapInterceptor interceptor) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void removeInterceptor(String id) {
+        m.removeInterceptor(id);
+    }
+
+    @Override
+    public String addEntryListener(EntryListener<K, V> listener,
+            boolean includeValue) {
+        return m.addEntryListener(new BaseEntryListener(listener), includeValue);
+    }
+
+    @Override
+    public boolean removeEntryListener(String id) {
+        return m.removeEntryListener(id);
+    }
+
+    @Override
+    public String addEntryListener(EntryListener<K, V> listener, K key,
+            boolean includeValue) {
+        return m.addEntryListener(new BaseEntryListener(listener),
+                serializeKey(key), includeValue);
+    }
+
+    @Deprecated // marking method not implemented
+    @Override
+    public String addEntryListener(EntryListener<K, V> listener,
+            Predicate<K, V> predicate, boolean includeValue) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @Override
+    public String addEntryListener(EntryListener<K, V> listener,
+            Predicate<K, V> predicate, K key, boolean includeValue) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @Override
+    public EntryView<K, V> getEntryView(K key) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean evict(K key) {
+        return m.evict(serializeKey(key));
+    }
+
+    @Override
+    public void evictAll() {
+        m.evictAll();
+    }
+
+    @Override
+    public Set<K> keySet() {
+        return deserializeKeySet(m.keySet());
+    }
+
+    @Override
+    public Collection<V> values() {
+        return deserializeVal(m.values());
+    }
+
+    @Override
+    public Set<java.util.Map.Entry<K, V>> entrySet() {
+        return deserializeEntrySet(m.entrySet());
+    }
+
+    @Deprecated // marking method not implemented
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Set<K> keySet(Predicate predicate) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Set<java.util.Map.Entry<K, V>> entrySet(Predicate predicate) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Collection<V> values(Predicate predicate) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Set<K> localKeySet() {
+        return deserializeKeySet(m.localKeySet());
+    }
+
+    @Deprecated // marking method not implemented
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Set<K> localKeySet(Predicate predicate) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @Override
+    public void addIndex(String attribute, boolean ordered) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public LocalMapStats getLocalMapStats() {
+        return m.getLocalMapStats();
+    }
+
+    @Deprecated // marking method not implemented
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Object executeOnKey(K key, EntryProcessor entryProcessor) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Map<K, Object> executeOnKeys(Set<K> keys,
+            EntryProcessor entryProcessor) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void submitToKey(K key, EntryProcessor entryProcessor,
+            ExecutionCallback callback) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Future submitToKey(K key, EntryProcessor entryProcessor) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Map<K, Object> executeOnEntries(EntryProcessor entryProcessor) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Map<K, Object> executeOnEntries(EntryProcessor entryProcessor,
+            Predicate predicate) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @Override
+    public <SuppliedValue, Result> Result aggregate(
+            Supplier<K, V, SuppliedValue> supplier,
+            Aggregation<K, SuppliedValue, Result> aggregation) {
+
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated // marking method not implemented
+    @Override
+    public <SuppliedValue, Result> Result aggregate(
+            Supplier<K, V, SuppliedValue> supplier,
+            Aggregation<K, SuppliedValue, Result> aggregation,
+            JobTracker jobTracker) {
+
+        throw new UnsupportedOperationException();
+    }
+
+    private byte[] serializeKey(Object key) {
+        return serializer.encode(key);
+    }
+
+    private K deserializeKey(byte[] key) {
+        return serializer.decode(key);
+    }
+
+    private byte[] serializeVal(Object val) {
+        return serializer.encode(val);
+    }
+
+    private V deserializeVal(byte[] val) {
+        if (val == null) {
+            return null;
+        }
+        return serializer.decode(val.clone());
+    }
+
+    private Set<byte[]> serializeKeySet(Set<K> keys) {
+        Set<byte[]> sk = Collections.newSetFromMap(new IdentityHashMap<byte[], Boolean>(keys.size()));
+        for (K key : keys) {
+            sk.add(serializeKey(key));
+        }
+        return sk;
+    }
+
+    private Set<K> deserializeKeySet(Set<byte[]> keys) {
+        Set<K> dsk = new HashSet<>(keys.size());
+        for (byte[] key : keys) {
+            dsk.add(deserializeKey(key));
+        }
+        return dsk;
+    }
+
+    private Collection<V> deserializeVal(Collection<byte[]> vals) {
+        Collection<V> dsl = new ArrayList<>(vals.size());
+        for (byte[] val : vals) {
+            dsl.add(deserializeVal(val));
+        }
+        return dsl;
+    }
+
+    private Set<java.util.Map.Entry<K, V>> deserializeEntrySet(
+                        Set<java.util.Map.Entry<byte[], byte[]>> entries) {
+
+        Set<java.util.Map.Entry<K, V>> dse = new HashSet<>(entries.size());
+        for (java.util.Map.Entry<byte[], byte[]> entry : entries) {
+            dse.add(Pair.of(deserializeKey(entry.getKey()),
+                            deserializeVal(entry.getValue())));
+        }
+        return dse;
+    }
+
+    /**
+     * Adapts a typed listener to the raw {@code byte[]} events of the base map.
+     * <p>
+     * Keys and values are deserialized before the typed listener is called.
+     */
+    private final class BaseEntryListener
+        implements EntryListener<byte[], byte[]> {
+
+        private final EntryListener<K, V> listener;
+
+        /**
+         * Creates an adapter around the given listener.
+         *
+         * @param listener typed listener to notify, must not be null
+         */
+        public BaseEntryListener(EntryListener<K, V> listener) {
+            // reject null here instead of failing later when an event arrives
+            this.listener = checkNotNull(listener);
+        }
+
+        @Override
+        public void mapEvicted(MapEvent event) {
+            listener.mapEvicted(event);
+        }
+
+        @Override
+        public void mapCleared(MapEvent event) {
+            listener.mapCleared(event);
+        }
+
+        @Override
+        public void entryUpdated(EntryEvent<byte[], byte[]> event) {
+            listener.entryUpdated(decode(event, event.getOldValue(), event.getValue()));
+        }
+
+        @Override
+        public void entryRemoved(EntryEvent<byte[], byte[]> event) {
+            // a removal is forwarded without a new value
+            listener.entryRemoved(decode(event, event.getOldValue(), null));
+        }
+
+        @Override
+        public void entryEvicted(EntryEvent<byte[], byte[]> event) {
+            listener.entryEvicted(decode(event, event.getOldValue(), event.getValue()));
+        }
+
+        @Override
+        public void entryAdded(EntryEvent<byte[], byte[]> event) {
+            // an addition is forwarded without an old value
+            listener.entryAdded(decode(event, null, event.getValue()));
+        }
+
+        /**
+         * Rebuilds a raw event as a typed one, keeping its source, member and type.
+         *
+         * @param event  raw event received from the base map
+         * @param oldVal serialized old value to expose, may be null
+         * @param newVal serialized new value to expose, may be null
+         * @return event carrying the deserialized key and values
+         */
+        private EntryEvent<K, V> decode(EntryEvent<byte[], byte[]> event,
+                                        byte[] oldVal, byte[] newVal) {
+            return new EntryEvent<K, V>(
+                    event.getSource(),
+                    event.getMember(),
+                    event.getEventType().getType(),
+                    deserializeKey(event.getKey()),
+                    deserializeVal(oldVal),
+                    deserializeVal(newVal));
+        }
+    }
+
+    private final class DeserializeVal implements Function<byte[], V> {
+        @Override
+        public V apply(byte[] input) {
+            return deserializeVal(input);
+        }
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/hz/StoreManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/hz/StoreManager.java
new file mode 100644
index 0000000..6674989
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/hz/StoreManager.java
@@ -0,0 +1,52 @@
+package org.onlab.onos.store.hz;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.FileSystemXmlConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+
+/**
+ * Auxiliary bootstrap of distributed store; owns the shared Hazelcast instance.
+ */
+@Component(immediate = true)
+@Service
+public class StoreManager implements StoreService {
+
+    protected static final String HAZELCAST_XML_FILE = "etc/hazelcast.xml";
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    protected HazelcastInstance instance;
+
+    @Activate
+    public void activate() {
+        try {
+            Config config = new FileSystemXmlConfig(HAZELCAST_XML_FILE);
+            instance = Hazelcast.newHazelcastInstance(config);
+            log.info("Started");
+        } catch (FileNotFoundException e) {
+            log.error("Unable to configure Hazelcast", e);
+        }
+    }
+
+    @Deactivate
+    public void deactivate() {
+        if (instance != null) { // still null when activate() could not find the config file
+            instance.shutdown();
+        }
+        log.info("Stopped");
+    }
+
+    @Override
+    public HazelcastInstance getHazelcastInstance() {
+        return instance;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/hz/StoreService.java b/core/store/dist/src/main/java/org/onlab/onos/store/hz/StoreService.java
new file mode 100644
index 0000000..d79e51c
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/hz/StoreService.java
@@ -0,0 +1,18 @@
+package org.onlab.onos.store.hz;
+
+import com.hazelcast.core.HazelcastInstance;
+
+/**
+ * Bootstrap service to get a handle on a shared Hazelcast instance.
+ */
+public interface StoreService {
+
+    /**
+     * Returns the shared Hazelcast instance for use as a distributed store
+     * backing.
+     *
+     * @return shared Hazelcast instance
+     */
+    HazelcastInstance getHazelcastInstance();
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/hz/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/hz/package-info.java
new file mode 100644
index 0000000..d92f543
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/hz/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Common abstractions and facilities for implementing distributed store
+ * using Hazelcast.
+ */
+package org.onlab.onos.store.hz;