Moved Hazelcast-based Mastership+Cluster store to onos-core-dist bundle

Change-Id: I304f916f3a400eaf050a5351825634349790e1bf
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
new file mode 100644
index 0000000..ecc8982
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
@@ -0,0 +1,372 @@
+package org.onlab.onos.store.mastership.impl;
+
+import static org.onlab.onos.mastership.MastershipEvent.Type.MASTER_CHANGED;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.cluster.RoleInfo;
+import org.onlab.onos.mastership.MastershipEvent;
+import org.onlab.onos.mastership.MastershipStore;
+import org.onlab.onos.mastership.MastershipStoreDelegate;
+import org.onlab.onos.mastership.MastershipTerm;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.MastershipRole;
+import org.onlab.onos.store.hz.AbstractHazelcastStore;
+import org.onlab.onos.store.hz.SMap;
+import org.onlab.onos.store.serializers.KryoNamespaces;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.util.KryoNamespace;
+
+import com.google.common.collect.ImmutableSet;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
+import com.hazelcast.core.IAtomicLong;
+import com.hazelcast.core.MapEvent;
+
+import static org.onlab.onos.net.MastershipRole.*;
+
+/**
+ * Distributed implementation of the mastership store. The store is
+ * responsible for the master selection process.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedMastershipStore
+extends AbstractHazelcastStore<MastershipEvent, MastershipStoreDelegate>
+implements MastershipStore {
+
+    //initial term/TTL value, stored the first time a device is given a term
+    private static final Integer INIT = 0;
+
+    //device to node roles (one RoleValue per device)
+    protected SMap<DeviceId, RoleValue> roleMap;
+    //devices to terms
+    protected SMap<DeviceId, Integer> terms;
+    //last-known cluster size for partition tie-breaking; NOTE(review): never read in this class
+    protected IAtomicLong clusterSize;
+
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Override
+    @Activate
+    public void activate() {
+        super.activate();
+        //store-specific serializer: the shared API namespace plus RoleValue
+        this.serializer = new KryoSerializer() {
+            @Override
+            protected void setupKryoPool() {
+                serializerPool = KryoNamespace.newBuilder()
+                        .register(KryoNamespaces.API)
+                        //RoleValue is handled by its own hand-written serializer
+                        .register(RoleValue.class, new RoleValueSerializer())
+                        .build()
+                        .populate(1);
+            }
+        };
+        //byte[]-based Hazelcast maps wrapped by SMap together with the serializer above
+        roleMap = new SMap<>(theInstance.<byte[], byte[]>getMap("nodeRoles"), this.serializer);
+        roleMap.addEntryListener((new RemoteMasterShipEventHandler()), true);
+        terms = new SMap<>(theInstance.<byte[], byte[]>getMap("terms"), this.serializer);
+        clusterSize = theInstance.getAtomicLong("clustersize");
+
+        log.info("Started");
+    }
+    //NOTE(review): the entry listener registered in activate() is not removed here
+    @Deactivate
+    public void deactivate() {
+        log.info("Stopped");
+    }
+
+    @Override
+    public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
+        //MASTER takes precedence over STANDBY; anything else reports NONE.
+        //Note that the lookup creates an empty RoleValue for unseen devices.
+        final RoleValue rv = getRoleValue(deviceId);
+        if (rv.contains(MASTER, nodeId)) {
+            return MASTER;
+        }
+        //not the master: either a standby or no role at all
+        return rv.contains(STANDBY, nodeId) ? STANDBY : NONE;
+    }
+
+    @Override
+    public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
+        //the role is read under the lock so the switch never acts on a stale view
+        roleMap.lock(deviceId);
+        try {
+            RoleValue rv = getRoleValue(deviceId);
+            MastershipRole role = getRole(nodeId, deviceId);
+            switch (role) {
+                case MASTER:
+                    //already master: only make sure the node is not also a standby
+                    rv.reassign(nodeId, STANDBY, NONE);
+                    roleMap.put(deviceId, rv);
+                    return null;
+                case STANDBY:
+                case NONE:
+                    NodeId current = rv.get(MASTER);
+                    if (current != null) {
+                        //demote the current master to standby, then install the new one
+                        rv.reassign(current, NONE, STANDBY);
+                        rv.replace(current, nodeId, MASTER);
+                    } else {
+                        //no master before so just add.
+                        rv.add(MASTER, nodeId);
+                    }
+                    rv.reassign(nodeId, STANDBY, NONE);
+                    roleMap.put(deviceId, rv);
+                    updateTerm(deviceId);
+                    return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
+                default:
+                    log.warn("unknown Mastership Role {}", role);
+                    return null;
+            }
+        } finally {
+            roleMap.unlock(deviceId);
+        }
+    }
+
+    //first node listed as MASTER for the device, or null if none is recorded
+    @Override
+    public NodeId getMaster(DeviceId deviceId) {
+        return getNode(MASTER, deviceId);
+    }
+
+    @Override
+    public RoleInfo getNodes(DeviceId deviceId) {
+        //summarize the master/standby view while holding the per-device lock
+        roleMap.lock(deviceId);
+        try {
+            return getRoleValue(deviceId).roleInfo();
+        } finally {
+            roleMap.unlock(deviceId);
+        }
+    }
+
+    @Override
+    public Set<DeviceId> getDevices(NodeId nodeId) {
+        //collect every device whose current master is the given node
+        final ImmutableSet.Builder<DeviceId> mastered = ImmutableSet.builder();
+        for (Map.Entry<DeviceId, RoleValue> entry : roleMap.entrySet()) {
+            final NodeId master = entry.getValue().get(MASTER);
+            if (nodeId.equals(master)) {
+                mastered.add(entry.getKey());
+            }
+        }
+        return mastered.build();
+    }
+
+    @Override
+    public MastershipRole requestRole(DeviceId deviceId) {
+        NodeId local = clusterService.getLocalNode().id();
+        //all reads and writes for this device happen under its roleMap lock
+        roleMap.lock(deviceId);
+        try {
+            RoleValue rv = getRoleValue(deviceId);
+            MastershipRole role = getRole(local, deviceId);
+            switch (role) {
+                case MASTER:
+                    rv.reassign(local, STANDBY, NONE);
+                    terms.putIfAbsent(deviceId, INIT);
+                    roleMap.put(deviceId, rv);
+                    break;
+                case STANDBY:
+                    rv.reassign(local, NONE, STANDBY);
+                    roleMap.put(deviceId, rv);
+                    terms.putIfAbsent(deviceId, INIT);
+                    break;
+                case NONE:
+                    //no role yet: claim mastership if the device has no master,
+                    //otherwise register the local node as a standby.
+                    if (rv.get(MASTER) == null) {
+                        rv.add(MASTER, local);
+                        rv.reassign(local, STANDBY, NONE);
+                        updateTerm(deviceId);
+                        role = MastershipRole.MASTER;
+                    } else {
+                        rv.add(STANDBY, local);
+                        rv.reassign(local, NONE, STANDBY);
+                        role = MastershipRole.STANDBY;
+                    }
+                    roleMap.put(deviceId, rv);
+                    break;
+                default:
+                    log.warn("unknown Mastership Role {}", role);
+            }
+            return role;
+        } finally {
+            roleMap.unlock(deviceId);
+        }
+    }
+
+    @Override
+    public MastershipTerm getTermFor(DeviceId deviceId) {
+        //read master and term once each, so the null checks and the returned
+        //term agree even if another writer changes the maps in between
+        final NodeId master = getRoleValue(deviceId).get(MASTER);
+        final Integer term = (master == null) ? null : terms.get(deviceId);
+        return (term == null) ? null : MastershipTerm.of(master, term);
+    }
+
+    @Override
+    public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
+        MastershipEvent event = null;
+        //event stays null unless reelect() hands mastership to another node
+        roleMap.lock(deviceId);
+        try {
+            RoleValue rv = getRoleValue(deviceId);
+            MastershipRole role = getRole(nodeId, deviceId);
+            switch (role) {
+                case MASTER:
+                    event = reelect(nodeId, deviceId, rv);
+                    //fall through to reinforce role
+                case STANDBY:
+                    //fall through to reinforce role
+                case NONE:
+                    rv.reassign(nodeId, NONE, STANDBY);
+                    roleMap.put(deviceId, rv);
+                    break;
+                default:
+                    log.warn("unknown Mastership Role {}", role);
+            }
+            return event;
+        } finally {
+            roleMap.unlock(deviceId);
+        }
+    }
+
+    @Override
+    public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
+        MastershipEvent event = null;
+        //event stays null unless reelect() hands mastership to another node
+        roleMap.lock(deviceId);
+        try {
+            RoleValue rv = getRoleValue(deviceId);
+            MastershipRole role = getRole(nodeId, deviceId);
+            switch (role) {
+                case MASTER:
+                    event = reelect(nodeId, deviceId, rv);
+                    //fall through to reinforce relinquishment
+                case STANDBY:
+                    //fall through to reinforce relinquishment
+                case NONE:
+                    rv.reassign(nodeId, STANDBY, NONE);
+                    roleMap.put(deviceId, rv);
+                    break;
+                default:
+                log.warn("unknown Mastership Role {}", role);
+            }
+            return event;
+        } finally {
+            roleMap.unlock(deviceId);
+        }
+    }
+
+    //hands mastership from 'current' to the first other standby, if there is one.
+    private MastershipEvent reelect(
+            NodeId current, DeviceId deviceId, RoleValue rv) {
+        //returns the MASTER_CHANGED event, or null when no standby could take over.
+        //if this were a queue it'd be neater.
+        NodeId backup = null;
+        for (NodeId n : rv.nodesOfRole(STANDBY)) {
+            if (!current.equals(n)) {
+                backup = n;
+                break;
+            }
+        }
+
+        if (backup == null) {
+            log.info("{} giving up and going to NONE for {}", current, deviceId);
+            rv.remove(MASTER, current);
+            roleMap.put(deviceId, rv);
+            return null;
+        } else {
+            log.info("{} trying to pass mastership for {} to {}", current, deviceId, backup);
+            rv.replace(current, backup, MASTER);
+            rv.reassign(backup, STANDBY, NONE);
+            roleMap.put(deviceId, rv);
+            //updateTerm() copes with a missing term; a bare ++ on a null term would throw
+            updateTerm(deviceId);
+            return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
+        }
+    }
+
+    //fetches the RoleValue stored for a device, inserting an empty one if absent
+    private RoleValue getRoleValue(DeviceId deviceId) {
+        final RoleValue existing = roleMap.get(deviceId);
+        if (existing != null) {
+            return existing;
+        }
+        //nothing stored yet: publish an empty value, but yield to any value
+        //a concurrent writer managed to insert in the meantime
+        final RoleValue fresh = new RoleValue();
+        final RoleValue raced = roleMap.putIfAbsent(deviceId, fresh);
+        return (raced != null) ? raced : fresh;
+    }
+
+    //looks up the first node holding the given role for a device.
+    //unlike getRoleValue() this never creates an entry; the result is null
+    //when the device is unknown or no node holds the role.
+    private NodeId getNode(MastershipRole role, DeviceId deviceId) {
+        final RoleValue roles = roleMap.get(deviceId);
+        //absent device: nothing to report
+        return (roles == null) ? null : roles.get(role);
+    }
+
+    //adds or updates term information: a device without a term starts at
+    //INIT, otherwise the stored term is advanced by one.
+    private void updateTerm(DeviceId deviceId) {
+        //lock the key so the read-modify-write below is not interleaved
+        terms.lock(deviceId);
+        try {
+            final Integer previous = terms.get(deviceId);
+            //null means no term has been recorded for this device yet
+            final Integer next = (previous == null) ? INIT : Integer.valueOf(previous + 1);
+            terms.put(deviceId, next);
+        } finally {
+            terms.unlock(deviceId);
+        }
+    }
+
+    private class RemoteMasterShipEventHandler implements EntryListener<DeviceId, RoleValue> {
+        //forwards roleMap entry updates to the delegate; all other callbacks are no-ops
+        @Override
+        public void entryAdded(EntryEvent<DeviceId, RoleValue> event) {
+        }
+
+        @Override
+        public void entryRemoved(EntryEvent<DeviceId, RoleValue> event) {
+        }
+
+        @Override
+        public void entryUpdated(EntryEvent<DeviceId, RoleValue> event) {
+            //NOTE(review): every update is reported as MASTER_CHANGED, old/new master are not compared
+            notifyDelegate(new MastershipEvent(
+                    MASTER_CHANGED, event.getKey(), event.getValue().roleInfo()));
+        }
+
+        @Override
+        public void entryEvicted(EntryEvent<DeviceId, RoleValue> event) {
+        }
+
+        @Override
+        public void mapEvicted(MapEvent event) {
+        }
+
+        @Override
+        public void mapCleared(MapEvent event) {
+        }
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/RoleValue.java b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/RoleValue.java
new file mode 100644
index 0000000..7447161
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/RoleValue.java
@@ -0,0 +1,122 @@
+package org.onlab.onos.store.mastership.impl;
+
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.cluster.RoleInfo;
+import org.onlab.onos.net.MastershipRole;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.MoreObjects.ToStringHelper;
+
+/**
+ * A structure that holds node mastership roles associated with a
+ * {@link org.onlab.onos.net.DeviceId}. This structure needs to be locked through IMap.
+ */
+final class RoleValue {
+
+    protected final Map<MastershipRole, List<NodeId>> value = new EnumMap<>(MastershipRole.class);
+
+    public RoleValue() {
+        value.put(MastershipRole.MASTER, new LinkedList<NodeId>());
+        value.put(MastershipRole.STANDBY, new LinkedList<NodeId>());
+        value.put(MastershipRole.NONE, new LinkedList<NodeId>());
+    }
+
+    // exposing internals for serialization purpose only
+    Map<MastershipRole, List<NodeId>> value() {
+        return Collections.unmodifiableMap(value);
+    }
+
+    // returns the live list backing the given role, not a copy
+    public List<NodeId> nodesOfRole(MastershipRole type) {
+        return value.get(type);
+    }
+
+    // returns the first node of the given role, or null if the role is unassigned
+    public NodeId get(MastershipRole type) {
+        final List<NodeId> nodes = value.get(type);
+        return nodes.isEmpty() ? null : nodes.get(0);
+    }
+
+    // tells whether the node is currently listed under the given role
+    public boolean contains(MastershipRole type, NodeId nodeId) {
+        return value.get(type).contains(nodeId);
+    }
+
+    /**
+     * Associates a node to a certain role; a node already listed is not added again.
+     *
+     * @param type the role
+     * @param nodeId the node ID of the node to associate
+     */
+    public void add(MastershipRole type, NodeId nodeId) {
+        List<NodeId> nodes = value.get(type);
+        if (!nodes.contains(nodeId)) {
+            nodes.add(nodeId);
+        }
+    }
+
+    /**
+     * Removes a node from a certain role.
+     *
+     * @param type the role
+     * @param nodeId the ID of the node to remove
+     * @return true if the node held the role and was removed, false otherwise
+     */
+    public boolean remove(MastershipRole type, NodeId nodeId) {
+        // List.remove(Object) already yields false when the node is absent,
+        // including for an empty list, so no separate emptiness check is needed
+        return value.get(type).remove(nodeId);
+    }
+
+    /**
+     * Reassigns a node from one role to another. If the node was not of the
+     * old role, it will still be assigned the new role.
+     *
+     * @param nodeId the Node ID of node changing roles
+     * @param from the old role
+     * @param to the new role
+     */
+    public void reassign(NodeId nodeId, MastershipRole from, MastershipRole to) {
+        remove(from, nodeId);
+        add(to, nodeId);
+    }
+
+    /**
+     * Replaces a node in one role with another node. Even if there is no node to
+     * replace, the new node is associated to the role.
+     *
+     * @param from the old NodeId to replace
+     * @param to the new NodeId
+     * @param type the role associated with the old NodeId
+     */
+    public void replace(NodeId from, NodeId to, MastershipRole type) {
+        remove(type, from);
+        add(type, to);
+    }
+
+    /**
+     * Summarizes this RoleValue as a RoleInfo. Note that master and/or backups
+     * may be empty, so the values should be checked for safety.
+     *
+     * @return the RoleInfo.
+     */
+    public RoleInfo roleInfo() {
+        return new RoleInfo(
+                get(MastershipRole.MASTER), nodesOfRole(MastershipRole.STANDBY));
+    }
+
+    @Override
+    public String toString() {
+        ToStringHelper helper = MoreObjects.toStringHelper(this.getClass());
+        for (Map.Entry<MastershipRole, List<NodeId>> el : value.entrySet()) {
+            helper.add(el.getKey().toString(), el.getValue());
+        }
+        return helper.toString();
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/RoleValueSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/RoleValueSerializer.java
new file mode 100644
index 0000000..4450e5b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/RoleValueSerializer.java
@@ -0,0 +1,52 @@
+package org.onlab.onos.store.mastership.impl;
+
+import java.util.List;
+import java.util.Map;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.MastershipRole;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo serializer for the {@link RoleValue}s used by {@link DistributedMastershipStore}.
+ */
+public class RoleValueSerializer extends Serializer<RoleValue> {
+
+    //wire layout: the number of roles, then for each role its enum ordinal,
+    //the number of nodes, and every NodeId written as a string.
+
+    @Override
+    public RoleValue read(Kryo kryo, Input input, Class<RoleValue> type) {
+        final RoleValue result = new RoleValue();
+        final MastershipRole[] roles = MastershipRole.values();
+        //mirror of write(): role count first, then one section per role
+        for (int remaining = input.readInt(); remaining > 0; remaining--) {
+            final MastershipRole role = roles[input.readInt()];
+            for (int pending = input.readInt(); pending > 0; pending--) {
+                result.add(role, new NodeId(input.readString()));
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, RoleValue type) {
+        final Map<MastershipRole, List<NodeId>> byRole = type.value();
+        output.writeInt(byRole.size());
+        for (Map.Entry<MastershipRole, List<NodeId>> entry : byRole.entrySet()) {
+            final List<NodeId> members = entry.getValue();
+            //header for this role: which role, then how many nodes follow
+            output.writeInt(entry.getKey().ordinal());
+            output.writeInt(members.size());
+            for (NodeId member : members) {
+                //NodeId travels as its toString() form and is rebuilt in read()
+                output.writeString(member.toString());
+            }
+        }
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/package-info.java
new file mode 100644
index 0000000..308c9ef
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Implementation of a distributed mastership store using Hazelcast.
+ */
+package org.onlab.onos.store.mastership.impl;