Use a ScheduleExecutorService in CopycatTransportService instead of creating a new thread per connection

Change-Id: Ic075209093e89e2502fb750d1a79509d6fcccc19
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
index 03a5a71..36a1958 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
@@ -23,6 +23,8 @@
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 
@@ -37,6 +39,7 @@
 import io.atomix.catalyst.transport.Address;
 import io.atomix.catalyst.transport.Connection;
 import io.atomix.catalyst.transport.Server;
+import io.atomix.catalyst.util.concurrent.CatalystThreadFactory;
 import io.atomix.catalyst.util.concurrent.SingleThreadContext;
 import io.atomix.catalyst.util.concurrent.ThreadContext;
 
@@ -48,6 +51,7 @@
     private final Logger log = getLogger(getClass());
     private final AtomicBoolean listening = new AtomicBoolean(false);
     private CompletableFuture<Void> listenFuture = new CompletableFuture<>();
+    private final ScheduledExecutorService executorService;
     private final PartitionId partitionId;
     private final MessagingService messagingService;
     private final String messageSubject;
@@ -57,6 +61,8 @@
         this.partitionId = checkNotNull(partitionId);
         this.messagingService = checkNotNull(messagingService);
         this.messageSubject = String.format("onos-copycat-%s", partitionId);
+        this.executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(),
+                new CatalystThreadFactory("copycat-server-p" + partitionId + "-%d"));
     }
 
     @Override
@@ -105,6 +111,7 @@
     @Override
     public CompletableFuture<Void> close() {
         messagingService.unregisterHandler(messageSubject);
+        executorService.shutdown();
         return CompletableFuture.completedFuture(null);
     }
 
@@ -116,6 +123,6 @@
         if (context != null) {
             return context;
         }
-        return new SingleThreadContext("copycat-transport-server-" + partitionId, parentContext.serializer().clone());
+        return new SingleThreadContext(executorService, parentContext.serializer().clone());
     }
 }