Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java
index f4b9710..8d273ac 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java
@@ -1,4 +1,4 @@
/**
* Distributed cluster store and messaging subsystem implementation.
*/
-package org.onlab.onos.store.cluster.impl;
\ No newline at end of file
+package org.onlab.onos.store.cluster.impl;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyAdvertisement.java
index 398f8f7..f9e0d63 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyAdvertisement.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyAdvertisement.java
@@ -10,6 +10,8 @@
/**
* Anti-Entropy advertisement message.
+ * <p>
+ * Message to advertise the information this node holds.
*
* @param <ID> ID type
*/
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyReply.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyReply.java
index 7a52e09..9bc095e 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyReply.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyReply.java
@@ -11,10 +11,17 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-public class AntiEntropyReply<ID, VALUE> extends ClusterMessage {
+/**
+ * Anti-Entropy reply message.
+ * <p>
+ * Message to send in reply to advertisement or another reply.
+ * Suggests to the recipient of this reply the more up-to-date data this node has,
+ * and requests the more recent data that the recipient has.
+ */
+public class AntiEntropyReply<ID, V extends VersionedValue<?>> extends ClusterMessage {
private final NodeId sender;
- private final ImmutableMap<ID, VersionedValue<VALUE>> suggestion;
+ private final ImmutableMap<ID, V> suggestion;
private final ImmutableSet<ID> request;
/**
@@ -25,7 +32,7 @@
* @param request Collection of identifiers
*/
public AntiEntropyReply(NodeId sender,
- Map<ID, VersionedValue<VALUE>> suggestion,
+ Map<ID, V> suggestion,
Set<ID> request) {
super(AE_REPLY);
this.sender = sender;
@@ -37,14 +44,34 @@
return sender;
}
- public ImmutableMap<ID, VersionedValue<VALUE>> suggestion() {
+ /**
+ * Returns the values which the recipient of this reply is likely
+ * to be missing or to hold in an outdated version.
+ *
+ * @return map of suggested values, keyed by identifier
+ */
+ public ImmutableMap<ID, V> suggestion() {
return suggestion;
}
+ /**
+ * Returns the collection of identifiers to request.
+ *
+ * @return set of identifiers to request
+ */
public ImmutableSet<ID> request() {
return request;
}
+    /**
+     * Checks whether this reply carries neither a suggestion nor a request.
+     *
+     * @return true if nothing is suggested and nothing is requested
+     */
+    public boolean isEmpty() {
+        if (!suggestion.isEmpty()) {
+            return false;
+        }
+        return request.isEmpty();
+    }
+
// Default constructor for serializer
protected AntiEntropyReply() {
super(AE_REPLY);
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/package-info.java
index 5276b0b..326dbe8 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/package-info.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/package-info.java
@@ -1,4 +1,4 @@
/**
* Cluster messaging APIs for the use by the various distributed stores.
*/
-package org.onlab.onos.store.cluster.messaging;
\ No newline at end of file
+package org.onlab.onos.store.cluster.messaging;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java
new file mode 100644
index 0000000..301884c
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java
@@ -0,0 +1,39 @@
+package org.onlab.onos.store.device.impl;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.Timestamp;
+import org.onlab.onos.store.cluster.messaging.AntiEntropyAdvertisement;
+
+// TODO DeviceID needs to be changed to something like (ProviderID, DeviceID)
+// TODO: Handle Port as part of these messages, or separate messages for Ports?
+
+/**
+ * Anti-Entropy advertisement message for devices.
+ * <p>
+ * Advertises the timestamp of each device value held on a node,
+ * keyed by device identifier.
+ */
+public class DeviceAntiEntropyAdvertisement
+        extends AntiEntropyAdvertisement<DeviceId> {
+
+    /**
+     * Creates a device advertisement message.
+     *
+     * @param sender sender node identifier
+     * @param advertisement timestamps to advertise, keyed by device identifier
+     */
+    public DeviceAntiEntropyAdvertisement(NodeId sender,
+                                          Map<DeviceId, Timestamp> advertisement) {
+        super(sender, advertisement);
+    }
+
+    /**
+     * Builds an advertisement listing the timestamp of every local value.
+     * <p>
+     * May need to add ProviderID, etc.
+     *
+     * @param self node identifier representing local node
+     * @param localValues local values held on this node
+     * @return advertisement message
+     */
+    public static DeviceAntiEntropyAdvertisement create(
+            NodeId self,
+            Collection<VersionedValue<Device>> localValues) {
+
+        final Map<DeviceId, Timestamp> timestamps = new HashMap<>(localValues.size());
+        for (VersionedValue<Device> value : localValues) {
+            final Device device = value.entity();
+            timestamps.put(device.id(), value.timestamp());
+        }
+        return new DeviceAntiEntropyAdvertisement(self, timestamps);
+    }
+
+    // For serializer
+    protected DeviceAntiEntropyAdvertisement() {}
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java
new file mode 100644
index 0000000..011713e
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java
@@ -0,0 +1,102 @@
+package org.onlab.onos.store.device.impl;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.Timestamp;
+import org.onlab.onos.store.cluster.messaging.AntiEntropyReply;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Anti-Entropy reply message for devices.
+ * <p>
+ * Carries the device values suggested to the recipient and the
+ * identifiers of the devices requested from it.
+ */
+public class DeviceAntiEntropyReply
+        extends AntiEntropyReply<DeviceId, VersionedValue<Device>> {
+
+    /**
+     * Creates a device reply message.
+     *
+     * @param sender sender node identifier
+     * @param suggestion values suggested to the recipient, keyed by device identifier
+     * @param request identifiers of the devices being requested
+     */
+    public DeviceAntiEntropyReply(NodeId sender,
+                                  Map<DeviceId, VersionedValue<Device>> suggestion,
+                                  Set<DeviceId> request) {
+        super(sender, suggestion, request);
+    }
+
+    /**
+     * Creates a reply to Anti-Entropy advertisement.
+     * <p>
+     * Suggests every local value the advertiser does not have or holds in
+     * an older version, and requests every advertised identifier that is
+     * not held locally or is held locally only in an older version.
+     * <p>
+     * Assumes the natural ordering of {@code Timestamp} places older
+     * timestamps before newer ones.
+     *
+     * @param advertisement to respond to
+     * @param self node identifier representing local node
+     * @param localValues local values held on this node
+     * @return reply message
+     */
+    public static DeviceAntiEntropyReply reply(
+            DeviceAntiEntropyAdvertisement advertisement,
+            NodeId self,
+            Collection<VersionedValue<Device>> localValues
+            ) {
+
+        ImmutableMap<DeviceId, Timestamp> ads = advertisement.advertisement();
+
+        ImmutableMap.Builder<DeviceId, VersionedValue<Device>>
+            sug = ImmutableMap.builder();
+
+        // start from everything they advertised, then drop what we don't need
+        Set<DeviceId> req = new HashSet<>(ads.keySet());
+
+        for (VersionedValue<Device> e : localValues) {
+            final DeviceId id = e.entity().id();
+            final Timestamp local = e.timestamp();
+            final Timestamp theirs = ads.get(id);
+            if (theirs == null || local.compareTo(theirs) > 0) {
+                // they don't have it, or they got older one: suggest
+                sug.put(id, e);
+                // don't need theirs
+                req.remove(id);
+            } else if (local.equals(theirs)) {
+                // same, don't need theirs
+                req.remove(id);
+            }
+            // otherwise theirs is newer: leave it in the request set
+        }
+
+        return new DeviceAntiEntropyReply(self, sug.build(), req);
+    }
+
+    /**
+     * Creates a reply to request for values held locally.
+     * <p>
+     * Requested identifiers with no local value are left out of the
+     * reply; the reply itself requests nothing.
+     *
+     * @param requests message containing the request
+     * @param self node identifier representing local node
+     * @param localValues local values held on this node
+     * @return reply message
+     */
+    public static DeviceAntiEntropyReply reply(
+            DeviceAntiEntropyReply requests,
+            NodeId self,
+            Map<DeviceId, VersionedValue<Device>> localValues
+            ) {
+
+        Set<DeviceId> reqs = requests.request();
+
+        Map<DeviceId, VersionedValue<Device>> requested = new HashMap<>(reqs.size());
+        for (DeviceId id : reqs) {
+            final VersionedValue<Device> value = localValues.get(id);
+            if (value != null) {
+                requested.put(id, value);
+            }
+        }
+
+        Set<DeviceId> empty = ImmutableSet.of();
+        return new DeviceAntiEntropyReply(self, requested, empty);
+    }
+
+    // For serializer
+    protected DeviceAntiEntropyReply() {}
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java
index bd5f2fd..4a0d347 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java
@@ -40,6 +40,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import static com.google.common.base.Preconditions.checkArgument;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
@@ -59,8 +60,8 @@
public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
- private ConcurrentHashMap<DeviceId, VersionedValue<Device>> devices;
- private ConcurrentHashMap<DeviceId, Map<PortNumber, VersionedValue<Port>>> devicePorts;
+ private ConcurrentMap<DeviceId, VersionedValue<Device>> devices;
+ private ConcurrentMap<DeviceId, Map<PortNumber, VersionedValue<Port>>> devicePorts;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClockService clockService;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/VersionedValue.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/VersionedValue.java
index c18b4da..a0f485a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/VersionedValue.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/VersionedValue.java
@@ -1,5 +1,7 @@
package org.onlab.onos.store.device.impl;
+import java.util.Objects;
+
import org.onlab.onos.store.Timestamp;
/**
@@ -44,6 +46,29 @@
}
+ // Hash over the same fields that equals() compares: entity, timestamp and isUp.
+ @Override
+ public int hashCode() {
+ return Objects.hash(entity, timestamp, isUp);
+ }
+
+    /**
+     * Compares this value with another object for equality.
+     *
+     * @param obj object to compare against
+     * @return true if obj is of the same class and its entity, timestamp
+     *         and isUp are all equal to this one's
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj == null || obj.getClass() != getClass()) {
+            return false;
+        }
+        // wildcard cast: only field equality is needed, not the type argument
+        final VersionedValue<?> other = (VersionedValue<?>) obj;
+        return Objects.equals(entity, other.entity)
+                && Objects.equals(timestamp, other.timestamp)
+                && Objects.equals(isUp, other.isUp);
+    }
+
// Default constructor for serializer
protected VersionedValue() {
this.entity = null;
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationManager.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationManager.java
index 84e1b73..4b8410d 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationManager.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationManager.java
@@ -1,6 +1,7 @@
package org.onlab.onos.store.serializers;
import java.net.URI;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
@@ -100,4 +101,14 @@
return serializerPool.deserialize(bytes);
}
+ // Delegates serialization of obj into the given buffer to the serializer pool.
+ @Override
+ public void serialize(Object obj, ByteBuffer buffer) {
+ serializerPool.serialize(obj, buffer);
+ }
+
+ // Delegates deserialization of the buffer contents to the serializer pool.
+ @Override
+ public <T> T deserialize(ByteBuffer buffer) {
+ return serializerPool.deserialize(buffer);
+ }
+
}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationService.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationService.java
index e92cc4b..385128c 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationService.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationService.java
@@ -1,5 +1,7 @@
package org.onlab.onos.store.serializers;
+import java.nio.ByteBuffer;
+
// TODO: To be replaced with SerializationService from IOLoop activity
/**
* Service to serialize Objects into byte array.
@@ -16,6 +18,15 @@
public byte[] serialize(final Object obj);
/**
+ * Serializes the specified object into the given buffer using one of the
+ * pre-registered serializers.
+ *
+ * @param obj object to be serialized
+ * @param buffer buffer to write the serialized bytes into
+ */
+ public void serialize(final Object obj, ByteBuffer buffer);
+
+ /**
* Deserializes the specified bytes into an object using one of the
* pre-registered serializers.
*
@@ -24,4 +35,12 @@
*/
public <T> T deserialize(final byte[] bytes);
+ /**
+ * Deserializes the bytes held in the specified buffer into an object using
+ * one of the pre-registered serializers.
+ *
+ * @param buffer bytes to be deserialized
+ * @param <T> type of the deserialized object
+ * @return deserialized object
+ */
+ public <T> T deserialize(final ByteBuffer buffer);
}