Device Anti-Entropy
- create Advertisement
- handler for Advertisement
- register handler, background thread to send advertisement
Change-Id: I99e8a7d68747970c34b3c25c6d0489769d251446
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index 2221ea0..5319749 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -5,8 +5,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import org.apache.commons.lang3.concurrent.ConcurrentException;
-import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
+import org.apache.commons.lang3.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -14,6 +13,8 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.AnnotationsUtil;
import org.onlab.onos.net.DefaultAnnotations;
import org.onlab.onos.net.DefaultDevice;
@@ -23,9 +24,6 @@
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
-import org.onlab.onos.net.SparseAnnotations;
-import org.onlab.onos.net.device.DefaultDeviceDescription;
-import org.onlab.onos.net.device.DefaultPortDescription;
import org.onlab.onos.net.device.DeviceDescription;
import org.onlab.onos.net.device.DeviceEvent;
import org.onlab.onos.net.device.DeviceStore;
@@ -38,18 +36,22 @@
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
-import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.common.impl.Timestamped;
-import org.onlab.onos.store.serializers.KryoPoolUtil;
+import org.onlab.onos.store.device.impl.peermsg.DeviceAntiEntropyAdvertisement;
+import org.onlab.onos.store.device.impl.peermsg.DeviceFragmentId;
+import org.onlab.onos.store.device.impl.peermsg.PortFragmentId;
import org.onlab.onos.store.serializers.KryoSerializer;
-import org.onlab.onos.store.serializers.MastershipBasedTimestampSerializer;
+import org.onlab.onos.store.serializers.DistributedStoreSerializers;
import org.onlab.util.KryoPool;
import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -57,19 +59,21 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Predicates.notNull;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
import static org.onlab.onos.net.DefaultAnnotations.merge;
-import static org.onlab.onos.net.DefaultAnnotations.union;
import static com.google.common.base.Verify.verify;
+import static org.onlab.util.Tools.namedThreads;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
+import static org.onlab.onos.store.common.impl.ControllerNodeToNodeId.toNodeId;
// TODO: give me a better name
/**
@@ -86,8 +90,9 @@
public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
- // TODO: Check if inner Map can be replaced with plain Map
+ // TODO: Check if inner Map can be replaced with plain Map.
// innerMap is used to lock a Device, thus instance should never be replaced.
+
// collection of Description given from various providers
private final ConcurrentMap<DeviceId,
ConcurrentMap<ProviderId, DeviceDescriptions>>
@@ -117,21 +122,23 @@
@Override
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
- .register(KryoPoolUtil.API)
+ .register(DistributedStoreSerializers.COMMON)
+
.register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
.register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
.register(InternalDeviceRemovedEvent.class)
.register(InternalPortEvent.class, new InternalPortEventSerializer())
.register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
- .register(Timestamp.class)
- .register(Timestamped.class)
- .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
+ .register(DeviceAntiEntropyAdvertisement.class)
+ .register(DeviceFragmentId.class)
+ .register(PortFragmentId.class)
.build()
.populate(1);
}
-
};
+ private ScheduledExecutorService executor;
+
@Activate
public void activate() {
clusterCommunicator.addSubscriber(
@@ -144,11 +151,35 @@
GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
+
+ executor =
+ newSingleThreadScheduledExecutor(namedThreads("anti-entropy-%d"));
+
+ // TODO: Make these configurable
+ long initialDelaySec = 5;
+ long periodSec = 5;
+ // start anti-entropy thread
+ executor.scheduleAtFixedRate(new SendAdvertisementTask(),
+ initialDelaySec, periodSec, TimeUnit.SECONDS);
+
log.info("Started");
}
@Deactivate
public void deactivate() {
+
+ executor.shutdownNow();
+ try {
+ boolean timedout = executor.awaitTermination(5, TimeUnit.SECONDS);
+ if (timedout) {
+ log.error("Timeout during executor shutdown");
+ }
+ } catch (InterruptedException e) {
+ log.error("Error during executor shutdown", e);
+ }
+
deviceDescs.clear();
devices.clear();
devicePorts.clear();
@@ -543,14 +574,14 @@
final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
if (existingPortDesc == null ||
- deltaDesc == existingPortDesc ||
deltaDesc.isNewer(existingPortDesc)) {
// on new port or valid update
// update description
descs.putPortDesc(deltaDesc);
newPort = composePort(device, number, descsMap);
} else {
- // outdated event, ignored.
+ // same or outdated event, ignored.
+ log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
return null;
}
@@ -627,6 +658,14 @@
}
}
+ /**
+ * Checks if given timestamp is superseded by removal request
+ * with more recent timestamp.
+ *
+ * @param deviceId identifier of a device
+ * @param timestampToCheck timestamp of an event to check
+ * @return true if device is already removed
+ */
private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
Timestamp removalTimestamp = removalRequest.get(deviceId);
if (removalTimestamp != null &&
@@ -667,7 +706,7 @@
continue;
}
// TODO: should keep track of Description timestamp
- // and only merge conflicting keys when timestamp is newer
+ // and only merge conflicting keys when timestamp is newer.
// Currently assuming there will never be a key conflict between
// providers
@@ -708,7 +747,7 @@
continue;
}
// TODO: should keep track of Description timestamp
- // and only merge conflicting keys when timestamp is newer
+ // and only merge conflicting keys when timestamp is newer.
// Currently assuming there will never be a key conflict between
// providers
@@ -745,129 +784,258 @@
return providerDescs.get(pid);
}
- public static final class InitDeviceDescs
- implements ConcurrentInitializer<DeviceDescriptions> {
-
- private final Timestamped<DeviceDescription> deviceDesc;
-
- public InitDeviceDescs(Timestamped<DeviceDescription> deviceDesc) {
- this.deviceDesc = checkNotNull(deviceDesc);
- }
- @Override
- public DeviceDescriptions get() throws ConcurrentException {
- return new DeviceDescriptions(deviceDesc);
- }
+ // TODO: should we be throwing exception?
+ private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ subject,
+ SERIALIZER.encode(event));
+ clusterCommunicator.unicast(message, recipient);
}
-
- /**
- * Collection of Description of a Device and it's Ports given from a Provider.
- */
- public static class DeviceDescriptions {
-
- private final AtomicReference<Timestamped<DeviceDescription>> deviceDesc;
- private final ConcurrentMap<PortNumber, Timestamped<PortDescription>> portDescs;
-
- public DeviceDescriptions(Timestamped<DeviceDescription> desc) {
- this.deviceDesc = new AtomicReference<>(checkNotNull(desc));
- this.portDescs = new ConcurrentHashMap<>();
- }
-
- Timestamp getLatestTimestamp() {
- Timestamp latest = deviceDesc.get().timestamp();
- for (Timestamped<PortDescription> desc : portDescs.values()) {
- if (desc.timestamp().compareTo(latest) > 0) {
- latest = desc.timestamp();
- }
- }
- return latest;
- }
-
- public Timestamped<DeviceDescription> getDeviceDesc() {
- return deviceDesc.get();
- }
-
- public Timestamped<PortDescription> getPortDesc(PortNumber number) {
- return portDescs.get(number);
- }
-
- /**
- * Puts DeviceDescription, merging annotations as necessary.
- *
- * @param newDesc new DeviceDescription
- * @return previous DeviceDescription
- */
- public synchronized Timestamped<DeviceDescription> putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
- Timestamped<DeviceDescription> oldOne = deviceDesc.get();
- Timestamped<DeviceDescription> newOne = newDesc;
- if (oldOne != null) {
- SparseAnnotations merged = union(oldOne.value().annotations(),
- newDesc.value().annotations());
- newOne = new Timestamped<DeviceDescription>(
- new DefaultDeviceDescription(newDesc.value(), merged),
- newDesc.timestamp());
- }
- return deviceDesc.getAndSet(newOne);
- }
-
- /**
- * Puts PortDescription, merging annotations as necessary.
- *
- * @param newDesc new PortDescription
- * @return previous PortDescription
- */
- public synchronized Timestamped<PortDescription> putPortDesc(Timestamped<PortDescription> newDesc) {
- Timestamped<PortDescription> oldOne = portDescs.get(newDesc.value().portNumber());
- Timestamped<PortDescription> newOne = newDesc;
- if (oldOne != null) {
- SparseAnnotations merged = union(oldOne.value().annotations(),
- newDesc.value().annotations());
- newOne = new Timestamped<PortDescription>(
- new DefaultPortDescription(newDesc.value(), merged),
- newDesc.timestamp());
- }
- return portDescs.put(newOne.value().portNumber(), newOne);
- }
+ // TODO: should we be throwing exception?
+ private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ subject,
+ SERIALIZER.encode(event));
+ clusterCommunicator.broadcast(message);
}
private void notifyPeers(InternalDeviceEvent event) throws IOException {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
- SERIALIZER.encode(event));
- clusterCommunicator.broadcast(message);
+ broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
}
private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
- SERIALIZER.encode(event));
- clusterCommunicator.broadcast(message);
+ broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
}
private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- GossipDeviceStoreMessageSubjects.DEVICE_REMOVED,
- SERIALIZER.encode(event));
- clusterCommunicator.broadcast(message);
+ broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
}
private void notifyPeers(InternalPortEvent event) throws IOException {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- GossipDeviceStoreMessageSubjects.PORT_UPDATE,
- SERIALIZER.encode(event));
- clusterCommunicator.broadcast(message);
+ broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
}
private void notifyPeers(InternalPortStatusEvent event) throws IOException {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
- SERIALIZER.encode(event));
- clusterCommunicator.broadcast(message);
+ broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
+ }
+
+ private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
+ try {
+ unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
+ } catch (IOException e) {
+ log.error("Failed to send" + event + " to " + recipient, e);
+ }
+ }
+
+ private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
+ try {
+ unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
+ } catch (IOException e) {
+ log.error("Failed to send" + event + " to " + recipient, e);
+ }
+ }
+
+ private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
+ try {
+ unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
+ } catch (IOException e) {
+ log.error("Failed to send" + event + " to " + recipient, e);
+ }
+ }
+
+ private void notifyPeer(NodeId recipient, InternalPortEvent event) {
+ try {
+ unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
+ } catch (IOException e) {
+ log.error("Failed to send" + event + " to " + recipient, e);
+ }
+ }
+
+ private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
+ try {
+ unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
+ } catch (IOException e) {
+ log.error("Failed to send" + event + " to " + recipient, e);
+ }
+ }
+
+ private DeviceAntiEntropyAdvertisement createAdvertisement() {
+ final NodeId self = clusterService.getLocalNode().id();
+
+ Map<DeviceFragmentId, Timestamp> devices = new HashMap<>(deviceDescs.size());
+ final int portsPerDevice = 8; // random guess to minimize reallocation
+ Map<PortFragmentId, Timestamp> ports = new HashMap<>(devices.size() * portsPerDevice);
+ Map<DeviceId, Timestamp> offline = new HashMap<>(devices.size());
+
+ for (Entry<DeviceId, ConcurrentMap<ProviderId, DeviceDescriptions>>
+ provs : deviceDescs.entrySet()) {
+
+ final DeviceId deviceId = provs.getKey();
+ final ConcurrentMap<ProviderId, DeviceDescriptions> devDescs = provs.getValue();
+ synchronized (devDescs) {
+
+ offline.put(deviceId, this.offline.get(deviceId));
+
+ for (Entry<ProviderId, DeviceDescriptions>
+ prov : devDescs.entrySet()) {
+
+ final ProviderId provId = prov.getKey();
+ final DeviceDescriptions descs = prov.getValue();
+
+ devices.put(new DeviceFragmentId(deviceId, provId),
+ descs.getDeviceDesc().timestamp());
+
+ for (Entry<PortNumber, Timestamped<PortDescription>>
+ portDesc : descs.getPortDescs().entrySet()) {
+
+ final PortNumber number = portDesc.getKey();
+ ports.put(new PortFragmentId(deviceId, provId, number),
+ portDesc.getValue().timestamp());
+ }
+ }
+ }
+ }
+
+ return new DeviceAntiEntropyAdvertisement(self, devices, ports, offline);
+ }
+
+ /**
+ * Responds to anti-entropy advertisement message.
+ * <P>
+ * Notify sender about out-dated information using regular replication message.
+ * Send back advertisement to sender if not in sync.
+ *
+ * @param advertisement to respond to
+ */
+ private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
+
+ final NodeId sender = advertisement.sender();
+
+ Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
+ Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
+ Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
+
+ // Fragments to request
+ Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
+ Collection<PortFragmentId> reqPorts = new ArrayList<>();
+
+ for (Entry<DeviceId, ConcurrentMap<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
+ final DeviceId deviceId = de.getKey();
+ final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
+
+ synchronized (lDevice) {
+ // latestTimestamp across provider
+ // Note: can be null initially
+ Timestamp localLatest = offline.get(deviceId);
+
+ // handle device Ads
+ for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
+ final ProviderId provId = prov.getKey();
+ final DeviceDescriptions lDeviceDescs = prov.getValue();
+
+ final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
+
+
+ Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
+ Timestamp advDevTimestamp = devAds.get(devFragId);
+
+ if (advDevTimestamp == null || lProvDevice.isNewer(advDevTimestamp)) {
+ // remote does not have it or outdated, suggest
+ notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
+ } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
+ // local is outdated, request
+ reqDevices.add(devFragId);
+ }
+
+ // handle port Ads
+ for (Entry<PortNumber, Timestamped<PortDescription>>
+ pe : lDeviceDescs.getPortDescs().entrySet()) {
+
+ final PortNumber num = pe.getKey();
+ final Timestamped<PortDescription> lPort = pe.getValue();
+
+ final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
+
+ Timestamp advPortTimestamp = portAds.get(portFragId);
+ if ( advPortTimestamp == null || lPort.isNewer(advPortTimestamp)) {
+ // remote does not have it or outdated, suggest
+ notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
+ } else if (!lPort.timestamp().equals(advPortTimestamp)) {
+ // local is outdated, request
+ log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
+ reqPorts.add(portFragId);
+ }
+
+ // remove port Ad already processed
+ portAds.remove(portFragId);
+ } // end local port loop
+
+ // remove device Ad already processed
+ devAds.remove(devFragId);
+
+ // find latest and update
+ final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
+ if (localLatest == null ||
+ providerLatest.compareTo(localLatest) > 0) {
+ localLatest = providerLatest;
+ }
+ } // end local provider loop
+
+ // checking if remote timestamp is more recent.
+ Timestamp rOffline = offlineAds.get(deviceId);
+ if (rOffline != null &&
+ rOffline.compareTo(localLatest) > 0) {
+ // remote offline timestamp suggests that the
+ // device is off-line
+ markOfflineInternal(deviceId, rOffline);
+ }
+
+ Timestamp lOffline = offline.get(deviceId);
+ if (lOffline != null && rOffline == null) {
+ // locally offline, but remote is online, suggest offline
+ notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
+ }
+
+ // remove device offline Ad already processed
+ offlineAds.remove(deviceId);
+ } // end local device loop
+ } // device lock
+
+ // If there is any Ads left, request them
+ log.trace("Ads left {}, {}", devAds, portAds);
+ reqDevices.addAll(devAds.keySet());
+ reqPorts.addAll(portAds.keySet());
+
+ if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
+ log.trace("Nothing to request to remote peer {}", sender);
+ return;
+ }
+
+ log.info("Need to sync {} {}", reqDevices, reqPorts);
+
+ // 2-way Anti-Entropy for now
+ try {
+ unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
+ } catch (IOException e) {
+ log.error("Failed to send response advertisement to " + sender, e);
+ }
+
+// Sketch of 3-way Anti-Entropy
+// DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
+// ClusterMessage message = new ClusterMessage(
+// clusterService.getLocalNode().id(),
+// GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
+// SERIALIZER.encode(request));
+//
+// try {
+// clusterCommunicator.unicast(message, advertisement.sender());
+// } catch (IOException e) {
+// log.error("Failed to send advertisement reply to "
+// + advertisement.sender(), e);
+// }
}
private void notifyDelegateIfNotNull(DeviceEvent event) {
@@ -876,6 +1044,54 @@
}
}
+ private final class SendAdvertisementTask implements Runnable {
+
+ @Override
+ public void run() {
+ if (Thread.currentThread().isInterrupted()) {
+ log.info("Interrupted, quitting");
+ return;
+ }
+
+ try {
+ final NodeId self = clusterService.getLocalNode().id();
+ Set<ControllerNode> nodes = clusterService.getNodes();
+
+ ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
+ .transform(toNodeId())
+ .toList();
+
+ if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
+ log.info("No other peers in the cluster.");
+ return;
+ }
+
+ NodeId peer;
+ do {
+ int idx = RandomUtils.nextInt(0, nodeIds.size());
+ peer = nodeIds.get(idx);
+ } while (peer.equals(self));
+
+ DeviceAntiEntropyAdvertisement ad = createAdvertisement();
+
+ if (Thread.currentThread().isInterrupted()) {
+ log.info("Interrupted, quitting");
+ return;
+ }
+
+ try {
+ unicastMessage(peer, DEVICE_ADVERTISE, ad);
+ } catch (IOException e) {
+ log.error("Failed to send anti-entropy advertisement", e);
+ return;
+ }
+ } catch (Exception e) {
+ // catch all Exception to avoid Scheduled task being suppressed.
+ log.error("Exception thrown while sending advertisement", e);
+ }
+ }
+ }
+
private class InternalDeviceEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
@@ -940,6 +1156,7 @@
log.info("Received port status update event from peer: {}", message.sender());
InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
+ log.info("{}", event);
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
@@ -948,4 +1165,15 @@
notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
}
}
+
+ private final class InternalDeviceAdvertisementListener
+ implements ClusterMessageHandler {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ log.info("Received Device advertisement from peer: {}", message.sender());
+ DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
+ handleAdvertisement(advertisement);
+ }
+ }
}