Adding BoundedThreadPool and BlockingBoolean

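Updating EventuallyConsistentMap to send broadcasts through a BoundedThreadPool
executor, to use 8 threads for the message-receiving executor, and to run the
anti-entropy thread without minPriority. Disabling anti-entropy is left as a
TODO for now.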

Change-Id: Id1bfcdaf1d0a19745fe7336e4ac9eaf649871d5d
diff --git a/core/store/dist/pom.xml b/core/store/dist/pom.xml
index ead1ab0..e5c3e40 100644
--- a/core/store/dist/pom.xml
+++ b/core/store/dist/pom.xml
@@ -53,6 +53,12 @@
 
         <dependency>
             <groupId>org.onosproject</groupId>
+            <artifactId>onlab-misc</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
             <artifactId>onos-core-common</artifactId>
         </dependency>
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 8de348a..eecb20d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -54,8 +54,8 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
 import static org.onlab.util.Tools.groupedThreads;
-import static org.onlab.util.Tools.minPriority;
 
 /**
  * Distributed Map implementation which uses optimistic replication and gossip
@@ -149,16 +149,23 @@
         items = new ConcurrentHashMap<>();
         removedItems = new ConcurrentHashMap<>();
 
-        executor = Executors //FIXME
-                .newFixedThreadPool(4, groupedThreads("onos/ecm", mapName + "-fg-%d"));
+        // should be a normal executor; it's used for receiving messages
+        //TODO make # of threads configurable
+        executor = Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
 
-        broadcastMessageExecutor = Executors.newSingleThreadExecutor(groupedThreads("onos/ecm", mapName + "-notify"));
+        // sending executor; should be capped
+        //TODO make # of threads configurable
+        broadcastMessageExecutor = //newSingleThreadExecutor(groupedThreads("onos/ecm", mapName + "-notify"));
+                newFixedThreadPool(4, groupedThreads("onos/ecm", mapName + "-notify"));
 
         backgroundExecutor =
-                newSingleThreadScheduledExecutor(minPriority(
-                        groupedThreads("onos/ecm", mapName + "-bg-%d")));
+                //FIXME anti-entropy can take >60 seconds and it blocks fg workers
+                // ... dropping minPriority to try to help until this can be parallel
+                newSingleThreadScheduledExecutor(//minPriority(
+                        groupedThreads("onos/ecm", mapName + "-bg-%d"))/*)*/;
 
         // start anti-entropy thread
+        //TODO disable anti-entropy for now in testing (it is unstable)
         backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
                                                initialDelaySec, periodSec,
                                                TimeUnit.SECONDS);
@@ -494,8 +501,8 @@
                 clusterService.getLocalNode().id(),
                 subject,
                 serializer.encode(event));
-        //broadcastMessageExecutor.execute(() -> clusterCommunicator.broadcast(message));
-        clusterCommunicator.broadcast(message);
+        broadcastMessageExecutor.execute(() -> clusterCommunicator.broadcast(message));
+//        clusterCommunicator.broadcast(message);
     }
 
     private void unicastMessage(NodeId peer,