Make sure message handling executor references are appropriately managed when we modify the service configuration

Change-Id: I56866dd8c3359de0fcf827fc247024c65a63c5c2
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 85015c9..781e24b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -180,6 +180,8 @@
 
     private IdGenerator idGenerator;
 
+    private NodeId local;
+
     @Activate
     public void activate(ComponentContext context) {
         configService.registerProperties(getClass());
@@ -188,71 +190,17 @@
 
         idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
 
+        local = clusterService.getLocalNode().id();
+
         // Cache to create SMap on demand
         smaps = CacheBuilder.newBuilder()
                 .softValues()
                 .build(new SMapLoader());
 
-        final NodeId local = clusterService.getLocalNode().id();
-
         messageHandlingExecutor = Executors.newFixedThreadPool(
                 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
 
-        clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local), messageHandlingExecutor);
-
-        clusterCommunicator.addSubscriber(REMOTE_APPLY_COMPLETED, new ClusterMessageHandler() {
-            @Override
-            public void handle(ClusterMessage message) {
-                FlowRuleBatchEvent event = SERIALIZER.decode(message.payload());
-                log.trace("received completed notification for {}", event);
-                notifyDelegate(event);
-            }
-        }, messageHandlingExecutor);
-
-        clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
-
-            @Override
-            public void handle(ClusterMessage message) {
-                FlowRule rule = SERIALIZER.decode(message.payload());
-                log.trace("received get flow entry request for {}", rule);
-                FlowEntry flowEntry = flowTable.getFlowEntry(rule); //getFlowEntryInternal(rule);
-                try {
-                    message.respond(SERIALIZER.encode(flowEntry));
-                } catch (IOException e) {
-                    log.error("Failed to respond back", e);
-                }
-            }
-        }, messageHandlingExecutor);
-
-        clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
-
-            @Override
-            public void handle(ClusterMessage message) {
-                DeviceId deviceId = SERIALIZER.decode(message.payload());
-                log.trace("Received get flow entries request for {} from {}", deviceId, message.sender());
-                Set<FlowEntry> flowEntries = flowTable.getFlowEntries(deviceId);
-                try {
-                    message.respond(SERIALIZER.encode(flowEntries));
-                } catch (IOException e) {
-                    log.error("Failed to respond to peer's getFlowEntries request", e);
-                }
-            }
-        }, messageHandlingExecutor);
-
-        clusterCommunicator.addSubscriber(REMOVE_FLOW_ENTRY, new ClusterMessageHandler() {
-
-            @Override
-            public void handle(ClusterMessage message) {
-                FlowEntry rule = SERIALIZER.decode(message.payload());
-                log.trace("received get flow entry request for {}", rule);
-                FlowRuleEvent event = removeFlowRuleInternal(rule);
-                try {
-                    message.respond(SERIALIZER.encode(event));
-                } catch (IOException e) {
-                    log.error("Failed to respond back", e);
-                }
-            }
-        }, messageHandlingExecutor);
+        registerMessageHandlers(messageHandlingExecutor);
 
         replicaInfoEventListener = new InternalReplicaInfoEventListener();
 
@@ -264,12 +212,8 @@
     @Deactivate
     public void deactivate(ComponentContext context) {
         configService.unregisterProperties(getClass(), false);
-        clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
-        clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
-        clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
-        clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
-        clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
-        messageHandlingExecutor.shutdown();
+        unregisterMessageHandlers();
+        messageHandlingExecutor.shutdownNow();
         replicaInfoManager.removeListener(replicaInfoEventListener);
         log.info("Stopped");
     }
@@ -297,15 +241,87 @@
             newBackupEnabled = DEFAULT_BACKUP_ENABLED;
         }
 
-        if (newPoolSize != msgHandlerPoolSize || newBackupEnabled != backupEnabled) {
-            msgHandlerPoolSize = newPoolSize;
+        if (newBackupEnabled != backupEnabled) {
             backupEnabled = newBackupEnabled;
+        }
+        if (newPoolSize != msgHandlerPoolSize) {
+            msgHandlerPoolSize = newPoolSize;
             ExecutorService oldMsgHandler = messageHandlingExecutor;
             messageHandlingExecutor = Executors.newFixedThreadPool(
                     msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
+
+            // replace previously registered handlers.
+            registerMessageHandlers(messageHandlingExecutor);
             oldMsgHandler.shutdown();
-            logConfig("Reconfigured");
         }
+        logConfig("Reconfigured");
+    }
+
+    private void registerMessageHandlers(ExecutorService executor) {
+
+        clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local), executor);
+
+        clusterCommunicator.addSubscriber(REMOTE_APPLY_COMPLETED, new ClusterMessageHandler() {
+            @Override
+            public void handle(ClusterMessage message) {
+                FlowRuleBatchEvent event = SERIALIZER.decode(message.payload());
+                log.trace("received completed notification for {}", event);
+                notifyDelegate(event);
+            }
+        }, executor);
+
+        clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
+
+            @Override
+            public void handle(ClusterMessage message) {
+                FlowRule rule = SERIALIZER.decode(message.payload());
+                log.trace("received get flow entry request for {}", rule);
+                FlowEntry flowEntry = flowTable.getFlowEntry(rule); //getFlowEntryInternal(rule);
+                try {
+                    message.respond(SERIALIZER.encode(flowEntry));
+                } catch (IOException e) {
+                    log.error("Failed to respond back", e);
+                }
+            }
+        }, executor);
+
+        clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
+
+            @Override
+            public void handle(ClusterMessage message) {
+                DeviceId deviceId = SERIALIZER.decode(message.payload());
+                log.trace("Received get flow entries request for {} from {}", deviceId, message.sender());
+                Set<FlowEntry> flowEntries = flowTable.getFlowEntries(deviceId);
+                try {
+                    message.respond(SERIALIZER.encode(flowEntries));
+                } catch (IOException e) {
+                    log.error("Failed to respond to peer's getFlowEntries request", e);
+                }
+            }
+        }, executor);
+
+        clusterCommunicator.addSubscriber(REMOVE_FLOW_ENTRY, new ClusterMessageHandler() {
+
+            @Override
+            public void handle(ClusterMessage message) {
+                FlowEntry rule = SERIALIZER.decode(message.payload());
+                log.trace("received get flow entry request for {}", rule);
+                FlowRuleEvent event = removeFlowRuleInternal(rule);
+                try {
+                    message.respond(SERIALIZER.encode(event));
+                } catch (IOException e) {
+                    log.error("Failed to respond back", e);
+                }
+            }
+        }, executor);
+    }
+
+    private void unregisterMessageHandlers() {
+        clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
+        clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
+        clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
+        clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
+        clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
     }
 
     private void logConfig(String prefix) {
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 6628bb2..099880d 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -200,12 +200,12 @@
 
     @Override
     public void registerHandler(String type, MessageHandler handler) {
-        handlers.putIfAbsent(type, handler);
+        handlers.put(type, handler);
     }
 
     @Override
     public void registerHandler(String type, MessageHandler handler, ExecutorService executor) {
-        handlers.putIfAbsent(type, new MessageHandler() {
+        handlers.put(type, new MessageHandler() {
             @Override
             public void handle(Message message) throws IOException {
                 executor.submit(() -> {