[ONOS-6267] Support configurable Executors for primitives
- Support user-provided Executors in primitive builders
- Implement default per-partition per-primitive serial executor using a shared thread pool
- Implement Executor wrappers for all primitive types
Change-Id: I53acfb173a9b49a992a9a388983791d9735ed54a
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
index 3150363..bacda00 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
@@ -16,6 +16,7 @@
package org.onosproject.store.primitives.impl;
+import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.File;
@@ -23,6 +24,8 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -81,6 +84,9 @@
private final Map<PartitionId, StoragePartition> partitions = Maps.newConcurrentMap();
private final AtomicReference<ClusterMetadata> currentClusterMetadata = new AtomicReference<>();
private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
+ private final ExecutorService sharedPrimitiveExecutor = Executors.newFixedThreadPool(
+ Runtime.getRuntime().availableProcessors(),
+ groupedThreads("onos/primitives", "primitive-events", log));
@Activate
public void activate() {
@@ -93,6 +99,7 @@
messagingService,
clusterService,
CatalystSerializers.getSerializer(),
+ sharedPrimitiveExecutor,
new File(System.getProperty("karaf.data") + "/partitions/" + partition.getId()))));
CompletableFuture<Void> openFuture = CompletableFuture.allOf(partitions.values()