Handle cluster event in separate thread in FPM manager to avoid blocking event loop
Change-Id: I04c26836c0f3badd9795597524323bd9c1c705b5
(cherry picked from commit 1c9a0b4efe1f990cbbdb5d08590cd3510ce6e2c5)
diff --git a/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmManager.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmManager.java
index 662fdda..4deaebb 100644
--- a/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmManager.java
+++ b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmManager.java
@@ -91,6 +91,8 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import static java.util.concurrent.Executors.newCachedThreadPool;
@@ -151,6 +153,8 @@
private final InternalClusterListener clusterListener = new InternalClusterListener();
private AsyncDistributedLock asyncLock;
+ private ExecutorService clusterEventExecutor;
+
private ConsistentMap<FpmPeer, Set<FpmConnectionInfo>> peers;
private Map<FpmPeer, Map<IpPrefix, Route>> fpmRoutes = new ConcurrentHashMap<>();
@@ -232,6 +236,8 @@
appId = coreService.registerApplication(APP_NAME, peers::destroy);
asyncLock = storageService.lockBuilder().withName(LOCK_NAME).build();
+
+ clusterEventExecutor = Executors.newSingleThreadExecutor(groupedThreads("fpm-event-main", "%d", log));
clusterService.addListener(clusterListener);
log.info("Started");
@@ -248,6 +254,7 @@
componentConfigService.unregisterProperties(getClass(), false);
clusterService.removeListener(clusterListener);
+ clusterEventExecutor.shutdown();
asyncLock.unlock();
log.info("Stopped");
@@ -774,48 +781,50 @@
private class InternalClusterListener implements ClusterEventListener {
@Override
public void event(ClusterEvent event) {
+ clusterEventExecutor.execute(() -> {
log.info("Receives ClusterEvent {} for {}", event.type(), event.subject().id());
- switch (event.type()) {
- case INSTANCE_READY:
- // When current node is healing from a network partition,
- // seeing INSTANCE_READY means current node has the ability to read from the cluster,
- // but it is possible that current node still can't write to the cluster at this moment.
- // The AsyncDistributedLock is introduced to ensure we attempt to push FPM routes
- // after current node can write.
- // Adding 15 seconds retry for the current node to be able to write.
- asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> {
- if (result != null && result.isPresent()) {
- log.debug("Lock obtained. Push local FPM routes to route store");
- // All FPM routes on current node will be pushed again even when current node is not
- // the one that becomes READY. A better way is to do this only on the minority nodes.
- pushFpmRoutes();
- localPeers.forEach((key, value) -> peers.put(key, value));
- asyncLock.unlock();
- } else {
- log.debug("Fail to obtain lock. Abort.");
- }
- });
- break;
- case INSTANCE_DEACTIVATED:
- case INSTANCE_REMOVED:
- ImmutableMap.copyOf(peers.asJavaMap()).forEach((key, value) -> {
- if (value != null) {
- value.stream()
+ switch (event.type()) {
+ case INSTANCE_READY:
+ // When current node is healing from a network partition,
+ // seeing INSTANCE_READY means current node has the ability to read from the cluster,
+ // but it is possible that current node still can't write to the cluster at this moment.
+ // The AsyncDistributedLock is introduced to ensure we attempt to push FPM routes
+ // after current node can write.
+ // Adding 15 seconds retry for the current node to be able to write.
+ asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> {
+ if (result != null && result.isPresent()) {
+ log.debug("Lock obtained. Push local FPM routes to route store");
+ // All FPM routes on current node will be pushed again even when current node is not
+ // the one that becomes READY. A better way is to do this only on the minority nodes.
+ pushFpmRoutes();
+ localPeers.forEach((key, value) -> peers.put(key, value));
+ asyncLock.unlock();
+ } else {
+ log.debug("Fail to obtain lock. Abort.");
+ }
+ });
+ break;
+ case INSTANCE_DEACTIVATED:
+ case INSTANCE_REMOVED:
+ ImmutableMap.copyOf(peers.asJavaMap()).forEach((key, value) -> {
+ if (value != null) {
+ value.stream()
.filter(i -> i.connectedTo().equals(event.subject().id()))
.findAny()
.ifPresent(value::remove);
- if (value.isEmpty()) {
- peers.remove(key);
+ if (value.isEmpty()) {
+ peers.remove(key);
+ }
}
- }
- });
- break;
- case INSTANCE_ADDED:
- case INSTANCE_ACTIVATED:
- default:
- break;
- }
+ });
+ break;
+ case INSTANCE_ADDED:
+ case INSTANCE_ACTIVATED:
+ default:
+ break;
+ }
+ });
}
}