Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyReply.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyReply.java
index 94c20ea..9bc095e 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyReply.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyReply.java
@@ -18,10 +18,10 @@
  * Suggest to the sender about the more up-to-date data this node has,
  * and request for more recent data that the receiver has.
  */
-public class AntiEntropyReply<ID, VALUE> extends ClusterMessage {
+public class AntiEntropyReply<ID, V extends VersionedValue<?>> extends ClusterMessage {
 
     private final NodeId sender;
-    private final ImmutableMap<ID, VersionedValue<VALUE>> suggestion;
+    private final ImmutableMap<ID, V> suggestion;
     private final ImmutableSet<ID> request;
 
     /**
@@ -32,7 +32,7 @@
      * @param request Collection of identifiers
      */
     public AntiEntropyReply(NodeId sender,
-                            Map<ID, VersionedValue<VALUE>> suggestion,
+                            Map<ID, V> suggestion,
                             Set<ID> request) {
         super(AE_REPLY);
         this.sender = sender;
@@ -44,14 +44,34 @@
         return sender;
     }
 
-    public ImmutableMap<ID, VersionedValue<VALUE>> suggestion() {
+    /**
+     * Returns collection of values, which the recipient of this reply is likely
+     * to be missing or to hold only an outdated version of.
+     *
+     * @return suggested values, keyed by identifier
+     */
+    public ImmutableMap<ID, V> suggestion() {
         return suggestion;
     }
 
+    /**
+     * Returns collection of identifiers to request.
+     *
+     * @return collection of identifiers to request
+     */
     public ImmutableSet<ID> request() {
         return request;
     }
 
+    /**
+     * Checks if this reply contains neither a suggestion nor a request.
+     *
+     * @return true if nothing is suggested and nothing is requested
+     */
+    public boolean isEmpty() {
+        return suggestion.isEmpty() && request.isEmpty();
+    }
+
     // Default constructor for serializer
     protected AntiEntropyReply() {
         super(AE_REPLY);
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java
new file mode 100644
index 0000000..301884c
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java
@@ -0,0 +1,39 @@
+package org.onlab.onos.store.device.impl;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.Timestamp;
+import org.onlab.onos.store.cluster.messaging.AntiEntropyAdvertisement;
+
+// TODO DeviceID needs to be changed to something like (ProviderID, DeviceID)
+// TODO: Handle Port as part of these messages, or separate messages for Ports?
+
+public class DeviceAntiEntropyAdvertisement
+    extends AntiEntropyAdvertisement<DeviceId> {
+
+
+    public DeviceAntiEntropyAdvertisement(NodeId sender,
+            Map<DeviceId, Timestamp> advertisement) {
+        super(sender, advertisement);
+    }
+
+    // May need to add ProviderID, etc.
+    public static DeviceAntiEntropyAdvertisement create(
+            NodeId self,
+            Collection<VersionedValue<Device>> localValues) {
+
+        Map<DeviceId, Timestamp> ads = new HashMap<>(localValues.size());
+        for (VersionedValue<Device> e : localValues) {
+            ads.put(e.entity().id(), e.timestamp());
+        }
+        return new DeviceAntiEntropyAdvertisement(self, ads);
+    }
+
+    // For serializer
+    protected DeviceAntiEntropyAdvertisement() {}
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java
new file mode 100644
index 0000000..011713e
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java
@@ -0,0 +1,102 @@
+package org.onlab.onos.store.device.impl;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.Timestamp;
+import org.onlab.onos.store.cluster.messaging.AntiEntropyReply;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+public class DeviceAntiEntropyReply
+    extends AntiEntropyReply<DeviceId, VersionedValue<Device>> {
+
+
+    public DeviceAntiEntropyReply(NodeId sender,
+            Map<DeviceId, VersionedValue<Device>> suggestion,
+            Set<DeviceId> request) {
+        super(sender, suggestion, request);
+    }
+
+    /**
+     * Creates a reply to Anti-Entropy advertisement.
+     *
+     * @param advertisement to respond to
+     * @param self node identifier representing local node
+     * @param localValues local values held on this node
+     * @return reply message
+     */
+    public static DeviceAntiEntropyReply reply(
+            DeviceAntiEntropyAdvertisement advertisement,
+            NodeId self,
+            Collection<VersionedValue<Device>> localValues
+            ) {
+
+        ImmutableMap<DeviceId, Timestamp> ads = advertisement.advertisement();
+
+        ImmutableMap.Builder<DeviceId, VersionedValue<Device>>
+            sug = ImmutableMap.builder();
+
+        Set<DeviceId> req = new HashSet<>(ads.keySet());
+
+        for (VersionedValue<Device> e : localValues) {
+            final DeviceId id = e.entity().id();
+            final Timestamp local = e.timestamp();
+            final Timestamp theirs = ads.get(id);
+            if (theirs == null) {
+                // they don't have it, suggest
+                sug.put(id, e);
+                // don't need theirs
+                req.remove(id);
+            } else if (local.compareTo(theirs) < 0) {
+                // they got older one, suggest (NOTE(review): "<" reads as local being older -- confirm)
+                sug.put(id, e);
+                // don't need theirs
+                req.remove(id);
+            } else if (local.equals(theirs)) {
+                // same, don't need theirs
+                req.remove(id);
+            }
+        }
+
+        return new DeviceAntiEntropyReply(self, sug.build(), req);
+    }
+
+    /**
+     * Creates a reply to request for values held locally.
+     *
+     * @param requests message containing the request
+     * @param self node identifier representing local node
+     * @param localValues local values held on this node
+     * @return reply message
+     */
+    public static DeviceAntiEntropyReply reply(
+            DeviceAntiEntropyReply requests,
+            NodeId self,
+            Map<DeviceId, VersionedValue<Device>> localValues
+            ) {
+
+        Set<DeviceId> reqs = requests.request();
+
+        Map<DeviceId, VersionedValue<Device>> requested = new HashMap<>(reqs.size());
+        for (DeviceId id : reqs) {
+            final VersionedValue<Device> value = localValues.get(id);
+            if (value != null) {
+                requested.put(id, value);
+            }
+        }
+
+        Set<DeviceId> empty = ImmutableSet.of();
+        return new DeviceAntiEntropyReply(self, requested, empty);
+    }
+
+    // For serializer
+    protected DeviceAntiEntropyReply() {}
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java
index bd5f2fd..4a0d347 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java
@@ -40,6 +40,7 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.onlab.onos.net.device.DeviceEvent.Type.*;
@@ -59,8 +60,8 @@
 
     public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
 
-    private ConcurrentHashMap<DeviceId, VersionedValue<Device>> devices;
-    private ConcurrentHashMap<DeviceId, Map<PortNumber, VersionedValue<Port>>> devicePorts;
+    private ConcurrentMap<DeviceId, VersionedValue<Device>> devices;
+    private ConcurrentMap<DeviceId, Map<PortNumber, VersionedValue<Port>>> devicePorts;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClockService clockService;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/VersionedValue.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/VersionedValue.java
index c18b4da..a0f485a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/VersionedValue.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/VersionedValue.java
@@ -1,5 +1,7 @@
 package org.onlab.onos.store.device.impl;
 
+import java.util.Objects;
+
 import org.onlab.onos.store.Timestamp;
 
 /**
@@ -44,6 +46,29 @@
     }
 
 
+    @Override
+    public int hashCode() {
+        return Objects.hash(entity, timestamp, isUp);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        @SuppressWarnings("unchecked")
+        VersionedValue<T> that = (VersionedValue<T>) obj;
+        return Objects.equals(this.entity, that.entity) &&
+                Objects.equals(this.timestamp, that.timestamp) &&
+                Objects.equals(this.isUp, that.isUp);
+    }
+
     // Default constructor for serializer
     protected VersionedValue() {
         this.entity = null;
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationManager.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationManager.java
index 84e1b73..4b8410d 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationManager.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationManager.java
@@ -1,6 +1,7 @@
 package org.onlab.onos.store.serializers;
 
 import java.net.URI;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 
@@ -100,4 +101,14 @@
         return serializerPool.deserialize(bytes);
     }
 
+    @Override
+    public void serialize(Object obj, ByteBuffer buffer) {
+        serializerPool.serialize(obj, buffer);
+    }
+
+    @Override
+    public <T> T deserialize(ByteBuffer buffer) {
+        return serializerPool.deserialize(buffer);
+    }
+
 }
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationService.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationService.java
index e92cc4b..385128c 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationService.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationService.java
@@ -1,5 +1,7 @@
 package org.onlab.onos.store.serializers;
 
+import java.nio.ByteBuffer;
+
 // TODO: To be replaced with SerializationService from IOLoop activity
 /**
  * Service to serialize Objects into byte array.
@@ -16,6 +18,15 @@
     public byte[] serialize(final Object obj);
 
     /**
+     * Serializes the specified object into bytes using one of the
+     * pre-registered serializers.
+     *
+     * @param obj object to be serialized
+     * @param buffer to write serialized bytes
+     */
+    public void serialize(final Object obj, ByteBuffer buffer);
+
+    /**
      * Deserializes the specified bytes into an object using one of the
      * pre-registered serializers.
      *
@@ -24,4 +35,12 @@
      */
     public <T> T deserialize(final byte[] bytes);
 
+    /**
+     * Deserializes the specified bytes into an object using one of the
+     * pre-registered serializers.
+     *
+     * @param buffer bytes to be deserialized
+     * @return deserialized object
+     */
+    public <T> T deserialize(final ByteBuffer buffer);
 }
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MastershipRoleSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MastershipRoleSerializer.java
new file mode 100644
index 0000000..3903491
--- /dev/null
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MastershipRoleSerializer.java
@@ -0,0 +1,26 @@
+package org.onlab.onos.store.serializers;
+
+import org.onlab.onos.net.MastershipRole;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo Serializer for {@link org.onlab.onos.net.MastershipRole}.
+ */
+public class MastershipRoleSerializer extends Serializer<MastershipRole> {
+
+    @Override
+    public MastershipRole read(Kryo kryo, Input input, Class<MastershipRole> type) {
+        final String role = kryo.readObject(input, String.class);
+        return MastershipRole.valueOf(role);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, MastershipRole object) {
+        kryo.writeObject(output, object.toString());
+    }
+
+}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MastershipTermSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MastershipTermSerializer.java
new file mode 100644
index 0000000..a5d6198
--- /dev/null
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MastershipTermSerializer.java
@@ -0,0 +1,29 @@
+package org.onlab.onos.store.serializers;
+
+import org.onlab.onos.cluster.MastershipTerm;
+import org.onlab.onos.cluster.NodeId;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo Serializer for {@link org.onlab.onos.cluster.MastershipTerm}.
+ */
+public class MastershipTermSerializer extends Serializer<MastershipTerm> {
+
+    @Override
+    public MastershipTerm read(Kryo kryo, Input input, Class<MastershipTerm> type) {
+        final NodeId node = new NodeId(kryo.readObject(input, String.class));
+        final int term = input.readInt();
+        return MastershipTerm.of(node, term);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, MastershipTerm object) {
+        output.writeString(object.master().toString());
+        output.writeInt(object.termNumber());
+    }
+
+}
diff --git a/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTests.java b/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTests.java
index 8e74072..c972d1a 100644
--- a/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTests.java
+++ b/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTests.java
@@ -12,6 +12,7 @@
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.onlab.onos.cluster.MastershipTerm;
 import org.onlab.onos.cluster.NodeId;
 import org.onlab.onos.net.ConnectPoint;
 import org.onlab.onos.net.DefaultDevice;
@@ -21,6 +22,7 @@
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.net.Link;
 import org.onlab.onos.net.LinkKey;
+import org.onlab.onos.net.MastershipRole;
 import org.onlab.onos.net.PortNumber;
 import org.onlab.onos.net.provider.ProviderId;
 import org.onlab.packet.IpPrefix;
@@ -81,6 +83,9 @@
                 .register(DefaultDevice.class)
 
                 .register(URI.class, new URISerializer())
+
+                .register(MastershipRole.class, new MastershipRoleSerializer())
+                .register(MastershipTerm.class, new MastershipTermSerializer())
                 .build();
     }
 
diff --git a/core/store/trivial/src/test/java/org/onlab/onos/net/trivial/impl/SimpleMastershipStoreTest.java b/core/store/trivial/src/test/java/org/onlab/onos/net/trivial/impl/SimpleMastershipStoreTest.java
index fa39542..78878bf 100644
--- a/core/store/trivial/src/test/java/org/onlab/onos/net/trivial/impl/SimpleMastershipStoreTest.java
+++ b/core/store/trivial/src/test/java/org/onlab/onos/net/trivial/impl/SimpleMastershipStoreTest.java
@@ -9,7 +9,6 @@
 import org.onlab.onos.cluster.MastershipTerm;
 import org.onlab.onos.cluster.NodeId;
 import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.provider.ProviderId;
 
 import com.google.common.collect.Sets;
 
@@ -24,8 +23,6 @@
  */
 public class SimpleMastershipStoreTest {
 
-    private static final ProviderId PID = new ProviderId("of", "foo");
-
     private static final DeviceId DID1 = DeviceId.deviceId("of:01");
     private static final DeviceId DID2 = DeviceId.deviceId("of:02");
     private static final DeviceId DID3 = DeviceId.deviceId("of:03");