Adding BoundedThreadPool and BlockingBoolean
Updating EventuallyConsistentMap to use BoundedThreadPool for broadcast threads,
and disabling anti-entropy for now.
Change-Id: Id1bfcdaf1d0a19745fe7336e4ac9eaf649871d5d
diff --git a/core/store/dist/pom.xml b/core/store/dist/pom.xml
index ead1ab0..e5c3e40 100644
--- a/core/store/dist/pom.xml
+++ b/core/store/dist/pom.xml
@@ -53,6 +53,12 @@
<dependency>
<groupId>org.onosproject</groupId>
+ <artifactId>onlab-misc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
<artifactId>onos-core-common</artifactId>
</dependency>
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 8de348a..eecb20d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -54,8 +54,8 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
import static org.onlab.util.Tools.groupedThreads;
-import static org.onlab.util.Tools.minPriority;
/**
* Distributed Map implementation which uses optimistic replication and gossip
@@ -149,16 +149,23 @@
items = new ConcurrentHashMap<>();
removedItems = new ConcurrentHashMap<>();
- executor = Executors //FIXME
- .newFixedThreadPool(4, groupedThreads("onos/ecm", mapName + "-fg-%d"));
+ // should be a normal executor; it's used for receiving messages
+ //TODO make # of threads configurable
+ executor = Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
- broadcastMessageExecutor = Executors.newSingleThreadExecutor(groupedThreads("onos/ecm", mapName + "-notify"));
+ // sending executor; should be capped
+ //TODO make # of threads configurable
+ broadcastMessageExecutor = //newSingleThreadExecutor(groupedThreads("onos/ecm", mapName + "-notify"));
+ newFixedThreadPool(4, groupedThreads("onos/ecm", mapName + "-notify"));
backgroundExecutor =
- newSingleThreadScheduledExecutor(minPriority(
- groupedThreads("onos/ecm", mapName + "-bg-%d")));
+ //FIXME anti-entropy can take >60 seconds and it blocks fg workers;
+ // dropping minPriority to try to help until anti-entropy can run in parallel
+ newSingleThreadScheduledExecutor(//minPriority(
+ groupedThreads("onos/ecm", mapName + "-bg-%d"))/*)*/;
// start anti-entropy thread
+ //TODO disable anti-entropy for now in testing (it is unstable); it is still scheduled below
backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
initialDelaySec, periodSec,
TimeUnit.SECONDS);
@@ -494,8 +501,8 @@
clusterService.getLocalNode().id(),
subject,
serializer.encode(event));
- //broadcastMessageExecutor.execute(() -> clusterCommunicator.broadcast(message));
- clusterCommunicator.broadcast(message);
+ broadcastMessageExecutor.execute(() -> clusterCommunicator.broadcast(message));
+// clusterCommunicator.broadcast(message);
}
private void unicastMessage(NodeId peer,