[ONOS-6267] Support configurable Executors for primitives
- Support user-provided Executors in primitive builders
- Implement a default per-partition, per-primitive serial executor backed by a shared thread pool
- Implement Executor wrappers for all primitive types

Change-Id: I53acfb173a9b49a992a9a388983791d9735ed54a
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
index 3150363..bacda00 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
@@ -16,6 +16,7 @@
 
 package org.onosproject.store.primitives.impl;
 
+import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.io.File;
@@ -23,6 +24,8 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
@@ -81,6 +84,9 @@
     private final Map<PartitionId, StoragePartition> partitions = Maps.newConcurrentMap();
     private final AtomicReference<ClusterMetadata> currentClusterMetadata = new AtomicReference<>();
     private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
+    private final ExecutorService sharedPrimitiveExecutor = Executors.newFixedThreadPool(
+            Runtime.getRuntime().availableProcessors(),
+            groupedThreads("onos/primitives", "primitive-events", log));
 
     @Activate
     public void activate() {
@@ -93,6 +99,7 @@
                                messagingService,
                                clusterService,
                                CatalystSerializers.getSerializer(),
+                               sharedPrimitiveExecutor,
                                new File(System.getProperty("karaf.data") + "/partitions/" + partition.getId()))));
 
         CompletableFuture<Void> openFuture = CompletableFuture.allOf(partitions.values()