Make sure message handling executor references are appropriately managed when we modify the service configuration
Change-Id: I56866dd8c3359de0fcf827fc247024c65a63c5c2
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 85015c9..781e24b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -180,6 +180,8 @@
private IdGenerator idGenerator;
+ private NodeId local;
+
@Activate
public void activate(ComponentContext context) {
configService.registerProperties(getClass());
@@ -188,71 +190,17 @@
idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
+ local = clusterService.getLocalNode().id();
+
// Cache to create SMap on demand
smaps = CacheBuilder.newBuilder()
.softValues()
.build(new SMapLoader());
- final NodeId local = clusterService.getLocalNode().id();
-
messageHandlingExecutor = Executors.newFixedThreadPool(
msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
- clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local), messageHandlingExecutor);
-
- clusterCommunicator.addSubscriber(REMOTE_APPLY_COMPLETED, new ClusterMessageHandler() {
- @Override
- public void handle(ClusterMessage message) {
- FlowRuleBatchEvent event = SERIALIZER.decode(message.payload());
- log.trace("received completed notification for {}", event);
- notifyDelegate(event);
- }
- }, messageHandlingExecutor);
-
- clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
-
- @Override
- public void handle(ClusterMessage message) {
- FlowRule rule = SERIALIZER.decode(message.payload());
- log.trace("received get flow entry request for {}", rule);
- FlowEntry flowEntry = flowTable.getFlowEntry(rule); //getFlowEntryInternal(rule);
- try {
- message.respond(SERIALIZER.encode(flowEntry));
- } catch (IOException e) {
- log.error("Failed to respond back", e);
- }
- }
- }, messageHandlingExecutor);
-
- clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
-
- @Override
- public void handle(ClusterMessage message) {
- DeviceId deviceId = SERIALIZER.decode(message.payload());
- log.trace("Received get flow entries request for {} from {}", deviceId, message.sender());
- Set<FlowEntry> flowEntries = flowTable.getFlowEntries(deviceId);
- try {
- message.respond(SERIALIZER.encode(flowEntries));
- } catch (IOException e) {
- log.error("Failed to respond to peer's getFlowEntries request", e);
- }
- }
- }, messageHandlingExecutor);
-
- clusterCommunicator.addSubscriber(REMOVE_FLOW_ENTRY, new ClusterMessageHandler() {
-
- @Override
- public void handle(ClusterMessage message) {
- FlowEntry rule = SERIALIZER.decode(message.payload());
- log.trace("received get flow entry request for {}", rule);
- FlowRuleEvent event = removeFlowRuleInternal(rule);
- try {
- message.respond(SERIALIZER.encode(event));
- } catch (IOException e) {
- log.error("Failed to respond back", e);
- }
- }
- }, messageHandlingExecutor);
+ registerMessageHandlers(messageHandlingExecutor);
replicaInfoEventListener = new InternalReplicaInfoEventListener();
@@ -264,12 +212,8 @@
@Deactivate
public void deactivate(ComponentContext context) {
configService.unregisterProperties(getClass(), false);
- clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
- clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
- clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
- clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
- clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
- messageHandlingExecutor.shutdown();
+ unregisterMessageHandlers();
+ messageHandlingExecutor.shutdownNow();
replicaInfoManager.removeListener(replicaInfoEventListener);
log.info("Stopped");
}
@@ -297,15 +241,87 @@
newBackupEnabled = DEFAULT_BACKUP_ENABLED;
}
- if (newPoolSize != msgHandlerPoolSize || newBackupEnabled != backupEnabled) {
- msgHandlerPoolSize = newPoolSize;
+ if (newBackupEnabled != backupEnabled) {
backupEnabled = newBackupEnabled;
+ }
+ if (newPoolSize != msgHandlerPoolSize) {
+ msgHandlerPoolSize = newPoolSize;
ExecutorService oldMsgHandler = messageHandlingExecutor;
messageHandlingExecutor = Executors.newFixedThreadPool(
msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
+
+ // replace previously registered handlers.
+ registerMessageHandlers(messageHandlingExecutor);
oldMsgHandler.shutdown();
- logConfig("Reconfigured");
}
+ logConfig("Reconfigured");
+ }
+
+ private void registerMessageHandlers(ExecutorService executor) {
+
+ clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local), executor);
+
+ clusterCommunicator.addSubscriber(REMOTE_APPLY_COMPLETED, new ClusterMessageHandler() {
+ @Override
+ public void handle(ClusterMessage message) {
+ FlowRuleBatchEvent event = SERIALIZER.decode(message.payload());
+ log.trace("received completed notification for {}", event);
+ notifyDelegate(event);
+ }
+ }, executor);
+
+ clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ FlowRule rule = SERIALIZER.decode(message.payload());
+ log.trace("received get flow entry request for {}", rule);
+ FlowEntry flowEntry = flowTable.getFlowEntry(rule); //getFlowEntryInternal(rule);
+ try {
+ message.respond(SERIALIZER.encode(flowEntry));
+ } catch (IOException e) {
+ log.error("Failed to respond back", e);
+ }
+ }
+ }, executor);
+
+ clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ DeviceId deviceId = SERIALIZER.decode(message.payload());
+ log.trace("Received get flow entries request for {} from {}", deviceId, message.sender());
+ Set<FlowEntry> flowEntries = flowTable.getFlowEntries(deviceId);
+ try {
+ message.respond(SERIALIZER.encode(flowEntries));
+ } catch (IOException e) {
+ log.error("Failed to respond to peer's getFlowEntries request", e);
+ }
+ }
+ }, executor);
+
+ clusterCommunicator.addSubscriber(REMOVE_FLOW_ENTRY, new ClusterMessageHandler() {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ FlowEntry rule = SERIALIZER.decode(message.payload());
+ log.trace("received get flow entry request for {}", rule);
+ FlowRuleEvent event = removeFlowRuleInternal(rule);
+ try {
+ message.respond(SERIALIZER.encode(event));
+ } catch (IOException e) {
+ log.error("Failed to respond back", e);
+ }
+ }
+ }, executor);
+ }
+
+ private void unregisterMessageHandlers() {
+ clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
+ clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
+ clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
+ clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
+ clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
}
private void logConfig(String prefix) {
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 6628bb2..099880d 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -200,12 +200,12 @@
@Override
public void registerHandler(String type, MessageHandler handler) {
- handlers.putIfAbsent(type, handler);
+ handlers.put(type, handler);
}
@Override
public void registerHandler(String type, MessageHandler handler, ExecutorService executor) {
- handlers.putIfAbsent(type, new MessageHandler() {
+ handlers.put(type, new MessageHandler() {
@Override
public void handle(Message message) throws IOException {
executor.submit(() -> {