CORD-2581 Push FPM routes to route store again when an instance rejoins the cluster
Also introduce a new CLI command 'fpm-push-routes'
- Allows the user to manually push local FPM routes to the route store again, just in case
Change-Id: If8fff2e1e56cc4465a4b0979c576c060ef9ca94f
diff --git a/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java
index a160fc0..c604707 100644
--- a/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java
+++ b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmInfoService.java
@@ -36,4 +36,9 @@
* @return true or false
*/
boolean isPdPushEnabled();
+
+ /**
+ * Pushes all local FPM routes to the route store.
+ * <p>
+ * Can be invoked on demand, e.g. through the {@code fpm-push-routes} CLI command,
+ * to push the locally held FPM routes to the route store again.
+ */
+ void pushFpmRoutes();
}
diff --git a/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmManager.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmManager.java
index 0cf9935..56d193e 100644
--- a/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmManager.java
+++ b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmManager.java
@@ -16,6 +16,7 @@
package org.onosproject.routing.fpm;
+import com.google.common.collect.Lists;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -47,6 +48,8 @@
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.CoreService;
@@ -62,6 +65,7 @@
import org.onosproject.routing.fpm.protocol.RtNetlink;
import org.onosproject.routing.fpm.protocol.RtProtocol;
import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncDistributedLock;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
@@ -72,6 +76,7 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Dictionary;
@@ -101,6 +106,7 @@
private static final int FPM_PORT = 2620;
private static final String APP_NAME = "org.onosproject.fpm";
private static final int IDLE_TIMEOUT_SECS = 5;
+ private static final String LOCK_NAME = "fpm-manager-lock";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
@@ -141,6 +147,8 @@
private ServerBootstrap serverBootstrap;
private Channel serverChannel;
private ChannelGroup allChannels = new DefaultChannelGroup();
+ private final InternalClusterListener clusterListener = new InternalClusterListener();
+ private AsyncDistributedLock asyncLock;
private ConsistentMap<FpmPeer, Set<FpmConnectionInfo>> peers;
@@ -219,6 +227,9 @@
appId = coreService.registerApplication(APP_NAME, peers::destroy);
+ clusterService.addListener(clusterListener);
+ asyncLock = storageService.lockBuilder().withName(LOCK_NAME).build();
+
log.info("Started");
}
@@ -231,6 +242,10 @@
stopServer();
fpmRoutes.clear();
componentConfigService.unregisterProperties(getClass(), false);
+
+ clusterService.removeListener(clusterListener);
+ asyncLock.unlock();
+
log.info("Stopped");
}
@@ -463,15 +478,19 @@
break;
}
- routeService.withdraw(withdraws);
- routeService.update(updates);
+ updateRouteStore(updates, withdraws);
+ }
+
+ /**
+ * Applies a batch of route changes to the route service: withdrawals are issued
+ * first, then additions/updates.
+ * <p>
+ * The method is synchronized on this instance, so calls made through it do not
+ * interleave their withdraw/update pairs.
+ *
+ * @param routesToAdd routes to add or update
+ * @param routesToRemove routes to withdraw
+ */
+ private synchronized void updateRouteStore(Collection<Route> routesToAdd, Collection<Route> routesToRemove) {
+ routeService.withdraw(routesToRemove);
+ routeService.update(routesToAdd);
}
private void clearRoutes(FpmPeer peer) {
log.info("Clearing all routes for peer {}", peer);
Map<IpPrefix, Route> routes = fpmRoutes.remove(peer);
if (routes != null) {
- routeService.withdraw(routes.values());
+ // Nothing to add; withdraw through updateRouteStore so this write uses the same synchronized path.
+ updateRouteStore(Lists.newArrayList(), routes.values());
}
}
@@ -730,4 +749,46 @@
}
}
}
+
+    /**
+     * Cluster event listener that pushes the local FPM routes to the route store
+     * again whenever an {@code INSTANCE_READY} event is received.
+     */
+    private class InternalClusterListener implements ClusterEventListener {
+        @Override
+        public void event(ClusterEvent event) {
+            log.debug("Receives ClusterEvent {} for {}", event.type(), event.subject().id());
+            switch (event.type()) {
+            case INSTANCE_READY:
+                // When current node is healing from a network partition,
+                // seeing INSTANCE_READY means current node has the ability to read from the cluster,
+                // but it is possible that current node still can't write to the cluster at this moment.
+                // The AsyncDistributedLock is introduced to ensure we attempt to push FPM routes
+                // after current node can write.
+                // Adding 15 seconds retry for the current node to be able to write.
+                asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> {
+                    if (result != null && result.isPresent()) {
+                        log.debug("Lock obtained. Push local FPM routes to route store");
+                        try {
+                            // All FPM routes on current node will be pushed again even when current
+                            // node is not the one that becomes READY. A better way is to do this only
+                            // on the minority nodes.
+                            pushFpmRoutes();
+                        } finally {
+                            // Release the lock even if the push throws, so it is never left held.
+                            asyncLock.unlock();
+                        }
+                    } else {
+                        // Pass the failure cause (null on a plain timeout) so it is not silently dropped.
+                        log.debug("Fail to obtain lock. Abort.", error);
+                    }
+                });
+                break;
+            case INSTANCE_DEACTIVATED:
+            case INSTANCE_ADDED:
+            case INSTANCE_REMOVED:
+            case INSTANCE_ACTIVATED:
+            default:
+                // No route action is taken for any other cluster event.
+                break;
+            }
+        }
+    }
+
+    /**
+     * Collects every route currently held in the local {@code fpmRoutes} map,
+     * across all peers, and pushes them to the route store as updates.
+     * Nothing is withdrawn by this call.
+     */
+    @Override
+    public void pushFpmRoutes() {
+        Set<Route> routes = fpmRoutes.values().stream()
+                .map(Map::values)
+                .flatMap(Collection::stream)
+                .collect(Collectors.toSet());
+        updateRouteStore(routes, Lists.newArrayList());
+        log.info("{} FPM routes have been updated to route store", routes.size());
+    }
}
diff --git a/apps/routing/fpm/app/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/apps/routing/fpm/app/src/main/resources/OSGI-INF/blueprint/shell-config.xml
index 3af7033..a227f6f 100644
--- a/apps/routing/fpm/app/src/main/resources/OSGI-INF/blueprint/shell-config.xml
+++ b/apps/routing/fpm/app/src/main/resources/OSGI-INF/blueprint/shell-config.xml
@@ -19,5 +19,8 @@
<command>
<action class="org.onosproject.routing.fpm.cli.FpmConnectionsList"/>
</command>
+ <command>
+ <action class="org.onosproject.routing.fpm.cli.FpmPushRoutesCommand"/>
+ </command>
</command-bundle>
</blueprint>
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/FpmPushRoutesCommand.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/FpmPushRoutesCommand.java
new file mode 100644
index 0000000..29f0b63
--- /dev/null
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/cli/FpmPushRoutesCommand.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.routing.fpm.cli;
+
+import org.apache.karaf.shell.commands.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.routing.fpm.FpmInfoService;
+
+
+/**
+ * CLI command that pushes all local FPM routes to the route store
+ * by calling {@link FpmInfoService#pushFpmRoutes()}.
+ */
+@Command(scope = "onos", name = "fpm-push-routes",
+ description = "Pushes all local FPM routes to route store")
+public class FpmPushRoutesCommand extends AbstractShellCommand {
+ @Override
+ protected void execute() {
+ FpmInfoService fpmInfo = get(FpmInfoService.class);
+ fpmInfo.pushFpmRoutes();
+ }
+}