Run Anti-Entropy in background

Change-Id: I233185d15f52359899427e214339be44cb62971c
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index aae808a..d11fa11 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -89,6 +89,7 @@
 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
 import static org.onlab.onos.net.DefaultAnnotations.merge;
 import static com.google.common.base.Verify.verify;
+import static org.onlab.util.Tools.minPriority;
 import static org.onlab.util.Tools.namedThreads;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
@@ -159,7 +160,7 @@
         }
     };
 
-    private ScheduledExecutorService executor;
+    private ScheduledExecutorService backgroundExecutor;
 
     @Activate
     public void activate() {
@@ -177,14 +178,14 @@
         clusterCommunicator.addSubscriber(
                 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
 
-        executor =
-                newSingleThreadScheduledExecutor(namedThreads("anti-entropy-%d"));
+        backgroundExecutor =
+                newSingleThreadScheduledExecutor(minPriority(namedThreads("device-bg-%d")));
 
         // TODO: Make these configurable
         long initialDelaySec = 5;
         long periodSec = 5;
         // start anti-entropy thread
-        executor.scheduleAtFixedRate(new SendAdvertisementTask(),
+        backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
                     initialDelaySec, periodSec, TimeUnit.SECONDS);
 
         log.info("Started");
@@ -193,9 +194,9 @@
     @Deactivate
     public void deactivate() {
 
-        executor.shutdownNow();
+        backgroundExecutor.shutdownNow();
         try {
-            boolean timedout = executor.awaitTermination(5, TimeUnit.SECONDS);
+            boolean timedout = backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS);
             if (timedout) {
                 log.error("Timeout during executor shutdown");
             }
@@ -1359,7 +1360,17 @@
         public void handle(ClusterMessage message) {
             log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
             DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
-            handleAdvertisement(advertisement);
+            backgroundExecutor.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        handleAdvertisement(advertisement);
+                    } catch (Exception e) {
+                        log.warn("Exception thrown handling Device advertisements.", e);
+                    }
+                }
+            });
         }
     }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
index 1b61d86..41d72c3 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
@@ -23,6 +23,7 @@
 import static org.onlab.onos.net.host.HostEvent.Type.HOST_REMOVED;
 import static org.onlab.onos.net.host.HostEvent.Type.HOST_UPDATED;
 import static org.onlab.util.Tools.namedThreads;
+import static org.onlab.util.Tools.minPriority;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.io.IOException;
@@ -136,7 +137,7 @@
         }
     };
 
-    private ScheduledExecutorService executor;
+    private ScheduledExecutorService backgroundExecutor;
 
     @Activate
     public void activate() {
@@ -150,14 +151,14 @@
                 GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT,
                 new InternalHostAntiEntropyAdvertisementListener());
 
-        executor =
-                newSingleThreadScheduledExecutor(namedThreads("link-anti-entropy-%d"));
+        backgroundExecutor =
+                newSingleThreadScheduledExecutor(minPriority(namedThreads("host-bg-%d")));
 
         // TODO: Make these configurable
         long initialDelaySec = 5;
         long periodSec = 5;
         // start anti-entropy thread
-        executor.scheduleAtFixedRate(new SendAdvertisementTask(),
+        backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
                     initialDelaySec, periodSec, TimeUnit.SECONDS);
 
         log.info("Started");
@@ -165,9 +166,9 @@
 
     @Deactivate
     public void deactivate() {
-        executor.shutdownNow();
+        backgroundExecutor.shutdownNow();
         try {
-            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+            if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                 log.error("Timeout during executor shutdown");
             }
         } catch (InterruptedException e) {
@@ -642,7 +643,17 @@
         public void handle(ClusterMessage message) {
             log.trace("Received Host Anti-Entropy advertisement from peer: {}", message.sender());
             HostAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
-            handleAntiEntropyAdvertisement(advertisement);
+            backgroundExecutor.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        handleAntiEntropyAdvertisement(advertisement);
+                    } catch (Exception e) {
+                        log.warn("Exception thrown handling Host advertisements", e);
+                    }
+                }
+            });
         }
     }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
index 63ee874..c465866 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
@@ -21,6 +21,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.collect.SetMultimap;
+
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -87,6 +88,7 @@
 import static org.onlab.onos.net.LinkKey.linkKey;
 import static org.onlab.onos.net.link.LinkEvent.Type.*;
 import static org.onlab.onos.store.link.impl.GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT;
+import static org.onlab.util.Tools.minPriority;
 import static org.onlab.util.Tools.namedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -139,7 +141,7 @@
         }
     };
 
-    private ScheduledExecutorService executor;
+    private ScheduledExecutorService backgroundExecutors;
 
     @Activate
     public void activate() {
@@ -154,14 +156,14 @@
                 GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
                 new InternalLinkAntiEntropyAdvertisementListener());
 
-        executor =
-                newSingleThreadScheduledExecutor(namedThreads("link-anti-entropy-%d"));
+        backgroundExecutors =
+                newSingleThreadScheduledExecutor(minPriority(namedThreads("link-bg-%d")));
 
         // TODO: Make these configurable
         long initialDelaySec = 5;
         long periodSec = 5;
         // start anti-entropy thread
-        executor.scheduleAtFixedRate(new SendAdvertisementTask(),
+        backgroundExecutors.scheduleAtFixedRate(new SendAdvertisementTask(),
                     initialDelaySec, periodSec, TimeUnit.SECONDS);
 
         log.info("Started");
@@ -170,9 +172,9 @@
     @Deactivate
     public void deactivate() {
 
-        executor.shutdownNow();
+        backgroundExecutors.shutdownNow();
         try {
-            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+            if (!backgroundExecutors.awaitTermination(5, TimeUnit.SECONDS)) {
                 log.error("Timeout during executor shutdown");
             }
         } catch (InterruptedException e) {
@@ -794,7 +796,18 @@
         public void handle(ClusterMessage message) {
             log.trace("Received Link Anti-Entropy advertisement from peer: {}", message.sender());
             LinkAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
-            handleAntiEntropyAdvertisement(advertisement);
+            backgroundExecutors.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        handleAntiEntropyAdvertisement(advertisement);
+                    } catch (Exception e) {
+                        log.warn("Exception thrown while handling Link advertisements", e);
+                        throw e;
+                    }
+                }
+            });
         }
     }
 }
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index b6a76d0..2b261a6 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -46,6 +46,19 @@
     }
 
     /**
+     * Returns a thread factory that produces threads with MIN_PRIORITY.
+     *
+     * @param factory backing ThreadFactory
+     * @return thread factory
+     */
+    public static ThreadFactory minPriority(ThreadFactory factory) {
+        return new ThreadFactoryBuilder()
+                    .setThreadFactory(factory)
+                    .setPriority(Thread.MIN_PRIORITY)
+                    .build();
+    }
+
+    /**
      * Converts a string from hex to long.
      *
      * @param string hex number in string form; sans 0x