AntiEntropy sketch + required Serializer work

Change-Id: Ibac5f4eede6b420202683114c3262e01b7264eff
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyAdvertisement.java
new file mode 100644
index 0000000..398f8f7
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyAdvertisement.java
@@ -0,0 +1,47 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import static org.onlab.onos.store.cluster.messaging.MessageSubject.AE_ADVERTISEMENT;
+import java.util.Map;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.Timestamp;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Anti-Entropy advertisement message.
+ *
+ * @param <ID> ID type
+ */
+public class AntiEntropyAdvertisement<ID> extends ClusterMessage {
+
+    private final NodeId sender;
+    private final ImmutableMap<ID, Timestamp> advertisement;
+
+    /**
+     * Creates anti-entropy advertisement message.
+     *
+     * @param sender sender of this message
+     * @param advertisement timestamp information of the data sender holds
+     */
+    public AntiEntropyAdvertisement(NodeId sender, Map<ID, Timestamp> advertisement) {
+        super(AE_ADVERTISEMENT);
+        this.sender = sender;
+        this.advertisement = ImmutableMap.copyOf(advertisement);
+    }
+
+    public NodeId sender() {
+        return sender;
+    }
+
+    public ImmutableMap<ID, Timestamp> advertisement() {
+        return advertisement;
+    }
+
+    // Default constructor for serializer
+    protected AntiEntropyAdvertisement() {
+        super(AE_ADVERTISEMENT);
+        this.sender = null;
+        this.advertisement = null;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyReply.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyReply.java
new file mode 100644
index 0000000..7a52e09
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyReply.java
@@ -0,0 +1,55 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import static org.onlab.onos.store.cluster.messaging.MessageSubject.AE_REPLY;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.device.impl.VersionedValue;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+public class AntiEntropyReply<ID, VALUE> extends ClusterMessage {
+
+    private final NodeId sender;
+    private final ImmutableMap<ID, VersionedValue<VALUE>> suggestion;
+    private final ImmutableSet<ID> request;
+
+    /**
+     * Creates a reply to anti-entropy message.
+     *
+     * @param sender sender of this message
+     * @param suggestion collection of more recent values, sender had
+     * @param request Collection of identifiers
+     */
+    public AntiEntropyReply(NodeId sender,
+                            Map<ID, VersionedValue<VALUE>> suggestion,
+                            Set<ID> request) {
+        super(AE_REPLY);
+        this.sender = sender;
+        this.suggestion = ImmutableMap.copyOf(suggestion);
+        this.request = ImmutableSet.copyOf(request);
+    }
+
+    public NodeId sender() {
+        return sender;
+    }
+
+    public ImmutableMap<ID, VersionedValue<VALUE>> suggestion() {
+        return suggestion;
+    }
+
+    public ImmutableSet<ID> request() {
+        return request;
+    }
+
+    // Default constructor for serializer
+    protected AntiEntropyReply() {
+        super(AE_REPLY);
+        this.sender = null;
+        this.suggestion = null;
+        this.request = null;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
index c7badf2..97cbf1d 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
@@ -15,6 +15,12 @@
     LEAVING_MEMBER,
 
     /** Signifies a heart-beat message. */
-    ECHO
+    ECHO,
+
+    /** Anti-Entropy advertisement message. */
+    AE_ADVERTISEMENT,
+
+    /** Anti-Entropy reply message. */
+    AE_REPLY,
 
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/VersionedValue.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/VersionedValue.java
index 1a85c53..c18b4da 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/VersionedValue.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/VersionedValue.java
@@ -42,4 +42,12 @@
     public Timestamp timestamp() {
         return timestamp;
     }
+
+
+    // Default constructor for serializer
+    protected VersionedValue() {
+        this.entity = null;
+        this.isUp = false;
+        this.timestamp = null;
+    }
 }
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableMapSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableMapSerializer.java
new file mode 100644
index 0000000..244cc57
--- /dev/null
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableMapSerializer.java
@@ -0,0 +1,49 @@
+package org.onlab.onos.store.serializers;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.onlab.util.KryoPool.FamilySerializer;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.MapSerializer;
+import com.google.common.collect.ImmutableMap;
+
+/**
+* Kryo Serializer for {@link ImmutableMap}.
+*/
+public class ImmutableMapSerializer extends FamilySerializer<ImmutableMap<?, ?>> {
+
+    private final MapSerializer mapSerializer = new MapSerializer();
+
+    public ImmutableMapSerializer() {
+        // non-null, immutable
+        super(false, true);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, ImmutableMap<?, ?> object) {
+        // wrapping with unmodifiableMap proxy
+        // to avoid Kryo from writing only the reference marker of this instance,
+        // which will be embedded right before this method call.
+        kryo.writeObject(output, Collections.unmodifiableMap(object), mapSerializer);
+    }
+
+    @Override
+    public ImmutableMap<?, ?> read(Kryo kryo, Input input,
+                                    Class<ImmutableMap<?, ?>> type) {
+        Map<?, ?> map = kryo.readObject(input, HashMap.class, mapSerializer);
+        return ImmutableMap.copyOf(map);
+    }
+
+    @Override
+    public void registerFamilies(Kryo kryo) {
+        kryo.register(ImmutableMap.of().getClass(), this);
+        kryo.register(ImmutableMap.of(1, 2).getClass(), this);
+        kryo.register(ImmutableMap.of(1, 2, 3, 4).getClass(), this);
+        // TODO register required ImmutableMap variants
+    }
+}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableSetSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableSetSerializer.java
new file mode 100644
index 0000000..c08bf9a
--- /dev/null
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableSetSerializer.java
@@ -0,0 +1,45 @@
+package org.onlab.onos.store.serializers;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.onlab.util.KryoPool.FamilySerializer;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.CollectionSerializer;
+import com.google.common.collect.ImmutableSet;
+
+/**
+* Kryo Serializer for {@link ImmutableSet}.
+*/
+public class ImmutableSetSerializer extends FamilySerializer<ImmutableSet<?>> {
+
+    private final CollectionSerializer serializer = new CollectionSerializer();
+
+    public ImmutableSetSerializer() {
+        // non-null, immutable
+        super(false, true);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, ImmutableSet<?> object) {
+        kryo.writeObject(output, object.asList(), serializer);
+    }
+
+    @Override
+    public ImmutableSet<?> read(Kryo kryo, Input input,
+                                Class<ImmutableSet<?>> type) {
+        List<?> elms = kryo.readObject(input, ArrayList.class, serializer);
+        return ImmutableSet.copyOf(elms);
+    }
+
+    @Override
+    public void registerFamilies(Kryo kryo) {
+        kryo.register(ImmutableSet.of().getClass(), this);
+        kryo.register(ImmutableSet.of(1).getClass(), this);
+        kryo.register(ImmutableSet.of(1, 2).getClass(), this);
+        // TODO register required ImmutableSet variants
+    }
+}
diff --git a/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTests.java b/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTests.java
new file mode 100644
index 0000000..8e74072
--- /dev/null
+++ b/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTests.java
@@ -0,0 +1,128 @@
+package org.onlab.onos.store.serializers;
+
+import static org.onlab.onos.net.DeviceId.deviceId;
+import static org.onlab.onos.net.PortNumber.portNumber;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.DefaultDevice;
+import org.onlab.onos.net.DefaultLink;
+import org.onlab.onos.net.DefaultPort;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.Link;
+import org.onlab.onos.net.LinkKey;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.packet.IpPrefix;
+import org.onlab.util.KryoPool;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.testing.EqualsTester;
+
+import de.javakaffee.kryoserializers.URISerializer;
+
+public class KryoSerializerTests {
+    private static final ProviderId PID = new ProviderId("of", "foo");
+    private static final DeviceId DID1 = deviceId("of:foo");
+    private static final DeviceId DID2 = deviceId("of:bar");
+    private static final PortNumber P1 = portNumber(1);
+    private static final PortNumber P2 = portNumber(2);
+    private static final ConnectPoint CP1 = new ConnectPoint(DID1, P1);
+    private static final ConnectPoint CP2 = new ConnectPoint(DID2, P2);
+    private static final String MFR = "whitebox";
+    private static final String HW = "1.1.x";
+    private static final String SW1 = "3.8.1";
+    private static final String SW2 = "3.9.5";
+    private static final String SN = "43311-12345";
+    private static final Device DEV1 = new DefaultDevice(PID, DID1, Device.Type.SWITCH, MFR, HW, SW1, SN);
+
+    private static KryoPool kryos;
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        kryos = KryoPool.newBuilder()
+                .register(
+                        ArrayList.class,
+                        HashMap.class
+                        )
+                .register(
+                        Device.Type.class,
+                        Link.Type.class
+
+//                      ControllerNode.State.class,
+//                        DefaultControllerNode.class,
+//                        MastershipRole.class,
+//                        Port.class,
+//                        Element.class,
+                        )
+                .register(ConnectPoint.class, new ConnectPointSerializer())
+                .register(DefaultLink.class, new DefaultLinkSerializer())
+                .register(DefaultPort.class, new DefaultPortSerializer())
+                .register(DeviceId.class, new DeviceIdSerializer())
+                .register(ImmutableMap.class, new ImmutableMapSerializer())
+                .register(ImmutableSet.class, new ImmutableSetSerializer())
+                .register(IpPrefix.class, new IpPrefixSerializer())
+                .register(LinkKey.class, new LinkKeySerializer())
+                .register(NodeId.class, new NodeIdSerializer())
+                .register(PortNumber.class, new PortNumberSerializer())
+                .register(ProviderId.class, new ProviderIdSerializer())
+
+                .register(DefaultDevice.class)
+
+                .register(URI.class, new URISerializer())
+                .build();
+    }
+
+    @Before
+    public void setUp() throws Exception {
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        // removing Kryo instance to use fresh Kryo on each tests
+        kryos.getKryo();
+    }
+
+    private static <T> void testSerialized(T original) {
+        ByteBuffer buffer = ByteBuffer.allocate(1 * 1024 * 1024);
+        kryos.serialize(original, buffer);
+        buffer.flip();
+        T copy = kryos.deserialize(buffer);
+
+        new EqualsTester()
+            .addEqualityGroup(original, copy)
+            .testEquals();
+    }
+
+
+    @Test
+    public final void test() {
+        testSerialized(new ConnectPoint(DID1, P1));
+        testSerialized(new DefaultLink(PID, CP1, CP2, Link.Type.DIRECT));
+        testSerialized(new DefaultPort(DEV1, P1, true));
+        testSerialized(DID1);
+        testSerialized(ImmutableMap.of(DID1, DEV1, DID2, DEV1));
+        testSerialized(ImmutableMap.of(DID1, DEV1));
+        testSerialized(ImmutableMap.of());
+        testSerialized(ImmutableSet.of(DID1, DID2));
+        testSerialized(ImmutableSet.of(DID1));
+        testSerialized(ImmutableSet.of());
+        testSerialized(IpPrefix.valueOf("192.168.0.1/24"));
+        testSerialized(new LinkKey(CP1, CP2));
+        testSerialized(new NodeId("SomeNodeIdentifier"));
+        testSerialized(P1);
+        testSerialized(PID);
+    }
+
+}
diff --git a/utils/misc/src/main/java/org/onlab/util/KryoPool.java b/utils/misc/src/main/java/org/onlab/util/KryoPool.java
index be662a6..3fae0c5 100644
--- a/utils/misc/src/main/java/org/onlab/util/KryoPool.java
+++ b/utils/misc/src/main/java/org/onlab/util/KryoPool.java
@@ -239,12 +239,41 @@
         Kryo kryo = new Kryo();
         kryo.setRegistrationRequired(registrationRequired);
         for (Pair<Class<?>, Serializer<?>> registry : registeredTypes) {
-            if (registry.getRight() == null) {
+            final Serializer<?> serializer = registry.getRight();
+            if (serializer == null) {
                 kryo.register(registry.getLeft());
             } else {
-                kryo.register(registry.getLeft(), registry.getRight());
+                kryo.register(registry.getLeft(), serializer);
+                if (serializer instanceof FamilySerializer) {
+                    FamilySerializer<?> fser = (FamilySerializer<?>) serializer;
+                    fser.registerFamilies(kryo);
+                }
             }
         }
         return kryo;
     }
+
+    /**
+     * Serializer implementation, which required registration of family of Classes.
+     * @param <T> base type of this serializer.
+     */
+    public abstract static class FamilySerializer<T> extends Serializer<T> {
+
+
+        public FamilySerializer(boolean acceptsNull) {
+            super(acceptsNull);
+        }
+
+        public FamilySerializer(boolean acceptsNull, boolean immutable) {
+            super(acceptsNull, immutable);
+        }
+
+        /**
+         * Registers other classes this Serializer supports.
+         *
+         * @param kryo instance to register classes to
+         */
+        public void registerFamilies(Kryo kryo) {
+        }
+    }
 }