Handle cluster event in separate thread in FPM manager to avoid blocking event loop

Change-Id: I04c26836c0f3badd9795597524323bd9c1c705b5
(cherry picked from commit 1c9a0b4efe1f990cbbdb5d08590cd3510ce6e2c5)
diff --git a/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmManager.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmManager.java
index 662fdda..4deaebb 100644
--- a/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmManager.java
+++ b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmManager.java
@@ -91,6 +91,8 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
 import static java.util.concurrent.Executors.newCachedThreadPool;
@@ -151,6 +153,8 @@
     private final InternalClusterListener clusterListener = new InternalClusterListener();
     private AsyncDistributedLock asyncLock;
 
+    private ExecutorService clusterEventExecutor;
+
     private ConsistentMap<FpmPeer, Set<FpmConnectionInfo>> peers;
 
     private Map<FpmPeer, Map<IpPrefix, Route>> fpmRoutes = new ConcurrentHashMap<>();
@@ -232,6 +236,8 @@
         appId = coreService.registerApplication(APP_NAME, peers::destroy);
 
         asyncLock = storageService.lockBuilder().withName(LOCK_NAME).build();
+
+        clusterEventExecutor = Executors.newSingleThreadExecutor(groupedThreads("fpm-event-main", "%d", log));
         clusterService.addListener(clusterListener);
 
         log.info("Started");
@@ -248,6 +254,7 @@
         componentConfigService.unregisterProperties(getClass(), false);
 
         clusterService.removeListener(clusterListener);
+        clusterEventExecutor.shutdown();
         asyncLock.unlock();
 
         log.info("Stopped");
@@ -774,48 +781,50 @@
     private class InternalClusterListener implements ClusterEventListener {
         @Override
         public void event(ClusterEvent event) {
+            clusterEventExecutor.execute(() -> {
                 log.info("Receives ClusterEvent {} for {}", event.type(), event.subject().id());
-            switch (event.type()) {
-                case INSTANCE_READY:
-                    // When current node is healing from a network partition,
-                    // seeing INSTANCE_READY means current node has the ability to read from the cluster,
-                    // but it is possible that current node still can't write to the cluster at this moment.
-                    // The AsyncDistributedLock is introduced to ensure we attempt to push FPM routes
-                    // after current node can write.
-                    // Adding 15 seconds retry for the current node to be able to write.
-                    asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> {
-                        if (result != null && result.isPresent()) {
-                            log.debug("Lock obtained. Push local FPM routes to route store");
-                            // All FPM routes on current node will be pushed again even when current node is not
-                            // the one that becomes READY. A better way is to do this only on the minority nodes.
-                            pushFpmRoutes();
-                            localPeers.forEach((key, value) -> peers.put(key, value));
-                            asyncLock.unlock();
-                        } else {
-                            log.debug("Fail to obtain lock. Abort.");
-                        }
-                    });
-                    break;
-                case INSTANCE_DEACTIVATED:
-                case INSTANCE_REMOVED:
-                    ImmutableMap.copyOf(peers.asJavaMap()).forEach((key, value) -> {
-                        if (value != null) {
-                            value.stream()
+                switch (event.type()) {
+                    case INSTANCE_READY:
+                        // When current node is healing from a network partition,
+                        // seeing INSTANCE_READY means current node has the ability to read from the cluster,
+                        // but it is possible that current node still can't write to the cluster at this moment.
+                        // The AsyncDistributedLock is introduced to ensure we attempt to push FPM routes
+                        // after current node can write.
+                        // Adding 15 seconds retry for the current node to be able to write.
+                        asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> {
+                            if (result != null && result.isPresent()) {
+                                log.debug("Lock obtained. Push local FPM routes to route store");
+                                // All FPM routes on current node will be pushed again even when current node is not
+                                // the one that becomes READY. A better way is to do this only on the minority nodes.
+                                pushFpmRoutes();
+                                localPeers.forEach((key, value) -> peers.put(key, value));
+                                asyncLock.unlock();
+                            } else {
+                                log.debug("Fail to obtain lock. Abort.");
+                            }
+                        });
+                        break;
+                    case INSTANCE_DEACTIVATED:
+                    case INSTANCE_REMOVED:
+                        ImmutableMap.copyOf(peers.asJavaMap()).forEach((key, value) -> {
+                            if (value != null) {
+                                value.stream()
                                     .filter(i -> i.connectedTo().equals(event.subject().id()))
                                     .findAny()
                                     .ifPresent(value::remove);
 
-                            if (value.isEmpty()) {
-                                peers.remove(key);
+                                if (value.isEmpty()) {
+                                    peers.remove(key);
+                                }
                             }
-                        }
-                    });
-                    break;
-                case INSTANCE_ADDED:
-                case INSTANCE_ACTIVATED:
-                default:
-                    break;
-            }
+                        });
+                        break;
+                    case INSTANCE_ADDED:
+                    case INSTANCE_ACTIVATED:
+                    default:
+                        break;
+                }
+            });
         }
     }