Device Anti-Entropy
- create Advertisement
- handler for Advertisement
- register handler, background thread to send advertisement
Change-Id: I99e8a7d68747970c34b3c25c6d0489769d251446
diff --git a/core/api/src/main/java/org/onlab/onos/net/device/DefaultPortDescription.java b/core/api/src/main/java/org/onlab/onos/net/device/DefaultPortDescription.java
index e1dcf9e..3967030 100644
--- a/core/api/src/main/java/org/onlab/onos/net/device/DefaultPortDescription.java
+++ b/core/api/src/main/java/org/onlab/onos/net/device/DefaultPortDescription.java
@@ -4,6 +4,8 @@
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.SparseAnnotations;
+import com.google.common.base.MoreObjects;
+
/**
* Default implementation of immutable port description.
*/
@@ -48,6 +50,15 @@
return isEnabled;
}
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("number", number)
+ .add("isEnabled", isEnabled)
+ .add("annotations", annotations())
+ .toString();
+ }
+
// default constructor for serialization
private DefaultPortDescription() {
this.number = null;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyAdvertisement.java
deleted file mode 100644
index 132f27a..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyAdvertisement.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package org.onlab.onos.store.common.impl;
-
-import java.util.Map;
-
-import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.store.Timestamp;
-
-import com.google.common.collect.ImmutableMap;
-
-/**
- * Anti-Entropy advertisement message.
- * <p>
- * Message to advertise the information this node holds.
- *
- * @param <ID> ID type
- */
-public class AntiEntropyAdvertisement<ID> {
-
- private final NodeId sender;
- private final ImmutableMap<ID, Timestamp> advertisement;
-
- /**
- * Creates anti-entropy advertisement message.
- *
- * @param sender sender of this message
- * @param advertisement timestamp information of the data sender holds
- */
- public AntiEntropyAdvertisement(NodeId sender, Map<ID, Timestamp> advertisement) {
- this.sender = sender;
- this.advertisement = ImmutableMap.copyOf(advertisement);
- }
-
- public NodeId sender() {
- return sender;
- }
-
- public ImmutableMap<ID, Timestamp> advertisement() {
- return advertisement;
- }
-
- // Default constructor for serializer
- protected AntiEntropyAdvertisement() {
- this.sender = null;
- this.advertisement = null;
- }
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyReply.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyReply.java
deleted file mode 100644
index 033a1de..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyReply.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package org.onlab.onos.store.common.impl;
-
-import java.util.Map;
-import java.util.Set;
-
-import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.store.device.impl.VersionedValue;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
-/**
- * Anti-Entropy reply message.
- * <p>
- * Message to send in reply to advertisement or another reply.
- * Suggest to the sender about the more up-to-date data this node has,
- * and request for more recent data that the receiver has.
- */
-public class AntiEntropyReply<ID, V extends VersionedValue<?>> {
-
- private final NodeId sender;
- private final ImmutableMap<ID, V> suggestion;
- private final ImmutableSet<ID> request;
-
- /**
- * Creates a reply to anti-entropy message.
- *
- * @param sender sender of this message
- * @param suggestion collection of more recent values, sender had
- * @param request Collection of identifiers
- */
- public AntiEntropyReply(NodeId sender,
- Map<ID, V> suggestion,
- Set<ID> request) {
- this.sender = sender;
- this.suggestion = ImmutableMap.copyOf(suggestion);
- this.request = ImmutableSet.copyOf(request);
- }
-
- public NodeId sender() {
- return sender;
- }
-
- /**
- * Returns collection of values, which the recipient of this reply is likely
- * to be missing or has outdated version.
- *
- * @return
- */
- public ImmutableMap<ID, V> suggestion() {
- return suggestion;
- }
-
- /**
- * Returns collection of identifier to request.
- *
- * @return collection of identifier to request
- */
- public ImmutableSet<ID> request() {
- return request;
- }
-
- /**
- * Checks if reply contains any suggestion or request.
- *
- * @return true if nothing is suggested and requested
- */
- public boolean isEmpty() {
- return suggestion.isEmpty() && request.isEmpty();
- }
-
- // Default constructor for serializer
- protected AntiEntropyReply() {
- this.sender = null;
- this.suggestion = null;
- this.request = null;
- }
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/ControllerNodeToNodeId.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/ControllerNodeToNodeId.java
new file mode 100644
index 0000000..73187ee
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/ControllerNodeToNodeId.java
@@ -0,0 +1,24 @@
+package org.onlab.onos.store.common.impl;
+
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.NodeId;
+
+import com.google.common.base.Function;
+
+/**
+ * Function to convert ControllerNode to NodeId.
+ */
+public final class ControllerNodeToNodeId
+ implements Function<ControllerNode, NodeId> {
+
+ private static final ControllerNodeToNodeId INSTANCE = new ControllerNodeToNodeId();
+
+ @Override
+ public NodeId apply(ControllerNode input) {
+ return input.id();
+ }
+
+ public static ControllerNodeToNodeId toNodeId() {
+ return INSTANCE;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/Timestamped.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/Timestamped.java
index 77b0a87..e803e74 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/Timestamped.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/Timestamped.java
@@ -30,6 +30,7 @@
/**
* Returns the value.
+ *
* @return value
*/
public T value() {
@@ -38,6 +39,7 @@
/**
* Returns the time stamp.
+ *
* @return time stamp
*/
public Timestamp timestamp() {
@@ -51,7 +53,16 @@
* @return true if this instance is newer.
*/
public boolean isNewer(Timestamped<T> other) {
- return this.timestamp.compareTo(checkNotNull(other).timestamp()) > 0;
+ return isNewer(checkNotNull(other).timestamp());
+ }
+
+ /**
+ * Tests if this timestamp is newer thatn the specified timestamp.
+ * @param timestamp to compare agains
+ * @return true if this instance is newer
+ */
+ public boolean isNewer(Timestamp timestamp) {
+ return this.timestamp.compareTo(checkNotNull(timestamp)) > 0;
}
@Override
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java
deleted file mode 100644
index d05659b..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package org.onlab.onos.store.device.impl;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.net.Device;
-import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.store.Timestamp;
-import org.onlab.onos.store.common.impl.AntiEntropyAdvertisement;
-
-// TODO DeviceID needs to be changed to something like (ProviderID, DeviceID)
-// TODO: Handle Port as part of these messages, or separate messages for Ports?
-
-public class DeviceAntiEntropyAdvertisement
- extends AntiEntropyAdvertisement<DeviceId> {
-
-
- public DeviceAntiEntropyAdvertisement(NodeId sender,
- Map<DeviceId, Timestamp> advertisement) {
- super(sender, advertisement);
- }
-
- // May need to add ProviderID, etc.
- public static DeviceAntiEntropyAdvertisement create(
- NodeId self,
- Collection<VersionedValue<Device>> localValues) {
-
- Map<DeviceId, Timestamp> ads = new HashMap<>(localValues.size());
- for (VersionedValue<Device> e : localValues) {
- ads.put(e.entity().id(), e.timestamp());
- }
- return new DeviceAntiEntropyAdvertisement(self, ads);
- }
-
- // For serializer
- protected DeviceAntiEntropyAdvertisement() {}
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java
deleted file mode 100644
index e7a4d0a..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java
+++ /dev/null
@@ -1,102 +0,0 @@
-package org.onlab.onos.store.device.impl;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.net.Device;
-import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.store.Timestamp;
-import org.onlab.onos.store.common.impl.AntiEntropyReply;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
-public class DeviceAntiEntropyReply
- extends AntiEntropyReply<DeviceId, VersionedValue<Device>> {
-
-
- public DeviceAntiEntropyReply(NodeId sender,
- Map<DeviceId, VersionedValue<Device>> suggestion,
- Set<DeviceId> request) {
- super(sender, suggestion, request);
- }
-
- /**
- * Creates a reply to Anti-Entropy advertisement.
- *
- * @param advertisement to respond to
- * @param self node identifier representing local node
- * @param localValues local values held on this node
- * @return reply message
- */
- public static DeviceAntiEntropyReply reply(
- DeviceAntiEntropyAdvertisement advertisement,
- NodeId self,
- Collection<VersionedValue<Device>> localValues
- ) {
-
- ImmutableMap<DeviceId, Timestamp> ads = advertisement.advertisement();
-
- ImmutableMap.Builder<DeviceId, VersionedValue<Device>>
- sug = ImmutableMap.builder();
-
- Set<DeviceId> req = new HashSet<>(ads.keySet());
-
- for (VersionedValue<Device> e : localValues) {
- final DeviceId id = e.entity().id();
- final Timestamp local = e.timestamp();
- final Timestamp theirs = ads.get(id);
- if (theirs == null) {
- // they don't have it, suggest
- sug.put(id, e);
- // don't need theirs
- req.remove(id);
- } else if (local.compareTo(theirs) < 0) {
- // they got older one, suggest
- sug.put(id, e);
- // don't need theirs
- req.remove(id);
- } else if (local.equals(theirs)) {
- // same, don't need theirs
- req.remove(id);
- }
- }
-
- return new DeviceAntiEntropyReply(self, sug.build(), req);
- }
-
- /**
- * Creates a reply to request for values held locally.
- *
- * @param requests message containing the request
- * @param self node identifier representing local node
- * @param localValues local valeds held on this node
- * @return reply message
- */
- public static DeviceAntiEntropyReply reply(
- DeviceAntiEntropyReply requests,
- NodeId self,
- Map<DeviceId, VersionedValue<Device>> localValues
- ) {
-
- Set<DeviceId> reqs = requests.request();
-
- Map<DeviceId, VersionedValue<Device>> requested = new HashMap<>(reqs.size());
- for (DeviceId id : reqs) {
- final VersionedValue<Device> value = localValues.get(id);
- if (value != null) {
- requested.put(id, value);
- }
- }
-
- Set<DeviceId> empty = ImmutableSet.of();
- return new DeviceAntiEntropyReply(self, requested, empty);
- }
-
- // For serializer
- protected DeviceAntiEntropyReply() {}
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceDescriptions.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceDescriptions.java
new file mode 100644
index 0000000..f7fd7bc
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceDescriptions.java
@@ -0,0 +1,91 @@
+package org.onlab.onos.store.device.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.onos.net.DefaultAnnotations.union;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.SparseAnnotations;
+import org.onlab.onos.net.device.DefaultDeviceDescription;
+import org.onlab.onos.net.device.DefaultPortDescription;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.store.Timestamp;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+/*
+ * Collection of Description of a Device and Ports, given from a Provider.
+ */
+class DeviceDescriptions {
+
+ private volatile Timestamped<DeviceDescription> deviceDesc;
+
+ private final ConcurrentMap<PortNumber, Timestamped<PortDescription>> portDescs;
+
+ public DeviceDescriptions(Timestamped<DeviceDescription> desc) {
+ this.deviceDesc = checkNotNull(desc);
+ this.portDescs = new ConcurrentHashMap<>();
+ }
+
+ public Timestamp getLatestTimestamp() {
+ Timestamp latest = deviceDesc.timestamp();
+ for (Timestamped<PortDescription> desc : portDescs.values()) {
+ if (desc.timestamp().compareTo(latest) > 0) {
+ latest = desc.timestamp();
+ }
+ }
+ return latest;
+ }
+
+ public Timestamped<DeviceDescription> getDeviceDesc() {
+ return deviceDesc;
+ }
+
+ public Timestamped<PortDescription> getPortDesc(PortNumber number) {
+ return portDescs.get(number);
+ }
+
+ public Map<PortNumber, Timestamped<PortDescription>> getPortDescs() {
+ return Collections.unmodifiableMap(portDescs);
+ }
+
+ /**
+ * Puts DeviceDescription, merging annotations as necessary.
+ *
+ * @param newDesc new DeviceDescription
+ */
+ public synchronized void putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
+ Timestamped<DeviceDescription> oldOne = deviceDesc;
+ Timestamped<DeviceDescription> newOne = newDesc;
+ if (oldOne != null) {
+ SparseAnnotations merged = union(oldOne.value().annotations(),
+ newDesc.value().annotations());
+ newOne = new Timestamped<DeviceDescription>(
+ new DefaultDeviceDescription(newDesc.value(), merged),
+ newDesc.timestamp());
+ }
+ deviceDesc = newOne;
+ }
+
+ /**
+ * Puts PortDescription, merging annotations as necessary.
+ *
+ * @param newDesc new PortDescription
+ */
+ public synchronized void putPortDesc(Timestamped<PortDescription> newDesc) {
+ Timestamped<PortDescription> oldOne = portDescs.get(newDesc.value().portNumber());
+ Timestamped<PortDescription> newOne = newDesc;
+ if (oldOne != null) {
+ SparseAnnotations merged = union(oldOne.value().annotations(),
+ newDesc.value().annotations());
+ newOne = new Timestamped<PortDescription>(
+ new DefaultPortDescription(newDesc.value(), merged),
+ newDesc.timestamp());
+ }
+ portDescs.put(newOne.value().portNumber(), newOne);
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index 2221ea0..5319749 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -5,8 +5,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import org.apache.commons.lang3.concurrent.ConcurrentException;
-import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
+import org.apache.commons.lang3.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -14,6 +13,8 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.AnnotationsUtil;
import org.onlab.onos.net.DefaultAnnotations;
import org.onlab.onos.net.DefaultDevice;
@@ -23,9 +24,6 @@
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
-import org.onlab.onos.net.SparseAnnotations;
-import org.onlab.onos.net.device.DefaultDeviceDescription;
-import org.onlab.onos.net.device.DefaultPortDescription;
import org.onlab.onos.net.device.DeviceDescription;
import org.onlab.onos.net.device.DeviceEvent;
import org.onlab.onos.net.device.DeviceStore;
@@ -38,18 +36,22 @@
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
-import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.common.impl.Timestamped;
-import org.onlab.onos.store.serializers.KryoPoolUtil;
+import org.onlab.onos.store.device.impl.peermsg.DeviceAntiEntropyAdvertisement;
+import org.onlab.onos.store.device.impl.peermsg.DeviceFragmentId;
+import org.onlab.onos.store.device.impl.peermsg.PortFragmentId;
import org.onlab.onos.store.serializers.KryoSerializer;
-import org.onlab.onos.store.serializers.MastershipBasedTimestampSerializer;
+import org.onlab.onos.store.serializers.DistributedStoreSerializers;
import org.onlab.util.KryoPool;
import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -57,19 +59,21 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Predicates.notNull;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
import static org.onlab.onos.net.DefaultAnnotations.merge;
-import static org.onlab.onos.net.DefaultAnnotations.union;
import static com.google.common.base.Verify.verify;
+import static org.onlab.util.Tools.namedThreads;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
+import static org.onlab.onos.store.common.impl.ControllerNodeToNodeId.toNodeId;
// TODO: give me a better name
/**
@@ -86,8 +90,9 @@
public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
- // TODO: Check if inner Map can be replaced with plain Map
+ // TODO: Check if inner Map can be replaced with plain Map.
// innerMap is used to lock a Device, thus instance should never be replaced.
+
// collection of Description given from various providers
private final ConcurrentMap<DeviceId,
ConcurrentMap<ProviderId, DeviceDescriptions>>
@@ -117,21 +122,23 @@
@Override
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
- .register(KryoPoolUtil.API)
+ .register(DistributedStoreSerializers.COMMON)
+
.register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
.register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
.register(InternalDeviceRemovedEvent.class)
.register(InternalPortEvent.class, new InternalPortEventSerializer())
.register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
- .register(Timestamp.class)
- .register(Timestamped.class)
- .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
+ .register(DeviceAntiEntropyAdvertisement.class)
+ .register(DeviceFragmentId.class)
+ .register(PortFragmentId.class)
.build()
.populate(1);
}
-
};
+ private ScheduledExecutorService executor;
+
@Activate
public void activate() {
clusterCommunicator.addSubscriber(
@@ -144,11 +151,35 @@
GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
+
+ executor =
+ newSingleThreadScheduledExecutor(namedThreads("anti-entropy-%d"));
+
+ // TODO: Make these configurable
+ long initialDelaySec = 5;
+ long periodSec = 5;
+ // start anti-entropy thread
+ executor.scheduleAtFixedRate(new SendAdvertisementTask(),
+ initialDelaySec, periodSec, TimeUnit.SECONDS);
+
log.info("Started");
}
@Deactivate
public void deactivate() {
+
+ executor.shutdownNow();
+ try {
+ boolean timedout = executor.awaitTermination(5, TimeUnit.SECONDS);
+ if (timedout) {
+ log.error("Timeout during executor shutdown");
+ }
+ } catch (InterruptedException e) {
+ log.error("Error during executor shutdown", e);
+ }
+
deviceDescs.clear();
devices.clear();
devicePorts.clear();
@@ -543,14 +574,14 @@
final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
if (existingPortDesc == null ||
- deltaDesc == existingPortDesc ||
deltaDesc.isNewer(existingPortDesc)) {
// on new port or valid update
// update description
descs.putPortDesc(deltaDesc);
newPort = composePort(device, number, descsMap);
} else {
- // outdated event, ignored.
+ // same or outdated event, ignored.
+ log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
return null;
}
@@ -627,6 +658,14 @@
}
}
+ /**
+ * Checks if given timestamp is superseded by removal request
+ * with more recent timestamp.
+ *
+ * @param deviceId identifier of a device
+ * @param timestampToCheck timestamp of an event to check
+ * @return true if device is already removed
+ */
private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
Timestamp removalTimestamp = removalRequest.get(deviceId);
if (removalTimestamp != null &&
@@ -667,7 +706,7 @@
continue;
}
// TODO: should keep track of Description timestamp
- // and only merge conflicting keys when timestamp is newer
+ // and only merge conflicting keys when timestamp is newer.
// Currently assuming there will never be a key conflict between
// providers
@@ -708,7 +747,7 @@
continue;
}
// TODO: should keep track of Description timestamp
- // and only merge conflicting keys when timestamp is newer
+ // and only merge conflicting keys when timestamp is newer.
// Currently assuming there will never be a key conflict between
// providers
@@ -745,129 +784,258 @@
return providerDescs.get(pid);
}
- public static final class InitDeviceDescs
- implements ConcurrentInitializer<DeviceDescriptions> {
-
- private final Timestamped<DeviceDescription> deviceDesc;
-
- public InitDeviceDescs(Timestamped<DeviceDescription> deviceDesc) {
- this.deviceDesc = checkNotNull(deviceDesc);
- }
- @Override
- public DeviceDescriptions get() throws ConcurrentException {
- return new DeviceDescriptions(deviceDesc);
- }
+ // TODO: should we be throwing exception?
+ private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ subject,
+ SERIALIZER.encode(event));
+ clusterCommunicator.unicast(message, recipient);
}
-
- /**
- * Collection of Description of a Device and it's Ports given from a Provider.
- */
- public static class DeviceDescriptions {
-
- private final AtomicReference<Timestamped<DeviceDescription>> deviceDesc;
- private final ConcurrentMap<PortNumber, Timestamped<PortDescription>> portDescs;
-
- public DeviceDescriptions(Timestamped<DeviceDescription> desc) {
- this.deviceDesc = new AtomicReference<>(checkNotNull(desc));
- this.portDescs = new ConcurrentHashMap<>();
- }
-
- Timestamp getLatestTimestamp() {
- Timestamp latest = deviceDesc.get().timestamp();
- for (Timestamped<PortDescription> desc : portDescs.values()) {
- if (desc.timestamp().compareTo(latest) > 0) {
- latest = desc.timestamp();
- }
- }
- return latest;
- }
-
- public Timestamped<DeviceDescription> getDeviceDesc() {
- return deviceDesc.get();
- }
-
- public Timestamped<PortDescription> getPortDesc(PortNumber number) {
- return portDescs.get(number);
- }
-
- /**
- * Puts DeviceDescription, merging annotations as necessary.
- *
- * @param newDesc new DeviceDescription
- * @return previous DeviceDescription
- */
- public synchronized Timestamped<DeviceDescription> putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
- Timestamped<DeviceDescription> oldOne = deviceDesc.get();
- Timestamped<DeviceDescription> newOne = newDesc;
- if (oldOne != null) {
- SparseAnnotations merged = union(oldOne.value().annotations(),
- newDesc.value().annotations());
- newOne = new Timestamped<DeviceDescription>(
- new DefaultDeviceDescription(newDesc.value(), merged),
- newDesc.timestamp());
- }
- return deviceDesc.getAndSet(newOne);
- }
-
- /**
- * Puts PortDescription, merging annotations as necessary.
- *
- * @param newDesc new PortDescription
- * @return previous PortDescription
- */
- public synchronized Timestamped<PortDescription> putPortDesc(Timestamped<PortDescription> newDesc) {
- Timestamped<PortDescription> oldOne = portDescs.get(newDesc.value().portNumber());
- Timestamped<PortDescription> newOne = newDesc;
- if (oldOne != null) {
- SparseAnnotations merged = union(oldOne.value().annotations(),
- newDesc.value().annotations());
- newOne = new Timestamped<PortDescription>(
- new DefaultPortDescription(newDesc.value(), merged),
- newDesc.timestamp());
- }
- return portDescs.put(newOne.value().portNumber(), newOne);
- }
+ // TODO: should we be throwing exception?
+ private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ subject,
+ SERIALIZER.encode(event));
+ clusterCommunicator.broadcast(message);
}
private void notifyPeers(InternalDeviceEvent event) throws IOException {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
- SERIALIZER.encode(event));
- clusterCommunicator.broadcast(message);
+ broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
}
private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
- SERIALIZER.encode(event));
- clusterCommunicator.broadcast(message);
+ broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
}
private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- GossipDeviceStoreMessageSubjects.DEVICE_REMOVED,
- SERIALIZER.encode(event));
- clusterCommunicator.broadcast(message);
+ broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
}
private void notifyPeers(InternalPortEvent event) throws IOException {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- GossipDeviceStoreMessageSubjects.PORT_UPDATE,
- SERIALIZER.encode(event));
- clusterCommunicator.broadcast(message);
+ broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
}
private void notifyPeers(InternalPortStatusEvent event) throws IOException {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
- SERIALIZER.encode(event));
- clusterCommunicator.broadcast(message);
+ broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
+ }
+
+ private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
+ try {
+ unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
+ } catch (IOException e) {
+ log.error("Failed to send" + event + " to " + recipient, e);
+ }
+ }
+
+ private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
+ try {
+ unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
+ } catch (IOException e) {
+ log.error("Failed to send" + event + " to " + recipient, e);
+ }
+ }
+
+ private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
+ try {
+ unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
+ } catch (IOException e) {
+ log.error("Failed to send" + event + " to " + recipient, e);
+ }
+ }
+
+ private void notifyPeer(NodeId recipient, InternalPortEvent event) {
+ try {
+ unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
+ } catch (IOException e) {
+ log.error("Failed to send" + event + " to " + recipient, e);
+ }
+ }
+
+ private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
+ try {
+ unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
+ } catch (IOException e) {
+ log.error("Failed to send" + event + " to " + recipient, e);
+ }
+ }
+
+ private DeviceAntiEntropyAdvertisement createAdvertisement() {
+ final NodeId self = clusterService.getLocalNode().id();
+
+ Map<DeviceFragmentId, Timestamp> devices = new HashMap<>(deviceDescs.size());
+ final int portsPerDevice = 8; // random guess to minimize reallocation
+ Map<PortFragmentId, Timestamp> ports = new HashMap<>(devices.size() * portsPerDevice);
+ Map<DeviceId, Timestamp> offline = new HashMap<>(devices.size());
+
+ for (Entry<DeviceId, ConcurrentMap<ProviderId, DeviceDescriptions>>
+ provs : deviceDescs.entrySet()) {
+
+ final DeviceId deviceId = provs.getKey();
+ final ConcurrentMap<ProviderId, DeviceDescriptions> devDescs = provs.getValue();
+ synchronized (devDescs) {
+
+ offline.put(deviceId, this.offline.get(deviceId));
+
+ for (Entry<ProviderId, DeviceDescriptions>
+ prov : devDescs.entrySet()) {
+
+ final ProviderId provId = prov.getKey();
+ final DeviceDescriptions descs = prov.getValue();
+
+ devices.put(new DeviceFragmentId(deviceId, provId),
+ descs.getDeviceDesc().timestamp());
+
+ for (Entry<PortNumber, Timestamped<PortDescription>>
+ portDesc : descs.getPortDescs().entrySet()) {
+
+ final PortNumber number = portDesc.getKey();
+ ports.put(new PortFragmentId(deviceId, provId, number),
+ portDesc.getValue().timestamp());
+ }
+ }
+ }
+ }
+
+ return new DeviceAntiEntropyAdvertisement(self, devices, ports, offline);
+ }
+
+ /**
+ * Responds to anti-entropy advertisement message.
+ * <P>
+ * Notify sender about out-dated information using regular replication message.
+ * Send back advertisement to sender if not in sync.
+ *
+ * @param advertisement to respond to
+ */
+ private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
+
+ final NodeId sender = advertisement.sender();
+
+ Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
+ Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
+ Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
+
+ // Fragments to request
+ Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
+ Collection<PortFragmentId> reqPorts = new ArrayList<>();
+
+ for (Entry<DeviceId, ConcurrentMap<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
+ final DeviceId deviceId = de.getKey();
+ final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
+
+ synchronized (lDevice) {
+ // latestTimestamp across provider
+ // Note: can be null initially
+ Timestamp localLatest = offline.get(deviceId);
+
+ // handle device Ads
+ for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
+ final ProviderId provId = prov.getKey();
+ final DeviceDescriptions lDeviceDescs = prov.getValue();
+
+ final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
+
+
+ Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
+ Timestamp advDevTimestamp = devAds.get(devFragId);
+
+ if (advDevTimestamp == null || lProvDevice.isNewer(advDevTimestamp)) {
+ // remote does not have it or outdated, suggest
+ notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
+ } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
+ // local is outdated, request
+ reqDevices.add(devFragId);
+ }
+
+ // handle port Ads
+ for (Entry<PortNumber, Timestamped<PortDescription>>
+ pe : lDeviceDescs.getPortDescs().entrySet()) {
+
+ final PortNumber num = pe.getKey();
+ final Timestamped<PortDescription> lPort = pe.getValue();
+
+ final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
+
+ Timestamp advPortTimestamp = portAds.get(portFragId);
+ if ( advPortTimestamp == null || lPort.isNewer(advPortTimestamp)) {
+ // remote does not have it or outdated, suggest
+ notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
+ } else if (!lPort.timestamp().equals(advPortTimestamp)) {
+ // local is outdated, request
+ log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
+ reqPorts.add(portFragId);
+ }
+
+ // remove port Ad already processed
+ portAds.remove(portFragId);
+ } // end local port loop
+
+ // remove device Ad already processed
+ devAds.remove(devFragId);
+
+ // find latest and update
+ final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
+ if (localLatest == null ||
+ providerLatest.compareTo(localLatest) > 0) {
+ localLatest = providerLatest;
+ }
+ } // end local provider loop
+
+ // checking if remote timestamp is more recent.
+ Timestamp rOffline = offlineAds.get(deviceId);
+ if (rOffline != null &&
+ rOffline.compareTo(localLatest) > 0) {
+ // remote offline timestamp suggests that the
+ // device is off-line
+ markOfflineInternal(deviceId, rOffline);
+ }
+
+ Timestamp lOffline = offline.get(deviceId);
+ if (lOffline != null && rOffline == null) {
+ // locally offline, but remote is online, suggest offline
+ notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
+ }
+
+ // remove device offline Ad already processed
+ offlineAds.remove(deviceId);
+ } // end local device loop
+ } // device lock
+
+ // If there is any Ads left, request them
+ log.trace("Ads left {}, {}", devAds, portAds);
+ reqDevices.addAll(devAds.keySet());
+ reqPorts.addAll(portAds.keySet());
+
+ if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
+ log.trace("Nothing to request to remote peer {}", sender);
+ return;
+ }
+
+ log.info("Need to sync {} {}", reqDevices, reqPorts);
+
+ // 2-way Anti-Entropy for now
+ try {
+ unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
+ } catch (IOException e) {
+ log.error("Failed to send response advertisement to " + sender, e);
+ }
+
+// Sketch of 3-way Anti-Entropy
+// DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
+// ClusterMessage message = new ClusterMessage(
+// clusterService.getLocalNode().id(),
+// GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
+// SERIALIZER.encode(request));
+//
+// try {
+// clusterCommunicator.unicast(message, advertisement.sender());
+// } catch (IOException e) {
+// log.error("Failed to send advertisement reply to "
+// + advertisement.sender(), e);
+// }
}
private void notifyDelegateIfNotNull(DeviceEvent event) {
@@ -876,6 +1044,54 @@
}
}
+ private final class SendAdvertisementTask implements Runnable {
+
+ @Override
+ public void run() {
+ if (Thread.currentThread().isInterrupted()) {
+ log.info("Interrupted, quitting");
+ return;
+ }
+
+ try {
+ final NodeId self = clusterService.getLocalNode().id();
+ Set<ControllerNode> nodes = clusterService.getNodes();
+
+ ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
+ .transform(toNodeId())
+ .toList();
+
+ if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
+ log.info("No other peers in the cluster.");
+ return;
+ }
+
+ NodeId peer;
+ do {
+ int idx = RandomUtils.nextInt(0, nodeIds.size());
+ peer = nodeIds.get(idx);
+ } while (peer.equals(self));
+
+ DeviceAntiEntropyAdvertisement ad = createAdvertisement();
+
+ if (Thread.currentThread().isInterrupted()) {
+ log.info("Interrupted, quitting");
+ return;
+ }
+
+ try {
+ unicastMessage(peer, DEVICE_ADVERTISE, ad);
+ } catch (IOException e) {
+ log.error("Failed to send anti-entropy advertisement", e);
+ return;
+ }
+ } catch (Exception e) {
+ // catch all Exception to avoid Scheduled task being suppressed.
+ log.error("Exception thrown while sending advertisement", e);
+ }
+ }
+ }
+
private class InternalDeviceEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
@@ -940,6 +1156,7 @@
log.info("Received port status update event from peer: {}", message.sender());
InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
+ log.info("{}", event);
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
@@ -948,4 +1165,15 @@
notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
}
}
+
+ private final class InternalDeviceAdvertisementListener
+ implements ClusterMessageHandler {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ log.info("Received Device advertisement from peer: {}", message.sender());
+ DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
+ handleAdvertisement(advertisement);
+ }
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java
index 5272182..3168368 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java
@@ -2,6 +2,7 @@
import org.onlab.onos.store.cluster.messaging.MessageSubject;
+// TODO: add prefix to assure uniqueness.
/**
* MessageSubjects used by GossipDeviceStore peer-peer communication.
*/
@@ -14,4 +15,8 @@
public static final MessageSubject DEVICE_REMOVED = new MessageSubject("peer-device-removed");
public static final MessageSubject PORT_UPDATE = new MessageSubject("peer-port-update");
public static final MessageSubject PORT_STATUS_UPDATE = new MessageSubject("peer-port-status-update");
+
+ public static final MessageSubject DEVICE_ADVERTISE = new MessageSubject("peer-device-advertisements");
+ // to be used with 3-way anti-entropy process
+ public static final MessageSubject DEVICE_REQUEST = new MessageSubject("peer-device-request");
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InitDeviceDescs.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InitDeviceDescs.java
new file mode 100644
index 0000000..2de2364
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InitDeviceDescs.java
@@ -0,0 +1,23 @@
+package org.onlab.onos.store.device.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+// FIXME: consider removing this class
+public final class InitDeviceDescs
+ implements ConcurrentInitializer<DeviceDescriptions> {
+
+ private final Timestamped<DeviceDescription> deviceDesc;
+
+ public InitDeviceDescs(Timestamped<DeviceDescription> deviceDesc) {
+ this.deviceDesc = checkNotNull(deviceDesc);
+ }
+ @Override
+ public DeviceDescriptions get() throws ConcurrentException {
+ return new DeviceDescriptions(deviceDesc);
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
index 4214384..344fe73 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
@@ -5,6 +5,8 @@
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.impl.Timestamped;
+import com.google.common.base.MoreObjects;
+
/**
* Information published by GossipDeviceStore to notify peers of a device
* change event.
@@ -36,6 +38,15 @@
return deviceDescription;
}
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("providerId", providerId)
+ .add("deviceId", deviceId)
+ .add("deviceDescription", deviceDescription)
+ .toString();
+ }
+
// for serializer
protected InternalDeviceEvent() {
this.providerId = null;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEvent.java
index d8942d6..4540efb 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEvent.java
@@ -3,6 +3,8 @@
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.Timestamp;
+import com.google.common.base.MoreObjects;
+
/**
* Information published by GossipDeviceStore to notify peers of a device
* going offline.
@@ -30,6 +32,14 @@
return timestamp;
}
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("deviceId", deviceId)
+ .add("timestamp", timestamp)
+ .toString();
+ }
+
// for serializer
@SuppressWarnings("unused")
private InternalDeviceOfflineEvent() {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceRemovedEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceRemovedEvent.java
index 6c8b905..42cb177 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceRemovedEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceRemovedEvent.java
@@ -3,6 +3,8 @@
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.Timestamp;
+import com.google.common.base.MoreObjects;
+
/**
* Information published by GossipDeviceStore to notify peers of a device
* being administratively removed.
@@ -30,6 +32,14 @@
return timestamp;
}
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("deviceId", deviceId)
+ .add("timestamp", timestamp)
+ .toString();
+ }
+
// for serializer
@SuppressWarnings("unused")
private InternalDeviceRemovedEvent() {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
index 64e77ca..d1fc73a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
@@ -7,6 +7,8 @@
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.impl.Timestamped;
+import com.google.common.base.MoreObjects;
+
/**
* Information published by GossipDeviceStore to notify peers of a port
* change event.
@@ -38,6 +40,15 @@
return portDescriptions;
}
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("providerId", providerId)
+ .add("deviceId", deviceId)
+ .add("portDescriptions", portDescriptions)
+ .toString();
+ }
+
// for serializer
protected InternalPortEvent() {
this.providerId = null;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
index 7d3854b..fd154da 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
@@ -5,6 +5,8 @@
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.impl.Timestamped;
+import com.google.common.base.MoreObjects;
+
/**
* Information published by GossipDeviceStore to notify peers of a port
* status change event.
@@ -36,6 +38,15 @@
return portDescription;
}
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("providerId", providerId)
+ .add("deviceId", deviceId)
+ .add("portDescription", portDescription)
+ .toString();
+ }
+
// for serializer
protected InternalPortStatusEvent() {
this.providerId = null;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java
index 6ec4122..8f0c2b0 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java
@@ -35,6 +35,7 @@
Class<InternalPortStatusEvent> type) {
ProviderId providerId = (ProviderId) kryo.readClassAndObject(input);
DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
+ @SuppressWarnings("unchecked")
Timestamped<PortDescription> portDescription = (Timestamped<PortDescription>) kryo.readClassAndObject(input);
return new InternalPortStatusEvent(providerId, deviceId, portDescription);
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/DeviceAntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/DeviceAntiEntropyAdvertisement.java
new file mode 100644
index 0000000..00873ad
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/DeviceAntiEntropyAdvertisement.java
@@ -0,0 +1,57 @@
+package org.onlab.onos.store.device.impl.peermsg;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Map;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.Timestamp;
+
+
+/**
+ * Device Advertisement message.
+ */
+public class DeviceAntiEntropyAdvertisement {
+
+ private final NodeId sender;
+ private final Map<DeviceFragmentId, Timestamp> deviceFingerPrints;
+ private final Map<PortFragmentId, Timestamp> portFingerPrints;
+ private final Map<DeviceId, Timestamp> offline;
+
+
+ public DeviceAntiEntropyAdvertisement(NodeId sender,
+ Map<DeviceFragmentId, Timestamp> devices,
+ Map<PortFragmentId, Timestamp> ports,
+ Map<DeviceId, Timestamp> offline) {
+ this.sender = checkNotNull(sender);
+ this.deviceFingerPrints = checkNotNull(devices);
+ this.portFingerPrints = checkNotNull(ports);
+ this.offline = checkNotNull(offline);
+ }
+
+ public NodeId sender() {
+ return sender;
+ }
+
+ public Map<DeviceFragmentId, Timestamp> deviceFingerPrints() {
+ return deviceFingerPrints;
+ }
+
+ public Map<PortFragmentId, Timestamp> ports() {
+ return portFingerPrints;
+ }
+
+ public Map<DeviceId, Timestamp> offline() {
+ return offline;
+ }
+
+ // For serializer
+ @SuppressWarnings("unused")
+ private DeviceAntiEntropyAdvertisement() {
+ this.sender = null;
+ this.deviceFingerPrints = null;
+ this.portFingerPrints = null;
+ this.offline = null;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/DeviceAntiEntropyRequest.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/DeviceAntiEntropyRequest.java
new file mode 100644
index 0000000..6f3096b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/DeviceAntiEntropyRequest.java
@@ -0,0 +1,46 @@
+package org.onlab.onos.store.device.impl.peermsg;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+
+import org.onlab.onos.cluster.NodeId;
+
+/**
+ * Message to request for other peers information.
+ */
+public class DeviceAntiEntropyRequest {
+
+ private final NodeId sender;
+ private final Collection<DeviceFragmentId> devices;
+ private final Collection<PortFragmentId> ports;
+
+ public DeviceAntiEntropyRequest(NodeId sender,
+ Collection<DeviceFragmentId> devices,
+ Collection<PortFragmentId> ports) {
+
+ this.sender = checkNotNull(sender);
+ this.devices = checkNotNull(devices);
+ this.ports = checkNotNull(ports);
+ }
+
+ public NodeId sender() {
+ return sender;
+ }
+
+ public Collection<DeviceFragmentId> devices() {
+ return devices;
+ }
+
+ public Collection<PortFragmentId> ports() {
+ return ports;
+ }
+
+ // For serializer
+ @SuppressWarnings("unused")
+ private DeviceAntiEntropyRequest() {
+ this.sender = null;
+ this.devices = null;
+ this.ports = null;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/DeviceFragmentId.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/DeviceFragmentId.java
new file mode 100644
index 0000000..d4fcda9
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/DeviceFragmentId.java
@@ -0,0 +1,54 @@
+package org.onlab.onos.store.device.impl.peermsg;
+
+import java.util.Objects;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.provider.ProviderId;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Identifier for DeviceDesctiption from a Provider.
+ */
+public final class DeviceFragmentId {
+ public final ProviderId providerId;
+ public final DeviceId deviceId;
+
+ public DeviceFragmentId(DeviceId deviceId, ProviderId providerId) {
+ this.providerId = providerId;
+ this.deviceId = deviceId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(providerId, deviceId);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof DeviceFragmentId)) {
+ return false;
+ }
+ DeviceFragmentId that = (DeviceFragmentId) obj;
+ return Objects.equals(this.deviceId, that.deviceId) &&
+ Objects.equals(this.providerId, that.providerId);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("providerId", providerId)
+ .add("deviceId", deviceId)
+ .toString();
+ }
+
+ // for serializer
+ @SuppressWarnings("unused")
+ private DeviceFragmentId() {
+ this.providerId = null;
+ this.deviceId = null;
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/PortFragmentId.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/PortFragmentId.java
new file mode 100644
index 0000000..8e7bac3
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/PortFragmentId.java
@@ -0,0 +1,61 @@
+package org.onlab.onos.store.device.impl.peermsg;
+
+import java.util.Objects;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.provider.ProviderId;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Identifier for PortDescription from a Provider.
+ */
+public final class PortFragmentId {
+ public final ProviderId providerId;
+ public final DeviceId deviceId;
+ public final PortNumber portNumber;
+
+ public PortFragmentId(DeviceId deviceId, ProviderId providerId,
+ PortNumber portNumber) {
+ this.providerId = providerId;
+ this.deviceId = deviceId;
+ this.portNumber = portNumber;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(providerId, deviceId, portNumber);
+ };
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof PortFragmentId)) {
+ return false;
+ }
+ PortFragmentId that = (PortFragmentId) obj;
+ return Objects.equals(this.deviceId, that.deviceId) &&
+ Objects.equals(this.portNumber, that.portNumber) &&
+ Objects.equals(this.providerId, that.providerId);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("providerId", providerId)
+ .add("deviceId", deviceId)
+ .add("portNumber", portNumber)
+ .toString();
+ }
+
+ // for serializer
+ @SuppressWarnings("unused")
+ private PortFragmentId() {
+ this.providerId = null;
+ this.deviceId = null;
+ this.portNumber = null;
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/package-info.java
new file mode 100644
index 0000000..5d9dc4b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/peermsg/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Structure and utilities used for inter-Node messaging.
+ */
+package org.onlab.onos.store.device.impl.peermsg;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java
index a59b151..63245a3 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java
@@ -31,7 +31,6 @@
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.Timestamp;
-import org.onlab.onos.store.device.impl.VersionedValue;
import org.slf4j.Logger;
import com.google.common.collect.HashMultimap;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/VersionedValue.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/VersionedValue.java
similarity index 95%
rename from core/store/dist/src/main/java/org/onlab/onos/store/device/impl/VersionedValue.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/link/impl/VersionedValue.java
index a0f485a..391cd1d 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/VersionedValue.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/VersionedValue.java
@@ -1,9 +1,10 @@
-package org.onlab.onos.store.device.impl;
+package org.onlab.onos.store.link.impl;
import java.util.Objects;
import org.onlab.onos.store.Timestamp;
+// TODO: remove once we stop using this
/**
* Wrapper class for a entity that is versioned
* and can either be up or down.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
index c0cefd6..e20b4a6 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
@@ -35,4 +35,4 @@
byte[] payload = input.readBytes(payloadSize);
return new ClusterMessage(sender, subject, payload);
}
-}
\ No newline at end of file
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/DistributedStoreSerializers.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/DistributedStoreSerializers.java
new file mode 100644
index 0000000..e0ba906
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/DistributedStoreSerializers.java
@@ -0,0 +1,20 @@
+package org.onlab.onos.store.serializers;
+
+import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
+import org.onlab.onos.store.common.impl.Timestamped;
+import org.onlab.util.KryoPool;
+
+public final class DistributedStoreSerializers {
+
+ /**
+ * KryoPool which can serialize ON.lab misc classes.
+ */
+ public static final KryoPool COMMON = KryoPool.newBuilder()
+ .register(KryoPoolUtil.API)
+ .register(Timestamped.class)
+ .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
+ .build();
+
+ // avoid instantiation
+ private DistributedStoreSerializers() {}
+}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
index 0c33cfe..f079874 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
@@ -25,6 +25,7 @@
import org.onlab.onos.net.device.DefaultDeviceDescription;
import org.onlab.onos.net.device.DefaultPortDescription;
import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.Timestamp;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoPool;
@@ -63,7 +64,9 @@
Port.class,
DefaultPortDescription.class,
Element.class,
- Link.Type.class
+ Link.Type.class,
+ Timestamp.class
+
)
.register(URI.class, new URISerializer())
.register(NodeId.class, new NodeIdSerializer())
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java
index 738086e..3920dd6 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java
@@ -1,9 +1,6 @@
package org.onlab.onos.store.serializers;
import org.onlab.util.KryoPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.nio.ByteBuffer;
/**
@@ -11,10 +8,8 @@
*/
public class KryoSerializer implements StoreSerializer {
- private final Logger log = LoggerFactory.getLogger(getClass());
protected KryoPool serializerPool;
-
public KryoSerializer() {
setupKryoPool();
}