FPM routes removal when instance goes down or is removed
Change-Id: Ifda1bbc12e2474a13d3f47ded2e1befa1c8b1994
diff --git a/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmManager.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmManager.java
index 455e546..abb8ddd 100644
--- a/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmManager.java
+++ b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmManager.java
@@ -16,6 +16,7 @@
package org.onosproject.routing.fpm;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -38,13 +39,10 @@
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.timeout.IdleStateHandler;
import org.jboss.netty.util.HashedWheelTimer;
-import org.onlab.packet.IpAddress;
import org.onlab.packet.Ip4Address;
import org.onlab.packet.Ip6Address;
+import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
-import org.onosproject.net.intf.Interface;
-import org.onosproject.net.host.InterfaceIpAddress;
-import org.onosproject.net.intf.InterfaceService;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
@@ -52,10 +50,16 @@
import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
-import org.onosproject.core.CoreService;
import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.net.host.InterfaceIpAddress;
+import org.onosproject.net.intf.Interface;
+import org.onosproject.net.intf.InterfaceService;
import org.onosproject.routeservice.Route;
import org.onosproject.routeservice.RouteAdminService;
+import org.onosproject.routing.fpm.api.FpmPrefixStore;
+import org.onosproject.routing.fpm.api.FpmPrefixStoreEvent;
+import org.onosproject.routing.fpm.api.FpmRecord;
import org.onosproject.routing.fpm.protocol.FpmHeader;
import org.onosproject.routing.fpm.protocol.Netlink;
import org.onosproject.routing.fpm.protocol.NetlinkMessageType;
@@ -64,12 +68,12 @@
import org.onosproject.routing.fpm.protocol.RouteAttributeGateway;
import org.onosproject.routing.fpm.protocol.RtNetlink;
import org.onosproject.routing.fpm.protocol.RtProtocol;
+import org.onosproject.store.StoreDelegate;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncDistributedLock;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
-import org.onosproject.store.StoreDelegate;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,7 +85,6 @@
import java.util.Dictionary;
import java.util.HashSet;
import java.util.LinkedList;
-
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -90,9 +93,6 @@
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.onlab.util.Tools.groupedThreads;
-import org.onosproject.routing.fpm.api.FpmPrefixStoreEvent;
-import org.onosproject.routing.fpm.api.FpmPrefixStore;
-import org.onosproject.routing.fpm.api.FpmRecord;
/**
* Forwarding Plane Manager (FPM) route source.
@@ -153,6 +153,9 @@
private Map<FpmPeer, Map<IpPrefix, Route>> fpmRoutes = new ConcurrentHashMap<>();
+ //Local cache for peers to be used in case of cluster partition.
+ private Map<FpmPeer, Set<FpmConnectionInfo>> localPeers = new ConcurrentHashMap<>();
+
@Property(name = "clearRoutes", boolValue = true,
label = "Whether to clear routes when the FPM connection goes down")
private boolean clearRoutes = true;
@@ -648,6 +651,7 @@
}
infos.add(new FpmConnectionInfo(localNode, peer, System.currentTimeMillis()));
+ localPeers.put(peer, infos);
return infos;
});
@@ -671,7 +675,10 @@
infos.stream()
.filter(i -> i.connectedTo().equals(clusterService.getLocalNode().id()))
.findAny()
- .ifPresent(i -> infos.remove(i));
+ .ifPresent(i -> {
+ infos.remove(i);
+ localPeers.get(peer).remove(i);
+ });
if (infos.isEmpty()) {
return null;
@@ -729,7 +736,7 @@
private class InternalClusterListener implements ClusterEventListener {
@Override
public void event(ClusterEvent event) {
- log.debug("Receives ClusterEvent {} for {}", event.type(), event.subject().id());
+ log.info("Receives ClusterEvent {} for {}", event.type(), event.subject().id());
switch (event.type()) {
case INSTANCE_READY:
// When current node is healing from a network partition,
@@ -744,6 +751,7 @@
// All FPM routes on current node will be pushed again even when current node is not
// the one that becomes READY. A better way is to do this only on the minority nodes.
pushFpmRoutes();
+ localPeers.forEach((key, value) -> peers.put(key, value));
asyncLock.unlock();
} else {
log.debug("Fail to obtain lock. Abort.");
@@ -751,8 +759,21 @@
});
break;
case INSTANCE_DEACTIVATED:
- case INSTANCE_ADDED:
case INSTANCE_REMOVED:
+ ImmutableMap.copyOf(peers.asJavaMap()).forEach((key, value) -> {
+ if (value != null) {
+ value.stream()
+ .filter(i -> i.connectedTo().equals(event.subject().id()))
+ .findAny()
+ .ifPresent(value::remove);
+
+ if (value.isEmpty()) {
+ peers.remove(key);
+ }
+ }
+ });
+ break;
+ case INSTANCE_ADDED:
case INSTANCE_ACTIVATED:
default:
break;