Run Anti-Entropy in background
Change-Id: I233185d15f52359899427e214339be44cb62971c
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index aae808a..d11fa11 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -89,6 +89,7 @@
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
import static org.onlab.onos.net.DefaultAnnotations.merge;
import static com.google.common.base.Verify.verify;
+import static org.onlab.util.Tools.minPriority;
import static org.onlab.util.Tools.namedThreads;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
@@ -159,7 +160,7 @@
}
};
- private ScheduledExecutorService executor;
+ private ScheduledExecutorService backgroundExecutor;
@Activate
public void activate() {
@@ -177,14 +178,14 @@
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
- executor =
- newSingleThreadScheduledExecutor(namedThreads("anti-entropy-%d"));
+ backgroundExecutor =
+ newSingleThreadScheduledExecutor(minPriority(namedThreads("device-bg-%d")));
// TODO: Make these configurable
long initialDelaySec = 5;
long periodSec = 5;
// start anti-entropy thread
- executor.scheduleAtFixedRate(new SendAdvertisementTask(),
+ backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
initialDelaySec, periodSec, TimeUnit.SECONDS);
log.info("Started");
@@ -193,9 +194,9 @@
@Deactivate
public void deactivate() {
- executor.shutdownNow();
+ backgroundExecutor.shutdownNow();
try {
- boolean timedout = executor.awaitTermination(5, TimeUnit.SECONDS);
+ boolean timedout = backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS);
if (timedout) {
log.error("Timeout during executor shutdown");
}
@@ -1359,7 +1360,17 @@
public void handle(ClusterMessage message) {
log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
- handleAdvertisement(advertisement);
+ backgroundExecutor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ handleAdvertisement(advertisement);
+ } catch (Exception e) {
+ log.warn("Exception thrown handling Device advertisements.", e);
+ }
+ }
+ });
}
}
}