Never process incoming messages on the netty event loop thread pool.
Currently in a lot of places we are deserializing incoming messages on this threadpool and that could be significantly limiting throughput.
Change-Id: I83eb7e91004cea4addb28bc28f27e50de10028fe
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
index cdab90d..c33d2ea 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
@@ -16,10 +16,12 @@
package org.onosproject.store.cluster.messaging;
import com.google.common.util.concurrent.ListenableFuture;
+
import org.onosproject.cluster.NodeId;
import java.io.IOException;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
// TODO: remove IOExceptions?
/**
@@ -77,9 +79,19 @@
* @param subject message subject
* @param subscriber message subscriber
*/
+ @Deprecated
void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber);
/**
+ * Adds a new subscriber for the specified message subject.
+ *
+ * @param subject message subject
+ * @param subscriber message subscriber
+ * @param executor executor to use for running handler.
+ */
+ void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber, ExecutorService executor);
+
+ /**
* Removes a subscriber for the specified message subject.
*
* @param subject message subject
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
index 0fd21d0..1c3c098 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
@@ -18,6 +18,7 @@
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -55,6 +56,7 @@
import java.io.InputStream;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -90,6 +92,8 @@
private final ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store"));
+ private ExecutorService messageHandlingExecutor;
+
private EventuallyConsistentMap<ApplicationId, Application> apps;
private EventuallyConsistentMap<Application, InternalState> states;
private EventuallyConsistentMap<Application, Set<Permission>> permissions;
@@ -109,7 +113,10 @@
.register(KryoNamespaces.API)
.register(InternalState.class);
- clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer());
+ messageHandlingExecutor = Executors.newSingleThreadExecutor(
+ groupedThreads("onos/store/app", "message-handler"));
+
+ clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer(), messageHandlingExecutor);
apps = new EventuallyConsistentMapImpl<>("apps", clusterService,
clusterCommunicator,
@@ -145,6 +152,8 @@
@Deactivate
public void deactivate() {
+ clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
+ messageHandlingExecutor.shutdown();
apps.destroy();
states.destroy();
permissions.destroy();
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index 7298f2c..a276367 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -18,6 +18,7 @@
import com.google.common.collect.Maps;
import com.hazelcast.config.TopicConfig;
import com.hazelcast.core.IAtomicLong;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -113,6 +114,8 @@
private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
new MessageSubject("hz-leadership-events");
+ private ExecutorService messageHandlingExecutor;
+
@Activate
protected void activate() {
localNodeId = clusterService.getLocalNode().id();
@@ -124,7 +127,13 @@
topicConfig.setName(TOPIC_HZ_ID);
storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig);
- clusterCommunicator.addSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT, new InternalLeadershipEventListener());
+ messageHandlingExecutor = Executors.newSingleThreadExecutor(
+ groupedThreads("onos/store/leadership", "message-handler"));
+
+ clusterCommunicator.addSubscriber(
+ LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+ new InternalLeadershipEventListener(),
+ messageHandlingExecutor);
log.info("Hazelcast Leadership Service started");
}
@@ -132,6 +141,7 @@
@Deactivate
protected void deactivate() {
eventDispatcher.removeSink(LeadershipEvent.class);
+ messageHandlingExecutor.shutdown();
clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
for (Topic topic : topics.values()) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/LeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/LeadershipManager.java
index 6919ea1..2e414d4 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/LeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/LeadershipManager.java
@@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -43,6 +44,7 @@
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -92,6 +94,8 @@
private final LeadershipEventListener peerAdvertiser = new PeerAdvertiser();
private final LeadershipEventListener leaderBoardUpdater = new LeaderBoardUpdater();
+ private ExecutorService messageHandlingExecutor;
+
public static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
@@ -109,9 +113,14 @@
addListener(peerAdvertiser);
addListener(leaderBoardUpdater);
+ messageHandlingExecutor = Executors.newSingleThreadExecutor(
+ groupedThreads("onos/store/leadership",
+ "peer-advertisement-handler"));
+
clusterCommunicator.addSubscriber(
LEADERSHIP_UPDATES,
- new PeerAdvertisementHandler());
+ new PeerAdvertisementHandler(),
+ messageHandlingExecutor);
log.info("Started.");
}
@@ -123,6 +132,7 @@
clusterCommunicator.removeSubscriber(LEADERSHIP_UPDATES);
+ messageHandlingExecutor.shutdown();
threadPool.shutdown();
log.info("Stopped.");
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 5c20718..2df3e7a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -16,6 +16,7 @@
package org.onosproject.store.cluster.messaging.impl;
import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -44,6 +45,7 @@
import java.io.IOException;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import static com.google.common.base.Preconditions.checkArgument;
@@ -183,6 +185,13 @@
}
@Override
+ public void addSubscriber(MessageSubject subject,
+ ClusterMessageHandler subscriber,
+ ExecutorService executor) {
+ messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber), executor);
+ }
+
+ @Override
public void removeSubscriber(MessageSubject subject) {
messagingService.unregisterHandler(subject.value());
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index 602fd9f..f16bd06 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -176,29 +176,36 @@
@Activate
public void activate() {
- clusterCommunicator.addSubscriber(
- GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
- clusterCommunicator.addSubscriber(
- GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
- clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ, new InternalRemoveRequestListener());
- clusterCommunicator.addSubscriber(
- GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener());
- clusterCommunicator.addSubscriber(
- GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
- clusterCommunicator.addSubscriber(
- GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
- clusterCommunicator.addSubscriber(
- GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
- clusterCommunicator.addSubscriber(
- GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener());
- clusterCommunicator.addSubscriber(
- GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener());
executor = Executors.newCachedThreadPool(groupedThreads("onos/device", "fg-%d"));
backgroundExecutor =
newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d")));
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener(), executor);
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
+ new InternalDeviceOfflineEventListener(),
+ executor);
+ clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ,
+ new InternalRemoveRequestListener(),
+ executor);
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener(), executor);
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener(), executor);
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener(), executor);
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE,
+ new InternalDeviceAdvertisementListener(),
+ backgroundExecutor);
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener(), executor);
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener(), executor);
+
// start anti-entropy thread
backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
initialDelaySec, periodSec, TimeUnit.SECONDS);
@@ -1325,17 +1332,11 @@
DeviceId deviceId = event.deviceId();
Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
- } catch (Exception e) {
- log.warn("Exception thrown handling device update", e);
- }
- }
- });
+ try {
+ notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling device update", e);
+ }
}
}
@@ -1350,17 +1351,11 @@
DeviceId deviceId = event.deviceId();
Timestamp timestamp = event.timestamp();
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
- } catch (Exception e) {
- log.warn("Exception thrown handling device offline", e);
- }
- }
- });
+ try {
+ notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling device offline", e);
+ }
}
}
@@ -1371,17 +1366,11 @@
log.debug("Received device remove request from peer: {}", message.sender());
DeviceId did = SERIALIZER.decode(message.payload());
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- removeDevice(did);
- } catch (Exception e) {
- log.warn("Exception thrown handling device remove", e);
- }
- }
- });
+ try {
+ removeDevice(did);
+ } catch (Exception e) {
+ log.warn("Exception thrown handling device remove", e);
+ }
}
}
@@ -1396,17 +1385,11 @@
DeviceId deviceId = event.deviceId();
Timestamp timestamp = event.timestamp();
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
- } catch (Exception e) {
- log.warn("Exception thrown handling device removed", e);
- }
- }
- });
+ try {
+ notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling device removed", e);
+ }
}
}
@@ -1428,17 +1411,11 @@
return;
}
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
- } catch (Exception e) {
- log.warn("Exception thrown handling port update", e);
- }
- }
- });
+ try {
+ notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling port update", e);
+ }
}
}
@@ -1460,17 +1437,11 @@
return;
}
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
- } catch (Exception e) {
- log.warn("Exception thrown handling port update", e);
- }
- }
- });
+ try {
+ notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling port update", e);
+ }
}
}
@@ -1481,17 +1452,11 @@
public void handle(ClusterMessage message) {
log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
- backgroundExecutor.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- handleAdvertisement(advertisement);
- } catch (Exception e) {
- log.warn("Exception thrown handling Device advertisements.", e);
- }
- }
- });
+ try {
+ handleAdvertisement(advertisement);
+ } catch (Exception e) {
+ log.warn("Exception thrown handling Device advertisements.", e);
+ }
}
}
@@ -1507,13 +1472,11 @@
DeviceId deviceId = event.deviceId();
DeviceDescription deviceDescription = event.deviceDescription();
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- createOrUpdateDevice(providerId, deviceId, deviceDescription);
- }
- });
+ try {
+ createOrUpdateDevice(providerId, deviceId, deviceDescription);
+ } catch (Exception e) {
+ log.warn("Exception thrown handling device injected event.", e);
+ }
}
}
@@ -1529,13 +1492,11 @@
DeviceId deviceId = event.deviceId();
List<PortDescription> portDescriptions = event.portDescriptions();
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- updatePorts(providerId, deviceId, portDescriptions);
- }
- });
+ try {
+ updatePorts(providerId, deviceId, portDescriptions);
+ } catch (Exception e) {
+ log.warn("Exception thrown handling port injected event.", e);
+ }
}
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 19e77bb..fe8f1a8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -162,13 +162,13 @@
updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
clusterCommunicator.addSubscriber(updateMessageSubject,
- new InternalPutEventListener());
+ new InternalPutEventListener(), executor);
removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
clusterCommunicator.addSubscriber(removeMessageSubject,
- new InternalRemoveEventListener());
+ new InternalRemoveEventListener(), executor);
antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
- new InternalAntiEntropyListener());
+ new InternalAntiEntropyListener(), backgroundExecutor);
}
private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
@@ -728,13 +728,11 @@
log.trace("Received anti-entropy advertisement from peer: {}",
message.sender());
AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
- backgroundExecutor.submit(() -> {
- try {
- handleAntiEntropyAdvertisement(advertisement);
- } catch (Exception e) {
- log.warn("Exception thrown handling advertisements", e);
- }
- });
+ try {
+ handleAntiEntropyAdvertisement(advertisement);
+ } catch (Exception e) {
+ log.warn("Exception thrown handling advertisements", e);
+ }
}
}
@@ -745,25 +743,23 @@
log.debug("Received put event from peer: {}", message.sender());
InternalPutEvent<K, V> event = serializer.decode(message.payload());
- executor.submit(() -> {
- try {
- for (PutEntry<K, V> entry : event.entries()) {
- K key = entry.key();
- V value = entry.value();
- Timestamp timestamp = entry.timestamp();
+ try {
+ for (PutEntry<K, V> entry : event.entries()) {
+ K key = entry.key();
+ V value = entry.value();
+ Timestamp timestamp = entry.timestamp();
- if (putInternal(key, value, timestamp)) {
- EventuallyConsistentMapEvent<K, V> externalEvent =
- new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.PUT, key,
- value);
- notifyListeners(externalEvent);
- }
+ if (putInternal(key, value, timestamp)) {
+ EventuallyConsistentMapEvent<K, V> externalEvent =
+ new EventuallyConsistentMapEvent<>(
+ EventuallyConsistentMapEvent.Type.PUT, key,
+ value);
+ notifyListeners(externalEvent);
}
- } catch (Exception e) {
- log.warn("Exception thrown handling put", e);
}
- });
+ } catch (Exception e) {
+ log.warn("Exception thrown handling put", e);
+ }
}
}
@@ -773,25 +769,22 @@
public void handle(ClusterMessage message) {
log.debug("Received remove event from peer: {}", message.sender());
InternalRemoveEvent<K> event = serializer.decode(message.payload());
+ try {
+ for (RemoveEntry<K> entry : event.entries()) {
+ K key = entry.key();
+ Timestamp timestamp = entry.timestamp();
- executor.submit(() -> {
- try {
- for (RemoveEntry<K> entry : event.entries()) {
- K key = entry.key();
- Timestamp timestamp = entry.timestamp();
-
- if (removeInternal(key, timestamp)) {
- EventuallyConsistentMapEvent<K, V> externalEvent
- = new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.REMOVE,
- key, null);
- notifyListeners(externalEvent);
- }
+ if (removeInternal(key, timestamp)) {
+ EventuallyConsistentMapEvent<K, V> externalEvent
+ = new EventuallyConsistentMapEvent<>(
+ EventuallyConsistentMapEvent.Type.REMOVE,
+ key, null);
+ notifyListeners(externalEvent);
}
- } catch (Exception e) {
- log.warn("Exception thrown handling remove", e);
}
- });
+ } catch (Exception e) {
+ log.warn("Exception thrown handling remove", e);
+ }
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 86bc173..e28a753 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -107,6 +107,9 @@
private final Logger log = getLogger(getClass());
+ // TODO: Make configurable.
+ private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
+
private InternalFlowTable flowTable = new InternalFlowTable();
/*private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
@@ -132,6 +135,7 @@
// Cache of SMaps used for backup data. each SMap contain device flow table
private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
+ private ExecutorService messageHandlingExecutor;
private final ExecutorService backupExecutors =
Executors.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
@@ -172,7 +176,11 @@
final NodeId local = clusterService.getLocalNode().id();
- clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local));
+ messageHandlingExecutor = Executors.newFixedThreadPool(
+ MESSAGE_HANDLER_THREAD_POOL_SIZE,
+ groupedThreads("onos/flow", "message-handlers"));
+
+ clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local), messageHandlingExecutor);
clusterCommunicator.addSubscriber(REMOTE_APPLY_COMPLETED, new ClusterMessageHandler() {
@Override
@@ -181,7 +189,7 @@
log.trace("received completed notification for {}", event);
notifyDelegate(event);
}
- });
+ }, messageHandlingExecutor);
clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
@@ -196,7 +204,7 @@
log.error("Failed to respond back", e);
}
}
- });
+ }, messageHandlingExecutor);
clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
@@ -211,7 +219,7 @@
log.error("Failed to respond to peer's getFlowEntries request", e);
}
}
- });
+ }, messageHandlingExecutor);
clusterCommunicator.addSubscriber(REMOVE_FLOW_ENTRY, new ClusterMessageHandler() {
@@ -226,7 +234,7 @@
log.error("Failed to respond back", e);
}
}
- });
+ }, messageHandlingExecutor);
replicaInfoEventListener = new InternalReplicaInfoEventListener();
@@ -242,6 +250,7 @@
clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
+ messageHandlingExecutor.shutdown();
replicaInfoManager.removeListener(replicaInfoEventListener);
log.info("Stopped");
}
@@ -421,7 +430,7 @@
switch (op.operator()) {
case ADD:
entry = new DefaultFlowEntry(op.target());
- // always add requested FlowRule

+ // always add requested FlowRule
// Note: 2 equal FlowEntry may have different treatment
flowTable.remove(entry.deviceId(), entry);
flowTable.add(entry);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
index 6effe1b..708faf6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
@@ -78,6 +78,9 @@
private final Logger log = getLogger(getClass());
+ // TODO: Make configurable.
+ private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ReplicaInfoService replicaInfoManager;
@@ -102,6 +105,8 @@
private final ExecutorService futureListeners = Executors
.newCachedThreadPool(groupedThreads("onos/flow", "store-peer-responders"));
+ private ExecutorService messageHandlingExecutor;
+
protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
@@ -120,6 +125,11 @@
@Activate
public void activate() {
+
+ messageHandlingExecutor = Executors.newFixedThreadPool(
+ MESSAGE_HANDLER_THREAD_POOL_SIZE,
+ groupedThreads("onos/flow", "message-handlers"));
+
clusterCommunicator.addSubscriber(APPLY_EXTEND_FLOWS,
new ClusterMessageHandler() {
@@ -141,7 +151,7 @@
}
}, futureListeners);
}
- });
+ }, messageHandlingExecutor);
replicaInfoManager.addListener(replicaInfoEventListener);
@@ -151,6 +161,7 @@
@Deactivate
public void deactivate() {
clusterCommunicator.removeSubscriber(APPLY_EXTEND_FLOWS);
+ messageHandlingExecutor.shutdown();
replicaInfoManager.removeListener(replicaInfoEventListener);
log.info("Stopped");
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java
index f472d27..c65dc79 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java
@@ -154,21 +154,22 @@
@Activate
public void activate() {
- clusterCommunicator.addSubscriber(
- HOST_UPDATED_MSG,
- new InternalHostEventListener());
- clusterCommunicator.addSubscriber(
- HOST_REMOVED_MSG,
- new InternalHostRemovedEventListener());
- clusterCommunicator.addSubscriber(
- HOST_ANTI_ENTROPY_ADVERTISEMENT,
- new InternalHostAntiEntropyAdvertisementListener());
executor = newCachedThreadPool(groupedThreads("onos/host", "fg-%d"));
backgroundExecutor =
newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/host", "bg-%d")));
+ clusterCommunicator.addSubscriber(
+ HOST_UPDATED_MSG,
+ new InternalHostEventListener(), executor);
+ clusterCommunicator.addSubscriber(
+ HOST_REMOVED_MSG,
+ new InternalHostRemovedEventListener(), executor);
+ clusterCommunicator.addSubscriber(
+ HOST_ANTI_ENTROPY_ADVERTISEMENT,
+ new InternalHostAntiEntropyAdvertisementListener(), backgroundExecutor);
+
// start anti-entropy thread
backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
initialDelaySec, periodSec, TimeUnit.SECONDS);
@@ -512,20 +513,14 @@
HostDescription hostDescription = event.hostDescription();
Timestamp timestamp = event.timestamp();
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- notifyDelegateIfNotNull(createOrUpdateHostInternal(providerId,
- hostId,
- hostDescription,
- timestamp));
- } catch (Exception e) {
- log.warn("Exception thrown handling host removed", e);
- }
- }
- });
+ try {
+ notifyDelegateIfNotNull(createOrUpdateHostInternal(providerId,
+ hostId,
+ hostDescription,
+ timestamp));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling host removed", e);
+ }
}
}
@@ -540,17 +535,11 @@
HostId hostId = event.hostId();
Timestamp timestamp = event.timestamp();
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
- } catch (Exception e) {
- log.warn("Exception thrown handling host removed", e);
- }
- }
- });
+ try {
+ notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling host removed", e);
+ }
}
}
@@ -720,17 +709,11 @@
public void handle(ClusterMessage message) {
log.trace("Received Host Anti-Entropy advertisement from peer: {}", message.sender());
HostAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
- backgroundExecutor.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- handleAntiEntropyAdvertisement(advertisement);
- } catch (Exception e) {
- log.warn("Exception thrown handling Host advertisements", e);
- }
- }
- });
+ try {
+ handleAntiEntropyAdvertisement(advertisement);
+ } catch (Exception e) {
+ log.warn("Exception thrown handling Host advertisements", e);
+ }
}
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
index 60d54e3..54c35ec 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
@@ -158,24 +158,24 @@
@Activate
public void activate() {
- clusterCommunicator.addSubscriber(
- GossipLinkStoreMessageSubjects.LINK_UPDATE,
- new InternalLinkEventListener());
- clusterCommunicator.addSubscriber(
- GossipLinkStoreMessageSubjects.LINK_REMOVED,
- new InternalLinkRemovedEventListener());
- clusterCommunicator.addSubscriber(
- GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
- new InternalLinkAntiEntropyAdvertisementListener());
- clusterCommunicator.addSubscriber(
- GossipLinkStoreMessageSubjects.LINK_INJECTED,
- new LinkInjectedEventListener());
-
executor = Executors.newCachedThreadPool(groupedThreads("onos/link", "fg-%d"));
backgroundExecutors =
newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/link", "bg-%d")));
+ clusterCommunicator.addSubscriber(
+ GossipLinkStoreMessageSubjects.LINK_UPDATE,
+ new InternalLinkEventListener(), executor);
+ clusterCommunicator.addSubscriber(
+ GossipLinkStoreMessageSubjects.LINK_REMOVED,
+ new InternalLinkRemovedEventListener(), executor);
+ clusterCommunicator.addSubscriber(
+ GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
+ new InternalLinkAntiEntropyAdvertisementListener(), backgroundExecutors);
+ clusterCommunicator.addSubscriber(
+ GossipLinkStoreMessageSubjects.LINK_INJECTED,
+ new LinkInjectedEventListener(), executor);
+
long initialDelaySec = 5;
long periodSec = 5;
// start anti-entropy thread
@@ -822,17 +822,11 @@
ProviderId providerId = event.providerId();
Timestamped<LinkDescription> linkDescription = event.linkDescription();
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription));
- } catch (Exception e) {
- log.warn("Exception thrown handling link event", e);
- }
- }
- });
+ try {
+ notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling link event", e);
+ }
}
}
@@ -847,17 +841,11 @@
LinkKey linkKey = event.linkKey();
Timestamp timestamp = event.timestamp();
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
- } catch (Exception e) {
- log.warn("Exception thrown handling link removed", e);
- }
- }
- });
+ try {
+ notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling link removed", e);
+ }
}
}
@@ -868,18 +856,12 @@
public void handle(ClusterMessage message) {
log.trace("Received Link Anti-Entropy advertisement from peer: {}", message.sender());
LinkAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
- backgroundExecutors.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- handleAntiEntropyAdvertisement(advertisement);
- } catch (Exception e) {
- log.warn("Exception thrown while handling Link advertisements", e);
- throw e;
- }
- }
- });
+ try {
+ handleAntiEntropyAdvertisement(advertisement);
+ } catch (Exception e) {
+ log.warn("Exception thrown while handling Link advertisements", e);
+ throw e;
+ }
}
}
@@ -894,13 +876,11 @@
ProviderId providerId = linkInjectedEvent.providerId();
LinkDescription linkDescription = linkInjectedEvent.linkDescription();
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- createOrUpdateLink(providerId, linkDescription);
- }
- });
+ try {
+ createOrUpdateLink(providerId, linkDescription);
+ } catch (Exception e) {
+ log.warn("Exception thrown while handling link injected event", e);
+ }
}
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index 9fd4c7b..35e28c7 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -15,9 +15,12 @@
*/
package org.onosproject.store.packet.impl;
+import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -55,6 +58,9 @@
private final Logger log = getLogger(getClass());
+ // TODO: make this configurable.
+ private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private MastershipService mastershipService;
@@ -77,16 +83,24 @@
}
};
+ private ExecutorService messageHandlingExecutor;
+
@Activate
public void activate() {
- log.info("Started");
+ messageHandlingExecutor = Executors.newFixedThreadPool(
+ MESSAGE_HANDLER_THREAD_POOL_SIZE,
+ groupedThreads("onos/flow", "message-handlers"));
communicationService.addSubscriber(
- PACKET_OUT_SUBJECT, new InternalClusterMessageHandler());
+ PACKET_OUT_SUBJECT, new InternalClusterMessageHandler(), messageHandlingExecutor);
+
+ log.info("Started");
}
@Deactivate
public void deactivate() {
+ communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
+ messageHandlingExecutor.shutdown();
log.info("Stopped");
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
index e224cd7..9c12528 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
@@ -16,6 +16,7 @@
package org.onosproject.store.statistic.impl;
import com.google.common.collect.Sets;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -48,11 +49,14 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
import static org.slf4j.LoggerFactory.getLogger;
@@ -68,6 +72,9 @@
private final Logger log = getLogger(getClass());
+ // TODO: Make configurable.
+ private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ReplicaInfoService replicaInfoManager;
@@ -97,10 +104,17 @@
}
};;
+ private ExecutorService messageHandlingExecutor;
+
private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
@Activate
public void activate() {
+
+ messageHandlingExecutor = Executors.newFixedThreadPool(
+ MESSAGE_HANDLER_THREAD_POOL_SIZE,
+ groupedThreads("onos/store/statistic", "message-handlers"));
+
clusterCommunicator.addSubscriber(GET_CURRENT, new ClusterMessageHandler() {
@Override
@@ -112,7 +126,7 @@
log.error("Failed to respond back", e);
}
}
- });
+ }, messageHandlingExecutor);
clusterCommunicator.addSubscriber(GET_PREVIOUS, new ClusterMessageHandler() {
@@ -125,12 +139,15 @@
log.error("Failed to respond back", e);
}
}
- });
+ }, messageHandlingExecutor);
log.info("Started");
}
@Deactivate
public void deactivate() {
+ clusterCommunicator.removeSubscriber(GET_PREVIOUS);
+ clusterCommunicator.removeSubscriber(GET_CURRENT);
+ messageHandlingExecutor.shutdown();
log.info("Stopped");
}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
index 3711f7f..7054bd3 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
@@ -17,6 +17,7 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
+
import org.easymock.Capture;
import org.junit.After;
import org.junit.AfterClass;
@@ -62,6 +63,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static java.util.Arrays.asList;
@@ -152,7 +154,7 @@
clusterCommunicator = createNiceMock(ClusterCommunicationService.class);
clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
- anyObject(ClusterMessageHandler.class));
+ anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
expectLastCall().anyTimes();
replay(clusterCommunicator);
ClusterService clusterService = new TestClusterService();
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index fc85795..e6670de 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -46,6 +46,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -129,7 +130,7 @@
// allows us to get a reference to the map's internal cluster message
// handlers so we can induce events coming in from a peer.
clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
- anyObject(ClusterMessageHandler.class));
+ anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(3);
replay(clusterCommunicator);
@@ -731,6 +732,21 @@
}
@Override
+ public void addSubscriber(MessageSubject subject,
+ ClusterMessageHandler subscriber,
+ ExecutorService executor) {
+ if (subject.equals(PUT_MESSAGE_SUBJECT)) {
+ putHandler = subscriber;
+ } else if (subject.equals(REMOVE_MESSAGE_SUBJECT)) {
+ removeHandler = subscriber;
+ } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
+ antiEntropyHandler = subscriber;
+ } else {
+ throw new RuntimeException("Unexpected message subject " + subject.toString());
+ }
+ }
+
+ @Override
public void removeSubscriber(MessageSubject subject) {}
}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java
index eeca2a8b..03e82bc 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java
@@ -16,6 +16,7 @@
package org.onosproject.store.link.impl;
import com.google.common.collect.Iterables;
+
import org.easymock.Capture;
import org.junit.After;
import org.junit.AfterClass;
@@ -56,6 +57,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static org.easymock.EasyMock.*;
@@ -140,7 +142,8 @@
// TODO mock clusterCommunicator
clusterCommunicator = createNiceMock(ClusterCommunicationService.class);
clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
- anyObject(ClusterMessageHandler.class));
+ anyObject(ClusterMessageHandler.class),
+ anyObject(ExecutorService.class));
expectLastCall().anyTimes();
replay(clusterCommunicator);
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
index efc0ae1..12e1d87 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
@@ -16,6 +16,7 @@
package org.onlab.netty;
import java.io.IOException;
+import java.util.concurrent.ExecutorService;
import com.google.common.util.concurrent.ListenableFuture;
@@ -47,7 +48,16 @@
* Registers a new message handler for message type.
* @param type message type.
* @param handler message handler
+ * @param executor executor to use for running message handler logic.
*/
+ public void registerHandler(String type, MessageHandler handler, ExecutorService executor);
+
+ /**
+ * Registers a new message handler for message type.
+ * @param type message type.
+ * @param handler message handler
+ */
+ @Deprecated
public void registerHandler(String type, MessageHandler handler);
/**
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 69806b1..b84e193 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -41,6 +41,7 @@
import java.net.UnknownHostException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
@@ -213,6 +214,22 @@
}
@Override
+ public void registerHandler(String type, MessageHandler handler, ExecutorService executor) {
+ handlers.putIfAbsent(hashToLong(type), new MessageHandler() {
+ @Override
+ public void handle(Message message) throws IOException {
+ executor.submit(() -> {
+ try {
+ handler.handle(message);
+ } catch (Exception e) {
+ log.warn("Failed to process message of type {}", type, e);
+ }
+ });
+ }
+ });
+ }
+
+ @Override
public void unregisterHandler(String type) {
handlers.remove(type);
}