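Fix RoleValue serialization in DistributedMastershipStore

Register RoleValueSerializer in the store's own Kryo pool and move it into
the mastership.impl package alongside RoleValue. Write each modified
RoleValue back to roleMap, and make getMaster() look up the MASTER node
instead of calling itself.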

Change-Id: Ie51d0e16a0623061790523920f6a22aa18e74517
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
index bc32375..d0eae2d 100644
--- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
@@ -24,11 +24,12 @@
 import org.onlab.onos.net.MastershipRole;
 import org.onlab.onos.store.common.AbstractHazelcastStore;
 import org.onlab.onos.store.common.SMap;
+import org.onlab.onos.store.serializers.KryoPoolUtil;
 import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.util.KryoPool;
 
 import com.google.common.collect.ImmutableSet;
-import com.hazelcast.core.IMap;
-import com.hazelcast.core.MultiMap;
+import com.hazelcast.core.IAtomicLong;
 
 import static org.onlab.onos.net.MastershipRole.*;
 
@@ -49,6 +50,9 @@
     protected SMap<DeviceId, RoleValue> roleMap;
     //devices to terms
     protected SMap<DeviceId, Integer> terms;
+    //last-known cluster size, used for tie-breaking when partitioning occurs
+    protected IAtomicLong clusterSize;
+
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
@@ -59,8 +63,21 @@
     public void activate() {
         super.activate();
 
-        roleMap = new SMap(theInstance.getMap("nodeRoles"), new KryoSerializer());
-        terms = new SMap(theInstance.getMap("terms"), new KryoSerializer());
+        this.serializer = new KryoSerializer() {
+            @Override
+            protected void setupKryoPool() {
+                serializerPool = KryoPool.newBuilder()
+                        .register(KryoPoolUtil.API)
+
+                        .register(RoleValue.class, new RoleValueSerializer())
+                        .build()
+                        .populate(1);
+            }
+        };
+
+        roleMap = new SMap(theInstance.getMap("nodeRoles"), this.serializer);
+        terms = new SMap(theInstance.getMap("terms"), this.serializer);
+        clusterSize = theInstance.getAtomicLong("clustersize");
        // roleMap.addEntryListener(new RemoteMasterShipEventHandler(), true);
 
         log.info("Started");
@@ -103,6 +120,7 @@
                 case MASTER:
                     //reinforce mastership
                     rv.reassign(nodeId, STANDBY, NONE);
+                    roleMap.put(deviceId, rv);
                     return null;
                 case STANDBY:
                     NodeId current = rv.get(MASTER);
@@ -115,11 +133,13 @@
                         rv.add(MASTER, nodeId);
                     }
                     rv.reassign(nodeId, STANDBY, NONE);
+                    roleMap.put(deviceId, rv);
                     updateTerm(deviceId);
                     return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
                 case NONE:
                     rv.add(MASTER, nodeId);
                     rv.reassign(nodeId, STANDBY, NONE);
+                    roleMap.put(deviceId, rv);
                     updateTerm(deviceId);
                     return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
                 default:
@@ -133,7 +153,7 @@
 
     @Override
     public NodeId getMaster(DeviceId deviceId) {
-        return getMaster(deviceId);
+        return getNode(MASTER, deviceId);
     }
 
 
@@ -181,15 +201,18 @@
             switch (role) {
                 case MASTER:
                     rv.reassign(local, STANDBY, NONE);
+                    roleMap.put(deviceId, rv);
                     break;
                 case STANDBY:
                     rv.reassign(local, NONE, STANDBY);
+                    roleMap.put(deviceId, rv);
                     terms.putIfAbsent(deviceId, INIT);
                     break;
                 case NONE:
                     //claim mastership
                     rv.add(MASTER, local);
                     rv.reassign(local, STANDBY, NONE);
+                    roleMap.put(deviceId, rv);
                     updateTerm(deviceId);
                     role = MastershipRole.MASTER;
                     break;
@@ -221,12 +244,13 @@
             MastershipRole role = getRole(nodeId, deviceId);
             switch (role) {
                 case MASTER:
-                    event = reelect(nodeId, deviceId);
+                    event = reelect(nodeId, deviceId, rv);
                     //fall through to reinforce role
                 case STANDBY:
                     //fall through to reinforce role
                 case NONE:
                     rv.reassign(nodeId, NONE, STANDBY);
+                    roleMap.put(deviceId, rv);
                     break;
                 default:
                     log.warn("unknown Mastership Role {}", role);
@@ -247,12 +271,13 @@
             MastershipRole role = getRole(nodeId, deviceId);
             switch (role) {
                 case MASTER:
-                    event = reelect(nodeId, deviceId);
+                    event = reelect(nodeId, deviceId, rv);
                     //fall through to reinforce relinquishment
                 case STANDBY:
                     //fall through to reinforce relinquishment
                 case NONE:
                     rv.reassign(nodeId, STANDBY, NONE);
+                    roleMap.put(deviceId, rv);
                     break;
                 default:
                 log.warn("unknown Mastership Role {}", role);
@@ -264,8 +289,7 @@
     }
 
     //helper to fetch a new master candidate for a given device.
-    private MastershipEvent reelect(NodeId current, DeviceId deviceId) {
-        RoleValue rv = roleMap.get(deviceId);
+    private MastershipEvent reelect(NodeId current, DeviceId deviceId, RoleValue rv) {
 
         //if this is an queue it'd be neater.
         NodeId backup = null;
@@ -278,10 +302,12 @@
 
         if (backup == null) {
             rv.remove(MASTER, current);
+            roleMap.put(deviceId, rv);
             return null;
         } else {
             rv.replace(current, backup, MASTER);
             rv.reassign(backup, STANDBY, NONE);
+            roleMap.put(deviceId, rv);
             Integer term = terms.get(deviceId);
             terms.put(deviceId, ++term);
             return new MastershipEvent(
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/RoleValue.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/RoleValue.java
index b5d1a64..f2c3559 100644
--- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/RoleValue.java
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/RoleValue.java
@@ -1,6 +1,7 @@
 package org.onlab.onos.store.mastership.impl;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -14,7 +15,7 @@
  */
 public class RoleValue {
 
-    Map<MastershipRole, List<NodeId>> value;
+    protected Map<MastershipRole, List<NodeId>> value = new HashMap<>();
 
     public RoleValue() {
         value.put(MastershipRole.MASTER, new LinkedList<NodeId>());
@@ -96,4 +97,16 @@
         add(type, to);
     }
 
+    @Override
+    public String toString() {
+        final StringBuilder builder = new StringBuilder();
+        for (Map.Entry<MastershipRole, List<NodeId>> el : value.entrySet()) {
+            builder.append(el.getKey().toString()).append(": [");
+            for (NodeId n : el.getValue()) {
+                builder.append(n);
+            }
+            builder.append("]\n");
+        }
+        return builder.toString();
+    }
 }
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/RoleValueSerializer.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/RoleValueSerializer.java
similarity index 93%
rename from core/store/serializers/src/main/java/org/onlab/onos/store/serializers/RoleValueSerializer.java
rename to core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/RoleValueSerializer.java
index a6dc8dc..22d1b35 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/RoleValueSerializer.java
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/RoleValueSerializer.java
@@ -1,11 +1,10 @@
-package org.onlab.onos.store.serializers;
+package org.onlab.onos.store.mastership.impl;
 
 import java.util.List;
 import java.util.Map;
 
 import org.onlab.onos.cluster.NodeId;
 import org.onlab.onos.net.MastershipRole;
-import org.onlab.onos.store.mastership.impl.RoleValue;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
@@ -13,7 +12,7 @@
 import com.esotericsoftware.kryo.io.Output;
 
 /**
- * Serializer for RoleValues used by {@link DistributedMastershipStore}
+ * Serializer for RoleValues used by {@link DistributedMastershipStore}.
  */
 public class RoleValueSerializer extends Serializer<RoleValue> {
 
diff --git a/core/store/hz/cluster/src/test/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStoreTest.java b/core/store/hz/cluster/src/test/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStoreTest.java
index 6435c11..956b0be 100644
--- a/core/store/hz/cluster/src/test/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStoreTest.java
+++ b/core/store/hz/cluster/src/test/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStoreTest.java
@@ -116,7 +116,6 @@
         testStore.put(DID1, N1, true, false, false);
         testStore.put(DID2, N1, true, false, false);
         testStore.put(DID3, N2, true, false, false);
-
         assertEquals("wrong devices",
                 Sets.newHashSet(DID1, DID2), dms.getDevices(N1));
     }
@@ -244,7 +243,6 @@
             RoleValue rv = dms.roleMap.get(dev);
             if (rv == null) {
                 rv = new RoleValue();
-                dms.roleMap.put(dev, rv);
             }
 
             if (master) {
@@ -259,6 +257,7 @@
             if (term) {
                 dms.terms.put(dev, 0);
             }
+            dms.roleMap.put(dev, rv);
         }
 
         //a dumb utility function.
@@ -266,9 +265,9 @@
             for (Map.Entry<DeviceId, RoleValue> el : dms.roleMap.entrySet()) {
                 System.out.println("DID: " + el.getKey());
                 for (MastershipRole role : MastershipRole.values()) {
-                    System.out.println(role.toString() + ":");
+                    System.out.println("\t" + role.toString() + ":");
                     for (NodeId n : el.getValue().nodesOfRole(role)) {
-                        System.out.println("\t" + n);
+                        System.out.println("\t\t" + n);
                     }
                 }
             }
diff --git a/core/store/hz/cluster/src/test/java/org/onlab/onos/store/mastership/impl/RoleValueTest.java b/core/store/hz/cluster/src/test/java/org/onlab/onos/store/mastership/impl/RoleValueTest.java
new file mode 100644
index 0000000..93741b7
--- /dev/null
+++ b/core/store/hz/cluster/src/test/java/org/onlab/onos/store/mastership/impl/RoleValueTest.java
@@ -0,0 +1,31 @@
+package org.onlab.onos.store.mastership.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.onlab.onos.net.MastershipRole.*;
+
+import org.junit.Test;
+import org.onlab.onos.cluster.NodeId;
+
+import com.google.common.collect.Sets;
+
+public class RoleValueTest {
+
+    private static final RoleValue RV = new RoleValue();
+
+    private static final NodeId NID1 = new NodeId("node1");
+    private static final NodeId NID2 = new NodeId("node2");
+    private static final NodeId NID3 = new NodeId("node3");
+
+    @Test
+    public void add() {
+        assertEquals("faulty initialization: ", 3, RV.value.size());
+        RV.add(MASTER, NID1);
+        RV.add(STANDBY, NID2);
+        RV.add(STANDBY, NID3);
+
+        assertEquals("wrong nodeID: ", NID1, RV.get(MASTER));
+        assertTrue("wrong nodeIDs: ",
+                Sets.newHashSet(NID3, NID2).containsAll(RV.nodesOfRole(STANDBY)));
+    }
+}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
index d64ecbe..98b1ee3 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
@@ -26,7 +26,6 @@
 import org.onlab.onos.net.link.DefaultLinkDescription;
 import org.onlab.onos.net.provider.ProviderId;
 import org.onlab.onos.store.Timestamp;
-import org.onlab.onos.store.mastership.impl.RoleValue;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpPrefix;
 import org.onlab.util.KryoPool;
@@ -66,7 +65,6 @@
                     DefaultDevice.class,
                     DefaultDeviceDescription.class,
                     DefaultLinkDescription.class,
-                    RoleValue.class,
                     Port.class,
                     DefaultPortDescription.class,
                     Element.class,
@@ -84,7 +82,6 @@
             .register(ConnectPoint.class, new ConnectPointSerializer())
             .register(DefaultLink.class, new DefaultLinkSerializer())
             .register(MastershipTerm.class, new MastershipTermSerializer())
-            .register(RoleValue.class, new RoleValueSerializer())
 
             .build();
 
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MastershipRoleSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MastershipRoleSerializer.java
deleted file mode 100644
index dab5aa8..0000000
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MastershipRoleSerializer.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.onlab.onos.store.serializers;
-
-import org.onlab.onos.net.MastershipRole;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-/**
- * Kryo Serializer for {@link org.onlab.onos.net.MastershipRole}.
- */
-public class MastershipRoleSerializer extends Serializer<MastershipRole> {
-
-    /**
-     * Creates {@link MastershipRole} serializer instance.
-     */
-    public MastershipRoleSerializer() {
-        // non-null, immutable
-        super(false, true);
-    }
-
-    @Override
-    public MastershipRole read(Kryo kryo, Input input, Class<MastershipRole> type) {
-        final String role = kryo.readObject(input, String.class);
-        return MastershipRole.valueOf(role);
-    }
-
-    @Override
-    public void write(Kryo kryo, Output output, MastershipRole object) {
-        kryo.writeObject(output, object.toString());
-    }
-
-}
diff --git a/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTest.java b/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTest.java
index 1bc89ec..df58ea5 100644
--- a/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTest.java
+++ b/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTest.java
@@ -22,11 +22,9 @@
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.net.Link;
 import org.onlab.onos.net.LinkKey;
-import org.onlab.onos.net.MastershipRole;
 import org.onlab.onos.net.PortNumber;
 import org.onlab.onos.net.SparseAnnotations;
 import org.onlab.onos.net.provider.ProviderId;
-import org.onlab.onos.store.mastership.impl.RoleValue;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpPrefix;
 import org.onlab.util.KryoPool;
@@ -59,13 +57,6 @@
             .remove("A1")
             .set("B3", "b3")
             .build();
-    private static final RoleValue RV = new RoleValue();
-    static {
-        RV.add(MastershipRole.MASTER, new NodeId("node1"));
-        RV.add(MastershipRole.STANDBY, new NodeId("node2"));
-        RV.add(MastershipRole.STANDBY, new NodeId("node3"));
-        RV.add(MastershipRole.NONE, new NodeId("node4"));
-    };
 
     private static KryoPool kryos;
 
@@ -123,9 +114,6 @@
         testSerialized(PIDA);
         testSerialized(new NodeId("bar"));
         testSerialized(MastershipTerm.of(new NodeId("foo"), 2));
-        for (MastershipRole role : MastershipRole.values()) {
-            testSerialized(role);
-        }
     }
 
     @Test