split onos-core-hz into
  onos-core-hz-common
  onos-core-hz-cluster
  onos-core-hz-net

Change-Id: Ie0ceb0de8e9e8af119433fef6f802444591eb4a4
diff --git a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/AbsentInvalidatingLoadingCache.java b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/AbsentInvalidatingLoadingCache.java
new file mode 100644
index 0000000..4dd8669
--- /dev/null
+++ b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/AbsentInvalidatingLoadingCache.java
@@ -0,0 +1,76 @@
+package org.onlab.onos.store.common;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.ForwardingLoadingCache.SimpleForwardingLoadingCache;
+import com.google.common.cache.LoadingCache;
+
+/**
+ * Wrapper around LoadingCache to handle negative hit scenario.
+ * <p>
+ * When the LoadingCache returned Absent,
+ * this implementation will invalidate the entry immediately to avoid
+ * caching negative hits.
+ *
+ * @param <K> Cache key type
+ * @param <V> Cache value type. (Optional{@literal <V>})
+ */
+public class AbsentInvalidatingLoadingCache<K, V> extends
+        SimpleForwardingLoadingCache<K, Optional<V>> {
+
+    /**
+     * Constructor.
+     *
+     * @param delegate actual {@link LoadingCache} to delegate loading.
+     */
+    public AbsentInvalidatingLoadingCache(LoadingCache<K, Optional<V>> delegate) {
+        super(delegate);
+    }
+
+    @Override
+    public Optional<V> get(K key) throws ExecutionException {
+        Optional<V> v = super.get(key);
+        if (!v.isPresent()) {
+            invalidate(key);
+        }
+        return v;
+    }
+
+    @Override
+    public Optional<V> getUnchecked(K key) {
+        Optional<V> v = super.getUnchecked(key);
+        if (!v.isPresent()) {
+            invalidate(key);
+        }
+        return v;
+    }
+
+    @Override
+    public Optional<V> apply(K key) {
+        return getUnchecked(key);
+    }
+
+    @Override
+    public Optional<V> getIfPresent(Object key) {
+        Optional<V> v = super.getIfPresent(key);
+        if (!v.isPresent()) {
+            invalidate(key);
+        }
+        return v;
+    }
+
+    @Override
+    public Optional<V> get(K key, Callable<? extends Optional<V>> valueLoader)
+            throws ExecutionException {
+
+        Optional<V> v = super.get(key, valueLoader);
+        if (!v.isPresent()) {
+            invalidate(key);
+        }
+        return v;
+    }
+
+    // TODO should we be also checking getAll, etc.
+}
diff --git a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/AbstractHazelcastStore.java b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/AbstractHazelcastStore.java
new file mode 100644
index 0000000..ab513af
--- /dev/null
+++ b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/AbstractHazelcastStore.java
@@ -0,0 +1,144 @@
+package org.onlab.onos.store.common;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.LoadingCache;
+import com.hazelcast.core.EntryAdapter;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.MapEvent;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.onos.event.Event;
+import org.onlab.onos.store.AbstractStore;
+import org.onlab.onos.store.StoreDelegate;
+import org.slf4j.Logger;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Abstraction of a distributed store based on Hazelcast.
+ */
+@Component(componentAbstract = true)
+public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDelegate<E>>
+        extends AbstractStore<E, D> {
+
+    protected final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StoreService storeService;
+
+    protected HazelcastInstance theInstance;
+
+    @Activate
+    public void activate() {
+        theInstance = storeService.getHazelcastInstance();
+    }
+
+    /**
+     * Serializes the specified object using the backing store service.
+     *
+     * @param obj object to be serialized
+     * @return serialized object
+     */
+    protected byte[] serialize(Object obj) {
+        return storeService.serialize(obj);
+    }
+
+    /**
+     * Deserializes the specified object using the backing store service.
+     *
+     * @param bytes bytes to be deserialized
+     * @param <T>   type of object
+     * @return deserialized object
+     */
+    protected <T> T deserialize(byte[] bytes) {
+        return storeService.deserialize(bytes);
+    }
+
+
+    /**
+     * An IMap entry listener, which reflects each remote event to the cache.
+     *
+     * @param <K> IMap key type after deserialization
+     * @param <V> IMap value type after deserialization
+     */
+    public class RemoteEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
+
+        private LoadingCache<K, Optional<V>> cache;
+
+        /**
+         * Constructor.
+         *
+         * @param cache cache to update
+         */
+        public RemoteEventHandler(LoadingCache<K, Optional<V>> cache) {
+            this.cache = checkNotNull(cache);
+        }
+
+        @Override
+        public void mapCleared(MapEvent event) {
+            cache.invalidateAll();
+        }
+
+        @Override
+        public void entryAdded(EntryEvent<byte[], byte[]> event) {
+            K key = deserialize(event.getKey());
+            V newVal = deserialize(event.getValue());
+            Optional<V> newValue = Optional.of(newVal);
+            cache.asMap().putIfAbsent(key, newValue);
+            onAdd(key, newVal);
+        }
+
+        @Override
+        public void entryUpdated(EntryEvent<byte[], byte[]> event) {
+            K key = deserialize(event.getKey());
+            V oldVal = deserialize(event.getOldValue());
+            Optional<V> oldValue = Optional.fromNullable(oldVal);
+            V newVal = deserialize(event.getValue());
+            Optional<V> newValue = Optional.of(newVal);
+            cache.asMap().replace(key, oldValue, newValue);
+            onUpdate(key, oldVal, newVal);
+        }
+
+        @Override
+        public void entryRemoved(EntryEvent<byte[], byte[]> event) {
+            K key = deserialize(event.getKey());
+            V val = deserialize(event.getOldValue());
+            cache.invalidate(key);
+            onRemove(key, val);
+        }
+
+        /**
+         * Cache entry addition hook.
+         *
+         * @param key    new key
+         * @param newVal new value
+         */
+        protected void onAdd(K key, V newVal) {
+        }
+
+        /**
+         * Cache entry update hook.
+         *
+         * @param key    new key
+         * @param oldValue old value
+         * @param newVal new value
+         */
+        protected void onUpdate(K key, V oldValue, V newVal) {
+        }
+
+        /**
+         * Cache entry remove hook.
+         *
+         * @param key new key
+         * @param val old value
+         */
+        protected void onRemove(K key, V val) {
+        }
+    }
+
+}
diff --git a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/OptionalCacheLoader.java b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/OptionalCacheLoader.java
new file mode 100644
index 0000000..dd2b872
--- /dev/null
+++ b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/OptionalCacheLoader.java
@@ -0,0 +1,43 @@
+package org.onlab.onos.store.common;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.CacheLoader;
+import com.hazelcast.core.IMap;
+
+/**
+ * CacheLoader to wrap Map value with Optional,
+ * to handle negative hit on underlying IMap.
+ *
+ * @param <K> IMap key type after deserialization
+ * @param <V> IMap value type after deserialization
+ */
+public final class OptionalCacheLoader<K, V> extends
+        CacheLoader<K, Optional<V>> {
+
+    private final StoreService storeService;
+    private IMap<byte[], byte[]> rawMap;
+
+    /**
+     * Constructor.
+     *
+     * @param storeService to use for serialization
+     * @param rawMap underlying IMap
+     */
+    public OptionalCacheLoader(StoreService storeService, IMap<byte[], byte[]> rawMap) {
+        this.storeService = checkNotNull(storeService);
+        this.rawMap = checkNotNull(rawMap);
+    }
+
+    @Override
+    public Optional<V> load(K key) throws Exception {
+        byte[] keyBytes = storeService.serialize(key);
+        byte[] valBytes = rawMap.get(keyBytes);
+        if (valBytes == null) {
+            return Optional.absent();
+        }
+        V dev = storeService.deserialize(valBytes);
+        return Optional.of(dev);
+    }
+}
diff --git a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/StoreManager.java b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/StoreManager.java
new file mode 100644
index 0000000..5685116
--- /dev/null
+++ b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/StoreManager.java
@@ -0,0 +1,135 @@
+package org.onlab.onos.store.common;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.FileSystemXmlConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+
+import de.javakaffee.kryoserializers.URISerializer;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.DefaultDevice;
+import org.onlab.onos.net.DefaultLink;
+import org.onlab.onos.net.DefaultPort;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.Element;
+import org.onlab.onos.net.Link;
+import org.onlab.onos.net.LinkKey;
+import org.onlab.onos.net.MastershipRole;
+import org.onlab.onos.net.Port;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.serializers.ConnectPointSerializer;
+import org.onlab.onos.store.serializers.DefaultLinkSerializer;
+import org.onlab.onos.store.serializers.DefaultPortSerializer;
+import org.onlab.onos.store.serializers.DeviceIdSerializer;
+import org.onlab.onos.store.serializers.IpPrefixSerializer;
+import org.onlab.onos.store.serializers.LinkKeySerializer;
+import org.onlab.onos.store.serializers.NodeIdSerializer;
+import org.onlab.onos.store.serializers.PortNumberSerializer;
+import org.onlab.onos.store.serializers.ProviderIdSerializer;
+import org.onlab.packet.IpPrefix;
+import org.onlab.util.KryoPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+/**
+ * Auxiliary bootstrap of distributed store.
+ */
+@Component(immediate = true)
+@Service
+public class StoreManager implements StoreService {
+
+    protected static final String HAZELCAST_XML_FILE = "etc/hazelcast.xml";
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    protected HazelcastInstance instance;
+    private KryoPool serializerPool;
+
+
+    @Activate
+    public void activate() {
+        try {
+            Config config = new FileSystemXmlConfig(HAZELCAST_XML_FILE);
+            instance = Hazelcast.newHazelcastInstance(config);
+            setupKryoPool();
+            log.info("Started");
+        } catch (FileNotFoundException e) {
+            log.error("Unable to configure Hazelcast", e);
+        }
+    }
+
+    /**
+     * Sets up the common serialzers pool.
+     */
+    protected void setupKryoPool() {
+        // FIXME Slice out types used in common to separate pool/namespace.
+        serializerPool = KryoPool.newBuilder()
+                .register(ArrayList.class,
+                          HashMap.class,
+
+                          ControllerNode.State.class,
+                          Device.Type.class,
+
+                          DefaultControllerNode.class,
+                          DefaultDevice.class,
+                          MastershipRole.class,
+                          Port.class,
+                          Element.class,
+
+                          Link.Type.class
+                )
+                .register(IpPrefix.class, new IpPrefixSerializer())
+                .register(URI.class, new URISerializer())
+                .register(NodeId.class, new NodeIdSerializer())
+                .register(ProviderId.class, new ProviderIdSerializer())
+                .register(DeviceId.class, new DeviceIdSerializer())
+                .register(PortNumber.class, new PortNumberSerializer())
+                .register(DefaultPort.class, new DefaultPortSerializer())
+                .register(LinkKey.class, new LinkKeySerializer())
+                .register(ConnectPoint.class, new ConnectPointSerializer())
+                .register(DefaultLink.class, new DefaultLinkSerializer())
+                .build()
+                .populate(10);
+    }
+
+    @Deactivate
+    public void deactivate() {
+        instance.shutdown();
+        log.info("Stopped");
+    }
+
+    @Override
+    public HazelcastInstance getHazelcastInstance() {
+        return instance;
+    }
+
+
+    @Override
+    public byte[] serialize(final Object obj) {
+        return serializerPool.serialize(obj);
+    }
+
+    @Override
+    public <T> T deserialize(final byte[] bytes) {
+        if (bytes == null) {
+            return null;
+        }
+        return serializerPool.deserialize(bytes);
+    }
+
+}
diff --git a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/StoreService.java b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/StoreService.java
new file mode 100644
index 0000000..490183f
--- /dev/null
+++ b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/StoreService.java
@@ -0,0 +1,36 @@
+package org.onlab.onos.store.common;
+
+import com.hazelcast.core.HazelcastInstance;
+
+/**
+ * Bootstrap service to get a handle on a share Hazelcast instance.
+ */
+public interface StoreService {
+
+    /**
+     * Returns the shared Hazelcast instance for use as a distributed store
+     * backing.
+     *
+     * @return shared Hazelcast instance
+     */
+    HazelcastInstance getHazelcastInstance();
+
+    /**
+     * Serializes the specified object into bytes using one of the
+     * pre-registered serializers.
+     *
+     * @param obj object to be serialized
+     * @return serialized bytes
+     */
+    public byte[] serialize(final Object obj);
+
+    /**
+     * Deserializes the specified bytes into an object using one of the
+     * pre-registered serializers.
+     *
+     * @param bytes bytes to be deserialized
+     * @return deserialized object
+     */
+    public <T> T deserialize(final byte[] bytes);
+
+}
diff --git a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/package-info.java b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/package-info.java
new file mode 100644
index 0000000..cf19812
--- /dev/null
+++ b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Common abstractions and facilities for implementing distributed store
+ * using Hazelcast.
+ */
+package org.onlab.onos.store.common;
diff --git a/core/store/hz/common/src/main/java/org/onlab/onos/store/impl/package-info.java b/core/store/hz/common/src/main/java/org/onlab/onos/store/impl/package-info.java
new file mode 100644
index 0000000..5b3ec30
--- /dev/null
+++ b/core/store/hz/common/src/main/java/org/onlab/onos/store/impl/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Common facilities for use by various distributed stores.
+ */
+package org.onlab.onos.store.impl;