Run link Anti-Entropy advertisement handling in background
Change-Id: I233185d15f52359899427e214339be44cb62971c
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
index 63ee874..c465866 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
@@ -21,6 +21,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;
+
import org.apache.commons.lang3.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -87,6 +88,7 @@
import static org.onlab.onos.net.LinkKey.linkKey;
import static org.onlab.onos.net.link.LinkEvent.Type.*;
import static org.onlab.onos.store.link.impl.GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT;
+import static org.onlab.util.Tools.minPriority;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
@@ -139,7 +141,7 @@
}
};
- private ScheduledExecutorService executor;
+ private ScheduledExecutorService backgroundExecutors;
@Activate
public void activate() {
@@ -154,14 +156,14 @@
GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
new InternalLinkAntiEntropyAdvertisementListener());
- executor =
- newSingleThreadScheduledExecutor(namedThreads("link-anti-entropy-%d"));
+ backgroundExecutors =
+ newSingleThreadScheduledExecutor(minPriority(namedThreads("link-bg-%d")));
// TODO: Make these configurable
long initialDelaySec = 5;
long periodSec = 5;
// start anti-entropy thread
- executor.scheduleAtFixedRate(new SendAdvertisementTask(),
+ backgroundExecutors.scheduleAtFixedRate(new SendAdvertisementTask(),
initialDelaySec, periodSec, TimeUnit.SECONDS);
log.info("Started");
@@ -170,9 +172,9 @@
@Deactivate
public void deactivate() {
- executor.shutdownNow();
+ backgroundExecutors.shutdownNow();
try {
- if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+ if (!backgroundExecutors.awaitTermination(5, TimeUnit.SECONDS)) {
log.error("Timeout during executor shutdown");
}
} catch (InterruptedException e) {
@@ -794,7 +796,18 @@
public void handle(ClusterMessage message) {
log.trace("Received Link Anti-Entropy advertisement from peer: {}", message.sender());
LinkAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
- handleAntiEntropyAdvertisement(advertisement);
+ backgroundExecutors.submit(new Runnable() { // queue the work on the single-thread executor (also runs SendAdvertisementTask) instead of processing in the caller's thread
+
+ @Override
+ public void run() {
+ try {
+ handleAntiEntropyAdvertisement(advertisement);
+ } catch (Exception e) {
+ log.warn("Exception thrown while handling Link advertisements", e);
+ throw e; // ends up in the Future returned by submit(), which is discarded here, so the warn above is the only visible trace
+ }
+ }
+ });
}
}
}