Run Anti-Entropy in background
Change-Id: I233185d15f52359899427e214339be44cb62971c
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index aae808a..d11fa11 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -89,6 +89,7 @@
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
import static org.onlab.onos.net.DefaultAnnotations.merge;
import static com.google.common.base.Verify.verify;
+import static org.onlab.util.Tools.minPriority;
import static org.onlab.util.Tools.namedThreads;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
@@ -159,7 +160,7 @@
}
};
- private ScheduledExecutorService executor;
+ private ScheduledExecutorService backgroundExecutor;
@Activate
public void activate() {
@@ -177,14 +178,14 @@
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
- executor =
- newSingleThreadScheduledExecutor(namedThreads("anti-entropy-%d"));
+ backgroundExecutor =
+ newSingleThreadScheduledExecutor(minPriority(namedThreads("device-bg-%d")));
// TODO: Make these configurable
long initialDelaySec = 5;
long periodSec = 5;
// start anti-entropy thread
- executor.scheduleAtFixedRate(new SendAdvertisementTask(),
+ backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
initialDelaySec, periodSec, TimeUnit.SECONDS);
log.info("Started");
@@ -193,9 +194,9 @@
@Deactivate
public void deactivate() {
- executor.shutdownNow();
+ backgroundExecutor.shutdownNow();
try {
- boolean timedout = executor.awaitTermination(5, TimeUnit.SECONDS);
+ boolean timedout = backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS);
if (timedout) {
log.error("Timeout during executor shutdown");
}
@@ -1359,7 +1360,17 @@
public void handle(ClusterMessage message) {
log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
- handleAdvertisement(advertisement);
+ backgroundExecutor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ handleAdvertisement(advertisement);
+ } catch (Exception e) {
+ log.warn("Exception thrown handling Device advertisements.", e);
+ }
+ }
+ });
}
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
index 1b61d86..41d72c3 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
@@ -23,6 +23,7 @@
import static org.onlab.onos.net.host.HostEvent.Type.HOST_REMOVED;
import static org.onlab.onos.net.host.HostEvent.Type.HOST_UPDATED;
import static org.onlab.util.Tools.namedThreads;
+import static org.onlab.util.Tools.minPriority;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.IOException;
@@ -136,7 +137,7 @@
}
};
- private ScheduledExecutorService executor;
+ private ScheduledExecutorService backgroundExecutor;
@Activate
public void activate() {
@@ -150,14 +151,14 @@
GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT,
new InternalHostAntiEntropyAdvertisementListener());
- executor =
- newSingleThreadScheduledExecutor(namedThreads("link-anti-entropy-%d"));
+ backgroundExecutor =
+ newSingleThreadScheduledExecutor(minPriority(namedThreads("host-bg-%d")));
// TODO: Make these configurable
long initialDelaySec = 5;
long periodSec = 5;
// start anti-entropy thread
- executor.scheduleAtFixedRate(new SendAdvertisementTask(),
+ backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
initialDelaySec, periodSec, TimeUnit.SECONDS);
log.info("Started");
@@ -165,9 +166,9 @@
@Deactivate
public void deactivate() {
- executor.shutdownNow();
+ backgroundExecutor.shutdownNow();
try {
- if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+ if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
log.error("Timeout during executor shutdown");
}
} catch (InterruptedException e) {
@@ -642,7 +643,17 @@
public void handle(ClusterMessage message) {
log.trace("Received Host Anti-Entropy advertisement from peer: {}", message.sender());
HostAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
- handleAntiEntropyAdvertisement(advertisement);
+ backgroundExecutor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ handleAntiEntropyAdvertisement(advertisement);
+ } catch (Exception e) {
+ log.warn("Exception thrown handling Host advertisements", e);
+ }
+ }
+ });
}
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
index 63ee874..c465866 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
@@ -21,6 +21,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;
+
import org.apache.commons.lang3.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -87,6 +88,7 @@
import static org.onlab.onos.net.LinkKey.linkKey;
import static org.onlab.onos.net.link.LinkEvent.Type.*;
import static org.onlab.onos.store.link.impl.GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT;
+import static org.onlab.util.Tools.minPriority;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
@@ -139,7 +141,7 @@
}
};
- private ScheduledExecutorService executor;
+ private ScheduledExecutorService backgroundExecutors;
@Activate
public void activate() {
@@ -154,14 +156,14 @@
GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
new InternalLinkAntiEntropyAdvertisementListener());
- executor =
- newSingleThreadScheduledExecutor(namedThreads("link-anti-entropy-%d"));
+ backgroundExecutors =
+ newSingleThreadScheduledExecutor(minPriority(namedThreads("link-bg-%d")));
// TODO: Make these configurable
long initialDelaySec = 5;
long periodSec = 5;
// start anti-entropy thread
- executor.scheduleAtFixedRate(new SendAdvertisementTask(),
+ backgroundExecutors.scheduleAtFixedRate(new SendAdvertisementTask(),
initialDelaySec, periodSec, TimeUnit.SECONDS);
log.info("Started");
@@ -170,9 +172,9 @@
@Deactivate
public void deactivate() {
- executor.shutdownNow();
+ backgroundExecutors.shutdownNow();
try {
- if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+ if (!backgroundExecutors.awaitTermination(5, TimeUnit.SECONDS)) {
log.error("Timeout during executor shutdown");
}
} catch (InterruptedException e) {
@@ -794,7 +796,18 @@
public void handle(ClusterMessage message) {
log.trace("Received Link Anti-Entropy advertisement from peer: {}", message.sender());
LinkAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
- handleAntiEntropyAdvertisement(advertisement);
+ backgroundExecutors.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ handleAntiEntropyAdvertisement(advertisement);
+ } catch (Exception e) {
+ log.warn("Exception thrown while handling Link advertisements", e);
+ throw e;
+ }
+ }
+ });
}
}
}
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index b6a76d0..2b261a6 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -46,6 +46,19 @@
}
/**
+ * Returns a thread factory that produces threads with MIN_PRIORITY.
+ *
+ * @param factory backing ThreadFactory
+ * @return thread factory
+ */
+ public static ThreadFactory minPriority(ThreadFactory factory) {
+ return new ThreadFactoryBuilder()
+ .setThreadFactory(factory)
+ .setPriority(Thread.MIN_PRIORITY)
+ .build();
+ }
+
+ /**
* Converts a string from hex to long.
*
* @param string hex number in string form; sans 0x