CORD-2581 Push FPM routes to route store again when an instance rejoin the cluster

Also introduce a new CLI command 'fpm-push-routes'
    - Allow user to manually push local FPM routes to route store again just in case

Change-Id: If8fff2e1e56cc4465a4b0979c576c060ef9ca94f
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java
index e92b489..57119de 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java
@@ -29,4 +29,9 @@
      * @return a map of FPM peer with related information
      */
     Map<FpmPeer, FpmPeerInfo> peers();
+
+    /**
+     * Pushes all local FPM routes to route store.
+     */
+    void pushFpmRoutes();
 }
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
index 178b489..29b6f82 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
@@ -16,6 +16,7 @@
 
 package org.onosproject.routing.fpm;
 
+import com.google.common.collect.Lists;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -41,6 +42,8 @@
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterEventListener;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.core.CoreService;
@@ -54,6 +57,7 @@
 import org.onosproject.routing.fpm.protocol.RtNetlink;
 import org.onosproject.routing.fpm.protocol.RtProtocol;
 import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncDistributedLock;
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
@@ -62,6 +66,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Dictionary;
@@ -87,6 +92,7 @@
     private static final int FPM_PORT = 2620;
     private static final String APP_NAME = "org.onosproject.fpm";
     private static final int IDLE_TIMEOUT_SECS = 5;
+    private static final String LOCK_NAME = "fpm-manager-lock";
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected CoreService coreService;
@@ -106,6 +112,8 @@
     private ServerBootstrap serverBootstrap;
     private Channel serverChannel;
     private ChannelGroup allChannels = new DefaultChannelGroup();
+    private final InternalClusterListener clusterListener = new InternalClusterListener();
+    private AsyncDistributedLock asyncLock;
 
     private ConsistentMap<FpmPeer, Set<FpmConnectionInfo>> peers;
 
@@ -138,6 +146,9 @@
 
         coreService.registerApplication(APP_NAME, peers::destroy);
 
+        clusterService.addListener(clusterListener);
+        asyncLock = storageService.lockBuilder().withName(LOCK_NAME).build();
+
         log.info("Started");
     }
 
@@ -150,6 +161,10 @@
         stopServer();
         fpmRoutes.clear();
         componentConfigService.unregisterProperties(getClass(), false);
+
+        clusterService.removeListener(clusterListener);
+        asyncLock.unlock();
+
         log.info("Stopped");
     }
 
@@ -292,8 +307,12 @@
             break;
         }
 
-        routeService.withdraw(withdraws);
-        routeService.update(updates);
+        updateRouteStore(updates, withdraws);
+    }
+
+    private synchronized void updateRouteStore(Collection<Route> routesToAdd, Collection<Route> routesToRemove) {
+        routeService.withdraw(routesToRemove);
+        routeService.update(routesToAdd);
     }
 
 
@@ -301,7 +320,7 @@
         log.info("Clearing all routes for peer {}", peer);
         Map<IpPrefix, Route> routes = fpmRoutes.remove(peer);
         if (routes != null) {
-            routeService.withdraw(routes.values());
+            updateRouteStore(Lists.newArrayList(), routes.values());
         }
     }
 
@@ -371,4 +390,45 @@
         }
     }
 
+    private class InternalClusterListener implements ClusterEventListener {
+        @Override
+        public void event(ClusterEvent event) {
+            log.debug("Receives ClusterEvent {} for {}", event.type(), event.subject().id());
+            switch (event.type()) {
+                case INSTANCE_READY:
+                    // When current node is healing from a network partition,
+                    // seeing INSTANCE_READY means current node has the ability to read from the cluster,
+                    // but it is possible that current node still can't write to the cluster at this moment.
+                    // The AsyncDistributedLock is introduced to ensure we attempt to push FPM routes
+                    // after current node can write.
+                    // Adding 15 seconds retry for the current node to be able to write.
+                    asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> {
+                        if (result != null && result.isPresent()) {
+                            log.debug("Lock obtained. Push local FPM routes to route store");
+                            // All FPM routes on current node will be pushed again even when current node is not
+                            // the one that becomes READY. A better way is to do this only on the minority nodes.
+                            pushFpmRoutes();
+                            asyncLock.unlock();
+                        } else {
+                            log.debug("Fail to obtain lock. Abort.");
+                        }
+                    });
+                    break;
+                case INSTANCE_DEACTIVATED:
+                case INSTANCE_ADDED:
+                case INSTANCE_REMOVED:
+                case INSTANCE_ACTIVATED:
+                default:
+                    break;
+            }
+        }
+    }
+
+    public void pushFpmRoutes() {
+        Set<Route> routes = fpmRoutes.values().stream()
+                .map(Map::entrySet).flatMap(Set::stream).map(Map.Entry::getValue)
+                .collect(Collectors.toSet());
+        updateRouteStore(routes, Lists.newArrayList());
+        log.info("{} FPM routes have been updated to route store", routes.size());
+    }
 }
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/FpmPushRoutesCommand.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/FpmPushRoutesCommand.java
new file mode 100644
index 0000000..29f0b63
--- /dev/null
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/FpmPushRoutesCommand.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.routing.fpm.cli;
+
+import org.apache.karaf.shell.commands.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.routing.fpm.FpmInfoService;
+
+
+/**
+ * Displays the current FPM connections.
+ */
+@Command(scope = "onos", name = "fpm-push-routes",
+        description = "Pushes all local FPM routes to route store")
+public class FpmPushRoutesCommand extends AbstractShellCommand {
+    @Override
+    protected void execute() {
+        FpmInfoService fpmInfo = get(FpmInfoService.class);
+        fpmInfo.pushFpmRoutes();
+    }
+}
diff --git a/apps/routing/fpm/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/apps/routing/fpm/src/main/resources/OSGI-INF/blueprint/shell-config.xml
index 3af7033..a227f6f 100644
--- a/apps/routing/fpm/src/main/resources/OSGI-INF/blueprint/shell-config.xml
+++ b/apps/routing/fpm/src/main/resources/OSGI-INF/blueprint/shell-config.xml
@@ -19,5 +19,8 @@
     <command>
       <action class="org.onosproject.routing.fpm.cli.FpmConnectionsList"/>
     </command>
+    <command>
+      <action class="org.onosproject.routing.fpm.cli.FpmPushRoutesCommand"/>
+    </command>
   </command-bundle>
 </blueprint>