moved Hazelcast based Mastership+Cluster store to onos-code-dist bundle
Change-Id: I304f916f3a400eaf050a5351825634349790e1bf
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterDefinitionStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterDefinitionStore.java
index 4dc67d4..f38df5a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterDefinitionStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterDefinitionStore.java
@@ -16,6 +16,7 @@
import java.util.Iterator;
import java.util.Set;
+//Not used right now
/**
* Allows for reading and writing cluster definition as a JSON file.
*/
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterManagementMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterManagementMessageSubjects.java
index 74c22f1..6ab594e 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterManagementMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterManagementMessageSubjects.java
@@ -2,6 +2,7 @@
import org.onlab.onos.store.cluster.messaging.MessageSubject;
+//Not used right now
public final class ClusterManagementMessageSubjects {
// avoid instantiation
private ClusterManagementMessageSubjects() {}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEvent.java
index 30b847f..1a65798 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEvent.java
@@ -2,6 +2,7 @@
import org.onlab.onos.cluster.ControllerNode;
+//Not used right now
/**
* Contains information that will be published when a cluster membership event occurs.
*/
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEventType.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEventType.java
index cdfd145..07c6c31 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEventType.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEventType.java
@@ -1,5 +1,6 @@
package org.onlab.onos.store.cluster.impl;
+//Not used right now
public enum ClusterMembershipEventType {
NEW_MEMBER,
LEAVING_MEMBER,
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterNodesDelegate.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterNodesDelegate.java
index b82a835..daef7bd 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterNodesDelegate.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterNodesDelegate.java
@@ -4,6 +4,7 @@
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
+// Not used right now
/**
* Simple back interface through which connection manager can interact with
* the cluster store.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
new file mode 100644
index 0000000..59267d1
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
@@ -0,0 +1,164 @@
+package org.onlab.onos.store.cluster.impl;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableSet;
+import com.hazelcast.core.IMap;
+import com.hazelcast.core.Member;
+import com.hazelcast.core.MemberAttributeEvent;
+import com.hazelcast.core.MembershipEvent;
+import com.hazelcast.core.MembershipListener;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterEvent;
+import org.onlab.onos.cluster.ClusterStore;
+import org.onlab.onos.cluster.ClusterStoreDelegate;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.hz.AbsentInvalidatingLoadingCache;
+import org.onlab.onos.store.hz.AbstractHazelcastStore;
+import org.onlab.onos.store.hz.OptionalCacheLoader;
+import org.onlab.packet.IpPrefix;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static com.google.common.cache.CacheBuilder.newBuilder;
+import static org.onlab.onos.cluster.ClusterEvent.Type.INSTANCE_ACTIVATED;
+import static org.onlab.onos.cluster.ClusterEvent.Type.INSTANCE_DEACTIVATED;
+import static org.onlab.onos.cluster.ControllerNode.State;
+
+/**
+ * Distributed implementation of the cluster nodes store.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedClusterStore
+ extends AbstractHazelcastStore<ClusterEvent, ClusterStoreDelegate>
+ implements ClusterStore {
+
+ private IMap<byte[], byte[]> rawNodes;
+ private LoadingCache<NodeId, Optional<DefaultControllerNode>> nodes;
+
+ private String listenerId;
+ private final MembershipListener listener = new InternalMembershipListener();
+ private final Map<NodeId, State> states = new ConcurrentHashMap<>();
+
+ @Override
+ @Activate
+ public void activate() {
+ super.activate();
+ listenerId = theInstance.getCluster().addMembershipListener(listener);
+
+ rawNodes = theInstance.getMap("nodes");
+ OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader
+ = new OptionalCacheLoader<>(serializer, rawNodes);
+ nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
+ rawNodes.addEntryListener(new RemoteCacheEventHandler<>(nodes), true);
+
+ loadClusterNodes();
+
+ log.info("Started");
+ }
+
+ // Loads the initial set of cluster nodes
+ private void loadClusterNodes() {
+ for (Member member : theInstance.getCluster().getMembers()) {
+ addNode(node(member));
+ }
+ }
+
+ @Deactivate
+ public void deactivate() {
+ theInstance.getCluster().removeMembershipListener(listenerId);
+ log.info("Stopped");
+ }
+
+ @Override
+ public ControllerNode getLocalNode() {
+ return node(theInstance.getCluster().getLocalMember());
+ }
+
+ @Override
+ public Set<ControllerNode> getNodes() {
+ ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
+ for (Optional<DefaultControllerNode> optional : nodes.asMap().values()) {
+ builder.add(optional.get());
+ }
+ return builder.build();
+ }
+
+ @Override
+ public ControllerNode getNode(NodeId nodeId) {
+ return nodes.getUnchecked(nodeId).orNull();
+ }
+
+ @Override
+ public State getState(NodeId nodeId) {
+ State state = states.get(nodeId);
+ return state == null ? State.INACTIVE : state;
+ }
+
+ @Override
+ public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
+ return addNode(new DefaultControllerNode(nodeId, ip, tcpPort));
+ }
+
+ @Override
+ public void removeNode(NodeId nodeId) {
+ synchronized (this) {
+ rawNodes.remove(serialize(nodeId));
+ nodes.invalidate(nodeId);
+ }
+ }
+
+ // Adds a new node based on the specified member
+ private synchronized ControllerNode addNode(DefaultControllerNode node) {
+ rawNodes.put(serialize(node.id()), serialize(node));
+ nodes.put(node.id(), Optional.of(node));
+ states.put(node.id(), State.ACTIVE);
+ return node;
+ }
+
+ // Creates a controller node descriptor from the Hazelcast member.
+ private DefaultControllerNode node(Member member) {
+ IpPrefix ip = memberAddress(member);
+ return new DefaultControllerNode(new NodeId(ip.toString()), ip);
+ }
+
+ private IpPrefix memberAddress(Member member) {
+ byte[] address = member.getSocketAddress().getAddress().getAddress();
+ return IpPrefix.valueOf(address);
+ }
+
+ // Interceptor for membership events.
+ private class InternalMembershipListener implements MembershipListener {
+ @Override
+ public void memberAdded(MembershipEvent membershipEvent) {
+ log.info("Member {} added", membershipEvent.getMember());
+ ControllerNode node = addNode(node(membershipEvent.getMember()));
+ notifyDelegate(new ClusterEvent(INSTANCE_ACTIVATED, node));
+ }
+
+ @Override
+ public void memberRemoved(MembershipEvent membershipEvent) {
+ log.info("Member {} removed", membershipEvent.getMember());
+ NodeId nodeId = new NodeId(memberAddress(membershipEvent.getMember()).toString());
+ states.put(nodeId, State.INACTIVE);
+ notifyDelegate(new ClusterEvent(INSTANCE_DEACTIVATED, getNode(nodeId)));
+ }
+
+ @Override
+ public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
+ log.info("Member {} attribute {} changed to {}",
+ memberAttributeEvent.getMember(),
+ memberAttributeEvent.getKey(),
+ memberAttributeEvent.getValue());
+ }
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java.bak b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java.bak
deleted file mode 100644
index c781b23..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java.bak
+++ /dev/null
@@ -1,185 +0,0 @@
-package org.onlab.onos.store.cluster.impl;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.onlab.onos.cluster.ClusterEvent;
-import org.onlab.onos.cluster.ClusterStore;
-import org.onlab.onos.cluster.ClusterStoreDelegate;
-import org.onlab.onos.cluster.ControllerNode;
-import org.onlab.onos.cluster.DefaultControllerNode;
-import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.store.AbstractStore;
-import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
-import org.onlab.packet.IpPrefix;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
-import static org.onlab.onos.cluster.ControllerNode.State;
-import static org.onlab.packet.IpPrefix.valueOf;
-
-/**
- * Distributed implementation of the cluster nodes store.
- */
-//@Component(immediate = true)
-//@Service
-public class DistributedClusterStore
- extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
- implements ClusterStore {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private DefaultControllerNode localNode;
- private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>();
- private final Map<NodeId, State> states = new ConcurrentHashMap<>();
- private final Cache<NodeId, ControllerNode> livenessCache = CacheBuilder.newBuilder()
- .maximumSize(1000)
- .expireAfterWrite(/*ClusterCommunicationManager.HEART_BEAT_INTERVAL_MILLIS * */3, TimeUnit.MILLISECONDS)
- .removalListener(new LivenessCacheRemovalListener()).build();
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private ClusterCommunicationAdminService clusterCommunicationAdminService;
-
- private final ClusterNodesDelegate nodesDelegate = new InternalNodesDelegate();
-
- @Activate
- public void activate() throws IOException {
- loadClusterDefinition();
- establishSelfIdentity();
-
- // Start-up the comm service and prime it with the loaded nodes.
- clusterCommunicationAdminService.initialize(localNode, nodesDelegate);
- for (DefaultControllerNode node : nodes.values()) {
- clusterCommunicationAdminService.addNode(node);
- }
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- log.info("Stopped");
- }
-
- /**
- * Loads the cluster definition file.
- */
- private void loadClusterDefinition() {
- ClusterDefinitionStore cds = new ClusterDefinitionStore("../config/cluster.json");
- try {
- Set<DefaultControllerNode> storedNodes = cds.read();
- for (DefaultControllerNode node : storedNodes) {
- nodes.put(node.id(), node);
- }
- } catch (IOException e) {
- log.error("Unable to read cluster definitions", e);
- }
- }
-
- /**
- * Determines who the local controller node is.
- */
- private void establishSelfIdentity() {
- // Establishes the controller's own identity.
- IpPrefix ip = valueOf(System.getProperty("onos.ip", "127.0.1.1"));
- localNode = nodes.get(new NodeId(ip.toString()));
-
- // As a fall-back, let's make sure we at least know who we are.
- if (localNode == null) {
- localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
- nodes.put(localNode.id(), localNode);
- }
- states.put(localNode.id(), State.ACTIVE);
- }
-
- @Override
- public ControllerNode getLocalNode() {
- return localNode;
- }
-
- @Override
- public Set<ControllerNode> getNodes() {
- ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
- return builder.addAll(nodes.values()).build();
- }
-
- @Override
- public ControllerNode getNode(NodeId nodeId) {
- return nodes.get(nodeId);
- }
-
- @Override
- public State getState(NodeId nodeId) {
- State state = states.get(nodeId);
- return state == null ? State.INACTIVE : state;
- }
-
- @Override
- public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
- DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
- nodes.put(nodeId, node);
- clusterCommunicationAdminService.addNode(node);
- return node;
- }
-
- @Override
- public void removeNode(NodeId nodeId) {
- if (nodeId.equals(localNode.id())) {
- nodes.clear();
- nodes.put(localNode.id(), localNode);
-
- } else {
- // Remove the other node.
- DefaultControllerNode node = nodes.remove(nodeId);
- if (node != null) {
- clusterCommunicationAdminService.removeNode(node);
- }
- }
- }
-
- // Entity to handle back calls from the connection manager.
- private class InternalNodesDelegate implements ClusterNodesDelegate {
- @Override
- public DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort) {
- DefaultControllerNode node = nodes.get(nodeId);
- if (node == null) {
- node = (DefaultControllerNode) addNode(nodeId, ip, tcpPort);
- }
- states.put(nodeId, State.ACTIVE);
- livenessCache.put(nodeId, node);
- return node;
- }
-
- @Override
- public void nodeVanished(NodeId nodeId) {
- states.put(nodeId, State.INACTIVE);
- }
-
- @Override
- public void nodeRemoved(NodeId nodeId) {
- removeNode(nodeId);
- }
- }
-
- private class LivenessCacheRemovalListener implements RemovalListener<NodeId, ControllerNode> {
-
- @Override
- public void onRemoval(RemovalNotification<NodeId, ControllerNode> entry) {
- NodeId nodeId = entry.getKey();
- log.warn("Failed to receive heartbeats from controller: " + nodeId);
- nodesDelegate.nodeVanished(nodeId);
- }
- }
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java
index 8d273ac..9f36b88 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java
@@ -1,4 +1,4 @@
/**
- * Distributed cluster store and messaging subsystem implementation.
+ * Implementation of a distributed cluster node store using Hazelcast.
*/
package org.onlab.onos.store.cluster.impl;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/hz/AbsentInvalidatingLoadingCache.java b/core/store/dist/src/main/java/org/onlab/onos/store/hz/AbsentInvalidatingLoadingCache.java
new file mode 100644
index 0000000..8164467
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/hz/AbsentInvalidatingLoadingCache.java
@@ -0,0 +1,76 @@
+package org.onlab.onos.store.hz;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.ForwardingLoadingCache.SimpleForwardingLoadingCache;
+import com.google.common.cache.LoadingCache;
+
+/**
+ * Wrapper around LoadingCache to handle negative hit scenario.
+ * <p>
+ * When the LoadingCache returned Absent,
+ * this implementation will invalidate the entry immediately to avoid
+ * caching negative hits.
+ *
+ * @param <K> Cache key type
+ * @param <V> Cache value type. (Optional{@literal <V>})
+ */
+public class AbsentInvalidatingLoadingCache<K, V> extends
+ SimpleForwardingLoadingCache<K, Optional<V>> {
+
+ /**
+ * Constructor.
+ *
+ * @param delegate actual {@link LoadingCache} to delegate loading.
+ */
+ public AbsentInvalidatingLoadingCache(LoadingCache<K, Optional<V>> delegate) {
+ super(delegate);
+ }
+
+ @Override
+ public Optional<V> get(K key) throws ExecutionException {
+ Optional<V> v = super.get(key);
+ if (!v.isPresent()) {
+ invalidate(key);
+ }
+ return v;
+ }
+
+ @Override
+ public Optional<V> getUnchecked(K key) {
+ Optional<V> v = super.getUnchecked(key);
+ if (!v.isPresent()) {
+ invalidate(key);
+ }
+ return v;
+ }
+
+ @Override
+ public Optional<V> apply(K key) {
+ return getUnchecked(key);
+ }
+
+ @Override
+ public Optional<V> getIfPresent(Object key) {
+ Optional<V> v = super.getIfPresent(key);
+ if (!v.isPresent()) {
+ invalidate(key);
+ }
+ return v;
+ }
+
+ @Override
+ public Optional<V> get(K key, Callable<? extends Optional<V>> valueLoader)
+ throws ExecutionException {
+
+ Optional<V> v = super.get(key, valueLoader);
+ if (!v.isPresent()) {
+ invalidate(key);
+ }
+ return v;
+ }
+
+ // TODO should we be also checking getAll, etc.
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/hz/AbstractHazelcastStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/hz/AbstractHazelcastStore.java
new file mode 100644
index 0000000..e42ec45
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/hz/AbstractHazelcastStore.java
@@ -0,0 +1,244 @@
+package org.onlab.onos.store.hz;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.LoadingCache;
+import com.hazelcast.core.EntryAdapter;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.MapEvent;
+import com.hazelcast.core.Member;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.onos.event.Event;
+import org.onlab.onos.store.AbstractStore;
+import org.onlab.onos.store.StoreDelegate;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.serializers.StoreSerializer;
+import org.slf4j.Logger;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Abstraction of a distributed store based on Hazelcast.
+ */
+@Component
+public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDelegate<E>>
+ extends AbstractStore<E, D> {
+
+ protected final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StoreService storeService;
+
+ protected StoreSerializer serializer;
+
+ protected HazelcastInstance theInstance;
+
+ @Activate
+ public void activate() {
+ serializer = new KryoSerializer();
+ theInstance = storeService.getHazelcastInstance();
+ }
+
+ /**
+ * Serializes the specified object using the backing store service.
+ *
+ * @param obj object to be serialized
+ * @return serialized object
+ */
+ protected byte[] serialize(Object obj) {
+ return serializer.encode(obj);
+ }
+
+ /**
+ * Deserializes the specified object using the backing store service.
+ *
+ * @param bytes bytes to be deserialized
+ * @param <T> type of object
+ * @return deserialized object
+ */
+ protected <T> T deserialize(byte[] bytes) {
+ return serializer.decode(bytes);
+ }
+
+
+ /**
+ * An IMap entry listener, which reflects each remote event to the cache.
+ *
+ * @param <K> IMap key type after deserialization
+ * @param <V> IMap value type after deserialization
+ */
+ public class RemoteCacheEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
+
+ private final Member localMember;
+ private LoadingCache<K, Optional<V>> cache;
+
+ /**
+ * Constructor.
+ *
+ * @param cache cache to update
+ */
+ public RemoteCacheEventHandler(LoadingCache<K, Optional<V>> cache) {
+ this.localMember = theInstance.getCluster().getLocalMember();
+ this.cache = checkNotNull(cache);
+ }
+
+ @Override
+ public void mapCleared(MapEvent event) {
+ if (localMember.equals(event.getMember())) {
+ // ignore locally triggered event
+ return;
+ }
+ cache.invalidateAll();
+ }
+
+ @Override
+ public void entryAdded(EntryEvent<byte[], byte[]> event) {
+ if (localMember.equals(event.getMember())) {
+ // ignore locally triggered event
+ return;
+ }
+ K key = deserialize(event.getKey());
+ V newVal = deserialize(event.getValue());
+ Optional<V> newValue = Optional.of(newVal);
+ cache.asMap().putIfAbsent(key, newValue);
+ onAdd(key, newVal);
+ }
+
+ @Override
+ public void entryUpdated(EntryEvent<byte[], byte[]> event) {
+ if (localMember.equals(event.getMember())) {
+ // ignore locally triggered event
+ return;
+ }
+ K key = deserialize(event.getKey());
+ V oldVal = deserialize(event.getOldValue());
+ Optional<V> oldValue = Optional.fromNullable(oldVal);
+ V newVal = deserialize(event.getValue());
+ Optional<V> newValue = Optional.of(newVal);
+ cache.asMap().replace(key, oldValue, newValue);
+ onUpdate(key, oldVal, newVal);
+ }
+
+ @Override
+ public void entryRemoved(EntryEvent<byte[], byte[]> event) {
+ if (localMember.equals(event.getMember())) {
+ // ignore locally triggered event
+ return;
+ }
+ K key = deserialize(event.getKey());
+ V val = deserialize(event.getOldValue());
+ cache.invalidate(key);
+ onRemove(key, val);
+ }
+
+ /**
+ * Cache entry addition hook.
+ *
+ * @param key new key
+ * @param newVal new value
+ */
+ protected void onAdd(K key, V newVal) {
+ }
+
+ /**
+ * Cache entry update hook.
+ *
+ * @param key new key
+ * @param oldValue old value
+ * @param newVal new value
+ */
+ protected void onUpdate(K key, V oldValue, V newVal) {
+ }
+
+ /**
+ * Cache entry remove hook.
+ *
+ * @param key new key
+ * @param val old value
+ */
+ protected void onRemove(K key, V val) {
+ }
+ }
+
+ /**
+ * Distributed object remote event entry listener.
+ *
+ * @param <K> Entry key type after deserialization
+ * @param <V> Entry value type after deserialization
+ */
+ public class RemoteEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
+
+ private final Member localMember;
+
+ public RemoteEventHandler() {
+ this.localMember = theInstance.getCluster().getLocalMember();
+ }
+ @Override
+ public void entryAdded(EntryEvent<byte[], byte[]> event) {
+ if (localMember.equals(event.getMember())) {
+ // ignore locally triggered event
+ return;
+ }
+ K key = deserialize(event.getKey());
+ V newVal = deserialize(event.getValue());
+ onAdd(key, newVal);
+ }
+
+ @Override
+ public void entryRemoved(EntryEvent<byte[], byte[]> event) {
+ if (localMember.equals(event.getMember())) {
+ // ignore locally triggered event
+ return;
+ }
+ K key = deserialize(event.getKey());
+ V val = deserialize(event.getValue());
+ onRemove(key, val);
+ }
+
+ @Override
+ public void entryUpdated(EntryEvent<byte[], byte[]> event) {
+ if (localMember.equals(event.getMember())) {
+ // ignore locally triggered event
+ return;
+ }
+ K key = deserialize(event.getKey());
+ V oldVal = deserialize(event.getOldValue());
+ V newVal = deserialize(event.getValue());
+ onUpdate(key, oldVal, newVal);
+ }
+
+ /**
+ * Remote entry addition hook.
+ *
+ * @param key new key
+ * @param newVal new value
+ */
+ protected void onAdd(K key, V newVal) {
+ }
+
+ /**
+ * Remote entry update hook.
+ *
+ * @param key new key
+ * @param oldValue old value
+ * @param newVal new value
+ */
+ protected void onUpdate(K key, V oldValue, V newVal) {
+ }
+
+ /**
+ * Remote entry remove hook.
+ *
+ * @param key new key
+ * @param val old value
+ */
+ protected void onRemove(K key, V val) {
+ }
+ }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/hz/OptionalCacheLoader.java b/core/store/dist/src/main/java/org/onlab/onos/store/hz/OptionalCacheLoader.java
new file mode 100644
index 0000000..e555a51
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/hz/OptionalCacheLoader.java
@@ -0,0 +1,45 @@
+package org.onlab.onos.store.hz;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.onlab.onos.store.serializers.StoreSerializer;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.CacheLoader;
+import com.hazelcast.core.IMap;
+
+/**
+ * CacheLoader to wrap Map value with Optional,
+ * to handle negative hit on underlying IMap.
+ *
+ * @param <K> IMap key type after deserialization
+ * @param <V> IMap value type after deserialization
+ */
+public final class OptionalCacheLoader<K, V> extends
+ CacheLoader<K, Optional<V>> {
+
+ private final StoreSerializer serializer;
+ private IMap<byte[], byte[]> rawMap;
+
+ /**
+ * Constructor.
+ *
+ * @param serializer to use for serialization
+ * @param rawMap underlying IMap
+ */
+ public OptionalCacheLoader(StoreSerializer serializer, IMap<byte[], byte[]> rawMap) {
+ this.serializer = checkNotNull(serializer);
+ this.rawMap = checkNotNull(rawMap);
+ }
+
+ @Override
+ public Optional<V> load(K key) throws Exception {
+ byte[] keyBytes = serializer.encode(key);
+ byte[] valBytes = rawMap.get(keyBytes);
+ if (valBytes == null) {
+ return Optional.absent();
+ }
+ V dev = serializer.decode(valBytes);
+ return Optional.of(dev);
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/hz/SMap.java b/core/store/dist/src/main/java/org/onlab/onos/store/hz/SMap.java
new file mode 100644
index 0000000..16063ff
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/hz/SMap.java
@@ -0,0 +1,615 @@
+package org.onlab.onos.store.hz;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.onos.store.serializers.StoreSerializer;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
+import com.hazelcast.core.EntryView;
+import com.hazelcast.core.ExecutionCallback;
+import com.hazelcast.core.IMap;
+import com.hazelcast.core.MapEvent;
+import com.hazelcast.map.EntryProcessor;
+import com.hazelcast.map.MapInterceptor;
+import com.hazelcast.mapreduce.JobTracker;
+import com.hazelcast.mapreduce.aggregation.Aggregation;
+import com.hazelcast.mapreduce.aggregation.Supplier;
+import com.hazelcast.monitor.LocalMapStats;
+import com.hazelcast.query.Predicate;
+
+// TODO: implement Predicate, etc. if we need them.
+/**
+ * Wrapper around IMap<byte[], byte[]> which serializes/deserializes
+ * Key and Value using StoreSerializer.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class SMap<K, V> implements IMap<K, V> {
+
+ private final IMap<byte[], byte[]> m;
+ private final StoreSerializer serializer;
+
+ /**
+ * Creates a SMap instance.
+ *
+ * @param baseMap base IMap to use
+ * @param serializer serializer to use for both key and value
+ */
+ public SMap(IMap<byte[], byte[]> baseMap, StoreSerializer serializer) {
+ this.m = checkNotNull(baseMap);
+ this.serializer = checkNotNull(serializer);
+ }
+
+ @Override
+ public int size() {
+ return m.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return m.isEmpty();
+ }
+
+ @Override
+ public void putAll(Map<? extends K, ? extends V> map) {
+ Map<byte[], byte[]> sm = new IdentityHashMap<>(map.size());
+ for (java.util.Map.Entry<? extends K, ? extends V> e : map.entrySet()) {
+ sm.put(serializeKey(e.getKey()), serializeVal(e.getValue()));
+ }
+ m.putAll(sm);
+ }
+
+ @Deprecated
+ @Override
+ public Object getId() {
+ return m.getId();
+ }
+
+ @Override
+ public String getPartitionKey() {
+ return m.getPartitionKey();
+ }
+
+ @Override
+ public String getName() {
+ return m.getName();
+ }
+
+ @Override
+ public String getServiceName() {
+ return m.getServiceName();
+ }
+
+ @Override
+ public void destroy() {
+ m.destroy();
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return m.containsKey(serializeKey(key));
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ return m.containsValue(serializeVal(value));
+ }
+
+ @Override
+ public V get(Object key) {
+ return deserializeVal(m.get(serializeKey(key)));
+ }
+
+ @Override
+ public V put(K key, V value) {
+ return deserializeVal(m.put(serializeKey(key), serializeVal(value)));
+ }
+
+ @Override
+ public V remove(Object key) {
+ return deserializeVal(m.remove(serializeKey(key)));
+ }
+
+ @Override
+ public boolean remove(Object key, Object value) {
+ return m.remove(serializeKey(key), serializeVal(value));
+ }
+
+ @Override
+ public void delete(Object key) {
+ m.delete(serializeKey(key));
+ }
+
+ @Override
+ public void flush() {
+ m.flush();
+ }
+
+ @Override
+ public Map<K, V> getAll(Set<K> keys) {
+ Set<byte[]> sk = serializeKeySet(keys);
+ Map<byte[], byte[]> bm = m.getAll(sk);
+ Map<K, V> dsm = new HashMap<>(bm.size());
+ for (java.util.Map.Entry<byte[], byte[]> e : bm.entrySet()) {
+ dsm.put(deserializeKey(e.getKey()), deserializeVal(e.getValue()));
+ }
+ return dsm;
+ }
+
+ @Override
+ public void loadAll(boolean replaceExistingValues) {
+ m.loadAll(replaceExistingValues);
+ }
+
+ @Override
+ public void loadAll(Set<K> keys, boolean replaceExistingValues) {
+ Set<byte[]> sk = serializeKeySet(keys);
+ m.loadAll(sk, replaceExistingValues);
+ }
+
+ @Override
+ public void clear() {
+ m.clear();
+ }
+
+ @Override
+ public Future<V> getAsync(K key) {
+ Future<byte[]> f = m.getAsync(serializeKey(key));
+ return Futures.lazyTransform(f, new DeserializeVal());
+ }
+
+ @Override
+ public Future<V> putAsync(K key, V value) {
+ Future<byte[]> f = m.putAsync(serializeKey(key), serializeVal(value));
+ return Futures.lazyTransform(f, new DeserializeVal());
+ }
+
+ @Override
+ public Future<V> putAsync(K key, V value, long ttl, TimeUnit timeunit) {
+ Future<byte[]> f = m.putAsync(serializeKey(key), serializeVal(value), ttl, timeunit);
+ return Futures.lazyTransform(f, new DeserializeVal());
+ }
+
+ @Override
+ public Future<V> removeAsync(K key) {
+ Future<byte[]> f = m.removeAsync(serializeKey(key));
+ return Futures.lazyTransform(f, new DeserializeVal());
+ }
+
+ @Override
+ public boolean tryRemove(K key, long timeout, TimeUnit timeunit) {
+ return m.tryRemove(serializeKey(key), timeout, timeunit);
+ }
+
+ @Override
+ public boolean tryPut(K key, V value, long timeout, TimeUnit timeunit) {
+ return m.tryPut(serializeKey(key), serializeVal(value), timeout, timeunit);
+ }
+
+ @Override
+ public V put(K key, V value, long ttl, TimeUnit timeunit) {
+ return deserializeVal(m.put(serializeKey(key), serializeVal(value), ttl, timeunit));
+ }
+
+ @Override
+ public void putTransient(K key, V value, long ttl, TimeUnit timeunit) {
+ m.putTransient(serializeKey(key), serializeVal(value), ttl, timeunit);
+ }
+
+ @Override
+ public V putIfAbsent(K key, V value) {
+ return deserializeVal(m.putIfAbsent(serializeKey(key), serializeVal(value)));
+ }
+
+ @Override
+ public V putIfAbsent(K key, V value, long ttl, TimeUnit timeunit) {
+ return deserializeVal(m.putIfAbsent(serializeKey(key), serializeVal(value), ttl, timeunit));
+ }
+
+ @Override
+ public boolean replace(K key, V oldValue, V newValue) {
+ return m.replace(serializeKey(key), serializeVal(oldValue), serializeVal(newValue));
+ }
+
+ @Override
+ public V replace(K key, V value) {
+ return deserializeVal(m.replace(serializeKey(key), serializeVal(value)));
+ }
+
+ @Override
+ public void set(K key, V value) {
+ m.set(serializeKey(key), serializeVal(value));
+ }
+
+ @Override
+ public void set(K key, V value, long ttl, TimeUnit timeunit) {
+ m.set(serializeKey(key), serializeVal(value), ttl, timeunit);
+ }
+
+ @Override
+ public void lock(K key) {
+ m.lock(serializeKey(key));
+ }
+
+ @Override
+ public void lock(K key, long leaseTime, TimeUnit timeUnit) {
+ m.lock(serializeKey(key), leaseTime, timeUnit);
+ }
+
+ @Override
+ public boolean isLocked(K key) {
+ return m.isLocked(serializeKey(key));
+ }
+
+ @Override
+ public boolean tryLock(K key) {
+ return m.tryLock(serializeKey(key));
+ }
+
+ @Override
+ public boolean tryLock(K key, long time, TimeUnit timeunit)
+ throws InterruptedException {
+ return m.tryLock(serializeKey(key), time, timeunit);
+ }
+
+ @Override
+ public void unlock(K key) {
+ m.unlock(serializeKey(key));
+ }
+
+ @Override
+ public void forceUnlock(K key) {
+ m.forceUnlock(serializeKey(key));
+ }
+
+ @Override
+ public String addLocalEntryListener(EntryListener<K, V> listener) {
+ return m.addLocalEntryListener(new BaseEntryListener(listener));
+ }
+
+ @Deprecated // marking method not implemented
+ @Override
+ public String addLocalEntryListener(EntryListener<K, V> listener,
+ Predicate<K, V> predicate, boolean includeValue) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @Override
+ public String addLocalEntryListener(EntryListener<K, V> listener,
+ Predicate<K, V> predicate, K key, boolean includeValue) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @Override
+ public String addInterceptor(MapInterceptor interceptor) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void removeInterceptor(String id) {
+ m.removeInterceptor(id);
+ }
+
+ @Override
+ public String addEntryListener(EntryListener<K, V> listener,
+ boolean includeValue) {
+ return m.addEntryListener(new BaseEntryListener(listener), includeValue);
+ }
+
+ @Override
+ public boolean removeEntryListener(String id) {
+ return m.removeEntryListener(id);
+ }
+
+ @Override
+ public String addEntryListener(EntryListener<K, V> listener, K key,
+ boolean includeValue) {
+ return m.addEntryListener(new BaseEntryListener(listener),
+ serializeKey(key), includeValue);
+ }
+
+ @Deprecated // marking method not implemented
+ @Override
+ public String addEntryListener(EntryListener<K, V> listener,
+ Predicate<K, V> predicate, boolean includeValue) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @Override
+ public String addEntryListener(EntryListener<K, V> listener,
+ Predicate<K, V> predicate, K key, boolean includeValue) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @Override
+ public EntryView<K, V> getEntryView(K key) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean evict(K key) {
+ return m.evict(serializeKey(key));
+ }
+
+ @Override
+ public void evictAll() {
+ m.evictAll();
+ }
+
+ @Override
+ public Set<K> keySet() {
+ return deserializeKeySet(m.keySet());
+ }
+
+ @Override
+ public Collection<V> values() {
+ return deserializeVal(m.values());
+ }
+
+ @Override
+ public Set<java.util.Map.Entry<K, V>> entrySet() {
+ return deserializeEntrySet(m.entrySet());
+ }
+
+ @Deprecated // marking method not implemented
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Set<K> keySet(Predicate predicate) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Set<java.util.Map.Entry<K, V>> entrySet(Predicate predicate) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Collection<V> values(Predicate predicate) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set<K> localKeySet() {
+ return deserializeKeySet(m.localKeySet());
+ }
+
+ @Deprecated // marking method not implemented
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Set<K> localKeySet(Predicate predicate) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @Override
+ public void addIndex(String attribute, boolean ordered) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public LocalMapStats getLocalMapStats() {
+ return m.getLocalMapStats();
+ }
+
+ @Deprecated // marking method not implemented
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Object executeOnKey(K key, EntryProcessor entryProcessor) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Map<K, Object> executeOnKeys(Set<K> keys,
+ EntryProcessor entryProcessor) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void submitToKey(K key, EntryProcessor entryProcessor,
+ ExecutionCallback callback) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Future submitToKey(K key, EntryProcessor entryProcessor) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Map<K, Object> executeOnEntries(EntryProcessor entryProcessor) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Map<K, Object> executeOnEntries(EntryProcessor entryProcessor,
+ Predicate predicate) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @Override
+ public <SuppliedValue, Result> Result aggregate(
+ Supplier<K, V, SuppliedValue> supplier,
+ Aggregation<K, SuppliedValue, Result> aggregation) {
+
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @Override
+ public <SuppliedValue, Result> Result aggregate(
+ Supplier<K, V, SuppliedValue> supplier,
+ Aggregation<K, SuppliedValue, Result> aggregation,
+ JobTracker jobTracker) {
+
+ throw new UnsupportedOperationException();
+ }
+
+ private byte[] serializeKey(Object key) {
+ return serializer.encode(key);
+ }
+
+ private K deserializeKey(byte[] key) {
+ return serializer.decode(key);
+ }
+
+ private byte[] serializeVal(Object val) {
+ return serializer.encode(val);
+ }
+
+ private V deserializeVal(byte[] val) {
+ if (val == null) {
+ return null;
+ }
+ return serializer.decode(val.clone());
+ }
+
+ private Set<byte[]> serializeKeySet(Set<K> keys) {
+ Set<byte[]> sk = Collections.newSetFromMap(new IdentityHashMap<byte[], Boolean>(keys.size()));
+ for (K key : keys) {
+ sk.add(serializeKey(key));
+ }
+ return sk;
+ }
+
+ private Set<K> deserializeKeySet(Set<byte[]> keys) {
+ Set<K> dsk = new HashSet<>(keys.size());
+ for (byte[] key : keys) {
+ dsk.add(deserializeKey(key));
+ }
+ return dsk;
+ }
+
+ private Collection<V> deserializeVal(Collection<byte[]> vals) {
+ Collection<V> dsl = new ArrayList<>(vals.size());
+ for (byte[] val : vals) {
+ dsl.add(deserializeVal(val));
+ }
+ return dsl;
+ }
+
+ private Set<java.util.Map.Entry<K, V>> deserializeEntrySet(
+ Set<java.util.Map.Entry<byte[], byte[]>> entries) {
+
+ Set<java.util.Map.Entry<K, V>> dse = new HashSet<>(entries.size());
+ for (java.util.Map.Entry<byte[], byte[]> entry : entries) {
+ dse.add(Pair.of(deserializeKey(entry.getKey()),
+ deserializeVal(entry.getValue())));
+ }
+ return dse;
+ }
+
+ private final class BaseEntryListener
+ implements EntryListener<byte[], byte[]> {
+
+ private final EntryListener<K, V> listener;
+
+ public BaseEntryListener(EntryListener<K, V> listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public void mapEvicted(MapEvent event) {
+ listener.mapEvicted(event);
+ }
+
+ @Override
+ public void mapCleared(MapEvent event) {
+ listener.mapCleared(event);
+ }
+
+ @Override
+ public void entryUpdated(EntryEvent<byte[], byte[]> event) {
+ EntryEvent<K, V> evt = new EntryEvent<K, V>(
+ event.getSource(),
+ event.getMember(),
+ event.getEventType().getType(),
+ deserializeKey(event.getKey()),
+ deserializeVal(event.getOldValue()),
+ deserializeVal(event.getValue()));
+
+ listener.entryUpdated(evt);
+ }
+
+ @Override
+ public void entryRemoved(EntryEvent<byte[], byte[]> event) {
+ EntryEvent<K, V> evt = new EntryEvent<K, V>(
+ event.getSource(),
+ event.getMember(),
+ event.getEventType().getType(),
+ deserializeKey(event.getKey()),
+ deserializeVal(event.getOldValue()),
+ null);
+
+ listener.entryRemoved(evt);
+ }
+
+ @Override
+ public void entryEvicted(EntryEvent<byte[], byte[]> event) {
+ EntryEvent<K, V> evt = new EntryEvent<K, V>(
+ event.getSource(),
+ event.getMember(),
+ event.getEventType().getType(),
+ deserializeKey(event.getKey()),
+ deserializeVal(event.getOldValue()),
+ deserializeVal(event.getValue()));
+
+ listener.entryEvicted(evt);
+ }
+
+ @Override
+ public void entryAdded(EntryEvent<byte[], byte[]> event) {
+ EntryEvent<K, V> evt = new EntryEvent<K, V>(
+ event.getSource(),
+ event.getMember(),
+ event.getEventType().getType(),
+ deserializeKey(event.getKey()),
+ null,
+ deserializeVal(event.getValue()));
+
+ listener.entryAdded(evt);
+ }
+ }
+
+ private final class DeserializeVal implements Function<byte[], V> {
+ @Override
+ public V apply(byte[] input) {
+ return deserializeVal(input);
+ }
+ }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/hz/StoreManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/hz/StoreManager.java
new file mode 100644
index 0000000..6674989
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/hz/StoreManager.java
@@ -0,0 +1,52 @@
+package org.onlab.onos.store.hz;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.FileSystemXmlConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+
+/**
+ * Auxiliary bootstrap of distributed store.
+ */
+@Component(immediate = true)
+@Service
+public class StoreManager implements StoreService {
+
+ protected static final String HAZELCAST_XML_FILE = "etc/hazelcast.xml";
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ protected HazelcastInstance instance;
+
+ @Activate
+ public void activate() {
+ try {
+ Config config = new FileSystemXmlConfig(HAZELCAST_XML_FILE);
+ instance = Hazelcast.newHazelcastInstance(config);
+ log.info("Started");
+ } catch (FileNotFoundException e) {
+ log.error("Unable to configure Hazelcast", e);
+ }
+ }
+
+ @Deactivate
+ public void deactivate() {
+ instance.shutdown();
+ log.info("Stopped");
+ }
+
+ @Override
+ public HazelcastInstance getHazelcastInstance() {
+ return instance;
+ }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/hz/StoreService.java b/core/store/dist/src/main/java/org/onlab/onos/store/hz/StoreService.java
new file mode 100644
index 0000000..d79e51c
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/hz/StoreService.java
@@ -0,0 +1,18 @@
+package org.onlab.onos.store.hz;
+
+import com.hazelcast.core.HazelcastInstance;
+
+/**
+ * Bootstrap service to get a handle on a share Hazelcast instance.
+ */
+public interface StoreService {
+
+ /**
+ * Returns the shared Hazelcast instance for use as a distributed store
+ * backing.
+ *
+ * @return shared Hazelcast instance
+ */
+ HazelcastInstance getHazelcastInstance();
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/hz/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/hz/package-info.java
new file mode 100644
index 0000000..d92f543
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/hz/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Common abstractions and facilities for implementing distributed store
+ * using Hazelcast.
+ */
+package org.onlab.onos.store.hz;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
new file mode 100644
index 0000000..ecc8982
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
@@ -0,0 +1,372 @@
+package org.onlab.onos.store.mastership.impl;
+
+import static org.onlab.onos.mastership.MastershipEvent.Type.MASTER_CHANGED;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.cluster.RoleInfo;
+import org.onlab.onos.mastership.MastershipEvent;
+import org.onlab.onos.mastership.MastershipStore;
+import org.onlab.onos.mastership.MastershipStoreDelegate;
+import org.onlab.onos.mastership.MastershipTerm;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.MastershipRole;
+import org.onlab.onos.store.hz.AbstractHazelcastStore;
+import org.onlab.onos.store.hz.SMap;
+import org.onlab.onos.store.serializers.KryoNamespaces;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.util.KryoNamespace;
+
+import com.google.common.collect.ImmutableSet;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
+import com.hazelcast.core.IAtomicLong;
+import com.hazelcast.core.MapEvent;
+
+import static org.onlab.onos.net.MastershipRole.*;
+
+/**
+ * Distributed implementation of the mastership store. The store is
+ * responsible for the master selection process.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedMastershipStore
+extends AbstractHazelcastStore<MastershipEvent, MastershipStoreDelegate>
+implements MastershipStore {
+
+ //initial term/TTL value
+ private static final Integer INIT = 0;
+
+ //device to node roles
+ protected SMap<DeviceId, RoleValue> roleMap;
+ //devices to terms
+ protected SMap<DeviceId, Integer> terms;
+ //last-known cluster size, used for tie-breaking when partitioning occurs
+ protected IAtomicLong clusterSize;
+
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Override
+ @Activate
+ public void activate() {
+ super.activate();
+
+ this.serializer = new KryoSerializer() {
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+
+ .register(RoleValue.class, new RoleValueSerializer())
+ .build()
+ .populate(1);
+ }
+ };
+
+ roleMap = new SMap<>(theInstance.<byte[], byte[]>getMap("nodeRoles"), this.serializer);
+ roleMap.addEntryListener((new RemoteMasterShipEventHandler()), true);
+ terms = new SMap<>(theInstance.<byte[], byte[]>getMap("terms"), this.serializer);
+ clusterSize = theInstance.getAtomicLong("clustersize");
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped");
+ }
+
+ @Override
+ public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
+ final RoleValue roleInfo = getRoleValue(deviceId);
+ if (roleInfo.contains(MASTER, nodeId)) {
+ return MASTER;
+ }
+ if (roleInfo.contains(STANDBY, nodeId)) {
+ return STANDBY;
+ }
+ return NONE;
+ }
+
+ @Override
+ public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
+
+ MastershipRole role = getRole(nodeId, deviceId);
+ roleMap.lock(deviceId);
+ try {
+ RoleValue rv = getRoleValue(deviceId);
+ switch (role) {
+ case MASTER:
+ //reinforce mastership
+ rv.reassign(nodeId, STANDBY, NONE);
+ roleMap.put(deviceId, rv);
+ return null;
+ case STANDBY:
+ case NONE:
+ NodeId current = rv.get(MASTER);
+ if (current != null) {
+ //backup and replace current master
+ rv.reassign(current, NONE, STANDBY);
+ rv.replace(current, nodeId, MASTER);
+ } else {
+ //no master before so just add.
+ rv.add(MASTER, nodeId);
+ }
+ rv.reassign(nodeId, STANDBY, NONE);
+ roleMap.put(deviceId, rv);
+ updateTerm(deviceId);
+ return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
+ default:
+ log.warn("unknown Mastership Role {}", role);
+ return null;
+ }
+ } finally {
+ roleMap.unlock(deviceId);
+ }
+ }
+
+ @Override
+ public NodeId getMaster(DeviceId deviceId) {
+ return getNode(MASTER, deviceId);
+ }
+
+
+ @Override
+ public RoleInfo getNodes(DeviceId deviceId) {
+ roleMap.lock(deviceId);
+ try {
+ RoleValue rv = getRoleValue(deviceId);
+ return rv.roleInfo();
+ } finally {
+ roleMap.unlock(deviceId);
+ }
+ }
+
+ @Override
+ public Set<DeviceId> getDevices(NodeId nodeId) {
+ ImmutableSet.Builder<DeviceId> builder = ImmutableSet.builder();
+
+ for (Map.Entry<DeviceId, RoleValue> el : roleMap.entrySet()) {
+ if (nodeId.equals(el.getValue().get(MASTER))) {
+ builder.add(el.getKey());
+ }
+ }
+
+ return builder.build();
+ }
+
+ @Override
+ public MastershipRole requestRole(DeviceId deviceId) {
+ NodeId local = clusterService.getLocalNode().id();
+
+ roleMap.lock(deviceId);
+ try {
+ RoleValue rv = getRoleValue(deviceId);
+ MastershipRole role = getRole(local, deviceId);
+ switch (role) {
+ case MASTER:
+ rv.reassign(local, STANDBY, NONE);
+ terms.putIfAbsent(deviceId, INIT);
+ roleMap.put(deviceId, rv);
+ break;
+ case STANDBY:
+ rv.reassign(local, NONE, STANDBY);
+ roleMap.put(deviceId, rv);
+ terms.putIfAbsent(deviceId, INIT);
+ break;
+ case NONE:
+ //either we're the first standby, or first to device.
+ //for latter, claim mastership.
+ if (rv.get(MASTER) == null) {
+ rv.add(MASTER, local);
+ rv.reassign(local, STANDBY, NONE);
+ updateTerm(deviceId);
+ role = MastershipRole.MASTER;
+ } else {
+ rv.add(STANDBY, local);
+ rv.reassign(local, NONE, STANDBY);
+ role = MastershipRole.STANDBY;
+ }
+ roleMap.put(deviceId, rv);
+ break;
+ default:
+ log.warn("unknown Mastership Role {}", role);
+ }
+ return role;
+ } finally {
+ roleMap.unlock(deviceId);
+ }
+ }
+
+ @Override
+ public MastershipTerm getTermFor(DeviceId deviceId) {
+ RoleValue rv = getRoleValue(deviceId);
+ if ((rv.get(MASTER) == null) || (terms.get(deviceId) == null)) {
+ return null;
+ }
+ return MastershipTerm.of(rv.get(MASTER), terms.get(deviceId));
+ }
+
+ @Override
+ public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
+ MastershipEvent event = null;
+
+ roleMap.lock(deviceId);
+ try {
+ RoleValue rv = getRoleValue(deviceId);
+ MastershipRole role = getRole(nodeId, deviceId);
+ switch (role) {
+ case MASTER:
+ event = reelect(nodeId, deviceId, rv);
+ //fall through to reinforce role
+ case STANDBY:
+ //fall through to reinforce role
+ case NONE:
+ rv.reassign(nodeId, NONE, STANDBY);
+ roleMap.put(deviceId, rv);
+ break;
+ default:
+ log.warn("unknown Mastership Role {}", role);
+ }
+ return event;
+ } finally {
+ roleMap.unlock(deviceId);
+ }
+ }
+
+ @Override
+ public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
+ MastershipEvent event = null;
+
+ roleMap.lock(deviceId);
+ try {
+ RoleValue rv = getRoleValue(deviceId);
+ MastershipRole role = getRole(nodeId, deviceId);
+ switch (role) {
+ case MASTER:
+ event = reelect(nodeId, deviceId, rv);
+ //fall through to reinforce relinquishment
+ case STANDBY:
+ //fall through to reinforce relinquishment
+ case NONE:
+ rv.reassign(nodeId, STANDBY, NONE);
+ roleMap.put(deviceId, rv);
+ break;
+ default:
+ log.warn("unknown Mastership Role {}", role);
+ }
+ return event;
+ } finally {
+ roleMap.unlock(deviceId);
+ }
+ }
+
+ //helper to fetch a new master candidate for a given device.
+ private MastershipEvent reelect(
+ NodeId current, DeviceId deviceId, RoleValue rv) {
+
+ //if this is an queue it'd be neater.
+ NodeId backup = null;
+ for (NodeId n : rv.nodesOfRole(STANDBY)) {
+ if (!current.equals(n)) {
+ backup = n;
+ break;
+ }
+ }
+
+ if (backup == null) {
+ log.info("{} giving up and going to NONE for {}", current, deviceId);
+ rv.remove(MASTER, current);
+ roleMap.put(deviceId, rv);
+ return null;
+ } else {
+ log.info("{} trying to pass mastership for {} to {}", current, deviceId, backup);
+ rv.replace(current, backup, MASTER);
+ rv.reassign(backup, STANDBY, NONE);
+ roleMap.put(deviceId, rv);
+ Integer term = terms.get(deviceId);
+ terms.put(deviceId, ++term);
+ return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
+ }
+ }
+
+ //return the RoleValue structure for a device, or create one
+ private RoleValue getRoleValue(DeviceId deviceId) {
+ RoleValue value = roleMap.get(deviceId);
+ if (value == null) {
+ value = new RoleValue();
+ RoleValue concurrentlyAdded = roleMap.putIfAbsent(deviceId, value);
+ if (concurrentlyAdded != null) {
+ return concurrentlyAdded;
+ }
+ }
+ return value;
+ }
+
+ //get first applicable node out of store-unique structure.
+ private NodeId getNode(MastershipRole role, DeviceId deviceId) {
+ RoleValue value = roleMap.get(deviceId);
+ if (value != null) {
+ return value.get(role);
+ }
+ return null;
+ }
+
+ //adds or updates term information.
+ private void updateTerm(DeviceId deviceId) {
+ terms.lock(deviceId);
+ try {
+ Integer term = terms.get(deviceId);
+ if (term == null) {
+ terms.put(deviceId, INIT);
+ } else {
+ terms.put(deviceId, ++term);
+ }
+ } finally {
+ terms.unlock(deviceId);
+ }
+ }
+
+ private class RemoteMasterShipEventHandler implements EntryListener<DeviceId, RoleValue> {
+
+ @Override
+ public void entryAdded(EntryEvent<DeviceId, RoleValue> event) {
+ }
+
+ @Override
+ public void entryRemoved(EntryEvent<DeviceId, RoleValue> event) {
+ }
+
+ @Override
+ public void entryUpdated(EntryEvent<DeviceId, RoleValue> event) {
+
+ notifyDelegate(new MastershipEvent(
+ MASTER_CHANGED, event.getKey(), event.getValue().roleInfo()));
+ }
+
+ @Override
+ public void entryEvicted(EntryEvent<DeviceId, RoleValue> event) {
+ }
+
+ @Override
+ public void mapEvicted(MapEvent event) {
+ }
+
+ @Override
+ public void mapCleared(MapEvent event) {
+ }
+ }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/RoleValue.java b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/RoleValue.java
new file mode 100644
index 0000000..7447161
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/RoleValue.java
@@ -0,0 +1,122 @@
+package org.onlab.onos.store.mastership.impl;
+
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.cluster.RoleInfo;
+import org.onlab.onos.net.MastershipRole;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.MoreObjects.ToStringHelper;
+
+/**
+ * A structure that holds node mastership roles associated with a
+ * {@link DeviceId}. This structure needs to be locked through IMap.
+ */
+final class RoleValue {
+
+ protected final Map<MastershipRole, List<NodeId>> value = new EnumMap<>(MastershipRole.class);
+
+ public RoleValue() {
+ value.put(MastershipRole.MASTER, new LinkedList<NodeId>());
+ value.put(MastershipRole.STANDBY, new LinkedList<NodeId>());
+ value.put(MastershipRole.NONE, new LinkedList<NodeId>());
+ }
+
+ // exposing internals for serialization purpose only
+ Map<MastershipRole, List<NodeId>> value() {
+ return Collections.unmodifiableMap(value);
+ }
+
+ public List<NodeId> nodesOfRole(MastershipRole type) {
+ return value.get(type);
+ }
+
+ public NodeId get(MastershipRole type) {
+ return value.get(type).isEmpty() ? null : value.get(type).get(0);
+ }
+
+ public boolean contains(MastershipRole type, NodeId nodeId) {
+ return value.get(type).contains(nodeId);
+ }
+
+ /**
+ * Associates a node to a certain role.
+ *
+ * @param type the role
+ * @param nodeId the node ID of the node to associate
+ */
+ public void add(MastershipRole type, NodeId nodeId) {
+ List<NodeId> nodes = value.get(type);
+
+ if (!nodes.contains(nodeId)) {
+ nodes.add(nodeId);
+ }
+ }
+
+ /**
+ * Removes a node from a certain role.
+ *
+ * @param type the role
+ * @param nodeId the ID of the node to remove
+ * @return
+ */
+ public boolean remove(MastershipRole type, NodeId nodeId) {
+ List<NodeId> nodes = value.get(type);
+ if (!nodes.isEmpty()) {
+ return nodes.remove(nodeId);
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Reassigns a node from one role to another. If the node was not of the
+ * old role, it will still be assigned the new role.
+ *
+ * @param nodeId the Node ID of node changing roles
+ * @param from the old role
+ * @param to the new role
+ */
+ public void reassign(NodeId nodeId, MastershipRole from, MastershipRole to) {
+ remove(from, nodeId);
+ add(to, nodeId);
+ }
+
+ /**
+ * Replaces a node in one role with another node. Even if there is no node to
+ * replace, the new node is associated to the role.
+ *
+ * @param from the old NodeId to replace
+ * @param to the new NodeId
+ * @param type the role associated with the old NodeId
+ */
+ public void replace(NodeId from, NodeId to, MastershipRole type) {
+ remove(type, from);
+ add(type, to);
+ }
+
+ /**
+ * Summarizes this RoleValue as a RoleInfo. Note that master and/or backups
+ * may be empty, so the values should be checked for safety.
+ *
+ * @return the RoleInfo.
+ */
+ public RoleInfo roleInfo() {
+ return new RoleInfo(
+ get(MastershipRole.MASTER), nodesOfRole(MastershipRole.STANDBY));
+ }
+
+ @Override
+ public String toString() {
+ ToStringHelper helper = MoreObjects.toStringHelper(this.getClass());
+ for (Map.Entry<MastershipRole, List<NodeId>> el : value.entrySet()) {
+ helper.add(el.getKey().toString(), el.getValue());
+ }
+ return helper.toString();
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/RoleValueSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/RoleValueSerializer.java
new file mode 100644
index 0000000..4450e5b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/RoleValueSerializer.java
@@ -0,0 +1,52 @@
+package org.onlab.onos.store.mastership.impl;
+
+import java.util.List;
+import java.util.Map;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.MastershipRole;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Serializer for RoleValues used by {@link DistributedMastershipStore}.
+ */
+public class RoleValueSerializer extends Serializer<RoleValue> {
+
+ //RoleValues are assumed to hold a Map of MastershipRoles (an enum)
+ //to a List of NodeIds.
+
+ @Override
+ public RoleValue read(Kryo kryo, Input input, Class<RoleValue> type) {
+ RoleValue rv = new RoleValue();
+ int size = input.readInt();
+ for (int i = 0; i < size; i++) {
+ MastershipRole role = MastershipRole.values()[input.readInt()];
+ int s = input.readInt();
+ for (int j = 0; j < s; j++) {
+ rv.add(role, new NodeId(input.readString()));
+ }
+ }
+ return rv;
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, RoleValue type) {
+ final Map<MastershipRole, List<NodeId>> map = type.value();
+ output.writeInt(map.size());
+
+ for (Map.Entry<MastershipRole, List<NodeId>> el : map.entrySet()) {
+ output.writeInt(el.getKey().ordinal());
+
+ List<NodeId> nodes = el.getValue();
+ output.writeInt(nodes.size());
+ for (NodeId n : nodes) {
+ output.writeString(n.toString());
+ }
+ }
+ }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/package-info.java
new file mode 100644
index 0000000..308c9ef
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Implementation of a distributed mastership store using Hazelcast.
+ */
+package org.onlab.onos.store.mastership.impl;
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/hz/TestStoreManager.java b/core/store/dist/src/test/java/org/onlab/onos/store/hz/TestStoreManager.java
new file mode 100644
index 0000000..0740e4e
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/hz/TestStoreManager.java
@@ -0,0 +1,53 @@
+package org.onlab.onos.store.hz;
+
+import java.io.FileNotFoundException;
+import java.util.UUID;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.FileSystemXmlConfig;
+import com.hazelcast.core.HazelcastInstance;
+
+/**
+ * Dummy StoreManager to use specified Hazelcast instance.
+ */
+public class TestStoreManager extends StoreManager {
+
+ /**
+ * Gets the Hazelcast Config for testing.
+ *
+ * @return
+ */
+ public static Config getTestConfig() {
+ Config config;
+ try {
+ config = new FileSystemXmlConfig(HAZELCAST_XML_FILE);
+ } catch (FileNotFoundException e) {
+ // falling back to default
+ config = new Config();
+ }
+ // avoid accidentally joining other cluster
+ config.getGroupConfig().setName(UUID.randomUUID().toString());
+ // quickly form single node cluster
+ config.getNetworkConfig().getJoin()
+ .getTcpIpConfig()
+ .setEnabled(true).setConnectionTimeoutSeconds(0);
+ config.getNetworkConfig().getJoin()
+ .getMulticastConfig()
+ .setEnabled(false);
+ return config;
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param instance Hazelast instance to return on #getHazelcastInstance()
+ */
+ public TestStoreManager(HazelcastInstance instance) {
+ this.instance = instance;
+ }
+
+ @Override
+ public void activate() {
+ // Hazelcast setup removed from original code.
+ }
+}
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStoreTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStoreTest.java
new file mode 100644
index 0000000..4ecb8db
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStoreTest.java
@@ -0,0 +1,325 @@
+package org.onlab.onos.store.mastership.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.onlab.onos.net.MastershipRole.*;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.onlab.onos.cluster.ClusterEventListener;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.ControllerNode.State;
+import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.mastership.MastershipEvent;
+import org.onlab.onos.mastership.MastershipStoreDelegate;
+import org.onlab.onos.mastership.MastershipTerm;
+import org.onlab.onos.mastership.MastershipEvent.Type;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.MastershipRole;
+import org.onlab.onos.store.hz.StoreManager;
+import org.onlab.onos.store.hz.StoreService;
+import org.onlab.onos.store.hz.TestStoreManager;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.packet.IpPrefix;
+
+import com.google.common.collect.Sets;
+import com.hazelcast.config.Config;
+import com.hazelcast.core.Hazelcast;
+
+/**
+ * Test of the Hazelcast-based distributed MastershipStore implementation.
+ */
+public class DistributedMastershipStoreTest {
+
+ private static final DeviceId DID1 = DeviceId.deviceId("of:01");
+ private static final DeviceId DID2 = DeviceId.deviceId("of:02");
+ private static final DeviceId DID3 = DeviceId.deviceId("of:03");
+
+ private static final IpPrefix IP = IpPrefix.valueOf("127.0.0.1");
+
+ private static final NodeId N1 = new NodeId("node1");
+ private static final NodeId N2 = new NodeId("node2");
+
+ private static final ControllerNode CN1 = new DefaultControllerNode(N1, IP);
+ private static final ControllerNode CN2 = new DefaultControllerNode(N2, IP);
+
+ private DistributedMastershipStore dms;
+ private TestDistributedMastershipStore testStore;
+ private KryoSerializer serializationMgr;
+ private StoreManager storeMgr;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ // TODO should find a way to clean Hazelcast instance without shutdown.
+ Config config = TestStoreManager.getTestConfig();
+
+ storeMgr = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
+ storeMgr.activate();
+
+ serializationMgr = new KryoSerializer();
+
+ dms = new TestDistributedMastershipStore(storeMgr, serializationMgr);
+ dms.clusterService = new TestClusterService();
+ dms.activate();
+
+ testStore = (TestDistributedMastershipStore) dms;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ dms.deactivate();
+
+ storeMgr.deactivate();
+ }
+
+ @Test
+ public void getRole() {
+ assertEquals("wrong role:", NONE, dms.getRole(N1, DID1));
+ testStore.put(DID1, N1, true, false, true);
+ assertEquals("wrong role:", MASTER, dms.getRole(N1, DID1));
+ testStore.put(DID1, N2, false, true, false);
+ assertEquals("wrong role:", STANDBY, dms.getRole(N2, DID1));
+ }
+
+ @Test
+ public void getMaster() {
+ assertTrue("wrong store state:", dms.roleMap.isEmpty());
+
+ testStore.put(DID1, N1, true, false, false);
+ assertEquals("wrong master:", N1, dms.getMaster(DID1));
+ assertNull("wrong master:", dms.getMaster(DID2));
+ }
+
+ @Test
+ public void getDevices() {
+ assertTrue("wrong store state:", dms.roleMap.isEmpty());
+
+ testStore.put(DID1, N1, true, false, false);
+ testStore.put(DID2, N1, true, false, false);
+ testStore.put(DID3, N2, true, false, false);
+ assertEquals("wrong devices",
+ Sets.newHashSet(DID1, DID2), dms.getDevices(N1));
+ }
+
+ @Test
+ public void requestRoleAndTerm() {
+ //CN1 is "local"
+ testStore.setCurrent(CN1);
+
+ //if already MASTER, nothing should happen
+ testStore.put(DID2, N1, true, false, false);
+ assertEquals("wrong role for MASTER:", MASTER, dms.requestRole(DID2));
+
+ //populate maps with DID1, N1 thru NONE case
+ assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+ assertTrue("wrong state for store:", !dms.terms.isEmpty());
+ assertEquals("wrong term",
+ MastershipTerm.of(N1, 0), dms.getTermFor(DID1));
+
+ //CN2 now local. DID2 has N1 as MASTER so N2 is STANDBY
+ testStore.setCurrent(CN2);
+ assertEquals("wrong role for STANDBY:", STANDBY, dms.requestRole(DID2));
+ assertEquals("wrong number of entries:", 2, dms.terms.size());
+
+ //change term and requestRole() again; should persist
+ testStore.increment(DID2);
+ assertEquals("wrong role for STANDBY:", STANDBY, dms.requestRole(DID2));
+ assertEquals("wrong term", MastershipTerm.of(N1, 1), dms.getTermFor(DID2));
+ }
+
+ @Test
+ public void setMaster() {
+ //populate maps with DID1, N1 as MASTER thru NONE case
+ testStore.setCurrent(CN1);
+ assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+ assertNull("wrong event:", dms.setMaster(N1, DID1));
+
+ //switch over to N2
+ assertEquals("wrong event:", Type.MASTER_CHANGED, dms.setMaster(N2, DID1).type());
+ System.out.println(dms.getTermFor(DID1).master() + ":" + dms.getTermFor(DID1).termNumber());
+ assertEquals("wrong term", MastershipTerm.of(N2, 1), dms.getTermFor(DID1));
+
+ //orphan switch - should be rare case
+ assertEquals("wrong event:", Type.MASTER_CHANGED, dms.setMaster(N2, DID2).type());
+ assertEquals("wrong term", MastershipTerm.of(N2, 0), dms.getTermFor(DID2));
+ //disconnect and reconnect - sign of failing re-election or single-instance channel
+ dms.roleMap.clear();
+ dms.setMaster(N2, DID2);
+ assertEquals("wrong term", MastershipTerm.of(N2, 1), dms.getTermFor(DID2));
+ }
+
+ @Test
+ public void relinquishRole() {
+ //populate maps with DID1, N1 as MASTER thru NONE case
+ testStore.setCurrent(CN1);
+ assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+ //no backup, no new MASTER/event
+ assertNull("wrong event:", dms.relinquishRole(N1, DID1));
+
+ dms.requestRole(DID1);
+
+ //add backup CN2, get it elected MASTER by relinquishing
+ testStore.setCurrent(CN2);
+ assertEquals("wrong role for NONE:", STANDBY, dms.requestRole(DID1));
+ assertEquals("wrong event:", Type.MASTER_CHANGED, dms.relinquishRole(N1, DID1).type());
+ assertEquals("wrong master", N2, dms.getMaster(DID1));
+
+ //all nodes "give up" on device, which goes back to NONE.
+ assertNull("wrong event:", dms.relinquishRole(N2, DID1));
+ assertEquals("wrong role for node:", NONE, dms.getRole(N2, DID1));
+
+ assertEquals("wrong number of retired nodes", 2,
+ dms.roleMap.get(DID1).nodesOfRole(NONE).size());
+
+ //bring nodes back
+ assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+ testStore.setCurrent(CN1);
+ assertEquals("wrong role for NONE:", STANDBY, dms.requestRole(DID1));
+ assertEquals("wrong number of backup nodes", 1,
+ dms.roleMap.get(DID1).nodesOfRole(STANDBY).size());
+
+ //If STANDBY, should drop to NONE
+ assertNull("wrong event:", dms.relinquishRole(N1, DID1));
+ assertEquals("wrong role for node:", NONE, dms.getRole(N1, DID1));
+
+ //NONE - nothing happens
+ assertNull("wrong event:", dms.relinquishRole(N1, DID2));
+ assertEquals("wrong role for node:", NONE, dms.getRole(N1, DID2));
+
+ }
+
+ @Ignore("Ignore until Delegate spec. is clear.")
+ @Test
+ public void testEvents() throws InterruptedException {
+ //shamelessly copy other distributed store tests
+ final CountDownLatch addLatch = new CountDownLatch(1);
+
+ MastershipStoreDelegate checkAdd = new MastershipStoreDelegate() {
+ @Override
+ public void notify(MastershipEvent event) {
+ assertEquals("wrong event:", Type.MASTER_CHANGED, event.type());
+ assertEquals("wrong subject", DID1, event.subject());
+ assertEquals("wrong subject", N1, event.roleInfo().master());
+ addLatch.countDown();
+ }
+ };
+
+ dms.setDelegate(checkAdd);
+ dms.setMaster(N1, DID1);
+ //this will fail until we do something about single-instance-ness
+ assertTrue("Add event fired", addLatch.await(1, TimeUnit.SECONDS));
+ }
+
+ private class TestDistributedMastershipStore extends
+ DistributedMastershipStore {
+ public TestDistributedMastershipStore(StoreService storeService,
+ KryoSerializer kryoSerialization) {
+ this.storeService = storeService;
+ this.serializer = kryoSerialization;
+ }
+
+ //helper to populate master/backup structures
+ public void put(DeviceId dev, NodeId node,
+ boolean master, boolean backup, boolean term) {
+ RoleValue rv = dms.roleMap.get(dev);
+ if (rv == null) {
+ rv = new RoleValue();
+ }
+
+ if (master) {
+ rv.add(MASTER, node);
+ rv.reassign(node, STANDBY, NONE);
+ }
+ if (backup) {
+ rv.add(STANDBY, node);
+ rv.remove(MASTER, node);
+ rv.remove(NONE, node);
+ }
+ if (term) {
+ dms.terms.put(dev, 0);
+ }
+ dms.roleMap.put(dev, rv);
+ }
+
+ //a dumb utility function.
+ public void dump() {
+ for (Map.Entry<DeviceId, RoleValue> el : dms.roleMap.entrySet()) {
+ System.out.println("DID: " + el.getKey());
+ for (MastershipRole role : MastershipRole.values()) {
+ System.out.println("\t" + role.toString() + ":");
+ for (NodeId n : el.getValue().nodesOfRole(role)) {
+ System.out.println("\t\t" + n);
+ }
+ }
+ }
+ }
+
+ //increment term for a device
+ public void increment(DeviceId dev) {
+ Integer t = dms.terms.get(dev);
+ if (t != null) {
+ dms.terms.put(dev, ++t);
+ }
+ }
+
+ //sets the "local" node
+ public void setCurrent(ControllerNode node) {
+ ((TestClusterService) clusterService).current = node;
+ }
+ }
+
+ private class TestClusterService implements ClusterService {
+
+ protected ControllerNode current;
+
+ @Override
+ public ControllerNode getLocalNode() {
+ return current;
+ }
+
+ @Override
+ public Set<ControllerNode> getNodes() {
+ return Sets.newHashSet(CN1, CN2);
+ }
+
+ @Override
+ public ControllerNode getNode(NodeId nodeId) {
+ return null;
+ }
+
+ @Override
+ public State getState(NodeId nodeId) {
+ return null;
+ }
+
+ @Override
+ public void addListener(ClusterEventListener listener) {
+ }
+
+ @Override
+ public void removeListener(ClusterEventListener listener) {
+ }
+
+ }
+
+}
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/mastership/impl/RoleValueTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/mastership/impl/RoleValueTest.java
new file mode 100644
index 0000000..93741b7
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/mastership/impl/RoleValueTest.java
@@ -0,0 +1,31 @@
+package org.onlab.onos.store.mastership.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.onlab.onos.net.MastershipRole.*;
+
+import org.junit.Test;
+import org.onlab.onos.cluster.NodeId;
+
+import com.google.common.collect.Sets;
+
+public class RoleValueTest {
+
+ private static final RoleValue RV = new RoleValue();
+
+ private static final NodeId NID1 = new NodeId("node1");
+ private static final NodeId NID2 = new NodeId("node2");
+ private static final NodeId NID3 = new NodeId("node3");
+
+ @Test
+ public void add() {
+ assertEquals("faulty initialization: ", 3, RV.value.size());
+ RV.add(MASTER, NID1);
+ RV.add(STANDBY, NID2);
+ RV.add(STANDBY, NID3);
+
+ assertEquals("wrong nodeID: ", NID1, RV.get(MASTER));
+ assertTrue("wrong nodeIDs: ",
+ Sets.newHashSet(NID3, NID2).containsAll(RV.nodesOfRole(STANDBY)));
+ }
+}