FPM routes removal when instance goes down or is removed

Change-Id: Ifda1bbc12e2474a13d3f47ded2e1befa1c8b1994
diff --git a/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmManager.java b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmManager.java
index 455e546..abb8ddd 100644
--- a/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmManager.java
+++ b/apps/routing/fpm/app/src/main/java/org/onosproject/routing/fpm/FpmManager.java
@@ -16,6 +16,7 @@
 
 package org.onosproject.routing.fpm;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -38,13 +39,10 @@
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 import org.jboss.netty.handler.timeout.IdleStateHandler;
 import org.jboss.netty.util.HashedWheelTimer;
-import org.onlab.packet.IpAddress;
 import org.onlab.packet.Ip4Address;
 import org.onlab.packet.Ip6Address;
+import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpPrefix;
-import org.onosproject.net.intf.Interface;
-import org.onosproject.net.host.InterfaceIpAddress;
-import org.onosproject.net.intf.InterfaceService;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
@@ -52,10 +50,16 @@
 import org.onosproject.cluster.ClusterEventListener;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.core.CoreService;
 import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.net.host.InterfaceIpAddress;
+import org.onosproject.net.intf.Interface;
+import org.onosproject.net.intf.InterfaceService;
 import org.onosproject.routeservice.Route;
 import org.onosproject.routeservice.RouteAdminService;
+import org.onosproject.routing.fpm.api.FpmPrefixStore;
+import org.onosproject.routing.fpm.api.FpmPrefixStoreEvent;
+import org.onosproject.routing.fpm.api.FpmRecord;
 import org.onosproject.routing.fpm.protocol.FpmHeader;
 import org.onosproject.routing.fpm.protocol.Netlink;
 import org.onosproject.routing.fpm.protocol.NetlinkMessageType;
@@ -64,12 +68,12 @@
 import org.onosproject.routing.fpm.protocol.RouteAttributeGateway;
 import org.onosproject.routing.fpm.protocol.RtNetlink;
 import org.onosproject.routing.fpm.protocol.RtProtocol;
+import org.onosproject.store.StoreDelegate;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncDistributedLock;
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
-import org.onosproject.store.StoreDelegate;
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -81,7 +85,6 @@
 import java.util.Dictionary;
 import java.util.HashSet;
 import java.util.LinkedList;
-
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -90,9 +93,6 @@
 
 import static java.util.concurrent.Executors.newCachedThreadPool;
 import static org.onlab.util.Tools.groupedThreads;
-import org.onosproject.routing.fpm.api.FpmPrefixStoreEvent;
-import org.onosproject.routing.fpm.api.FpmPrefixStore;
-import org.onosproject.routing.fpm.api.FpmRecord;
 
 /**
  * Forwarding Plane Manager (FPM) route source.
@@ -153,6 +153,9 @@
 
     private Map<FpmPeer, Map<IpPrefix, Route>> fpmRoutes = new ConcurrentHashMap<>();
 
+    //Local cache for peers to be used in case of cluster partition.
+    private Map<FpmPeer, Set<FpmConnectionInfo>> localPeers = new ConcurrentHashMap<>();
+
     @Property(name = "clearRoutes", boolValue = true,
             label = "Whether to clear routes when the FPM connection goes down")
     private boolean clearRoutes = true;
@@ -648,6 +651,7 @@
                 }
 
                 infos.add(new FpmConnectionInfo(localNode, peer, System.currentTimeMillis()));
+                localPeers.put(peer, infos);
                 return infos;
             });
 
@@ -671,7 +675,10 @@
                 infos.stream()
                         .filter(i -> i.connectedTo().equals(clusterService.getLocalNode().id()))
                         .findAny()
-                        .ifPresent(i -> infos.remove(i));
+                        .ifPresent(i -> {
+                            infos.remove(i);
+                            localPeers.get(peer).remove(i);
+                        });
 
                 if (infos.isEmpty()) {
                     return null;
@@ -729,7 +736,7 @@
     private class InternalClusterListener implements ClusterEventListener {
         @Override
         public void event(ClusterEvent event) {
-            log.debug("Receives ClusterEvent {} for {}", event.type(), event.subject().id());
+                log.info("Receives ClusterEvent {} for {}", event.type(), event.subject().id());
             switch (event.type()) {
                 case INSTANCE_READY:
                     // When current node is healing from a network partition,
@@ -744,6 +751,7 @@
                             // All FPM routes on current node will be pushed again even when current node is not
                             // the one that becomes READY. A better way is to do this only on the minority nodes.
                             pushFpmRoutes();
+                            localPeers.forEach((key, value) -> peers.put(key, value));
                             asyncLock.unlock();
                         } else {
                             log.debug("Fail to obtain lock. Abort.");
@@ -751,8 +759,21 @@
                     });
                     break;
                 case INSTANCE_DEACTIVATED:
-                case INSTANCE_ADDED:
                 case INSTANCE_REMOVED:
+                    ImmutableMap.copyOf(peers.asJavaMap()).forEach((key, value) -> {
+                        if (value != null) {
+                            value.stream()
+                                    .filter(i -> i.connectedTo().equals(event.subject().id()))
+                                    .findAny()
+                                    .ifPresent(value::remove);
+
+                            if (value.isEmpty()) {
+                                peers.remove(key);
+                            }
+                        }
+                    });
+                    break;
+                case INSTANCE_ADDED:
                 case INSTANCE_ACTIVATED:
                 default:
                     break;