moved Hazelcast based Mastership+Cluster store to onos-code-dist bundle
Change-Id: I304f916f3a400eaf050a5351825634349790e1bf
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
new file mode 100644
index 0000000..ecc8982
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
@@ -0,0 +1,372 @@
+package org.onlab.onos.store.mastership.impl;
+
+import static org.onlab.onos.mastership.MastershipEvent.Type.MASTER_CHANGED;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.cluster.RoleInfo;
+import org.onlab.onos.mastership.MastershipEvent;
+import org.onlab.onos.mastership.MastershipStore;
+import org.onlab.onos.mastership.MastershipStoreDelegate;
+import org.onlab.onos.mastership.MastershipTerm;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.MastershipRole;
+import org.onlab.onos.store.hz.AbstractHazelcastStore;
+import org.onlab.onos.store.hz.SMap;
+import org.onlab.onos.store.serializers.KryoNamespaces;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.util.KryoNamespace;
+
+import com.google.common.collect.ImmutableSet;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
+import com.hazelcast.core.IAtomicLong;
+import com.hazelcast.core.MapEvent;
+
+import static org.onlab.onos.net.MastershipRole.*;
+
+/**
+ * Distributed implementation of the mastership store. The store is
+ * responsible for the master selection process.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedMastershipStore
+extends AbstractHazelcastStore<MastershipEvent, MastershipStoreDelegate>
+implements MastershipStore {
+
+ //initial term/TTL value
+ private static final Integer INIT = 0;
+
+ //device to node roles
+ protected SMap<DeviceId, RoleValue> roleMap;
+ //devices to terms
+ protected SMap<DeviceId, Integer> terms;
+ //last-known cluster size, used for tie-breaking when partitioning occurs
+ protected IAtomicLong clusterSize;
+
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Override
+ @Activate
+ public void activate() {
+ super.activate();
+
+ this.serializer = new KryoSerializer() {
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+
+ .register(RoleValue.class, new RoleValueSerializer())
+ .build()
+ .populate(1);
+ }
+ };
+
+ roleMap = new SMap<>(theInstance.<byte[], byte[]>getMap("nodeRoles"), this.serializer);
+ roleMap.addEntryListener((new RemoteMasterShipEventHandler()), true);
+ terms = new SMap<>(theInstance.<byte[], byte[]>getMap("terms"), this.serializer);
+ clusterSize = theInstance.getAtomicLong("clustersize");
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped");
+ }
+
+ @Override
+ public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
+ final RoleValue roleInfo = getRoleValue(deviceId);
+ if (roleInfo.contains(MASTER, nodeId)) {
+ return MASTER;
+ }
+ if (roleInfo.contains(STANDBY, nodeId)) {
+ return STANDBY;
+ }
+ return NONE;
+ }
+
+ @Override
+ public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
+
+ MastershipRole role = getRole(nodeId, deviceId);
+ roleMap.lock(deviceId);
+ try {
+ RoleValue rv = getRoleValue(deviceId);
+ switch (role) {
+ case MASTER:
+ //reinforce mastership
+ rv.reassign(nodeId, STANDBY, NONE);
+ roleMap.put(deviceId, rv);
+ return null;
+ case STANDBY:
+ case NONE:
+ NodeId current = rv.get(MASTER);
+ if (current != null) {
+ //backup and replace current master
+ rv.reassign(current, NONE, STANDBY);
+ rv.replace(current, nodeId, MASTER);
+ } else {
+ //no master before so just add.
+ rv.add(MASTER, nodeId);
+ }
+ rv.reassign(nodeId, STANDBY, NONE);
+ roleMap.put(deviceId, rv);
+ updateTerm(deviceId);
+ return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
+ default:
+ log.warn("unknown Mastership Role {}", role);
+ return null;
+ }
+ } finally {
+ roleMap.unlock(deviceId);
+ }
+ }
+
+ @Override
+ public NodeId getMaster(DeviceId deviceId) {
+ return getNode(MASTER, deviceId);
+ }
+
+
+ @Override
+ public RoleInfo getNodes(DeviceId deviceId) {
+ roleMap.lock(deviceId);
+ try {
+ RoleValue rv = getRoleValue(deviceId);
+ return rv.roleInfo();
+ } finally {
+ roleMap.unlock(deviceId);
+ }
+ }
+
+ @Override
+ public Set<DeviceId> getDevices(NodeId nodeId) {
+ ImmutableSet.Builder<DeviceId> builder = ImmutableSet.builder();
+
+ for (Map.Entry<DeviceId, RoleValue> el : roleMap.entrySet()) {
+ if (nodeId.equals(el.getValue().get(MASTER))) {
+ builder.add(el.getKey());
+ }
+ }
+
+ return builder.build();
+ }
+
+ @Override
+ public MastershipRole requestRole(DeviceId deviceId) {
+ NodeId local = clusterService.getLocalNode().id();
+
+ roleMap.lock(deviceId);
+ try {
+ RoleValue rv = getRoleValue(deviceId);
+ MastershipRole role = getRole(local, deviceId);
+ switch (role) {
+ case MASTER:
+ rv.reassign(local, STANDBY, NONE);
+ terms.putIfAbsent(deviceId, INIT);
+ roleMap.put(deviceId, rv);
+ break;
+ case STANDBY:
+ rv.reassign(local, NONE, STANDBY);
+ roleMap.put(deviceId, rv);
+ terms.putIfAbsent(deviceId, INIT);
+ break;
+ case NONE:
+ //either we're the first standby, or first to device.
+ //for latter, claim mastership.
+ if (rv.get(MASTER) == null) {
+ rv.add(MASTER, local);
+ rv.reassign(local, STANDBY, NONE);
+ updateTerm(deviceId);
+ role = MastershipRole.MASTER;
+ } else {
+ rv.add(STANDBY, local);
+ rv.reassign(local, NONE, STANDBY);
+ role = MastershipRole.STANDBY;
+ }
+ roleMap.put(deviceId, rv);
+ break;
+ default:
+ log.warn("unknown Mastership Role {}", role);
+ }
+ return role;
+ } finally {
+ roleMap.unlock(deviceId);
+ }
+ }
+
+ @Override
+ public MastershipTerm getTermFor(DeviceId deviceId) {
+ RoleValue rv = getRoleValue(deviceId);
+ if ((rv.get(MASTER) == null) || (terms.get(deviceId) == null)) {
+ return null;
+ }
+ return MastershipTerm.of(rv.get(MASTER), terms.get(deviceId));
+ }
+
+ @Override
+ public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
+ MastershipEvent event = null;
+
+ roleMap.lock(deviceId);
+ try {
+ RoleValue rv = getRoleValue(deviceId);
+ MastershipRole role = getRole(nodeId, deviceId);
+ switch (role) {
+ case MASTER:
+ event = reelect(nodeId, deviceId, rv);
+ //fall through to reinforce role
+ case STANDBY:
+ //fall through to reinforce role
+ case NONE:
+ rv.reassign(nodeId, NONE, STANDBY);
+ roleMap.put(deviceId, rv);
+ break;
+ default:
+ log.warn("unknown Mastership Role {}", role);
+ }
+ return event;
+ } finally {
+ roleMap.unlock(deviceId);
+ }
+ }
+
+ @Override
+ public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
+ MastershipEvent event = null;
+
+ roleMap.lock(deviceId);
+ try {
+ RoleValue rv = getRoleValue(deviceId);
+ MastershipRole role = getRole(nodeId, deviceId);
+ switch (role) {
+ case MASTER:
+ event = reelect(nodeId, deviceId, rv);
+ //fall through to reinforce relinquishment
+ case STANDBY:
+ //fall through to reinforce relinquishment
+ case NONE:
+ rv.reassign(nodeId, STANDBY, NONE);
+ roleMap.put(deviceId, rv);
+ break;
+ default:
+ log.warn("unknown Mastership Role {}", role);
+ }
+ return event;
+ } finally {
+ roleMap.unlock(deviceId);
+ }
+ }
+
+ //helper to fetch a new master candidate for a given device.
+ private MastershipEvent reelect(
+ NodeId current, DeviceId deviceId, RoleValue rv) {
+
+ //if this is an queue it'd be neater.
+ NodeId backup = null;
+ for (NodeId n : rv.nodesOfRole(STANDBY)) {
+ if (!current.equals(n)) {
+ backup = n;
+ break;
+ }
+ }
+
+ if (backup == null) {
+ log.info("{} giving up and going to NONE for {}", current, deviceId);
+ rv.remove(MASTER, current);
+ roleMap.put(deviceId, rv);
+ return null;
+ } else {
+ log.info("{} trying to pass mastership for {} to {}", current, deviceId, backup);
+ rv.replace(current, backup, MASTER);
+ rv.reassign(backup, STANDBY, NONE);
+ roleMap.put(deviceId, rv);
+ Integer term = terms.get(deviceId);
+ terms.put(deviceId, ++term);
+ return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
+ }
+ }
+
+ //return the RoleValue structure for a device, or create one
+ private RoleValue getRoleValue(DeviceId deviceId) {
+ RoleValue value = roleMap.get(deviceId);
+ if (value == null) {
+ value = new RoleValue();
+ RoleValue concurrentlyAdded = roleMap.putIfAbsent(deviceId, value);
+ if (concurrentlyAdded != null) {
+ return concurrentlyAdded;
+ }
+ }
+ return value;
+ }
+
+ //get first applicable node out of store-unique structure.
+ private NodeId getNode(MastershipRole role, DeviceId deviceId) {
+ RoleValue value = roleMap.get(deviceId);
+ if (value != null) {
+ return value.get(role);
+ }
+ return null;
+ }
+
+ //adds or updates term information.
+ private void updateTerm(DeviceId deviceId) {
+ terms.lock(deviceId);
+ try {
+ Integer term = terms.get(deviceId);
+ if (term == null) {
+ terms.put(deviceId, INIT);
+ } else {
+ terms.put(deviceId, ++term);
+ }
+ } finally {
+ terms.unlock(deviceId);
+ }
+ }
+
+ private class RemoteMasterShipEventHandler implements EntryListener<DeviceId, RoleValue> {
+
+ @Override
+ public void entryAdded(EntryEvent<DeviceId, RoleValue> event) {
+ }
+
+ @Override
+ public void entryRemoved(EntryEvent<DeviceId, RoleValue> event) {
+ }
+
+ @Override
+ public void entryUpdated(EntryEvent<DeviceId, RoleValue> event) {
+
+ notifyDelegate(new MastershipEvent(
+ MASTER_CHANGED, event.getKey(), event.getValue().roleInfo()));
+ }
+
+ @Override
+ public void entryEvicted(EntryEvent<DeviceId, RoleValue> event) {
+ }
+
+ @Override
+ public void mapEvicted(MapEvent event) {
+ }
+
+ @Override
+ public void mapCleared(MapEvent event) {
+ }
+ }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/RoleValue.java b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/RoleValue.java
new file mode 100644
index 0000000..7447161
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/RoleValue.java
@@ -0,0 +1,122 @@
+package org.onlab.onos.store.mastership.impl;
+
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.cluster.RoleInfo;
+import org.onlab.onos.net.MastershipRole;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.MoreObjects.ToStringHelper;
+
+/**
+ * A structure that holds node mastership roles associated with a
+ * {@link DeviceId}. This structure needs to be locked through IMap.
+ */
+final class RoleValue {
+
+ protected final Map<MastershipRole, List<NodeId>> value = new EnumMap<>(MastershipRole.class);
+
+ public RoleValue() {
+ value.put(MastershipRole.MASTER, new LinkedList<NodeId>());
+ value.put(MastershipRole.STANDBY, new LinkedList<NodeId>());
+ value.put(MastershipRole.NONE, new LinkedList<NodeId>());
+ }
+
+ // exposing internals for serialization purpose only
+ Map<MastershipRole, List<NodeId>> value() {
+ return Collections.unmodifiableMap(value);
+ }
+
+ public List<NodeId> nodesOfRole(MastershipRole type) {
+ return value.get(type);
+ }
+
+ public NodeId get(MastershipRole type) {
+ return value.get(type).isEmpty() ? null : value.get(type).get(0);
+ }
+
+ public boolean contains(MastershipRole type, NodeId nodeId) {
+ return value.get(type).contains(nodeId);
+ }
+
+ /**
+ * Associates a node to a certain role.
+ *
+ * @param type the role
+ * @param nodeId the node ID of the node to associate
+ */
+ public void add(MastershipRole type, NodeId nodeId) {
+ List<NodeId> nodes = value.get(type);
+
+ if (!nodes.contains(nodeId)) {
+ nodes.add(nodeId);
+ }
+ }
+
+ /**
+ * Removes a node from a certain role.
+ *
+ * @param type the role
+ * @param nodeId the ID of the node to remove
+ * @return
+ */
+ public boolean remove(MastershipRole type, NodeId nodeId) {
+ List<NodeId> nodes = value.get(type);
+ if (!nodes.isEmpty()) {
+ return nodes.remove(nodeId);
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Reassigns a node from one role to another. If the node was not of the
+ * old role, it will still be assigned the new role.
+ *
+ * @param nodeId the Node ID of node changing roles
+ * @param from the old role
+ * @param to the new role
+ */
+ public void reassign(NodeId nodeId, MastershipRole from, MastershipRole to) {
+ remove(from, nodeId);
+ add(to, nodeId);
+ }
+
+ /**
+ * Replaces a node in one role with another node. Even if there is no node to
+ * replace, the new node is associated to the role.
+ *
+ * @param from the old NodeId to replace
+ * @param to the new NodeId
+ * @param type the role associated with the old NodeId
+ */
+ public void replace(NodeId from, NodeId to, MastershipRole type) {
+ remove(type, from);
+ add(type, to);
+ }
+
+ /**
+ * Summarizes this RoleValue as a RoleInfo. Note that master and/or backups
+ * may be empty, so the values should be checked for safety.
+ *
+ * @return the RoleInfo.
+ */
+ public RoleInfo roleInfo() {
+ return new RoleInfo(
+ get(MastershipRole.MASTER), nodesOfRole(MastershipRole.STANDBY));
+ }
+
+ @Override
+ public String toString() {
+ ToStringHelper helper = MoreObjects.toStringHelper(this.getClass());
+ for (Map.Entry<MastershipRole, List<NodeId>> el : value.entrySet()) {
+ helper.add(el.getKey().toString(), el.getValue());
+ }
+ return helper.toString();
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/RoleValueSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/RoleValueSerializer.java
new file mode 100644
index 0000000..4450e5b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/RoleValueSerializer.java
@@ -0,0 +1,52 @@
+package org.onlab.onos.store.mastership.impl;
+
+import java.util.List;
+import java.util.Map;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.MastershipRole;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Serializer for RoleValues used by {@link DistributedMastershipStore}.
+ */
+public class RoleValueSerializer extends Serializer<RoleValue> {
+
+ //RoleValues are assumed to hold a Map of MastershipRoles (an enum)
+ //to a List of NodeIds.
+
+ @Override
+ public RoleValue read(Kryo kryo, Input input, Class<RoleValue> type) {
+ RoleValue rv = new RoleValue();
+ int size = input.readInt();
+ for (int i = 0; i < size; i++) {
+ MastershipRole role = MastershipRole.values()[input.readInt()];
+ int s = input.readInt();
+ for (int j = 0; j < s; j++) {
+ rv.add(role, new NodeId(input.readString()));
+ }
+ }
+ return rv;
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, RoleValue type) {
+ final Map<MastershipRole, List<NodeId>> map = type.value();
+ output.writeInt(map.size());
+
+ for (Map.Entry<MastershipRole, List<NodeId>> el : map.entrySet()) {
+ output.writeInt(el.getKey().ordinal());
+
+ List<NodeId> nodes = el.getValue();
+ output.writeInt(nodes.size());
+ for (NodeId n : nodes) {
+ output.writeString(n.toString());
+ }
+ }
+ }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/package-info.java
new file mode 100644
index 0000000..308c9ef
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Implementation of a distributed mastership store using Hazelcast.
+ */
+package org.onlab.onos.store.mastership.impl;